#
tokens: 49547/50000 20/161 files (page 2/14)
lines: off (toggle) GitHub
raw markdown copy
This is page 2 of 14. Use http://codebase.md/aws-samples/sample-cfm-tips-mcp?lines=false&page={x} to view the full context.

# Directory Structure

```
├── .gitignore
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── diagnose_cost_optimization_hub_v2.py
├── LICENSE
├── logging_config.py
├── mcp_runbooks.json
├── mcp_server_with_runbooks.py
├── playbooks
│   ├── __init__.py
│   ├── aws_lambda
│   │   ├── __init__.py
│   │   └── lambda_optimization.py
│   ├── cloudtrail
│   │   ├── __init__.py
│   │   └── cloudtrail_optimization.py
│   ├── cloudtrail_optimization.py
│   ├── cloudwatch
│   │   ├── __init__.py
│   │   ├── aggregation_queries.py
│   │   ├── alarms_and_dashboards_analyzer.py
│   │   ├── analysis_engine.py
│   │   ├── base_analyzer.py
│   │   ├── cloudwatch_optimization_analyzer.py
│   │   ├── cloudwatch_optimization_tool.py
│   │   ├── cloudwatch_optimization.py
│   │   ├── cost_controller.py
│   │   ├── general_spend_analyzer.py
│   │   ├── logs_optimization_analyzer.py
│   │   ├── metrics_optimization_analyzer.py
│   │   ├── optimization_orchestrator.py
│   │   └── result_processor.py
│   ├── comprehensive_optimization.py
│   ├── ebs
│   │   ├── __init__.py
│   │   └── ebs_optimization.py
│   ├── ebs_optimization.py
│   ├── ec2
│   │   ├── __init__.py
│   │   └── ec2_optimization.py
│   ├── ec2_optimization.py
│   ├── lambda_optimization.py
│   ├── rds
│   │   ├── __init__.py
│   │   └── rds_optimization.py
│   ├── rds_optimization.py
│   └── s3
│       ├── __init__.py
│       ├── analyzers
│       │   ├── __init__.py
│       │   ├── api_cost_analyzer.py
│       │   ├── archive_optimization_analyzer.py
│       │   ├── general_spend_analyzer.py
│       │   ├── governance_analyzer.py
│       │   ├── multipart_cleanup_analyzer.py
│       │   └── storage_class_analyzer.py
│       ├── base_analyzer.py
│       ├── s3_aggregation_queries.py
│       ├── s3_analysis_engine.py
│       ├── s3_comprehensive_optimization_tool.py
│       ├── s3_optimization_orchestrator.py
│       └── s3_optimization.py
├── README.md
├── requirements.txt
├── runbook_functions_extended.py
├── runbook_functions.py
├── RUNBOOKS_GUIDE.md
├── services
│   ├── __init__.py
│   ├── cloudwatch_pricing.py
│   ├── cloudwatch_service_vended_log.py
│   ├── cloudwatch_service.py
│   ├── compute_optimizer.py
│   ├── cost_explorer.py
│   ├── optimization_hub.py
│   ├── performance_insights.py
│   ├── pricing.py
│   ├── s3_pricing.py
│   ├── s3_service.py
│   ├── storage_lens_service.py
│   └── trusted_advisor.py
├── setup.py
├── test_runbooks.py
├── tests
│   ├── __init__.py
│   ├── conftest.py
│   ├── integration
│   │   ├── __init__.py
│   │   ├── cloudwatch
│   │   │   └── test_cloudwatch_integration.py
│   │   ├── test_cloudwatch_comprehensive_tool_integration.py
│   │   ├── test_cloudwatch_orchestrator_integration.py
│   │   ├── test_integration_suite.py
│   │   └── test_orchestrator_integration.py
│   ├── legacy
│   │   ├── example_output_with_docs.py
│   │   ├── example_wellarchitected_output.py
│   │   ├── test_aws_session_management.py
│   │   ├── test_cloudwatch_orchestrator_pagination.py
│   │   ├── test_cloudwatch_pagination_integration.py
│   │   ├── test_cloudwatch_performance_optimizations.py
│   │   ├── test_cloudwatch_result_processor.py
│   │   ├── test_cloudwatch_timeout_issue.py
│   │   ├── test_documentation_links.py
│   │   ├── test_metrics_pagination_count.py
│   │   ├── test_orchestrator_integration.py
│   │   ├── test_pricing_cache_fix_moved.py
│   │   ├── test_pricing_cache_fix.py
│   │   ├── test_runbook_integration.py
│   │   ├── test_runbooks.py
│   │   ├── test_setup_verification.py
│   │   └── test_stack_trace_fix.py
│   ├── performance
│   │   ├── __init__.py
│   │   ├── cloudwatch
│   │   │   └── test_cloudwatch_performance.py
│   │   ├── test_cloudwatch_parallel_execution.py
│   │   ├── test_parallel_execution.py
│   │   └── test_performance_suite.py
│   ├── pytest-cloudwatch.ini
│   ├── pytest.ini
│   ├── README.md
│   ├── requirements-test.txt
│   ├── run_cloudwatch_tests.py
│   ├── run_tests.py
│   ├── test_setup_verification.py
│   ├── test_suite_main.py
│   └── unit
│       ├── __init__.py
│       ├── analyzers
│       │   ├── __init__.py
│       │   ├── conftest_cloudwatch.py
│       │   ├── test_alarms_and_dashboards_analyzer.py
│       │   ├── test_base_analyzer.py
│       │   ├── test_cloudwatch_base_analyzer.py
│       │   ├── test_cloudwatch_cost_constraints.py
│       │   ├── test_cloudwatch_general_spend_analyzer.py
│       │   ├── test_general_spend_analyzer.py
│       │   ├── test_logs_optimization_analyzer.py
│       │   └── test_metrics_optimization_analyzer.py
│       ├── cloudwatch
│       │   ├── test_cache_control.py
│       │   ├── test_cloudwatch_api_mocking.py
│       │   ├── test_cloudwatch_metrics_pagination.py
│       │   ├── test_cloudwatch_pagination_architecture.py
│       │   ├── test_cloudwatch_pagination_comprehensive_fixed.py
│       │   ├── test_cloudwatch_pagination_comprehensive.py
│       │   ├── test_cloudwatch_pagination_fixed.py
│       │   ├── test_cloudwatch_pagination_real_format.py
│       │   ├── test_cloudwatch_pagination_simple.py
│       │   ├── test_cloudwatch_query_pagination.py
│       │   ├── test_cloudwatch_unit_suite.py
│       │   ├── test_general_spend_tips_refactor.py
│       │   ├── test_import_error.py
│       │   ├── test_mcp_pagination_bug.py
│       │   └── test_mcp_surface_pagination.py
│       ├── s3
│       │   └── live
│       │       ├── test_bucket_listing.py
│       │       ├── test_s3_governance_bucket_discovery.py
│       │       └── test_top_buckets.py
│       ├── services
│       │   ├── __init__.py
│       │   ├── test_cloudwatch_cost_controller.py
│       │   ├── test_cloudwatch_query_service.py
│       │   ├── test_cloudwatch_service.py
│       │   ├── test_cost_control_routing.py
│       │   └── test_s3_service.py
│       └── test_unit_suite.py
└── utils
    ├── __init__.py
    ├── aws_client_factory.py
    ├── cache_decorator.py
    ├── cleanup_manager.py
    ├── cloudwatch_cache.py
    ├── documentation_links.py
    ├── error_handler.py
    ├── intelligent_cache.py
    ├── logging_config.py
    ├── memory_manager.py
    ├── parallel_executor.py
    ├── performance_monitor.py
    ├── progressive_timeout.py
    ├── service_orchestrator.py
    └── session_manager.py
```

# Files

--------------------------------------------------------------------------------
/tests/legacy/example_wellarchitected_output.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Example showing enhanced tool outputs with Well-Architected Framework recommendations
"""

import json
from utils.documentation_links import add_documentation_links

def show_enhanced_examples():
    """Show examples of enhanced tool outputs with Well-Architected recommendations"""
    
    print("CFM Tips - Enhanced Output with Well-Architected Framework")
    print("=" * 70)
    print()
    
    # Example 1: EC2 Right-sizing with Well-Architected guidance
    print("1. EC2 Right-sizing Analysis - Enhanced Output:")
    print("-" * 50)
    ec2_result = {
        "status": "success",
        "data": {
            "underutilized_instances": [
                {
                    "instance_id": "i-1234567890abcdef0",
                    "instance_type": "m5.2xlarge",
                    "finding": "Overprovisioned",
                    "avg_cpu_utilization": 8.5,
                    "avg_memory_utilization": 12.3,
                    "recommendation": {
                        "recommended_instance_type": "m5.large",
                        "estimated_monthly_savings": 180.50,
                        "confidence": "High"
                    }
                },
                {
                    "instance_id": "i-0987654321fedcba0",
                    "instance_type": "c5.xlarge",
                    "finding": "Underprovisioned",
                    "avg_cpu_utilization": 85.2,
                    "recommendation": {
                        "recommended_instance_type": "c5.2xlarge",
                        "estimated_monthly_cost_increase": 120.00,
                        "performance_improvement": "40%"
                    }
                }
            ],
            "count": 2,
            "total_monthly_savings": 180.50,
            "analysis_period": "14 days",
            "data_source": "AWS Compute Optimizer"
        },
        "message": "Found 2 EC2 instances with optimization opportunities"
    }
    
    enhanced_ec2 = add_documentation_links(ec2_result, "ec2", "underutilized")
    
    # Show key sections
    print("Key Findings:")
    for instance in enhanced_ec2["data"]["underutilized_instances"]:
        print(f"  • {instance['instance_id']}: {instance['finding']} - Save ${instance['recommendation'].get('estimated_monthly_savings', 0)}/month")
    
    print(f"\nTotal Monthly Savings: ${enhanced_ec2['data']['total_monthly_savings']}")
    
    print("\nWell-Architected Framework Guidance:")
    wa_framework = enhanced_ec2["wellarchitected_framework"]
    print(f"  Cost Optimization Pillar: {wa_framework['cost_optimization_pillar']}")
    
    print("\n  Applicable Principles:")
    for principle in wa_framework["applicable_principles"]:
        print(f"    • {principle['title']}: {principle['description']}")
    
    print("\n  High Priority Recommendations:")
    for rec in wa_framework["implementation_priority"]["high"]:
        print(f"    • {rec}")
    
    print("\n  Service-Specific Best Practices:")
    for rec in wa_framework["service_specific_recommendations"][:2]:  # Show first 2
        print(f"    • {rec['practice']} ({rec['impact']})")
        print(f"      Implementation: {rec['implementation']}")
    
    print("\n" + "=" * 70)
    
    # Example 2: S3 Storage Optimization
    print("\n2. S3 Storage Optimization - Enhanced Output:")
    print("-" * 50)
    s3_result = {
        "status": "success",
        "comprehensive_s3_optimization": {
            "overview": {
                "total_potential_savings": "$2,450.75",
                "analyses_completed": "6/6",
                "buckets_analyzed": 25,
                "execution_time": "42.3s"
            },
            "key_findings": [
                "15 buckets using suboptimal storage classes",
                "Found 45 incomplete multipart uploads",
                "Identified $1,200 in lifecycle policy savings",
                "3 buckets with high request costs suitable for CloudFront"
            ],
            "top_recommendations": [
                {
                    "type": "storage_class_optimization",
                    "bucket": "analytics-data-lake",
                    "finding": "Standard storage for infrequently accessed data",
                    "recommendation": "Transition to Standard-IA after 30 days",
                    "potential_savings": "$850.25/month",
                    "priority": "High"
                },
                {
                    "type": "lifecycle_policy",
                    "bucket": "backup-archives",
                    "finding": "No lifecycle policy for old backups",
                    "recommendation": "Archive to Glacier Deep Archive after 90 days",
                    "potential_savings": "$650.50/month",
                    "priority": "High"
                }
            ]
        }
    }
    
    enhanced_s3 = add_documentation_links(s3_result, "s3", "storage_optimization")
    
    print("Key Findings:")
    for finding in enhanced_s3["comprehensive_s3_optimization"]["key_findings"]:
        print(f"  • {finding}")
    
    print(f"\nTotal Potential Savings: {enhanced_s3['comprehensive_s3_optimization']['overview']['total_potential_savings']}")
    
    print("\nTop Recommendations:")
    for rec in enhanced_s3["comprehensive_s3_optimization"]["top_recommendations"]:
        print(f"  • {rec['bucket']}: {rec['recommendation']} - {rec['potential_savings']}")
    
    print("\nWell-Architected Framework Guidance:")
    wa_s3 = enhanced_s3["wellarchitected_framework"]
    
    print("  High Priority Actions:")
    for action in wa_s3["implementation_priority"]["high"]:
        print(f"    • {action}")
    
    print("  Medium Priority Actions:")
    for action in wa_s3["implementation_priority"]["medium"][:2]:  # Show first 2
        print(f"    • {action}")
    
    print("\n" + "=" * 70)
    
    # Example 3: Multi-Service Comprehensive Analysis
    print("\n3. Multi-Service Comprehensive Analysis - Enhanced Output:")
    print("-" * 50)
    comprehensive_result = {
        "status": "success",
        "comprehensive_analysis": {
            "overview": {
                "total_monthly_cost": "$8,450.25",
                "total_potential_savings": "$2,180.75",
                "savings_percentage": "25.8%",
                "services_analyzed": ["EC2", "EBS", "RDS", "Lambda", "S3"]
            },
            "service_breakdown": {
                "ec2": {"current_cost": 3200, "potential_savings": 640, "optimization_opportunities": 12},
                "ebs": {"current_cost": 850, "potential_savings": 180, "optimization_opportunities": 8},
                "rds": {"current_cost": 2100, "potential_savings": 420, "optimization_opportunities": 3},
                "lambda": {"current_cost": 150, "potential_savings": 45, "optimization_opportunities": 15},
                "s3": {"current_cost": 2150, "potential_savings": 895, "optimization_opportunities": 22}
            },
            "top_opportunities": [
                {"service": "S3", "type": "Storage Class Optimization", "savings": 895, "effort": "Low"},
                {"service": "EC2", "type": "Right-sizing", "savings": 640, "effort": "Medium"},
                {"service": "RDS", "type": "Reserved Instances", "savings": 420, "effort": "Low"}
            ]
        }
    }
    
    enhanced_comprehensive = add_documentation_links(comprehensive_result, None, "comprehensive")
    
    print("Cost Overview:")
    overview = enhanced_comprehensive["comprehensive_analysis"]["overview"]
    print(f"  • Current Monthly Cost: ${overview['total_monthly_cost']}")
    print(f"  • Potential Savings: ${overview['total_potential_savings']} ({overview['savings_percentage']})")
    
    print("\nTop Optimization Opportunities:")
    for opp in enhanced_comprehensive["comprehensive_analysis"]["top_opportunities"]:
        print(f"  • {opp['service']} - {opp['type']}: ${opp['savings']}/month ({opp['effort']} effort)")
    
    print("\nWell-Architected Framework Principles:")
    wa_comp = enhanced_comprehensive["wellarchitected_framework"]
    for principle in wa_comp["principles"][:3]:  # Show first 3
        print(f"  • {principle['title']}")
        print(f"    {principle['description']}")
        print(f"    Key practices: {', '.join(principle['best_practices'][:2])}")
        print()
    
    print("=" * 70)
    print("\nEnhanced Features Summary:")
    print("✓ Documentation links to AWS best practices")
    print("✓ Well-Architected Framework Cost Optimization pillar mapping")
    print("✓ Service-specific implementation guidance")
    print("✓ Impact assessment and priority ranking")
    print("✓ Principle-based recommendations")
    print("✓ Actionable next steps with implementation details")

if __name__ == "__main__":
    show_enhanced_examples()
```

--------------------------------------------------------------------------------
/RUNBOOKS_GUIDE.md:
--------------------------------------------------------------------------------

```markdown
# AWS Cost Optimization Runbooks with MCP v3

This guide shows how to use the AWS Cost Optimization Runbooks with the MCP server that includes proper Cost Optimization Hub permissions.

## What's Included

### Core AWS Services
- ✅ **Cost Explorer** - Retrieve cost data and usage metrics
- ✅ **Cost Optimization Hub** - With correct permissions and API calls
- ✅ **Compute Optimizer** - Get right-sizing recommendations
- ✅ **Trusted Advisor** - Cost optimization checks
- ✅ **Performance Insights** - RDS performance metrics

### Cost Optimization Runbooks
- 🔧 **EC2 Right Sizing** - Identify underutilized EC2 instances
- 💾 **EBS Optimization** - Find unused and underutilized volumes
- 🗄️ **RDS Optimization** - Identify idle and underutilized databases
- ⚡ **Lambda Optimization** - Find overprovisioned and unused functions
- 📋 **CloudTrail Optimization** - Identify duplicate management event trails
- 📊 **Comprehensive Analysis** - Multi-service cost analysis

## Quick Start

### 1. Setup
```bash
cd <replace-with-project-folder>/

# Make sure all files are executable
chmod +x mcp_server_with_runbooks.py

# Test the server
python3 -m py_compile mcp_server_with_runbooks.py
python3 -c "from playbooks.ec2.ec2_optimization import run_ec2_right_sizing_analysis; print('Playbooks OK')"
```

### 2. Configure AWS Permissions

Apply the correct IAM policy for Cost Optimization Hub:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "cost-optimization-hub:ListEnrollmentStatuses",
        "cost-optimization-hub:ListRecommendations",
        "cost-optimization-hub:GetRecommendation", 
        "cost-optimization-hub:ListRecommendationSummaries",
        "ce:GetCostAndUsage",
        "ce:GetCostForecast",
        "compute-optimizer:GetEC2InstanceRecommendations",
        "compute-optimizer:GetEBSVolumeRecommendations",
        "compute-optimizer:GetLambdaFunctionRecommendations",
        "ec2:DescribeInstances",
        "ec2:DescribeVolumes",
        "rds:DescribeDBInstances",
        "lambda:ListFunctions",
        "cloudwatch:GetMetricStatistics",
        "s3:ListBucket",
        "s3:ListObjectsV2",
        "support:DescribeTrustedAdvisorChecks",
        "support:DescribeTrustedAdvisorCheckResult",
        "pi:GetResourceMetrics",
        "cloudtrail:DescribeTrails",
        "cloudtrail:GetTrailStatus",
        "cloudtrail:GetEventSelectors"
      ],
      "Resource": "*"
    }
  ]
}
```

### 3. Install dependencies
pip install -r requirements_fixed.txt

### 4. Configure AWS credentials
aws configure

### 5.  Add the MCP server config to Amazon Q using the mcp_runbooks.json as a template
vi ~/.aws/amazonq/mcp.json

## Available Runbook Tools

### EC2 Right Sizing Runbooks

#### 1. `ec2_rightsizing`
Analyze EC2 instances for right-sizing opportunities.

**Example Usage:**
```
"Run EC2 right-sizing analysis for us-east-1 region with 14-day lookback period"
```

**Parameters:**
- `region`: AWS region to analyze
- `lookback_period_days`: Days to analyze (default: 14)
- `cpu_threshold`: CPU utilization threshold % (default: 40.0)

#### 2. `ec2_report`
Generate comprehensive EC2 right-sizing report.

**Example Usage:**
```
"Generate an EC2 right-sizing report for us-east-1 in markdown format"
```

### EBS Optimization Runbooks

#### 1. `ebs_optimization`
Analyze EBS volumes for optimization opportunities.

**Example Usage:**
```
"Analyze EBS volumes in us-east-1 for optimization opportunities"
```

#### 2. `ebs_unused`
Find unused EBS volumes that can be deleted.

**Example Usage:**
```
"Find unused EBS volumes older than 30 days in us-east-1"
```

#### 3. `ebs_report`
Generate comprehensive EBS optimization report.

**Example Usage:**
```
"Generate a comprehensive EBS optimization report for us-east-1"
```

### RDS Optimization Runbooks

#### 1. `rds_optimization`
Analyze RDS instances for optimization opportunities.

**Example Usage:**
```
"Analyze RDS instances in us-east-1 for underutilization"
```

#### 2. `rds_idle`
Find idle RDS instances with minimal activity.

**Example Usage:**
```
"Find idle RDS instances with less than 1 connection in the last 7 days"
```

#### 3. `rds_report`
Generate comprehensive RDS optimization report.

**Example Usage:**
```
"Generate an RDS optimization report for us-east-1"
```

### Lambda Optimization Runbooks

#### 1. `lambda_optimization`
Analyze Lambda functions for optimization opportunities.

**Example Usage:**
```
"Analyze Lambda functions in us-east-1 for memory optimization"
```

#### 2. `lambda_unused`
Find unused Lambda functions.

**Example Usage:**
```
"Find Lambda functions with less than 5 invocations in the last 30 days"
```

#### 3. `lambda_report`
Generate comprehensive Lambda optimization report.

**Example Usage:**
```
"Generate a Lambda optimization report for us-east-1"
```

### CloudTrail Optimization Runbooks

#### 1. `get_management_trails`
Get CloudTrail trails that have management events enabled.

**Example Usage:**
```
"Show me all CloudTrail trails with management events enabled in us-east-1"
```

#### 2. `run_cloudtrail_trails_analysis`
Analyze CloudTrail trails to identify duplicate management event trails.

**Example Usage:**
```
"Analyze CloudTrail trails in us-east-1 for cost optimization opportunities"
```

**Parameters:**
- `region`: AWS region to analyze

#### 3. `generate_cloudtrail_report`
Generate comprehensive CloudTrail optimization report.

**Example Usage:**
```
"Generate a CloudTrail optimization report for us-east-1 in markdown format"
```

**Parameters:**
- `region`: AWS region to analyze
- `format`: "json" or "markdown" (default: "json")

### Comprehensive Analysis

#### `comprehensive_analysis`
Run analysis across all services (EC2, EBS, RDS, Lambda).

**Example Usage:**
```
"Run comprehensive cost analysis for us-east-1 covering all services"
```

**Parameters:**
- `region`: AWS region to analyze
- `services`: Array of services ["ec2", "ebs", "rds", "lambda"]
- `lookback_period_days`: Days to analyze (default: 14)
- `output_format`: "json" or "markdown"

### Cost Optimization Hub Tools (Shortened)

#### 1. `list_coh_enrollment`
Check Cost Optimization Hub enrollment status.

#### 2. `get_coh_recommendations`
Get cost optimization recommendations.

#### 3. `get_coh_summaries`
Get recommendation summaries.

#### 4. `get_coh_recommendation`
Get specific recommendation by ID.

## Sample Conversation Flow
**Configure AWS credentials**
```aws configure```

**Add the MCP server config to Amazon Q using the mcp_runbooks.json as a template**
```vi ~/.aws/amazonq/mcp.json```

```bash
# Start Q with runbooks
q chat
```

**User:** "What cost optimization tools are available?"

**Q:** "I can see several AWS cost optimization tools including Cost Optimization Hub, runbooks for EC2, EBS, RDS, and Lambda optimization..."

**User:** "Run a comprehensive cost analysis for us-east-1"

**Q:** "I'll run a comprehensive cost analysis across all services for the us-east-1 region..."
*[Uses comprehensive_analysis tool]*

**User:** "Show me unused EBS volumes that are costing money"

**Q:** "Let me identify unused EBS volumes in your account..."
*[Uses ebs_unused tool]*

**User:** "Generate an EC2 right-sizing report in markdown format"

**Q:** "I'll generate a detailed EC2 right-sizing report in markdown format..."
*[Uses ec2_report tool]*

## Tool Names (For Reference)

The tool names have been shortened to fit MCP's 64-character limit:

| Purpose | Tool Name |
|----------|----------|
| `Run EC2 right sizing analysis` | `ec2_rightsizing` |
| `Generate EC2 right sizing report` | `ec2_report` |
| `Run EBS optimization analysis` | `ebs_optimization` |
| `Identify unused EBS volumes` | `ebs_unused` |
| `Generate EBS optimization report` | `ebs_report` |
| `Run RDS optimization analysis` | `rds_optimization` |
| `Iidentify idle RDS instances` | `rds_idle` |
| `Generate RDS optimization report` | `rds_report` |
| `Run Lambda optimization analysis` | `lambda_optimization` |
| `Identify unused Lambda functions` | `lambda_unused` |
| `Generate Lambda optimization report` | `lambda_report` |
| `Run comprehensive cost analysis` | `comprehensive_analysis` |
| `Get CloudTrail management trails` | `get_management_trails` |
| `Run CloudTrail trails analysis` | `run_cloudtrail_trails_analysis` |
| `Generate CloudTrail optimization report` | `generate_cloudtrail_report` |
| `List Cost Optimization Hub enrollment statuses` | `list_coh_enrollment` |
| `Get Cost Optimization Hub recommendations` | `get_coh_recommendations` |
| `Get Cost Optimization Hub recommendation summaries` | `get_coh_summaries` |
| `Get a particular Cost Optimization Hub recommendation` | `get_coh_recommendation` |

## Troubleshooting

### Common Issues

1. **Import Error for playbook functions**
   ```bash
   # Make sure PYTHONPATH is set in mcp_runbooks.json
   export PYTHONPATH="<replace-with-project-folder>"
   ```

2. **Cost Optimization Hub Errors**
   ```bash
   # Run the diagnostic first
   python3 diagnose_cost_optimization_hub_v2.py
   ```
```

--------------------------------------------------------------------------------
/utils/cache_decorator.py:
--------------------------------------------------------------------------------

```python
"""
Caching Decorator with TTL Support

Provides DAO-level caching for CloudWatch service methods with:
- Time-to-live (TTL) support
- Page-aware cache keys
- Thread-safe implementation
- Memory-efficient LRU eviction
- Optional caching (can be disabled via environment variable or parameter)
"""

import functools
import hashlib
import json
import logging
import os
import time
from typing import Any, Callable, Dict, Optional
from threading import Lock

logger = logging.getLogger(__name__)

# Global flag to enable/disable caching
# Can be controlled via environment variable: CFM_ENABLE_CACHE=false
_CACHE_ENABLED = os.getenv('CFM_ENABLE_CACHE', 'true').lower() in ('true', '1', 'yes')


class TTLCache:
    """Thread-safe cache with TTL support."""
    
    def __init__(self, ttl_seconds: int = 600, max_size: int = 1000):
        """
        Initialize TTL cache.
        
        Args:
            ttl_seconds: Time-to-live in seconds (default: 600 = 10 minutes)
            max_size: Maximum cache entries (default: 1000)
        """
        self.ttl_seconds = ttl_seconds
        self.max_size = max_size
        self.cache: Dict[str, tuple[Any, float]] = {}
        self.lock = Lock()
        self.hits = 0
        self.misses = 0
    
    def _is_expired(self, timestamp: float) -> bool:
        """Check if cache entry is expired."""
        return (time.time() - timestamp) > self.ttl_seconds
    
    def _evict_expired(self):
        """Remove expired entries."""
        current_time = time.time()
        expired_keys = [
            key for key, (_, timestamp) in self.cache.items()
            if (current_time - timestamp) > self.ttl_seconds
        ]
        for key in expired_keys:
            del self.cache[key]
    
    def _evict_lru(self):
        """Evict least recently used entries if cache is full."""
        if len(self.cache) >= self.max_size:
            # Remove oldest 10% of entries
            sorted_items = sorted(self.cache.items(), key=lambda x: x[1][1])
            num_to_remove = max(1, len(sorted_items) // 10)
            for key, _ in sorted_items[:num_to_remove]:
                del self.cache[key]
    
    def get(self, key: str) -> Optional[Any]:
        """Get value from cache if not expired."""
        with self.lock:
            if key in self.cache:
                value, timestamp = self.cache[key]
                if not self._is_expired(timestamp):
                    self.hits += 1
                    logger.debug(f"Cache HIT for key: {key[:50]}...")
                    return value
                else:
                    # Remove expired entry
                    del self.cache[key]
                    logger.debug(f"Cache EXPIRED for key: {key[:50]}...")
            
            self.misses += 1
            logger.debug(f"Cache MISS for key: {key[:50]}...")
            return None
    
    def set(self, key: str, value: Any):
        """Set value in cache with current timestamp."""
        with self.lock:
            # Evict expired entries periodically
            if len(self.cache) % 100 == 0:
                self._evict_expired()
            
            # Evict LRU if cache is full
            self._evict_lru()
            
            self.cache[key] = (value, time.time())
            logger.debug(f"Cache SET for key: {key[:50]}...")
    
    def clear(self):
        """Clear all cache entries."""
        with self.lock:
            self.cache.clear()
            self.hits = 0
            self.misses = 0
            logger.info("Cache cleared")
    
    def get_stats(self) -> Dict[str, Any]:
        """Get cache statistics."""
        with self.lock:
            total_requests = self.hits + self.misses
            hit_rate = (self.hits / total_requests * 100) if total_requests > 0 else 0
            return {
                'size': len(self.cache),
                'max_size': self.max_size,
                'hits': self.hits,
                'misses': self.misses,
                'hit_rate': round(hit_rate, 2),
                'ttl_seconds': self.ttl_seconds
            }


# Global cache instance for CloudWatch DAO methods
_cloudwatch_cache = TTLCache(ttl_seconds=600, max_size=1000)


def _generate_cache_key(func_name: str, args: tuple, kwargs: dict) -> str:
    """
    Generate cache key from function name and arguments.
    
    Includes page parameter to ensure pagination is cached correctly.
    """
    # Extract key parameters
    key_parts = [func_name]
    
    # Add positional args (excluding self)
    if args and len(args) > 1:
        key_parts.extend(str(arg) for arg in args[1:])
    
    # Add important kwargs (including page)
    important_params = [
        'region', 'page', 'lookback_days', 'namespace_filter',
        'log_group_name_prefix', 'alarm_name_prefix', 'dashboard_name_prefix',
        'can_spend_for_estimate', 'can_spend_for_exact_usage_estimate'
    ]
    
    for param in important_params:
        if param in kwargs:
            key_parts.append(f"{param}={kwargs[param]}")
    
    # Create hash of key parts
    key_string = "|".join(str(part) for part in key_parts)
    key_hash = hashlib.md5(key_string.encode()).hexdigest()
    
    return f"{func_name}:{key_hash}"


def dao_cache(ttl_seconds: int = 600, enabled: Optional[bool] = None):
    """
    Decorator for caching DAO method results with TTL.
    
    Caching can be disabled globally via CFM_ENABLE_CACHE environment variable
    or per-decorator via the enabled parameter.
    
    Args:
        ttl_seconds: Time-to-live in seconds (default: 600 = 10 minutes)
        enabled: Override global cache setting (None = use global, True/False = force)
    
    Usage:
        # Use global cache setting
        @dao_cache(ttl_seconds=600)
        async def get_log_groups(self, page: int = 1, **kwargs):
            pass
        
        # Force caching disabled for this method
        @dao_cache(ttl_seconds=600, enabled=False)
        async def get_real_time_data(self, **kwargs):
            pass
        
        # Force caching enabled for this method
        @dao_cache(ttl_seconds=600, enabled=True)
        async def get_expensive_data(self, **kwargs):
            pass
    
    Environment Variables:
        CFM_ENABLE_CACHE: Set to 'false', '0', or 'no' to disable caching globally
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        async def async_wrapper(*args, **kwargs):
            # Check if caching is enabled
            cache_enabled = enabled if enabled is not None else _CACHE_ENABLED
            
            if not cache_enabled:
                logger.debug(f"Cache disabled for {func.__name__}, calling function directly")
                return await func(*args, **kwargs)
            
            # Generate cache key
            cache_key = _generate_cache_key(func.__name__, args, kwargs)
            
            # Try to get from cache
            cached_value = _cloudwatch_cache.get(cache_key)
            if cached_value is not None:
                logger.debug(f"Returning cached result for {func.__name__}")
                return cached_value
            
            # Call original function
            result = await func(*args, **kwargs)
            
            # Cache the result
            _cloudwatch_cache.set(cache_key, result)
            
            return result
        
        @functools.wraps(func)
        def sync_wrapper(*args, **kwargs):
            # Check if caching is enabled
            cache_enabled = enabled if enabled is not None else _CACHE_ENABLED
            
            if not cache_enabled:
                logger.debug(f"Cache disabled for {func.__name__}, calling function directly")
                return func(*args, **kwargs)
            
            # Generate cache key
            cache_key = _generate_cache_key(func.__name__, args, kwargs)
            
            # Try to get from cache
            cached_value = _cloudwatch_cache.get(cache_key)
            if cached_value is not None:
                logger.debug(f"Returning cached result for {func.__name__}")
                return cached_value
            
            # Call original function
            result = func(*args, **kwargs)
            
            # Cache the result
            _cloudwatch_cache.set(cache_key, result)
            
            return result
        
        # Return appropriate wrapper based on function type
        if asyncio.iscoroutinefunction(func):
            return async_wrapper
        else:
            return sync_wrapper
    
    return decorator


def get_cache_stats() -> Dict[str, Any]:
    """Get cache statistics."""
    stats = _cloudwatch_cache.get_stats()
    stats['enabled'] = _CACHE_ENABLED
    return stats


def clear_cache():
    """Clear all cache entries."""
    _cloudwatch_cache.clear()


def is_cache_enabled() -> bool:
    """Check if caching is currently enabled."""
    return _CACHE_ENABLED


def enable_cache():
    """Enable caching globally (runtime override)."""
    global _CACHE_ENABLED
    _CACHE_ENABLED = True
    logger.info("Cache enabled globally")


def disable_cache():
    """Disable caching globally (runtime override)."""
    global _CACHE_ENABLED
    _CACHE_ENABLED = False
    logger.info("Cache disabled globally")


def set_cache_enabled(enabled: bool):
    """
    Set cache enabled state.
    
    Args:
        enabled: True to enable caching, False to disable
    """
    global _CACHE_ENABLED
    _CACHE_ENABLED = enabled
    logger.info(f"Cache {'enabled' if enabled else 'disabled'} globally")


# Import asyncio at the end to avoid circular imports
import asyncio

```

--------------------------------------------------------------------------------
/tests/unit/cloudwatch/test_mcp_surface_pagination.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Test MCP surface pagination with real parameters to analyze response structure.
"""

import pytest
import sys
import os
import json
from datetime import datetime

# Add the project root to the path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../../..'))


@pytest.mark.skip(reason="Tests need refactoring to match actual API structure")
class TestMCPSurfacePagination:
    """Test MCP surface pagination with real parameters."""
    
    @pytest.mark.asyncio
    async def test_mcp_cloudwatch_general_spend_analysis_surface(self):
        """Test MCP surface call with real parameters to analyze response structure."""
        
        # Import the MCP function directly
        from runbook_functions import run_cloudwatch_general_spend_analysis
        
        # Test parameters from user request
        test_params = {
            "region": "us-east-1",
            "lookback_days": 30,
            "allow_minimal_cost_metrics": True,
            "page": 1
        }
        
        print(f"\n=== Testing MCP Surface Call ===")
        print(f"Parameters: {json.dumps(test_params, indent=2)}")
        
        # Call the actual MCP function
        result = await run_cloudwatch_general_spend_analysis(test_params)
        
        # Parse the response
        response_text = result[0].text
        response_data = json.loads(response_text)
        
        print(f"\n=== Response Structure Analysis ===")
        print(f"Response keys: {list(response_data.keys())}")
        
        # Check if pagination exists in the response
        if 'data' in response_data:
            print(f"Data keys: {list(response_data['data'].keys())}")
            
            # Look for pagination in different places
            pagination_locations = []
            
            # Check top-level data pagination
            if 'pagination' in response_data['data']:
                pagination_locations.append('data.pagination')
                print(f"Found pagination at data.pagination: {response_data['data']['pagination']}")
            
            # Check configuration_analysis sections
            if 'configuration_analysis' in response_data['data']:
                config_analysis = response_data['data']['configuration_analysis']
                print(f"Configuration analysis keys: {list(config_analysis.keys())}")
                
                for section_name, section_data in config_analysis.items():
                    if isinstance(section_data, dict) and 'pagination' in section_data:
                        pagination_locations.append(f'data.configuration_analysis.{section_name}.pagination')
                        print(f"Found pagination at data.configuration_analysis.{section_name}.pagination: {section_data['pagination']}")
            
            print(f"\nPagination found at locations: {pagination_locations}")
            
            # Check for items/data arrays
            items_locations = []
            if 'configuration_analysis' in response_data['data']:
                config_analysis = response_data['data']['configuration_analysis']
                for section_name, section_data in config_analysis.items():
                    if isinstance(section_data, dict):
                        if 'items' in section_data:
                            items_count = len(section_data['items']) if isinstance(section_data['items'], list) else 'not_list'
                            items_locations.append(f'data.configuration_analysis.{section_name}.items ({items_count} items)')
                        
                        # Check for specific data arrays
                        for data_key in ['log_groups', 'metrics', 'alarms', 'dashboards']:
                            if data_key in section_data and isinstance(section_data[data_key], list):
                                items_count = len(section_data[data_key])
                                items_locations.append(f'data.configuration_analysis.{section_name}.{data_key} ({items_count} items)')
            
            print(f"Items/data arrays found at: {items_locations}")
        
        # Check response metadata
        if 'runbook_metadata' in response_data:
            print(f"Runbook metadata keys: {list(response_data['runbook_metadata'].keys())}")
        
        if 'orchestrator_metadata' in response_data:
            print(f"Orchestrator metadata keys: {list(response_data['orchestrator_metadata'].keys())}")
        
        # Check for page-related fields at top level
        page_fields = []
        for key in response_data.keys():
            if 'page' in key.lower() or 'pagination' in key.lower():
                page_fields.append(f"{key}: {response_data[key]}")
        
        if page_fields:
            print(f"Top-level page-related fields: {page_fields}")
        
        # Print full response structure (truncated for readability)
        print(f"\n=== Full Response Structure (first 2000 chars) ===")
        response_str = json.dumps(response_data, indent=2, default=str)
        print(response_str[:2000] + "..." if len(response_str) > 2000 else response_str)
        
        # Assertions to verify the response structure
        assert isinstance(response_data, dict), "Response should be a dictionary"
        assert 'status' in response_data, "Response should have status field"
        assert 'data' in response_data, "Response should have data field"
        
        # Test passes if we get a valid response structure
        print(f"\n=== Test Result ===")
        print(f"✅ MCP surface call successful")
        print(f"✅ Response structure analyzed")
        print(f"✅ Pagination locations identified: {len(pagination_locations) if 'pagination_locations' in locals() else 0}")
    
    @pytest.mark.asyncio
    async def test_mcp_cloudwatch_metrics_optimization_surface(self):
        """Test MCP metrics optimization surface call."""
        
        from runbook_functions import run_cloudwatch_metrics_optimization
        
        test_params = {
            "region": "us-east-1",
            "lookback_days": 30,
            "allow_minimal_cost_metrics": True,
            "page": 1
        }
        
        print(f"\n=== Testing Metrics Optimization MCP Surface Call ===")
        
        result = await run_cloudwatch_metrics_optimization(test_params)
        response_data = json.loads(result[0].text)
        
        # Check for pagination in metrics response
        pagination_found = False
        if 'data' in response_data and 'configuration_analysis' in response_data['data']:
            config_analysis = response_data['data']['configuration_analysis']
            if 'metrics' in config_analysis and 'pagination' in config_analysis['metrics']:
                pagination_found = True
                pagination_info = config_analysis['metrics']['pagination']
                print(f"Metrics pagination: {pagination_info}")
        
        print(f"Metrics optimization pagination found: {pagination_found}")
        assert isinstance(response_data, dict), "Metrics response should be a dictionary"
    
    @pytest.mark.asyncio
    async def test_pagination_consistency_across_apis(self):
        """Test pagination consistency across different CloudWatch APIs."""
        
        from runbook_functions import (
            run_cloudwatch_general_spend_analysis,
            run_cloudwatch_metrics_optimization,
            run_cloudwatch_logs_optimization,
            run_cloudwatch_alarms_and_dashboards_optimization
        )
        
        test_params = {
            "region": "us-east-1",
            "lookback_days": 30,
            "allow_minimal_cost_metrics": True,
            "page": 1
        }
        
        apis_to_test = [
            ("general_spend", run_cloudwatch_general_spend_analysis),
            ("metrics", run_cloudwatch_metrics_optimization),
            ("logs", run_cloudwatch_logs_optimization),
            ("alarms", run_cloudwatch_alarms_and_dashboards_optimization),
        ]
        
        pagination_structures = {}
        
        for api_name, api_func in apis_to_test:
            print(f"\n=== Testing {api_name} API ===")
            
            try:
                result = await api_func(test_params)
                response_data = json.loads(result[0].text)
                
                # Find pagination structures
                pagination_paths = []
                if 'data' in response_data and 'configuration_analysis' in response_data['data']:
                    config_analysis = response_data['data']['configuration_analysis']
                    for section_name, section_data in config_analysis.items():
                        if isinstance(section_data, dict) and 'pagination' in section_data:
                            pagination_paths.append(f"data.configuration_analysis.{section_name}.pagination")
                
                pagination_structures[api_name] = pagination_paths
                print(f"{api_name} pagination paths: {pagination_paths}")
                
            except Exception as e:
                print(f"Error testing {api_name}: {str(e)}")
                pagination_structures[api_name] = f"ERROR: {str(e)}"
        
        print(f"\n=== Pagination Structure Summary ===")
        for api_name, paths in pagination_structures.items():
            print(f"{api_name}: {paths}")
        
        # Test passes if we collected pagination info from all APIs
        assert len(pagination_structures) == len(apis_to_test), "Should test all APIs"


if __name__ == '__main__':
    pytest.main([__file__, '-v', '-s'])  # -s to show print statements
```

--------------------------------------------------------------------------------
/utils/cleanup_manager.py:
--------------------------------------------------------------------------------

```python
"""
Cleanup Manager for CFM Tips MCP Server

Handles automatic cleanup of sessions, results, and temporary data.
"""

import logging
import threading
import time
import os
import glob
from datetime import datetime, timedelta
from typing import Dict, Any, Optional

logger = logging.getLogger(__name__)

class CleanupManager:
    """Manages automatic cleanup of sessions and temporary data."""
    
    def __init__(self, 
                 session_timeout_minutes: int = 60,
                 result_retention_minutes: int = 120,
                 cleanup_interval_minutes: int = 15):
        self.session_timeout_minutes = session_timeout_minutes
        self.result_retention_minutes = result_retention_minutes
        self.cleanup_interval_minutes = cleanup_interval_minutes
        self._shutdown = False
        self._cleanup_thread = None
        
        # Start cleanup thread
        self._start_cleanup_thread()
    
    def _start_cleanup_thread(self):
        """Start the background cleanup thread."""
        if self._cleanup_thread is None or not self._cleanup_thread.is_alive():
            self._cleanup_thread = threading.Thread(
                target=self._cleanup_worker,
                daemon=True,
                name="CleanupManager"
            )
            self._cleanup_thread.start()
            logger.info(f"Cleanup manager started (session timeout: {self.session_timeout_minutes}min)")
    
    def _cleanup_worker(self):
        """Background worker for periodic cleanup."""
        while not self._shutdown:
            try:
                self._perform_cleanup()
                time.sleep(self.cleanup_interval_minutes * 60)
            except Exception as e:
                logger.error(f"Error in cleanup worker: {e}")
                time.sleep(60)  # Wait 1 minute on error
    
    def _perform_cleanup(self):
        """Perform all cleanup operations."""
        logger.debug("Starting periodic cleanup")
        
        # Clean up session files
        self._cleanup_session_files()
        
        # Clean up temporary files
        self._cleanup_temp_files()
        
        # Clean up old log files
        self._cleanup_log_files()
        
        logger.debug("Periodic cleanup completed")
    
    def _cleanup_session_files(self):
        """Clean up old session database files."""
        try:
            sessions_dir = "sessions"
            if not os.path.exists(sessions_dir):
                return
            
            cutoff_time = datetime.now() - timedelta(minutes=self.session_timeout_minutes)
            cleaned_count = 0
            
            # Find all session database files
            session_files = glob.glob(os.path.join(sessions_dir, "session_*.db"))
            
            for session_file in session_files:
                try:
                    # Check file modification time
                    file_mtime = datetime.fromtimestamp(os.path.getmtime(session_file))
                    
                    if file_mtime < cutoff_time:
                        # Remove old session file
                        os.remove(session_file)
                        cleaned_count += 1
                        logger.debug(f"Cleaned up old session file: {session_file}")
                        
                        # Also remove any associated WAL and SHM files
                        for ext in ['-wal', '-shm']:
                            wal_file = session_file + ext
                            if os.path.exists(wal_file):
                                os.remove(wal_file)
                
                except Exception as e:
                    logger.warning(f"Error cleaning session file {session_file}: {e}")
            
            if cleaned_count > 0:
                logger.info(f"Cleaned up {cleaned_count} old session files")
                
        except Exception as e:
            logger.error(f"Error in session file cleanup: {e}")
    
    def _cleanup_temp_files(self):
        """Clean up temporary files and directories."""
        try:
            temp_patterns = [
                "*.tmp",
                "*.temp",
                "__pycache__/*.pyc",
                "*.log.old"
            ]
            
            cleaned_count = 0
            
            for pattern in temp_patterns:
                temp_files = glob.glob(pattern, recursive=True)
                
                for temp_file in temp_files:
                    try:
                        # Check if file is older than retention period
                        file_mtime = datetime.fromtimestamp(os.path.getmtime(temp_file))
                        cutoff_time = datetime.now() - timedelta(minutes=self.result_retention_minutes)
                        
                        if file_mtime < cutoff_time:
                            if os.path.isfile(temp_file):
                                os.remove(temp_file)
                                cleaned_count += 1
                            elif os.path.isdir(temp_file):
                                import shutil
                                shutil.rmtree(temp_file)
                                cleaned_count += 1
                    
                    except Exception as e:
                        logger.warning(f"Error cleaning temp file {temp_file}: {e}")
            
            if cleaned_count > 0:
                logger.info(f"Cleaned up {cleaned_count} temporary files")
                
        except Exception as e:
            logger.error(f"Error in temp file cleanup: {e}")
    
    def _cleanup_log_files(self):
        """Clean up old log files."""
        try:
            logs_dir = "logs"
            if not os.path.exists(logs_dir):
                return
            
            # Keep logs for 7 days
            cutoff_time = datetime.now() - timedelta(days=7)
            cleaned_count = 0
            
            log_files = glob.glob(os.path.join(logs_dir, "*.log.*"))
            
            for log_file in log_files:
                try:
                    file_mtime = datetime.fromtimestamp(os.path.getmtime(log_file))
                    
                    if file_mtime < cutoff_time:
                        os.remove(log_file)
                        cleaned_count += 1
                        logger.debug(f"Cleaned up old log file: {log_file}")
                
                except Exception as e:
                    logger.warning(f"Error cleaning log file {log_file}: {e}")
            
            if cleaned_count > 0:
                logger.info(f"Cleaned up {cleaned_count} old log files")
                
        except Exception as e:
            logger.error(f"Error in log file cleanup: {e}")
    
    def force_cleanup(self):
        """Force immediate cleanup of all resources."""
        logger.info("Forcing immediate cleanup")
        self._perform_cleanup()
        
        # Also clean up from session manager and parallel executor
        try:
            from . import get_session_manager, get_parallel_executor
            
            # Clean up session manager
            session_manager = get_session_manager()
            session_manager._cleanup_expired_sessions()
            
            # Clean up parallel executor results
            executor = get_parallel_executor()
            executor.clear_results(older_than_minutes=self.result_retention_minutes)
            
            logger.info("Force cleanup completed")
            
        except Exception as e:
            logger.error(f"Error in force cleanup: {e}")
    
    def get_cleanup_stats(self) -> Dict[str, Any]:
        """Get statistics about cleanup operations."""
        try:
            stats = {
                'session_timeout_minutes': self.session_timeout_minutes,
                'result_retention_minutes': self.result_retention_minutes,
                'cleanup_interval_minutes': self.cleanup_interval_minutes,
                'cleanup_thread_alive': self._cleanup_thread.is_alive() if self._cleanup_thread else False,
                'sessions_directory_exists': os.path.exists('sessions'),
                'logs_directory_exists': os.path.exists('logs')
            }
            
            # Count current files
            if os.path.exists('sessions'):
                session_files = glob.glob('sessions/session_*.db')
                stats['active_session_files'] = len(session_files)
            else:
                stats['active_session_files'] = 0
            
            if os.path.exists('logs'):
                log_files = glob.glob('logs/*.log*')
                stats['log_files'] = len(log_files)
            else:
                stats['log_files'] = 0
            
            return stats
            
        except Exception as e:
            logger.error(f"Error getting cleanup stats: {e}")
            return {'error': str(e)}
    
    def update_settings(self, 
                       session_timeout_minutes: Optional[int] = None,
                       result_retention_minutes: Optional[int] = None,
                       cleanup_interval_minutes: Optional[int] = None):
        """Update cleanup settings."""
        if session_timeout_minutes is not None:
            self.session_timeout_minutes = session_timeout_minutes
            logger.info(f"Updated session timeout to {session_timeout_minutes} minutes")
        
        if result_retention_minutes is not None:
            self.result_retention_minutes = result_retention_minutes
            logger.info(f"Updated result retention to {result_retention_minutes} minutes")
        
        if cleanup_interval_minutes is not None:
            self.cleanup_interval_minutes = cleanup_interval_minutes
            logger.info(f"Updated cleanup interval to {cleanup_interval_minutes} minutes")
    
    def shutdown(self):
        """Shutdown the cleanup manager."""
        logger.info("Shutting down cleanup manager")
        self._shutdown = True
        
        # Perform final cleanup
        try:
            self._perform_cleanup()
        except Exception as e:
            logger.error(f"Error in final cleanup: {e}")
        
        # Wait for cleanup thread to finish
        if self._cleanup_thread and self._cleanup_thread.is_alive():
            self._cleanup_thread.join(timeout=10)

# Global cleanup manager instance
_cleanup_manager = None

def get_cleanup_manager() -> CleanupManager:
    """Get the global cleanup manager instance."""
    global _cleanup_manager
    if _cleanup_manager is None:
        _cleanup_manager = CleanupManager()
    return _cleanup_manager
```

--------------------------------------------------------------------------------
/tests/unit/cloudwatch/test_cloudwatch_pagination_architecture.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Test CloudWatch Pagination Architecture

This test validates that the CloudWatch pagination system works correctly:
1. Fetches ALL data from AWS using proper NextToken pagination
2. Sorts client-side by estimated cost (descending)
3. Applies client-side pagination for MCP responses

The key insight: "Pagination breaking" isn't an API error - it's the correct
architecture handling large datasets that may cause performance issues.
"""

import pytest
import asyncio
from unittest.mock import Mock, patch, AsyncMock
from playbooks.cloudwatch.result_processor import CloudWatchResultProcessor
from services.cloudwatch_service import CloudWatchService, CloudWatchOperationResult


class TestCloudWatchPaginationArchitecture:
    """Test the CloudWatch pagination architecture end-to-end."""
    
    def setup_method(self):
        """Set up test fixtures."""
        self.result_processor = CloudWatchResultProcessor()
    
    def test_pagination_metadata_calculation(self):
        """Test that pagination metadata is calculated correctly."""
        # Test with 25 items, page size 10
        total_items = 25
        
        # Page 1: items 0-9
        metadata_p1 = self.result_processor.create_pagination_metadata(total_items, 1)
        assert metadata_p1.current_page == 1
        assert metadata_p1.page_size == 10
        assert metadata_p1.total_items == 25
        assert metadata_p1.total_pages == 3
        assert metadata_p1.has_next_page == True
        assert metadata_p1.has_previous_page == False
        
        # Page 2: items 10-19
        metadata_p2 = self.result_processor.create_pagination_metadata(total_items, 2)
        assert metadata_p2.current_page == 2
        assert metadata_p2.has_next_page == True
        assert metadata_p2.has_previous_page == True
        
        # Page 3: items 20-24 (partial page)
        metadata_p3 = self.result_processor.create_pagination_metadata(total_items, 3)
        assert metadata_p3.current_page == 3
        assert metadata_p3.has_next_page == False
        assert metadata_p3.has_previous_page == True
        
        # Page 4: beyond data (empty)
        metadata_p4 = self.result_processor.create_pagination_metadata(total_items, 4)
        assert metadata_p4.current_page == 4
        assert metadata_p4.has_next_page == False
        assert metadata_p4.has_previous_page == True
    
    def test_client_side_pagination_slicing(self):
        """Test that client-side pagination slices data correctly."""
        # Create test data
        items = [{'id': i, 'name': f'item_{i}'} for i in range(25)]
        
        # Test page 1 (items 0-9)
        result_p1 = self.result_processor.paginate_results(items, 1)
        assert len(result_p1['items']) == 10
        assert result_p1['items'][0]['id'] == 0
        assert result_p1['items'][9]['id'] == 9
        assert result_p1['pagination']['current_page'] == 1
        assert result_p1['pagination']['total_pages'] == 3
        
        # Test page 2 (items 10-19)
        result_p2 = self.result_processor.paginate_results(items, 2)
        assert len(result_p2['items']) == 10
        assert result_p2['items'][0]['id'] == 10
        assert result_p2['items'][9]['id'] == 19
        
        # Test page 3 (items 20-24, partial page)
        result_p3 = self.result_processor.paginate_results(items, 3)
        assert len(result_p3['items']) == 5
        assert result_p3['items'][0]['id'] == 20
        assert result_p3['items'][4]['id'] == 24
        
        # Test page 4 (beyond data, empty)
        result_p4 = self.result_processor.paginate_results(items, 4)
        assert len(result_p4['items']) == 0
        assert result_p4['pagination']['current_page'] == 4
    
    def test_cost_based_sorting_before_pagination(self):
        """Test that items are sorted by cost before pagination."""
        # Create test metrics with different estimated costs
        metrics = [
            {'MetricName': 'LowCost', 'Namespace': 'AWS/EC2', 'Dimensions': []},
            {'MetricName': 'HighCost', 'Namespace': 'Custom/App', 'Dimensions': [{'Name': 'Instance', 'Value': 'i-123'}]},
            {'MetricName': 'MediumCost', 'Namespace': 'AWS/Lambda', 'Dimensions': [{'Name': 'Function', 'Value': 'test'}]},
        ]
        
        # Process with cost enrichment and sorting
        enriched = self.result_processor.enrich_items_with_cost_estimates(metrics, 'metrics')
        sorted_metrics = self.result_processor.sort_by_cost_descending(enriched)
        
        # Verify that enrichment adds cost estimates
        assert all('estimated_monthly_cost' in metric for metric in sorted_metrics)
        
        # Verify sorting works (items are in descending cost order)
        costs = [metric['estimated_monthly_cost'] for metric in sorted_metrics]
        assert costs == sorted(costs, reverse=True)  # Should be in descending order
        
        # Verify custom namespace gets higher cost estimate than AWS namespaces
        custom_metrics = [m for m in sorted_metrics if not m['Namespace'].startswith('AWS/')]
        aws_metrics = [m for m in sorted_metrics if m['Namespace'].startswith('AWS/')]
        
        if custom_metrics and aws_metrics:
            # Custom metrics should generally have higher costs than AWS metrics
            max_custom_cost = max(m['estimated_monthly_cost'] for m in custom_metrics)
            max_aws_cost = max(m['estimated_monthly_cost'] for m in aws_metrics)
            # Note: This might be 0.0 for both in test environment, which is fine
    
    @pytest.mark.skip(reason="Test needs refactoring - mock setup is incorrect")
    @pytest.mark.asyncio
    async def test_aws_pagination_architecture(self):
        """Test that AWS API pagination works correctly (NextToken only)."""
        
        # Mock CloudWatch service
        mock_cloudwatch_service = Mock(spec=CloudWatchService)
        
        # Mock paginated response from AWS
        mock_response_page1 = CloudWatchOperationResult(
            success=True,
            data={
                'metrics': [{'MetricName': f'Metric_{i}', 'Namespace': 'AWS/EC2'} for i in range(500)],
                'total_count': 500,
                'filtered': False
            },
            operation_name='list_metrics'
        )
        
        mock_cloudwatch_service.list_metrics.return_value = mock_response_page1
        
        # Test that service is called correctly (no MaxRecords parameter)
        result = await mock_cloudwatch_service.list_metrics(namespace='AWS/EC2')
        
        # Verify the call was made without MaxRecords
        mock_cloudwatch_service.list_metrics.assert_called_once_with(namespace='AWS/EC2')
        
        # Verify we got the expected data structure
        assert result.success == True
        assert len(result.data['metrics']) == 500
        assert result.data['total_count'] == 500
    
    def test_pagination_architecture_documentation(self):
        """Document the pagination architecture for future reference."""
        
        architecture_doc = {
            "cloudwatch_pagination_architecture": {
                "step_1_aws_fetch": {
                    "description": "Fetch ALL data from AWS using proper NextToken pagination",
                    "method": "AWS paginator with NextToken (no MaxRecords)",
                    "apis_used": ["list_metrics", "describe_alarms", "describe_log_groups"],
                    "result": "Complete dataset in arbitrary AWS order"
                },
                "step_2_client_sort": {
                    "description": "Sort client-side by estimated cost (descending)",
                    "method": "Cost estimation using free metadata + sorting",
                    "cost": "Zero additional API calls",
                    "result": "Dataset ordered by cost (highest first)"
                },
                "step_3_client_paginate": {
                    "description": "Apply client-side pagination for MCP response",
                    "method": "Array slicing with 10 items per page",
                    "page_size": 10,
                    "result": "Paginated response with metadata"
                }
            },
            "why_this_architecture": {
                "aws_limitation": "AWS APIs return data in arbitrary order, not by cost",
                "sorting_requirement": "Users want to see highest-cost items first",
                "solution": "Fetch all, sort by cost, then paginate for display"
            },
            "performance_considerations": {
                "large_datasets": "4000+ metrics may cause timeouts",
                "memory_usage": "All data loaded into memory for sorting",
                "optimization": "Caching and progressive loading implemented"
            }
        }
        
        # This test passes if the architecture is documented
        assert architecture_doc["cloudwatch_pagination_architecture"]["step_1_aws_fetch"]["method"] == "AWS paginator with NextToken (no MaxRecords)"
        assert architecture_doc["cloudwatch_pagination_architecture"]["step_3_client_paginate"]["page_size"] == 10
    
    def test_edge_cases_pagination(self):
        """Test edge cases in pagination."""
        
        # Empty dataset
        empty_result = self.result_processor.paginate_results([], 1)
        assert len(empty_result['items']) == 0
        assert empty_result['pagination']['total_pages'] == 0
        assert empty_result['pagination']['has_next_page'] == False
        
        # Single item
        single_item = [{'id': 1}]
        single_result = self.result_processor.paginate_results(single_item, 1)
        assert len(single_result['items']) == 1
        assert single_result['pagination']['total_pages'] == 1
        
        # Exactly page size (10 items)
        exact_page = [{'id': i} for i in range(10)]
        exact_result = self.result_processor.paginate_results(exact_page, 1)
        assert len(exact_result['items']) == 10
        assert exact_result['pagination']['total_pages'] == 1
        assert exact_result['pagination']['has_next_page'] == False
        
        # Invalid page numbers
        items = [{'id': i} for i in range(5)]
        
        # Page 0 should default to page 1
        page_0_result = self.result_processor.paginate_results(items, 0)
        assert page_0_result['pagination']['current_page'] == 1
        
        # Negative page should default to page 1
        negative_result = self.result_processor.paginate_results(items, -1)
        assert negative_result['pagination']['current_page'] == 1


if __name__ == "__main__":
    pytest.main([__file__, "-v"])
```

--------------------------------------------------------------------------------
/tests/unit/cloudwatch/test_mcp_pagination_bug.py:
--------------------------------------------------------------------------------

```python
"""
Unit tests to identify MCP pagination bug in CloudWatch metrics optimization.

Tests the complete flow from MCP tool call through orchestrator to result processor
to identify where pagination is being bypassed.
"""

import pytest
import asyncio
import json
from unittest.mock import patch, AsyncMock, MagicMock
from mcp.types import TextContent

# Import the functions under test
from runbook_functions import run_cloudwatch_metrics_optimization


@pytest.mark.skip(reason="Tests need refactoring to match actual API structure")
class TestMCPPaginationBug:
    """Test suite to identify where pagination is failing in the MCP flow."""
    
    @pytest.fixture
    def mock_large_metrics_dataset(self):
        """Create a large dataset of 25 metrics for pagination testing."""
        return [
            {
                'MetricName': f'CustomMetric{i:02d}',
                'Namespace': 'Custom/Application' if i % 2 == 0 else 'AWS/EC2',
                'Dimensions': [{'Name': 'InstanceId', 'Value': f'i-{i:08x}'}],
                'estimated_monthly_cost': 10.0 - (i * 0.2)  # Decreasing cost
            }
            for i in range(25)
        ]
    
    @pytest.fixture
    def mock_orchestrator_response(self, mock_large_metrics_dataset):
        """Mock orchestrator response with proper structure."""
        def create_response(page=1):
            # Simulate orchestrator pagination - should return only 10 items per page
            start_idx = (page - 1) * 10
            end_idx = start_idx + 10
            page_metrics = mock_large_metrics_dataset[start_idx:end_idx]
            
            return {
                'status': 'success',
                'data': {
                    'metrics_configuration_analysis': {
                        'metrics': {
                            'metrics': page_metrics,
                            'pagination': {
                                'current_page': page,
                                'page_size': 10,
                                'total_items': 25,
                                'total_pages': 3,
                                'has_next_page': page < 3,
                                'has_previous_page': page > 1
                            },
                            'total_count': 25,
                            'namespace': 'all',
                            'filtered': False
                        }
                    }
                },
                'orchestrator_metadata': {
                    'session_id': 'test-session',
                    'region': 'us-east-1'
                }
            }
        return create_response
    
    @pytest.mark.asyncio
    async def test_orchestrator_pagination_works(self, mock_orchestrator_response):
        """Test that orchestrator correctly applies pagination."""
        
        with patch('runbook_functions.CloudWatchOptimizationOrchestrator') as mock_orchestrator_class:
            # Setup mock orchestrator instance
            mock_orchestrator = AsyncMock()
            mock_orchestrator_class.return_value = mock_orchestrator
            
            # Mock execute_analysis to return paginated responses
            def mock_execute_analysis(analysis_type, **kwargs):
                page = kwargs.get('page', 1)
                return mock_orchestrator_response(page)
            
            mock_orchestrator.execute_analysis = AsyncMock(side_effect=mock_execute_analysis)
            
            # Test page 1
            result_p1 = await run_cloudwatch_metrics_optimization({
                'region': 'us-east-1',
                'page': 1,
                'lookback_days': 30
            })
            
            # Verify result structure
            assert len(result_p1) == 1
            assert isinstance(result_p1[0], TextContent)
            
            # Parse JSON response
            data_p1 = json.loads(result_p1[0].text)
            assert data_p1['status'] == 'success'
            
            # Check metrics data
            metrics_data_p1 = data_p1['data']['metrics_configuration_analysis']['metrics']
            assert len(metrics_data_p1['metrics']) == 10, "Page 1 should have 10 metrics"
            assert metrics_data_p1['pagination']['current_page'] == 1
            assert metrics_data_p1['pagination']['total_items'] == 25
            
            # Test page 2
            result_p2 = await run_cloudwatch_metrics_optimization({
                'region': 'us-east-1',
                'page': 2,
                'lookback_days': 30
            })
            
            data_p2 = json.loads(result_p2[0].text)
            metrics_data_p2 = data_p2['data']['metrics_configuration_analysis']['metrics']
            assert len(metrics_data_p2['metrics']) == 10, "Page 2 should have 10 metrics"
            assert metrics_data_p2['pagination']['current_page'] == 2
            
            # Verify different metrics on different pages
            p1_names = [m['MetricName'] for m in metrics_data_p1['metrics']]
            p2_names = [m['MetricName'] for m in metrics_data_p2['metrics']]
            assert p1_names != p2_names, "Page 1 and Page 2 should have different metrics"
            
            # Verify orchestrator was called with correct parameters
            assert mock_orchestrator.execute_analysis.call_count == 2
            
            # Check first call (page 1)
            first_call_args = mock_orchestrator.execute_analysis.call_args_list[0]
            assert first_call_args[0][0] == 'metrics_optimization'
            assert first_call_args[1]['page'] == 1
            
            # Check second call (page 2)  
            second_call_args = mock_orchestrator.execute_analysis.call_args_list[1]
            assert second_call_args[1]['page'] == 2
    
    @pytest.mark.asyncio
    async def test_mcp_tool_bypasses_pagination(self):
        """Test to identify if MCP tool is bypassing orchestrator pagination."""
        
        # This test will help identify if there's a direct MCP call bypassing the orchestrator
        with patch('runbook_functions.CloudWatchOptimizationOrchestrator') as mock_orchestrator_class:
            mock_orchestrator = AsyncMock()
            mock_orchestrator_class.return_value = mock_orchestrator
            
            # Mock orchestrator to return a response indicating it was called
            mock_orchestrator.execute_analysis = AsyncMock(return_value={
                'status': 'success',
                'data': {
                    'metrics_configuration_analysis': {
                        'metrics': {
                            'metrics': [],
                            'pagination': {'current_page': 1, 'total_items': 0},
                            'orchestrator_called': True  # Flag to verify orchestrator was used
                        }
                    }
                }
            })
            
            # Also patch any potential direct MCP calls
            with patch('runbook_functions.mcp_cfm_tips_cloudwatch_metrics_optimization') as mock_mcp:
                mock_mcp.return_value = {
                    'status': 'success', 
                    'data': {'direct_mcp_call': True}  # Flag to identify direct MCP call
                }
                
                result = await run_cloudwatch_metrics_optimization({
                    'region': 'us-east-1',
                    'page': 1
                })
                
                # Parse result
                data = json.loads(result[0].text)
                
                # Check if orchestrator was used (expected behavior)
                if 'orchestrator_called' in str(data):
                    print("✅ Orchestrator was called - pagination should work")
                    assert mock_orchestrator.execute_analysis.called
                    assert not mock_mcp.called, "Direct MCP call should not be made"
                
                # Check if direct MCP call was made (bug scenario)
                elif 'direct_mcp_call' in str(data):
                    pytest.fail("❌ BUG IDENTIFIED: Direct MCP call bypassing orchestrator pagination")
                
                else:
                    pytest.fail("❌ Unable to determine call path - check test setup")
    
    @pytest.mark.asyncio 
    async def test_result_processor_pagination(self):
        """Test that result processor correctly paginates metrics."""
        
        from playbooks.cloudwatch.result_processor import CloudWatchResultProcessor
        
        # Create test metrics
        test_metrics = [
            {'MetricName': f'Metric{i}', 'estimated_monthly_cost': 10 - i}
            for i in range(25)
        ]
        
        processor = CloudWatchResultProcessor()
        
        # Test page 1
        result_p1 = processor.process_metrics_results(test_metrics, page=1)
        assert len(result_p1['items']) == 10
        assert result_p1['pagination']['current_page'] == 1
        assert result_p1['pagination']['total_items'] == 25
        
        # Test page 2
        result_p2 = processor.process_metrics_results(test_metrics, page=2)
        assert len(result_p2['items']) == 10
        assert result_p2['pagination']['current_page'] == 2
        
        # Verify different items
        p1_names = [item['MetricName'] for item in result_p1['items']]
        p2_names = [item['MetricName'] for item in result_p2['items']]
        assert p1_names != p2_names
    
    @pytest.mark.asyncio
    async def test_orchestrator_apply_result_processing(self):
        """Test that orchestrator's _apply_result_processing works correctly."""
        
        from playbooks.cloudwatch.optimization_orchestrator import CloudWatchOptimizationOrchestrator
        
        # Create mock result with metrics
        mock_result = {
            'status': 'success',
            'data': {
                'metrics_configuration_analysis': {
                    'metrics': {
                        'metrics': [
                            {'MetricName': f'Metric{i}', 'estimated_monthly_cost': 10 - i}
                            for i in range(25)
                        ],
                        'total_count': 25
                    }
                }
            }
        }
        
        orchestrator = CloudWatchOptimizationOrchestrator(region='us-east-1')
        
        # Test pagination application
        processed_result = orchestrator._apply_result_processing(mock_result, page=1)
        
        metrics_data = processed_result['data']['metrics_configuration_analysis']['metrics']
        assert len(metrics_data['metrics']) == 10, "Should be paginated to 10 items"
        assert 'pagination' in metrics_data, "Should have pagination metadata"
        assert metrics_data['pagination']['current_page'] == 1
        assert metrics_data['pagination']['total_items'] == 25
```

--------------------------------------------------------------------------------
/diagnose_cost_optimization_hub_v2.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Diagnostic script for Cost Optimization Hub issues - Updated with correct permissions
"""

import boto3
import json
from botocore.exceptions import ClientError

def check_cost_optimization_hub():
    """Check Cost Optimization Hub status and common issues."""
    print("🔍 Diagnosing Cost Optimization Hub Issues")
    print("=" * 50)
    
    # Test different regions where Cost Optimization Hub is available
    regions_to_test = ['us-east-1', 'us-west-2', 'eu-west-1', 'ap-southeast-1']
    
    for region in regions_to_test:
        print(f"\n📍 Testing region: {region}")
        try:
            client = boto3.client('cost-optimization-hub', region_name=region)
            
            # Test 1: Check enrollment statuses (correct API call)
            print("   ✅ Testing enrollment statuses...")
            try:
                enrollment_response = client.list_enrollment_statuses()
                print(f"   📊 Enrollment Response: {json.dumps(enrollment_response, indent=2, default=str)}")
                
                # Check if any accounts are enrolled
                items = enrollment_response.get('items', [])
                if items:
                    active_enrollments = [item for item in items if item.get('status') == 'Active']
                    if active_enrollments:
                        print(f"   ✅ Found {len(active_enrollments)} active enrollments")
                        
                        # Test 2: Try to list recommendations
                        print("   ✅ Testing list recommendations...")
                        try:
                            recommendations = client.list_recommendations(maxResults=5)
                            print(f"   📊 Found {len(recommendations.get('items', []))} recommendations")
                            print("   ✅ Cost Optimization Hub is working correctly!")
                            return True
                            
                        except ClientError as rec_error:
                            print(f"   ❌ Error listing recommendations: {rec_error.response['Error']['Code']} - {rec_error.response['Error']['Message']}")
                    else:
                        print("   ⚠️  No active enrollments found")
                        print("   💡 You need to enable Cost Optimization Hub in the AWS Console")
                else:
                    print("   ⚠️  No enrollment information found")
                    print("   💡 Cost Optimization Hub may not be set up for this account")
                    
            except ClientError as enrollment_error:
                error_code = enrollment_error.response['Error']['Code']
                error_message = enrollment_error.response['Error']['Message']
                
                if error_code == 'AccessDeniedException':
                    print("   ❌ Access denied - check IAM permissions")
                    print("   💡 Required permissions: cost-optimization-hub:ListEnrollmentStatuses")
                elif error_code == 'ValidationException':
                    print(f"   ❌ Validation error: {error_message}")
                else:
                    print(f"   ❌ Error: {error_code} - {error_message}")
                    
        except Exception as e:
            print(f"   ❌ Failed to create client for region {region}: {str(e)}")
    
    return False

def check_iam_permissions():
    """Check IAM permissions for Cost Optimization Hub."""
    print("\n🔐 Checking IAM Permissions")
    print("=" * 30)
    
    try:
        # Get current user/role
        sts_client = boto3.client('sts')
        identity = sts_client.get_caller_identity()
        print(f"Current identity: {identity.get('Arn', 'Unknown')}")
        
        # Correct required actions for Cost Optimization Hub
        required_actions = [
            'cost-optimization-hub:ListEnrollmentStatuses',
            'cost-optimization-hub:UpdateEnrollmentStatus',
            'cost-optimization-hub:GetPreferences',
            'cost-optimization-hub:UpdatePreferences',
            'cost-optimization-hub:GetRecommendation',
            'cost-optimization-hub:ListRecommendations',
            'cost-optimization-hub:ListRecommendationSummaries'
        ]
        
        print("\nRequired permissions for Cost Optimization Hub:")
        for action in required_actions:
            print(f"  - {action}")
            
        print("\nMinimal permissions for read-only access:")
        minimal_actions = [
            'cost-optimization-hub:ListEnrollmentStatuses',
            'cost-optimization-hub:ListRecommendations',
            'cost-optimization-hub:GetRecommendation',
            'cost-optimization-hub:ListRecommendationSummaries'
        ]
        for action in minimal_actions:
            print(f"  - {action}")
            
    except Exception as e:
        print(f"Error checking IAM: {str(e)}")

def test_individual_apis():
    """Test individual Cost Optimization Hub APIs."""
    print("\n🧪 Testing Individual APIs")
    print("=" * 30)
    
    try:
        client = boto3.client('cost-optimization-hub', region_name='us-east-1')
        
        # Test 1: List Enrollment Statuses
        print("\n1. Testing list_enrollment_statuses...")
        try:
            response = client.list_enrollment_statuses()
            print(f"   ✅ Success: Found {len(response.get('items', []))} enrollment records")
        except ClientError as e:
            print(f"   ❌ Failed: {e.response['Error']['Code']} - {e.response['Error']['Message']}")
        
        # Test 2: List Recommendations
        print("\n2. Testing list_recommendations...")
        try:
            response = client.list_recommendations(maxResults=5)
            print(f"   ✅ Success: Found {len(response.get('items', []))} recommendations")
        except ClientError as e:
            print(f"   ❌ Failed: {e.response['Error']['Code']} - {e.response['Error']['Message']}")
        
        # Test 3: List Recommendation Summaries
        print("\n3. Testing list_recommendation_summaries...")
        try:
            response = client.list_recommendation_summaries(maxResults=5)
            print(f"   ✅ Success: Found {len(response.get('items', []))} summaries")
        except ClientError as e:
            print(f"   ❌ Failed: {e.response['Error']['Code']} - {e.response['Error']['Message']}")
            
    except Exception as e:
        print(f"Error testing APIs: {str(e)}")

def provide_correct_iam_policy():
    """Provide the correct IAM policy for Cost Optimization Hub."""
    print("\n📋 Correct IAM Policy")
    print("=" * 25)
    
    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "cost-optimization-hub:ListEnrollmentStatuses",
                    "cost-optimization-hub:UpdateEnrollmentStatus",
                    "cost-optimization-hub:GetPreferences",
                    "cost-optimization-hub:UpdatePreferences",
                    "cost-optimization-hub:GetRecommendation",
                    "cost-optimization-hub:ListRecommendations",
                    "cost-optimization-hub:ListRecommendationSummaries"
                ],
                "Resource": "*"
            }
        ]
    }
    
    print("Full IAM Policy:")
    print(json.dumps(policy, indent=2))
    
    minimal_policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "cost-optimization-hub:ListEnrollmentStatuses",
                    "cost-optimization-hub:ListRecommendations",
                    "cost-optimization-hub:GetRecommendation",
                    "cost-optimization-hub:ListRecommendationSummaries"
                ],
                "Resource": "*"
            }
        ]
    }
    
    print("\nMinimal Read-Only Policy:")
    print(json.dumps(minimal_policy, indent=2))

def provide_solutions():
    """Provide solutions for common Cost Optimization Hub issues."""
    print("\n🛠️  Updated Solutions")
    print("=" * 20)
    
    solutions = [
        {
            "issue": "AccessDeniedException",
            "solution": [
                "1. Add the correct IAM permissions (see policy above)",
                "2. The service uses different permission names than other AWS services",
                "3. Use 'cost-optimization-hub:ListEnrollmentStatuses' not 'GetEnrollmentStatus'",
                "4. Attach the policy to your IAM user/role"
            ]
        },
        {
            "issue": "No enrollment found",
            "solution": [
                "1. Go to AWS Console → Cost Optimization Hub",
                "2. Enable the service for your account",
                "3. Wait for enrollment to complete",
                "4. URL: https://console.aws.amazon.com/cost-optimization-hub/"
            ]
        },
        {
            "issue": "Service not available",
            "solution": [
                "1. Cost Optimization Hub is only available in specific regions",
                "2. Use us-east-1, us-west-2, eu-west-1, or ap-southeast-1",
                "3. The service may not be available in your region yet"
            ]
        },
        {
            "issue": "No recommendations found",
            "solution": [
                "1. Cost Optimization Hub needs time to analyze your resources",
                "2. Ensure you have resources running for at least 14 days",
                "3. The service needs sufficient usage data to generate recommendations",
                "4. Check if you have any EC2, RDS, or other supported resources"
            ]
        }
    ]
    
    for solution in solutions:
        print(f"\n🔧 {solution['issue']}:")
        for step in solution['solution']:
            print(f"   {step}")

def main():
    """Main diagnostic function."""
    print("AWS Cost Optimization Hub Diagnostic Tool v2")
    print("=" * 50)
    
    try:
        # Run diagnostics
        hub_working = check_cost_optimization_hub()
        check_iam_permissions()
        test_individual_apis()
        provide_correct_iam_policy()
        provide_solutions()
        
        print("\n" + "=" * 60)
        if hub_working:
            print("✅ DIAGNOSIS: Cost Optimization Hub appears to be working!")
        else:
            print("❌ DIAGNOSIS: Cost Optimization Hub needs to be set up.")
            
        print("\n📝 Next Steps:")
        print("1. Apply the correct IAM policy shown above")
        print("2. Enable Cost Optimization Hub in the AWS Console if needed")
        print("3. Use the updated MCP server (mcp_server_fixed_v3.py)")
        print("4. Test with the enrollment status tool first")
        
    except Exception as e:
        print(f"\n❌ Diagnostic failed: {str(e)}")
        print("Please check your AWS credentials and try again.")

if __name__ == "__main__":
    main()

```

--------------------------------------------------------------------------------
/utils/parallel_executor.py:
--------------------------------------------------------------------------------

```python
"""
Parallel Execution Engine for CFM Tips MCP Server

Provides optimized parallel execution of AWS service calls with session integration.
"""

import logging
import time
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed, Future
from typing import Dict, List, Any, Optional, Callable, Union
from dataclasses import dataclass
from datetime import datetime

logger = logging.getLogger(__name__)

@dataclass
class TaskResult:
    """Result of a parallel task execution."""
    task_id: str
    service: str
    operation: str
    status: str  # 'success', 'error', 'timeout'
    data: Any = None
    error: Optional[str] = None
    execution_time: float = 0.0
    timestamp: datetime = None
    
    def __post_init__(self):
        if self.timestamp is None:
            self.timestamp = datetime.now()

@dataclass
class ParallelTask:
    """Definition of a task to be executed in parallel."""
    task_id: str
    service: str
    operation: str
    function: Callable
    args: tuple = ()
    kwargs: Dict[str, Any] = None
    timeout: float = 30.0
    priority: int = 1  # Higher number = higher priority
    
    def __post_init__(self):
        if self.kwargs is None:
            self.kwargs = {}

class ParallelExecutor:
    """Executes AWS service calls in parallel with optimized resource management."""
    
    def __init__(self, max_workers: int = 10, default_timeout: float = 30.0):
        self.max_workers = max_workers
        self.default_timeout = default_timeout
        self.executor = ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="CFM-Worker")
        self._active_tasks: Dict[str, Future] = {}
        self._results: Dict[str, TaskResult] = {}
        self._lock = threading.RLock()
        
        logger.info(f"Initialized ParallelExecutor with {max_workers} workers")
    
    def submit_task(self, task: ParallelTask) -> str:
        """Submit a task for parallel execution."""
        with self._lock:
            if task.task_id in self._active_tasks:
                raise ValueError(f"Task {task.task_id} already exists")
            
            future = self.executor.submit(self._execute_task, task)
            self._active_tasks[task.task_id] = future
            
            logger.debug(f"Submitted task {task.task_id} ({task.service}.{task.operation})")
            return task.task_id
    
    def submit_batch(self, tasks: List[ParallelTask]) -> List[str]:
        """Submit multiple tasks for parallel execution."""
        # Sort by priority (higher first)
        sorted_tasks = sorted(tasks, key=lambda t: t.priority, reverse=True)
        
        task_ids = []
        for task in sorted_tasks:
            try:
                task_id = self.submit_task(task)
                task_ids.append(task_id)
            except Exception as e:
                logger.error(f"Error submitting task {task.task_id}: {e}")
                # Create error result
                error_result = TaskResult(
                    task_id=task.task_id,
                    service=task.service,
                    operation=task.operation,
                    status='error',
                    error=str(e)
                )
                with self._lock:
                    self._results[task.task_id] = error_result
        
        logger.info(f"Submitted batch of {len(task_ids)} tasks")
        return task_ids
    
    def _execute_task(self, task: ParallelTask) -> TaskResult:
        """Execute a single task with timeout and error handling."""
        start_time = time.time()
        
        try:
            logger.debug(f"Executing task {task.task_id}")
            
            # Execute the function with timeout
            result_data = task.function(*task.args, **task.kwargs)
            
            execution_time = time.time() - start_time
            
            result = TaskResult(
                task_id=task.task_id,
                service=task.service,
                operation=task.operation,
                status='success',
                data=result_data,
                execution_time=execution_time
            )
            
            logger.debug(f"Task {task.task_id} completed in {execution_time:.2f}s")
            
        except Exception as e:
            execution_time = time.time() - start_time
            error_msg = str(e)
            
            result = TaskResult(
                task_id=task.task_id,
                service=task.service,
                operation=task.operation,
                status='error',
                error=error_msg,
                execution_time=execution_time
            )
            
            logger.error(f"Task {task.task_id} failed after {execution_time:.2f}s: {error_msg}")
        
        # Store result
        with self._lock:
            self._results[task.task_id] = result
            if task.task_id in self._active_tasks:
                del self._active_tasks[task.task_id]
        
        return result
    
    def wait_for_tasks(self, task_ids: List[str], timeout: Optional[float] = None) -> Dict[str, TaskResult]:
        """Wait for specific tasks to complete."""
        if timeout is None:
            timeout = self.default_timeout
        
        results = {}
        remaining_tasks = set(task_ids)
        start_time = time.time()
        
        while remaining_tasks and (time.time() - start_time) < timeout:
            completed_tasks = set()
            
            with self._lock:
                for task_id in remaining_tasks:
                    if task_id in self._results:
                        results[task_id] = self._results[task_id]
                        completed_tasks.add(task_id)
                    elif task_id not in self._active_tasks:
                        # Task not found, create error result
                        error_result = TaskResult(
                            task_id=task_id,
                            service='unknown',
                            operation='unknown',
                            status='error',
                            error='Task not found'
                        )
                        results[task_id] = error_result
                        completed_tasks.add(task_id)
            
            remaining_tasks -= completed_tasks
            
            if remaining_tasks:
                time.sleep(0.1)  # Small delay to avoid busy waiting
        
        # Handle timeout for remaining tasks
        for task_id in remaining_tasks:
            timeout_result = TaskResult(
                task_id=task_id,
                service='unknown',
                operation='unknown',
                status='timeout',
                error=f'Task timed out after {timeout}s'
            )
            results[task_id] = timeout_result
        
        logger.info(f"Completed waiting for {len(task_ids)} tasks, {len(results)} results")
        return results
    
    def wait_for_all(self, timeout: Optional[float] = None) -> Dict[str, TaskResult]:
        """Wait for all active tasks to complete."""
        with self._lock:
            active_task_ids = list(self._active_tasks.keys())
        
        if not active_task_ids:
            return {}
        
        return self.wait_for_tasks(active_task_ids, timeout)
    
    def get_result(self, task_id: str) -> Optional[TaskResult]:
        """Get result for a specific task."""
        with self._lock:
            return self._results.get(task_id)
    
    def get_all_results(self) -> Dict[str, TaskResult]:
        """Get all available results."""
        with self._lock:
            return self._results.copy()
    
    def cancel_task(self, task_id: str) -> bool:
        """Cancel a running task."""
        with self._lock:
            if task_id in self._active_tasks:
                future = self._active_tasks[task_id]
                cancelled = future.cancel()
                if cancelled:
                    del self._active_tasks[task_id]
                    # Create cancelled result
                    cancel_result = TaskResult(
                        task_id=task_id,
                        service='unknown',
                        operation='unknown',
                        status='error',
                        error='Task cancelled'
                    )
                    self._results[task_id] = cancel_result
                return cancelled
        return False
    
    def get_status(self) -> Dict[str, Any]:
        """Get executor status information."""
        with self._lock:
            active_count = len(self._active_tasks)
            completed_count = len(self._results)
            
            # Count results by status
            status_counts = {}
            for result in self._results.values():
                status_counts[result.status] = status_counts.get(result.status, 0) + 1
        
        return {
            'max_workers': self.max_workers,
            'active_tasks': active_count,
            'completed_tasks': completed_count,
            'status_breakdown': status_counts,
            'executor_alive': not self.executor._shutdown
        }
    
    def clear_results(self, older_than_minutes: int = 60):
        """Clear old results to free memory."""
        cutoff_time = datetime.now().timestamp() - (older_than_minutes * 60)
        
        with self._lock:
            old_task_ids = []
            for task_id, result in self._results.items():
                if result.timestamp.timestamp() < cutoff_time:
                    old_task_ids.append(task_id)
            
            for task_id in old_task_ids:
                del self._results[task_id]
        
        logger.info(f"Cleared {len(old_task_ids)} old results")
    
    def shutdown(self, wait: bool = True):
        """Shutdown the executor and clean up resources."""
        logger.info("Shutting down ParallelExecutor")
        
        with self._lock:
            # Cancel all active tasks
            for task_id, future in self._active_tasks.items():
                future.cancel()
            self._active_tasks.clear()
        
        # Shutdown executor
        self.executor.shutdown(wait=wait)
        logger.info("ParallelExecutor shutdown complete")

# Global executor instance
_parallel_executor = None

def get_parallel_executor() -> ParallelExecutor:
    """Get the global parallel executor instance."""
    global _parallel_executor
    if _parallel_executor is None:
        _parallel_executor = ParallelExecutor()
    return _parallel_executor

def create_task(task_id: str, service: str, operation: str, function: Callable,
                args: tuple = (), kwargs: Dict[str, Any] = None, 
                timeout: float = 30.0, priority: int = 1) -> ParallelTask:
    """Helper function to create a ParallelTask."""
    return ParallelTask(
        task_id=task_id,
        service=service,
        operation=operation,
        function=function,
        args=args,
        kwargs=kwargs or {},
        timeout=timeout,
        priority=priority
    )
```

--------------------------------------------------------------------------------
/utils/logging_config.py:
--------------------------------------------------------------------------------

```python
"""
Centralized logging configuration for CFM Tips MCP Server

"""

import logging
import sys
import os
import json
import tempfile
from datetime import datetime
from typing import Dict, Any, Optional, List


class StructuredFormatter(logging.Formatter):
    """Custom formatter for structured logging with JSON output."""
    
    def format(self, record):
        """Format log record as structured JSON."""
        log_entry = {
            'timestamp': datetime.fromtimestamp(record.created).isoformat(),
            'level': record.levelname,
            'logger': record.name,
            'module': record.module,
            'function': record.funcName,
            'line': record.lineno,
            'message': record.getMessage(),
            'thread': record.thread,
            'thread_name': record.threadName
        }
        
        # Add exception information if present
        if record.exc_info:
            log_entry['exception'] = self.formatException(record.exc_info)
        
        # Add extra fields from record
        for key, value in record.__dict__.items():
            if key not in ['name', 'msg', 'args', 'levelname', 'levelno', 'pathname', 
                          'filename', 'module', 'lineno', 'funcName', 'created', 
                          'msecs', 'relativeCreated', 'thread', 'threadName', 
                          'processName', 'process', 'getMessage', 'exc_info', 
                          'exc_text', 'stack_info']:
                log_entry[key] = value
        
        return json.dumps(log_entry)


class StandardFormatter(logging.Formatter):
    """Enhanced standard formatter with more context."""
    
    def __init__(self):
        super().__init__(
            '%(asctime)s - %(name)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s'
        )


def setup_logging(structured: bool = False, log_level: str = "INFO"):
    """
    Configure comprehensive logging for the application.
    
    Args:
        structured: Whether to use structured JSON logging
        log_level: Logging level (DEBUG, INFO, WARNING, ERROR)
    """
    
    # Create appropriate formatter
    if structured:
        formatter = StructuredFormatter()
    else:
        formatter = StandardFormatter()
    
    # Configure root logger
    root_logger = logging.getLogger()
    root_logger.setLevel(getattr(logging, log_level.upper(), logging.INFO))
    
    # Remove existing handlers
    for handler in root_logger.handlers[:]:
        root_logger.removeHandler(handler)
    
    # Add file handlers
    try:
        # Try to create logs directory if it doesn't exist
        log_dir = 'logs'
        if not os.path.exists(log_dir):
            os.makedirs(log_dir, exist_ok=True)
        
        # Try main log file in logs directory first
        log_file = os.path.join(log_dir, 'cfm_tips_mcp.log')
        file_handler = logging.FileHandler(log_file)
        file_handler.setLevel(logging.INFO)
        file_handler.setFormatter(formatter)
        root_logger.addHandler(file_handler)
        
        # Try error log file
        error_file = os.path.join(log_dir, 'cfm_tips_mcp_errors.log')
        error_handler = logging.FileHandler(error_file)
        error_handler.setLevel(logging.ERROR)
        error_handler.setFormatter(formatter)
        root_logger.addHandler(error_handler)
        
    except (OSError, PermissionError) as e:
        # If we can't write to logs directory, try current directory
        try:
            file_handler = logging.FileHandler('cfm_tips_mcp.log')
            file_handler.setLevel(logging.INFO)
            file_handler.setFormatter(formatter)
            root_logger.addHandler(file_handler)
            
            error_handler = logging.FileHandler('cfm_tips_mcp_errors.log')
            error_handler.setLevel(logging.ERROR)
            error_handler.setFormatter(formatter)
            root_logger.addHandler(error_handler)
            
        except (OSError, PermissionError):
            # If we can't write anywhere, try temp directory
            try:
                temp_dir = tempfile.gettempdir()
                temp_log = os.path.join(temp_dir, 'cfm_tips_mcp.log')
                file_handler = logging.FileHandler(temp_log)
                file_handler.setLevel(logging.INFO)
                file_handler.setFormatter(formatter)
                root_logger.addHandler(file_handler)
                
                temp_error = os.path.join(temp_dir, 'cfm_tips_mcp_errors.log')
                error_handler = logging.FileHandler(temp_error)
                error_handler.setLevel(logging.ERROR)
                error_handler.setFormatter(formatter)
                root_logger.addHandler(error_handler)
                
                # Log where we're writing files
                print(f"Warning: Using temp directory for logs: {temp_dir}")
                
            except (OSError, PermissionError):
                # If all else fails, raise error since we need file logging
                raise RuntimeError("Could not create log files in any location")
    
    return logging.getLogger(__name__)

def log_function_entry(logger, func_name, **kwargs):
    """Log function entry with parameters."""
    logger.info(f"Entering {func_name} with params: {kwargs}")

def log_function_exit(logger, func_name, result_status=None, execution_time=None):
    """Log function exit with results."""
    msg = f"Exiting {func_name}"
    if result_status:
        msg += f" - Status: {result_status}"
    if execution_time:
        msg += f" - Time: {execution_time:.2f}s"
    logger.info(msg)

def log_aws_api_call(logger, service, operation, **params):
    """Log AWS API calls."""
    logger.info(f"AWS API Call: {service}.{operation} with params: {params}")

def log_aws_api_error(logger, service, operation, error):
    """Log AWS API errors."""
    logger.error(f"AWS API Error: {service}.{operation} - {str(error)}")


def create_structured_logger(name: str, extra_fields: Optional[Dict[str, Any]] = None) -> logging.Logger:
    """
    Create a logger with structured logging capabilities.
    
    Args:
        name: Logger name
        extra_fields: Additional fields to include in all log messages
        
    Returns:
        Configured logger instance
    """
    logger = logging.getLogger(name)
    
    if extra_fields:
        # Create adapter to add extra fields
        logger = logging.LoggerAdapter(logger, extra_fields)
    
    return logger


def log_s3_operation(logger, operation: str, bucket_name: Optional[str] = None, 
                    object_key: Optional[str] = None, **kwargs):
    """
    Log S3 operations with structured data.
    
    Args:
        logger: Logger instance
        operation: S3 operation name
        bucket_name: S3 bucket name
        object_key: S3 object key
        **kwargs: Additional operation parameters
    """
    log_data = {
        'operation_type': 's3_operation',
        'operation': operation,
        'bucket_name': bucket_name,
        'object_key': object_key
    }
    log_data.update(kwargs)
    
    logger.info(f"S3 Operation: {operation}", extra=log_data)


def log_analysis_start(logger, analysis_type: str, session_id: Optional[str] = None, **kwargs):
    """
    Log analysis start with structured data.
    
    Args:
        logger: Logger instance
        analysis_type: Type of analysis
        session_id: Session identifier
        **kwargs: Additional analysis parameters
    """
    log_data = {
        'event_type': 'analysis_start',
        'analysis_type': analysis_type,
        'session_id': session_id
    }
    log_data.update(kwargs)
    
    logger.info(f"Starting analysis: {analysis_type}", extra=log_data)


def log_analysis_complete(logger, analysis_type: str, status: str, execution_time: float,
                         session_id: Optional[str] = None, **kwargs):
    """
    Log analysis completion with structured data.
    
    Args:
        logger: Logger instance
        analysis_type: Type of analysis
        status: Analysis status
        execution_time: Execution time in seconds
        session_id: Session identifier
        **kwargs: Additional analysis results
    """
    log_data = {
        'event_type': 'analysis_complete',
        'analysis_type': analysis_type,
        'status': status,
        'execution_time': execution_time,
        'session_id': session_id
    }
    log_data.update(kwargs)
    
    logger.info(f"Completed analysis: {analysis_type} - Status: {status}", extra=log_data)


def log_cost_optimization_finding(logger, finding_type: str, resource_id: str, 
                                 potential_savings: Optional[float] = None, **kwargs):
    """
    Log cost optimization findings with structured data.
    
    Args:
        logger: Logger instance
        finding_type: Type of optimization finding
        resource_id: Resource identifier
        potential_savings: Estimated cost savings
        **kwargs: Additional finding details
    """
    log_data = {
        'event_type': 'cost_optimization_finding',
        'finding_type': finding_type,
        'resource_id': resource_id,
        'potential_savings': potential_savings
    }
    log_data.update(kwargs)
    
    logger.info(f"Cost optimization finding: {finding_type} for {resource_id}", extra=log_data)


def log_session_operation(logger, operation: str, session_id: str, **kwargs):
    """
    Log session operations with structured data.
    
    Args:
        logger: Logger instance
        operation: Session operation
        session_id: Session identifier
        **kwargs: Additional operation details
    """
    log_data = {
        'event_type': 'session_operation',
        'operation': operation,
        'session_id': session_id
    }
    log_data.update(kwargs)
    
    logger.info(f"Session operation: {operation} for session {session_id}", extra=log_data)


def log_cloudwatch_operation(logger, operation: str, component: Optional[str] = None, 
                            cost_incurred: bool = False, **kwargs):
    """
    Log CloudWatch operations with structured data and cost tracking.
    
    Args:
        logger: Logger instance
        operation: CloudWatch operation name
        component: CloudWatch component (logs, metrics, alarms, dashboards)
        cost_incurred: Whether the operation incurred costs
        **kwargs: Additional operation parameters
    """
    log_data = {
        'operation_type': 'cloudwatch_operation',
        'operation': operation,
        'component': component,
        'cost_incurred': cost_incurred
    }
    log_data.update(kwargs)
    
    if cost_incurred:
        logger.warning(f"CloudWatch Operation (COST INCURRED): {operation}", extra=log_data)
    else:
        logger.info(f"CloudWatch Operation: {operation}", extra=log_data)


# CloudWatch-specific logging methods consolidated into log_cloudwatch_operation
# These specialized methods have been removed in favor of the generic log_cloudwatch_operation method


# Removed setup_cloudwatch_logging - use setup_logging instead with log_cloudwatch_operation for CloudWatch-specific events
```

--------------------------------------------------------------------------------
/setup.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
CFM Tips - AWS Cost Optimization MCP Server Setup Script

This script helps set up the CFM Tips AWS Cost Optimization MCP Server
for use with Amazon Q CLI and other MCP-compatible clients.
"""

import os
import sys
import json
import subprocess
import shlex
from pathlib import Path

def check_python_version():
    """Check if Python version is compatible."""
    if sys.version_info < (3, 8):
        print("❌ Python 3.8 or higher is required")
        print(f"   Current version: {sys.version}")
        return False
    print(f"✅ Python version: {sys.version.split()[0]}")
    return True

def check_dependencies():
    """Check if required dependencies are installed."""
    print("\n📦 Checking dependencies...")
    
    required_packages = ['boto3', 'mcp']
    missing_packages = []
    
    for package in required_packages:
        try:
            __import__(package)
            print(f"✅ {package} is installed")
        except ImportError:
            print(f"❌ {package} is missing")
            missing_packages.append(package)
    
    if missing_packages:
        print(f"\n📥 Installing missing packages: {', '.join(missing_packages)}")
        try:
            # Alternative 1: Use pip's Python API directly (most secure)
            import pip
            
            # Install each package individually using pip's internal API
            for package in missing_packages:
                try:
                    pip.main(['install', package])
                    print(f"✅ {package} installed successfully")
                except Exception as e:
                    print(f"❌ Failed to install {package}: {str(e)}")
                    return False
            
            print("✅ All dependencies installed successfully")
            return True
            
        except ImportError:
            # Alternative 2: Use importlib and sys.path manipulation
            print("⚠️  pip module not available, trying alternative method...")
            try:
                import importlib.util
                import site
                
                # Try to install using importlib (this is a fallback)
                print("❌ Cannot install packages without pip")
                print("💡 Please install missing packages manually:")
                for package in missing_packages:
                    print(f"   pip install {package}")
                return False
                
            except Exception as e:
                print(f"❌ Alternative installation method failed: {str(e)}")
                return False
        
        except Exception as e:
            print(f"❌ Failed to install dependencies: {str(e)}")
            return False
    
    return True

def check_aws_credentials():
    """Check if AWS credentials are configured."""
    print("\n🔐 Checking AWS credentials...")
    
    try:
        import boto3
        sts_client = boto3.client('sts')
        identity = sts_client.get_caller_identity()
        
        print("✅ AWS credentials are configured")
        print(f"   Account: {identity.get('Account', 'Unknown')}")
        print(f"   User/Role: {identity.get('Arn', 'Unknown')}")
        return True
        
    except Exception as e:
        print("❌ AWS credentials not configured or invalid")
        print(f"   Error: {str(e)}")
        print("\n💡 To configure AWS credentials:")
        print("   aws configure")
        print("   or set environment variables:")
        print("   export AWS_ACCESS_KEY_ID=your_access_key")
        print("   export AWS_SECRET_ACCESS_KEY=your_secret_key")
        print("   export AWS_DEFAULT_REGION=us-east-1")
        return False

def create_mcp_config():
    """Create or update MCP configuration file."""
    print("\n⚙️  Creating MCP configuration...")
    
    current_dir = os.getcwd()
    amazonq_dir = Path.home() / ".aws" / "amazonq"
    config_file = amazonq_dir / "mcp.json"
    
    # Create amazonq directory if it doesn't exist
    amazonq_dir.mkdir(parents=True, exist_ok=True)
    
    # Load existing config or create new one
    existing_config = {}
    if config_file.exists():
        try:
            with open(config_file, 'r', encoding="utf-8") as f:
                existing_config = json.load(f)
        except (json.JSONDecodeError, FileNotFoundError):
            existing_config = {}
    
    # Ensure mcpServers key exists
    if "mcpServers" not in existing_config:
        existing_config["mcpServers"] = {}
    
    # Add or update cfm-tips server config
    existing_config["mcpServers"]["cfm-tips"] = {
        "command": "python3",
        "args": [str(Path(current_dir) / "mcp_server_with_runbooks.py")],
        "env": {
            "AWS_DEFAULT_REGION": "us-east-1",
            "AWS_PROFILE": "default",
            "PYTHONPATH": current_dir
        }
    }
    
    # Write updated config
    with open(config_file, 'w', encoding="utf-8") as f:
        json.dump(existing_config, f, indent=2)
    
    # Also create local template for reference
    template_file = "mcp_runbooks.json"
    with open(template_file, 'w', encoding="utf-8") as f:
        json.dump(existing_config, f, indent=2)
    
    print(f"✅ MCP configuration updated: {config_file}")
    print(f"✅ Template created: {template_file}")
    return str(config_file)

def test_server():
    """Test the MCP server."""
    print("\n🧪 Testing MCP server...")
    
    try:
        # Alternative 1: Direct module import and testing (most secure)
        # This avoids subprocess entirely by importing the test module directly
        
        # Save current working directory
        original_cwd = os.getcwd()
        
        try:
            # Import the test module directly
            import test_runbooks
            
            # Run the main test function directly
            test_result = test_runbooks.main()
            
            if test_result:
                print("✅ Server tests passed")
                return True
            else:
                print("❌ Server tests failed")
                return False
                
        except ImportError as e:
            print(f"❌ Could not import test module: {str(e)}")
            
            # Alternative 2: Basic server validation without subprocess
            try:
                # Test basic imports that the server needs
                from mcp.server import Server
                from mcp.server.stdio import stdio_server
                import boto3
                
                # Try to import the server module
                import mcp_server_with_runbooks
                
                # Check if server object exists
                if hasattr(mcp_server_with_runbooks, 'server'):
                    print("✅ Server module validation passed")
                    return True
                else:
                    print("❌ Server object not found in module")
                    return False
                    
            except ImportError as import_err:
                print(f"❌ Server validation failed: {str(import_err)}")
                return False
        
        except Exception as test_err:
            print(f"❌ Test execution failed: {str(test_err)}")
            
            # Alternative 3: Minimal validation
            try:
                # Just check if we can import the main components
                import mcp_server_with_runbooks
                # runbook_functions is deprecated - functions are now in playbooks
                from playbooks.ec2.ec2_optimization import run_ec2_right_sizing_analysis
                
                print("✅ Basic server validation passed")
                return True
                
            except ImportError:
                print("❌ Basic server validation failed")
                return False
        
        finally:
            # Restore original working directory
            os.chdir(original_cwd)
            
    except Exception as e:
        print(f"❌ Error testing server: {str(e)}")
        print("⚠️  Continuing with setup - you can test manually later")
        return True  # Return True to continue setup even if tests fail

def show_usage_instructions(config_file):
    """Show usage instructions."""
    print("\n" + "=" * 60)
    print("🎉 CFM Tips AWS Cost Optimization MCP Server Setup Complete!")
    print("=" * 60)
    
    print("\n🚀 Quick Start:")
    print(f"   q chat ")
    
    print("\n💬 Example commands in Amazon Q:")
    examples = [
        "Run comprehensive cost analysis for us-east-1",
        "Find unused EBS volumes costing money",
        "Generate EC2 right-sizing report in markdown",
        "Show me idle RDS instances",
        "Identify unused Lambda functions"
    ]
    
    for example in examples:
        print(f"   \"{example}\"")
    
    print("\n🔧 Available tools:")
    tools = [
        "ec2_rightsizing - Find underutilized EC2 instances",
        "ebs_unused - Identify unused EBS volumes",
        "rds_idle - Find idle RDS databases",
        "lambda_unused - Identify unused Lambda functions",
        "comprehensive_analysis - Multi-service analysis"
    ]
    
    for tool in tools:
        print(f"   • {tool}")
    
    print("\n📚 Documentation:")
    print("   • README.md - Main documentation")
    print("   • RUNBOOKS_GUIDE.md - Detailed usage guide")
    print("   • CORRECTED_PERMISSIONS.md - IAM permissions")
    
    print("\n🔍 Troubleshooting:")
    print("   • python3 diagnose_cost_optimization_hub_v2.py")
    print("   • python3 test_runbooks.py")
    
    print("\n💡 Tips:")
    print("   • Ensure your AWS resources have been running for 14+ days for metrics")
    print("   • Apply the IAM permissions from CORRECTED_PERMISSIONS.md")
    print("   • Enable Cost Optimization Hub in AWS Console if needed")

def main():
    """Main setup function."""
    print("CFM Tips - AWS Cost Optimization MCP Server Setup")
    print("=" * 55)
    
    # Check prerequisites
    if not check_python_version():
        sys.exit(1)
    
    if not check_dependencies():
        sys.exit(1)
    
    # AWS credentials check (warning only)
    aws_ok = check_aws_credentials()
    
    # Create configuration
    config_file = create_mcp_config()
    
    # Test server
    test_ok = test_server()
    
    # Show results
    print("\n" + "=" * 60)
    print("Setup Summary:")
    print(f"✅ Python version: OK")
    print(f"✅ Dependencies: OK")
    print(f"{'✅' if aws_ok else '⚠️ '} AWS credentials: {'OK' if aws_ok else 'Needs configuration'}")
    print(f"✅ MCP configuration: OK")
    print(f"{'✅' if test_ok else '⚠️ '} Server tests: {'OK' if test_ok else 'Check manually'}")
    
    if aws_ok and test_ok:
        show_usage_instructions(config_file)
        print("\n🎯 Ready to use! Start with:")
        print(f"   q chat ")
    else:
        print("\n⚠️  Setup completed with warnings. Please address the issues above.")
        if not aws_ok:
            print("   Configure AWS credentials: aws configure")
        if not test_ok:
            print("   Test manually: python3 test_runbooks.py")

if __name__ == "__main__":
    main()

```

--------------------------------------------------------------------------------
/playbooks/rds_optimization.py:
--------------------------------------------------------------------------------

```python
"""
RDS Optimization Playbook

This module implements the RDS Optimization playbook from AWS Cost Optimization Playbooks.
"""

import logging
import boto3
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
from botocore.exceptions import ClientError
from services.trusted_advisor import get_trusted_advisor_checks
from services.performance_insights import get_performance_insights_metrics

logger = logging.getLogger(__name__)

def get_underutilized_rds_instances(
    region: Optional[str] = None,
    lookback_period_days: int = 14,
    cpu_threshold: float = 40.0,
    connection_threshold: float = 20.0
) -> Dict[str, Any]:
    """
    Identify underutilized RDS instances using multiple data sources with fallback logic.
    Priority: 1) Performance Insights 2) Trusted Advisor 3) CloudWatch direct
    """
    
    # Try Performance Insights first (primary)
    try:
        logger.info("Attempting RDS analysis with Performance Insights")
        result = _get_rds_from_performance_insights(region, lookback_period_days, cpu_threshold)
        if result["status"] == "success" and result["data"]["count"] > 0:
            result["data_source"] = "Performance Insights"
            return result
    except Exception as e:
        logger.warning(f"Performance Insights failed: {str(e)}")
    
    # Try Trusted Advisor (secondary)
    try:
        logger.info("Attempting RDS analysis with Trusted Advisor")
        result = _get_rds_from_trusted_advisor(region)
        if result["status"] == "success" and result["data"]["count"] > 0:
            result["data_source"] = "Trusted Advisor"
            return result
    except Exception as e:
        logger.warning(f"Trusted Advisor failed: {str(e)}")
    
    # Try CloudWatch direct (tertiary)
    try:
        logger.info("Attempting RDS analysis with CloudWatch")
        result = _get_rds_from_cloudwatch(region, lookback_period_days, cpu_threshold)
        result["data_source"] = "CloudWatch"
        return result
    except Exception as e:
        logger.error(f"All data sources failed. CloudWatch error: {str(e)}")
        return {
            "status": "error",
            "message": f"All data sources unavailable. Last error: {str(e)}",
            "attempted_sources": ["Performance Insights", "Trusted Advisor", "CloudWatch"]
        }

def _get_rds_from_performance_insights(region: Optional[str], lookback_period_days: int, cpu_threshold: float) -> Dict[str, Any]:
    """Get underutilized RDS instances from Performance Insights"""
    if region:
        rds_client = boto3.client('rds', region_name=region)
    else:
        rds_client = boto3.client('rds')
        
    response = rds_client.describe_db_instances()
    underutilized_instances = []
    
    for db_instance in response['DBInstances']:
        db_instance_identifier = db_instance['DBInstanceIdentifier']
        
        try:
            # Try to get Performance Insights metrics
            pi_result = get_performance_insights_metrics(db_instance_identifier)
            
            if pi_result["status"] == "success":
                # Analyze PI data for utilization patterns
                metrics = pi_result["data"].get("MetricList", [])
                
                # Simple analysis - in production would be more sophisticated
                low_utilization = True  # Placeholder logic
                
                if low_utilization:
                    underutilized_instances.append({
                        'db_instance_identifier': db_instance_identifier,
                        'db_instance_class': db_instance['DBInstanceClass'],
                        'engine': db_instance['Engine'],
                        'finding': 'Low Performance Insights metrics',
                        'recommendation': {
                            'action': 'Consider downsizing',
                            'estimated_monthly_savings': _calculate_rds_savings(db_instance['DBInstanceClass'])
                        }
                    })
        except Exception:
            continue
    
    return {
        "status": "success",
        "data": {
            "underutilized_instances": underutilized_instances,
            "count": len(underutilized_instances)
        },
        "message": f"Found {len(underutilized_instances)} underutilized RDS instances via Performance Insights"
    }

def _get_rds_from_trusted_advisor(region: Optional[str]) -> Dict[str, Any]:
    """Get underutilized RDS instances from Trusted Advisor"""
    ta_result = get_trusted_advisor_checks(["cost_optimizing"])
    
    if ta_result["status"] != "success":
        raise Exception("Trusted Advisor not available")
    
    underutilized_instances = []
    checks = ta_result["data"].get("checks", [])
    
    for check in checks:
        if "Idle DB Instances" in check.get('name', '') or "Low Utilization Amazon RDS" in check.get('name', ''):
            resources = check.get('result', {}).get('flaggedResources', [])
            for resource in resources:
                underutilized_instances.append({
                    'db_instance_identifier': resource.get('resourceId', 'unknown'),
                    'db_instance_class': resource.get('metadata', {}).get('Instance Class', 'unknown'),
                    'engine': resource.get('metadata', {}).get('Engine', 'unknown'),
                    'finding': 'Trusted Advisor flagged',
                    'recommendation': {
                        'action': 'Review and consider downsizing',
                        'estimated_monthly_savings': _calculate_rds_savings(resource.get('metadata', {}).get('Instance Class', 'db.t3.micro'))
                    }
                })
    
    return {
        "status": "success",
        "data": {
            "underutilized_instances": underutilized_instances,
            "count": len(underutilized_instances)
        },
        "message": f"Found {len(underutilized_instances)} underutilized RDS instances via Trusted Advisor"
    }

def _get_rds_from_cloudwatch(region: Optional[str], lookback_period_days: int, cpu_threshold: float) -> Dict[str, Any]:
    """Get underutilized RDS instances from CloudWatch metrics directly"""
    if region:
        rds_client = boto3.client('rds', region_name=region)
        cloudwatch_client = boto3.client('cloudwatch', region_name=region)
    else:
        rds_client = boto3.client('rds')
        cloudwatch_client = boto3.client('cloudwatch')
    
    # Use pagination for RDS instances
    paginator = rds_client.get_paginator('describe_db_instances')
    page_iterator = paginator.paginate()
    
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(days=lookback_period_days)
    underutilized_instances = []
    
    # Process each page of DB instances
    for page in page_iterator:
        for db_instance in page['DBInstances']:
            db_instance_identifier = db_instance['DBInstanceIdentifier']
        
        try:
            cpu_response = cloudwatch_client.get_metric_statistics(
                Namespace='AWS/RDS',
                MetricName='CPUUtilization',
                Dimensions=[{'Name': 'DBInstanceIdentifier', 'Value': db_instance_identifier}],
                StartTime=start_time,
                EndTime=end_time,
                Period=86400,
                Statistics=['Average']
            )
            
            if cpu_response['Datapoints']:
                avg_cpu = sum(dp['Average'] for dp in cpu_response['Datapoints']) / len(cpu_response['Datapoints'])

                if avg_cpu < cpu_threshold:
                    underutilized_instances.append({
                        'db_instance_identifier': db_instance_identifier,
                        'db_instance_class': db_instance['DBInstanceClass'],
                        'engine': db_instance['Engine'],
                        'avg_cpu_utilization': round(avg_cpu, 2),
                        'finding': 'Low CPU Utilization',
                        'recommendation': {
                            'action': 'Consider downsizing',
                            'estimated_monthly_savings': _calculate_rds_savings(db_instance['DBInstanceClass'])
                        }
                    })
        except Exception:
            continue
    
    return {
        "status": "success",
        "data": {
            "underutilized_instances": underutilized_instances,
            "count": len(underutilized_instances)
        },
        "message": f"Found {len(underutilized_instances)} underutilized RDS instances via CloudWatch"
    }

def identify_idle_rds_instances(
    region: Optional[str] = None,
    lookback_period_days: int = 7,
    connection_threshold: float = 1.0
) -> Dict[str, Any]:
    """Identify idle RDS instances."""
    try:
        if region:
            rds_client = boto3.client('rds', region_name=region)
            cloudwatch_client = boto3.client('cloudwatch', region_name=region)
        else:
            rds_client = boto3.client('rds')
            cloudwatch_client = boto3.client('cloudwatch')
            
        # Use pagination for RDS instances
        paginator = rds_client.get_paginator('describe_db_instances')
        page_iterator = paginator.paginate()
        
        end_time = datetime.utcnow()
        start_time = end_time - timedelta(days=lookback_period_days)
        
        idle_instances = []
        
        # Process each page of DB instances
        for page in page_iterator:
            for db_instance in page['DBInstances']:
                db_instance_identifier = db_instance['DBInstanceIdentifier']
            
                try:
                    connection_response = cloudwatch_client.get_metric_statistics(
                        Namespace='AWS/RDS',
                        MetricName='DatabaseConnections',
                        Dimensions=[{'Name': 'DBInstanceIdentifier', 'Value': db_instance_identifier}],
                        StartTime=start_time,
                        EndTime=end_time,
                        Period=86400,
                        Statistics=['Maximum']
                    )
                    
                    if connection_response['Datapoints']:
                        max_connections = max(dp['Maximum'] for dp in connection_response['Datapoints'])
                        
                        if max_connections <= connection_threshold:
                            idle_instances.append({
                                'db_instance_identifier': db_instance_identifier,
                                'db_instance_class': db_instance['DBInstanceClass'],
                                'max_connections': max_connections
                            })
                            
                except Exception as e:
                    logger.warning(f"Error getting metrics for {db_instance_identifier}: {str(e)}")
                    continue
        
        return {
            "status": "success",
            "data": {
                "idle_instances": idle_instances,
                "count": len(idle_instances)
            },
            "message": f"Found {len(idle_instances)} idle RDS instances"
        }
        
    except Exception as e:
        return {"status": "error", "message": str(e)}

def _calculate_rds_savings(instance_class: str) -> float:
    """Calculate estimated RDS savings."""
    try:
        from services.pricing import get_rds_pricing
        
        pricing_result = get_rds_pricing(instance_class)
        if pricing_result.get('status') == 'success':
            return pricing_result.get('monthly_price', 100) * 0.3
        return 60
    except Exception:
        return 60

if __name__ == '__main__':
    rds = identify_idle_rds_instances()
    print(rds)
```

--------------------------------------------------------------------------------
/playbooks/lambda_optimization.py:
--------------------------------------------------------------------------------

```python
"""
Lambda Optimization Playbook

This module implements the Lambda Optimization playbook from AWS Cost Optimization Playbooks.
"""

import logging
import boto3
from typing import Dict, Any, Optional
from datetime import datetime, timedelta
from services.compute_optimizer import get_lambda_recommendations
from services.trusted_advisor import get_trusted_advisor_checks

logger = logging.getLogger(__name__)

def get_underutilized_lambda_functions(
    region: Optional[str] = None,
    lookback_period_days: int = 14,
    memory_utilization_threshold: float = 50.0,
    min_invocations: int = 100
) -> Dict[str, Any]:
    """
    Identify underutilized Lambda functions using multiple data sources with fallback logic.
    Priority: 1) Compute Optimizer 2) Trusted Advisor 3) CloudWatch direct
    """
    
    # Try Compute Optimizer first (primary)
    try:
        logger.info("Attempting Lambda analysis with Compute Optimizer")
        result = _get_lambda_from_compute_optimizer(region, lookback_period_days)
        if result["status"] == "success" and result["data"]["count"] > 0:
            result["data_source"] = "Compute Optimizer"
            return result
    except Exception as e:
        logger.warning(f"Compute Optimizer failed: {str(e)}")
    
    # Try Trusted Advisor (secondary)
    try:
        logger.info("Attempting Lambda analysis with Trusted Advisor")
        result = _get_lambda_from_trusted_advisor(region)
        if result["status"] == "success" and result["data"]["count"] > 0:
            result["data_source"] = "Trusted Advisor"
            return result
    except Exception as e:
        logger.warning(f"Trusted Advisor failed: {str(e)}")
    
    # Try CloudWatch direct (tertiary)
    try:
        logger.info("Attempting Lambda analysis with CloudWatch")
        result = _get_lambda_from_cloudwatch(region, lookback_period_days, min_invocations)
        result["data_source"] = "CloudWatch"
        return result
    except Exception as e:
        logger.error(f"All data sources failed. CloudWatch error: {str(e)}")
        return {
            "status": "error",
            "message": f"All data sources unavailable. Last error: {str(e)}",
            "attempted_sources": ["Compute Optimizer", "Trusted Advisor", "CloudWatch"]
        }

def _get_lambda_from_compute_optimizer(region: Optional[str], lookback_period_days: int) -> Dict[str, Any]:
    """Get underutilized Lambda functions from Compute Optimizer"""
    recommendations_result = get_lambda_recommendations(region=region)
    
    if recommendations_result["status"] != "success":
        raise Exception("Compute Optimizer not available")
        
    recommendations = recommendations_result["data"].get("lambdaFunctionRecommendations", [])
    analyzed_functions = []
    
    for rec in recommendations:
        if rec.get('finding') in ['Underprovisioned', 'Overprovisioned']:
            analyzed_functions.append({
                'function_name': rec.get('functionName', 'unknown'),
                'memory_size_mb': rec.get('currentMemorySize', 0),
                'finding': rec.get('finding', 'unknown'),
                'recommendation': {
                    'recommended_memory_size': rec.get('memorySizeRecommendationOptions', [{}])[0].get('memorySize', 0),
                    'estimated_monthly_savings': rec.get('memorySizeRecommendationOptions', [{}])[0].get('estimatedMonthlySavings', {}).get('value', 0)
                }
            })
    
    return {
        "status": "success",
        "data": {
            "analyzed_functions": analyzed_functions,
            "count": len(analyzed_functions)
        },
        "message": f"Found {len(analyzed_functions)} Lambda functions with optimization opportunities via Compute Optimizer"
    }

def _get_lambda_from_trusted_advisor(region: Optional[str]) -> Dict[str, Any]:
    """Get underutilized Lambda functions from Trusted Advisor"""
    ta_result = get_trusted_advisor_checks(["cost_optimizing"])
    
    if ta_result["status"] != "success":
        raise Exception("Trusted Advisor not available")
    
    analyzed_functions = []
    checks = ta_result["data"].get("checks", [])
    
    for check in checks:
        if "AWS Lambda Functions with High Error Rates" in check.get('name', '') or "Over-provisioned Lambda" in check.get('name', ''):
            resources = check.get('result', {}).get('flaggedResources', [])
            for resource in resources:
                analyzed_functions.append({
                    'function_name': resource.get('resourceId', 'unknown'),
                    'memory_size_mb': int(resource.get('metadata', {}).get('Memory Size', '0')),
                    'finding': 'Trusted Advisor flagged',
                    'recommendation': {
                        'action': 'Review memory allocation',
                        'estimated_monthly_savings': _calculate_lambda_savings(int(resource.get('metadata', {}).get('Memory Size', '128')))
                    }
                })
    
    return {
        "status": "success",
        "data": {
            "analyzed_functions": analyzed_functions,
            "count": len(analyzed_functions)
        },
        "message": f"Found {len(analyzed_functions)} Lambda functions with issues via Trusted Advisor"
    }

def _get_lambda_from_cloudwatch(region: Optional[str], lookback_period_days: int, min_invocations: int) -> Dict[str, Any]:
    """Get underutilized Lambda functions from CloudWatch metrics directly"""
    if region:
        lambda_client = boto3.client('lambda', region_name=region)
        cloudwatch_client = boto3.client('cloudwatch', region_name=region)
    else:
        lambda_client = boto3.client('lambda')
        cloudwatch_client = boto3.client('cloudwatch')
    
    # Implement pagination for list_functions
    analyzed_functions = []
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(days=lookback_period_days)
    
    # Use pagination with marker
    paginator = lambda_client.get_paginator('list_functions')
    page_iterator = paginator.paginate()
    
    for page in page_iterator:
        for function in page['Functions']:
            function_name = function['FunctionName']
        
        try:
            # Get invocation metrics
            invocation_response = cloudwatch_client.get_metric_statistics(
                Namespace='AWS/Lambda',
                MetricName='Invocations',
                Dimensions=[{'Name': 'FunctionName', 'Value': function_name}],
                StartTime=start_time,
                EndTime=end_time,
                Period=86400,
                Statistics=['Sum']
            )
            
            # Get duration metrics for memory analysis
            duration_response = cloudwatch_client.get_metric_statistics(
                Namespace='AWS/Lambda',
                MetricName='Duration',
                Dimensions=[{'Name': 'FunctionName', 'Value': function_name}],
                StartTime=start_time,
                EndTime=end_time,
                Period=86400,
                Statistics=['Average']
            )
            
            if invocation_response['Datapoints'] and duration_response['Datapoints']:
                total_invocations = sum(dp['Sum'] for dp in invocation_response['Datapoints'])
                avg_duration = sum(dp['Average'] for dp in duration_response['Datapoints']) / len(duration_response['Datapoints'])
                
                if total_invocations >= min_invocations:
                    # Simple heuristic: if duration is very low, might be over-provisioned
                    if avg_duration < 1000:  # Less than 1 second average
                        analyzed_functions.append({
                            'function_name': function_name,
                            'memory_size_mb': function['MemorySize'],
                            'total_invocations': int(total_invocations),
                            'avg_duration_ms': round(avg_duration, 2),
                            'finding': 'Potentially over-provisioned memory',
                            'recommendation': {
                                'action': 'Consider reducing memory allocation',
                                'estimated_monthly_savings': _calculate_lambda_savings(function['MemorySize'])
                            }
                        })
        except Exception:
            continue
    
    return {
        "status": "success",
        "data": {
            "analyzed_functions": analyzed_functions,
            "count": len(analyzed_functions)
        },
        "message": f"Analyzed {len(analyzed_functions)} Lambda functions via CloudWatch"
    }

def identify_unused_lambda_functions(
    region: Optional[str] = None,
    lookback_period_days: int = 30,
    max_invocations: int = 5
) -> Dict[str, Any]:
    """Identify unused Lambda functions."""
    try:
        if region:
            lambda_client = boto3.client('lambda', region_name=region)
            cloudwatch_client = boto3.client('cloudwatch', region_name=region)
        else:
            lambda_client = boto3.client('lambda')
            cloudwatch_client = boto3.client('cloudwatch')
        
        # Implement pagination for listing functions
        end_time = datetime.utcnow()
        start_time = end_time - timedelta(days=lookback_period_days)
        unused_functions = []
        
        # Use paginator for list_functions
        paginator = lambda_client.get_paginator('list_functions')
        page_iterator = paginator.paginate()
        
        for page in page_iterator:
            for function in page['Functions']:
                function_name = function['FunctionName']
            
            try:
                invocation_response = cloudwatch_client.get_metric_statistics(
                    Namespace='AWS/Lambda',
                    MetricName='Invocations',
                    Dimensions=[{'Name': 'FunctionName', 'Value': function_name}],
                    StartTime=start_time,
                    EndTime=end_time,
                    Period=86400,
                    Statistics=['Sum']
                )
                
                total_invocations = 0
                if invocation_response['Datapoints']:
                    total_invocations = sum(dp['Sum'] for dp in invocation_response['Datapoints'])
                
                if total_invocations <= max_invocations:
                    unused_functions.append({
                        'function_name': function_name,
                        'memory_size_mb': function['MemorySize'],
                        'total_invocations': int(total_invocations),
                        'runtime': function.get('Runtime', '')
                    })
                    
            except Exception as e:
                logger.warning(f"Error getting metrics for {function_name}: {str(e)}")
                continue
        
        return {
            "status": "success",
            "data": {
                "unused_functions": unused_functions,
                "count": len(unused_functions)
            },
            "message": f"Found {len(unused_functions)} unused Lambda functions"
        }
        
    except Exception as e:
        return {"status": "error", "message": str(e)}

def _calculate_lambda_savings(memory_size: int) -> float:
    """Calculate estimated Lambda savings."""
    try:
        from services.pricing import get_lambda_pricing
        
        pricing_result = get_lambda_pricing(memory_size)
        if pricing_result.get('status') == 'success':
            # Estimate savings based on memory optimization
            return (memory_size / 1024) * 0.0000166667 * 100000 * 0.3  # 30% savings estimate
        return 20
    except Exception:
        return 20
```

--------------------------------------------------------------------------------
/playbooks/cloudtrail_optimization.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
CloudTrail Optimization Playbook

This playbook checks for multiple management event trails in AWS CloudTrail,
which could represent a cost optimization opportunity.

Multiple trails capturing the same management events can lead to unnecessary costs.
"""

import boto3
import logging
from datetime import datetime
from botocore.exceptions import ClientError

# Configure logging
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class CloudTrailOptimization:
    """
    CloudTrail optimization playbook to identify cost-saving opportunities
    related to duplicate management event trails.
    """
    
    def __init__(self, region=None):
        """
        Initialize the CloudTrail optimization playbook.
        
        Args:
            region (str, optional): AWS region to analyze. If None, uses the default region.
        """
        self.region = region
        self.client = boto3.client('cloudtrail', region_name=region) if region else boto3.client('cloudtrail')
        
    def analyze_trails(self):
        """
        Analyze CloudTrail trails to identify multiple management event trails.
        
        Returns:
            dict: Analysis results including optimization recommendations.
        """
        try:
            # Get all trails using pagination
            trails = []
            next_token = None
            
            while True:
                # Prepare pagination parameters
                params = {}
                if next_token:
                    params['NextToken'] = next_token
                    
                # Make the API call
                response = self.client.list_trails(**params)
                trails.extend(response.get('Trails', []))
                
                # Check if there are more results
                if 'NextToken' in response:
                    next_token = response['NextToken']
                else:
                    break
            
            # Get detailed information for each trail
            management_event_trails = []
            for trail in trails:
                trail_arn = trail.get('TrailARN')
                trail_name = trail.get('Name')
                
                # Get trail status and configuration
                trail_info = self.client.get_trail(Name=trail_arn)
                trail_status = self.client.get_trail_status(Name=trail_arn)
                
                # Check if the trail is logging management events
                event_selectors = self.client.get_event_selectors(TrailName=trail_arn)
                
                has_management_events = False
                for selector in event_selectors.get('EventSelectors', []):
                    # Only include if management events are explicitly enabled
                    if selector.get('IncludeManagementEvents') is True:
                        has_management_events = True
                        break
                
                # Only include trails that actually have management events enabled
                if has_management_events:
                    management_event_trails.append({
                        'name': trail_name,
                        'arn': trail_arn,
                        'is_multi_region': trail_info.get('Trail', {}).get('IsMultiRegionTrail', False),
                        'is_organization': trail_info.get('Trail', {}).get('IsOrganizationTrail', False),
                        'logging_enabled': trail_status.get('IsLogging', False),
                        'region': self.region or 'default'
                    })
            
            # Analyze results
            result = {
                'status': 'success',
                'analysis_type': 'CloudTrail Optimization',
                'timestamp': datetime.now().isoformat(),
                'region': self.region or 'default',
                'data': {
                    'total_trails': len(management_event_trails),
                    'management_event_trails': len(management_event_trails),
                    'trails_details': management_event_trails
                },
                'recommendations': []
            }
            
            # Generate recommendations based on findings
            if len(management_event_trails) > 1:
                # Multiple management event trails found - potential optimization opportunity
                estimated_savings = (len(management_event_trails) - 1) * 2  # $2 per trail per month after the first one
                
                result['message'] = f"Found {len(management_event_trails)} trails capturing management events. Consider consolidation."
                result['recommendations'] = [
                    "Consolidate multiple management event trails into a single trail to reduce costs",
                    f"Potential monthly savings: ${estimated_savings:.2f}",
                    "Ensure the consolidated trail captures all required events and regions",
                    "Consider using CloudTrail Lake for more cost-effective querying of events"
                ]
                result['optimization_opportunity'] = True
                result['estimated_monthly_savings'] = estimated_savings
            else:
                result['message'] = "No duplicate management event trails found."
                result['optimization_opportunity'] = False
                result['estimated_monthly_savings'] = 0
            
            return result
            
        except ClientError as e:
            logger.error(f"Error analyzing CloudTrail trails: {e}")
            return {
                'status': 'error',
                'message': f"Failed to analyze CloudTrail trails: {str(e)}",
                'error': str(e)
            }
    
    def generate_report(self, format='json'):
        """
        Generate a CloudTrail optimization report showing only trails with management events.
        
        Args:
            format (str): Output format ('json' or 'markdown')
            
        Returns:
            dict or str: Report in the specified format
        """
        analysis_result = self.analyze_trails()
        
        if format.lower() == 'markdown':
            # Generate markdown report
            md_report = f"# CloudTrail Optimization Report - Management Events Only\n\n"
            md_report += f"**Region**: {analysis_result.get('region', 'All regions')}\n"
            md_report += f"**Analysis Date**: {analysis_result.get('timestamp')}\n\n"
            
            # Only show trails with management events enabled
            management_trails = analysis_result.get('data', {}).get('trails_details', [])
            
            md_report += f"## Summary\n"
            md_report += f"- Trails with management events enabled: {len(management_trails)}\n"
            
            if analysis_result.get('optimization_opportunity', False):
                md_report += f"- Optimization opportunity: **YES**\n"
                md_report += f"- Estimated monthly savings: **${analysis_result.get('estimated_monthly_savings', 0):.2f}**\n"
            else:
                md_report += f"- Optimization opportunity: No\n"
            
            if management_trails:
                md_report += f"\n## Management Event Trails ({len(management_trails)})\n"
                for trail in management_trails:
                    md_report += f"\n### {trail.get('name')}\n"
                    md_report += f"- ARN: {trail.get('arn')}\n"
                    md_report += f"- Multi-region: {'Yes' if trail.get('is_multi_region') else 'No'}\n"
                    md_report += f"- Organization trail: {'Yes' if trail.get('is_organization') else 'No'}\n"
                    md_report += f"- Logging enabled: {'Yes' if trail.get('logging_enabled') else 'No'}\n"
                
                if len(management_trails) > 1:
                    md_report += f"\n## Recommendations\n"
                    for rec in analysis_result.get('recommendations', []):
                        md_report += f"- {rec}\n"
            else:
                md_report += f"\n## Management Event Trails\nNo trails with management events enabled found.\n"
            
            return md_report
        else:
            # Return JSON format with only management event trails
            filtered_result = analysis_result.copy()
            filtered_result['data']['trails_shown'] = 'management_events_only'
            return filtered_result


def run_cloudtrail_optimization(region=None):
    """
    Run the CloudTrail optimization playbook.
    
    Args:
        region (str, optional): AWS region to analyze
        
    Returns:
        dict: Analysis results
    """
    optimizer = CloudTrailOptimization(region=region)
    return optimizer.analyze_trails()


def generate_cloudtrail_report(region=None, format='json'):
    """
    Generate a CloudTrail optimization report.
    
    Args:
        region (str, optional): AWS region to analyze
        format (str): Output format ('json' or 'markdown')
        
    Returns:
        dict or str: Report in the specified format
    """
    optimizer = CloudTrailOptimization(region=region)
    return optimizer.generate_report(format=format)


def get_management_trails(region=None):
    """
    Get CloudTrail trails that have management events enabled.
    
    Args:
        region (str, optional): AWS region to analyze
        
    Returns:
        list: List of trails with management events enabled
    """
    try:
        client = boto3.client('cloudtrail', region_name=region) if region else boto3.client('cloudtrail')
        
        # Get all trails using pagination
        trails = []
        next_token = None
        
        while True:
            # Prepare pagination parameters
            params = {}
            if next_token:
                params['NextToken'] = next_token
                
            # Make the API call
            response = client.list_trails(**params)
            trails.extend(response.get('Trails', []))
            
            # Check if there are more results
            if 'NextToken' in response:
                next_token = response['NextToken']
            else:
                break
        
        management_trails = []
        
        for trail in trails:
            trail_arn = trail.get('TrailARN')
            trail_name = trail.get('Name')
            
            try:
                # Get trail configuration
                trail_info = client.get_trail(Name=trail_arn)
                
                # Check event selectors to see if management events are enabled
                event_selectors = client.get_event_selectors(TrailName=trail_arn)
                
                has_management_events = False
                for selector in event_selectors.get('EventSelectors', []):
                    # Check if this selector explicitly includes management events
                    if selector.get('IncludeManagementEvents') is True:
                        has_management_events = True
                        break
                
                # Only include trails that actually have management events enabled
                if has_management_events:
                    management_trails.append({
                        'name': trail_name,
                        'arn': trail_arn,
                        'region': region or trail.get('HomeRegion', 'us-east-1'),
                        'is_multi_region': trail_info.get('Trail', {}).get('IsMultiRegionTrail', False),
                        'is_organization_trail': trail_info.get('Trail', {}).get('IsOrganizationTrail', False)
                    })
                    
            except ClientError as e:
                logger.warning(f"Could not get details for trail {trail_name}: {e}")
                continue
        print(management_trails)
        return management_trails
        
    except ClientError as e:
        logger.error(f"Error getting management trails: {e}")
        return []


if __name__ == "__main__":
    # Run the playbook directly if executed as a script
    result = run_cloudtrail_optimization()
    print(result)

```

--------------------------------------------------------------------------------
/tests/performance/cloudwatch/test_cloudwatch_performance.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Performance tests for CloudWatch optimization components.
"""

import sys
import os

# Add the project root to the Python path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../../..'))

def test_core_performance_components():
    """Test core performance optimization components."""
    print("Testing CloudWatch performance optimization components...")
    
    try:
        # Test imports only - don't initialize global caches
        from utils.cloudwatch_cache import CloudWatchMetadataCache, CloudWatchAnalysisCache
        from utils.performance_monitor import get_performance_monitor
        from utils.memory_manager import get_memory_manager
        from utils.progressive_timeout import get_timeout_handler
        
        print("✅ All performance optimization imports successful")
        
        # Test that classes can be instantiated (but don't keep instances)
        print(f"✅ CloudWatch metadata cache class available: {CloudWatchMetadataCache.__name__}")
        print(f"✅ CloudWatch analysis cache class available: {CloudWatchAnalysisCache.__name__}")
        
        # Test performance components (these should be stateless or properly managed)
        perf_monitor = get_performance_monitor()
        memory_manager = get_memory_manager()
        timeout_handler = get_timeout_handler()
        
        print(f"✅ Performance monitor initialized: {type(perf_monitor).__name__}")
        print(f"✅ Memory manager initialized: {type(memory_manager).__name__}")
        print(f"✅ Timeout handler initialized: {type(timeout_handler).__name__}")
        
        return True
        
    except Exception as e:
        print(f"❌ Error in core components test: {str(e)}")
        import traceback
        traceback.print_exc()
        return False

def test_cloudwatch_cache_operations():
    """Test CloudWatch cache operations with isolated instances."""
    print("\nTesting CloudWatch cache operations...")
    
    metadata_cache = None
    analysis_cache = None
    
    try:
        from utils.cloudwatch_cache import CloudWatchMetadataCache, CloudWatchAnalysisCache
        
        # Create isolated cache instances for testing
        metadata_cache = CloudWatchMetadataCache()
        region = "us-east-1"
        
        # Test alarm metadata
        alarm_data = {"alarm_name": "test-alarm", "state": "OK", "region": region}
        metadata_cache.put_alarm_metadata(region, alarm_data, "test-alarm")
        
        retrieved_data = metadata_cache.get_alarm_metadata(region, "test-alarm")
        assert retrieved_data == alarm_data, "Alarm metadata retrieval failed"
        
        print("✅ Alarm metadata cache operations working")
        
        # Test dashboard metadata
        dashboard_data = {"dashboard_name": "test-dashboard", "widgets": 5}
        metadata_cache.put_dashboard_metadata(region, dashboard_data, "test-dashboard")
        
        retrieved_dashboard = metadata_cache.get_dashboard_metadata(region, "test-dashboard")
        assert retrieved_dashboard == dashboard_data, "Dashboard metadata retrieval failed"
        
        print("✅ Dashboard metadata cache operations working")
        
        # Test analysis cache
        analysis_cache = CloudWatchAnalysisCache()
        analysis_type = "general_spend"
        parameters_hash = "test_hash_123"
        
        analysis_result = {
            "status": "success",
            "analysis_type": analysis_type,
            "recommendations": ["Optimize log retention", "Reduce custom metrics"],
            "cost_savings": 150.50
        }
        
        analysis_cache.put_analysis_result(analysis_type, region, parameters_hash, analysis_result)
        retrieved_result = analysis_cache.get_analysis_result(analysis_type, region, parameters_hash)
        assert retrieved_result == analysis_result, "Analysis result retrieval failed"
        
        print("✅ Analysis cache operations working")
        
        return True
        
    except Exception as e:
        print(f"❌ Error in cache operations test: {str(e)}")
        import traceback
        traceback.print_exc()
        return False
    
    finally:
        # Clean up isolated cache instances
        if metadata_cache and hasattr(metadata_cache, 'shutdown'):
            metadata_cache.shutdown()
        if analysis_cache and hasattr(analysis_cache, 'shutdown'):
            analysis_cache.shutdown()

def test_performance_monitoring():
    """Test performance monitoring functionality."""
    print("\nTesting performance monitoring...")
    
    try:
        from utils.performance_monitor import get_performance_monitor
        
        perf_monitor = get_performance_monitor()
        
        # Test analysis monitoring
        session_id = perf_monitor.start_analysis_monitoring("test_analysis", "test_execution_123")
        assert session_id is not None, "Failed to start analysis monitoring"
        
        # Record some metrics
        perf_monitor.record_cache_hit("cloudwatch_metadata", "test_analysis")
        perf_monitor.record_cache_miss("cloudwatch_analysis", "test_analysis")
        perf_monitor.record_api_call("cloudwatch", "describe_alarms", "test_analysis")
        
        # End monitoring
        perf_data = perf_monitor.end_analysis_monitoring(session_id, success=True)
        assert perf_data is not None, "Failed to end analysis monitoring"
        assert perf_data.analysis_type == "test_analysis", "Analysis type mismatch"
        
        # Get performance summary
        summary = perf_monitor.get_performance_summary()
        assert "cache_performance" in summary, "Cache performance not in summary"
        assert "system_metrics" in summary, "System metrics not in summary"
        
        print("✅ Performance monitoring working")
        
        return True
        
    except Exception as e:
        print(f"❌ Error in performance monitoring test: {str(e)}")
        import traceback
        traceback.print_exc()
        return False

def test_memory_management():
    """Test memory management functionality."""
    print("\nTesting memory management...")
    
    try:
        from utils.memory_manager import get_memory_manager
        
        memory_manager = get_memory_manager()
        
        # Test memory tracking
        tracker = memory_manager.start_memory_tracking("test_cloudwatch_analysis")
        assert tracker is not None, "Failed to start memory tracking"
        
        # Test with a class that supports weak references
        class TestObject:
            def __init__(self, data):
                self.data = data
        
        test_obj = TestObject({"large_dataset": list(range(1000))})
        
        # Register the object
        memory_manager.register_large_object(
            "test_object_123",
            test_obj,
            size_mb=1.0,
            cleanup_callback=lambda: None
        )
        
        # Stop tracking
        memory_stats = memory_manager.stop_memory_tracking("test_cloudwatch_analysis")
        assert memory_stats is not None, "Failed to stop memory tracking"
        assert "memory_delta_mb" in memory_stats, "Memory delta not in stats"
        
        # Get memory statistics
        stats = memory_manager.get_memory_statistics()
        assert "current_memory" in stats, "Current memory not in stats"
        assert "thresholds" in stats, "Thresholds not in stats"
        
        # Cleanup
        success = memory_manager.cleanup_large_object("test_object_123")
        assert success is True, "Failed to cleanup large object"
        
        print("✅ Memory management working")
        
        return True
        
    except Exception as e:
        print(f"❌ Error in memory management test: {str(e)}")
        import traceback
        traceback.print_exc()
        return False

def test_progressive_timeouts():
    """Test progressive timeout functionality."""
    print("\nTesting progressive timeouts...")
    
    try:
        from utils.progressive_timeout import get_timeout_handler, ComplexityLevel, AnalysisContext
        
        timeout_handler = get_timeout_handler()
        
        # Test timeout calculation
        context = AnalysisContext(
            analysis_type="comprehensive",
            complexity_level=ComplexityLevel.HIGH,
            estimated_data_size_mb=50.0,
            bucket_count=0,  # Not applicable for CloudWatch
            region="us-east-1",
            include_cost_analysis=True,
            lookback_days=30
        )
        
        timeout_result = timeout_handler.calculate_timeout(context)
        assert timeout_result.final_timeout > 0, "Invalid timeout calculated"
        assert len(timeout_result.reasoning) > 0, "No reasoning provided"
        
        # Test execution time recording
        timeout_handler.record_execution_time("comprehensive", 45.5, ComplexityLevel.HIGH)
        
        # Test system load recording
        timeout_handler.record_system_load(75.0)
        
        # Get performance statistics
        stats = timeout_handler.get_performance_statistics()
        assert "historical_data" in stats, "Historical data not in stats"
        assert "system_load" in stats, "System load not in stats"
        
        print("✅ Progressive timeouts working")
        
        return True
        
    except Exception as e:
        print(f"❌ Error in progressive timeouts test: {str(e)}")
        import traceback
        traceback.print_exc()
        return False

def test_cache_warming():
    """Test cache warming functionality with isolated instance."""
    print("\nTesting cache warming...")
    
    cache = None
    
    try:
        from utils.cloudwatch_cache import CloudWatchMetadataCache
        
        # Create isolated cache instance for testing
        cache = CloudWatchMetadataCache()
        
        # Test warming functions exist
        warming_functions = cache._warming_functions
        assert 'alarms_metadata' in warming_functions, "Alarms warming function not found"
        assert 'dashboards_metadata' in warming_functions, "Dashboards warming function not found"
        assert 'log_groups_metadata' in warming_functions, "Log groups warming function not found"
        assert 'metrics_metadata' in warming_functions, "Metrics warming function not found"
        
        # Test cache warming execution (these should be mocked in real tests)
        success = cache.warm_cache('alarms_metadata', region='us-east-1')
        assert success is True, "Failed to warm alarms cache"
        
        success = cache.warm_cache('dashboards_metadata', region='us-east-1')
        assert success is True, "Failed to warm dashboards cache"
        
        print("✅ Cache warming working")
        
        return True
        
    except Exception as e:
        print(f"❌ Error in cache warming test: {str(e)}")
        import traceback
        traceback.print_exc()
        return False
    
    finally:
        # Clean up isolated cache instance
        if cache and hasattr(cache, 'shutdown'):
            cache.shutdown()

def main():
    """Run all performance tests."""
    print("🚀 Starting CloudWatch Performance Tests\n")
    
    tests = [
        test_core_performance_components,
        test_cloudwatch_cache_operations,
        test_performance_monitoring,
        test_memory_management,
        test_progressive_timeouts,
        test_cache_warming
    ]
    
    passed = 0
    failed = 0
    
    try:
        for test in tests:
            try:
                if test():
                    passed += 1
                else:
                    failed += 1
            except Exception as e:
                print(f"❌ Test {test.__name__} failed with exception: {str(e)}")
                failed += 1
        
        print(f"\n📊 CloudWatch Performance Test Results:")
        print(f"✅ Passed: {passed}")
        print(f"❌ Failed: {failed}")
        print(f"📈 Success Rate: {(passed / (passed + failed) * 100):.1f}%")
        
        if failed == 0:
            print("\n🎉 All CloudWatch performance tests passed!")
            return True
        else:
            print(f"\n⚠️ {failed} CloudWatch performance test(s) failed.")
            return False
    
    except Exception as e:
        print(f"❌ Test suite failed with exception: {str(e)}")
        return False

if __name__ == "__main__":
    success = main()
    sys.exit(0 if success else 1)
```

--------------------------------------------------------------------------------
/tests/integration/test_cloudwatch_comprehensive_tool_integration.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Integration tests for CloudWatch comprehensive optimization tool.

Tests the unified comprehensive optimization tool with intelligent orchestration,
executive summary generation, and all 4 functionalities working together.
"""

import pytest
import asyncio
from unittest.mock import AsyncMock, patch, MagicMock

from playbooks.cloudwatch.optimization_orchestrator import CloudWatchOptimizationOrchestrator


@pytest.mark.integration
@pytest.mark.cloudwatch
class TestCloudWatchComprehensiveToolIntegration:
    """Test CloudWatch comprehensive optimization tool integration."""
    
    @pytest.fixture
    def comprehensive_orchestrator(self):
        """Create orchestrator for comprehensive tool testing."""
        with patch('playbooks.cloudwatch.optimization_orchestrator.ServiceOrchestrator') as mock_so, \
             patch('playbooks.cloudwatch.optimization_orchestrator.CloudWatchAnalysisEngine') as mock_ae:
            
            orchestrator = CloudWatchOptimizationOrchestrator(region='us-east-1')
            orchestrator.service_orchestrator = mock_so.return_value
            orchestrator.analysis_engine = mock_ae.return_value
            
            return orchestrator
    
    @pytest.mark.asyncio
    async def test_comprehensive_tool_basic_execution(self, comprehensive_orchestrator):
        """Test basic execution of comprehensive optimization tool."""
        # Mock the comprehensive analysis result
        expected_result = {
            "status": "success",
            "analysis_type": "comprehensive",
            "successful_analyses": 4,
            "total_analyses": 4,
            "results": {
                "general_spend": {"status": "success", "data": {"savings": 50.0}},
                "logs_optimization": {"status": "success", "data": {"savings": 30.0}},
                "metrics_optimization": {"status": "success", "data": {"savings": 25.0}},
                "alarms_and_dashboards": {"status": "success", "data": {"savings": 20.0}}
            },
            "orchestrator_metadata": {
                "session_id": "test_session",
                "region": "us-east-1",
                "total_orchestration_time": 2.5,
                "performance_optimizations": {
                    "intelligent_timeout": 120.0,
                    "cache_enabled": True,
                    "memory_management": True,
                    "performance_monitoring": True
                }
            }
        }
        
        # Mock the analysis engine as an async function
        comprehensive_orchestrator.analysis_engine.run_comprehensive_analysis = AsyncMock(return_value=expected_result)
        
        # Execute comprehensive analysis
        result = await comprehensive_orchestrator.execute_comprehensive_analysis(
            region="us-east-1",
            lookback_days=30,
            allow_cost_explorer=False,
            allow_aws_config=False,
            allow_cloudtrail=False,
            allow_minimal_cost_metrics=False
        )
        
        # Verify results
        assert result["status"] == "success"
        assert "orchestrator_metadata" in result
        assert result["orchestrator_metadata"]["region"] == "us-east-1"
        assert result["orchestrator_metadata"]["orchestrator_version"] == "1.0.0"
        
        # Verify the analysis engine was called
        comprehensive_orchestrator.analysis_engine.run_comprehensive_analysis.assert_called_once()
    
    @pytest.mark.asyncio
    async def test_comprehensive_tool_with_cost_constraints(self, comprehensive_orchestrator):
        """Test comprehensive tool respects cost constraints."""
        # Mock result with cost constraints applied
        expected_result = {
            "status": "success",
            "analysis_type": "comprehensive",
            "successful_analyses": 4,
            "total_analyses": 4,
            "results": {
                "general_spend": {"status": "success", "cost_incurred": False},
                "logs_optimization": {"status": "success", "cost_incurred": False},
                "metrics_optimization": {"status": "success", "cost_incurred": False},
                "alarms_and_dashboards": {"status": "success", "cost_incurred": False}
            },
            "orchestrator_metadata": {
                "cost_preferences": {
                    "allow_cost_explorer": False,
                    "allow_aws_config": False,
                    "allow_cloudtrail": False,
                    "allow_minimal_cost_metrics": False
                }
            }
        }
        
        comprehensive_orchestrator.analysis_engine.run_comprehensive_analysis = AsyncMock(return_value=expected_result)
        
        # Execute with strict cost constraints
        result = await comprehensive_orchestrator.execute_comprehensive_analysis(
            allow_cost_explorer=False,
            allow_aws_config=False,
            allow_cloudtrail=False,
            allow_minimal_cost_metrics=False
        )
        
        # Verify cost constraints were respected
        assert result["status"] == "success"
        assert "cost_preferences" in result["orchestrator_metadata"]
        cost_prefs = result["orchestrator_metadata"]["cost_preferences"]
        assert cost_prefs["allow_cost_explorer"] is False
        assert cost_prefs["allow_aws_config"] is False
        assert cost_prefs["allow_cloudtrail"] is False
        assert cost_prefs["allow_minimal_cost_metrics"] is False
    
    @pytest.mark.asyncio
    async def test_comprehensive_tool_error_handling(self, comprehensive_orchestrator):
        """Test comprehensive tool handles errors gracefully."""
        # Mock a partial failure scenario
        expected_result = {
            "status": "partial",
            "analysis_type": "comprehensive",
            "successful_analyses": 3,
            "total_analyses": 4,
            "results": {
                "general_spend": {"status": "success", "data": {"savings": 50.0}},
                "logs_optimization": {"status": "error", "error_message": "Simulated failure"},
                "metrics_optimization": {"status": "success", "data": {"savings": 25.0}},
                "alarms_and_dashboards": {"status": "success", "data": {"savings": 20.0}}
            }
        }
        
        comprehensive_orchestrator.analysis_engine.run_comprehensive_analysis = AsyncMock(return_value=expected_result)
        
        # Execute comprehensive analysis
        result = await comprehensive_orchestrator.execute_comprehensive_analysis()
        
        # Verify partial success is handled correctly
        assert result["status"] == "partial"
        # The orchestrator adds its own metadata, so we check the engine result was used
        assert "orchestrator_metadata" in result
    
    @pytest.mark.asyncio
    async def test_comprehensive_tool_timeout_handling(self, comprehensive_orchestrator):
        """Test comprehensive tool handles timeouts properly."""
        # Mock a timeout scenario
        async def mock_timeout_analysis(**kwargs):
            # Simulate a timeout by raising an exception
            raise Exception("Analysis comprehensive timed out after 60.0 seconds")
        
        comprehensive_orchestrator.analysis_engine.run_comprehensive_analysis.side_effect = mock_timeout_analysis
        
        # Execute with a short timeout
        result = await comprehensive_orchestrator.execute_comprehensive_analysis(
            timeout_seconds=1.0
        )
        
        # Verify timeout is handled gracefully
        assert result["status"] == "error"
        assert "timed out" in result["error_message"].lower()
    
    @pytest.mark.asyncio
    async def test_comprehensive_tool_caching_behavior(self, comprehensive_orchestrator):
        """Test comprehensive tool caching behavior."""
        # Mock successful result
        expected_result = {
            "status": "success",
            "analysis_type": "comprehensive",
            "from_cache": False,
            "successful_analyses": 4,
            "total_analyses": 4
        }
        
        comprehensive_orchestrator.analysis_engine.run_comprehensive_analysis = AsyncMock(return_value=expected_result)
        
        # First execution - should not be from cache
        result1 = await comprehensive_orchestrator.execute_comprehensive_analysis(
            region="us-east-1",
            lookback_days=30
        )
        
        assert result1["status"] == "success"
        # Note: The orchestrator adds its own metadata, so we check the engine result
        
        # Second execution - should call the same mock again
        result2 = await comprehensive_orchestrator.execute_comprehensive_analysis(
            region="us-east-1",
            lookback_days=30
        )
        
        assert result2["status"] == "success"
        # Verify both calls were made (caching is handled at lower levels)
        assert comprehensive_orchestrator.analysis_engine.run_comprehensive_analysis.call_count == 2


@pytest.mark.integration
@pytest.mark.cloudwatch
class TestCloudWatchOrchestratorRealIntegration:
    """Test CloudWatch orchestrator with more realistic scenarios."""
    
    @pytest.fixture
    def real_orchestrator(self):
        """Create orchestrator with minimal mocking for realistic testing."""
        with patch('playbooks.cloudwatch.optimization_orchestrator.ServiceOrchestrator') as mock_so:
            # Mock the service orchestrator but let other components work
            mock_so.return_value.session_id = "test_session_123"
            mock_so.return_value.get_stored_tables.return_value = ["test_table"]
            
            orchestrator = CloudWatchOptimizationOrchestrator(region='us-west-2')
            return orchestrator
    
    def test_orchestrator_initialization(self, real_orchestrator):
        """Test orchestrator initializes correctly with all components."""
        assert real_orchestrator.region == 'us-west-2'
        assert real_orchestrator.session_id == "test_session_123"
        assert real_orchestrator.analysis_engine is not None
        assert real_orchestrator.cost_controller is not None
        assert real_orchestrator.aggregation_queries is not None
    
    def test_cost_preferences_validation(self, real_orchestrator):
        """Test cost preferences validation works correctly."""
        # Test valid preferences
        result = real_orchestrator.validate_cost_preferences(
            allow_cost_explorer=True,
            allow_aws_config=False,
            allow_cloudtrail=True,
            allow_minimal_cost_metrics=False
        )
        
        assert result["valid"] is True
        assert "validated_preferences" in result
        assert result["validated_preferences"]["allow_cost_explorer"] is True
        assert result["validated_preferences"]["allow_cloudtrail"] is True
        
        # Test invalid preferences (non-boolean values)
        result = real_orchestrator.validate_cost_preferences(
            allow_cost_explorer="invalid",
            allow_aws_config=123
        )
        
        assert result["valid"] is False
        assert len(result["errors"]) > 0
    
    def test_cost_estimation(self, real_orchestrator):
        """Test cost estimation functionality."""
        # Test with no paid features
        result = real_orchestrator.get_cost_estimate(
            allow_cost_explorer=False,
            allow_aws_config=False,
            allow_cloudtrail=False,
            allow_minimal_cost_metrics=False
        )
        
        assert "cost_estimate" in result
        assert result["cost_estimate"]["total_estimated_cost"] == 0.0
        
        # Test with paid features
        result = real_orchestrator.get_cost_estimate(
            allow_cost_explorer=True,
            allow_aws_config=True,
            lookback_days=30
        )
        
        assert "cost_estimate" in result
        # Should have some estimated cost for paid features
        assert result["cost_estimate"]["total_estimated_cost"] >= 0.0
    
    def test_stored_tables_access(self, real_orchestrator):
        """Test access to stored tables."""
        tables = real_orchestrator.get_stored_tables()
        assert isinstance(tables, list)
        assert "test_table" in tables
    
    def test_analysis_results_query(self, real_orchestrator):
        """Test querying analysis results."""
        # Mock the service orchestrator query execution
        with patch.object(real_orchestrator.service_orchestrator, 'query_session_data') as mock_query:
            mock_query.return_value = [
                {"analysis_type": "general_spend", "status": "success"},
                {"analysis_type": "logs_optimization", "status": "success"}
            ]
            
            results = real_orchestrator.get_analysis_results("SELECT * FROM analysis_results")
            
            assert len(results) == 2
            assert results[0]["analysis_type"] == "general_spend"
            assert results[1]["analysis_type"] == "logs_optimization"
            
            # Verify cost control info was added
            for result in results:
                assert "cost_control_info" in result
                assert "current_preferences" in result["cost_control_info"]
```

--------------------------------------------------------------------------------
/services/pricing.py:
--------------------------------------------------------------------------------

```python
"""
AWS Pricing service module using AWS Price List API and MCP server.

This module provides functions for getting AWS pricing information.
"""

import logging
import boto3
import json
from typing import Dict, Optional, Any, List
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)

def _get_all_aws_regions() -> Dict[str, str]:
    """Get comprehensive mapping of AWS regions to location names."""
    return {
        'us-east-1': 'US East (N. Virginia)', 'us-east-2': 'US East (Ohio)',
        'us-west-1': 'US West (N. California)', 'us-west-2': 'US West (Oregon)',
        'eu-central-1': 'Europe (Frankfurt)', 'eu-west-1': 'Europe (Ireland)',
        'eu-west-2': 'Europe (London)', 'eu-west-3': 'Europe (Paris)',
        'eu-north-1': 'Europe (Stockholm)', 'eu-south-1': 'Europe (Milan)',
        'ap-northeast-1': 'Asia Pacific (Tokyo)', 'ap-northeast-2': 'Asia Pacific (Seoul)',
        'ap-northeast-3': 'Asia Pacific (Osaka)', 'ap-southeast-1': 'Asia Pacific (Singapore)',
        'ap-southeast-2': 'Asia Pacific (Sydney)', 'ap-southeast-3': 'Asia Pacific (Jakarta)',
        'ap-south-1': 'Asia Pacific (Mumbai)', 'ap-east-1': 'Asia Pacific (Hong Kong)',
        'ca-central-1': 'Canada (Central)', 'sa-east-1': 'South America (Sao Paulo)',
        'me-south-1': 'Middle East (Bahrain)', 'af-south-1': 'Africa (Cape Town)',
        'us-gov-east-1': 'AWS GovCloud (US-East)', 'us-gov-west-1': 'AWS GovCloud (US-West)',
        # Local Zones
        'us-east-1-bos-1a': 'US East (Boston)', 'us-east-1-chi-1a': 'US East (Chicago)',
        'us-east-1-dfw-1a': 'US East (Dallas)', 'us-east-1-iah-1a': 'US East (Houston)',
        'us-east-1-mci-1a': 'US East (Kansas City)', 'us-east-1-mia-1a': 'US East (Miami)',
        'us-east-1-msp-1a': 'US East (Minneapolis)', 'us-east-1-nyc-1a': 'US East (New York)',
        'us-east-1-phl-1a': 'US East (Philadelphia)', 'us-west-2-den-1a': 'US West (Denver)',
        'us-west-2-las-1a': 'US West (Las Vegas)', 'us-west-2-lax-1a': 'US West (Los Angeles)',
        'us-west-2-phx-1a': 'US West (Phoenix)', 'us-west-2-pdx-1a': 'US West (Portland)',
        'us-west-2-sea-1a': 'US West (Seattle)', 'eu-west-1-lhr-1a': 'Europe (London)',
        'ap-northeast-1-nrt-1a': 'Asia Pacific (Tokyo)', 'ap-southeast-1-sin-1a': 'Asia Pacific (Singapore)',
        # Wavelength Zones
        'us-east-1-wl1-bos-wlz-1': 'US East (Boston Wavelength)', 'us-east-1-wl1-chi-wlz-1': 'US East (Chicago Wavelength)',
        'us-east-1-wl1-dfw-wlz-1': 'US East (Dallas Wavelength)', 'us-east-1-wl1-mia-wlz-1': 'US East (Miami Wavelength)',
        'us-east-1-wl1-nyc-wlz-1': 'US East (New York Wavelength)', 'us-west-2-wl1-den-wlz-1': 'US West (Denver Wavelength)',
        'us-west-2-wl1-las-wlz-1': 'US West (Las Vegas Wavelength)', 'us-west-2-wl1-lax-wlz-1': 'US West (Los Angeles Wavelength)',
        'us-west-2-wl1-phx-wlz-1': 'US West (Phoenix Wavelength)', 'us-west-2-wl1-sea-wlz-1': 'US West (Seattle Wavelength)',
        'eu-west-1-wl1-lhr-wlz-1': 'Europe (London Wavelength)', 'ap-northeast-1-wl1-nrt-wlz-1': 'Asia Pacific (Tokyo Wavelength)',
        'ap-southeast-1-wl1-sin-wlz-1': 'Asia Pacific (Singapore Wavelength)', 'ap-southeast-2-wl1-syd-wlz-1': 'Asia Pacific (Sydney Wavelength)'
    }

def get_ec2_pricing(
    instance_type: str,
    region: str = 'us-east-1'
) -> Dict[str, Any]:
    """Get EC2 instance pricing from AWS Price List API."""
    try:
        pricing_client = boto3.client('pricing', region_name='us-east-1')
        region_map = _get_all_aws_regions()
        location = region_map.get(region, 'US East (N. Virginia)')
        
        response = pricing_client.get_products(
            ServiceCode='AmazonEC2',
            Filters=[
                {'Type': 'TERM_MATCH', 'Field': 'instanceType', 'Value': instance_type},
                {'Type': 'TERM_MATCH', 'Field': 'operatingSystem', 'Value': 'Linux'},
                {'Type': 'TERM_MATCH', 'Field': 'location', 'Value': location},
                {'Type': 'TERM_MATCH', 'Field': 'tenancy', 'Value': 'Shared'},
                {'Type': 'TERM_MATCH', 'Field': 'preInstalledSw', 'Value': 'NA'}
            ]
        )
        
        if response['PriceList']:
            price_data = json.loads(response['PriceList'][0])
            terms = price_data['terms']['OnDemand']
            term_key = list(terms.keys())[0]
            price_dimensions = terms[term_key]['priceDimensions']
            dimension_key = list(price_dimensions.keys())[0]
            hourly_price = float(price_dimensions[dimension_key]['pricePerUnit']['USD'])
            
            return {
                'status': 'success',
                'instance_type': instance_type,
                'region': region,
                'hourly_price': hourly_price,
                'monthly_price': hourly_price * 730,
                'source': 'aws_price_list_api'
            }
        
        return {'status': 'error', 'message': 'No pricing found', 'hourly_price': 0.1}
        
    except Exception as e:
        logger.error(f"Error getting EC2 pricing: {str(e)}")
        return {'status': 'error', 'message': str(e), 'hourly_price': 0.1}

def get_ebs_pricing(
    volume_type: str,
    volume_size: int,
    region: str = 'us-east-1'
) -> Dict[str, Any]:
    """Get EBS volume pricing from AWS Price List API."""
    try:
        pricing_client = boto3.client('pricing', region_name='us-east-1')
        region_map = _get_all_aws_regions()
        location = region_map.get(region, 'US East (N. Virginia)')
        
        response = pricing_client.get_products(
            ServiceCode='AmazonEC2',
            Filters=[
                {'Type': 'TERM_MATCH', 'Field': 'productFamily', 'Value': 'Storage'},
                {'Type': 'TERM_MATCH', 'Field': 'volumeType', 'Value': volume_type.upper()},
                {'Type': 'TERM_MATCH', 'Field': 'location', 'Value': location}
            ]
        )
        
        if response['PriceList']:
            price_data = json.loads(response['PriceList'][0])
            terms = price_data['terms']['OnDemand']
            term_key = list(terms.keys())[0]
            price_dimensions = terms[term_key]['priceDimensions']
            dimension_key = list(price_dimensions.keys())[0]
            gb_price = float(price_dimensions[dimension_key]['pricePerUnit']['USD'])
            
            monthly_price = gb_price * volume_size
            hourly_price = monthly_price / 730
            
            return {
                'status': 'success',
                'volume_type': volume_type,
                'volume_size': volume_size,
                'region': region,
                'hourly_price': hourly_price,
                'monthly_price': monthly_price,
                'price_per_gb_month': gb_price,
                'source': 'aws_price_list_api'
            }
        
        return {'status': 'error', 'message': 'No pricing found', 'hourly_price': 0.01}
        
    except Exception as e:
        logger.error(f"Error getting EBS pricing: {str(e)}")
        return {'status': 'error', 'message': str(e), 'hourly_price': 0.01}

def get_rds_pricing(
    instance_class: str,
    engine: str = 'mysql',
    region: str = 'us-east-1'
) -> Dict[str, Any]:
    """Get RDS instance pricing from AWS Price List API."""
    try:
        pricing_client = boto3.client('pricing', region_name='us-east-1')
        region_map = _get_all_aws_regions()
        location = region_map.get(region, 'US East (N. Virginia)')
        
        response = pricing_client.get_products(
            ServiceCode='AmazonRDS',
            Filters=[
                {'Type': 'TERM_MATCH', 'Field': 'instanceType', 'Value': instance_class},
                {'Type': 'TERM_MATCH', 'Field': 'databaseEngine', 'Value': engine.title()},
                {'Type': 'TERM_MATCH', 'Field': 'location', 'Value': location},
                {'Type': 'TERM_MATCH', 'Field': 'deploymentOption', 'Value': 'Single-AZ'}
            ]
        )
        
        if response['PriceList']:
            price_data = json.loads(response['PriceList'][0])
            terms = price_data['terms']['OnDemand']
            term_key = list(terms.keys())[0]
            price_dimensions = terms[term_key]['priceDimensions']
            dimension_key = list(price_dimensions.keys())[0]
            hourly_price = float(price_dimensions[dimension_key]['pricePerUnit']['USD'])
            
            return {
                'status': 'success',
                'instance_class': instance_class,
                'engine': engine,
                'region': region,
                'hourly_price': hourly_price,
                'monthly_price': hourly_price * 730,
                'source': 'aws_price_list_api'
            }
        
        return {'status': 'error', 'message': 'No pricing found', 'hourly_price': 0.1}
        
    except Exception as e:
        logger.error(f"Error getting RDS pricing: {str(e)}")
        return {'status': 'error', 'message': str(e), 'hourly_price': 0.1}

def get_lambda_pricing(
    memory_size: int,
    region: str = 'us-east-1'
) -> Dict[str, Any]:
    """Get Lambda function pricing from AWS Price List API."""
    try:
        pricing_client = boto3.client('pricing', region_name='us-east-1')
        region_map = _get_all_aws_regions()
        location = region_map.get(region, 'US East (N. Virginia)')
        
        response = pricing_client.get_products(
            ServiceCode='AWSLambda',
            Filters=[
                {'Type': 'TERM_MATCH', 'Field': 'location', 'Value': location}
            ]
        )
        
        if response['PriceList']:
            gb_seconds_price = 0.0000166667
            requests_price = 0.0000002
            memory_gb = memory_size / 1024
            
            return {
                'status': 'success',
                'memory_size_mb': memory_size,
                'memory_size_gb': memory_gb,
                'region': region,
                'price_per_gb_second': gb_seconds_price,
                'price_per_request': requests_price,
                'source': 'aws_price_list_api'
            }
        
        return {'status': 'error', 'message': 'No pricing found', 'price_per_gb_second': 0.0000166667}
        
    except Exception as e:
        logger.error(f"Error getting Lambda pricing: {str(e)}")
        return {'status': 'error', 'message': str(e), 'price_per_gb_second': 0.0000166667}

def get_all_regions() -> List[str]:
    """Get list of all supported AWS regions."""
    return list(_get_all_aws_regions().keys())

def get_local_zones() -> List[str]:
    """Get list of AWS Local Zones."""
    return [region for region in _get_all_aws_regions().keys() if '-1a' in region]

def get_wavelength_zones() -> List[str]:
    """Get list of AWS Wavelength Zones."""
    return [region for region in _get_all_aws_regions().keys() if '-wlz-' in region]

def get_standard_regions() -> List[str]:
    """Get list of standard AWS regions (excluding Local and Wavelength Zones)."""
    return [region for region in _get_all_aws_regions().keys() if '-1a' not in region and '-wlz-' not in region]

def is_local_zone(region: str) -> bool:
    """Check if region is a Local Zone."""
    return '-1a' in region

def is_wavelength_zone(region: str) -> bool:
    """Check if region is a Wavelength Zone."""
    return '-wlz-' in region

def get_zone_type(region: str) -> str:
    """Get zone type: standard, local, or wavelength."""
    if is_wavelength_zone(region):
        return 'wavelength'
    elif is_local_zone(region):
        return 'local'
    else:
        return 'standard'

def get_pricing_for_all_regions(service_function, *args, **kwargs) -> Dict[str, Any]:
    """Get pricing across all AWS regions, Local Zones, and Wavelength Zones."""
    results = {}
    errors = {}
    
    for region in get_all_regions():
        try:
            result = service_function(*args, region=region, **kwargs)
            if result.get('status') == 'success':
                results[region] = result
            else:
                # Store error results separately
                errors[region] = {
                    'error_message': result.get('message', 'Unknown error'),
                    'region': region
                }
                logger.warning(f"Failed to get pricing for region {region}: {result.get('message', 'Unknown error')}")
        except Exception as e:
            # Log and track exceptions
            error_message = str(e)
            errors[region] = {
                'error_message': error_message,
                'region': region,
                'exception_type': type(e).__name__
            }
            logger.warning(f"Exception while getting pricing for region {region}: {error_message}")
    
    # Calculate success rate
    total_regions = len(get_all_regions())
    success_count = len(results)
    success_rate = (success_count / total_regions) * 100 if total_regions > 0 else 0
    
    return {
        'status': 'success',
        'total_regions': total_regions,
        'regions_with_pricing': success_count,
        'success_rate': f"{success_rate:.1f}%",
        'standard_regions_analyzed': len([r for r in results.keys() if get_zone_type(r) == 'standard']),
        'local_zones_analyzed': len([r for r in results.keys() if get_zone_type(r) == 'local']),
        'wavelength_zones_analyzed': len([r for r in results.keys() if get_zone_type(r) == 'wavelength']),
        'pricing_by_region': results,
        'errors_by_region': errors if errors else None
    }
```

--------------------------------------------------------------------------------
/tests/unit/services/test_cost_control_routing.py:
--------------------------------------------------------------------------------

```python
"""
Test cost control and execution path routing functionality.

This test demonstrates the consent-based routing logic and cost transparency features
implemented in task 11.
"""

import pytest
import asyncio
from unittest.mock import Mock, patch
from datetime import datetime

from playbooks.cloudwatch.cost_controller import CostController, CostPreferences
from services.cloudwatch_service import CloudWatchService, CloudWatchServiceConfig
from playbooks.cloudwatch.optimization_orchestrator import CloudWatchOptimizationOrchestrator


class TestCostControlRouting:
    """Test cost control and consent-based routing functionality."""
    
    def setup_method(self):
        """Set up test fixtures."""
        self.cost_controller = CostController()
        
        # Mock boto3 to avoid real AWS calls
        with patch('services.cloudwatch_service.boto3'):
            # Create service with cost tracking enabled
            config = CloudWatchServiceConfig(enable_cost_tracking=True)
            self.cloudwatch_service = CloudWatchService(config=config)
        
        # Create orchestrator
        with patch('playbooks.cloudwatch.optimization_orchestrator.ServiceOrchestrator'), \
             patch('playbooks.cloudwatch.optimization_orchestrator.CloudWatchAnalysisEngine'):
            self.orchestrator = CloudWatchOptimizationOrchestrator(region='us-east-1')
    
    def test_cost_preferences_validation(self):
        """Test cost preferences validation and sanitization."""
        # Test with valid preferences
        preferences_dict = {
            'allow_cost_explorer': True,
            'allow_aws_config': False,
            'allow_cloudtrail': 'true',  # String that should be converted
            'allow_minimal_cost_metrics': 1  # Integer that should be converted
        }
        
        validated = self.cost_controller.validate_and_sanitize_preferences(preferences_dict)
        
        assert validated.allow_cost_explorer is True
        assert validated.allow_aws_config is False
        assert validated.allow_cloudtrail is True
        assert validated.allow_minimal_cost_metrics is True
    
    def test_functionality_coverage_calculation(self):
        """Test functionality coverage calculation based on enabled features."""
        # Test with no paid features enabled (free tier only)
        free_only_prefs = CostPreferences()
        coverage = self.cost_controller.get_functionality_coverage(free_only_prefs)
        
        assert coverage['overall_coverage'] == 60.0  # Free operations only
        assert coverage['free_tier_coverage'] == 60.0
        assert coverage['by_category']['cost_explorer'] == 0.0
        
        # Test with all features enabled
        all_enabled_prefs = CostPreferences(
            allow_cost_explorer=True,
            allow_aws_config=True,
            allow_cloudtrail=True,
            allow_minimal_cost_metrics=True
        )
        full_coverage = self.cost_controller.get_functionality_coverage(all_enabled_prefs)
        
        assert full_coverage['overall_coverage'] == 100.0
        assert full_coverage['by_category']['cost_explorer'] == 30.0
        assert full_coverage['by_category']['aws_config'] == 5.0
    
    def test_execution_path_routing_configuration(self):
        """Test execution path routing based on consent preferences."""
        # Test with free tier only
        free_prefs = CostPreferences()
        routing = self.cost_controller.get_execution_path_routing(free_prefs)
        
        assert routing['general_spend_analysis']['primary_path'] == 'free_apis'
        assert 'cloudwatch_config_apis' in routing['general_spend_analysis']['data_sources']
        assert 'cost_explorer' not in routing['general_spend_analysis']['data_sources']
        
        # Test with cost explorer enabled
        paid_prefs = CostPreferences(allow_cost_explorer=True)
        paid_routing = self.cost_controller.get_execution_path_routing(paid_prefs)
        
        assert paid_routing['general_spend_analysis']['primary_path'] == 'cost_explorer'
        assert 'cost_explorer' in paid_routing['general_spend_analysis']['data_sources']
    
    def test_cost_tracking_context(self):
        """Test cost tracking context creation and operation tracking."""
        prefs = CostPreferences(allow_cost_explorer=True)
        context = self.cost_controller.create_cost_tracking_context(prefs)
        
        assert context['preferences'] == prefs.__dict__
        assert context['cost_incurring_operations'] == []
        assert context['operation_count'] == 0
        
        # Track a free operation
        self.cost_controller.track_operation_execution(
            context, 'list_metrics', 'free', routing_decision='Free operation - always allowed'
        )
        
        assert context['operation_count'] == 1
        assert len(context['free_operations']) == 1
        assert context['free_operations'][0]['operation'] == 'list_metrics'
        
        # Track a paid operation
        self.cost_controller.track_operation_execution(
            context, 'cost_explorer_analysis', 'paid', cost_incurred=0.01,
            routing_decision='Paid operation allowed by allow_cost_explorer'
        )
        
        assert context['operation_count'] == 2
        assert len(context['cost_incurring_operations']) == 1
        assert context['actual_cost_incurred'] == 0.01
    
    def test_cost_transparency_report_generation(self):
        """Test comprehensive cost transparency report generation."""
        prefs = CostPreferences(allow_cost_explorer=True)
        context = self.cost_controller.create_cost_tracking_context(prefs)
        
        # Simulate some operations
        self.cost_controller.track_operation_execution(
            context, 'list_metrics', 'free', routing_decision='Free operation'
        )
        self.cost_controller.track_operation_execution(
            context, 'cost_explorer_analysis', 'paid', cost_incurred=0.01,
            routing_decision='Paid operation consented'
        )
        self.cost_controller.track_operation_execution(
            context, 'logs_insights_queries', 'blocked',
            routing_decision='Blocked: Forbidden operation'
        )
        
        report = self.cost_controller.generate_cost_transparency_report(context)
        
        assert report['session_summary']['total_operations'] == 3
        assert report['session_summary']['free_operations_count'] == 1
        assert report['session_summary']['paid_operations_count'] == 1
        assert report['session_summary']['blocked_operations_count'] == 1
        
        assert report['cost_summary']['total_actual_cost'] == 0.01
        assert 'cost_explorer_analysis' in report['cost_summary']['cost_by_operation']
        
        assert report['execution_paths']['consent_based_routing'] is True
        assert len(report['transparency_details']['routing_decisions']) == 3
    
    @pytest.mark.skip(reason="execute_with_consent_routing method not yet implemented on CloudWatchService")
    @pytest.mark.asyncio
    async def test_consent_based_routing_execution(self):
        """Test actual consent-based routing execution."""
        # Mock the CloudWatch service methods to avoid actual AWS calls
        with patch.object(self.cloudwatch_service, 'cloudwatch_client') as mock_client:
            mock_client.list_metrics.return_value = {'Metrics': []}
            
            # Test routing with consent given (should execute primary operation)
            self.cloudwatch_service.update_cost_preferences(
                CostPreferences(allow_cost_explorer=True)
            )
            
            result = await self.cloudwatch_service.execute_with_consent_routing(
                primary_operation='cost_explorer_analysis',
                fallback_operation='list_metrics',
                operation_params={}
            )
            
            assert result.success is True
            assert result.data['routing'] == 'primary'
            assert result.cost_incurred is True
            
            # Test routing without consent (should execute fallback operation)
            self.cloudwatch_service.update_cost_preferences(
                CostPreferences(allow_cost_explorer=False)
            )
            
            fallback_result = await self.cloudwatch_service.execute_with_consent_routing(
                primary_operation='cost_explorer_analysis',
                fallback_operation='list_metrics',
                operation_params={}
            )
            
            assert fallback_result.success is True
            assert fallback_result.data['routing'] == 'fallback'
            assert fallback_result.cost_incurred is False
            assert fallback_result.fallback_used is True
    
    def test_orchestrator_cost_validation(self):
        """Test orchestrator cost preference validation."""
        # Test validation with mixed preferences
        validation_result = self.orchestrator.validate_cost_preferences(
            allow_cost_explorer=True,
            allow_aws_config=False,
            allow_cloudtrail='true',
            allow_minimal_cost_metrics=0
        )
        
        assert validation_result['validation_status'] == 'success'
        assert validation_result['validated_preferences']['allow_cost_explorer'] is True
        assert validation_result['validated_preferences']['allow_cloudtrail'] is True
        assert validation_result['validated_preferences']['allow_minimal_cost_metrics'] is False
        
        # Check functionality coverage
        coverage = validation_result['functionality_coverage']
        assert coverage['overall_coverage'] > 60.0  # More than free tier
        
        # Check cost estimate
        cost_estimate = validation_result['cost_estimate']
        assert cost_estimate['total_estimated_cost'] > 0.0
        assert len(cost_estimate['enabled_operations']) > 4  # Free + some paid
    
    def test_cost_estimate_generation(self):
        """Test detailed cost estimation."""
        estimate_result = self.orchestrator.get_cost_estimate(
            allow_cost_explorer=True,
            allow_minimal_cost_metrics=True,
            lookback_days=60,
            log_group_names=['test-log-1', 'test-log-2']
        )
        
        cost_estimate = estimate_result['cost_estimate']
        
        # Should have cost for enabled paid operations
        assert cost_estimate['total_estimated_cost'] > 0.0
        assert 'cost_explorer_analysis' in cost_estimate['enabled_operations']
        assert 'minimal_cost_metrics' in cost_estimate['enabled_operations']
        
        # Should include free operations
        assert 'list_metrics' in cost_estimate['enabled_operations']
        assert 'describe_alarms' in cost_estimate['enabled_operations']
        
        # Should have cost breakdown explanation
        assert 'cost_breakdown_explanation' in estimate_result
        assert 'free_operations' in estimate_result['cost_breakdown_explanation']


if __name__ == '__main__':
    # Run a simple demonstration
    print("=== Cost Control and Routing Demonstration ===")
    
    # Create cost controller
    controller = CostController()
    
    # Test different preference scenarios
    scenarios = [
        ("Free Tier Only", CostPreferences()),
        ("Cost Explorer Enabled", CostPreferences(allow_cost_explorer=True)),
        ("All Features Enabled", CostPreferences(
            allow_cost_explorer=True,
            allow_aws_config=True,
            allow_cloudtrail=True,
            allow_minimal_cost_metrics=True
        ))
    ]
    
    for scenario_name, prefs in scenarios:
        print(f"\n--- {scenario_name} ---")
        
        # Get functionality coverage
        coverage = controller.get_functionality_coverage(prefs)
        print(f"Overall Coverage: {coverage['overall_coverage']:.1f}%")
        
        # Get routing configuration
        routing = controller.get_execution_path_routing(prefs)
        print(f"General Spend Primary Path: {routing['general_spend_analysis']['primary_path']}")
        print(f"Data Sources: {', '.join(routing['general_spend_analysis']['data_sources'])}")
        
        # Get cost estimate
        scope = {'lookback_days': 30, 'log_group_names': ['test-log']}
        estimate = controller.estimate_cost(scope, prefs)
        print(f"Estimated Cost: ${estimate.total_estimated_cost:.4f}")
        print(f"Enabled Operations: {len(estimate.enabled_operations)}")
    
    print("\n=== Cost Tracking Demonstration ===")
    
    # Demonstrate cost tracking
    prefs = CostPreferences(allow_cost_explorer=True)
    context = controller.create_cost_tracking_context(prefs)
    
    # Simulate operations
    controller.track_operation_execution(context, 'list_metrics', 'free')
    controller.track_operation_execution(context, 'cost_explorer_analysis', 'paid', 0.01)
    controller.track_operation_execution(context, 'logs_insights_queries', 'blocked')
    
    # Generate report
    report = controller.generate_cost_transparency_report(context)
    
    print(f"Total Operations: {report['session_summary']['total_operations']}")
    print(f"Free Operations: {report['session_summary']['free_operations_count']}")
    print(f"Paid Operations: {report['session_summary']['paid_operations_count']}")
    print(f"Blocked Operations: {report['session_summary']['blocked_operations_count']}")
    print(f"Total Cost: ${report['cost_summary']['total_actual_cost']:.4f}")
    print(f"Consent-based Routing: {report['execution_paths']['consent_based_routing']}")
    
    print("\n=== Task 11 Implementation Complete ===")
    print("✅ Consent-based routing logic implemented")
    print("✅ Cost tracking and logging for transparency")
    print("✅ Runtime checks for routing to free-only paths")
    print("✅ Cost reporting features showing charges")
    print("✅ Graceful degradation to free APIs")
```

--------------------------------------------------------------------------------
/utils/session_manager.py:
--------------------------------------------------------------------------------

```python
"""
Session SQL Manager for CFM Tips MCP Server

Provides centralized session management with SQL storage and automatic cleanup.
"""

import sqlite3
import json
import logging
import threading
import time
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Union
from contextlib import contextmanager
import os
import tempfile

logger = logging.getLogger(__name__)

class SessionManager:
    """Manages SQL sessions with automatic cleanup and thread safety."""
    
    def __init__(self, session_timeout_minutes: int = 60):
        self.session_timeout_minutes = session_timeout_minutes
        self.active_sessions: Dict[str, Dict[str, Any]] = {}
        self._lock = threading.RLock()
        self._cleanup_thread = None
        self._shutdown = False
        self._use_memory_only = False
        
        # Determine storage mode - try persistent first, then temp, then memory-only
        self.sessions_dir = None
        
        # Try persistent sessions directory first
        try:
            sessions_dir = "sessions"
            if os.path.exists(sessions_dir):
                # Test write permissions on existing directory
                test_file = os.path.join(sessions_dir, ".write_test")
                with open(test_file, 'w') as f:
                    f.write("test")
                os.remove(test_file)
                self.sessions_dir = sessions_dir
                logger.info(f"Using existing sessions directory: {self.sessions_dir}")
            else:
                # Try to create the directory
                os.makedirs(sessions_dir, exist_ok=True)
                # Test write permissions
                test_file = os.path.join(sessions_dir, ".write_test")
                with open(test_file, 'w') as f:
                    f.write("test")
                os.remove(test_file)
                self.sessions_dir = sessions_dir
                logger.info(f"Created sessions directory: {self.sessions_dir}")
        except Exception:
            # Try temporary directory
            try:
                self.sessions_dir = tempfile.mkdtemp(prefix="cfm_sessions_")
                logger.info(f"Using temporary sessions directory: {self.sessions_dir}")
            except Exception:
                # Fall back to memory-only mode
                logger.info("Using memory-only session storage")
                self._use_memory_only = True
                self.sessions_dir = None
        
        # Start cleanup thread
        self._start_cleanup_thread()
    
    def _start_cleanup_thread(self):
        """Start the background cleanup thread."""
        if self._cleanup_thread is None or not self._cleanup_thread.is_alive():
            self._cleanup_thread = threading.Thread(
                target=self._cleanup_worker,
                daemon=True,
                name="SessionCleanup"
            )
            self._cleanup_thread.start()
            logger.info("Session cleanup thread started")
    
    def _cleanup_worker(self):
        """Background worker for session cleanup."""
        while not self._shutdown:
            try:
                self._cleanup_expired_sessions()
                time.sleep(300)  # Check every 5 minutes
            except Exception as e:
                logger.error(f"Error in session cleanup: {e}")
                time.sleep(60)  # Wait 1 minute on error
    
    def _cleanup_expired_sessions(self):
        """Clean up expired sessions."""
        cutoff_time = datetime.now() - timedelta(minutes=self.session_timeout_minutes)
        
        with self._lock:
            expired_sessions = []
            
            for session_id, session_info in self.active_sessions.items():
                if session_info['last_accessed'] < cutoff_time:
                    expired_sessions.append(session_id)
            
            for session_id in expired_sessions:
                try:
                    self._close_session(session_id)
                    logger.info(f"Cleaned up expired session: {session_id}")
                except Exception as e:
                    logger.error(f"Error cleaning up session {session_id}: {e}")
    
    def create_session(self, session_id: Optional[str] = None) -> str:
        """Create a new session with SQL database or memory-only storage."""
        if session_id is None:
            session_id = f"session_{int(time.time())}_{threading.current_thread().ident}"
        
        with self._lock:
            if session_id in self.active_sessions:
                # Update last accessed time
                self.active_sessions[session_id]['last_accessed'] = datetime.now()
                return session_id
            
            try:
                if self._use_memory_only:
                    # Use in-memory SQLite database
                    conn = sqlite3.connect(":memory:", check_same_thread=False)
                    db_path = ":memory:"
                    logger.info(f"Created in-memory session: {session_id}")
                else:
                    # Create persistent session database
                    db_path = os.path.join(self.sessions_dir, f"{session_id}.db")
                    conn = sqlite3.connect(db_path, check_same_thread=False)
                    conn.execute("PRAGMA journal_mode=WAL")  # Better concurrency
                    conn.execute("PRAGMA synchronous=NORMAL")  # Better performance
                    logger.info(f"Created persistent session: {session_id}")
                
                session_info = {
                    'session_id': session_id,
                    'db_path': db_path,
                    'connection': conn,
                    'created_at': datetime.now(),
                    'last_accessed': datetime.now(),
                    'tables': set(),
                    'memory_only': self._use_memory_only
                }
                
                self.active_sessions[session_id] = session_info
                return session_id
                
            except Exception as e:
                logger.error(f"Error creating session {session_id}: {e}")
                raise
    
    @contextmanager
    def get_connection(self, session_id: str):
        """Get database connection for a session."""
        with self._lock:
            if session_id not in self.active_sessions:
                raise ValueError(f"Session {session_id} not found")
            
            session_info = self.active_sessions[session_id]
            session_info['last_accessed'] = datetime.now()
            
            try:
                yield session_info['connection']
            except Exception as e:
                logger.error(f"Database error in session {session_id}: {e}")
                raise
    
    def execute_query(self, session_id: str, query: str, params: Optional[tuple] = None) -> List[Dict[str, Any]]:
        """Execute a SQL query and return results."""
        with self.get_connection(session_id) as conn:
            cursor = conn.cursor()
            
            try:
                if params:
                    cursor.execute(query, params)
                else:
                    cursor.execute(query)
                
                # Get column names
                columns = [description[0] for description in cursor.description] if cursor.description else []
                
                # Fetch results
                rows = cursor.fetchall()
                
                # Convert to list of dictionaries
                results = []
                for row in rows:
                    results.append(dict(zip(columns, row)))
                
                conn.commit()
                return results
            except sqlite3.Error as e:
                logger.error(f"SQLite error in session {session_id}: {e}")
                logger.error(f"Query: {query}")
                if params:
                    logger.error(f"Params: {params}")
                raise
    
    def store_data(self, session_id: str, table_name: str, data: List[Dict[str, Any]], 
                   replace: bool = False) -> bool:
        """Store data in session database."""
        if not data:
            return True
        
        with self.get_connection(session_id) as conn:
            try:
                # Create table if it doesn't exist
                sample_row = data[0]
                columns = list(sample_row.keys())
                
                # Create table schema with proper escaping
                column_defs = []
                for col in columns:
                    # Escape column names to handle special characters
                    escaped_col = f'"{col}"'
                    value = sample_row[col]
                    if isinstance(value, (int, float)):
                        column_defs.append(f"{escaped_col} REAL")
                    elif isinstance(value, bool):
                        column_defs.append(f"{escaped_col} INTEGER")
                    else:
                        column_defs.append(f"{escaped_col} TEXT")
                
                # Escape table name as well
                escaped_table_name = f'"{table_name}"'
                create_sql = f"CREATE TABLE IF NOT EXISTS {escaped_table_name} ({', '.join(column_defs)})"
                conn.execute(create_sql)
                
                # Clear table if replace is True
                if replace:
                    conn.execute(f"DELETE FROM {escaped_table_name}")
                
                # Insert data with escaped column names
                escaped_columns = [f'"{col}"' for col in columns]
                placeholders = ', '.join(['?' for _ in columns])
                insert_sql = f"INSERT INTO {escaped_table_name} ({', '.join(escaped_columns)}) VALUES ({placeholders})"
                
                for row in data:
                    values = []
                    for col in columns:
                        value = row.get(col)
                        if isinstance(value, (dict, list)):
                            value = json.dumps(value)
                        values.append(value)
                    conn.execute(insert_sql, values)
                
                conn.commit()
                
                # Track table
                with self._lock:
                    if session_id in self.active_sessions:
                        self.active_sessions[session_id]['tables'].add(table_name)
                
                logger.info(f"Stored {len(data)} rows in {table_name} for session {session_id}")
                return True
                
            except Exception as e:
                logger.error(f"Error storing data in session {session_id}: {e}")
                conn.rollback()
                return False
    
    def get_session_info(self, session_id: str) -> Dict[str, Any]:
        """Get information about a session."""
        with self._lock:
            if session_id not in self.active_sessions:
                return {"error": "Session not found"}
            
            session_info = self.active_sessions[session_id].copy()
            # Remove connection object for serialization
            session_info.pop('connection', None)
            session_info['tables'] = list(session_info['tables'])
            
            # Convert datetime objects to strings
            for key in ['created_at', 'last_accessed']:
                if key in session_info and isinstance(session_info[key], datetime):
                    session_info[key] = session_info[key].isoformat()
            
            return session_info
    
    def list_sessions(self) -> List[Dict[str, Any]]:
        """List all active sessions."""
        with self._lock:
            sessions = []
            for session_id in self.active_sessions:
                sessions.append(self.get_session_info(session_id))
            return sessions
    
    def _close_session(self, session_id: str):
        """Close a session and clean up resources."""
        if session_id in self.active_sessions:
            session_info = self.active_sessions[session_id]
            
            try:
                # Close database connection
                if 'connection' in session_info:
                    session_info['connection'].close()
                
                # Remove database file (only for persistent sessions)
                if (not session_info.get('memory_only', False) and 
                    'db_path' in session_info and 
                    session_info['db_path'] != ":memory:" and
                    os.path.exists(session_info['db_path'])):
                    os.remove(session_info['db_path'])
                    
            except Exception as e:
                logger.error(f"Error closing session {session_id}: {e}")
            
            # Remove from active sessions
            del self.active_sessions[session_id]
    
    def close_session(self, session_id: str) -> bool:
        """Manually close a session."""
        with self._lock:
            if session_id not in self.active_sessions:
                return False
            
            try:
                self._close_session(session_id)
                logger.info(f"Closed session: {session_id}")
                return True
            except Exception as e:
                logger.error(f"Error closing session {session_id}: {e}")
                return False
    
    def shutdown(self):
        """Shutdown the session manager and clean up all resources."""
        logger.info("Shutting down session manager")
        self._shutdown = True
        
        with self._lock:
            # Close all active sessions
            session_ids = list(self.active_sessions.keys())
            for session_id in session_ids:
                try:
                    self._close_session(session_id)
                except Exception as e:
                    logger.error(f"Error closing session {session_id} during shutdown: {e}")
        
        # Wait for cleanup thread to finish
        if self._cleanup_thread and self._cleanup_thread.is_alive():
            self._cleanup_thread.join(timeout=5)

# Global session manager instance
_session_manager = None

def get_session_manager() -> SessionManager:
    """Get the global session manager instance."""
    global _session_manager
    if _session_manager is None:
        try:
            _session_manager = SessionManager()
        except Exception as e:
            logger.error(f"Failed to create SessionManager: {e}")
            # Create a minimal session manager that only uses memory
            _session_manager = SessionManager()
            _session_manager._use_memory_only = True
            _session_manager.sessions_dir = None
    return _session_manager
```

--------------------------------------------------------------------------------
/tests/unit/analyzers/test_cloudwatch_base_analyzer.py:
--------------------------------------------------------------------------------

```python
"""
Unit tests for CloudWatch BaseAnalyzer abstract class.

Tests the CloudWatch-specific base analyzer interface, common functionality, and abstract method enforcement.
"""

import pytest
import asyncio
from unittest.mock import Mock, AsyncMock, patch
from abc import ABC
from datetime import datetime

from playbooks.cloudwatch.base_analyzer import BaseAnalyzer


class ConcreteCloudWatchAnalyzer(BaseAnalyzer):
    """Concrete implementation of BaseAnalyzer for CloudWatch testing."""
    
    async def analyze(self, **kwargs):
        """Test implementation of analyze method."""
        return {
            'status': 'success',
            'analysis_type': 'test',
            'data': {'test': 'data'},
            'cost_incurred': False,
            'cost_incurring_operations': []
        }
    
    def get_recommendations(self, analysis_results):
        """Test implementation of get_recommendations method."""
        return [
            {
                'type': 'test_recommendation',
                'priority': 'medium',
                'title': 'Test Recommendation',
                'description': 'Test description'
            }
        ]


@pytest.mark.unit
@pytest.mark.cloudwatch
class TestCloudWatchBaseAnalyzer:
    """Test cases for CloudWatch BaseAnalyzer abstract class."""
    
    @pytest.fixture
    def mock_services(self):
        """Create mock services for testing."""
        return {
            'cost_explorer_service': Mock(),
            'config_service': Mock(),
            'metrics_service': Mock(),
            'cloudwatch_service': Mock(),
            'pricing_service': Mock(),
            'performance_monitor': Mock(),
            'memory_manager': Mock()
        }
    
    @pytest.fixture
    def concrete_analyzer(self, mock_services):
        """Create concrete analyzer instance for testing."""
        return ConcreteCloudWatchAnalyzer(**mock_services)
    
    def test_initialization(self, concrete_analyzer, mock_services):
        """Test BaseAnalyzer initialization."""
        assert concrete_analyzer.cost_explorer_service == mock_services['cost_explorer_service']
        assert concrete_analyzer.config_service == mock_services['config_service']
        assert concrete_analyzer.metrics_service == mock_services['metrics_service']
        assert concrete_analyzer.cloudwatch_service == mock_services['cloudwatch_service']
        assert concrete_analyzer.pricing_service == mock_services['pricing_service']
        assert concrete_analyzer.performance_monitor == mock_services['performance_monitor']
        assert concrete_analyzer.memory_manager == mock_services['memory_manager']
        
        # Check default values
        assert concrete_analyzer.analysis_type == 'concretecloudwatch'
        assert concrete_analyzer.version == '1.0.0'
        assert concrete_analyzer.execution_count == 0
        assert concrete_analyzer.last_execution is None
        assert concrete_analyzer.logger is not None
    
    def test_abstract_class_cannot_be_instantiated(self, mock_services):
        """Test that BaseAnalyzer cannot be instantiated directly."""
        with pytest.raises(TypeError):
            BaseAnalyzer(**mock_services)
    
    def test_prepare_analysis_context(self, concrete_analyzer):
        """Test analysis context preparation."""
        context = concrete_analyzer.prepare_analysis_context(
            region='us-east-1',
            lookback_days=30,
            session_id='test_session'
        )
        
        assert context['analysis_type'] == 'concretecloudwatch'
        assert context['analyzer_version'] == '1.0.0'
        assert context['region'] == 'us-east-1'
        assert context['lookback_days'] == 30
        assert context['session_id'] == 'test_session'
        assert 'cost_constraints' in context
        assert context['cost_constraints']['prioritize_cost_explorer'] is True
    
    def test_get_analyzer_info(self, concrete_analyzer):
        """Test analyzer information retrieval."""
        info = concrete_analyzer.get_analyzer_info()
        
        assert info['analysis_type'] == 'concretecloudwatch'
        assert info['class_name'] == 'ConcreteCloudWatchAnalyzer'
        assert info['version'] == '1.0.0'
        assert info['execution_count'] == 0
        assert info['last_execution'] is None
        assert 'services' in info
        assert 'cost_optimization' in info
    
    def test_validate_parameters_valid(self, concrete_analyzer):
        """Test parameter validation with valid parameters."""
        validation = concrete_analyzer.validate_parameters(
            region='us-east-1',
            lookback_days=30,
            timeout_seconds=60
        )
        
        assert validation['valid'] is True
        assert len(validation['errors']) == 0
        assert len(validation['warnings']) == 0
    
    def test_validate_parameters_invalid_region(self, concrete_analyzer):
        """Test parameter validation with invalid region."""
        validation = concrete_analyzer.validate_parameters(region=123)
        
        assert validation['valid'] is False
        assert any('Region must be a string' in error for error in validation['errors'])
    
    def test_validate_parameters_invalid_lookback_days(self, concrete_analyzer):
        """Test parameter validation with invalid lookback_days."""
        validation = concrete_analyzer.validate_parameters(lookback_days=-5)
        
        assert validation['valid'] is False
        assert any('lookback_days must be a positive integer' in error for error in validation['errors'])
    
    def test_validate_parameters_large_lookback_warning(self, concrete_analyzer):
        """Test parameter validation with large lookback_days generates warning."""
        validation = concrete_analyzer.validate_parameters(lookback_days=400)
        
        assert validation['valid'] is True
        assert len(validation['warnings']) > 0
        assert any('lookback_days > 365' in warning for warning in validation['warnings'])
    
    def test_validate_parameters_invalid_timeout(self, concrete_analyzer):
        """Test parameter validation with invalid timeout."""
        validation = concrete_analyzer.validate_parameters(timeout_seconds='invalid')
        
        assert validation['valid'] is False
        assert any('timeout_seconds must be a positive number' in error for error in validation['errors'])
    
    def test_validate_parameters_list_validation(self, concrete_analyzer):
        """Test parameter validation with list parameters."""
        # Valid list
        validation = concrete_analyzer.validate_parameters(
            log_group_names=['/aws/lambda/test']
        )
        assert validation['valid'] is True
        
        # Invalid list type
        validation = concrete_analyzer.validate_parameters(
            log_group_names='not-a-list'
        )
        assert validation['valid'] is False
        assert any('log_group_names must be a list' in error for error in validation['errors'])
        
        # Invalid list items
        validation = concrete_analyzer.validate_parameters(
            log_group_names=[123, 456]
        )
        assert validation['valid'] is False
        assert any('All log group names must be strings' in error for error in validation['errors'])
    
    def test_create_recommendation(self, concrete_analyzer):
        """Test creation of standardized recommendation."""
        recommendation = concrete_analyzer.create_recommendation(
            rec_type='cost_optimization',
            priority='high',
            title='Test Recommendation',
            description='Test description',
            potential_savings=25.50,
            affected_resources=['resource1', 'resource2'],
            action_items=['action1', 'action2'],
            cloudwatch_component='logs'
        )
        
        assert recommendation['type'] == 'cost_optimization'
        assert recommendation['priority'] == 'high'
        assert recommendation['title'] == 'Test Recommendation'
        assert recommendation['description'] == 'Test description'
        assert recommendation['potential_savings'] == 25.50
        assert recommendation['potential_savings_formatted'] == '$25.50'
        assert recommendation['affected_resources'] == ['resource1', 'resource2']
        assert recommendation['resource_count'] == 2
        assert recommendation['action_items'] == ['action1', 'action2']
        assert recommendation['cloudwatch_component'] == 'logs'
        assert recommendation['analyzer'] == 'concretecloudwatch'
        assert 'created_at' in recommendation
    
    def test_handle_analysis_error(self, concrete_analyzer):
        """Test error handling with different error categories."""
        context = {'test': 'context'}
        
        # Test permission error
        permission_error = Exception("Access denied to CloudWatch")
        result = concrete_analyzer.handle_analysis_error(permission_error, context)
        
        assert result['status'] == 'error'
        assert result['error_category'] == 'permissions'
        assert result['cost_incurred'] is False
        assert len(result['recommendations']) > 0
        assert result['recommendations'][0]['type'] == 'permission_fix'
        
        # Test rate limiting error
        throttle_error = Exception("Rate exceeded - throttling request")
        result = concrete_analyzer.handle_analysis_error(throttle_error, context)
        
        assert result['error_category'] == 'rate_limiting'
        assert result['recommendations'][0]['type'] == 'rate_limit_optimization'
    
    def test_log_analysis_start_and_complete(self, concrete_analyzer, mock_services):
        """Test analysis logging methods."""
        context = {'test': 'context'}
        result = {
            'status': 'success',
            'execution_time': 5.0,
            'recommendations': [{'type': 'test'}],
            'cost_incurred': True,
            'cost_incurring_operations': ['cost_explorer'],
            'primary_data_source': 'cost_explorer'
        }
        
        # Test start logging
        concrete_analyzer.log_analysis_start(context)
        assert concrete_analyzer.execution_count == 1
        assert concrete_analyzer.last_execution is not None
        
        # Test completion logging
        concrete_analyzer.log_analysis_complete(context, result)
        
        # Verify performance monitor was called if available
        if mock_services['performance_monitor']:
            assert mock_services['performance_monitor'].record_metric.called
    
    @pytest.mark.asyncio
    async def test_concrete_analyze_method(self, concrete_analyzer):
        """Test that concrete implementation works."""
        result = await concrete_analyzer.analyze(region='us-east-1')
        
        assert result['status'] == 'success'
        assert result['analysis_type'] == 'test'
        assert result['data']['test'] == 'data'
    
    def test_concrete_get_recommendations_method(self, concrete_analyzer):
        """Test that concrete get_recommendations works."""
        analysis_results = {'data': {'test': 'data'}}
        recommendations = concrete_analyzer.get_recommendations(analysis_results)
        
        assert len(recommendations) == 1
        assert recommendations[0]['type'] == 'test_recommendation'
        assert recommendations[0]['title'] == 'Test Recommendation'
    
    @pytest.mark.asyncio
    async def test_execute_with_error_handling_success(self, concrete_analyzer, mock_services):
        """Test successful execution with error handling wrapper."""
        result = await concrete_analyzer.execute_with_error_handling(
            region='us-east-1',
            lookback_days=30
        )
        
        assert result['status'] == 'success'
        assert result['analysis_type'] == 'test'  # From the concrete analyze method
        assert 'timestamp' in result
        assert 'cost_incurred' in result
        assert 'recommendations' in result
        assert concrete_analyzer.execution_count == 1
    
    @pytest.mark.asyncio
    async def test_execute_with_error_handling_validation_failure(self, concrete_analyzer):
        """Test execution with parameter validation failure."""
        result = await concrete_analyzer.execute_with_error_handling(
            region=123,  # Invalid region
            lookback_days=-5  # Invalid lookback_days
        )
        
        assert result['status'] == 'error'
        assert result['error_message'] == 'Parameter validation failed'
        assert 'validation_errors' in result
        assert len(result['validation_errors']) >= 2
    
    def test_memory_management_integration(self, concrete_analyzer, mock_services):
        """Test memory management integration."""
        mock_services['memory_manager'].start_memory_tracking.return_value = 'tracker_123'
        
        # Memory manager should be available
        assert concrete_analyzer.memory_manager is not None
        
        # Test memory tracking can be started
        tracker_id = concrete_analyzer.memory_manager.start_memory_tracking()
        assert tracker_id == 'tracker_123'


class IncompleteCloudWatchAnalyzer(BaseAnalyzer):
    """Incomplete analyzer implementation for testing abstract method enforcement."""
    
    # Missing analyze method implementation
    def get_recommendations(self, analysis_results):
        return []


class IncompleteCloudWatchAnalyzer2(BaseAnalyzer):
    """Another incomplete analyzer implementation for testing."""
    
    async def analyze(self, **kwargs):
        return {}
    
    # Missing get_recommendations method implementation


@pytest.mark.unit
@pytest.mark.cloudwatch
class TestCloudWatchBaseAnalyzerAbstractMethods:
    """Test abstract method enforcement."""
    
    @pytest.fixture
    def mock_services(self):
        """Create mock services for testing."""
        return {
            'cost_explorer_service': Mock(),
            'config_service': Mock(),
            'metrics_service': Mock(),
            'cloudwatch_service': Mock(),
            'pricing_service': Mock(),
            'performance_monitor': Mock(),
            'memory_manager': Mock()
        }
    
    def test_incomplete_analyzer_missing_analyze(self, mock_services):
        """Test that analyzer without analyze method cannot be instantiated."""
        with pytest.raises(TypeError, match="Can't instantiate abstract class"):
            IncompleteCloudWatchAnalyzer(**mock_services)
    
    def test_incomplete_analyzer_missing_get_recommendations(self, mock_services):
        """Test that analyzer without get_recommendations method cannot be instantiated."""
        with pytest.raises(TypeError, match="Can't instantiate abstract class"):
            IncompleteCloudWatchAnalyzer2(**mock_services)


if __name__ == "__main__":
    pytest.main([__file__])
```
Page 2/14FirstPrevNextLast