#
tokens: 49833/50000 66/161 files (page 1/14)
lines: off (toggle) GitHub
raw markdown copy
This is page 1 of 14. Use http://codebase.md/aws-samples/sample-cfm-tips-mcp?page={x} to view the full context.

# Directory Structure

```
├── .gitignore
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── diagnose_cost_optimization_hub_v2.py
├── LICENSE
├── logging_config.py
├── mcp_runbooks.json
├── mcp_server_with_runbooks.py
├── playbooks
│   ├── __init__.py
│   ├── aws_lambda
│   │   ├── __init__.py
│   │   └── lambda_optimization.py
│   ├── cloudtrail
│   │   ├── __init__.py
│   │   └── cloudtrail_optimization.py
│   ├── cloudtrail_optimization.py
│   ├── cloudwatch
│   │   ├── __init__.py
│   │   ├── aggregation_queries.py
│   │   ├── alarms_and_dashboards_analyzer.py
│   │   ├── analysis_engine.py
│   │   ├── base_analyzer.py
│   │   ├── cloudwatch_optimization_analyzer.py
│   │   ├── cloudwatch_optimization_tool.py
│   │   ├── cloudwatch_optimization.py
│   │   ├── cost_controller.py
│   │   ├── general_spend_analyzer.py
│   │   ├── logs_optimization_analyzer.py
│   │   ├── metrics_optimization_analyzer.py
│   │   ├── optimization_orchestrator.py
│   │   └── result_processor.py
│   ├── comprehensive_optimization.py
│   ├── ebs
│   │   ├── __init__.py
│   │   └── ebs_optimization.py
│   ├── ebs_optimization.py
│   ├── ec2
│   │   ├── __init__.py
│   │   └── ec2_optimization.py
│   ├── ec2_optimization.py
│   ├── lambda_optimization.py
│   ├── rds
│   │   ├── __init__.py
│   │   └── rds_optimization.py
│   ├── rds_optimization.py
│   └── s3
│       ├── __init__.py
│       ├── analyzers
│       │   ├── __init__.py
│       │   ├── api_cost_analyzer.py
│       │   ├── archive_optimization_analyzer.py
│       │   ├── general_spend_analyzer.py
│       │   ├── governance_analyzer.py
│       │   ├── multipart_cleanup_analyzer.py
│       │   └── storage_class_analyzer.py
│       ├── base_analyzer.py
│       ├── s3_aggregation_queries.py
│       ├── s3_analysis_engine.py
│       ├── s3_comprehensive_optimization_tool.py
│       ├── s3_optimization_orchestrator.py
│       └── s3_optimization.py
├── README.md
├── requirements.txt
├── runbook_functions_extended.py
├── runbook_functions.py
├── RUNBOOKS_GUIDE.md
├── services
│   ├── __init__.py
│   ├── cloudwatch_pricing.py
│   ├── cloudwatch_service_vended_log.py
│   ├── cloudwatch_service.py
│   ├── compute_optimizer.py
│   ├── cost_explorer.py
│   ├── optimization_hub.py
│   ├── performance_insights.py
│   ├── pricing.py
│   ├── s3_pricing.py
│   ├── s3_service.py
│   ├── storage_lens_service.py
│   └── trusted_advisor.py
├── setup.py
├── test_runbooks.py
├── tests
│   ├── __init__.py
│   ├── conftest.py
│   ├── integration
│   │   ├── __init__.py
│   │   ├── cloudwatch
│   │   │   └── test_cloudwatch_integration.py
│   │   ├── test_cloudwatch_comprehensive_tool_integration.py
│   │   ├── test_cloudwatch_orchestrator_integration.py
│   │   ├── test_integration_suite.py
│   │   └── test_orchestrator_integration.py
│   ├── legacy
│   │   ├── example_output_with_docs.py
│   │   ├── example_wellarchitected_output.py
│   │   ├── test_aws_session_management.py
│   │   ├── test_cloudwatch_orchestrator_pagination.py
│   │   ├── test_cloudwatch_pagination_integration.py
│   │   ├── test_cloudwatch_performance_optimizations.py
│   │   ├── test_cloudwatch_result_processor.py
│   │   ├── test_cloudwatch_timeout_issue.py
│   │   ├── test_documentation_links.py
│   │   ├── test_metrics_pagination_count.py
│   │   ├── test_orchestrator_integration.py
│   │   ├── test_pricing_cache_fix_moved.py
│   │   ├── test_pricing_cache_fix.py
│   │   ├── test_runbook_integration.py
│   │   ├── test_runbooks.py
│   │   ├── test_setup_verification.py
│   │   └── test_stack_trace_fix.py
│   ├── performance
│   │   ├── __init__.py
│   │   ├── cloudwatch
│   │   │   └── test_cloudwatch_performance.py
│   │   ├── test_cloudwatch_parallel_execution.py
│   │   ├── test_parallel_execution.py
│   │   └── test_performance_suite.py
│   ├── pytest-cloudwatch.ini
│   ├── pytest.ini
│   ├── README.md
│   ├── requirements-test.txt
│   ├── run_cloudwatch_tests.py
│   ├── run_tests.py
│   ├── test_setup_verification.py
│   ├── test_suite_main.py
│   └── unit
│       ├── __init__.py
│       ├── analyzers
│       │   ├── __init__.py
│       │   ├── conftest_cloudwatch.py
│       │   ├── test_alarms_and_dashboards_analyzer.py
│       │   ├── test_base_analyzer.py
│       │   ├── test_cloudwatch_base_analyzer.py
│       │   ├── test_cloudwatch_cost_constraints.py
│       │   ├── test_cloudwatch_general_spend_analyzer.py
│       │   ├── test_general_spend_analyzer.py
│       │   ├── test_logs_optimization_analyzer.py
│       │   └── test_metrics_optimization_analyzer.py
│       ├── cloudwatch
│       │   ├── test_cache_control.py
│       │   ├── test_cloudwatch_api_mocking.py
│       │   ├── test_cloudwatch_metrics_pagination.py
│       │   ├── test_cloudwatch_pagination_architecture.py
│       │   ├── test_cloudwatch_pagination_comprehensive_fixed.py
│       │   ├── test_cloudwatch_pagination_comprehensive.py
│       │   ├── test_cloudwatch_pagination_fixed.py
│       │   ├── test_cloudwatch_pagination_real_format.py
│       │   ├── test_cloudwatch_pagination_simple.py
│       │   ├── test_cloudwatch_query_pagination.py
│       │   ├── test_cloudwatch_unit_suite.py
│       │   ├── test_general_spend_tips_refactor.py
│       │   ├── test_import_error.py
│       │   ├── test_mcp_pagination_bug.py
│       │   └── test_mcp_surface_pagination.py
│       ├── s3
│       │   └── live
│       │       ├── test_bucket_listing.py
│       │       ├── test_s3_governance_bucket_discovery.py
│       │       └── test_top_buckets.py
│       ├── services
│       │   ├── __init__.py
│       │   ├── test_cloudwatch_cost_controller.py
│       │   ├── test_cloudwatch_query_service.py
│       │   ├── test_cloudwatch_service.py
│       │   ├── test_cost_control_routing.py
│       │   └── test_s3_service.py
│       └── test_unit_suite.py
└── utils
    ├── __init__.py
    ├── aws_client_factory.py
    ├── cache_decorator.py
    ├── cleanup_manager.py
    ├── cloudwatch_cache.py
    ├── documentation_links.py
    ├── error_handler.py
    ├── intelligent_cache.py
    ├── logging_config.py
    ├── memory_manager.py
    ├── parallel_executor.py
    ├── performance_monitor.py
    ├── progressive_timeout.py
    ├── service_orchestrator.py
    └── session_manager.py
```

# Files

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
#UNCOMMENT THE BELOW LINE IN PUBLIC GITLAB VERSION TO AVOID SHARING THE KIRO SETTINGS
#.kiro/

# Virtual environments
venv/
env/
.env/
.venv/
ENV/
env.bak/
venv.bak/

#Temporary folders
__pycache__/
.pytest_cache/
.amazonq/
sessions/
tests/.pytest_cache/

#Log folders
logs/

# IDE
.vscode/
.idea/
*.swp
*.swo

# OS
.DS_Store
.DS_Store?
._*
.Spotlight-V100
.Trashes
ehthumbs.db
Thumbs.db

#Template files
mcp_runbooks.json
```

--------------------------------------------------------------------------------
/tests/README.md:
--------------------------------------------------------------------------------

```markdown
# S3 Optimization System - Testing Suite

This directory contains a comprehensive testing suite for the S3 optimization system, designed to ensure reliability, performance, and most importantly, **cost constraint compliance**.

## 🚨 Critical: No-Cost Constraint Testing

The most important aspect of this testing suite is validating that the system **NEVER** performs cost-incurring S3 operations. The `no_cost_validation/` tests are critical for customer billing protection.

## Test Structure

```
tests/
├── conftest.py                    # Shared fixtures and configuration
├── pytest.ini                    # Pytest configuration
├── requirements-test.txt          # Testing dependencies
├── run_tests.py                  # Test runner script
├── README.md                     # This file
├── unit/                         # Unit tests with mocked dependencies
│   ├── analyzers/               # Analyzer unit tests
│   │   ├── test_base_analyzer.py
│   │   └── test_general_spend_analyzer.py
│   └── services/                # Service unit tests
│       └── test_s3_service.py
├── integration/                  # Integration tests
│   └── test_orchestrator_integration.py
├── performance/                  # Performance and load tests
│   └── test_parallel_execution.py
└── no_cost_validation/          # 🚨 CRITICAL: Cost constraint tests
    └── test_cost_constraints.py
```

## Test Categories

### 1. Unit Tests (`unit/`)
- **Purpose**: Test individual components in isolation
- **Scope**: Analyzers, services, utilities
- **Dependencies**: Fully mocked AWS services
- **Speed**: Fast (< 1 second per test)
- **Coverage**: High code coverage with edge cases

**Key Features:**
- Comprehensive mocking of AWS services
- Parameter validation testing
- Error handling verification
- Performance monitoring integration testing

### 2. Integration Tests (`integration/`)
- **Purpose**: Test component interactions and data flow
- **Scope**: Orchestrator + analyzers + services
- **Dependencies**: Mocked AWS APIs with realistic responses
- **Speed**: Medium (1-10 seconds per test)
- **Coverage**: End-to-end workflows

**Key Features:**
- Complete analysis workflow testing
- Service fallback chain validation
- Session management integration
- Error propagation testing

### 3. Performance Tests (`performance/`)
- **Purpose**: Validate performance characteristics and resource usage
- **Scope**: Parallel execution, timeout handling, memory usage
- **Dependencies**: Controlled mock delays and resource simulation
- **Speed**: Slow (10-60 seconds per test)
- **Coverage**: Performance benchmarks and limits

**Key Features:**
- Parallel vs sequential execution comparison
- Timeout handling validation
- Memory usage monitoring
- Concurrent request handling
- Cache effectiveness testing

### 4. No-Cost Constraint Validation (`no_cost_validation/`) 🚨
- **Purpose**: **CRITICAL** - Ensure no cost-incurring operations are performed
- **Scope**: All S3 operations across the entire system
- **Dependencies**: Cost constraint validation framework
- **Speed**: Fast (< 1 second per test)
- **Coverage**: 100% of S3 operations

**Key Features:**
- Forbidden operation detection
- Cost constraint system validation
- Data source cost verification
- End-to-end cost compliance testing
- Bypass attempt prevention

## Running Tests

### Quick Start

```bash
# Check test environment
python tests/run_tests.py --check

# Run all tests
python tests/run_tests.py --all

# Run specific test suites
python tests/run_tests.py --unit
python tests/run_tests.py --integration
python tests/run_tests.py --performance
python tests/run_tests.py --cost-validation  # 🚨 CRITICAL
```

### Using pytest directly

```bash
# Install test dependencies
pip install -r tests/requirements-test.txt

# Run unit tests with coverage
pytest tests/unit/ --cov=core --cov=services --cov-report=html

# Run integration tests
pytest tests/integration/ -v

# Run performance tests
pytest tests/performance/ -m performance

# Run cost validation tests (CRITICAL)
pytest tests/no_cost_validation/ -m no_cost_validation -v
```

### Test Markers

Tests are organized using pytest markers:

- `@pytest.mark.unit` - Unit tests
- `@pytest.mark.integration` - Integration tests  
- `@pytest.mark.performance` - Performance tests
- `@pytest.mark.no_cost_validation` - 🚨 Cost constraint tests
- `@pytest.mark.slow` - Tests that take longer to run
- `@pytest.mark.aws` - Tests requiring real AWS credentials (skipped by default)

## Test Configuration

### Environment Variables

```bash
# AWS credentials for testing (use test account only)
export AWS_ACCESS_KEY_ID=testing
export AWS_SECRET_ACCESS_KEY=testing
export AWS_DEFAULT_REGION=us-east-1

# Test configuration
export PYTEST_TIMEOUT=300
export PYTEST_WORKERS=auto
```

### Coverage Requirements

- **Minimum Coverage**: 80%
- **Target Coverage**: 90%+
- **Critical Paths**: 100% (cost constraint validation)

### Performance Benchmarks

- **Unit Tests**: < 1 second each
- **Integration Tests**: < 10 seconds each
- **Performance Tests**: < 60 seconds each
- **Full Suite**: < 5 minutes

## Key Testing Patterns

### 1. AWS Service Mocking

```python
@pytest.fixture
def mock_s3_service():
    service = Mock()
    service.list_buckets = AsyncMock(return_value={
        "status": "success",
        "data": {"Buckets": [...]}
    })
    return service
```

### 2. Cost Constraint Validation

```python
def test_no_forbidden_operations(cost_constraint_validator):
    # Test code that should not call forbidden operations
    analyzer.analyze()
    
    summary = cost_constraint_validator.get_operation_summary()
    assert len(summary["forbidden_called"]) == 0
```

### 3. Performance Testing

```python
@pytest.mark.performance
async def test_parallel_execution_performance(performance_tracker):
    performance_tracker.start_timer("test")
    await run_parallel_analysis()
    execution_time = performance_tracker.end_timer("test")
    
    performance_tracker.assert_performance("test", max_time=30.0)
```

### 4. Error Handling Testing

```python
async def test_service_failure_handling():
    with patch('service.api_call', side_effect=Exception("API Error")):
        result = await analyzer.analyze()
    
    assert result["status"] == "error"
    assert "API Error" in result["message"]
```

## Critical Test Requirements

### 🚨 Cost Constraint Tests MUST Pass

The no-cost constraint validation tests are **mandatory** and **must pass** before any deployment:

1. **Forbidden Operation Detection**: Verify all cost-incurring S3 operations are blocked
2. **Data Source Validation**: Confirm all data sources are genuinely no-cost
3. **End-to-End Compliance**: Validate entire system respects cost constraints
4. **Bypass Prevention**: Ensure cost constraints cannot be circumvented

### Test Data Management

- **No Real AWS Resources**: All tests use mocked AWS services
- **Deterministic Data**: Test data is predictable and repeatable
- **Edge Cases**: Include boundary conditions and error scenarios
- **Realistic Scenarios**: Mock data reflects real AWS API responses

## Continuous Integration

### Pre-commit Hooks

```bash
# Install pre-commit hooks
pip install pre-commit
pre-commit install

# Run manually
pre-commit run --all-files
```

### CI Pipeline Requirements

1. **All test suites must pass**
2. **Coverage threshold must be met**
3. **No-cost constraint tests are mandatory**
4. **Performance benchmarks must be within limits**
5. **No security vulnerabilities in dependencies**

## Troubleshooting

### Common Issues

1. **Import Errors**
   ```bash
   # Ensure PYTHONPATH includes project root
   export PYTHONPATH="${PYTHONPATH}:$(pwd)"
   ```

2. **AWS Credential Errors**
   ```bash
   # Use test credentials
   export AWS_ACCESS_KEY_ID=testing
   export AWS_SECRET_ACCESS_KEY=testing
   ```

3. **Timeout Issues**
   ```bash
   # Increase timeout for slow tests
   pytest --timeout=600
   ```

4. **Memory Issues**
   ```bash
   # Run tests with memory profiling
   pytest --memprof
   ```

### Debug Mode

```bash
# Run with debug output
pytest -v --tb=long --log-cli-level=DEBUG

# Run single test with debugging
pytest tests/unit/test_specific.py::TestClass::test_method -v -s
```

## Contributing to Tests

### Adding New Tests

1. **Choose appropriate test category** (unit/integration/performance/cost-validation)
2. **Follow naming conventions** (`test_*.py`, `Test*` classes, `test_*` methods)
3. **Use appropriate fixtures** from `conftest.py`
4. **Add proper markers** (`@pytest.mark.unit`, etc.)
5. **Include docstrings** explaining test purpose
6. **Validate cost constraints** if testing S3 operations

### Test Quality Guidelines

- **One assertion per test** (when possible)
- **Clear test names** that describe what is being tested
- **Arrange-Act-Assert** pattern
- **Mock external dependencies** completely
- **Test both success and failure paths**
- **Include edge cases and boundary conditions**

## Security Considerations

- **No real AWS credentials** in test code
- **No sensitive data** in test fixtures
- **Secure mock data** that doesn't expose patterns
- **Cost constraint validation** is mandatory
- **Regular dependency updates** for security patches

## Reporting and Metrics

### Coverage Reports

```bash
# Generate HTML coverage report
pytest --cov=core --cov=services --cov-report=html

# View report
open htmlcov/index.html
```

### Performance Reports

```bash
# Generate performance benchmark report
pytest tests/performance/ --benchmark-only --benchmark-json=benchmark.json
```

### Test Reports

```bash
# Generate comprehensive test report
python tests/run_tests.py --report

# View reports
open test_report.html
```

---

## 🚨 Remember: Cost Constraint Compliance is Critical

The primary purpose of this testing suite is to ensure that the S3 optimization system **never incurs costs** for customers. The no-cost constraint validation tests are the most important tests in this suite and must always pass.

**Before any deployment or release:**
1. Run `python tests/run_tests.py --cost-validation`
2. Verify all cost constraint tests pass
3. Review any new S3 operations for cost implications
4. Update forbidden operations list if needed

**Customer billing protection is our top priority.**
```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# CFM Tips - Cost Optimization MCP Server

A comprehensive Model Context Protocol (MCP) server for AWS cost analysis and optimization recommendations, designed to work seamlessly with Amazon Q CLI and other MCP-compatible clients.

## ✅ Features

### Core AWS Services Integration
- **Cost Explorer** - Retrieve cost data and usage metrics
- **Cost Optimization Hub** - Get AWS cost optimization recommendations
- **Compute Optimizer** - Right-sizing recommendations for compute resources
- **Trusted Advisor** - Cost optimization checks and recommendations
- **Performance Insights** - RDS performance metrics and analysis

### Cost Optimization Playbooks
- 🔧 **EC2 Right Sizing** - Identify underutilized EC2 instances
- 💾 **EBS Optimization** - Find unused and underutilized volumes
- 🗄️ **RDS Optimization** - Identify idle and underutilized databases
- ⚡ **Lambda Optimization** - Find overprovisioned and unused functions
- 🪣 **S3 Optimization** - Comprehensive S3 cost analysis and storage class optimization
- 📊 **CloudWatch Optimization** - Analyze logs, metrics, alarms, and dashboards for cost efficiency
- 📋 **CloudTrail Optimization** - Analyze and optimize CloudTrail configurations
- 📊 **Comprehensive Analysis** - Multi-service cost analysis

### Advanced Features
- **Real CloudWatch Metrics** - Uses actual AWS metrics for analysis
- **Multiple Output Formats** - JSON and Markdown report generation
- **Cost Calculations** - Estimated savings and cost breakdowns
- **Actionable Recommendations** - Priority-based optimization suggestions

## 📁 Project Structure

```
sample-cfm-tips-mcp/
├── playbooks/                            # CFM Tips optimization playbooks engine
│   ├── s3_optimization.py               # S3 cost optimization playbook
│   ├── ec2_optimization.py              # EC2 right-sizing playbook
│   ├── ebs_optimization.py              # EBS volume optimization playbook
│   ├── rds_optimization.py              # RDS database optimization playbook
│   ├── lambda_optimization.py           # Lambda function optimization playbook
│   ├── cloudwatch_optimization.py       # CloudWatch optimization playbook
│   └── cloudtrail_optimization.py       # CloudTrail optimization playbook
├── services/                             # AWS Services as datasources for the cost optimization
│   ├── s3_service.py                    # S3 API interactions and metrics
│   ├── s3_pricing.py                    # S3 pricing calculations and cost modeling
│   ├── cost_explorer.py                 # Cost Explorer API integration
│   ├── compute_optimizer.py             # Compute Optimizer API integration
│   └── optimization_hub.py              # Cost Optimization Hub integration
├── mcp_server_with_runbooks.py           # Main MCP server
├── mcp_runbooks.json                     # Template file for MCP configuration file
├── requirements.txt                      # Python dependencies
├── tests/                                # Integration tests
├── diagnose_cost_optimization_hub_v2.py  # Diagnostic utilities
├── RUNBOOKS_GUIDE.md                     # Detailed usage guide
└── README.md                             # Project ReadMe
```

## 🔐 Security and Permissions - Least Privileges

The MCP tools require specific AWS permissions to function. 
-  **Create a read-only IAM role** - Restricts LLM agents from modifying AWS resources. This prevents unintended create, update, or delete actions. 
-  **Enable CloudTrail** - Tracks API activity across your AWS account for security monitoring. 
-  **Follow least-privilege principles** - Grant only essential read permissions (Describe*, List*, Get*) for required services.

The policy below grants list, read, and describe actions only:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "cost-optimization-hub:ListEnrollmentStatuses",
        "cost-optimization-hub:ListRecommendations",
        "cost-optimization-hub:GetRecommendation", 
        "cost-optimization-hub:ListRecommendationSummaries",
        "ce:GetCostAndUsage",
        "ce:GetCostForecast",
        "compute-optimizer:GetEC2InstanceRecommendations",
        "compute-optimizer:GetEBSVolumeRecommendations",
        "compute-optimizer:GetLambdaFunctionRecommendations",
        "ec2:DescribeInstances",
        "ec2:DescribeVolumes",
        "rds:DescribeDBInstances",
        "lambda:ListFunctions",
        "cloudwatch:GetMetricStatistics",
        "s3:ListBucket",
        "s3:ListObjectsV2",
        "s3:GetBucketLocation",
        "s3:GetBucketVersioning",
        "s3:GetBucketLifecycleConfiguration",
        "s3:GetBucketNotification",
        "s3:GetBucketTagging",
        "s3:ListMultipartUploads",
        "s3:GetStorageLensConfiguration",
        "support:DescribeTrustedAdvisorChecks",
        "support:DescribeTrustedAdvisorCheckResult",
        "pi:GetResourceMetrics",
        "cloudtrail:DescribeTrails",
        "cloudtrail:GetTrailStatus",
        "cloudtrail:GetEventSelectors",
        "pricing:GetProducts",
        "pricing:DescribeServices",
        "pricing:GetAttributeValues"
      ],
      "Resource": "*"
    }
  ]
}
```

## 🛠️ Installation

### Prerequisites
- **Python 3.11** or higher
- AWS CLI configured with appropriate credentials
- Amazon Kiro CLI (for MCP integration) - https://docs.aws.amazon.com/amazonq/latest/qdeveloper-ug/command-line-installing.html

### Setup Steps

1. **Clone the Repository**
   ```bash
   git clone https://github.com/aws-samples/sample-cfm-tips-mcp.git
   cd sample-cfm-tips-mcp
   ```

2. **Install Dependencies**
   ```bash
   pip install -r requirements.txt
   ```

3. **Configure AWS Credentials**
   ```bash
   aws configure
   # Or set environment variables:
   # export AWS_ACCESS_KEY_ID=your_access_key
   # export AWS_SECRET_ACCESS_KEY=your_secret_key
   # export AWS_DEFAULT_REGION=us-east-1
   ```

4. **Apply IAM Permissions**
   - Create an IAM policy with the permissions listed above
   - Attach the policy to your IAM user or role

5. **Install the MCP Configurations**
   ```bash
   python3 setup.py
   ```

6. **Usage Option 1: Using the Kiro CLI Chat**
   ```bash
   kiro-cli
   Show me cost optimization recommendations
   ```

7. **Usage Option 2: Integrate with Amazon Q Developer Plugin or Kiro**
   - Open the Amazon Q Developer Plugin in your IDE
   - Click on Chat -> 🛠️ Configure MCP Servers -> ➕ Add new MCP
   - Use the following configuration
   ```bash
   - Scope: Global
   - Name: cfm-tips
   - Transport: stdio
   - Command: python3
   - Arguments: <replace-with-path-to-folder>/mcp_server_with_runbooks.py
   - Timeout: 60
   ```

## 🔧 Available Tools

### Cost Analysis Tools
- `get_cost_explorer_data` - Retrieve AWS cost and usage data
- `list_coh_enrollment` - Check Cost Optimization Hub enrollment
- `get_coh_recommendations` - Get cost optimization recommendations
- `get_coh_summaries` - Get recommendation summaries
- `get_compute_optimizer_recommendations` - Get compute optimization recommendations

### EC2 Optimization
- `ec2_rightsizing` - Analyze EC2 instances for right-sizing opportunities
- `ec2_report` - Generate detailed EC2 optimization reports

### EBS Optimization
- `ebs_optimization` - Analyze EBS volumes for optimization
- `ebs_unused` - Identify unused EBS volumes
- `ebs_report` - Generate EBS optimization reports

### RDS Optimization
- `rds_optimization` - Analyze RDS instances for optimization
- `rds_idle` - Identify idle RDS instances
- `rds_report` - Generate RDS optimization reports

### Lambda Optimization
- `lambda_optimization` - Analyze Lambda functions for optimization
- `lambda_unused` - Identify unused Lambda functions
- `lambda_report` - Generate Lambda optimization reports

### S3 Optimization
- `s3_general_spend_analysis` - Analyze overall S3 spending patterns and usage
- `s3_storage_class_selection` - Get guidance on choosing cost-effective storage classes
- `s3_storage_class_validation` - Validate existing data storage class appropriateness
- `s3_archive_optimization` - Identify and optimize long-term archive data storage
- `s3_api_cost_minimization` - Minimize S3 API request charges through optimization
- `s3_multipart_cleanup` - Identify and clean up incomplete multipart uploads
- `s3_governance_check` - Implement S3 cost controls and governance compliance
- `s3_comprehensive_analysis` - Run comprehensive S3 cost optimization analysis

### CloudWatch Optimization
- `cloudwatch_general_spend_analysis` - Analyze CloudWatch cost breakdown across logs, metrics, alarms, and dashboards
- `cloudwatch_metrics_optimization` - Identify custom metrics cost optimization opportunities
- `cloudwatch_logs_optimization` - Analyze log retention and ingestion cost optimization
- `cloudwatch_alarms_and_dashboards_optimization` - Identify monitoring efficiency improvements
- `cloudwatch_comprehensive_optimization_tool` - Run comprehensive CloudWatch optimization with intelligent orchestration
- `get_cloudwatch_cost_estimate` - Get detailed cost estimate for CloudWatch optimization analysis

### CloudTrail Optimization
- `get_management_trails` - Get CloudTrail management trails
- `run_cloudtrail_trails_analysis` - Run CloudTrail trails analysis for optimization
- `generate_cloudtrail_report` - Generate CloudTrail optimization reports

### Comprehensive Analysis
- `comprehensive_analysis` - Multi-service cost analysis

### Additional Tools
- `get_trusted_advisor_checks` - Get Trusted Advisor recommendations
- `get_performance_insights_metrics` - Get RDS Performance Insights data

## 📊 Example Usage

### Basic Cost Analysis
```
"Get my AWS costs for the last month"
"Show me cost optimization recommendations"
"What are my biggest cost drivers?"
```

### Resource Optimization
```
"Find underutilized EC2 instances in us-east-1"
"Show me unused EBS volumes that I can delete"
"Identify idle RDS databases"
"Find unused Lambda functions"
"Analyze my S3 storage costs and recommend optimizations"
"Find incomplete multipart uploads in my S3 buckets"
"Recommend the best S3 storage class for my data"
"Analyze my CloudWatch logs and metrics for cost optimization"
"Show me CloudWatch alarms that can be optimized"
```

### Report Generation
```
"Generate a comprehensive cost optimization report"
"Create an EC2 right-sizing report in markdown format"
"Generate an EBS optimization report with cost savings"
```

### Multi-Service Analysis
```
"Run comprehensive cost analysis for all services in us-east-1"
"Analyze my AWS infrastructure for cost optimization opportunities"
"Show me immediate cost savings opportunities"
"Generate a comprehensive S3 optimization report"
"Analyze my S3 spending patterns and storage class efficiency"
```

## 🔍 Troubleshooting

### Common Issues

1. **Cost Optimization Hub Not Working**
   ```bash
   python3 diagnose_cost_optimization_hub_v2.py
   ```

2. **No Metrics Found**
   - Ensure resources have been running for at least 14 days
   - Verify CloudWatch metrics are enabled
   - Check that you're analyzing the correct region

3. **Permission Errors**
   - Verify IAM permissions are correctly applied
   - Check AWS credentials configuration
   - Ensure Cost Optimization Hub is enabled in AWS Console

4. **Import Errors**
   ```bash
   # Check Python path and dependencies
   python3 -c "import boto3, mcp; print('Dependencies OK')"
   ```

### Getting Help

- Check the [RUNBOOKS_GUIDE.md](RUNBOOKS_GUIDE.md) for detailed usage instructions
- Run the diagnostic script: `python3 diagnose_cost_optimization_hub_v2.py`
- Run integration tests: `python3 test_runbooks.py`

## 🧩 Add-on MCPs
The add-on AWS Pricing MCP Server provides access to real-time AWS pricing information and cost analysis capabilities:
https://github.com/awslabs/mcp/tree/main/src/aws-pricing-mcp-server

```bash
# Example usage with Add-on AWS Pricing MCP Server:
"Review the CDK by comparing it to the actual spend from my AWS account's stackset. Suggest cost optimization opportunities for the app accordingly"
```

## 🪣 S3 Optimization Features

The S3 optimization module provides comprehensive cost analysis and optimization recommendations:

### Storage Class Optimization
- **Intelligent Storage Class Selection** - Get recommendations for the most cost-effective storage class based on access patterns
- **Storage Class Validation** - Analyze existing data to ensure optimal storage class usage
- **Cost Breakeven Analysis** - Calculate when to transition between storage classes
- **Archive Optimization** - Identify long-term data suitable for Glacier or Deep Archive

### Cost Analysis & Monitoring
- **General Spend Analysis** - Comprehensive S3 spending pattern analysis over 12 months
- **Bucket-Level Cost Ranking** - Identify highest-cost buckets and optimization opportunities
- **Usage Type Breakdown** - Analyze costs by storage, requests, and data transfer
- **Regional Cost Distribution** - Understand spending across AWS regions

### Operational Optimization
- **Multipart Upload Cleanup** - Identify and eliminate incomplete multipart uploads
- **API Cost Minimization** - Optimize request patterns to reduce API charges
- **Governance Compliance** - Implement cost controls and policy compliance checking
- **Lifecycle Policy Recommendations** - Automated suggestions for lifecycle transitions

### Advanced Analytics
- **Real-Time Pricing Integration** - Uses AWS Price List API for accurate cost calculations
- **Trend Analysis** - Identify spending growth patterns and anomalies
- **Efficiency Metrics** - Calculate cost per GB and storage efficiency ratios
- **Comprehensive Reporting** - Generate detailed optimization reports in JSON or Markdown

## 🎯 Key Benefits

- **Immediate Cost Savings** - Identify unused resources for deletion
- **Right-Sizing Opportunities** - Optimize overprovisioned resources
- **Real Metrics Analysis** - Uses actual CloudWatch data
- **Actionable Reports** - Clear recommendations with cost estimates
- **Comprehensive Coverage** - Analyze EC2, EBS, RDS, Lambda, S3, and more
- **Easy Integration** - Works seamlessly with Amazon Q CLI

## 📈 Expected Results

The CFM Tips cost optimization server can help you:

- **Identify cost savings** across all AWS services
- **Find unused resources** costing hundreds of dollars monthly
- **Right-size overprovisioned instances** for optimal performance/cost ratio
- **Optimize storage costs** through volume type and storage class recommendations
- **Eliminate idle resources** that provide no business value
- **Reduce S3 costs by 30-60%** through intelligent storage class transitions
- **Clean up storage waste** from incomplete multipart uploads and orphaned data
- **Optimize API request patterns** to minimize S3 request charges
- **Reduce CloudWatch costs** through log retention and metrics optimization
- **Eliminate unused alarms and dashboards** reducing monitoring overhead

## 🤝 Contributing

We welcome contributions! Please see our contributing guidelines:

1. Fork the repository
2. Create a feature branch
3. Make your changes
4. Add tests for new functionality
5. Submit a pull request

## 📄 License

This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.

---

```

--------------------------------------------------------------------------------
/CODE_OF_CONDUCT.md:
--------------------------------------------------------------------------------

```markdown
## Code of Conduct
This project has adopted the [Amazon Open Source Code of Conduct](https://aws.github.io/code-of-conduct).
For more information see the [Code of Conduct FAQ](https://aws.github.io/code-of-conduct-faq) or contact
opensource-codeofconduct@amazon.com with any additional questions or comments.

```

--------------------------------------------------------------------------------
/CONTRIBUTING.md:
--------------------------------------------------------------------------------

```markdown
# Contributing to CFM Tips AWS Cost Optimization MCP Server

We welcome contributions to the CFM Tips AWS Cost Optimization MCP Server! This document provides guidelines for contributing to the project.

## 🤝 How to Contribute

### Reporting Issues

1. **Search existing issues** first to avoid duplicates
2. **Use the issue template** when creating new issues
3. **Provide detailed information** including:
   - AWS region and services affected
   - Error messages and logs
   - Steps to reproduce
   - Expected vs actual behavior

### Suggesting Features

1. **Check existing feature requests** to avoid duplicates
2. **Describe the use case** and business value
3. **Provide implementation ideas** if you have them
4. **Consider backward compatibility**

### Code Contributions

#### Prerequisites

- Python 3.11 or higher
- AWS CLI configured with test credentials
- Familiarity with MCP (Model Context Protocol)
- Understanding of AWS cost optimization concepts

#### Development Setup

1. **Fork the repository on GitHub, then clone your fork**
   ```bash
   git clone https://github.com/<your-username>/sample-cfm-tips-mcp.git
   cd sample-cfm-tips-mcp
   ```

2. **Create a virtual environment**
   ```bash
   python3 -m venv venv
   source venv/bin/activate  # On Windows: venv\Scripts\activate
   ```

3. **Install dependencies**
   ```bash
   pip install -r requirements.txt
   pip install -r requirements_dev.txt  # If available
   ```

4. **Run tests**
   ```bash
   python3 test_runbooks.py
   ```

#### Making Changes

1. **Create a feature branch**
   ```bash
   git checkout -b feature/your-feature-name
   ```

2. **Make your changes**
   - Follow existing code style and patterns
   - Add docstrings to new functions
   - Include error handling
   - Update documentation as needed

3. **Test your changes**
   ```bash
   # Run integration tests
   python3 test_runbooks.py
   
   # Test specific functionality
   python3 -c "from runbook_functions import your_function; print('OK')"
   
   # Test with Amazon Q (if possible)
   q chat
   ```

4. **Update documentation**
   - Update README.md if needed
   - Update RUNBOOKS_GUIDE.md for new features
   - Add examples for new tools

#### Code Style Guidelines

- **Follow PEP 8** Python style guidelines
- **Use descriptive variable names**
- **Add type hints** where appropriate
- **Include docstrings** for all functions
- **Handle errors gracefully** with informative messages
- **Use async/await** for MCP tool functions

#### Example Code Structure

```python
async def your_new_tool(arguments: Dict[str, Any]) -> List[TextContent]:
    """
    One-line summary of the tool's purpose.

    Args:
        arguments: Tool parameters keyed by name; "region" is mandatory

    Returns:
        A single-element list of TextContent holding either the JSON-encoded
        result or an "Error: ..." message
    """
    try:
        # Reject the call early when the mandatory region is missing or empty
        aws_region = arguments.get("region")
        if not aws_region:
            return [TextContent(type="text", text="Error: region parameter is required")]

        # Build the service client and issue the request
        service_client = boto3.client('service-name', region_name=aws_region)
        api_response = service_client.some_api_call()

        # Wrap the raw response in the standard result envelope;
        # default=str stringifies values json cannot encode natively
        payload = json.dumps(
            {
                "status": "success",
                "data": api_response,
                "message": "Operation completed successfully"
            },
            indent=2,
            default=str
        )
        return [TextContent(type="text", text=payload)]

    except ClientError as e:
        # AWS-side failures are reported with their service error code and message
        error_msg = f"AWS API Error: {e.response['Error']['Code']} - {e.response['Error']['Message']}"
        return [TextContent(type="text", text=f"Error: {error_msg}")]
    except Exception as e:
        # Anything else is reported as a plain error string instead of raising
        return [TextContent(type="text", text=f"Error: {str(e)}")]
```

#### Adding New Tools

1. **Add tool definition** to `list_tools()` in `mcp_server_with_runbooks.py`
2. **Add tool handler** to `call_tool()` function
3. **Implement tool function** in `runbook_functions.py`
4. **Add tests** for the new functionality
5. **Update documentation**

#### Submitting Changes

1. **Commit your changes**
   ```bash
   git add .
   git commit -m "feat: add new cost optimization tool for XYZ"
   ```

2. **Push to your fork**
   ```bash
   git push origin feature/your-feature-name
   ```

3. **Create a Pull Request**
   - Use a descriptive title
   - Explain what the PR does and why
   - Reference any related issues
   - Include testing instructions

## 📋 Pull Request Guidelines

### PR Title Format
- `feat:` for new features
- `fix:` for bug fixes
- `docs:` for documentation changes
- `refactor:` for code refactoring
- `test:` for test additions/changes

### PR Description Should Include
- **What** the PR does
- **Why** the change is needed
- **How** to test the changes
- **Screenshots** if UI changes are involved
- **Breaking changes** if any

### Review Process
1. **Automated checks** must pass
2. **Code review** by maintainers
3. **Testing** in different environments
4. **Documentation** review if applicable

## 🧪 Testing Guidelines

### Integration Tests
- All existing tests must pass
- Add tests for new functionality
- Test with real AWS resources when possible
- Include error case testing

### Manual Testing
- Test with Amazon Q CLI
- Verify tool responses are properly formatted
- Check error handling with invalid inputs
- Test in different AWS regions

## 📚 Documentation Standards

### Code Documentation
- **Docstrings** for all public functions
- **Inline comments** for complex logic
- **Type hints** for function parameters and returns

### User Documentation
- **Clear examples** for new tools
- **Parameter descriptions** with types and defaults
- **Error scenarios** and troubleshooting tips
- **Use cases** and expected outcomes

## 🏷️ Release Process

1. **Version bumping** follows semantic versioning
2. **Changelog** is updated with new features and fixes
3. **Documentation** is updated for new releases
4. **Testing** is performed across different environments

## 🆘 Getting Help

- **GitHub Issues** for bugs and feature requests
- **GitHub Discussions** for questions and ideas
- **Documentation** for usage guidelines
- **Code comments** for implementation details

## 📜 Code of Conduct

- Be respectful and inclusive
- Focus on constructive feedback
- Help others learn and grow
- Follow the project's coding standards

## 🙏 Recognition

Contributors will be recognized in:
- README.md contributors section
- Release notes for significant contributions
- GitHub contributor graphs

Thank you for contributing to CFM Tips AWS Cost Optimization MCP Server!

```

--------------------------------------------------------------------------------
/tests/legacy/test_metrics_pagination_count.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/tests/legacy/test_pricing_cache_fix_moved.py:
--------------------------------------------------------------------------------

```python

```

--------------------------------------------------------------------------------
/tests/unit/__init__.py:
--------------------------------------------------------------------------------

```python
# Unit tests package
```

--------------------------------------------------------------------------------
/tests/integration/__init__.py:
--------------------------------------------------------------------------------

```python
# Integration tests package
```

--------------------------------------------------------------------------------
/tests/performance/__init__.py:
--------------------------------------------------------------------------------

```python
# Performance tests package
```

--------------------------------------------------------------------------------
/tests/unit/services/__init__.py:
--------------------------------------------------------------------------------

```python
# Service unit tests package
```

--------------------------------------------------------------------------------
/tests/unit/analyzers/__init__.py:
--------------------------------------------------------------------------------

```python
# Analyzer unit tests package
```

--------------------------------------------------------------------------------
/tests/__init__.py:
--------------------------------------------------------------------------------

```python
# Test package for S3 Optimization System
```

--------------------------------------------------------------------------------
/playbooks/__init__.py:
--------------------------------------------------------------------------------

```python
# AWS Cost Optimization Playbooks package

```

--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------

```
mcp>=1.0.0
boto3>=1.28.0
botocore>=1.31.0
psutil>=5.8.0

```

--------------------------------------------------------------------------------
/services/__init__.py:
--------------------------------------------------------------------------------

```python
# AWS Cost Analysis MCP Server services package

from .storage_lens_service import StorageLensService

# Names exported by `from services import *`
__all__ = ['StorageLensService']

```

--------------------------------------------------------------------------------
/playbooks/cloudtrail/__init__.py:
--------------------------------------------------------------------------------

```python
"""
CloudTrail optimization playbooks

This package contains CloudTrail-specific cost optimization playbooks including
trail analysis and logging cost optimization.
"""
```

--------------------------------------------------------------------------------
/playbooks/aws_lambda/__init__.py:
--------------------------------------------------------------------------------

```python
"""
Lambda optimization playbooks

This package contains Lambda-specific cost optimization playbooks including
memory optimization and unused function identification.
"""
```

--------------------------------------------------------------------------------
/playbooks/rds/__init__.py:
--------------------------------------------------------------------------------

```python
"""
RDS optimization playbooks

This package contains RDS-specific cost optimization playbooks including
database right-sizing and performance optimization recommendations.
"""
```

--------------------------------------------------------------------------------
/playbooks/ebs/__init__.py:
--------------------------------------------------------------------------------

```python
"""
EBS optimization playbooks

This package contains EBS-specific cost optimization playbooks including
volume utilization analysis and storage optimization recommendations.
"""
```

--------------------------------------------------------------------------------
/playbooks/ec2/__init__.py:
--------------------------------------------------------------------------------

```python
"""
EC2 optimization playbooks

This package contains EC2-specific cost optimization playbooks including
right-sizing, instance type recommendations, and utilization analysis.
"""
```

--------------------------------------------------------------------------------
/utils/__init__.py:
--------------------------------------------------------------------------------

```python
"""
Utilities package for CFM Tips MCP Server

This package contains utility modules for logging, performance monitoring,
session management, and other cross-cutting concerns.
"""
```

--------------------------------------------------------------------------------
/playbooks/s3/__init__.py:
--------------------------------------------------------------------------------

```python
"""
S3 optimization playbooks

This package contains S3-specific cost optimization playbooks including
storage class optimization, lifecycle policies, and unused resource cleanup.
"""
```

--------------------------------------------------------------------------------
/tests/requirements-test.txt:
--------------------------------------------------------------------------------

```
# Testing dependencies for S3 optimization system

# Core testing framework
pytest>=7.0.0
pytest-asyncio>=0.21.0
pytest-cov>=4.0.0
pytest-mock>=3.10.0
pytest-timeout>=2.1.0

# AWS mocking
moto[s3,cloudwatch,ce,s3control]>=4.2.0
boto3>=1.28.0
botocore>=1.31.0

# Performance testing
pytest-benchmark>=4.0.0
memory-profiler>=0.60.0

# Parallel testing (optional)
pytest-xdist>=3.0.0

# Test reporting
pytest-html>=3.1.0
pytest-json-report>=1.5.0

# Code quality
flake8>=5.0.0
black>=22.0.0
isort>=5.10.0

# Type checking
mypy>=1.0.0
types-boto3>=1.0.0

# Additional utilities
freezegun>=1.2.0  # For time-based testing
responses>=0.23.0  # For HTTP mocking
factory-boy>=3.2.0  # For test data generation
```

--------------------------------------------------------------------------------
/playbooks/s3/analyzers/__init__.py:
--------------------------------------------------------------------------------

```python
"""
S3 Optimization Analyzers Package

This package contains all S3 optimization analyzers that extend BaseAnalyzer.
Each analyzer focuses on a specific aspect of S3 cost optimization.
"""

from .general_spend_analyzer import GeneralSpendAnalyzer
from .storage_class_analyzer import StorageClassAnalyzer
from .archive_optimization_analyzer import ArchiveOptimizationAnalyzer
from .api_cost_analyzer import ApiCostAnalyzer
from .multipart_cleanup_analyzer import MultipartCleanupAnalyzer
from .governance_analyzer import GovernanceAnalyzer

# Names exported by a star-import of this package: one entry per analyzer
# class imported above.
__all__ = [
    'GeneralSpendAnalyzer',
    'StorageClassAnalyzer', 
    'ArchiveOptimizationAnalyzer',
    'ApiCostAnalyzer',
    'MultipartCleanupAnalyzer',
    'GovernanceAnalyzer'
]
```

--------------------------------------------------------------------------------
/mcp_runbooks.json:
--------------------------------------------------------------------------------

```json
{
  "mcpServers": {
    "cfm-tips": {
      "command": "python3",
      "args": [
        "<replace-with-project-folder-path>/mcp_server_with_runbooks.py"
      ],
      "env": {
        "AWS_DEFAULT_REGION": "us-east-1",
        "AWS_PROFILE": "<replace-with-your-aws-profile>",
        "PYTHONPATH": "<replace-with-project-folder-path>"
      },
      "disabled": false,
      "autoApprove": []
    },
    "awslabs.aws-pricing-mcp-server": {
      "command": "uvx",
      "args": [
        "awslabs.aws-pricing-mcp-server@latest"
      ],
      "env": {
        "FASTMCP_LOG_LEVEL": "ERROR",
        "AWS_PROFILE": "<replace-with-your-aws-profile>",
        "AWS_REGION": "us-east-1"
      },
      "disabled": false,
      "autoApprove": []
    }
  }
}
```

--------------------------------------------------------------------------------
/playbooks/cloudwatch/__init__.py:
--------------------------------------------------------------------------------

```python
"""
CloudWatch Optimization Playbook for CFM Tips MCP Server

Provides comprehensive CloudWatch cost analysis and optimization recommendations.
"""

from .optimization_orchestrator import CloudWatchOptimizationOrchestrator
from .base_analyzer import BaseAnalyzer
from .cloudwatch_optimization import (
    run_cloudwatch_general_spend_analysis_mcp,
    run_cloudwatch_metrics_optimization_mcp,
    run_cloudwatch_logs_optimization_mcp,
    run_cloudwatch_alarms_and_dashboards_optimization_mcp,
    run_cloudwatch_comprehensive_optimization_tool_mcp,
    query_cloudwatch_analysis_results_mcp,
    validate_cloudwatch_cost_preferences_mcp,
    get_cloudwatch_cost_estimate_mcp
)

# Names exported by a star-import of this package: the orchestrator, the
# analyzer base class, and the *_mcp functions imported above.
__all__ = [
    'CloudWatchOptimizationOrchestrator',
    'BaseAnalyzer',
    'run_cloudwatch_general_spend_analysis_mcp',
    'run_cloudwatch_metrics_optimization_mcp',
    'run_cloudwatch_logs_optimization_mcp',
    'run_cloudwatch_alarms_and_dashboards_optimization_mcp',
    'run_cloudwatch_comprehensive_optimization_tool_mcp',
    'query_cloudwatch_analysis_results_mcp',
    'validate_cloudwatch_cost_preferences_mcp',
    'get_cloudwatch_cost_estimate_mcp'
]
```

--------------------------------------------------------------------------------
/tests/unit/s3/live/test_bucket_listing.py:
--------------------------------------------------------------------------------

```python
"""
Simple test to verify S3 bucket listing works.
"""

import asyncio
import boto3


async def test_list_buckets():
    """List the S3 buckets and print name, region and creation date for up to ten.

    Returns the total number of buckets reported by S3, or 0 when anything
    inside the listing/printing sequence raises.
    """
    client = boto3.client('s3')

    def lookup_region(bucket_name):
        """Return the bucket's region, or an "Error: ..." string if the lookup fails."""
        try:
            constraint = client.get_bucket_location(Bucket=bucket_name).get('LocationConstraint')
        except Exception as e:
            return f"Error: {str(e)}"
        # A missing/empty LocationConstraint is reported as us-east-1
        return constraint or 'us-east-1'

    try:
        buckets = client.list_buckets().get('Buckets', [])

        print(f"\n=== Found {len(buckets)} S3 Buckets ===")

        # Only the first ten buckets are printed
        for entry in buckets[:10]:
            bucket_name = entry['Name']
            creation_date = entry['CreationDate']
            region = lookup_region(bucket_name)

            print(f"\nBucket: {bucket_name}")
            print(f"  Region: {region}")
            print(f"  Created: {creation_date}")

        return len(buckets)

    except Exception as e:
        print(f"\nError listing buckets: {str(e)}")
        return 0


if __name__ == "__main__":
    # Script entry point: run the coroutine to completion and print the
    # bucket count it returns (0 when the listing failed).
    count = asyncio.run(test_list_buckets())
    print(f"\n\nTotal buckets: {count}")

```

--------------------------------------------------------------------------------
/tests/unit/cloudwatch/test_cloudwatch_unit_suite.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
CloudWatch Unit Test Suite Runner

Runs all CloudWatch unit tests including pagination tests.
"""

import pytest
import sys
import os

# Add the project root to the path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../../..'))


class TestCloudWatchUnitSuite:
    """Aggregate runner for the CloudWatch unit tests in this directory."""

    def test_run_all_cloudwatch_unit_tests(self):
        """Invoke pytest on this file's directory and require a zero exit code."""
        suite_dir = os.path.dirname(__file__)

        pytest_args = [suite_dir, '-v', '--tb=short', '--disable-warnings']
        # Leave this runner out of the nested run, otherwise it would
        # invoke itself recursively
        pytest_args.append('--ignore=' + __file__)

        exit_code = pytest.main(pytest_args)

        assert exit_code == 0, "CloudWatch unit tests failed"


if __name__ == '__main__':
    # Script entry point: run every test in this directory except this
    # runner (ignored to prevent recursion) and exit with pytest's status.
    sys.exit(pytest.main([
        os.path.dirname(__file__),
        '-v',
        '--tb=short',
        '--ignore=' + __file__
    ]))
```

--------------------------------------------------------------------------------
/tests/pytest.ini:
--------------------------------------------------------------------------------

```
[tool:pytest]
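# Pytest configuration for S3 optimization testing
# NOTE(review): pytest reads the [pytest] section from .ini files; [tool:pytest]
# is the setup.cfg spelling, so the settings below may be silently ignored - confirm.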

# Test discovery
testpaths = tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*

# Markers
markers =
    unit: Unit tests with mocked dependencies
    integration: Integration tests with multiple components
    performance: Performance and load tests
    no_cost_validation: Critical tests for cost constraint validation
    slow: Tests that take longer to run
    aws: Tests that require AWS credentials (skipped by default)
    cloudwatch: CloudWatch-specific tests

# Output and reporting
addopts = 
    --verbose
    --tb=short
    --strict-markers
    --strict-config
    --disable-warnings
    --color=yes
    --durations=10
    --cov=core
    --cov=services
    --cov-report=term-missing
    --cov-report=html:htmlcov
    --cov-fail-under=80

# Async support
asyncio_mode = auto

# Logging
log_cli = true
log_cli_level = INFO
log_cli_format = %(asctime)s [%(levelname)8s] %(name)s: %(message)s
log_cli_date_format = %Y-%m-%d %H:%M:%S

# Warnings
filterwarnings =
    ignore::DeprecationWarning
    ignore::PendingDeprecationWarning
    ignore::UserWarning:boto3.*
    ignore::UserWarning:botocore.*

# Minimum pytest version (minversion refers to pytest itself, not Python)
minversion = 3.8

# Test timeout (in seconds)
timeout = 300

# Parallel execution
# addopts = -n auto  # Uncomment to enable parallel execution with pytest-xdist
```

--------------------------------------------------------------------------------
/tests/unit/cloudwatch/test_import_error.py:
--------------------------------------------------------------------------------

```python
"""
Test to replicate the CloudWatchServiceFactory import error.

This test verifies that the import error occurs when trying to import
CloudWatchServiceFactory from services.cloudwatch_service.
"""

import pytest


def test_cloudwatch_service_factory_import_error():
    """Test that CloudWatchServiceFactory import fails as expected.

    Passes only when the import raises ImportError with a message matching
    "cannot import name 'CloudWatchServiceFactory'".
    """
    with pytest.raises(ImportError, match="cannot import name 'CloudWatchServiceFactory'"):
        # The imported name is deliberately unused; only the failure matters
        from services.cloudwatch_service import CloudWatchServiceFactory


def test_cloudwatch_optimization_analyzer_import_success():
    """Test that CloudWatchOptimizationAnalyzer import now works after fix.

    Passes when the import statement succeeds and the imported name is not None.
    """
    # Imported inside the function so a failing import surfaces as this test's failure
    from playbooks.cloudwatch.cloudwatch_optimization_analyzer import CloudWatchOptimizationAnalyzer
    assert CloudWatchOptimizationAnalyzer is not None


def test_correct_imports_work():
    """Check that the expected public names can be imported from cloudwatch_service."""
    from services.cloudwatch_service import (
        CWGeneralSpendTips,
        CWMetricsTips,
        CWLogsTips,
        CWAlarmsTips,
        CWDashboardTips,
        CloudWatchService,
        create_cloudwatch_service
    )

    imported_objects = (
        CWGeneralSpendTips,
        CWMetricsTips,
        CWLogsTips,
        CWAlarmsTips,
        CWDashboardTips,
        CloudWatchService,
        create_cloudwatch_service,
    )

    # Each name only has to resolve to something other than None
    for imported in imported_objects:
        assert imported is not None

```

--------------------------------------------------------------------------------
/tests/legacy/test_cloudwatch_timeout_issue.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Test to replicate the CloudWatch timeout issue and verify stack trace reporting.
"""

import asyncio
import json
import traceback
import sys
import os

# Add the project root to the path. This file lives in tests/legacy/, so the
# root is two levels up; a single '..' would only reach the tests/ directory.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../..'))

from runbook_functions import run_cloudwatch_general_spend_analysis


async def test_cloudwatch_timeout():
    """Call the CloudWatch general spend analysis and print whatever it returns.

    Returns True when the call completes and every content item's text has
    been printed; returns False (after printing the traceback) when any
    exception is raised along the way.
    """
    print("Testing CloudWatch general spend analysis timeout issue...")

    try:
        # Smallest argument set expected to reproduce the timeout
        arguments = dict(region="us-east-1", lookback_days=7, page=1)

        print(f"Calling run_cloudwatch_general_spend_analysis with: {arguments}")

        # A timeout here is expected to surface together with a full stack trace
        response_items = await run_cloudwatch_general_spend_analysis(arguments)

        print("Result received:")
        for item in response_items:
            print(item.text)

    except Exception as e:
        print(f"Exception caught in test: {str(e)}")
        print("Full stack trace:")
        traceback.print_exc()
        return False

    else:
        return True


if __name__ == "__main__":
    # Script entry point: exit with status 1 when the check reports failure
    if not asyncio.run(test_cloudwatch_timeout()):
        print("❌ Test failed")
        sys.exit(1)
    print("✅ Test completed successfully")
```

--------------------------------------------------------------------------------
/tests/unit/test_unit_suite.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Unit Test Suite Runner - Second Level Suite
Runs all unit tests across all playbooks.
"""

import pytest
import sys
import os

# Add the project root to the path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../..'))


def run_unit_tests():
    """Execute the unit tests of every playbook whose test directory exists.

    Returns:
        True when pytest exits with code 0; False when pytest reports a
        non-zero exit code or none of the expected directories exist.
    """
    print("🧪 Running Unit Test Suite")
    print("=" * 50)

    # The tests directory is the parent of the directory holding this file
    tests_root = os.path.dirname(os.path.dirname(__file__))

    # Per-playbook unit test locations, relative to the tests directory
    # (add other playbooks here as they are organized)
    playbook_subdirs = ("unit/cloudwatch/", "unit/ec2/", "unit/s3/")

    # Keep only the locations that are actually present on disk
    existing_dirs = [
        candidate
        for candidate in (os.path.join(tests_root, sub) for sub in playbook_subdirs)
        if os.path.exists(candidate)
    ]

    if not existing_dirs:
        print("❌ No unit test directories found")
        return False

    print(f"Running unit tests from: {existing_dirs}")

    # Hand every discovered directory to a single pytest invocation
    exit_code = pytest.main(["-v", "--tb=short", "--color=yes"] + existing_dirs)

    if exit_code != 0:
        print(f"\n❌ UNIT TESTS FAILED (exit code: {exit_code})")
        return False

    print("\n🎉 ALL UNIT TESTS PASSED!")
    return True


if __name__ == "__main__":
    # Script entry point: map the boolean outcome onto a process exit status
    sys.exit(0 if run_unit_tests() else 1)
```

--------------------------------------------------------------------------------
/tests/pytest-cloudwatch.ini:
--------------------------------------------------------------------------------

```
[tool:pytest]
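# CloudWatch optimization testing configuration
# NOTE(review): pytest reads the [pytest] section from .ini files; [tool:pytest]
# is the setup.cfg spelling, so the settings below may be silently ignored - confirm.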

# Test discovery
testpaths = .
python_files = test_*.py *_test.py
python_classes = Test*
python_functions = test_*

# Markers
markers =
    unit: Unit tests
    integration: Integration tests
    performance: Performance tests
    no_cost_validation: Tests that validate no unexpected costs
    cloudwatch: CloudWatch-specific tests
    slow: Slow running tests
    asyncio: Async tests

# Async support
asyncio_mode = auto

# Output options
addopts = 
    --strict-markers
    --strict-config
    --tb=short
    --maxfail=10
    --durations=10
    -ra

# Logging
log_cli = true
log_cli_level = INFO
log_cli_format = %(asctime)s [%(levelname)8s] %(name)s: %(message)s
log_cli_date_format = %Y-%m-%d %H:%M:%S

# Warnings
filterwarnings =
    ignore::DeprecationWarning
    ignore::PendingDeprecationWarning
    ignore::UserWarning:moto.*
    error::pytest.PytestUnraisableExceptionWarning

# Minimum version
minversion = 6.0

# Test timeout (for performance tests)
timeout = 300

# Coverage options (when --cov is used)
# These are applied when pytest-cov is installed and --cov is used
[coverage:run]
source = 
    playbooks/cloudwatch
    services/cloudwatch_service.py
    services/cloudwatch_pricing.py

omit =
    */tests/*
    */test_*
    */__pycache__/*
    */venv/*
    */.venv/*

[coverage:report]
exclude_lines =
    pragma: no cover
    def __repr__
    if self.debug:
    if settings.DEBUG
    raise AssertionError
    raise NotImplementedError
    if 0:
    if __name__ == .__main__.:
    class .*\bProtocol\):
    @(abc\.)?abstractmethod

show_missing = true
precision = 2
skip_covered = false

[coverage:html]
directory = htmlcov
title = CloudWatch Optimization Test Coverage
```

--------------------------------------------------------------------------------
/tests/legacy/test_stack_trace_fix.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Test to verify stack traces are now properly captured in CloudWatch functions.
"""

import asyncio
import json
import sys
import os

# Add the project root to the path. This file lives in tests/legacy/, so the
# root is two levels up; a single '..' would only reach the tests/ directory.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../..'))

from runbook_functions import run_cloudwatch_general_spend_analysis


async def test_stack_trace_capture():
    """Check that an error response from the CloudWatch analysis carries a stack trace.

    The analysis is called with a deliberately invalid region. Only the first
    content item of the response is inspected: the function returns True when
    its text contains "Full stack trace:" and False otherwise. False is also
    returned when the call itself raises. An empty response falls through the
    loop and yields None.
    """
    print("Testing CloudWatch stack trace capture...")

    # The invalid region is there to provoke an error
    arguments = dict(region="invalid-region-12345", lookback_days=1, page=1)

    print(f"Calling run_cloudwatch_general_spend_analysis with invalid region: {arguments}")

    try:
        result = await run_cloudwatch_general_spend_analysis(arguments)

        print("Result received:")
        for content in result:
            result_text = content.text
            print(result_text)

            # The verdict is decided by the first item, so the loop exits here
            has_trace = "Full stack trace:" in result_text
            if has_trace:
                print("✅ SUCCESS: Full stack trace found in error response")
            else:
                print("❌ FAILURE: No full stack trace found in error response")
            return has_trace

    except Exception as e:
        print(f"❌ FAILURE: Exception not handled properly: {str(e)}")
        return False


if __name__ == "__main__":
    # Script entry point: exit with status 1 unless the verification succeeded
    if not asyncio.run(test_stack_trace_capture()):
        print("\n❌ Stack trace fix verification FAILED")
        sys.exit(1)
    print("\n✅ Stack trace fix verification PASSED")
```

--------------------------------------------------------------------------------
/tests/legacy/test_pricing_cache_fix.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Test to verify the CloudWatch pricing cache fix works.
"""

import sys
import os
import time

# Add the project root to the path. This file lives in tests/legacy/, so the
# root is two levels up; a single '..' would only reach the tests/ directory.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../..'))

from services.cloudwatch_pricing import CloudWatchPricing


def test_pricing_cache():
    """Test that pricing calls are cached and don't block.

    Requests the CloudWatch metrics pricing twice through the
    ``CloudWatchPricing`` wrapper and times both calls.

    Returns:
        True if the second call completed in under one millisecond,
        False otherwise.
    """
    print("Testing CloudWatch pricing cache fix...")
    
    # Initialize pricing service
    pricing = CloudWatchPricing(region='us-east-1')
    
    # First call - should use fallback pricing and cache it.
    # The wrapper exposes pricing through get_pricing_data(component); it has
    # no get_metrics_pricing() method. perf_counter() is used because the
    # threshold below is sub-millisecond and time.time() can be too coarse.
    print("Making first pricing call...")
    start_time = time.perf_counter()
    result1 = pricing.get_pricing_data('metrics')
    first_call_time = time.perf_counter() - start_time
    
    print(f"First call took {first_call_time:.3f} seconds")
    print(f"Status: {result1.get('status')}")
    print(f"Source: {result1.get('source')}")
    
    # Second call - should use cache and be instant
    print("\nMaking second pricing call...")
    start_time = time.perf_counter()
    result2 = pricing.get_pricing_data('metrics')
    second_call_time = time.perf_counter() - start_time
    
    print(f"Second call took {second_call_time:.3f} seconds")
    print(f"Status: {result2.get('status')}")
    print(f"Source: {result2.get('source')}")
    
    # Verify caching worked
    if second_call_time < 0.001:  # Should be nearly instant
        print("✅ SUCCESS: Caching is working - second call was instant")
        return True
    else:
        print("❌ FAILURE: Caching not working - second call took too long")
        return False


if __name__ == "__main__":
    # Script entry point: report the verdict and exit non-zero on failure.
    success = test_pricing_cache()
    if not success:
        print("\n❌ Pricing cache fix verification FAILED")
        sys.exit(1)
    print("\n✅ Pricing cache fix verification PASSED")
```

--------------------------------------------------------------------------------
/tests/legacy/test_documentation_links.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Test script for documentation links functionality
"""

import json
from utils.documentation_links import add_documentation_links, get_service_documentation, format_documentation_section

def test_documentation_links():
    """Exercise the documentation-links helpers and print what they return.

    This is a smoke script: it prints the enhanced results for three sample
    payloads, the RDS link table, and a standalone Lambda documentation
    section. It makes no assertions.
    """
    print("Testing documentation links functionality...\n")

    # Tests 1-3: (heading, sample result, extra positional args for the helper)
    enhancement_cases = [
        (
            "1. Testing EC2 service documentation links:",
            {
                "status": "success",
                "data": {
                    "underutilized_instances": [],
                    "count": 0,
                    "total_monthly_savings": 0,
                },
                "message": "Found 0 underutilized EC2 instances",
            },
            ("ec2",),
        ),
        (
            "2. Testing S3 service documentation links:",
            {
                "status": "success",
                "data": {
                    "buckets_analyzed": 5,
                    "total_savings": 150.50,
                },
            },
            ("s3",),
        ),
        (
            # No service type: only the general links are expected
            "3. Testing general documentation links:",
            {
                "status": "success",
                "message": "Cost analysis completed",
            },
            (),
        ),
    ]

    for heading, sample_result, service_args in enhancement_cases:
        print(heading)
        enhanced = add_documentation_links(sample_result, *service_args)
        print(json.dumps(enhanced, indent=2))
        print()

    # Test 4: Get service-specific documentation
    print("4. Testing service-specific documentation retrieval:")
    print("RDS Documentation:")
    for link_title, link_url in get_service_documentation("rds").items():
        print(f"  - {link_title}: {link_url}")
    print()

    # Test 5: Format standalone documentation section
    print("5. Testing standalone documentation section:")
    print(json.dumps(format_documentation_section("lambda"), indent=2))
    print()

    print("All tests completed successfully!")

# Allow running this file directly as a script.
if __name__ == "__main__":
    test_documentation_links()
```

--------------------------------------------------------------------------------
/services/cloudwatch_pricing.py:
--------------------------------------------------------------------------------

```python
"""
CloudWatch Pricing - Backward compatibility wrapper.

This module provides backward compatibility for code that expects CloudWatchPricing.
The actual pricing logic is now internal to cloudwatch_service.py via AWSPricingDAO.
"""

import logging
from typing import Dict, Any, Optional

logger = logging.getLogger(__name__)


class CloudWatchPricing:
    """
    Thin compatibility facade over ``AWSPricingDAO``.

    Every public method forwards to the DAO instance created in ``__init__``;
    no pricing logic lives in this class.
    """

    def __init__(self, region: str = 'us-east-1'):
        """Store the region and create the underlying pricing DAO for it."""
        self.region = region

        # Deferred import: importing at module level would be circular
        from services.cloudwatch_service import AWSPricingDAO
        self._pricing_dao = AWSPricingDAO(region=region)

        logger.debug(f"CloudWatchPricing initialized for region: {region}")

    def _delegate_cost(self, component: str, calculator_name: str, usage: Dict[str, Any]) -> Dict[str, Any]:
        """Fetch pricing for ``component``, then call the named DAO calculator with it."""
        component_pricing = self.get_pricing_data(component)
        calculator = getattr(self._pricing_dao, calculator_name)
        return calculator(usage, component_pricing)

    def get_pricing_data(self, component: str) -> Dict[str, Any]:
        """Return the DAO's pricing data for the given CloudWatch component."""
        dao = self._pricing_dao
        return dao.get_pricing_data(component)

    def get_free_tier_limits(self) -> Dict[str, Any]:
        """Return the DAO's free tier limits."""
        dao = self._pricing_dao
        return dao.get_free_tier_limits()

    def calculate_cost(self, component: str, usage: Dict[str, Any]) -> Dict[str, Any]:
        """Return the DAO's cost calculation for ``component`` and ``usage``."""
        dao = self._pricing_dao
        return dao.calculate_cost(component, usage)

    def calculate_logs_cost(self, usage: Dict[str, Any]) -> Dict[str, Any]:
        """Calculate CloudWatch Logs cost using the 'logs' pricing data."""
        return self._delegate_cost('logs', '_calculate_logs_cost', usage)

    def calculate_metrics_cost(self, usage: Dict[str, Any]) -> Dict[str, Any]:
        """Calculate CloudWatch Metrics cost using the 'metrics' pricing data."""
        return self._delegate_cost('metrics', '_calculate_metrics_cost', usage)

    def calculate_alarms_cost(self, usage: Dict[str, Any]) -> Dict[str, Any]:
        """Calculate CloudWatch Alarms cost using the 'alarms' pricing data."""
        return self._delegate_cost('alarms', '_calculate_alarms_cost', usage)

    def calculate_dashboards_cost(self, usage: Dict[str, Any]) -> Dict[str, Any]:
        """Calculate CloudWatch Dashboards cost using the 'dashboards' pricing data."""
        return self._delegate_cost('dashboards', '_calculate_dashboards_cost', usage)

```

--------------------------------------------------------------------------------
/tests/performance/test_performance_suite.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Performance Test Suite Runner - Second Level Suite
Runs all performance tests across all playbooks.
"""

import sys
import os
import importlib.util

# Add the project root to the path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../..'))


def run_performance_tests():
    """Load each registered performance test script and run its ``main()``.

    Scripts that do not exist on disk are skipped. A script counts as passed
    when its ``main()`` returns a truthy value, and as failed when it returns
    a falsy value or when loading/running it raises. Scripts without a
    ``main()`` are reported but not counted.

    Returns:
        True when no script failed, False otherwise.
    """
    separator = "=" * 50
    print("⚡ Running Performance Test Suite")
    print(separator)

    # Script paths are resolved against the tests directory (parent of this file's directory)
    tests_root = os.path.dirname(os.path.dirname(__file__))
    suites = [
        ("CloudWatch Performance", os.path.join(tests_root, "performance/cloudwatch/test_cloudwatch_performance.py")),
        # Register further playbooks here as they are organized, e.g.
        # ("EC2 Performance", os.path.join(tests_root, "performance/ec2/test_ec2_performance.py")),
        # ("S3 Performance", os.path.join(tests_root, "performance/s3/test_s3_performance.py")),
    ]

    passed = 0
    failed = 0

    for label, script_path in suites:
        if not os.path.exists(script_path):
            print(f"⚠️  Skipping {label}: {script_path} not found")
            continue

        print(f"\n🔄 Running {label}...")

        try:
            # Import the script from its file path
            spec = importlib.util.spec_from_file_location("test_module", script_path)
            loaded = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(loaded)

            if not hasattr(loaded, 'main'):
                print(f"⚠️  {label}: No main() function found")
                continue

            if loaded.main():
                passed += 1
                print(f"✅ {label} PASSED")
            else:
                failed += 1
                print(f"❌ {label} FAILED")

        except Exception as exc:
            failed += 1
            print(f"❌ {label} FAILED with exception: {exc}")

    print("\n" + separator)
    print(f"Performance Test Results: {passed + failed} total, {passed} passed, {failed} failed")

    all_ok = failed == 0

    if all_ok:
        print("🎉 ALL PERFORMANCE TESTS PASSED!")
    else:
        print(f"❌ {failed} PERFORMANCE TESTS FAILED")

    return all_ok


if __name__ == "__main__":
    # Exit status 0 when every suite passed, 1 otherwise.
    success = run_performance_tests()
    exit_code = 0 if success else 1
    sys.exit(exit_code)
```

--------------------------------------------------------------------------------
/tests/integration/test_integration_suite.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Integration Test Suite Runner - Second Level Suite
Runs all integration tests across all playbooks.
"""

import asyncio
import sys
import os
import importlib.util

# Add the project root to the path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../..'))


async def run_integration_tests():
    """Load each registered integration test script and run its entry point.

    Scripts that do not exist on disk are skipped. For each loaded script the
    async ``run_cloudwatch_integration_tests()`` is preferred; a synchronous
    ``main()`` is used when that is absent. A truthy return counts as passed;
    a falsy return or any exception counts as failed. Scripts exposing
    neither entry point are reported but not counted.

    Returns:
        True when no script failed, False otherwise.
    """
    separator = "=" * 50
    print("🔗 Running Integration Test Suite")
    print(separator)

    # Script paths are resolved against the tests directory (parent of this file's directory)
    tests_root = os.path.dirname(os.path.dirname(__file__))
    suites = [
        ("CloudWatch Integration", os.path.join(tests_root, "integration/cloudwatch/test_cloudwatch_integration.py")),
        # Register further playbooks here as they are organized, e.g.
        # ("EC2 Integration", os.path.join(tests_root, "integration/ec2/test_ec2_integration.py")),
        # ("S3 Integration", os.path.join(tests_root, "integration/s3/test_s3_integration.py")),
    ]

    passed = 0
    failed = 0

    for label, script_path in suites:
        if not os.path.exists(script_path):
            print(f"⚠️  Skipping {label}: {script_path} not found")
            continue

        print(f"\n🔄 Running {label}...")

        try:
            # Import the script from its file path
            spec = importlib.util.spec_from_file_location("test_module", script_path)
            loaded = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(loaded)

            # Pick the entry point: async runner first, sync main() second
            if hasattr(loaded, 'run_cloudwatch_integration_tests'):
                outcome = await loaded.run_cloudwatch_integration_tests()
            elif hasattr(loaded, 'main'):
                outcome = loaded.main()
            else:
                print(f"⚠️  {label}: No main() or run_*_integration_tests() function found")
                continue

            if outcome:
                passed += 1
                print(f"✅ {label} PASSED")
            else:
                failed += 1
                print(f"❌ {label} FAILED")

        except Exception as exc:
            failed += 1
            print(f"❌ {label} FAILED with exception: {exc}")

    print("\n" + separator)
    print(f"Integration Test Results: {passed + failed} total, {passed} passed, {failed} failed")

    all_ok = failed == 0

    if all_ok:
        print("🎉 ALL INTEGRATION TESTS PASSED!")
    else:
        print(f"❌ {failed} INTEGRATION TESTS FAILED")

    return all_ok


if __name__ == "__main__":
    # Exit status 0 when every suite passed, 1 otherwise.
    success = asyncio.run(run_integration_tests())
    exit_code = 0 if success else 1
    sys.exit(exit_code)
```

--------------------------------------------------------------------------------
/services/trusted_advisor.py:
--------------------------------------------------------------------------------

```python
"""
AWS Trusted Advisor service module.

This module provides functions for interacting with the AWS Trusted Advisor API.
"""

import logging
from typing import Dict, List, Optional, Any
import boto3
from botocore.exceptions import ClientError

from utils.error_handler import AWSErrorHandler, ResponseFormatter
from utils.aws_client_factory import get_trusted_advisor_client

logger = logging.getLogger(__name__)

def get_trusted_advisor_checks(
    check_categories: Optional[List[str]] = None,
    region: Optional[str] = None
) -> Dict[str, Any]:
    """
    Get AWS Trusted Advisor check results.
    
    Lists the available checks, optionally keeps only those whose category is
    in ``check_categories``, and fetches the result of each remaining check.
    Checks that are malformed, lack an ID, or whose result cannot be
    retrieved are logged and skipped rather than failing the whole call.
    
    Args:
        check_categories: List of check categories to filter
        region: AWS region (optional). Not used by this function: the client
            factory is called without a region.
        
    Returns:
        Dictionary containing the Trusted Advisor check results, or a
        formatted error response if listing the checks fails
    """
    try:
        # Trusted Advisor is only available in us-east-1
        support_client = get_trusted_advisor_client()
        
        # Get available checks
        checks_response = support_client.describe_trusted_advisor_checks(language='en')
        checks = checks_response['checks']
        
        # Get results for each check
        results = []
        for check in checks:
            # Ensure check is a dictionary
            if not isinstance(check, dict):
                logger.warning(f"Unexpected check format in Trusted Advisor response: {type(check)}")
                continue
            
            # Filter by categories if specified. This runs after the format
            # validation above so that a malformed entry cannot abort the
            # whole request while filtering.
            if check_categories and check.get('category') not in check_categories:
                continue
                
            check_id = check.get('id')
            check_name = check.get('name', 'Unknown')
            
            if not check_id:
                logger.warning(f"Check missing ID: {check_name}")
                continue
                
            try:
                result = support_client.describe_trusted_advisor_check_result(
                    checkId=check_id,
                    language='en'
                )
                
                # Validate result structure
                if 'result' in result and isinstance(result['result'], dict):
                    results.append({
                        'check_id': check_id,
                        'name': check_name,
                        'category': check.get('category', 'unknown'),
                        'result': result['result']
                    })
                else:
                    logger.warning(f"Invalid result structure for check {check_name}")
                    
            except Exception as check_error:
                # Best effort: one failing check must not hide the others
                logger.warning(f"Error getting result for check {check_name}: {str(check_error)}")
                
        return ResponseFormatter.success_response(
            data={"checks": results, "count": len(results)},
            message=f"Retrieved {len(results)} Trusted Advisor check results",
            analysis_type="trusted_advisor_checks"
        )
        
    except ClientError as e:
        return AWSErrorHandler.format_client_error(
            e, 
            "get_trusted_advisor_checks",
            ["support:DescribeTrustedAdvisorChecks", "support:DescribeTrustedAdvisorCheckResult"]
        )
        
    except Exception as e:
        return AWSErrorHandler.format_general_error(e, "get_trusted_advisor_checks")
```

--------------------------------------------------------------------------------
/runbook_functions_extended.py:
--------------------------------------------------------------------------------

```python
# Extended EC2 runbook functions
import json
from typing import Dict, List, Any
from mcp.types import TextContent

from playbooks.ec2_optimization import (
    get_graviton_compatible_instances, get_burstable_instances_analysis,
    get_spot_instance_opportunities, get_unused_capacity_reservations,
    get_scheduling_opportunities, get_commitment_plan_recommendations,
    get_governance_violations, generate_comprehensive_ec2_report
)

async def identify_graviton_compatible_instances(arguments: Dict[str, Any]) -> List[TextContent]:
    """Return the Graviton-compatibility findings for ``arguments["region"]`` as JSON text.

    Any exception is reported as a single "Error: ..." text item instead of
    being raised.
    """
    try:
        region = arguments.get("region")
        findings = get_graviton_compatible_instances(region=region)
        serialized = json.dumps(findings, indent=2, default=str)
        return [TextContent(type="text", text=serialized)]
    except Exception as exc:
        return [TextContent(type="text", text=f"Error: {str(exc)}")]

async def analyze_burstable_instances(arguments: Dict[str, Any]) -> List[TextContent]:
    """Return the burstable-instance analysis as JSON text.

    Reads ``region`` and ``lookback_period_days`` (default 14) from
    ``arguments``. Any exception is reported as a single "Error: ..." text
    item instead of being raised.
    """
    try:
        region = arguments.get("region")
        lookback_days = arguments.get("lookback_period_days", 14)
        findings = get_burstable_instances_analysis(
            region=region,
            lookback_period_days=lookback_days,
        )
        serialized = json.dumps(findings, indent=2, default=str)
        return [TextContent(type="text", text=serialized)]
    except Exception as exc:
        return [TextContent(type="text", text=f"Error: {str(exc)}")]

async def identify_spot_opportunities(arguments: Dict[str, Any]) -> List[TextContent]:
    """Return the Spot instance opportunities for ``arguments["region"]`` as JSON text.

    Any exception is reported as a single "Error: ..." text item instead of
    being raised.
    """
    try:
        region = arguments.get("region")
        findings = get_spot_instance_opportunities(region=region)
        serialized = json.dumps(findings, indent=2, default=str)
        return [TextContent(type="text", text=serialized)]
    except Exception as exc:
        return [TextContent(type="text", text=f"Error: {str(exc)}")]

async def identify_unused_reservations(arguments: Dict[str, Any]) -> List[TextContent]:
    """Return the unused capacity reservations for ``arguments["region"]`` as JSON text.

    Any exception is reported as a single "Error: ..." text item instead of
    being raised.
    """
    try:
        region = arguments.get("region")
        findings = get_unused_capacity_reservations(region=region)
        serialized = json.dumps(findings, indent=2, default=str)
        return [TextContent(type="text", text=serialized)]
    except Exception as exc:
        return [TextContent(type="text", text=f"Error: {str(exc)}")]

async def identify_scheduling_opportunities(arguments: Dict[str, Any]) -> List[TextContent]:
    """Return the scheduling opportunities for ``arguments["region"]`` as JSON text.

    Any exception is reported as a single "Error: ..." text item instead of
    being raised.
    """
    try:
        region = arguments.get("region")
        findings = get_scheduling_opportunities(region=region)
        serialized = json.dumps(findings, indent=2, default=str)
        return [TextContent(type="text", text=serialized)]
    except Exception as exc:
        return [TextContent(type="text", text=f"Error: {str(exc)}")]

async def analyze_commitment_plans(arguments: Dict[str, Any]) -> List[TextContent]:
    """Return the commitment plan recommendations for ``arguments["region"]`` as JSON text.

    Any exception is reported as a single "Error: ..." text item instead of
    being raised.
    """
    try:
        region = arguments.get("region")
        findings = get_commitment_plan_recommendations(region=region)
        serialized = json.dumps(findings, indent=2, default=str)
        return [TextContent(type="text", text=serialized)]
    except Exception as exc:
        return [TextContent(type="text", text=f"Error: {str(exc)}")]

async def identify_governance_violations(arguments: Dict[str, Any]) -> List[TextContent]:
    """Return the governance violations for ``arguments["region"]`` as JSON text.

    Any exception is reported as a single "Error: ..." text item instead of
    being raised.
    """
    try:
        region = arguments.get("region")
        findings = get_governance_violations(region=region)
        serialized = json.dumps(findings, indent=2, default=str)
        return [TextContent(type="text", text=serialized)]
    except Exception as exc:
        return [TextContent(type="text", text=f"Error: {str(exc)}")]

async def generate_comprehensive_report(arguments: Dict[str, Any]) -> List[TextContent]:
    """Return the comprehensive EC2 report for ``arguments["region"]`` as JSON text.

    Any exception is reported as a single "Error: ..." text item instead of
    being raised.
    """
    try:
        region = arguments.get("region")
        report = generate_comprehensive_ec2_report(region=region)
        serialized = json.dumps(report, indent=2, default=str)
        return [TextContent(type="text", text=serialized)]
    except Exception as exc:
        return [TextContent(type="text", text=f"Error: {str(exc)}")]
```

--------------------------------------------------------------------------------
/services/performance_insights.py:
--------------------------------------------------------------------------------

```python
"""
AWS Performance Insights service module.

This module provides functions for interacting with the AWS Performance Insights API.
"""

import logging
from typing import Dict, List, Optional, Any
import boto3
from datetime import datetime, timedelta
from botocore.exceptions import ClientError

from utils.error_handler import AWSErrorHandler, ResponseFormatter
from utils.aws_client_factory import get_performance_insights_client

logger = logging.getLogger(__name__)

def get_performance_insights_metrics(
    db_instance_identifier: str,
    start_time: Optional[str] = None,
    end_time: Optional[str] = None,
    region: Optional[str] = None
) -> Dict[str, Any]:
    """
    Fetch the db.load.avg and db.sampledload.avg Performance Insights metrics
    for an RDS instance at a 60-second period.
    
    Args:
        db_instance_identifier: RDS instance identifier
        start_time: Start of the window (ISO format). When omitted, the window
            is the last hour ending now and any supplied ``end_time`` is
            replaced.
        end_time: End of the window (ISO format). Defaults to now when only
            ``start_time`` is given.
        region: AWS region (optional)
        
    Returns:
        A dict with "status" "success" and the raw API response under "data".
        Authorization failures also return "status" "success", with an empty
        metric list and a "warning"; every other failure returns "status"
        "error" with a "message".
    """
    try:
        pi_client = get_performance_insights_client(region)

        # Resolve the query window
        if start_time:
            if not end_time:
                end_time = datetime.utcnow().isoformat() + 'Z'
        else:
            window_end = datetime.utcnow()
            window_start = window_end - timedelta(hours=1)
            start_time = window_start.isoformat() + 'Z'
            end_time = window_end.isoformat() + 'Z'

        metric_queries = [
            {'Metric': metric_name}
            for metric_name in ('db.load.avg', 'db.sampledload.avg')
        ]

        response = pi_client.get_resource_metrics(
            ServiceType='RDS',
            Identifier=db_instance_identifier,
            StartTime=start_time,
            EndTime=end_time,
            MetricQueries=metric_queries,
            PeriodInSeconds=60
        )

        success_message = f"Retrieved Performance Insights metrics for {db_instance_identifier}"
        return {
            "status": "success",
            "data": response,
            "message": success_message
        }

    except ClientError as client_err:
        if 'Error' in client_err.response:
            error_code = client_err.response['Error']['Code']
        else:
            error_code = "Unknown"

        authorization_codes = ('NotAuthorizedException', 'AccessDenied', 'UnauthorizedOperation')
        if error_code not in authorization_codes:
            logger.error(f"Error in Performance Insights API: {str(client_err)}")
            return {
                "status": "error",
                "message": f"Performance Insights API error: {str(client_err)}",
                "error_code": error_code
            }

        # Authorization problems are reported as an empty, successful result
        logger.warning(f"Performance Insights not authorized for {db_instance_identifier}: {str(client_err)}")
        empty_payload = {
            "MetricList": [],
            "AlignedStartTime": start_time,
            "AlignedEndTime": end_time,
            "Identifier": db_instance_identifier
        }
        return {
            "status": "success",
            "data": empty_payload,
            "message": f"Performance Insights not enabled or authorized for {db_instance_identifier}",
            "warning": "Performance Insights requires explicit enablement and permissions"
        }

    except Exception as unexpected:
        logger.error(f"Unexpected error in Performance Insights service: {str(unexpected)}")
        return {
            "status": "error",
            "message": f"Unexpected error: {str(unexpected)}"
        }
```

--------------------------------------------------------------------------------
/playbooks/s3/s3_optimization.py:
--------------------------------------------------------------------------------

```python
"""
S3 Cost Optimization Playbook - Consolidated Module

This module has been consolidated and cleaned up. The main S3 optimization functionality
has been moved to the new architecture:
- core/s3_optimization_orchestrator.py (main orchestrator)
- core/s3_analysis_engine.py (analysis engine)
- core/analyzers/ (individual analyzer implementations)

This file now contains only essential imports and references for backward compatibility.
All new development should use the S3OptimizationOrchestrator from playbooks.s3.s3_optimization_orchestrator.
"""

import logging
from typing import Dict, Any, Optional

logger = logging.getLogger(__name__)


class _FallbackS3Optimization:
    """
    Fallback S3Optimization class when the new orchestrator is not usable.
    
    This class provides basic error handling and guidance to use the new architecture.
    It is always defined, so that get_s3_optimization_instance() has a real
    fallback even when the orchestrator imported fine but could not be
    constructed.
    """
    
    def __init__(self, region: Optional[str] = None, timeout_seconds: int = 45):
        """
        Initialize fallback S3 optimization.
        
        Args:
            region: AWS region (optional)
            timeout_seconds: Maximum execution time per analysis (default: 45)
        """
        self.region = region
        self.timeout_seconds = timeout_seconds
        logger.warning("Using fallback S3Optimization class. Please use S3OptimizationOrchestrator instead.")
    
    def __getattr__(self, name: str) -> Any:
        """
        Handle any method calls by providing guidance to use the new architecture.
        
        Only invoked for attributes that are not found normally. The error is
        logged when the attribute is looked up, not when the returned
        callable is invoked.
        
        Args:
            name: Method name being called
            
        Returns:
            A callable that accepts any arguments and returns an error
            response with guidance
        """
        logger.error(f"Method '{name}' called on fallback S3Optimization class")
        return lambda *args, **kwargs: {
            "status": "error",
            "message": f"S3Optimization.{name}() is deprecated. Use S3OptimizationOrchestrator instead.",
            "guidance": {
                "new_class": "S3OptimizationOrchestrator",
                "import_path": "from playbooks.s3.s3_optimization_orchestrator import S3OptimizationOrchestrator",
                "migration_note": "The new orchestrator provides all S3 optimization functionality with improved performance and session integration."
            },
            "data": {}
        }


# Import the new orchestrator for any legacy compatibility needs
try:
    from .s3_optimization_orchestrator import S3OptimizationOrchestrator
except ImportError as e:
    logger.error(f"Failed to import S3OptimizationOrchestrator: {e}")
    
    # Orchestrator unavailable: expose the fallback under the legacy name
    S3OptimizationOrchestrator = None
    S3Optimization = _FallbackS3Optimization
else:
    # Provide a compatibility alias for any remaining legacy code
    S3Optimization = S3OptimizationOrchestrator
    
    logger.info("S3 optimization functionality available via S3OptimizationOrchestrator")


# Utility functions for backward compatibility
def get_s3_optimization_instance(region: Optional[str] = None, timeout_seconds: int = 45) -> S3Optimization:
    """
    Get an S3 optimization instance (preferably the new orchestrator).
    
    Args:
        region: AWS region (optional)
        timeout_seconds: Maximum execution time per analysis (default: 45).
            Only passed to the fallback class.
        
    Returns:
        An S3OptimizationOrchestrator when it can be imported and
        constructed, otherwise a fallback instance whose methods return
        error responses
    """
    if S3OptimizationOrchestrator is None:
        logger.warning("Could not create S3OptimizationOrchestrator, using fallback: import failed")
    else:
        try:
            return S3OptimizationOrchestrator(region=region)
        except Exception as e:
            logger.warning(f"Could not create S3OptimizationOrchestrator, using fallback: {e}")
    
    # Always build the dedicated fallback class here. S3Optimization may be an
    # alias of the orchestrator whose constructor just failed.
    return _FallbackS3Optimization(region=region, timeout_seconds=timeout_seconds)


# Export the main class for backward compatibility
__all__ = ['S3Optimization', 'get_s3_optimization_instance']
```

--------------------------------------------------------------------------------
/utils/documentation_links.py:
--------------------------------------------------------------------------------

```python
"""
Documentation Links Utility

This module provides centralized documentation links for AWS cost optimization tools.
It adds relevant documentation and best practices links to tool outputs, including
AWS Well-Architected Framework recommendations.
"""

from typing import Dict, Any, List
# Removed wellarchitected_recommendations - let LLMs provide recommendations based on MCP output

# Documentation links mapping: section key -> {link title: URL}.
# The "general" section is merged into every result built by the functions in
# this module. The other keys are service identifiers; they must be lowercase
# because callers' service_type values are lowercased before lookup.
DOCUMENTATION_LINKS = {
    "general": {
        "CFM-TIPs Guidance": "https://catalog.workshops.aws/awscff/en-US/introduction",
        "Cost Optimization Pillar of AWS Well Architected": "https://docs.aws.amazon.com/wellarchitected/latest/framework/cost-optimization.html"
    },
    "ec2": {
        "Best Practices Playbooks for EC2": "https://catalog.workshops.aws/awscff/en-US/playbooks/compute/ec2"
    },
    "ebs": {
        "Best Practices Playbooks for EBS": "https://catalog.workshops.aws/awscff/en-US/playbooks/storage/ebs"
    },
    "rds": {
        "Best Practices Playbooks for RDS": "https://catalog.workshops.aws/awscff/en-US/playbooks/databases/rds"
    },
    "lambda": {
        "Best Practices Playbooks for AWS Lambda": "https://catalog.workshops.aws/awscff/en-US/playbooks/compute/lambda"
    },
    "s3": {
        "Best Practices Playbooks for S3": "https://catalog.workshops.aws/awscff/en-US/playbooks/storage/s3"
    },
    "cloudtrail": {
        "Best Practices Playbooks for CloudTrail": "https://catalog.workshops.aws/awscff/en-US/playbooks/management-and-governance/cloudtrail"
    }
}

def add_documentation_links(result: Dict[str, Any], service_type: str = None, finding_type: str = None) -> Dict[str, Any]:
    """
    Return a shallow copy of ``result`` with a "documentation" section attached.
    
    Args:
        result: The result dictionary from a cost optimization function.
            Anything that is not a dict is returned unchanged.
        service_type: The AWS service type (ec2, ebs, rds, lambda, s3, cloudtrail),
            matched case-insensitively. Unknown or missing values yield only
            the general links.
        finding_type: Type of optimization finding. Accepted but not used.
    
    Returns:
        The copied result with its "documentation" key set to a description
        plus the general links and any service-specific links
    """
    if not isinstance(result, dict):
        return result
    
    # Work on a copy so the caller's dictionary is left untouched
    annotated = result.copy()
    
    # General links always come first; service links are layered on top
    links = dict(DOCUMENTATION_LINKS["general"])
    if service_type:
        service_key = service_type.lower()
        if service_key in DOCUMENTATION_LINKS:
            links.update(DOCUMENTATION_LINKS[service_key])
    
    annotated["documentation"] = {
        "description": "Suggested documentation and further reading",
        "links": links
    }
    
    # Well-Architected recommendations are left to the LLM consuming the MCP output
    return annotated

def get_service_documentation(service_type: str) -> Dict[str, str]:
    """
    Get documentation links for a specific service.
    
    Args:
        service_type: The AWS service type, matched case-insensitively.
            ``None``, an empty string, or an unknown service yields only the
            general links.
    
    Returns:
        A new dictionary of link title -> URL containing the general links
        plus any links registered for the service
    """
    docs = DOCUMENTATION_LINKS["general"].copy()
    
    # Guard against None/empty the same way the sibling helpers do, instead
    # of failing on .lower()
    if service_type and service_type.lower() in DOCUMENTATION_LINKS:
        docs.update(DOCUMENTATION_LINKS[service_type.lower()])
    
    return docs

def format_documentation_section(service_type: str = None) -> Dict[str, Any]:
    """
    Build a standalone "documentation" section.

    Args:
        service_type: Optional service key; when it is set and (lower-cased)
            present in DOCUMENTATION_LINKS, its links are merged over the
            general ones.

    Returns:
        A dictionary with a single "documentation" key containing a
        description and the collected links.
    """
    links = dict(DOCUMENTATION_LINKS["general"])

    if service_type:
        service_key = service_type.lower()
        if service_key in DOCUMENTATION_LINKS:
            links.update(DOCUMENTATION_LINKS[service_key])

    section = {
        "description": "Suggested documentation and further reading",
        "links": links
    }
    return {"documentation": section}
```

--------------------------------------------------------------------------------
/logging_config.py:
--------------------------------------------------------------------------------

```python
"""
Centralized logging configuration for CFM Tips MCP Server
"""

import logging
import sys
import os
import tempfile
from datetime import datetime

def _open_log_file_handlers(directory, formatter):
    """Open the INFO and ERROR file handlers inside ``directory``, all-or-nothing.

    Args:
        directory: Directory for the log files; an empty string means the
            current working directory.
        formatter: Formatter applied to both handlers.

    Returns:
        List with the main (INFO) handler followed by the error (ERROR) handler.

    Raises:
        OSError: If either file cannot be opened. Any handler that was opened
            before the failure is closed first.
    """
    targets = (
        ('cfm_tips_mcp.log', logging.INFO),
        ('cfm_tips_mcp_errors.log', logging.ERROR),
    )
    opened = []
    try:
        for filename, level in targets:
            handler = logging.FileHandler(os.path.join(directory, filename))
            handler.setLevel(level)
            handler.setFormatter(formatter)
            opened.append(handler)
    except OSError:
        # Do not leak the file handle of a handler that did open.
        for handler in opened:
            handler.close()
        raise
    return opened


def setup_logging():
    """Configure comprehensive file logging for the application.

    The root logger is set to INFO, stripped of its existing handlers, and
    given two file handlers: ``cfm_tips_mcp.log`` (INFO and above) and
    ``cfm_tips_mcp_errors.log`` (ERROR and above). The files are created in the
    first usable location out of ``./logs``, the current directory, and the
    system temp directory. Handlers are attached only once both files of a
    location could be opened, so a half-working location never leaves a stray
    handler behind that would duplicate every record.

    Returns:
        The logger named after this module.

    Raises:
        RuntimeError: If log files cannot be created in any location.
    """
    # Create formatter
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s'
    )

    # Configure root logger
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)

    # Remove existing handlers (iterate over a copy while mutating the list)
    for handler in root_logger.handlers[:]:
        root_logger.removeHandler(handler)

    # Candidate locations in order of preference; None stands for the system
    # temp directory, resolved lazily so its own OSError is handled below too.
    for location in ('logs', '', None):
        try:
            if location is None:
                directory = tempfile.gettempdir()
            else:
                directory = location
            if location == 'logs':
                os.makedirs(directory, exist_ok=True)
            handlers = _open_log_file_handlers(directory, formatter)
        except OSError:
            # PermissionError and friends are OSError subclasses; try the next place.
            continue

        for handler in handlers:
            root_logger.addHandler(handler)

        if location is None:
            # Written to stderr on purpose: stdout may be carrying the MCP
            # stdio protocol stream, which stray text would corrupt.
            print(f"Warning: Using temp directory for logs: {directory}", file=sys.stderr)
        break
    else:
        # If all else fails, raise error since we need file logging
        raise RuntimeError("Could not create log files in any location")

    return logging.getLogger(__name__)

def log_function_entry(logger, func_name, **kwargs):
    """Emit an INFO record announcing entry into ``func_name`` with its keyword arguments."""
    entry_message = f"Entering {func_name} with params: {kwargs}"
    logger.info(entry_message)

def log_function_exit(logger, func_name, result_status=None, execution_time=None):
    """Log function exit with results.

    Args:
        logger: Logger that receives the INFO record.
        func_name: Name of the function being left.
        result_status: Optional status text; appended only when truthy.
        execution_time: Optional duration in seconds; appended with two
            decimals whenever it is not ``None`` (a measured ``0`` is a valid
            duration and is reported too).
    """
    msg = f"Exiting {func_name}"
    if result_status:
        msg += f" - Status: {result_status}"
    # Compare against None rather than truthiness so 0 / 0.0 is not dropped.
    if execution_time is not None:
        msg += f" - Time: {execution_time:.2f}s"
    logger.info(msg)

def log_aws_api_call(logger, service, operation, **params):
    """Emit an INFO record naming the AWS service operation and its keyword parameters."""
    call_message = f"AWS API Call: {service}.{operation} with params: {params}"
    logger.info(call_message)

def log_aws_api_error(logger, service, operation, error):
    """Emit an ERROR record naming the AWS service operation and the error text."""
    error_text = str(error)
    logger.error(f"AWS API Error: {service}.{operation} - {error_text}")
```

--------------------------------------------------------------------------------
/tests/test_suite_main.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Main Test Suite Runner - Top Level Suite
Orchestrates all second-level test suites (unit, performance, integration).
"""

import asyncio
import sys
import os
import importlib.util
from typing import Dict, Any

# Add the project root to the path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))


def run_suite(suite_name: str, suite_path: str) -> Dict[str, Any]:
    """Run a test suite and return results.

    The suite file is loaded as a module and its entry point is looked up in
    this order: ``run_<suite_name lower-cased>_tests``, then ``main``. The
    entry point may be a plain function or a coroutine function; a coroutine
    result is driven to completion with ``asyncio.run``.

    Args:
        suite_name: Display name of the suite (e.g. "Unit"); also used to
            derive the entry point name.
        suite_path: Path to the suite's Python file.

    Returns:
        A dictionary with 'name' and 'status' ('passed', 'failed', 'skipped'
        or 'error'), plus 'success' for executed suites or 'reason' for
        skipped/errored ones.
    """
    print(f"\n{'='*60}")
    print(f"🚀 STARTING {suite_name.upper()} SUITE")
    print(f"{'='*60}")

    if not os.path.exists(suite_path):
        return {
            'name': suite_name,
            'status': 'skipped',
            'reason': f'Suite file not found: {suite_path}'
        }

    try:
        # Load the suite module
        spec = importlib.util.spec_from_file_location("suite_module", suite_path)
        if spec is None or spec.loader is None:
            # Happens e.g. for a path without a recognised Python suffix.
            return {
                'name': suite_name,
                'status': 'error',
                'reason': f'Could not load suite module from {suite_path}'
            }
        suite_module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(suite_module)

        # Standard naming convention first, then fall back to main()
        candidates = (f'run_{suite_name.lower()}_tests', 'main')
        entry_name = next(
            (name for name in candidates if hasattr(suite_module, name)),
            None
        )
        if entry_name is None:
            return {
                'name': suite_name,
                'status': 'error',
                'reason': f'No suitable entry point found in {suite_path}'
            }

        success = getattr(suite_module, entry_name)()
        # Await async entry points whatever the suite is called; an un-awaited
        # coroutine object is truthy and would be reported as a pass.
        if asyncio.iscoroutine(success):
            success = asyncio.run(success)

        return {
            'name': suite_name,
            'status': 'passed' if success else 'failed',
            'success': success
        }

    except Exception as e:
        # Top-level boundary: any failure while loading or running the suite
        # is reported as an error result instead of aborting the whole run.
        return {
            'name': suite_name,
            'status': 'error',
            'reason': str(e)
        }


def main():
    """Run all test suites in order and print a summary.

    Suite files are located relative to the project root (the parent of this
    file's directory), so the runner finds them from any working directory.

    Returns:
        True when no suite failed or errored (skipped suites do not count as
        failures), False otherwise.
    """
    print("🎯 CFM Tips - Main Test Suite Runner")
    print("=" * 60)
    print("Running hierarchical test suite:")
    print("  📁 Top Level: Main Suite")
    print("  📁 Second Level: Unit, Performance, Integration")
    print("  📁 Third Level: Playbook-specific (CloudWatch, EC2, S3, etc.)")
    print("=" * 60)

    # Anchor suite paths to the project root instead of the current working
    # directory; otherwise every suite is reported as skipped (and the run as
    # successful) when the script is started from somewhere else.
    project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))

    # Define the test suites in execution order
    suites = [
        ("Unit", os.path.join(project_root, "tests", "unit", "test_unit_suite.py")),
        ("Performance", os.path.join(project_root, "tests", "performance", "test_performance_suite.py")),
        ("Integration", os.path.join(project_root, "tests", "integration", "test_integration_suite.py")),
    ]

    # Run each suite
    results = [run_suite(suite_name, suite_path) for suite_name, suite_path in suites]

    # Print summary
    print(f"\n{'='*60}")
    print("📊 MAIN TEST SUITE SUMMARY")
    print(f"{'='*60}")

    passed = 0
    failed = 0
    skipped = 0
    errors = 0

    for result in results:
        status = result['status']
        name = result['name']

        if status == 'passed':
            print(f"✅ {name} Suite: PASSED")
            passed += 1
        elif status == 'failed':
            print(f"❌ {name} Suite: FAILED")
            failed += 1
        elif status == 'skipped':
            print(f"⏭️  {name} Suite: SKIPPED - {result['reason']}")
            skipped += 1
        elif status == 'error':
            print(f"💥 {name} Suite: ERROR - {result['reason']}")
            errors += 1

    total = len(results)
    print(f"\n📈 Results: {total} suites total")
    print(f"   ✅ Passed: {passed}")
    print(f"   ❌ Failed: {failed}")
    print(f"   ⏭️  Skipped: {skipped}")
    print(f"   💥 Errors: {errors}")

    success_rate = (passed / total * 100) if total > 0 else 0
    print(f"   📊 Success Rate: {success_rate:.1f}%")

    # Only failures and errors make the run unsuccessful.
    overall_success = failed == 0 and errors == 0

    if overall_success:
        print(f"\n🎉 ALL TEST SUITES COMPLETED SUCCESSFULLY!")
        print("   🚀 CFM Tips is ready for deployment!")
    else:
        print(f"\n⚠️  SOME TEST SUITES FAILED")
        print("   🔧 Please review failed tests before deployment")

    return overall_success


# Script entry point: run every suite and report the outcome through the
# process exit status (0 when main() returns a truthy value, otherwise 1).
if __name__ == "__main__":
    success = main()
    sys.exit(0 if success else 1)
```

--------------------------------------------------------------------------------
/tests/unit/s3/live/test_top_buckets.py:
--------------------------------------------------------------------------------

```python
"""
Live test for S3 top buckets listing functionality.

This test validates that the quick analysis properly returns top 10 buckets
with their cost estimates.
"""

import pytest
import asyncio
import json
from playbooks.s3.s3_optimization_orchestrator import run_s3_quick_analysis


@pytest.mark.live
@pytest.mark.asyncio
async def test_list_top_10_buckets():
    """Test that quick analysis returns top 10 buckets with cost estimates.

    The test is skipped when the general spend analysis itself did not succeed.
    When the analysis succeeded but reported no ``bucket_costs``, a warning is
    printed and the per-bucket checks are not run.
    """

    # Run quick analysis
    arguments = {
        'region': 'us-east-1'  # You can change this to your preferred region
    }

    result = await run_s3_quick_analysis(arguments)

    # Parse the result
    assert len(result) > 0
    assert result[0]["type"] == "text"

    data = json.loads(result[0]["text"])

    # Verify structure
    assert data["status"] == "success"
    assert "results" in data
    assert "general_spend" in data["results"]

    # Check general_spend results
    general_spend = data["results"]["general_spend"]

    print(f"\n=== General Spend Status: {general_spend.get('status')} ===")

    if general_spend.get("status") != "success":
        print(f"\nGeneral spend analysis failed: {general_spend.get('message')}")
        pytest.skip(f"General spend analysis failed: {general_spend.get('message')}")

    assert "data" in general_spend
    spend_data = general_spend["data"]

    # Print full data structure for debugging
    print(f"\nData keys: {list(spend_data.keys())}")

    # Bound up front so the verification below cannot hit an unbound local
    # when "bucket_costs" is missing from the analysis data.
    top_buckets = []

    if "bucket_costs" in spend_data:
        bucket_costs = spend_data["bucket_costs"]
        print(f"\nBucket costs keys: {list(bucket_costs.keys())}")
        print(f"Total buckets analyzed: {bucket_costs.get('total_buckets_analyzed', 'N/A')}")

        # Verify top_10_buckets exists
        assert "top_10_buckets" in bucket_costs

        top_buckets = bucket_costs["top_10_buckets"]

        # Print results for manual verification
        print("\n=== Top 10 S3 Buckets by Estimated Cost ===")
        if len(top_buckets) == 0:
            print("No buckets found or analyzed.")
        else:
            for i, bucket in enumerate(top_buckets, 1):
                print(f"{i}. {bucket['bucket_name']}")
                print(f"   Estimated Monthly Cost: ${bucket['estimated_monthly_cost']:.2f}")
                print(f"   Size: {bucket['size_gb']:.2f} GB")
                print(f"   Objects: {bucket['object_count']:,}")
                print(f"   Storage Class: {bucket['primary_storage_class']}")
                print()
    else:
        print("\nWARNING: bucket_costs not found in general_spend data")
        print(f"Available data: {json.dumps(spend_data, indent=2, default=str)}")

    # Verify bucket data structure
    if len(top_buckets) > 0:
        first_bucket = top_buckets[0]
        assert "bucket_name" in first_bucket
        assert "estimated_monthly_cost" in first_bucket
        assert "size_gb" in first_bucket
        assert "object_count" in first_bucket
        assert "primary_storage_class" in first_bucket

        # Verify costs are sorted (highest first) by comparing neighbours
        for current, following in zip(top_buckets, top_buckets[1:]):
            assert current["estimated_monthly_cost"] >= following["estimated_monthly_cost"], \
                "Buckets should be sorted by cost (highest first)"


@pytest.mark.live
@pytest.mark.asyncio
async def test_bucket_cost_estimation():
    """Test that bucket cost estimation is working correctly.

    The test is skipped (rather than silently passing without checking
    anything) when the quick analysis or its general spend part did not
    succeed.
    """

    arguments = {'region': 'us-east-1'}
    result = await run_s3_quick_analysis(arguments)

    data = json.loads(result[0]["text"])

    if data["status"] != "success":
        pytest.skip(f"Quick analysis did not succeed: {data.get('message')}")

    general_spend = data["results"].get("general_spend", {})

    if general_spend.get("status") != "success":
        pytest.skip(f"General spend analysis failed: {general_spend.get('message')}")

    bucket_costs = general_spend["data"].get("bucket_costs", {})

    # Check that we have bucket analysis data
    assert "by_bucket" in bucket_costs or "top_10_buckets" in bucket_costs

    # Verify total buckets analyzed
    if "total_buckets_analyzed" in bucket_costs:
        print(f"\nTotal buckets analyzed: {bucket_costs['total_buckets_analyzed']}")

    # Verify cost estimation method
    if "cost_estimation_method" in bucket_costs:
        print(f"Cost estimation method: {bucket_costs['cost_estimation_method']}")
        assert bucket_costs["cost_estimation_method"] in ["size_based", "cost_explorer"]


# Allows running the top-buckets test as a plain script, outside the pytest
# runner, by driving the coroutine with asyncio.
if __name__ == "__main__":
    # Run the test directly
    asyncio.run(test_list_top_10_buckets())

```

--------------------------------------------------------------------------------
/tests/legacy/example_output_with_docs.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Example showing how documentation links appear in tool outputs
"""

import json
from utils.documentation_links import add_documentation_links

def show_example_outputs():
    """Print sample tool results after enriching them with documentation links.

    Five canned result dictionaries (EC2, S3, RDS, Lambda, and one without a
    service) are passed through add_documentation_links and dumped as indented
    JSON, followed by a short list of benefits.
    """
    section_rule = "=" * 60
    heading_rule = "-" * 40

    print("CFM Tips - Documentation Links Feature Examples")
    print(section_rule)
    print()

    # Example 1: EC2 Right-sizing Analysis
    ec2_result = {
        "status": "success",
        "data": {
            "underutilized_instances": [
                {
                    "instance_id": "i-1234567890abcdef0",
                    "instance_type": "m5.large",
                    "finding": "Overprovisioned",
                    "recommendation": {
                        "recommended_instance_type": "m5.medium",
                        "estimated_monthly_savings": 45.50
                    }
                }
            ],
            "count": 1,
            "total_monthly_savings": 45.50
        },
        "message": "Found 1 underutilized EC2 instances via Compute Optimizer"
    }

    # Example 2: S3 Optimization Analysis
    s3_result = {
        "status": "success",
        "comprehensive_s3_optimization": {
            "overview": {
                "total_potential_savings": "$1,250.75",
                "analyses_completed": "6/6",
                "failed_analyses": 0,
                "execution_time": "45.2s"
            },
            "key_findings": [
                "Found 15 buckets with suboptimal storage classes",
                "Identified $800 in potential lifecycle savings",
                "Discovered 25 incomplete multipart uploads"
            ],
            "top_recommendations": [
                {
                    "type": "storage_class_optimization",
                    "bucket": "my-data-bucket",
                    "potential_savings": "$450.25/month",
                    "action": "Transition to IA after 30 days"
                }
            ]
        }
    }

    # Example 3: RDS Optimization Analysis
    rds_result = {
        "status": "success",
        "data": {
            "underutilized_instances": [
                {
                    "db_instance_identifier": "prod-database-1",
                    "db_instance_class": "db.r5.xlarge",
                    "finding": "Underutilized",
                    "avg_cpu_utilization": 15.5,
                    "recommendation": {
                        "recommended_instance_class": "db.r5.large",
                        "estimated_monthly_savings": 180.00
                    }
                }
            ],
            "count": 1,
            "total_monthly_savings": 180.00
        },
        "message": "Found 1 underutilized RDS instances"
    }

    # Example 4: Lambda Optimization Analysis
    lambda_result = {
        "status": "success",
        "data": {
            "overprovisioned_functions": [
                {
                    "function_name": "data-processor",
                    "current_memory": 1024,
                    "avg_memory_utilization": 35.2,
                    "recommendation": {
                        "recommended_memory": 512,
                        "estimated_monthly_savings": 25.75
                    }
                }
            ],
            "count": 1,
            "total_monthly_savings": 25.75
        },
        "message": "Found 1 overprovisioned Lambda functions"
    }

    # Example 5: General Cost Analysis (no specific service)
    general_result = {
        "status": "success",
        "data": {
            "total_monthly_cost": 5420.75,
            "potential_savings": 1250.50,
            "services_analyzed": ["EC2", "EBS", "RDS", "Lambda", "S3"],
            "optimization_opportunities": 47
        },
        "message": "Comprehensive cost analysis completed"
    }

    # (heading, payload, extra positional args for add_documentation_links);
    # the general example passes no service so the function default applies.
    examples = [
        ("1. EC2 Right-sizing Analysis Output:", ec2_result, ("ec2",)),
        ("2. S3 Optimization Analysis Output:", s3_result, ("s3",)),
        ("3. RDS Optimization Analysis Output:", rds_result, ("rds",)),
        ("4. Lambda Optimization Analysis Output:", lambda_result, ("lambda",)),
        ("5. General Cost Analysis Output:", general_result, ()),
    ]

    for heading, payload, service_args in examples:
        print(heading)
        print(heading_rule)
        enhanced = add_documentation_links(payload, *service_args)
        print(json.dumps(enhanced, indent=2))
        print("\n" + section_rule + "\n")

    print("Key Benefits of Documentation Links:")
    benefits = (
        "• Provides immediate access to AWS best practices",
        "• Links to CFM-TIPs guidance and workshops",
        "• References AWS Well-Architected Framework",
        "• Service-specific playbooks for detailed guidance",
        "• Consistent across all tool outputs",
        "• Helps users understand optimization recommendations",
    )
    for benefit in benefits:
        print(benefit)

# Print the example outputs when this file is executed as a script.
if __name__ == "__main__":
    show_example_outputs()
```

--------------------------------------------------------------------------------
/services/optimization_hub.py:
--------------------------------------------------------------------------------

```python
"""
AWS Cost Optimization Hub service module.

This module provides functions for interacting with the AWS Cost Optimization Hub API.
"""

import logging
from typing import Dict, List, Optional, Any
import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)

def get_recommendations(
    resource_type: Optional[str] = None,
    region: Optional[str] = None,
    account_id: Optional[str] = None,
    client_region: Optional[str] = None
) -> Dict[str, Any]:
    """
    Get cost optimization recommendations from AWS Cost Optimization Hub.

    Uses the ``ListRecommendations`` operation and follows ``nextToken`` until
    every page has been collected.

    Args:
        resource_type: Resource type to filter on, as named by Cost
            Optimization Hub (e.g., Ec2Instance, RdsDbInstance)
        region: AWS region to filter recommendations
        account_id: AWS account ID to filter recommendations
        client_region: Region for the boto3 client (optional)

    Returns:
        Dictionary with "status" and "message"; on success "data" holds
        ``{"items": [...]}`` with all recommendations, on a ClientError an
        "error_code" is included as well.
    """
    try:
        # Create Cost Optimization Hub client
        client_kwargs = {'region_name': client_region} if client_region else {}
        client = boto3.client('cost-optimization-hub', **client_kwargs)

        # Prepare the ListRecommendations filter from the given parameters
        recommendation_filter = {}
        if resource_type:
            recommendation_filter['resourceTypes'] = [resource_type]
        if region:
            recommendation_filter['regions'] = [region]
        if account_id:
            recommendation_filter['accountIds'] = [account_id]

        request = {}
        if recommendation_filter:
            request['filter'] = recommendation_filter

        # Page through the results; each response carries the recommendations
        # under "items" and an optional "nextToken" for the following page.
        items = []
        while True:
            response = client.list_recommendations(**request)
            items.extend(response.get('items', []))
            next_token = response.get('nextToken')
            if not next_token:
                break
            request['nextToken'] = next_token

        return {
            "status": "success",
            "data": {"items": items},
            "message": f"Retrieved {len(items)} cost optimization recommendations"
        }

    except ClientError as e:
        logger.error(f"Error in Cost Optimization Hub API: {str(e)}")
        return {
            "status": "error",
            "message": f"Cost Optimization Hub API error: {str(e)}",
            "error_code": e.response.get('Error', {}).get('Code', "Unknown")
        }

    except Exception as e:
        # Service-boundary catch-all: report the failure in the result shape
        # callers expect instead of raising.
        logger.error(f"Unexpected error in Cost Optimization Hub service: {str(e)}")
        return {
            "status": "error",
            "message": f"Unexpected error: {str(e)}"
        }

def get_recommendation_summary(
    client_region: Optional[str] = None,
    group_by: str = "ResourceType"
) -> Dict[str, Any]:
    """
    Get a summary of cost optimization recommendations.

    Uses the Cost Optimization Hub ``ListRecommendationSummaries`` operation,
    which requires a grouping dimension.

    Args:
        client_region: Region for the boto3 client (optional)
        group_by: Dimension the summary is grouped by, passed as the
            operation's ``groupBy`` argument (defaults to "ResourceType")

    Returns:
        Dictionary with "status" and "message"; on success "data" holds the
        API response, on a ClientError an "error_code" is included as well.
    """
    try:
        # Create Cost Optimization Hub client
        client_kwargs = {'region_name': client_region} if client_region else {}
        client = boto3.client('cost-optimization-hub', **client_kwargs)

        # Make the API call
        response = client.list_recommendation_summaries(groupBy=group_by)

        return {
            "status": "success",
            "data": response,
            "message": "Retrieved cost optimization recommendation summary"
        }

    except ClientError as e:
        logger.error(f"Error getting recommendation summary: {str(e)}")
        return {
            "status": "error",
            "message": f"Error getting recommendation summary: {str(e)}",
            "error_code": e.response.get('Error', {}).get('Code', "Unknown")
        }

    except Exception as e:
        # Service-boundary catch-all: report the failure in the result shape
        # callers expect instead of raising.
        logger.error(f"Unexpected error getting recommendation summary: {str(e)}")
        return {
            "status": "error",
            "message": f"Unexpected error: {str(e)}"
        }

def get_savings_plans_recommendations(
    lookback_period: str = "SIXTY_DAYS",
    payment_option: str = "NO_UPFRONT",
    term: str = "ONE_YEAR",
    client_region: Optional[str] = None
) -> Dict[str, Any]:
    """
    Request Savings Plans recommendations through the Cost Optimization Hub client.

    Args:
        lookback_period: Historical data period to analyze
        payment_option: Payment option for Savings Plans
        term: Term length for Savings Plans
        client_region: Region for the boto3 client (optional)

    Returns:
        Dictionary with "status" and "message"; on success "data" holds the
        client response, on a ClientError an "error_code" is included as well.
    """
    # NOTE(review): the operation called below does not appear to be part of
    # the boto3 'cost-optimization-hub' client; the parameter values look like
    # those of Cost Explorer's GetSavingsPlansPurchaseRecommendation. If so,
    # this always ends in the generic error branch - confirm against the
    # boto3 reference before relying on it.
    client_kwargs = {'region_name': client_region} if client_region else {}

    try:
        # Create Cost Optimization Hub client
        client = boto3.client('cost-optimization-hub', **client_kwargs)

        # Make the API call
        request = {
            'lookbackPeriod': lookback_period,
            'paymentOption': payment_option,
            'term': term,
        }
        response = client.get_savings_plans_recommendations(**request)

    except ClientError as e:
        logger.error(f"Error getting Savings Plans recommendations: {str(e)}")
        error_code = e.response['Error']['Code'] if 'Error' in e.response else "Unknown"
        return {
            "status": "error",
            "message": f"Error getting Savings Plans recommendations: {str(e)}",
            "error_code": error_code
        }

    except Exception as e:
        logger.error(f"Unexpected error getting Savings Plans recommendations: {str(e)}")
        return {
            "status": "error",
            "message": f"Unexpected error: {str(e)}"
        }

    return {
        "status": "success",
        "data": response,
        "message": "Retrieved Savings Plans recommendations"
    }

```

--------------------------------------------------------------------------------
/tests/run_cloudwatch_tests.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Test runner for CloudWatch optimization comprehensive testing suite.

Runs all CloudWatch-related tests including unit tests, integration tests,
performance tests, and cost constraint validation tests.
"""

import sys
import os
import subprocess
import argparse
from pathlib import Path


def run_command(cmd, description):
    """Execute ``cmd`` as a subprocess, echo its captured output, and report success.

    Args:
        cmd: Command and arguments as a list of strings.
        description: Human-readable label printed in the banner.

    Returns:
        True when the command exits with status 0, False otherwise.
    """
    banner = '=' * 60
    print(f"\n{banner}")
    print(f"Running: {description}")
    print(f"Command: {' '.join(cmd)}")
    print(f"{banner}")

    # Inspect the exit status directly instead of letting check=True raise.
    completed = subprocess.run(cmd, capture_output=True, text=True)
    succeeded = completed.returncode == 0

    if succeeded:
        print("✅ PASSED")
    else:
        print("❌ FAILED")
        print("STDERR:", completed.stderr)

    if completed.stdout:
        print("STDOUT:", completed.stdout)

    return succeeded


def main():
    """Run the CloudWatch optimization test categories and marker-based tests.

    Command-line flags select individual categories (unit, integration,
    performance, cost validation); with none given, every category runs. The
    marker-based pytest runs are always executed. pytest is started with the
    interpreter that is running this script.

    Returns:
        0 when every executed test run passed and no listed test file was
        missing, 1 otherwise.
    """
    parser = argparse.ArgumentParser(description="Run CloudWatch optimization tests")
    parser.add_argument("--unit", action="store_true", help="Run only unit tests")
    parser.add_argument("--integration", action="store_true", help="Run only integration tests")
    parser.add_argument("--performance", action="store_true", help="Run only performance tests")
    parser.add_argument("--cost-validation", action="store_true", help="Run only cost validation tests")
    parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
    parser.add_argument("--coverage", action="store_true", help="Run with coverage reporting")
    parser.add_argument("--parallel", "-n", type=int, help="Number of parallel workers")

    args = parser.parse_args()

    # Change to tests directory so the relative test paths below resolve
    test_dir = Path(__file__).parent
    os.chdir(test_dir)

    # Base pytest command; use the current interpreter rather than whatever
    # "python" happens to be (or not be) on PATH.
    base_cmd = [sys.executable or "python", "-m", "pytest"]

    if args.verbose:
        base_cmd.append("-v")

    if args.parallel:
        base_cmd.extend(["-n", str(args.parallel)])

    if args.coverage:
        base_cmd.extend([
            "--cov=playbooks.cloudwatch",
            "--cov=services.cloudwatch_service",
            "--cov=services.cloudwatch_pricing",
            "--cov-report=html",
            "--cov-report=term-missing"
        ])

    # With no category flag given, every category is selected.
    run_everything = not any([args.unit, args.integration, args.performance, args.cost_validation])

    # (selected flag, category name, test files) in execution order
    available_categories = [
        (args.unit, "Unit Tests", [
            "unit/analyzers/test_cloudwatch_base_analyzer.py",
            "unit/analyzers/test_cloudwatch_general_spend_analyzer.py",
            "unit/analyzers/test_metrics_optimization_analyzer.py",
            "unit/analyzers/test_logs_optimization_analyzer.py",
            "unit/analyzers/test_alarms_and_dashboards_analyzer.py",
            "unit/services/test_cloudwatch_service.py",
            "unit/services/test_cloudwatch_cost_controller.py",
            "unit/services/test_cloudwatch_query_service.py"
        ]),
        (args.integration, "Integration Tests", [
            "integration/test_cloudwatch_orchestrator_integration.py",
            "integration/test_cloudwatch_comprehensive_tool_integration.py"
        ]),
        (args.performance, "Performance Tests", [
            "performance/test_cloudwatch_parallel_execution.py"
        ]),
        (args.cost_validation, "Cost Constraint Validation Tests", [
            "unit/test_cloudwatch_cost_constraints.py"
        ]),
    ]
    test_categories = [
        (category_name, test_files)
        for selected, category_name, test_files in available_categories
        if selected or run_everything
    ]

    # Run test categories
    all_passed = True
    results = {}

    for category_name, test_files in test_categories:
        print(f"\n🧪 Running {category_name}")
        print("=" * 80)

        category_passed = True
        for test_file in test_files:
            if os.path.exists(test_file):
                cmd = base_cmd + [test_file]
                success = run_command(cmd, f"{category_name}: {test_file}")
                if not success:
                    category_passed = False
                    all_passed = False
            else:
                # A missing test file counts as a failure of its category.
                print(f"⚠️  Test file not found: {test_file}")
                category_passed = False
                all_passed = False

        results[category_name] = category_passed

    # Run specific CloudWatch marker tests
    print(f"\n🧪 Running CloudWatch-specific marker tests")
    print("=" * 80)

    marker_tests = [
        ("No-Cost Validation", ["-m", "no_cost_validation"]),
        ("CloudWatch Unit Tests", ["-m", "unit and cloudwatch"]),
        ("CloudWatch Integration Tests", ["-m", "integration and cloudwatch"]),
        ("CloudWatch Performance Tests", ["-m", "performance and cloudwatch"])
    ]

    for test_name, marker_args in marker_tests:
        cmd = base_cmd + marker_args
        success = run_command(cmd, test_name)
        if not success:
            all_passed = False
        results[test_name] = success

    # Summary
    print(f"\n{'='*80}")
    print("TEST SUMMARY")
    print(f"{'='*80}")

    for category, passed in results.items():
        status = "✅ PASSED" if passed else "❌ FAILED"
        print(f"{category:<40} {status}")

    overall_status = "✅ ALL TESTS PASSED" if all_passed else "❌ SOME TESTS FAILED"
    print(f"\nOverall Result: {overall_status}")

    if args.coverage and all_passed:
        print(f"\n📊 Coverage report generated in htmlcov/index.html")

    return 0 if all_passed else 1


# Script entry point: main() returns the process exit status (0 or 1).
if __name__ == "__main__":
    sys.exit(main())
```

--------------------------------------------------------------------------------
/services/cost_explorer.py:
--------------------------------------------------------------------------------

```python
"""
AWS Cost Explorer service module.

This module provides functions for interacting with the AWS Cost Explorer API.
"""

import logging
from typing import Dict, List, Optional, Any
import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)

def get_cost_and_usage(
    start_date: str,
    end_date: str,
    granularity: str = "MONTHLY",
    metrics: Optional[List[str]] = None,
    group_by: Optional[List[Dict[str, str]]] = None,
    filter_expr: Optional[Dict[str, Any]] = None,
    region: Optional[str] = None
) -> Dict[str, Any]:
    """
    Retrieve cost and usage data from AWS Cost Explorer.
    
    Args:
        start_date: Start date in YYYY-MM-DD format
        end_date: End date in YYYY-MM-DD format
        granularity: Time granularity (DAILY, MONTHLY, HOURLY)
        metrics: List of cost metrics to retrieve; defaults to
            ["BlendedCost", "UnblendedCost"] when None
        group_by: Optional grouping dimensions
        filter_expr: Optional filters
        region: AWS region (optional)
        
    Returns:
        Dictionary with a "status" key. On success it is "success" and the
        raw API response is under "data". On failure it is "error" with a
        "message", plus "error_code" when the failure was a ClientError.
        Exceptions are never propagated to the caller.
    """
    try:
        # Set default metrics if not provided
        if metrics is None:
            metrics = ["BlendedCost", "UnblendedCost"]
            
        # Only pass region_name when a region was given, so the client
        # otherwise falls back to boto3's own region resolution.
        client_kwargs = {'region_name': region} if region else {}
        ce_client = boto3.client('ce', **client_kwargs)
            
        # Prepare the request parameters
        params = {
            'TimePeriod': {
                'Start': start_date,
                'End': end_date
            },
            'Granularity': granularity,
            'Metrics': metrics
        }
        
        # Add optional parameters if provided
        if group_by:
            params['GroupBy'] = group_by
            
        if filter_expr:
            params['Filter'] = filter_expr
            
        # Make the API call
        response = ce_client.get_cost_and_usage(**params)
        
        return {
            "status": "success",
            "data": response,
            "message": f"Retrieved cost data from {start_date} to {end_date}"
        }
        
    except ClientError as e:
        logger.error(f"Error in Cost Explorer API: {str(e)}")
        return {
            "status": "error",
            "message": f"Cost Explorer API error: {str(e)}",
            # Chained .get() so a response with 'Error' but no 'Code' cannot
            # raise KeyError from inside this handler.
            "error_code": e.response.get('Error', {}).get('Code', "Unknown")
        }
        
    except Exception as e:
        logger.error(f"Unexpected error in Cost Explorer service: {str(e)}")
        return {
            "status": "error",
            "message": f"Unexpected error: {str(e)}"
        }

def get_cost_forecast(
    start_date: str,
    end_date: str,
    granularity: str = "MONTHLY",
    metric: str = "BLENDED_COST",
    filter_expr: Optional[Dict[str, Any]] = None,
    region: Optional[str] = None
) -> Dict[str, Any]:
    """
    Get a cost forecast from AWS Cost Explorer.
    
    Args:
        start_date: Start date in YYYY-MM-DD format
        end_date: End date in YYYY-MM-DD format
        granularity: Time granularity (DAILY, MONTHLY)
        metric: Cost metric to forecast
        filter_expr: Optional filters
        region: AWS region (optional)
        
    Returns:
        Dictionary with a "status" key. On success it is "success" and the
        raw forecast response is under "data". On failure it is "error" with
        a "message", plus "error_code" when the failure was a ClientError.
        Exceptions are never propagated to the caller.
    """
    try:
        # Only pass region_name when a region was given, so the client
        # otherwise falls back to boto3's own region resolution.
        client_kwargs = {'region_name': region} if region else {}
        ce_client = boto3.client('ce', **client_kwargs)
            
        # Prepare the request parameters
        params = {
            'TimePeriod': {
                'Start': start_date,
                'End': end_date
            },
            'Granularity': granularity,
            'Metric': metric
        }
        
        # Add optional filter if provided
        if filter_expr:
            params['Filter'] = filter_expr
            
        # Make the API call
        response = ce_client.get_cost_forecast(**params)
        
        return {
            "status": "success",
            "data": response,
            "message": f"Retrieved cost forecast from {start_date} to {end_date}"
        }
        
    except ClientError as e:
        logger.error(f"Error in Cost Explorer forecast API: {str(e)}")
        return {
            "status": "error",
            "message": f"Cost Explorer forecast API error: {str(e)}",
            # Chained .get() so a response with 'Error' but no 'Code' cannot
            # raise KeyError from inside this handler.
            "error_code": e.response.get('Error', {}).get('Code', "Unknown")
        }
        
    except Exception as e:
        logger.error(f"Unexpected error in Cost Explorer forecast service: {str(e)}")
        return {
            "status": "error",
            "message": f"Unexpected error: {str(e)}"
        }

def get_cost_categories(
    region: Optional[str] = None
) -> Dict[str, Any]:
    """
    List cost categories from AWS Cost Explorer.
    
    Issues a single list_cost_category_definitions() call; the count in the
    returned message is the number of 'CostCategoryReferences' entries in
    that one response.
    
    Args:
        region: AWS region (optional)
        
    Returns:
        Dictionary with a "status" key. On success it is "success" and the
        raw API response is under "data". On failure it is "error" with a
        "message", plus "error_code" when the failure was a ClientError.
        Exceptions are never propagated to the caller.
    """
    try:
        # Only pass region_name when a region was given, so the client
        # otherwise falls back to boto3's own region resolution.
        client_kwargs = {'region_name': region} if region else {}
        ce_client = boto3.client('ce', **client_kwargs)
            
        # Make the API call
        response = ce_client.list_cost_category_definitions()
        
        return {
            "status": "success",
            "data": response,
            "message": f"Retrieved {len(response.get('CostCategoryReferences', []))} cost categories"
        }
        
    except ClientError as e:
        logger.error(f"Error listing cost categories: {str(e)}")
        return {
            "status": "error",
            "message": f"Error listing cost categories: {str(e)}",
            # Chained .get() so a response with 'Error' but no 'Code' cannot
            # raise KeyError from inside this handler.
            "error_code": e.response.get('Error', {}).get('Code', "Unknown")
        }
        
    except Exception as e:
        logger.error(f"Unexpected error listing cost categories: {str(e)}")
        return {
            "status": "error",
            "message": f"Unexpected error: {str(e)}"
        }

```

--------------------------------------------------------------------------------
/tests/integration/cloudwatch/test_cloudwatch_integration.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Integration tests for CloudWatch functionality.
"""

import asyncio
import json
import sys
import os
import pytest

# Add the project root to the path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../../..'))

from runbook_functions import run_cloudwatch_general_spend_analysis


@pytest.mark.asyncio
async def test_cloudwatch_timeout():
    """Call the CloudWatch general spend analysis with a minimal request.

    Returns True when the call completes and its content items were printed,
    and False when anything raises (the traceback is printed first). The
    boolean is what the standalone runner in this module tallies.
    """
    print("Testing CloudWatch general spend analysis timeout issue...")

    try:
        # Minimal parameters intended to reproduce the timeout
        request_args = dict(region="us-east-1", lookback_days=7, page=1)
        print(f"Calling run_cloudwatch_general_spend_analysis with: {request_args}")

        # If this times out, the except branch prints the full stack trace
        contents = await run_cloudwatch_general_spend_analysis(request_args)

        print("Result received:")
        for item in contents:
            print(item.text)
    except Exception as exc:
        print(f"Exception caught in test: {exc!s}")
        print("Full stack trace:")
        import traceback
        traceback.print_exc()
        return False
    else:
        return True


@pytest.mark.asyncio
async def test_stack_trace_capture():
    """Check that invalid input yields a structured JSON error response.

    Only the first content item of the result decides the outcome: True when
    it is JSON with status 'error' plus 'error_message', 'analysis_type' and
    'timestamp' keys, False otherwise or when the call raises. An empty
    result falls through and returns None.
    """
    print("Testing CloudWatch error handling...")

    # Deliberately malformed request
    bad_args = {
        "region": "us-east-1",
        "lookback_days": "invalid_string",  # This should cause a type error
        "page": 1
    }
    required_keys = ('error_message', 'analysis_type', 'timestamp')

    print(f"Calling run_cloudwatch_general_spend_analysis with invalid lookback_days: {bad_args}")

    try:
        contents = await run_cloudwatch_general_spend_analysis(bad_args)

        print("Result received:")
        for item in contents:
            raw_text = item.text
            print(raw_text)

            # The response must be parseable JSON before its shape is checked
            try:
                payload = json.loads(raw_text)
            except json.JSONDecodeError:
                print("❌ FAILURE: Response is not valid JSON")
                return False

            is_structured_error = (
                payload.get('status') == 'error'
                and all(key in payload for key in required_keys)
            )
            if not is_structured_error:
                print("❌ FAILURE: Invalid error response structure")
                return False

            print("✅ SUCCESS: Structured error response found")
            return True

    except Exception as exc:
        print(f"❌ FAILURE: Exception not handled properly: {exc!s}")
        return False


def test_pricing_cache():
    """Test that a repeated pricing call is served quickly enough to be a cache hit.

    Calls CloudWatchPricing.get_metrics_pricing() twice and times each call.
    The test succeeds when the second call takes less than 1 ms.

    Returns:
        True when the second call is under the threshold, False when it is
        slower or when any exception (including a failed import) occurs.
    """
    print("Testing CloudWatch pricing cache fix...")
    
    try:
        from services.cloudwatch_pricing import CloudWatchPricing
        import time
        
        # Initialize pricing service
        pricing = CloudWatchPricing(region='us-east-1')
        
        # perf_counter() is monotonic and high-resolution; time.time() can be
        # too coarse (or jump) when measuring a sub-millisecond interval.
        # First call - expected to populate the cache
        print("Making first pricing call...")
        start_time = time.perf_counter()
        result1 = pricing.get_metrics_pricing()
        first_call_time = time.perf_counter() - start_time
        
        print(f"First call took {first_call_time:.3f} seconds")
        print(f"Status: {result1.get('status')}")
        print(f"Source: {result1.get('source')}")
        
        # Second call - expected to hit the cache and be near-instant
        print("\nMaking second pricing call...")
        start_time = time.perf_counter()
        result2 = pricing.get_metrics_pricing()
        second_call_time = time.perf_counter() - start_time
        
        print(f"Second call took {second_call_time:.3f} seconds")
        print(f"Status: {result2.get('status')}")
        print(f"Source: {result2.get('source')}")
        
        # Verify caching worked
        if second_call_time < 0.001:  # Should be nearly instant
            print("✅ SUCCESS: Caching is working - second call was instant")
            return True
        else:
            print("❌ FAILURE: Caching not working - second call took too long")
            return False
            
    except Exception as e:
        print(f"❌ Error in pricing cache test: {str(e)}")
        return False


async def run_cloudwatch_integration_tests():
    """Run every CloudWatch integration test in this module and print a tally.

    Coroutine test functions are awaited and plain ones are called directly.
    A truthy return counts as a pass; a falsy return or a raised exception
    counts as a failure.

    Returns:
        True when no test failed, otherwise False.
    """
    divider = "=" * 50
    print("Starting CloudWatch Integration Tests")
    print(divider)

    suite = [
        ("CloudWatch Timeout Handling", test_cloudwatch_timeout),
        ("Error Handling", test_stack_trace_capture),
        ("Pricing Cache", test_pricing_cache),
    ]

    pass_count = 0
    fail_count = 0

    for label, check in suite:
        try:
            outcome = (await check()) if asyncio.iscoroutinefunction(check) else check()

            if not outcome:
                print(f"✗ FAIL: {label}")
                fail_count += 1
            else:
                print(f"✓ PASS: {label}")
                pass_count += 1
        except Exception as exc:
            print(f"✗ FAIL: {label} - Exception: {exc}")
            fail_count += 1

    print(divider)
    print(f"CloudWatch Integration Tests: {pass_count + fail_count} total, {pass_count} passed, {fail_count} failed")

    if fail_count:
        print(f"❌ {fail_count} CLOUDWATCH INTEGRATION TESTS FAILED")
        return False

    print("🎉 ALL CLOUDWATCH INTEGRATION TESTS PASSED!")
    return True

if __name__ == "__main__":
    # Exit status 0 only when the whole suite passed, 1 otherwise.
    exit_code = 0 if asyncio.run(run_cloudwatch_integration_tests()) else 1
    raise SystemExit(exit_code)
```

--------------------------------------------------------------------------------
/tests/unit/s3/live/test_s3_governance_bucket_discovery.py:
--------------------------------------------------------------------------------

```python
"""
Live test to debug S3 governance check bucket discovery issue.

This test will help identify why s3_governance_check returns 0 buckets
when there are actually 40+ buckets in the account.
"""

import asyncio
import logging
import pytest
from typing import Dict, Any

# Set up logging to see debug messages
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

@pytest.mark.live
async def test_s3_bucket_discovery_debug():
    """
    Walk through each layer of S3 bucket discovery and log what it returns.

    Five probes run in sequence: S3Service.list_buckets(), a raw boto3
    list_buckets() call, GovernanceAnalyzer._get_buckets_to_analyze(), a
    full "governance" analysis through S3OptimizationOrchestrator, and an
    STS identity / S3 permission check. Each probe logs its own failure and
    never stops the probes after it, so nothing is asserted and the log
    output is the result.
    """
    preview_count = 5  # only the first few buckets are logged per probe

    # Probe 1: bucket listing through the project's S3Service wrapper
    logger.info("=== Test 1: Direct S3Service bucket listing ===")
    try:
        from services.s3_service import S3Service

        direct_service = S3Service(region='us-east-1')
        logger.info(f"S3Service initialized: {direct_service}")

        listing = await direct_service.list_buckets()
        logger.info(f"S3Service.list_buckets() result: {listing}")

        if listing.get("status") != "success":
            logger.error(f"S3Service.list_buckets() failed: {listing}")
        else:
            service_buckets = listing.get("data", {}).get("Buckets", [])
            logger.info(f"Found {len(service_buckets)} buckets via S3Service")
            for position, entry in enumerate(service_buckets[:preview_count], start=1):
                logger.info(f"  Bucket {position}: {entry.get('Name')} (Region: {entry.get('Region', 'unknown')})")

    except Exception as exc:
        logger.error(f"Error in S3Service test: {exc!s}")

    # Probe 2: the same listing straight through boto3, bypassing project code
    logger.info("\n=== Test 2: Direct boto3 client call ===")
    try:
        import boto3
        from botocore.exceptions import ClientError

        raw_client = boto3.client('s3', region_name='us-east-1')
        logger.info(f"Boto3 S3 client created: {raw_client}")

        raw_buckets = raw_client.list_buckets().get('Buckets', [])
        logger.info(f"Found {len(raw_buckets)} buckets via direct boto3 call")

        for position, entry in enumerate(raw_buckets[:preview_count], start=1):
            logger.info(f"  Bucket {position}: {entry.get('Name')} (Created: {entry.get('CreationDate')})")

    except ClientError as exc:
        logger.error(f"AWS ClientError in direct boto3 test: {exc}")
    except Exception as exc:
        logger.error(f"Error in direct boto3 test: {exc!s}")

    # Probe 3: the analyzer's own bucket selection step
    logger.info("\n=== Test 3: GovernanceAnalyzer bucket discovery ===")
    try:
        from playbooks.s3.analyzers.governance_analyzer import GovernanceAnalyzer
        from services.s3_service import S3Service

        analyzer = GovernanceAnalyzer(s3_service=S3Service(region='us-east-1'))
        logger.info(f"GovernanceAnalyzer initialized: {analyzer}")

        selected = await analyzer._get_buckets_to_analyze({'region': 'us-east-1'})
        logger.info(f"GovernanceAnalyzer._get_buckets_to_analyze() returned: {len(selected)} buckets")

        for position, selected_name in enumerate(selected[:preview_count], start=1):
            logger.info(f"  Bucket {position}: {selected_name}")

    except Exception as exc:
        logger.error(f"Error in GovernanceAnalyzer test: {exc!s}")

    # Probe 4: the complete governance analysis via the orchestrator
    logger.info("\n=== Test 4: Full governance analysis ===")
    try:
        from playbooks.s3.s3_optimization_orchestrator import S3OptimizationOrchestrator

        orchestrator = S3OptimizationOrchestrator(region='us-east-1')
        logger.info(f"S3OptimizationOrchestrator initialized: {orchestrator}")

        outcome = await orchestrator.execute_analysis("governance", region='us-east-1')
        outcome_status = outcome.get('status')
        logger.info(f"Governance analysis result status: {outcome_status}")
        logger.info(f"Total buckets analyzed: {outcome.get('data', {}).get('total_buckets_analyzed', 0)}")

        if outcome_status == 'error':
            logger.error(f"Governance analysis error: {outcome.get('message')}")

    except Exception as exc:
        logger.error(f"Error in full governance analysis test: {exc!s}")

    # Probe 5: who we are authenticated as and which S3 calls are permitted
    logger.info("\n=== Test 5: AWS credentials and permissions check ===")
    try:
        import boto3

        caller = boto3.client('sts', region_name='us-east-1').get_caller_identity()
        logger.info(f"AWS Identity: {caller}")

        perm_client = boto3.client('s3', region_name='us-east-1')

        # list_buckets permission
        try:
            listed = perm_client.list_buckets()
            logger.info(f"list_buckets permission: OK ({len(listed.get('Buckets', []))} buckets)")
        except ClientError as exc:
            logger.error(f"list_buckets permission: DENIED - {exc}")

        # get_bucket_location permission, exercised on the first bucket only
        try:
            listed = perm_client.list_buckets()
            if not listed.get('Buckets'):
                logger.warning("No buckets to test get_bucket_location permission")
            else:
                first_bucket = listed['Buckets'][0]['Name']
                perm_client.get_bucket_location(Bucket=first_bucket)
                logger.info(f"get_bucket_location permission: OK (tested on {first_bucket})")
        except ClientError as exc:
            logger.error(f"get_bucket_location permission: DENIED - {exc}")

    except Exception as exc:
        logger.error(f"Error in credentials/permissions test: {exc!s}")

if __name__ == "__main__":
    # Allow running this debug test as a plain script, without pytest:
    # drive the coroutine to completion on a fresh event loop.
    asyncio.run(test_s3_bucket_discovery_debug())
```

--------------------------------------------------------------------------------
/tests/unit/analyzers/conftest_cloudwatch.py:
--------------------------------------------------------------------------------

```python
"""
CloudWatch-specific pytest configuration and fixtures.

This module provides CloudWatch-specific fixtures for testing analyzers and services.
"""

import pytest
import boto3
from unittest.mock import Mock, AsyncMock
from datetime import datetime, timedelta
from moto import mock_aws

from services.cloudwatch_service import CloudWatchOperationResult


@pytest.fixture
def mock_aws_credentials():
    """Set dummy AWS credentials in the environment for the duration of a test.

    The listed variables are patched into os.environ while the test runs, and
    patch.dict restores the previous environment when the fixture exits.
    """
    import os
    # `patch` is not among this module's top-level imports (only Mock and
    # AsyncMock are), so import it here; without it the fixture raises
    # NameError during setup.
    from unittest.mock import patch

    with patch.dict(os.environ, {
        'AWS_ACCESS_KEY_ID': 'testing',
        'AWS_SECRET_ACCESS_KEY': 'testing',
        'AWS_SECURITY_TOKEN': 'testing',
        'AWS_SESSION_TOKEN': 'testing',
        'AWS_DEFAULT_REGION': 'us-east-1'
    }):
        yield


@pytest.fixture
def mock_cloudwatch_client(mock_aws_credentials):
    """Yield a boto3 CloudWatch client created inside an active moto mock_aws() context."""
    with mock_aws():
        cloudwatch = boto3.client('cloudwatch', region_name='us-east-1')
        yield cloudwatch


@pytest.fixture
def mock_logs_client(mock_aws_credentials):
    """Yield a boto3 CloudWatch Logs client created inside an active moto mock_aws() context."""
    with mock_aws():
        logs = boto3.client('logs', region_name='us-east-1')
        yield logs


@pytest.fixture
def mock_ce_client(mock_aws_credentials):
    """Yield a boto3 Cost Explorer client created inside an active moto mock_aws() context."""
    with mock_aws():
        cost_explorer = boto3.client('ce', region_name='us-east-1')
        yield cost_explorer


@pytest.fixture
def sample_cloudwatch_cost_data():
    """Return a canned Cost Explorer response: one time period with two usage-type groups."""
    def usage_group(usage_type, cost, quantity, quantity_unit):
        # One "Groups" entry with its cost and usage metrics
        return {
            "Keys": [usage_type],
            "Metrics": {
                "UnblendedCost": {"Amount": cost, "Unit": "USD"},
                "UsageQuantity": {"Amount": quantity, "Unit": quantity_unit}
            }
        }

    single_day = {
        "TimePeriod": {"Start": "2024-01-01", "End": "2024-01-02"},
        "Groups": [
            usage_group("DataIngestion-Bytes", "5.25", "10.5", "GB"),
            usage_group("DataStorage-ByteHrs", "2.10", "70.0", "GB-Hours"),
        ]
    }
    return {"ResultsByTime": [single_day]}


@pytest.fixture
def sample_cloudwatch_alarms():
    """Return two canned alarm records: one with an SNS action, one with no actions."""
    alarm_with_actions = dict(
        AlarmName="test-alarm-1",
        AlarmDescription="Test alarm with actions",
        StateValue="OK",
        AlarmActions=["arn:aws:sns:us-east-1:123456789012:test-topic"],
        Period=300,
        MetricName="CPUUtilization",
    )
    alarm_without_actions = dict(
        AlarmName="test-alarm-2",
        AlarmDescription="Test alarm without actions",
        StateValue="INSUFFICIENT_DATA",
        AlarmActions=[],
        Period=60,  # High resolution
        MetricName="NetworkIn",
    )
    return [alarm_with_actions, alarm_without_actions]


@pytest.fixture
def sample_cloudwatch_log_groups():
    """Return two canned log group records, the second one lacking a retention policy."""
    def created_ms(days_ago):
        # Epoch milliseconds for the moment `days_ago` days before now
        return int((datetime.now() - timedelta(days=days_ago)).timestamp() * 1000)

    lambda_group = {
        "logGroupName": "/aws/lambda/test-function",
        "creationTime": created_ms(30),
        "retentionInDays": 14,
        "storedBytes": 1024000
    }
    # No "retentionInDays" key: this group has no retention policy
    api_group = {
        "logGroupName": "/aws/apigateway/test-api",
        "creationTime": created_ms(400),
        "storedBytes": 2048000
    }
    return [lambda_group, api_group]


@pytest.fixture
def mock_cloudwatch_pricing_service():
    """Return a Mock pricing service with fixed logs pricing and a storage-only cost calculator."""
    bytes_per_gb = 1024**3

    def mock_get_logs_pricing():
        # Fixed price table
        pricing_table = {
            "ingestion_per_gb": 0.50,
            "storage_per_gb_month": 0.03,
            "insights_per_gb_scanned": 0.005
        }
        return {"status": "success", "logs_pricing": pricing_table}

    def mock_calculate_logs_cost(log_groups_data):
        # Only storage is priced; ingestion and insights are reported as zero
        storage_total = 0.0
        for group in log_groups_data:
            gigabytes = group.get('storedBytes', 0) / bytes_per_gb
            storage_total += gigabytes * 0.03

        breakdown = {
            "storage_cost": storage_total,
            "ingestion_cost": 0.0,
            "insights_cost": 0.0
        }
        return {
            "status": "success",
            "total_monthly_cost": storage_total,
            "cost_breakdown": breakdown
        }

    pricing_service = Mock()
    pricing_service.region = "us-east-1"
    pricing_service.get_logs_pricing = mock_get_logs_pricing
    pricing_service.calculate_logs_cost = mock_calculate_logs_cost
    return pricing_service


@pytest.fixture
def mock_cloudwatch_service():
    """Return a Mock CloudWatch service whose list_metrics/describe_alarms coroutines give canned results."""
    async def mock_list_metrics(namespace=None, metric_name=None, dimensions=None):
        # Arguments are accepted but ignored; the same two metrics are always returned
        canned_metrics = [
            {'Namespace': 'AWS/EC2', 'MetricName': 'CPUUtilization'},
            {'Namespace': 'Custom/App', 'MetricName': 'RequestCount'}
        ]
        return CloudWatchOperationResult(
            success=True,
            data={'metrics': canned_metrics, 'total_count': 2},
            operation_name='list_metrics',
            operation_type='free'
        )

    async def mock_describe_alarms(alarm_names=None):
        # Arguments are accepted but ignored; one alarm with an action is always returned
        canned_alarm = {
            'AlarmName': 'test-alarm',
            'StateValue': 'OK',
            'AlarmActions': ['arn:aws:sns:us-east-1:123456789012:test-topic'],
            'Period': 300
        }
        alarm_summary = {
            'total_alarms': 1,
            'alarms_by_state': {'OK': 1},
            'alarms_without_actions': []
        }
        return CloudWatchOperationResult(
            success=True,
            data={
                'alarms': [canned_alarm],
                'total_count': 1,
                'analysis': alarm_summary
            },
            operation_name='describe_alarms',
            operation_type='free'
        )

    cw_service = Mock()
    cw_service.region = "us-east-1"
    cw_service.operation_count = 0
    cw_service.cost_incurring_operations = []
    cw_service.total_execution_time = 0.0

    # Replace the auto-generated Mock attributes with real coroutines
    cw_service.list_metrics = mock_list_metrics
    cw_service.describe_alarms = mock_describe_alarms
    return cw_service
```

--------------------------------------------------------------------------------
/test_runbooks.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Test script for CFM Tips AWS Cost Optimization MCP Server
"""

import sys
import os

# Add current directory to path
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

def test_imports():
    """Check that the MCP, AWS and runbook modules needed by the server can be imported.

    Returns:
        True when every import succeeds, False on the first failure (the
        error is printed).
    """
    print("Testing imports...")

    try:
        # MCP server framework
        from mcp.server import Server
        from mcp.server.stdio import stdio_server
        from mcp.types import Tool, TextContent
        print("✅ MCP imports successful")

        # AWS SDK
        import boto3
        from botocore.exceptions import ClientError, NoCredentialsError
        print("✅ AWS imports successful")

        # Runbook entry points, grouped by service
        from runbook_functions import run_ec2_right_sizing_analysis, generate_ec2_right_sizing_report
        from runbook_functions import (
            run_ebs_optimization_analysis,
            identify_unused_ebs_volumes,
            generate_ebs_optimization_report,
        )
        from runbook_functions import (
            run_rds_optimization_analysis,
            identify_idle_rds_instances,
            generate_rds_optimization_report,
        )
        from runbook_functions import (
            run_lambda_optimization_analysis,
            identify_unused_lambda_functions,
            generate_lambda_optimization_report,
        )
        from runbook_functions import run_comprehensive_cost_analysis
        from runbook_functions import (
            get_management_trails,
            run_cloudtrail_trails_analysis,
            generate_cloudtrail_report,
        )
        print("✅ Runbook functions import successful")

    except ImportError as import_err:
        print(f"❌ Import error: {import_err}")
        return False
    except Exception as other_err:
        print(f"❌ Unexpected error: {other_err}")
        return False
    else:
        return True

def test_server_creation():
    """Check that importing the server module exposes a ``server`` object.

    A server name other than "cfm_tips" only produces a warning line; it does
    not fail the check.

    Returns:
        True when the module has a ``server`` attribute, False when it does
        not or when anything raises.
    """
    print("\nTesting server creation...")

    try:
        import mcp_server_with_runbooks as server_module
        print("✅ Server module imported successfully")

        # Guard: nothing further to inspect without a server object
        if not hasattr(server_module, 'server'):
            print("❌ Server object not found")
            return False

        print("✅ Server object created successfully")

        actual_name = server_module.server.name
        if actual_name != "cfm_tips":
            print(f"⚠️  Server name: {actual_name}")
        else:
            print("✅ Server name is correct: cfm_tips")

        return True

    except Exception as exc:
        print(f"❌ Server creation error: {exc!s}")
        return False

def test_cloudtrail_functions():
    """Check that the CloudTrail runbook functions import and each take an ``arguments`` parameter.

    Returns:
        True when all three functions import and expose a parameter named
        ``arguments``; False on the first failed check or on any exception.
    """
    print("\nTesting CloudTrail functions...")

    try:
        from runbook_functions import (
            get_management_trails,
            run_cloudtrail_trails_analysis,
            generate_cloudtrail_report
        )
        print("✅ CloudTrail functions imported successfully")

        import inspect

        # (function, message on success, message on failure), checked in order
        signature_checks = (
            (
                get_management_trails,
                "✅ get_management_trails has correct signature",
                "❌ get_management_trails signature incorrect",
            ),
            (
                run_cloudtrail_trails_analysis,
                "✅ run_cloudtrail_trails_analysis has correct signature",
                "❌ run_cloudtrail_trails_analysis signature incorrect",
            ),
            (
                generate_cloudtrail_report,
                "✅ generate_cloudtrail_report has correct signature",
                "❌ generate_cloudtrail_report signature incorrect",
            ),
        )

        for func, ok_message, bad_message in signature_checks:
            if 'arguments' not in inspect.signature(func).parameters:
                print(bad_message)
                return False
            print(ok_message)

        return True

    except ImportError as import_err:
        print(f"❌ CloudTrail import error: {import_err}")
        return False
    except Exception as other_err:
        print(f"❌ CloudTrail test error: {other_err}")
        return False

def test_tool_names():
    """Check that ``<server>___<tool>`` stays within 64 characters for a sample of tool names.

    Returns:
        True when every combined name fits, False at the first one that is
        longer than 64 characters.
    """
    print("\nTesting tool name lengths...")

    server_name = "cfm_tips"
    sample_tools = [
        "ec2_rightsizing",
        "ebs_optimization",
        "rds_idle",
        "lambda_unused",
        "comprehensive_analysis",
        "get_coh_recommendations",
        "cloudtrail_optimization"
    ]

    # Server name and tool name joined by a triple underscore
    qualified_names = [f"{server_name}___{tool}" for tool in sample_tools]

    for qualified in qualified_names:
        char_count = len(qualified)
        if char_count > 64:
            print(f"❌ Tool name too long: {qualified} ({char_count} chars)")
            return False

    longest = max((len(qualified) for qualified in qualified_names), default=0)
    print(f"✅ All tool names within limit (max: {longest} chars)")
    return True

def main():
    """Run the four checks in this script in order and print a summary.

    Returns:
        True when every check returned a truthy value, otherwise False.
    """
    print("CFM Tips AWS Cost Optimization MCP Server - Integration Test")
    print("=" * 65)

    # Executed in this order: imports, server creation, CloudTrail, tool names
    checks = (
        test_imports,
        test_server_creation,
        test_cloudtrail_functions,
        test_tool_names,
    )
    total_tests = len(checks)
    tests_passed = 0
    for check in checks:
        if check():
            tests_passed += 1

    print(f"\n" + "=" * 65)
    print(f"Tests passed: {tests_passed}/{total_tests}")

    if tests_passed != total_tests:
        print("❌ Some tests failed. Check the errors above.")
        return False

    success_lines = (
        "✅ All integration tests passed!",
        "\nNext steps:",
        "1. Configure AWS credentials: aws configure",
        "2. Apply the correct IAM permissions (see CORRECTED_PERMISSIONS.md)",
        "3. Start the server: q chat --mcp-config \"$(pwd)/mcp_runbooks.json\"",
        "4. Test with: \"Run comprehensive cost analysis for us-east-1\"",
        "\n🎉 CFM Tips is ready to help optimize your AWS costs!",
    )
    for line in success_lines:
        print(line)
    return True

if __name__ == "__main__":
    # Exit status 0 when every check passed, 1 otherwise.
    raise SystemExit(0 if main() else 1)

```

--------------------------------------------------------------------------------
/tests/legacy/test_runbook_integration.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""

This script tests that the runbook functions work correctly with the new S3OptimizationOrchestrator.
"""

import asyncio
import json
import logging
import sys
from typing import Dict, Any

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

async def test_s3_playbook_functions():
    """Exercise the S3 playbook entry points exposed by the orchestrator module.

    Calls the general spend analysis and then the comprehensive optimization
    tool. Each call must return a non-empty list whose first element has a
    ``text`` attribute holding JSON with a ``status`` of ``"success"`` or
    ``"error"``.

    Returns:
        bool: True when both calls yield a well-formed response; False on a
        malformed response or on any exception (import failures included).
    """
    logger.info("=== Testing S3 Playbook Functions ===")

    # An "error" status is treated as a valid outcome here.
    accepted_statuses = ["success", "error"]

    try:
        # Import S3 functions directly from orchestrator
        from playbooks.s3.s3_optimization_orchestrator import (
            run_s3_quick_analysis,
            run_s3_general_spend_analysis,
            run_s3_comprehensive_optimization_tool
        )

        # Shared arguments for the S3 functions
        base_args = {
            "region": "us-east-1",
            "lookback_days": 7,
            "timeout_seconds": 30,
            "store_results": True
        }

        # --- General spend analysis ---
        logger.info("Testing general spend analysis...")
        spend_response = await run_s3_general_spend_analysis(base_args)

        if not (spend_response and isinstance(spend_response, list)):
            logger.error("✗ General spend analysis returned invalid result")
            return False

        try:
            spend_status = json.loads(spend_response[0].text).get("status")
            if spend_status not in accepted_statuses:
                logger.error(f"✗ Unexpected status: {spend_status}")
                return False
            logger.info(f"✓ General spend analysis: {spend_status}")
        except Exception as parse_error:
            logger.error(f"✗ Failed to parse result: {parse_error}")
            return False

        # --- Comprehensive analysis (same arguments, longer timeout) ---
        logger.info("Testing comprehensive analysis...")
        comprehensive_args = {**base_args, "timeout_seconds": 60}

        full_response = await run_s3_comprehensive_optimization_tool(comprehensive_args)

        if not (full_response and isinstance(full_response, list)):
            logger.error("✗ Comprehensive analysis returned invalid result")
            return False

        try:
            full_status = json.loads(full_response[0].text).get("status")
            if full_status not in accepted_statuses:
                logger.error(f"✗ Unexpected comprehensive status: {full_status}")
                return False
            logger.info(f"✓ Comprehensive analysis: {full_status}")
        except Exception as parse_error:
            logger.error(f"✗ Failed to parse comprehensive result: {parse_error}")
            return False

        logger.info("✓ All S3 runbook functions working with new orchestrator")
        return True

    except Exception as failure:
        logger.error(f"✗ S3 runbook function test failed: {failure}")
        return False

async def test_session_data_storage():
    """Run a general spend analysis with result storage and look for stored tables.

    The check is lenient: a non-"success" analysis status or an empty table
    list only produces a warning and still counts as a pass.

    Returns:
        bool: True unless an exception is raised along the way, in which
        case False.
    """
    logger.info("=== Testing Session Data Storage ===")

    try:
        from playbooks.s3.s3_optimization_orchestrator import S3OptimizationOrchestrator

        target_region = "us-east-1"
        s3_orchestrator = S3OptimizationOrchestrator(region=target_region)

        # Run an analysis that should store data
        analysis = await s3_orchestrator.execute_analysis(
            analysis_type="general_spend",
            region=target_region,
            lookback_days=7,
            store_results=True,
        )

        status = analysis.get("status")
        if status != "success":
            # Still pass if analysis runs but has issues
            logger.warning(f"Analysis not successful: {status}")
            return True

        stored_tables = s3_orchestrator.get_stored_tables()
        if stored_tables:
            logger.info(f"✓ Session data storage working: {len(stored_tables)} tables created")
        else:
            # Still pass - may be expected in test environment
            logger.warning("No tables found after analysis")
        return True

    except Exception as failure:
        logger.error(f"✗ Session data storage test failed: {failure}")
        return False

async def test_no_cost_compliance():
    """Check that no cost-incurring S3 operation appears in the service stats.

    Returns:
        bool: False if any of ``list_objects``, ``list_objects_v2``,
        ``head_object`` or ``get_object`` is among the recorded operation
        names, or if an exception occurs; True otherwise.
    """
    logger.info("=== Testing No-Cost Compliance ===")

    try:
        from services.s3_service import S3Service

        # NOTE(review): the stats are read from a freshly constructed service,
        # before any analysis is run through it — confirm that is intended.
        s3_service = S3Service(region="us-east-1")
        operation_stats = s3_service.get_operation_stats()

        # Operations that must never show up in the stats
        forbidden_ops = {'list_objects', 'list_objects_v2', 'head_object', 'get_object'}
        recorded_ops = set(operation_stats.keys())
        violations = recorded_ops.intersection(forbidden_ops)

        if not violations:
            logger.info(f"✓ No-cost compliance verified: {len(operation_stats)} allowed operations called")
            return True

        logger.error(f"✗ Forbidden operations called: {violations}")
        return False

    except Exception as failure:
        logger.error(f"✗ No-cost compliance test failed: {failure}")
        return False

async def run_integration_tests():
    """Run every integration test coroutine in sequence and log a summary.

    A test counts as failed when it returns a falsy value or raises.

    Returns:
        bool: True if no test failed, False otherwise.
    """
    logger.info("Starting Runbook Integration Tests")
    divider = "=" * 50
    logger.info(divider)

    # (display name, coroutine function) pairs, executed in this order.
    suite = [
        ("S3 Playbook Functions", test_s3_playbook_functions),
        ("Session Data Storage", test_session_data_storage),
        ("No-Cost Compliance", test_no_cost_compliance),
    ]

    passed = failed = 0
    for label, coroutine_fn in suite:
        try:
            if await coroutine_fn():
                logger.info(f"✓ PASS: {label}")
                passed += 1
            else:
                logger.error(f"✗ FAIL: {label}")
                failed += 1
        except Exception as failure:
            logger.error(f"✗ FAIL: {label} - Exception: {failure}")
            failed += 1

    logger.info(divider)
    logger.info(f"Integration Tests: {passed + failed} total, {passed} passed, {failed} failed")

    if failed:
        logger.error(f"❌ {failed} INTEGRATION TESTS FAILED")
        return False

    logger.info("🎉 ALL INTEGRATION TESTS PASSED!")
    return True

if __name__ == "__main__":
    # Exit status mirrors the overall result: 0 on success, 1 on any failure.
    sys.exit(0 if asyncio.run(run_integration_tests()) else 1)
```

--------------------------------------------------------------------------------
/tests/unit/cloudwatch/test_cache_control.py:
--------------------------------------------------------------------------------

```python
"""
Test Cache Control for CloudWatch Optimization

Demonstrates how to control caching behavior for testing purposes.
"""

import pytest
import os
from utils.cache_decorator import (
    dao_cache,
    is_cache_enabled,
    enable_cache,
    disable_cache,
    clear_cache,
    get_cache_stats
)


class TestCacheControl:
    """Checks for the global and per-function cache switches around ``dao_cache``."""

    @staticmethod
    def _restore_cache_state(was_enabled):
        """Put the global cache switch back to the value captured before a test."""
        if was_enabled:
            enable_cache()
        else:
            disable_cache()

    def test_cache_enabled_by_default(self):
        """Query the cache switch; either value is accepted.

        Cache should be enabled by default (unless CFM_ENABLE_CACHE=false),
        so the result depends on the environment.
        """
        current_state = is_cache_enabled()
        assert current_state in (True, False)  # Depends on environment

    def test_disable_cache_programmatically(self):
        """Toggle the cache off and back on and verify the reported state."""
        state_before = is_cache_enabled()

        try:
            disable_cache()
            assert is_cache_enabled() is False

            enable_cache()
            assert is_cache_enabled() is True
        finally:
            self._restore_cache_state(state_before)

    def test_cache_decorator_respects_global_setting(self):
        """Verify a decorated function caches only while the global switch is on."""
        invocations = []

        @dao_cache(ttl_seconds=60)
        def test_function(value):
            invocations.append(value)
            return value * 2

        state_before = is_cache_enabled()

        try:
            # Cache enabled: the second identical call must not reach the function.
            enable_cache()
            clear_cache()
            invocations.clear()

            cached_results = [test_function(5), test_function(5)]

            assert cached_results == [10, 10]
            assert len(invocations) == 1  # Should only call once due to caching

            # Cache disabled: every call must reach the function.
            disable_cache()
            clear_cache()
            invocations.clear()

            uncached_results = [test_function(5), test_function(5)]

            assert uncached_results == [10, 10]
            assert len(invocations) == 2  # Should call twice without caching
        finally:
            self._restore_cache_state(state_before)

    def test_cache_decorator_with_enabled_parameter(self):
        """Verify ``enabled=False`` on the decorator wins over the global switch."""
        invocations = []

        @dao_cache(ttl_seconds=60, enabled=False)
        def always_uncached(value):
            invocations.append(value)
            return value * 2

        state_before = is_cache_enabled()

        try:
            # Even with cache enabled globally, this function should not cache
            enable_cache()
            clear_cache()
            invocations.clear()

            results = [always_uncached(5), always_uncached(5)]

            assert results == [10, 10]
            assert len(invocations) == 2  # Should call twice (caching disabled)
        finally:
            self._restore_cache_state(state_before)

    def test_cache_stats(self):
        """Check the hit/miss accounting reported by ``get_cache_stats``.

        NOTE: This test uses 'page' parameter which is in the important_params list
        of _generate_cache_key(). Using other parameters may not generate unique
        cache keys due to the selective parameter inclusion in the cache decorator.
        """
        state_before = is_cache_enabled()

        try:
            enable_cache()
            clear_cache()

            # Defined only after the cache has been cleared, to start from a clean state.
            # 'page' is used because it is in the cache decorator's important_params list.
            @dao_cache(ttl_seconds=60)
            def test_function(page=1):
                return page * 2

            # Two distinct page values, each requested twice.
            outcomes = [
                test_function(page=1),  # MISS
                test_function(page=1),  # HIT
                test_function(page=2),  # MISS
                test_function(page=2),  # HIT
            ]
            assert outcomes == [2, 2, 4, 4]

            stats = get_cache_stats()

            for expected_key in ('hits', 'misses', 'hit_rate', 'enabled'):
                assert expected_key in stats
            assert stats['enabled'] is True

            # The test expects exactly 2 hits and 2 misses
            hits = stats['hits']
            assert hits == 2, f"Expected 2 hits but got {hits}"
            misses = stats['misses']
            assert misses == 2, f"Expected 2 misses but got {misses}"
        finally:
            # Empty the cache before handing the switch back.
            clear_cache()
            self._restore_cache_state(state_before)


class TestCacheEnvironmentVariable:
    """Placeholder suite for cache control via the CFM_ENABLE_CACHE environment variable."""

    def test_cache_disabled_via_env_var(self, monkeypatch):
        """Document disabling the cache via CFM_ENABLE_CACHE (no assertions yet).

        Testing the environment variable would need the module to be
        reloaded, so for now this test only documents the behavior.
        """
        return None


# Example usage in tests
@pytest.fixture
def disable_cache_for_test():
    """Fixture that switches the cache off (and clears it) for one test.

    The cache state captured before the test is reinstated after the
    ``yield``.
    """
    was_enabled = is_cache_enabled()
    disable_cache()
    clear_cache()
    yield
    # Hand the switch back in whatever position it was found.
    restore = enable_cache if was_enabled else disable_cache
    restore()


def test_with_cache_disabled(disable_cache_for_test):
    """Example test: the ``disable_cache_for_test`` fixture leaves the cache off."""
    # Your test code here; the fixture has already disabled the cache.
    cache_state = is_cache_enabled()
    assert cache_state is False


@pytest.fixture
def enable_cache_for_test():
    """Fixture that switches the cache on (and clears it) for one test.

    The cache state captured before the test is reinstated after the
    ``yield``.
    """
    was_enabled = is_cache_enabled()
    enable_cache()
    clear_cache()
    yield
    # Hand the switch back in whatever position it was found.
    restore = enable_cache if was_enabled else disable_cache
    restore()


def test_with_cache_enabled(enable_cache_for_test):
    """Example test: the ``enable_cache_for_test`` fixture leaves the cache on."""
    # Your test code here; the fixture has already enabled the cache.
    cache_state = is_cache_enabled()
    assert cache_state is True

```

--------------------------------------------------------------------------------
/tests/unit/cloudwatch/test_cloudwatch_metrics_pagination.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Unit tests for CloudWatch metrics pagination functionality.
"""

import pytest
import sys
import os
from unittest.mock import patch, MagicMock

# Add the project root to the path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../../..'))

from playbooks.cloudwatch.result_processor import CloudWatchResultProcessor


class TestCloudWatchMetricsPagination:
    """Pagination, ordering and dimension checks for ``process_metrics_results``."""

    def test_metrics_pagination_with_large_dataset(self):
        """Verify 25 metrics are split into pages of 10, 10 and 5 items."""
        processor = CloudWatchResultProcessor()

        # 25 metrics spread over three namespaces, alternating environments.
        metrics = [
            {
                'MetricName': f'CustomMetric{idx:02d}',
                'Namespace': f'MyApp/Service{idx % 3}',
                'Dimensions': [
                    {'Name': 'InstanceId', 'Value': f'i-{idx:010d}'},
                    {'Name': 'Environment', 'Value': 'staging' if idx % 2 else 'production'},
                ],
            }
            for idx in range(25)
        ]

        # Page 1: full page, first of three.
        result_p1 = processor.process_metrics_results(metrics, page=1)
        paging_p1 = result_p1['pagination']

        assert len(result_p1['items']) == 10, f"Page 1 should have exactly 10 items, got {len(result_p1['items'])}"
        assert paging_p1['current_page'] == 1
        assert paging_p1['total_items'] == 25
        assert paging_p1['total_pages'] == 3
        assert paging_p1['has_next_page'] is True
        assert paging_p1['has_previous_page'] is False

        # Page 2: full page, neighbours on both sides.
        result_p2 = processor.process_metrics_results(metrics, page=2)
        paging_p2 = result_p2['pagination']

        assert len(result_p2['items']) == 10, f"Page 2 should have exactly 10 items, got {len(result_p2['items'])}"
        assert paging_p2['current_page'] == 2
        assert paging_p2['has_next_page'] is True
        assert paging_p2['has_previous_page'] is True

        # Page 3: the remaining 5 items, last page.
        result_p3 = processor.process_metrics_results(metrics, page=3)
        paging_p3 = result_p3['pagination']

        assert len(result_p3['items']) == 5, f"Page 3 should have exactly 5 items, got {len(result_p3['items'])}"
        assert paging_p3['current_page'] == 3
        assert paging_p3['has_next_page'] is False
        assert paging_p3['has_previous_page'] is True

        # Dimensions must come through untouched on every first-page item.
        for entry in result_p1['items']:
            assert 'Dimensions' in entry, "Dimensions should be preserved"
            assert len(entry['Dimensions']) == 2, "All dimensions should be preserved"

    def test_metrics_cost_sorting(self):
        """Verify custom-namespace metrics are ordered ahead of AWS/EC2 metrics."""
        processor = CloudWatchResultProcessor()

        # Interleaved AWS and custom metrics
        metrics = [
            {'MetricName': 'CPUUtilization', 'Namespace': 'AWS/EC2'},  # Free
            {'MetricName': 'CustomMetric1', 'Namespace': 'MyApp/Performance'},  # $0.30
            {'MetricName': 'NetworkIn', 'Namespace': 'AWS/EC2'},  # Free
            {'MetricName': 'CustomMetric2', 'Namespace': 'MyApp/Business'},  # $0.30
        ]

        items = processor.process_metrics_results(metrics, page=1)['items']

        # Custom metrics should be first (higher cost)
        custom_namespaces = ['MyApp/Performance', 'MyApp/Business']
        for position in (0, 1):
            assert items[position]['Namespace'] in custom_namespaces
            assert items[position]['estimated_monthly_cost'] == 0.30

        # AWS metrics should be last (free)
        for position in (2, 3):
            assert items[position]['Namespace'] == 'AWS/EC2'
            assert items[position]['estimated_monthly_cost'] == 0.0

    def test_metrics_dimensions_preservation(self):
        """Verify a metric with seven dimensions keeps all of them."""
        processor = CloudWatchResultProcessor()

        expected_names = ['InstanceId', 'Environment', 'Service', 'Region', 'AZ', 'Version', 'Team']

        # A single metric carrying many dimensions
        metrics = [{
            'MetricName': 'ComplexMetric',
            'Namespace': 'MyApp/Complex',
            'Dimensions': [
                {'Name': 'InstanceId', 'Value': 'i-1234567890abcdef0'},
                {'Name': 'Environment', 'Value': 'production'},
                {'Name': 'Service', 'Value': 'web-server'},
                {'Name': 'Region', 'Value': 'us-east-1'},
                {'Name': 'AZ', 'Value': 'us-east-1a'},
                {'Name': 'Version', 'Value': 'v2.1.3'},
                {'Name': 'Team', 'Value': 'platform-engineering'},
            ]
        }]

        processed = processor.process_metrics_results(metrics, page=1)
        dimensions = processed['items'][0]['Dimensions']

        # All dimensions should be preserved
        assert len(dimensions) == 7, f"Expected 7 dimensions, got {len(dimensions)}"

        # Every expected dimension name must be present
        dimension_names = [dimension['Name'] for dimension in dimensions]
        for expected_name in expected_names:
            assert expected_name in dimension_names, f"Dimension {expected_name} should be preserved"

    def test_empty_metrics_pagination(self):
        """Verify the pagination block produced for an empty metrics list."""
        processor = CloudWatchResultProcessor()

        outcome = processor.process_metrics_results([], page=1)
        paging = outcome['pagination']

        assert len(outcome['items']) == 0
        assert paging['current_page'] == 1
        assert paging['total_items'] == 0
        assert paging['total_pages'] == 0
        assert paging['has_next_page'] is False
        assert paging['has_previous_page'] is False

    def test_single_page_metrics(self):
        """Verify five metrics (fewer than the page size of 10) fit on one page."""
        processor = CloudWatchResultProcessor()

        metrics = [
            {'MetricName': f'Metric{idx}', 'Namespace': 'MyApp/Test'}
            for idx in range(5)
        ]

        outcome = processor.process_metrics_results(metrics, page=1)
        paging = outcome['pagination']

        assert len(outcome['items']) == 5
        assert paging['current_page'] == 1
        assert paging['total_items'] == 5
        assert paging['total_pages'] == 1
        assert paging['has_next_page'] is False
        assert paging['has_previous_page'] is False


if __name__ == '__main__':
    # Allow running this file directly: hand it to pytest with verbose output.
    pytest.main([__file__, '-v'])
```

--------------------------------------------------------------------------------
/tests/legacy/test_runbooks.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Test script for CFM Tips AWS Cost Optimization MCP Server
"""

import sys
import os

# Add the parent of this file's directory to the import path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))

def test_imports():
    """Check that the MCP, AWS SDK and playbook imports all resolve.

    Returns:
        bool: True if every import succeeds, False as soon as one raises.
    """
    print("Testing imports...")

    try:
        # MCP server packages
        from mcp.server import Server
        from mcp.server.stdio import stdio_server
        from mcp.types import Tool, TextContent
        print("✅ MCP imports successful")

        # AWS SDK
        import boto3
        from botocore.exceptions import ClientError, NoCredentialsError
        print("✅ AWS imports successful")

        # Runbook functions, imported directly from the playbooks
        from playbooks.ec2.ec2_optimization import (
            run_ec2_right_sizing_analysis,
            generate_ec2_right_sizing_report
        )
        from playbooks.ebs.ebs_optimization import (
            run_ebs_optimization_analysis,
            identify_unused_ebs_volumes,
            generate_ebs_optimization_report_mcp as generate_ebs_optimization_report
        )
        from playbooks.rds.rds_optimization import (
            run_rds_optimization_analysis,
            identify_idle_rds_instances_wrapper as identify_idle_rds_instances,
            generate_rds_optimization_report
        )
        from playbooks.aws_lambda.lambda_optimization import (
            run_lambda_optimization_analysis,
            identify_unused_lambda_functions_mcp as identify_unused_lambda_functions,
            generate_lambda_optimization_report
        )
        from playbooks.comprehensive_optimization import run_comprehensive_cost_analysis
        from playbooks.cloudtrail.cloudtrail_optimization import (
            get_management_trails_mcp as get_management_trails,
            run_cloudtrail_trails_analysis_mcp as run_cloudtrail_trails_analysis,
            generate_cloudtrail_report_mcp as generate_cloudtrail_report
        )
        print("✅ Runbook functions import successful")

    except Exception as failure:
        # Import failures get their own label; anything else is reported as unexpected.
        label = "Import error" if isinstance(failure, ImportError) else "Unexpected error"
        print(f"❌ {label}: {failure}")
        return False

    return True

def test_server_creation():
    """Check that the server module imports and exposes a ``server`` object.

    Returns:
        bool: True when the module has a ``server`` attribute, False when it
        does not or when any exception is raised.
    """
    print("\nTesting server creation...")

    try:
        # Import the server module
        import mcp_server_with_runbooks
        print("✅ Server module imported successfully")

        if not hasattr(mcp_server_with_runbooks, 'server'):
            print("❌ Server object not found")
            return False

        print("✅ Server object created successfully")

        # A name mismatch is only reported; it does not fail the check.
        actual_name = mcp_server_with_runbooks.server.name
        if actual_name == "cfm_tips":
            print("✅ Server name is correct: cfm_tips")
        else:
            print(f"⚠️  Server name: {actual_name}")

        return True

    except Exception as failure:
        print(f"❌ Server creation error: {failure!s}")
        return False

def test_cloudtrail_functions():
    """Check the CloudTrail MCP wrappers import and accept an ``arguments`` parameter.

    Returns:
        bool: True when all three wrappers have a parameter named
        ``arguments``; False on the first one that does not, or on any
        exception.
    """
    print("\nTesting CloudTrail functions...")

    try:
        from playbooks.cloudtrail.cloudtrail_optimization import (
            get_management_trails_mcp as get_management_trails,
            run_cloudtrail_trails_analysis_mcp as run_cloudtrail_trails_analysis,
            generate_cloudtrail_report_mcp as generate_cloudtrail_report
        )
        print("✅ CloudTrail functions imported successfully")

        # Inspect each wrapper's signature in turn
        import inspect

        wrappers = (
            ("get_management_trails", get_management_trails),
            ("run_cloudtrail_trails_analysis", run_cloudtrail_trails_analysis),
            ("generate_cloudtrail_report", generate_cloudtrail_report),
        )
        for label, wrapper in wrappers:
            if 'arguments' not in inspect.signature(wrapper).parameters:
                print(f"❌ {label} signature incorrect")
                return False
            print(f"✅ {label} has correct signature")

        return True

    except Exception as failure:
        # Import failures get their own label; anything else is a generic test error.
        label = "CloudTrail import error" if isinstance(failure, ImportError) else "CloudTrail test error"
        print(f"❌ {label}: {failure}")
        return False

def test_tool_names():
    """Check that ``<server>___<tool>`` names stay within the 64-character limit.

    Returns:
        bool: False as soon as a combined name longer than 64 characters is
        found, True otherwise.
    """
    print("\nTesting tool name lengths...")

    server_name = "cfm_tips"
    sample_tools = [
        "ec2_rightsizing",
        "ebs_optimization",
        "rds_idle",
        "lambda_unused",
        "comprehensive_analysis",
        "get_coh_recommendations",
        "cloudtrail_optimization",
    ]

    # Server name and tool name joined by three underscores.
    qualified_names = [f"{server_name}___{tool}" for tool in sample_tools]

    # Report only the first offender, in list order.
    offender = next((name for name in qualified_names if len(name) > 64), None)
    if offender is not None:
        print(f"❌ Tool name too long: {offender} ({len(offender)} chars)")
        return False

    max_length = max((len(name) for name in qualified_names), default=0)
    print(f"✅ All tool names within limit (max: {max_length} chars)")
    return True

def main():
    """Run the four integration checks and print a summary.

    Each check is invoked exactly once, in a fixed order; a truthy return
    value counts as a pass.

    Returns:
        bool: True when every check passed, otherwise False.
    """
    separator = "=" * 65
    print("CFM Tips AWS Cost Optimization MCP Server - Integration Test")
    print(separator)

    # Imports, server creation, CloudTrail functions, tool names.
    checks = (
        test_imports,
        test_server_creation,
        test_cloudtrail_functions,
        test_tool_names,
    )
    total_tests = len(checks)
    tests_passed = 0
    for check in checks:
        if check():
            tests_passed += 1

    print("\n" + separator)
    print(f"Tests passed: {tests_passed}/{total_tests}")

    if tests_passed != total_tests:
        print("❌ Some tests failed. Check the errors above.")
        return False

    print("✅ All integration tests passed!")
    print("\nNext steps:")
    print("1. Configure AWS credentials: aws configure")
    print("2. Apply the correct IAM permissions (see CORRECTED_PERMISSIONS.md)")
    print("3. Start the server: q chat --mcp-config \"$(pwd)/mcp_runbooks.json\"")
    print("4. Test with: \"Run comprehensive cost analysis for us-east-1\"")
    print("\n🎉 CFM Tips is ready to help optimize your AWS costs!")
    return True

if __name__ == "__main__":
    # Exit status mirrors the overall result: 0 when all checks pass, 1 otherwise.
    sys.exit(0 if main() else 1)
```

--------------------------------------------------------------------------------
/playbooks/comprehensive_optimization.py:
--------------------------------------------------------------------------------

```python
"""
Comprehensive Cost Optimization Playbook

This module provides multi-service cost optimization analysis functions.
Includes both core optimization functions and MCP runbook functions.
"""

import asyncio
import json
import logging
import time
from datetime import datetime
from typing import Dict, List, Any
from mcp.types import TextContent

from utils.error_handler import ResponseFormatter, handle_aws_error
from utils.service_orchestrator import ServiceOrchestrator
from utils.parallel_executor import create_task
from utils.documentation_links import add_documentation_links

# Import playbook modules
from playbooks.ec2.ec2_optimization import get_underutilized_instances
from playbooks.ebs.ebs_optimization import get_underutilized_volumes
from playbooks.rds.rds_optimization import get_underutilized_rds_instances, identify_idle_rds_instances
from playbooks.aws_lambda.lambda_optimization import get_underutilized_lambda_functions, identify_unused_lambda_functions
from playbooks.cloudtrail.cloudtrail_optimization import run_cloudtrail_optimization
from playbooks.cloudwatch.cloudwatch_optimization import run_cloudwatch_comprehensive_optimization_tool_mcp

logger = logging.getLogger(__name__)


@handle_aws_error
async def run_comprehensive_cost_analysis(arguments: Dict[str, Any]) -> List[TextContent]:
    """
    Run a multi-service cost analysis and return it as MCP text content.

    Args:
        arguments: Tool arguments. Keys read here:
            region: Forwarded unchanged to every registered service call.
            services: Names of the services to include. Defaults to ec2,
                ebs, rds, lambda, cloudtrail, s3 and cloudwatch. Only ec2,
                ebs, rds, lambda, cloudtrail and cloudwatch register calls
                in this function; any other name adds no call.
            lookback_period_days: Lookback window (default 14). The EBS
                underutilization, RDS idle and Lambda unused checks use
                fixed windows of 30, 7 and 30 days instead.
            output_format: Read (default "json") but not consumed here.

    Returns:
        The formatted success response wrapping the orchestrator results
        plus a ``comprehensive_analysis`` metadata entry.

    Raises:
        Exception: Any failure is logged and re-raised.
    """
    started_at = time.time()

    def _call(service, operation, function, **call_args):
        # One descriptor per unit of work handed to the orchestrator.
        return {
            'service': service,
            'operation': operation,
            'function': function,
            'args': call_args
        }

    try:
        region = arguments.get("region")
        services = arguments.get(
            "services",
            ["ec2", "ebs", "rds", "lambda", "cloudtrail", "s3", "cloudwatch"]
        )
        lookback_period_days = arguments.get("lookback_period_days", 14)
        # Accepted for interface parity; nothing below consumes it.
        output_format = arguments.get("output_format", "json")

        # The orchestrator runs the registered calls and manages the session
        orchestrator = ServiceOrchestrator()

        # Registration order below is the order calls are handed over in
        service_calls = []

        if "ec2" in services:
            service_calls += [
                _call('ec2', 'underutilized_instances', get_underutilized_instances,
                      region=region, lookback_period_days=lookback_period_days),
                # Placeholder: always reports an empty list
                _call('ec2', 'stopped_instances',
                      lambda **kwargs: {"stopped_instances": []},
                      region=region),
            ]

        if "ebs" in services:
            service_calls += [
                _call('ebs', 'underutilized_volumes', get_underutilized_volumes,
                      region=region, lookback_period_days=30),
                # Placeholder: always reports an empty list
                _call('ebs', 'unused_volumes',
                      lambda **kwargs: {"unused_volumes": []},
                      region=region),
            ]

        if "rds" in services:
            service_calls += [
                _call('rds', 'underutilized_instances', get_underutilized_rds_instances,
                      region=region, lookback_period_days=lookback_period_days),
                _call('rds', 'idle_instances', identify_idle_rds_instances,
                      region=region, lookback_period_days=7),
            ]

        if "lambda" in services:
            service_calls += [
                _call('lambda', 'underutilized_functions', get_underutilized_lambda_functions,
                      region=region, lookback_period_days=lookback_period_days),
                _call('lambda', 'unused_functions', identify_unused_lambda_functions,
                      region=region, lookback_period_days=30),
            ]

        if "cloudtrail" in services:
            service_calls.append(
                _call('cloudtrail', 'optimization', run_cloudtrail_optimization,
                      region=region)
            )

        if "cloudwatch" in services:
            # CloudWatch is not analyzed inline; this stub only returns a
            # pointer to the dedicated CloudWatch optimization tool.
            def cloudwatch_wrapper(region=None, lookback_days=30, **kwargs):
                return {
                    'status': 'success',
                    'service': 'cloudwatch',
                    'message': 'CloudWatch analysis requires separate execution via cloudwatch_comprehensive_optimization_tool',
                    'recommendation': 'Use the dedicated CloudWatch comprehensive optimization tool for detailed analysis',
                    'region': region,
                    'lookback_days': lookback_days,
                    'note': 'CloudWatch has its own advanced parallel execution and memory management system'
                }

            service_calls.append(
                _call('cloudwatch', 'comprehensive_optimization', cloudwatch_wrapper,
                      region=region, lookback_days=lookback_period_days)
            )

        # Hand every registered call to the orchestrator in one batch
        results = orchestrator.execute_parallel_analysis(
            service_calls=service_calls,
            store_results=True,
            timeout=120.0
        )

        # Enrich the results with documentation links
        results = add_documentation_links(results)

        execution_time = time.time() - started_at

        # Describe this run alongside the per-service results
        session_id = results.get("report_metadata", {}).get("session_id")
        results["comprehensive_analysis"] = {
            "analysis_type": "multi_service_comprehensive",
            "services_analyzed": services,
            "region": region,
            "lookback_period_days": lookback_period_days,
            "session_id": session_id,
            "parallel_execution": True,
            "sql_storage": True
        }

        payload = ResponseFormatter.success_response(
            data=results,
            message=f"Comprehensive analysis completed for {len(services)} services",
            analysis_type="comprehensive_analysis",
            execution_time=execution_time
        )
        return ResponseFormatter.to_text_content(payload)

    except Exception as e:
        logger.error(f"Error in comprehensive cost analysis: {str(e)}")
        raise
```

--------------------------------------------------------------------------------
/utils/error_handler.py:
--------------------------------------------------------------------------------

```python
"""
Centralized Error Handler for AWS Cost Optimization MCP Server

Provides consistent error handling and formatting across all modules.
"""

import logging
from typing import Dict, Any, List, Optional
from botocore.exceptions import ClientError, NoCredentialsError
from mcp.types import TextContent
import json

logger = logging.getLogger(__name__)


class AWSErrorHandler:
    """Centralized AWS error handling and formatting.

    Every formatter is a static method that logs the failure and returns a
    plain dictionary containing at least ``status``, ``error_code``,
    ``message`` and ``context``.
    """
    
    # Common AWS error codes mapped to a short remediation hint. Used only
    # when the caller did not supply an explicit list of permissions.
    PERMISSION_MAP = {
        'AccessDenied': 'Check IAM permissions for the requested service',
        'UnauthorizedOperation': 'Verify IAM policy allows the requested operation',
        'InvalidUserID.NotFound': 'Check AWS credentials configuration',
        'TokenRefreshRequired': 'AWS credentials may have expired',
        'OptInRequired': 'Service may need to be enabled in AWS Console',
        'ServiceUnavailable': 'AWS service temporarily unavailable',
        'ThrottlingException': 'Request rate exceeded, implement retry logic',
        'ValidationException': 'Check request parameters and format'
    }

    @staticmethod
    def _utc_timestamp() -> str:
        """Return the current UTC time as an ISO 8601 string."""
        # Imported locally so the helper does not depend on module imports.
        from datetime import datetime, timezone

        return datetime.now(timezone.utc).isoformat()
    
    @staticmethod
    def format_client_error(e: ClientError, context: str, 
                          required_permissions: Optional[List[str]] = None) -> Dict[str, Any]:
        """
        Format AWS ClientError into standardized error response.
        
        Args:
            e: The ClientError exception
            context: Context where the error occurred
            required_permissions: List of required IAM permissions
            
        Returns:
            Standardized error response dictionary. Besides the common keys
            it carries a ``timestamp`` (UTC, ISO 8601), and optionally
            ``required_permissions`` or ``permission_guidance`` and, for
            throttling codes, ``retry_guidance``.
        """
        error_details = e.response.get('Error', {})
        error_code = error_details.get('Code', 'Unknown')
        error_message = error_details.get('Message', str(e))
        
        logger.error(f"AWS API Error in {context}: {error_code} - {error_message}")
        
        response = {
            "status": "error",
            "error_code": error_code,
            "message": f"AWS API Error: {error_code} - {error_message}",
            "context": context,
            # The timestamp used to be derived from the first handler of the
            # module logger. That produced None whenever this logger had no
            # handlers of its own, and raised AttributeError when the handler
            # had no formatter set. A clock-based value is always available.
            "timestamp": AWSErrorHandler._utc_timestamp()
        }
        
        # Explicit permissions from the caller win over the generic hint
        if required_permissions:
            response["required_permissions"] = required_permissions
        elif error_code in AWSErrorHandler.PERMISSION_MAP:
            response["permission_guidance"] = AWSErrorHandler.PERMISSION_MAP[error_code]
        
        # Add retry guidance for throttling
        if error_code in ['ThrottlingException', 'RequestLimitExceeded']:
            response["retry_guidance"] = {
                "retryable": True,
                "suggested_delay": "exponential backoff starting at 1 second"
            }
        
        return response
    
    @staticmethod
    def format_no_credentials_error(context: str) -> Dict[str, Any]:
        """
        Format NoCredentialsError into standardized response.

        Args:
            context: Context where the error occurred

        Returns:
            Error dictionary including ``setup_guidance`` with the ways
            credentials can be supplied.
        """
        logger.error(f"AWS credentials not found in {context}")
        
        return {
            "status": "error",
            "error_code": "NoCredentialsError",
            "message": "AWS credentials not configured",
            "context": context,
            "setup_guidance": {
                "aws_cli": "Run 'aws configure' to set up credentials",
                "environment": "Set AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY",
                "iam_role": "Ensure EC2 instance has appropriate IAM role attached"
            }
        }
    
    @staticmethod
    def format_general_error(e: Exception, context: str) -> Dict[str, Any]:
        """
        Format general exceptions into standardized response.

        Args:
            e: The exception that occurred
            context: Context where the error occurred

        Returns:
            Error dictionary whose ``error_code`` is the exception class
            name and whose ``message`` is ``str(e)``.
        """
        logger.error(f"General error in {context}: {str(e)}")
        
        return {
            "status": "error",
            "error_code": type(e).__name__,
            "message": str(e),
            "context": context
        }
    
    @staticmethod
    def to_text_content(error_dict: Dict[str, Any]) -> List[TextContent]:
        """
        Convert error dictionary to MCP TextContent format.

        Args:
            error_dict: Error dictionary to serialize

        Returns:
            Single-element list holding the dictionary as indented JSON;
            values JSON cannot encode are rendered with ``str``.
        """
        return [TextContent(type="text", text=json.dumps(error_dict, indent=2, default=str))]


class ResponseFormatter:
    """Builds the uniform success/error payloads returned by MCP tools."""
    
    @staticmethod
    def success_response(data: Any, message: str, analysis_type: str = None, 
                        execution_time: float = None, metadata: Dict = None) -> Dict[str, Any]:
        """
        Assemble a success payload.
        
        Args:
            data: Result payload, stored under ``data`` as-is
            message: Human-readable summary
            analysis_type: Included only when truthy
            execution_time: Seconds taken; included whenever it is not None
                (so 0.0 is kept)
            metadata: Included only when truthy
            
        Returns:
            Dictionary with ``status`` set to "success", the supplied
            fields, and a fixed ``wellarchitected_hint`` entry added last
        """
        payload = {"status": "success", "data": data, "message": message}
        
        if analysis_type:
            payload.update(analysis_type=analysis_type)
        if execution_time is not None:
            payload.update(execution_time=execution_time)
        if metadata:
            payload.update(metadata=metadata)
        
        # Fixed guidance text aimed at the LLM consuming the response
        payload["wellarchitected_hint"] = (
            "Analyze these findings to provide AWS Well-Architected Framework "
            "Cost Optimization pillar recommendations focusing on right-sizing, "
            "eliminating waste, leveraging pricing models, and optimizing over time."
        )
        
        return payload
    
    @staticmethod
    def error_response(error: Exception, context: str, 
                      required_permissions: Optional[List[str]] = None) -> Dict[str, Any]:
        """
        Pick the AWSErrorHandler formatter that matches the exception type.
        
        Args:
            error: The exception that occurred
            context: Where the failure happened
            required_permissions: IAM permissions to report; only used for
                ClientError
            
        Returns:
            The error dictionary produced by the selected formatter
        """
        if isinstance(error, ClientError):
            return AWSErrorHandler.format_client_error(error, context, required_permissions)
        if isinstance(error, NoCredentialsError):
            return AWSErrorHandler.format_no_credentials_error(context)
        return AWSErrorHandler.format_general_error(error, context)
    
    @staticmethod
    def to_text_content(response_dict: Dict[str, Any]) -> List[TextContent]:
        """
        Serialize a response dictionary into a one-item TextContent list.

        The dictionary is dumped as indented JSON; values JSON cannot
        encode are rendered with ``str``.
        """
        serialized = json.dumps(response_dict, indent=2, default=str)
        return [TextContent(type="text", text=serialized)]


# Convenience functions for common use cases
def handle_aws_error(func):
    """Decorator for consistent AWS error handling in MCP tools.

    Works for both regular functions and ``async def`` coroutine functions.
    Any exception raised by the wrapped callable is converted into the
    standardized error payload and returned as MCP text content instead of
    propagating.

    Args:
        func: The tool function (sync or async) to protect.

    Returns:
        A wrapper with the same call signature. It is itself a coroutine
        function when ``func`` is one, and it keeps ``func``'s name and
        docstring.
    """
    # Imported locally so the decorator does not depend on module imports.
    import functools
    import inspect

    def _format_failure(exc):
        # The wrapped function's name is the error context.
        context = f"{func.__name__}"
        if isinstance(exc, ClientError):
            error_response = AWSErrorHandler.format_client_error(exc, context)
        elif isinstance(exc, NoCredentialsError):
            error_response = AWSErrorHandler.format_no_credentials_error(context)
        else:
            error_response = AWSErrorHandler.format_general_error(exc, context)
        return AWSErrorHandler.to_text_content(error_response)

    if inspect.iscoroutinefunction(func):
        # A plain wrapper would only create the coroutine and return it; the
        # body runs later, when the caller awaits it, so its exceptions would
        # bypass the handlers. Awaiting inside the try block catches them.
        @functools.wraps(func)
        async def async_wrapper(*args, **kwargs):
            try:
                return await func(*args, **kwargs)
            except Exception as e:
                return _format_failure(e)

        return async_wrapper

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            return _format_failure(e)
    
    return wrapper
```

--------------------------------------------------------------------------------
/utils/aws_client_factory.py:
--------------------------------------------------------------------------------

```python
"""
AWS Client Factory for Centralized Client Management

Provides consistent AWS client creation with proper error handling and configuration.
"""

import boto3
import logging
from typing import Dict, Any, Optional
from botocore.exceptions import ClientError, NoCredentialsError
from botocore.config import Config

logger = logging.getLogger(__name__)


class AWSClientFactory:
    """Creates boto3 clients with per-service settings and caches them.

    Clients are cached on the class, keyed by service name and the region
    argument that was passed in, so repeated requests reuse one client.
    """
    
    # Per-service client settings. An entry may pin a region and always
    # carries a botocore Config; 'default' covers services not listed here.
    DEFAULT_CONFIGS = {
        'cost-optimization-hub': {
            'region_name': 'us-east-1',  # COH is only available in us-east-1
            'config': Config(
                retries={'max_attempts': 3, 'mode': 'adaptive'},
                read_timeout=60
            )
        },
        'ce': {  # Cost Explorer
            'region_name': 'us-east-1',  # CE is only available in us-east-1
            'config': Config(
                retries={'max_attempts': 3, 'mode': 'adaptive'},
                read_timeout=60
            )
        },
        'support': {  # Trusted Advisor
            'region_name': 'us-east-1',  # Support API only in us-east-1
            'config': Config(
                retries={'max_attempts': 3, 'mode': 'adaptive'},
                read_timeout=60
            )
        },
        'compute-optimizer': {
            'config': Config(
                retries={'max_attempts': 3, 'mode': 'adaptive'},
                read_timeout=60
            )
        },
        'pi': {  # Performance Insights
            'config': Config(
                retries={'max_attempts': 3, 'mode': 'adaptive'},
                read_timeout=30
            )
        },
        'default': {
            'config': Config(
                retries={'max_attempts': 3, 'mode': 'adaptive'},
                read_timeout=30
            )
        }
    }
    
    # Shared cache: "<service>:<region or 'default'>" -> client
    _clients: Dict[str, Any] = {}
    
    @classmethod
    def get_client(cls, service_name: str, region: Optional[str] = None, 
                   force_new: bool = False) -> boto3.client:
        """
        Return a configured client for a service, reusing the cache when allowed.
        
        Args:
            service_name: AWS service name (e.g., 'ec2', 's3', 'cost-optimization-hub')
            region: Region to use. When omitted, the service's pinned region
                from DEFAULT_CONFIGS applies if there is one; otherwise no
                region is passed to boto3.
            force_new: Build a fresh client even if one is cached; the new
                client replaces the cached entry
            
        Returns:
            Configured boto3 client
            
        Raises:
            NoCredentialsError: Logged and re-raised if raised during creation
            Exception: Any other creation failure is logged and re-raised
        """
        cache_key = f"{service_name}:{region or 'default'}"
        
        # Serve from the cache unless the caller insists on a new client
        if cache_key in cls._clients and not force_new:
            logger.debug(f"Using cached client for {service_name}")
            return cls._clients[cache_key]
        
        try:
            settings = cls.DEFAULT_CONFIGS.get(service_name, cls.DEFAULT_CONFIGS['default'])
            
            boto_kwargs = {
                'service_name': service_name,
                'config': settings.get('config')
            }
            
            # Region priority: explicit argument, then the service's pinned
            # region; with neither, boto3 resolves the region itself
            if region:
                boto_kwargs['region_name'] = region
            elif 'region_name' in settings:
                boto_kwargs['region_name'] = settings['region_name']
            
            new_client = boto3.client(**boto_kwargs)
            cls._clients[cache_key] = new_client
            
            logger.debug(f"Created new {service_name} client for region {boto_kwargs.get('region_name', 'default')}")
            return new_client
            
        except NoCredentialsError:
            logger.error(f"AWS credentials not configured for {service_name} client")
            raise
        except Exception as e:
            logger.error(f"Failed to create {service_name} client: {str(e)}")
            raise
    
    @classmethod
    def get_session(cls, region: Optional[str] = None) -> boto3.Session:
        """
        Create a boto3 session and verify its credentials.
        
        Args:
            region: AWS region (optional)
            
        Returns:
            Configured boto3 session
            
        Raises:
            NoCredentialsError: Logged and re-raised
            Exception: Any other failure, including a failing STS
                get_caller_identity call, is logged and re-raised
        """
        try:
            session = boto3.Session(region_name=region) if region else boto3.Session()
            
            # An STS identity call fails fast when credentials are unusable
            session.client('sts').get_caller_identity()
            
            logger.debug(f"Created AWS session for region {region or 'default'}")
            return session
            
        except NoCredentialsError:
            logger.error("AWS credentials not configured")
            raise
        except Exception as e:
            logger.error(f"Failed to create AWS session: {str(e)}")
            raise
    
    @classmethod
    def clear_cache(cls):
        """Drop every cached client."""
        cls._clients.clear()
        logger.debug("Cleared AWS client cache")
    
    @classmethod
    def get_available_regions(cls, service_name: str) -> list:
        """
        List the regions boto3 reports for a service.
        
        Args:
            service_name: AWS service name
            
        Returns:
            List of region names, or an empty list if the lookup fails
            (the failure is logged as a warning)
        """
        try:
            return boto3.Session().get_available_regions(service_name)
        except Exception as e:
            logger.warning(f"Could not get regions for {service_name}: {str(e)}")
            return []
    
    @classmethod
    def validate_region(cls, service_name: str, region: str) -> bool:
        """
        Check whether a region is listed for a service.
        
        Args:
            service_name: AWS service name
            region: AWS region to validate
            
        Returns:
            True if the region is listed, or if no region list could be
            obtained (the check is then skipped); False otherwise
        """
        known_regions = cls.get_available_regions(service_name)
        if not known_regions:
            return True
        return region in known_regions
    
    @classmethod
    def get_caller_identity(cls) -> Dict[str, Any]:
        """
        Fetch the caller identity from STS.
        
        Returns:
            The response of the STS get_caller_identity call
            
        Raises:
            Exception: Any failure is logged and re-raised
        """
        try:
            return cls.get_client('sts').get_caller_identity()
        except Exception as e:
            logger.error(f"Failed to get caller identity: {str(e)}")
            raise


# Convenience functions
def get_cost_explorer_client() -> boto3.client:
    """Return the factory's Cost Explorer ('ce') client.

    No region is passed, so the factory's pinned region for 'ce'
    (us-east-1) applies.
    """
    client = AWSClientFactory.get_client('ce')
    return client


def get_cost_optimization_hub_client() -> boto3.client:
    """Return the factory's Cost Optimization Hub client.

    No region is passed, so the factory's pinned region for
    'cost-optimization-hub' (us-east-1) applies.
    """
    client = AWSClientFactory.get_client('cost-optimization-hub')
    return client


def get_compute_optimizer_client(region: Optional[str] = None) -> boto3.client:
    """Return the factory's Compute Optimizer client.

    Args:
        region: Region forwarded to the factory; optional.
    """
    client = AWSClientFactory.get_client('compute-optimizer', region)
    return client


def get_trusted_advisor_client() -> boto3.client:
    """Return the factory's Trusted Advisor ('support') client.

    No region is passed, so the factory's pinned region for 'support'
    (us-east-1) applies.
    """
    client = AWSClientFactory.get_client('support')
    return client


def get_performance_insights_client(region: Optional[str] = None) -> boto3.client:
    """Return the factory's Performance Insights ('pi') client.

    Args:
        region: Region forwarded to the factory; optional.
    """
    client = AWSClientFactory.get_client('pi', region)
    return client


def get_regional_client(service_name: str, region: str) -> boto3.client:
    """Return the factory's client for a regional service (EC2, EBS, RDS, Lambda, S3, etc.).

    Args:
        service_name: AWS service name forwarded to the factory.
        region: Region forwarded to the factory.
    """
    client = AWSClientFactory.get_client(service_name, region)
    return client
```

--------------------------------------------------------------------------------
/tests/run_tests.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Test runner for S3 optimization system.

This script provides a convenient way to run different test suites
with appropriate configurations and reporting.
"""

import sys
import os
import subprocess
import argparse
from pathlib import Path
from typing import List, Optional


def run_command(cmd: List[str], description: str) -> int:
    """Print a banner for the command, run it, and return its exit code.

    The command is executed with the parent of this file's directory as
    the working directory.

    Args:
        cmd: Command and arguments, passed to subprocess as a list.
        description: Label shown in the banner.

    Returns:
        The exit code of the finished process.
    """
    rule = '=' * 60
    print(f"\n{rule}")
    print(f"Running: {description}")
    print(f"Command: {' '.join(cmd)}")
    print(f"{rule}")

    working_dir = Path(__file__).parent.parent
    completed = subprocess.run(cmd, cwd=working_dir)
    return completed.returncode


def run_unit_tests(verbose: bool = False, coverage: bool = True) -> int:
    """Run the tests under ``tests/unit/`` that carry the ``unit`` marker.

    Args:
        verbose: Add ``-v`` to the pytest invocation.
        coverage: Collect coverage for ``core`` and ``services`` and write
            terminal and HTML (``htmlcov/unit``) reports.

    Returns:
        The pytest exit code.
    """
    # Use the interpreter running this script so pytest and its plugins come
    # from the same environment; a bare "python" may be missing from PATH or
    # resolve to a different installation.
    cmd = [sys.executable or "python", "-m", "pytest", "tests/unit/"]
    
    if verbose:
        cmd.append("-v")
    
    if coverage:
        cmd.extend([
            "--cov=core",
            "--cov=services", 
            "--cov-report=term-missing",
            "--cov-report=html:htmlcov/unit"
        ])
    
    cmd.extend([
        "-m", "unit",
        "--tb=short"
    ])
    
    return run_command(cmd, "Unit Tests")


def run_integration_tests(verbose: bool = False) -> int:
    """Run the tests under ``tests/integration/`` that carry the ``integration`` marker.

    Args:
        verbose: Add ``-v`` to the pytest invocation.

    Returns:
        The pytest exit code.
    """
    # Use the interpreter running this script so pytest and its plugins come
    # from the same environment; a bare "python" may be missing from PATH or
    # resolve to a different installation.
    cmd = [sys.executable or "python", "-m", "pytest", "tests/integration/"]
    
    if verbose:
        cmd.append("-v")
    
    cmd.extend([
        "-m", "integration",
        "--tb=short"
    ])
    
    return run_command(cmd, "Integration Tests")


def run_performance_tests(verbose: bool = False) -> int:
    """Run the tests under ``tests/performance/`` that carry the ``performance`` marker.

    The run is restricted to benchmarks (``--benchmark-only``) and the
    benchmark table is sorted by mean.

    Args:
        verbose: Add ``-v`` to the pytest invocation.

    Returns:
        The pytest exit code.
    """
    # Use the interpreter running this script so pytest and its plugins come
    # from the same environment; a bare "python" may be missing from PATH or
    # resolve to a different installation.
    cmd = [sys.executable or "python", "-m", "pytest", "tests/performance/"]
    
    if verbose:
        cmd.append("-v")
    
    cmd.extend([
        "-m", "performance",
        "--tb=short",
        "--benchmark-only",
        "--benchmark-sort=mean"
    ])
    
    return run_command(cmd, "Performance Tests")


def run_cost_validation_tests(verbose: bool = False) -> int:
    """Run critical no-cost constraint validation tests.

    Runs the tests under ``tests/no_cost_validation/`` that carry the
    ``no_cost_validation`` marker, with long tracebacks and strict marker
    checking.

    Args:
        verbose: Add ``-v`` to the pytest invocation.

    Returns:
        The pytest exit code.
    """
    # Use the interpreter running this script so pytest and its plugins come
    # from the same environment; a bare "python" may be missing from PATH or
    # resolve to a different installation.
    cmd = [sys.executable or "python", "-m", "pytest", "tests/no_cost_validation/"]
    
    if verbose:
        cmd.append("-v")
    
    cmd.extend([
        "-m", "no_cost_validation",
        "--tb=long",  # More detailed output for critical tests
        "--strict-markers"
    ])
    
    return run_command(cmd, "No-Cost Constraint Validation Tests (CRITICAL)")


def run_all_tests(verbose: bool = False, coverage: bool = True) -> int:
    """Run every test under ``tests/``.

    Args:
        verbose: Add ``-v`` to the pytest invocation.
        coverage: Collect coverage for ``core`` and ``services``, write
            terminal and HTML (``htmlcov/all``) reports, and fail the run
            when total coverage is below 80%.

    Returns:
        The pytest exit code.
    """
    # Use the interpreter running this script so pytest and its plugins come
    # from the same environment; a bare "python" may be missing from PATH or
    # resolve to a different installation.
    cmd = [sys.executable or "python", "-m", "pytest", "tests/"]
    
    if verbose:
        cmd.append("-v")
    
    if coverage:
        cmd.extend([
            "--cov=core",
            "--cov=services",
            "--cov-report=term-missing", 
            "--cov-report=html:htmlcov/all",
            "--cov-fail-under=80"
        ])
    
    cmd.extend([
        "--tb=short",
        "--durations=10"
    ])
    
    return run_command(cmd, "All Tests")


def run_specific_test(test_path: str, verbose: bool = False) -> int:
    """Run a specific test file or directory.

    Args:
        test_path: Path handed to pytest as the test target.
        verbose: Add ``-v`` to the pytest invocation.

    Returns:
        The pytest exit code.
    """
    # Use the interpreter running this script so pytest and its plugins come
    # from the same environment; a bare "python" may be missing from PATH or
    # resolve to a different installation.
    cmd = [sys.executable or "python", "-m", "pytest", test_path]
    
    if verbose:
        cmd.append("-v")
    
    cmd.extend(["--tb=short"])
    
    return run_command(cmd, f"Specific Test: {test_path}")


def check_test_environment() -> bool:
    """Report whether the test prerequisites can be imported.

    Checks, in order: pytest, moto, and two project modules (after
    putting the parent of this file's directory at the front of
    ``sys.path``). A line is printed for every check, and the function
    stops at the first failure.

    Returns:
        True when every import succeeds, False at the first ImportError.
    """
    print("Checking test environment...")

    # pytest itself
    try:
        import pytest
    except ImportError:
        print("✗ pytest is not installed")
        return False
    else:
        print(f"✓ pytest {pytest.__version__} is available")

    # moto provides the AWS mocks
    try:
        import moto
    except ImportError:
        print("✗ moto is not installed")
        return False
    else:
        print(f"✓ moto {moto.__version__} is available")

    # Project modules, resolved relative to the repository root
    project_root = str(Path(__file__).parent.parent)
    try:
        sys.path.insert(0, project_root)
        from playbooks.s3.base_analyzer import BaseAnalyzer
        from services.s3_service import S3Service
    except ImportError as e:
        print(f"✗ Cannot import core modules: {e}")
        return False
    else:
        print("✓ Core modules can be imported")

    print("✓ Test environment is ready")
    return True


def generate_test_report() -> int:
    """Generate comprehensive test report.

    Runs everything under ``tests/`` and asks pytest for a self-contained
    HTML report (``test_report.html``), a JSON report
    (``test_report.json``) and an HTML coverage report
    (``htmlcov/report``) for ``core`` and ``services``.

    Returns:
        The pytest exit code.
    """
    cmd = [
        # Use the interpreter running this script so pytest and its plugins
        # come from the same environment; a bare "python" may be missing
        # from PATH or resolve to a different installation.
        sys.executable or "python", "-m", "pytest", "tests/",
        "--html=test_report.html",
        "--self-contained-html",
        "--json-report",
        "--json-report-file=test_report.json",
        "--cov=core",
        "--cov=services",
        "--cov-report=html:htmlcov/report",
        "--tb=short"
    ]
    
    return run_command(cmd, "Comprehensive Test Report Generation")


def main():
    """Parse the command line, run the selected suite, and return its exit code.

    ``--check`` and ``--report`` short-circuit everything else. Otherwise
    the first suite flag that is set wins, in the order unit, integration,
    performance, cost-validation, all, specific. With no flag, unit tests
    run first and integration tests follow only if the unit tests pass.
    """
    parser = argparse.ArgumentParser(
        description="Test runner for S3 optimization system",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python run_tests.py --unit                    # Run unit tests only
  python run_tests.py --integration             # Run integration tests only
  python run_tests.py --performance             # Run performance tests only
  python run_tests.py --cost-validation         # Run cost validation tests only
  python run_tests.py --all                     # Run all tests
  python run_tests.py --specific tests/unit/    # Run specific test directory
  python run_tests.py --report                  # Generate comprehensive report
  python run_tests.py --check                   # Check test environment
        """
    )

    # Which suite to run
    parser.add_argument("--unit", action="store_true", help="Run unit tests")
    parser.add_argument("--integration", action="store_true", help="Run integration tests")
    parser.add_argument("--performance", action="store_true", help="Run performance tests")
    parser.add_argument("--cost-validation", action="store_true", help="Run cost validation tests")
    parser.add_argument("--all", action="store_true", help="Run all tests")
    parser.add_argument("--specific", type=str, help="Run specific test file or directory")

    # Utilities that bypass suite selection
    parser.add_argument("--report", action="store_true", help="Generate comprehensive test report")
    parser.add_argument("--check", action="store_true", help="Check test environment")

    # Modifiers
    parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
    parser.add_argument("--no-coverage", action="store_true", help="Disable coverage reporting")

    options = parser.parse_args()

    # Environment check only
    if options.check:
        return 0 if check_test_environment() else 1

    # Report generation only
    if options.report:
        return generate_test_report()

    verbose = options.verbose
    with_coverage = not options.no_coverage

    # First matching flag wins
    if options.unit:
        exit_code = run_unit_tests(verbose, with_coverage)
    elif options.integration:
        exit_code = run_integration_tests(verbose)
    elif options.performance:
        exit_code = run_performance_tests(verbose)
    elif options.cost_validation:
        exit_code = run_cost_validation_tests(verbose)
    elif options.all:
        exit_code = run_all_tests(verbose, with_coverage)
    elif options.specific:
        exit_code = run_specific_test(options.specific, verbose)
    else:
        # Nothing selected: unit tests, then integration tests if they pass
        print("No specific test suite selected. Running unit and integration tests...")
        exit_code = run_unit_tests(verbose, with_coverage)
        if exit_code == 0:
            exit_code = run_integration_tests(verbose)

    # Closing summary
    rule = '=' * 60
    if exit_code == 0:
        verdict = "✓ All tests passed successfully!"
    else:
        verdict = "✗ Some tests failed. Check the output above for details."
    print(f"\n{rule}")
    print(verdict)
    print(f"{rule}")

    return exit_code


# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
```

--------------------------------------------------------------------------------
/tests/legacy/test_setup_verification.py:
--------------------------------------------------------------------------------

```python
"""
Setup verification test to ensure the testing framework is working correctly.

This test validates that the testing infrastructure is properly configured
and can run basic tests with mocked AWS services.
"""

import pytest
import asyncio
from unittest.mock import Mock, AsyncMock, patch
from datetime import datetime


@pytest.mark.unit
class TestSetupVerification:
    """Smoke checks for the unit-test harness: pytest, fixtures, mocks, helpers."""

    def test_pytest_is_working(self):
        """Pass unconditionally, showing that this class is collected and run."""
        assert True

    def test_fixtures_are_available(self, mock_s3_service, mock_storage_lens_service,
                                  mock_pricing_service):
        """Check that each requested service fixture resolves to a non-None value."""
        requested_fixtures = (
            mock_s3_service,
            mock_storage_lens_service,
            mock_pricing_service,
        )
        for fixture_value in requested_fixtures:
            assert fixture_value is not None

    @pytest.mark.asyncio
    async def test_async_testing_works(self):
        """Await a small coroutine and check the value it returns."""
        async def produce_value():
            await asyncio.sleep(0.001)
            return "async_result"

        outcome = await produce_value()
        assert outcome == "async_result"

    def test_mocking_works(self):
        """Configure a Mock return value, call it, and check the call was recorded."""
        stub = Mock()
        stub.test_method.return_value = "mocked_result"

        returned = stub.test_method()

        assert returned == "mocked_result"
        stub.test_method.assert_called_once()

    def test_aws_mocking_works(self, mock_aws_credentials):
        """Replace ``boto3.client`` with a stub and read a canned response from it."""
        with patch('boto3.client') as client_factory:
            fake_client = Mock()
            fake_client.list_buckets.return_value = {"Buckets": []}
            client_factory.return_value = fake_client

            # Imported while the patch is active so the lookup below hits the stub.
            import boto3
            listing = boto3.client('s3').list_buckets()

            assert listing == {"Buckets": []}

    def test_cost_constraint_validator_works(self, cost_constraint_validator):
        """Exercise the validator fixture with one accepted and one rejected operation."""
        validator = cost_constraint_validator

        # An accepted operation is reported as True.
        accepted = validator.validate_operation('list_buckets')
        assert accepted is True

        # A rejected operation surfaces as ValueError.
        with pytest.raises(ValueError):
            validator.validate_operation('list_objects_v2')

        # Both calls are expected to show up in the summary.
        report = validator.get_operation_summary()
        assert report["total_operations"] == 2
        assert "list_buckets" in report["allowed_called"]
        assert "list_objects_v2" in report["forbidden_called"]

    def test_performance_tracker_works(self, performance_tracker):
        """Time a short sleep with the tracker fixture and read the metric back."""
        import time

        performance_tracker.start_timer("test_operation")
        time.sleep(0.01)  # stand-in for real work
        elapsed = performance_tracker.end_timer("test_operation")

        # Positive, and comfortably under one second.
        assert 0 < elapsed < 1.0

        recorded = performance_tracker.get_metrics()
        assert "test_operation" in recorded
        assert recorded["test_operation"] > 0

    def test_test_data_factory_works(self, test_data_factory):
        """Check the shape of bucket, cost and analysis data built by the factory."""
        bucket_records = test_data_factory.create_bucket_data(count=3)
        assert len(bucket_records) == 3
        for required_key in ("Name", "CreationDate"):
            assert all(required_key in record for record in bucket_records)

        cost_payload = test_data_factory.create_cost_data(days=5)
        assert "ResultsByTime" in cost_payload
        assert len(cost_payload["ResultsByTime"]) == 5

        analysis = test_data_factory.create_analysis_result("test_analysis")
        assert analysis["status"] == "success"
        assert analysis["analysis_type"] == "test_analysis"


@pytest.mark.integration
class TestIntegrationSetupVerification:
    """Verify that integration testing setup is working."""

    @pytest.mark.asyncio
    async def test_orchestrator_can_be_mocked(self, mock_service_orchestrator):
        """Build the S3 orchestrator with its collaborators patched out.

        The service orchestrator factory is replaced so that it returns the
        ``mock_service_orchestrator`` fixture; the monitor, memory, timeout and
        cache accessors are replaced with plain mocks. The test then checks
        that the constructed orchestrator keeps the requested region and holds
        the injected service orchestrator.
        """
        # Patch each name in the module the class is imported from below:
        # ``patch`` must target the namespace where the name is looked up,
        # otherwise the real collaborators are used (or the target module
        # cannot be resolved at all).
        # NOTE(review): assumes all seven names are module-level attributes of
        # playbooks.s3.s3_optimization_orchestrator — confirm.
        target = 'playbooks.s3.s3_optimization_orchestrator'
        with patch(f'{target}.ServiceOrchestrator', return_value=mock_service_orchestrator), \
             patch(f'{target}.get_performance_monitor'), \
             patch(f'{target}.get_memory_manager'), \
             patch(f'{target}.get_timeout_handler'), \
             patch(f'{target}.get_pricing_cache'), \
             patch(f'{target}.get_bucket_metadata_cache'), \
             patch(f'{target}.get_analysis_results_cache'):

            from playbooks.s3.s3_optimization_orchestrator import S3OptimizationOrchestrator

            orchestrator = S3OptimizationOrchestrator(region="us-east-1")
            assert orchestrator.region == "us-east-1"
            assert orchestrator.service_orchestrator == mock_service_orchestrator


@pytest.mark.performance
class TestPerformanceSetupVerification:
    """Smoke check for the performance-test harness."""

    @pytest.mark.asyncio
    async def test_performance_measurement_works(self, performance_tracker):
        """Time a 10 ms async sleep and check the measured duration is plausible."""
        timer_name = "performance_test"

        performance_tracker.start_timer(timer_name)
        await asyncio.sleep(0.01)  # stand-in for async work
        elapsed = performance_tracker.end_timer(timer_name)

        # More than 5 ms, less than 100 ms.
        assert 0.005 < elapsed < 0.1

        # The tracker's own assertion helper is exercised with the same 0.1 limit.
        performance_tracker.assert_performance(timer_name, 0.1)


@pytest.mark.no_cost_validation
class TestCostValidationSetupVerification:
    """Smoke checks for the S3 cost-constraint validation harness."""

    def test_cost_constraint_system_is_active(self):
        """Check that both S3 operation lists are populated and correctly sorted."""
        from services.s3_service import ALLOWED_S3_OPERATIONS, FORBIDDEN_S3_OPERATIONS

        # Neither list may be empty.
        for operation_list in (ALLOWED_S3_OPERATIONS, FORBIDDEN_S3_OPERATIONS):
            assert len(operation_list) > 0

        # One representative operation is checked on each side.
        assert 'list_buckets' in ALLOWED_S3_OPERATIONS
        assert 'list_objects_v2' in FORBIDDEN_S3_OPERATIONS

    def test_cost_constraint_violation_error_works(self, mock_aws_credentials):
        """Check that validating ``list_objects_v2`` raises the violation error."""
        from services.s3_service import S3Service, S3CostConstraintViolationError

        s3 = S3Service(region="us-east-1")

        with pytest.raises(S3CostConstraintViolationError):
            s3._validate_s3_operation('list_objects_v2')

    def test_cost_validator_fixture_works(self, cost_constraint_validator):
        """Check that the validator fixture records accepted and rejected calls."""
        validator = cost_constraint_validator

        # First call is expected to be accepted and recorded.
        validator.validate_operation('list_buckets')

        # Second call is expected to be rejected with ValueError.
        with pytest.raises(ValueError):
            validator.validate_operation('get_object')

        report = validator.get_operation_summary()
        assert report["total_operations"] == 2
        for call_group in ("forbidden_called", "allowed_called"):
            assert len(report[call_group]) == 1


class TestMarkerSystem:
    """Placeholder tests for the pytest marker system.

    Both tests pass unconditionally; a passing run only shows that this class
    was collected and executed. No ``pytest.mark`` decorator is applied to the
    class or to its methods in this file.
    """
    
    def test_markers_are_configured(self):
        """Always pass; marker configuration itself is not inspected here."""
        # NOTE(review): no marker is applied to this test in this file, so a
        # passing run does not by itself prove that markers are registered.
        assert True
    
    def test_can_run_specific_marker_tests(self):
        """Always pass; marker-based selection has to be checked from the CLI."""
        # Marker selection would be exercised by running: pytest -m unit
        # NOTE(review): this test carries no `unit` marker in this file, so
        # `-m unit` is expected to deselect it unless a marker is added
        # elsewhere (e.g. by a conftest hook) — confirm.
        assert True
```

--------------------------------------------------------------------------------
/tests/test_setup_verification.py:
--------------------------------------------------------------------------------

```python
"""
Setup verification test to ensure the testing framework is working correctly.

This test validates that the testing infrastructure is properly configured
and can run basic tests with mocked AWS services.
"""

import pytest
import asyncio
from unittest.mock import Mock, AsyncMock, patch
from datetime import datetime


@pytest.mark.unit
class TestSetupVerification:
    """Smoke checks for the unit-test harness: pytest, fixtures, mocks, helpers."""

    def test_pytest_is_working(self):
        """Pass unconditionally, showing that this class is collected and run."""
        assert True

    def test_fixtures_are_available(self, mock_s3_service, mock_storage_lens_service,
                                  mock_pricing_service):
        """Check that each requested service fixture resolves to a non-None value."""
        requested_fixtures = (
            mock_s3_service,
            mock_storage_lens_service,
            mock_pricing_service,
        )
        for fixture_value in requested_fixtures:
            assert fixture_value is not None

    @pytest.mark.asyncio
    async def test_async_testing_works(self):
        """Await a small coroutine and check the value it returns."""
        async def produce_value():
            await asyncio.sleep(0.001)
            return "async_result"

        outcome = await produce_value()
        assert outcome == "async_result"

    def test_mocking_works(self):
        """Configure a Mock return value, call it, and check the call was recorded."""
        stub = Mock()
        stub.test_method.return_value = "mocked_result"

        returned = stub.test_method()

        assert returned == "mocked_result"
        stub.test_method.assert_called_once()

    def test_aws_mocking_works(self, mock_aws_credentials):
        """Replace ``boto3.client`` with a stub and read a canned response from it."""
        with patch('boto3.client') as client_factory:
            fake_client = Mock()
            fake_client.list_buckets.return_value = {"Buckets": []}
            client_factory.return_value = fake_client

            # Imported while the patch is active so the lookup below hits the stub.
            import boto3
            listing = boto3.client('s3').list_buckets()

            assert listing == {"Buckets": []}

    def test_cost_constraint_validator_works(self, cost_constraint_validator):
        """Exercise the validator fixture with one accepted and one rejected operation."""
        validator = cost_constraint_validator

        # An accepted operation is reported as True.
        accepted = validator.validate_operation('list_buckets')
        assert accepted is True

        # A rejected operation surfaces as ValueError.
        with pytest.raises(ValueError):
            validator.validate_operation('list_objects_v2')

        # Both calls are expected to show up in the summary.
        report = validator.get_operation_summary()
        assert report["total_operations"] == 2
        assert "list_buckets" in report["allowed_called"]
        assert "list_objects_v2" in report["forbidden_called"]

    def test_performance_tracker_works(self, performance_tracker):
        """Time a short sleep with the tracker fixture and read the metric back."""
        import time

        performance_tracker.start_timer("test_operation")
        time.sleep(0.01)  # stand-in for real work
        elapsed = performance_tracker.end_timer("test_operation")

        # Positive, and comfortably under one second.
        assert 0 < elapsed < 1.0

        recorded = performance_tracker.get_metrics()
        assert "test_operation" in recorded
        assert recorded["test_operation"] > 0

    def test_test_data_factory_works(self, test_data_factory):
        """Check the shape of bucket, cost and analysis data built by the factory."""
        bucket_records = test_data_factory.create_bucket_data(count=3)
        assert len(bucket_records) == 3
        for required_key in ("Name", "CreationDate"):
            assert all(required_key in record for record in bucket_records)

        cost_payload = test_data_factory.create_cost_data(days=5)
        assert "ResultsByTime" in cost_payload
        assert len(cost_payload["ResultsByTime"]) == 5

        analysis = test_data_factory.create_analysis_result("test_analysis")
        assert analysis["status"] == "success"
        assert analysis["analysis_type"] == "test_analysis"


@pytest.mark.integration
class TestIntegrationSetupVerification:
    """Verify that integration testing setup is working."""

    @pytest.mark.asyncio
    async def test_orchestrator_can_be_mocked(self, mock_service_orchestrator):
        """Build the S3 orchestrator with its collaborators patched out.

        The service orchestrator factory is replaced so that it returns the
        ``mock_service_orchestrator`` fixture; the monitor, memory, timeout and
        cache accessors are replaced with plain mocks. The test then checks
        that the constructed orchestrator keeps the requested region and holds
        the injected service orchestrator.
        """
        # Patch each name in the module the class is imported from below:
        # ``patch`` must target the namespace where the name is looked up,
        # otherwise the real collaborators are used (or the target module
        # cannot be resolved at all).
        # NOTE(review): assumes all seven names are module-level attributes of
        # playbooks.s3.s3_optimization_orchestrator — confirm.
        target = 'playbooks.s3.s3_optimization_orchestrator'
        with patch(f'{target}.ServiceOrchestrator', return_value=mock_service_orchestrator), \
             patch(f'{target}.get_performance_monitor'), \
             patch(f'{target}.get_memory_manager'), \
             patch(f'{target}.get_timeout_handler'), \
             patch(f'{target}.get_pricing_cache'), \
             patch(f'{target}.get_bucket_metadata_cache'), \
             patch(f'{target}.get_analysis_results_cache'):

            from playbooks.s3.s3_optimization_orchestrator import S3OptimizationOrchestrator

            orchestrator = S3OptimizationOrchestrator(region="us-east-1")
            assert orchestrator.region == "us-east-1"
            assert orchestrator.service_orchestrator == mock_service_orchestrator


@pytest.mark.performance
class TestPerformanceSetupVerification:
    """Smoke check for the performance-test harness."""

    @pytest.mark.asyncio
    async def test_performance_measurement_works(self, performance_tracker):
        """Time a 10 ms async sleep and check the measured duration is plausible."""
        timer_name = "performance_test"

        performance_tracker.start_timer(timer_name)
        await asyncio.sleep(0.01)  # stand-in for async work
        elapsed = performance_tracker.end_timer(timer_name)

        # More than 5 ms, less than 100 ms.
        assert 0.005 < elapsed < 0.1

        # The tracker's own assertion helper is exercised with the same 0.1 limit.
        performance_tracker.assert_performance(timer_name, 0.1)


@pytest.mark.no_cost_validation
class TestCostValidationSetupVerification:
    """Smoke checks for the S3 cost-constraint validation harness."""

    def test_cost_constraint_system_is_active(self):
        """Check that both S3 operation lists are populated and correctly sorted."""
        from services.s3_service import ALLOWED_S3_OPERATIONS, FORBIDDEN_S3_OPERATIONS

        # Neither list may be empty.
        for operation_list in (ALLOWED_S3_OPERATIONS, FORBIDDEN_S3_OPERATIONS):
            assert len(operation_list) > 0

        # One representative operation is checked on each side.
        assert 'list_buckets' in ALLOWED_S3_OPERATIONS
        assert 'list_objects_v2' in FORBIDDEN_S3_OPERATIONS

    def test_cost_constraint_violation_error_works(self, mock_aws_credentials):
        """Check that validating ``list_objects_v2`` raises the violation error."""
        from services.s3_service import S3Service, S3CostConstraintViolationError

        s3 = S3Service(region="us-east-1")

        with pytest.raises(S3CostConstraintViolationError):
            s3._validate_s3_operation('list_objects_v2')

    def test_cost_validator_fixture_works(self, cost_constraint_validator):
        """Check that the validator fixture records accepted and rejected calls."""
        validator = cost_constraint_validator

        # First call is expected to be accepted and recorded.
        validator.validate_operation('list_buckets')

        # Second call is expected to be rejected with ValueError.
        with pytest.raises(ValueError):
            validator.validate_operation('get_object')

        report = validator.get_operation_summary()
        assert report["total_operations"] == 2
        for call_group in ("forbidden_called", "allowed_called"):
            assert len(report[call_group]) == 1


class TestMarkerSystem:
    """Placeholder tests for the pytest marker system.

    Both tests pass unconditionally; a passing run only shows that this class
    was collected and executed. No ``pytest.mark`` decorator is applied to the
    class or to its methods in this file.
    """
    
    def test_markers_are_configured(self):
        """Always pass; marker configuration itself is not inspected here."""
        # NOTE(review): no marker is applied to this test in this file, so a
        # passing run does not by itself prove that markers are registered.
        assert True
    
    def test_can_run_specific_marker_tests(self):
        """Always pass; marker-based selection has to be checked from the CLI."""
        # Marker selection would be exercised by running: pytest -m unit
        # NOTE(review): this test carries no `unit` marker in this file, so
        # `-m unit` is expected to deselect it unless a marker is added
        # elsewhere (e.g. by a conftest hook) — confirm.
        assert True
```
Page 1/14 | First | Prev | Next | Last