#
tokens: 39174/50000 6/82 files (page 4/5)
lines: off (toggle) GitHub
raw markdown copy
This is page 4 of 5. Use http://codebase.md/nictuku/meta-ads-mcp?page={x} to view the full context.

# Directory Structure

```
├── .github
│   └── workflows
│       ├── publish-mcp.yml
│       ├── publish.yml
│       └── test.yml
├── .gitignore
├── .python-version
├── .uv.toml
├── CUSTOM_META_APP.md
├── Dockerfile
├── examples
│   ├── example_http_client.py
│   └── README.md
├── future_improvements.md
├── images
│   └── meta-ads-example.png
├── LICENSE
├── LOCAL_INSTALLATION.md
├── meta_ads_auth.sh
├── meta_ads_mcp
│   ├── __init__.py
│   ├── __main__.py
│   └── core
│       ├── __init__.py
│       ├── accounts.py
│       ├── ads_library.py
│       ├── ads.py
│       ├── adsets.py
│       ├── api.py
│       ├── auth.py
│       ├── authentication.py
│       ├── budget_schedules.py
│       ├── callback_server.py
│       ├── campaigns.py
│       ├── duplication.py
│       ├── http_auth_integration.py
│       ├── insights.py
│       ├── openai_deep_research.py
│       ├── pipeboard_auth.py
│       ├── reports.py
│       ├── resources.py
│       ├── server.py
│       ├── targeting.py
│       └── utils.py
├── META_API_NOTES.md
├── poetry.lock
├── pyproject.toml
├── README.md
├── RELEASE.md
├── requirements.txt
├── server.json
├── setup.py
├── smithery.yaml
├── STREAMABLE_HTTP_SETUP.md
└── tests
    ├── __init__.py
    ├── conftest.py
    ├── e2e_account_info_search_issue.py
    ├── README_REGRESSION_TESTS.md
    ├── README.md
    ├── test_account_info_access_fix.py
    ├── test_account_search.py
    ├── test_budget_update_e2e.py
    ├── test_budget_update.py
    ├── test_create_ad_creative_simple.py
    ├── test_create_simple_creative_e2e.py
    ├── test_dsa_beneficiary.py
    ├── test_dsa_integration.py
    ├── test_duplication_regression.py
    ├── test_duplication.py
    ├── test_dynamic_creatives.py
    ├── test_estimate_audience_size_e2e.py
    ├── test_estimate_audience_size.py
    ├── test_get_account_pages.py
    ├── test_get_ad_creatives_fix.py
    ├── test_get_ad_image_quality_improvements.py
    ├── test_get_ad_image_regression.py
    ├── test_http_transport.py
    ├── test_insights_actions_and_values_e2e.py
    ├── test_insights_pagination.py
    ├── test_integration_openai_mcp.py
    ├── test_is_dynamic_creative_adset.py
    ├── test_mobile_app_adset_creation.py
    ├── test_mobile_app_adset_issue.py
    ├── test_openai_mcp_deep_research.py
    ├── test_openai.py
    ├── test_page_discovery_integration.py
    ├── test_page_discovery.py
    ├── test_targeting_search_e2e.py
    ├── test_targeting.py
    ├── test_update_ad_creative_id.py
    └── test_upload_ad_image.py
```

# Files

--------------------------------------------------------------------------------
/meta_ads_mcp/core/targeting.py:
--------------------------------------------------------------------------------

```python
"""Targeting search functionality for Meta Ads API."""

import json
from typing import Optional, List, Dict, Any
import os
from .api import meta_api_tool, make_api_request
from .server import mcp_server


@mcp_server.tool()
@meta_api_tool
async def search_interests(query: str, access_token: Optional[str] = None, limit: int = 25) -> str:
    """
    Look up interest targeting options matching a keyword.

    Issues a ``search`` request with ``type=adinterest`` and returns the
    response serialized as indented JSON.

    Args:
        query: Search term for interests (e.g., "baseball", "cooking", "travel")
        access_token: Meta API access token, forwarded to make_api_request (optional)
        limit: Maximum number of results to request (default: 25)

    Returns:
        JSON string with the API response, or a JSON ``{"error": ...}`` object
        when ``query`` is empty. No request is made in that case.
    """
    if query:
        search_params = {"type": "adinterest", "q": query, "limit": limit}
        response = await make_api_request("search", access_token, search_params)
        return json.dumps(response, indent=2)

    # Empty/None query: report the problem instead of calling the API.
    return json.dumps({"error": "No search query provided"}, indent=2)


@mcp_server.tool()
@meta_api_tool
async def get_interest_suggestions(interest_list: List[str], access_token: Optional[str] = None, limit: int = 25) -> str:
    """
    Fetch interests related to a list of existing interest names.

    Issues a ``search`` request with ``type=adinterestsuggestion``; the
    interest names are sent JSON-encoded in the ``interest_list`` parameter.

    Args:
        interest_list: Interest names to get suggestions for (e.g., ["Basketball", "Soccer"])
        access_token: Meta API access token, forwarded to make_api_request (optional)
        limit: Maximum number of suggestions to request (default: 25)

    Returns:
        JSON string with the API response, or a JSON ``{"error": ...}`` object
        when ``interest_list`` is empty. No request is made in that case.
    """
    if interest_list:
        suggestion_params = {
            "type": "adinterestsuggestion",
            "interest_list": json.dumps(interest_list),
            "limit": limit,
        }
        suggestions = await make_api_request("search", access_token, suggestion_params)
        return json.dumps(suggestions, indent=2)

    # Nothing to base suggestions on: skip the API call entirely.
    return json.dumps({"error": "No interest list provided"}, indent=2)


@mcp_server.tool()
@meta_api_tool
async def estimate_audience_size(
    access_token: Optional[str] = None,
    account_id: Optional[str] = None,
    targeting: Optional[Dict[str, Any]] = None,
    optimization_goal: str = "REACH",
    # Backwards compatibility for simple interest validation
    interest_list: Optional[List[str]] = None,
    interest_fbid_list: Optional[List[str]] = None
) -> str:
    """
    Estimate audience size for a targeting specification, or validate interests.

    Two modes are supported:

    * Comprehensive estimation (``account_id`` + ``targeting``): queries the
      ``{account_id}/reachestimate`` endpoint. A ``{account_id}/delivery_estimate``
      fallback is attempted on failure only when the environment variable
      ``META_MCP_DISABLE_DELIVERY_FALLBACK`` is set to something other than
      ``"1"`` (it defaults to ``"1"``, i.e. fallback disabled).
    * Legacy interest validation (``interest_list`` / ``interest_fbid_list``
      without ``targeting``): queries ``search`` with ``type=adinterestvalid``.

    Args:
        access_token: Meta API access token, forwarded to make_api_request (optional)
        account_id: Meta Ads account ID (format: act_XXXXXXXXX) - required for comprehensive estimation
        targeting: Complete targeting specification including demographics, geography, interests, etc.
                  Must contain at least one geo location or a custom audience.
                  Example: {
                      "age_min": 25,
                      "age_max": 65,
                      "geo_locations": {"countries": ["PL"]},
                      "flexible_spec": [
                          {"interests": [{"id": "6003371567474"}]},
                          {"interests": [{"id": "6003462346642"}]}
                      ]
                  }
        optimization_goal: Echoed back in the result and sent only with the
                          delivery_estimate fallback request; the reachestimate
                          request does not include it (default: "REACH").
        interest_list: [DEPRECATED - for backwards compatibility] List of interest names to validate
        interest_fbid_list: [DEPRECATED - for backwards compatibility] List of interest IDs to validate

    Returns:
        JSON string. On success it contains ``success``, ``estimated_audience_size``,
        ``estimate_details`` and ``raw_response``; on any failure it contains an
        ``error`` (or ``comprehensive_targeting_failed``) key with diagnostics.
        Exceptions raised while calling the API are reported in the JSON rather
        than propagated.
    """
    # Legacy mode: interest params were supplied, or nothing usable was supplied at all.
    legacy_call = bool(interest_list or interest_fbid_list) or (not account_id and not targeting)

    if legacy_call and not targeting:
        if not interest_list and not interest_fbid_list:
            return json.dumps({"error": "No interest list or FBID list provided"}, indent=2)

        params: Dict[str, Any] = {"type": "adinterestvalid"}
        if interest_list:
            params["interest_list"] = json.dumps(interest_list)
        if interest_fbid_list:
            params["interest_fbid_list"] = json.dumps(interest_fbid_list)

        data = await make_api_request("search", access_token, params)
        return json.dumps(data, indent=2)

    # Comprehensive audience estimation
    if not account_id:
        return json.dumps({
            "error": "account_id is required for comprehensive audience estimation",
            "details": "For simple interest validation, use interest_list or interest_fbid_list parameters"
        }, indent=2)

    if not targeting:
        return json.dumps({
            "error": "targeting specification is required for comprehensive audience estimation",
            "example": {
                "age_min": 25,
                "age_max": 65,
                "geo_locations": {"countries": ["US"]},
                "flexible_spec": [
                    {"interests": [{"id": "6003371567474"}]}
                ]
            }
        }, indent=2)

    # Preflight validation: require at least one location OR a custom audience,
    # so the obvious mistake is reported without a round trip to the API.
    if not _has_location_or_custom_audience(targeting):
        return json.dumps({
            "error": "Missing target audience location",
            "details": "Select at least one location in targeting.geo_locations or include a custom audience.",
            "action_required": "Add geo_locations with countries/regions/cities/zips or include custom_audiences.",
            "example": {
                "geo_locations": {"countries": ["US"]},
                "age_min": 25,
                "age_max": 65
            }
        }, indent=2)

    endpoint = f"{account_id}/reachestimate"
    # Note: the reachestimate request deliberately omits optimization_goal/objective.
    params = {"targeting_spec": targeting}

    try:
        data = await make_api_request(endpoint, access_token, params, method="GET")

        # Surface Graph API errors directly for better diagnostics.
        if isinstance(data, dict) and "error" in data:
            missing_location = _missing_location_error(data, endpoint)
            if missing_location is not None:
                return missing_location

            if _delivery_fallback_disabled():
                return json.dumps({
                    "error": "Graph API returned an error for reachestimate",
                    "details": data.get("error"),
                    "endpoint_used": endpoint,
                    "request_params": {
                        "has_targeting_spec": bool(targeting),
                    },
                    "note": "delivery_estimate fallback disabled via META_MCP_DISABLE_DELIVERY_FALLBACK"
                }, indent=2)

            # Fallback explicitly enabled: retry against delivery_estimate.
            try:
                fallback_data = await _request_delivery_estimate(
                    account_id, access_token, targeting, optimization_goal
                )
                if _has_delivery_estimate(fallback_data):
                    return _format_delivery_estimate(
                        fallback_data["data"][0], fallback_data,
                        account_id, targeting, optimization_goal, via_fallback=True
                    )

                # Fallback returned but not in expected format
                return json.dumps({
                    "error": "Graph API returned an error for reachestimate; delivery_estimate fallback did not return usable data",
                    "reachestimate_error": data.get("error"),
                    "fallback_endpoint_used": "delivery_estimate",
                    "fallback_raw_response": fallback_data,
                    "endpoint_used": endpoint,
                    "request_params": {
                        "has_targeting_spec": bool(targeting)
                    }
                }, indent=2)
            except Exception as fallback_exc:
                return json.dumps({
                    "error": "Graph API returned an error for reachestimate; delivery_estimate fallback also failed",
                    "reachestimate_error": data.get("error"),
                    "fallback_endpoint_used": "delivery_estimate",
                    "fallback_exception": str(fallback_exc),
                    "endpoint_used": endpoint,
                    "request_params": {
                        "has_targeting_spec": bool(targeting)
                    }
                }, indent=2)

        # No error reported: format whatever shape the endpoint returned.
        return _format_estimate_response(data, account_id, targeting, optimization_goal, endpoint)

    except Exception as e:
        # Try fallback to delivery_estimate first when an exception occurs (unless disabled).
        if not _delivery_fallback_disabled():
            try:
                fallback_data = await _request_delivery_estimate(
                    account_id, access_token, targeting, optimization_goal
                )
                if _has_delivery_estimate(fallback_data):
                    return _format_delivery_estimate(
                        fallback_data["data"][0], fallback_data,
                        account_id, targeting, optimization_goal, via_fallback=True
                    )
            except Exception:
                # Best effort only: if the fallback also fails, report the original error below.
                pass

        # Heuristic match on the exception text for error code 100 / subcode 33
        # (the original code associates this with a Business Manager system-user permission error).
        error_str = str(e)
        if "100" in error_str and "33" in error_str:
            # Try to provide fallback estimation using individual interests if available
            interests_found = _collect_interest_ids(targeting)

            if interests_found:
                try:
                    fallback_result = await estimate_audience_size(
                        access_token=access_token,
                        interest_fbid_list=interests_found
                    )
                    fallback_data = json.loads(fallback_result)

                    return json.dumps({
                        "comprehensive_targeting_failed": True,
                        "error_code": "100-33",
                        "fallback_used": True,
                        "details": {
                            "issue": "reachestimate endpoint returned error - possibly due to targeting parameters or account limitations",
                            "solution": "Individual interest validation used as fallback - comprehensive targeting may have specific requirements",
                            "endpoint_used": endpoint
                        },
                        "individual_interest_data": fallback_data,
                        "note": "Individual interest audience sizes provided as fallback. Comprehensive targeting via reachestimate endpoint failed."
                    }, indent=2)
                except Exception:
                    # `except Exception` (not a bare except) so task cancellation and
                    # interpreter-exit signals are not swallowed here.
                    pass

            return json.dumps({
                "error": "reachestimate endpoint returned error (previously was incorrectly using delivery_estimate)",
                "error_code": "100-33", 
                "details": {
                    "issue": "The endpoint returned an error, possibly due to targeting parameters or account limitations",
                    "endpoint_used": endpoint,
                    "previous_issue": "Code was previously using non-existent delivery_estimate endpoint - now fixed",
                    "available_alternative": "Use interest_list or interest_fbid_list parameters for individual interest validation"
                },
                "raw_error": error_str
            }, indent=2)

        return json.dumps({
            "error": f"Failed to get audience estimation from reachestimate endpoint: {str(e)}",
            "details": "Check targeting parameters and account permissions",
            "error_type": "general_api_error",
            "endpoint_used": endpoint
        }, indent=2)


def _has_location_or_custom_audience(t: Any) -> bool:
    """Return True if the targeting dict has a non-empty geo location list or custom audience list."""
    if not isinstance(t, dict):
        return False

    geo = t.get("geo_locations") or {}
    if isinstance(geo, dict):
        for key in ("countries", "regions", "cities", "zips", "geo_markets", "country_groups"):
            val = geo.get(key)
            if isinstance(val, list) and val:
                return True

    # Top-level custom audiences
    ca = t.get("custom_audiences")
    if isinstance(ca, list) and ca:
        return True

    # Custom audiences within flexible_spec
    flex = t.get("flexible_spec")
    if isinstance(flex, list):
        for spec in flex:
            if isinstance(spec, dict):
                ca_spec = spec.get("custom_audiences")
                if isinstance(ca_spec, list) and ca_spec:
                    return True
    return False


def _delivery_fallback_disabled() -> bool:
    """Return True unless META_MCP_DISABLE_DELIVERY_FALLBACK is set to a value other than "1"."""
    return os.environ.get("META_MCP_DISABLE_DELIVERY_FALLBACK", "1") == "1"


async def _request_delivery_estimate(
    account_id: str,
    access_token: Optional[str],
    targeting: Dict[str, Any],
    optimization_goal: str
) -> Any:
    """Call the {account_id}/delivery_estimate endpoint with a JSON-encoded targeting spec."""
    fallback_params = {
        "targeting_spec": json.dumps(targeting) if isinstance(targeting, dict) else targeting,
        "optimization_goal": optimization_goal
    }
    return await make_api_request(
        f"{account_id}/delivery_estimate", access_token, fallback_params, method="GET"
    )


def _has_delivery_estimate(fallback_data: Any) -> bool:
    """Return True if a delivery_estimate response carries a non-empty "data" payload."""
    return (
        isinstance(fallback_data, dict)
        and "data" in fallback_data
        and len(fallback_data["data"]) > 0
    )


def _format_delivery_estimate(
    estimate_data: Dict[str, Any],
    raw_response: Any,
    account_id: str,
    targeting: Dict[str, Any],
    optimization_goal: str,
    via_fallback: bool = False
) -> str:
    """Serialize one delivery_estimate-style entry (estimate_mau, estimate_dau, ...) as the success JSON."""
    formatted_response = {
        "success": True,
        "account_id": account_id,
        "targeting": targeting,
        "optimization_goal": optimization_goal,
        "estimated_audience_size": estimate_data.get("estimate_mau", 0),
        "estimate_details": {
            "monthly_active_users": estimate_data.get("estimate_mau", 0),
            "daily_outcomes_curve": estimate_data.get("estimate_dau", []),
            "bid_estimate": estimate_data.get("bid_estimates", {}),
            "unsupported_targeting": estimate_data.get("unsupported_targeting", [])
        },
        "raw_response": raw_response
    }
    if via_fallback:
        formatted_response["fallback_endpoint_used"] = "delivery_estimate"
    return json.dumps(formatted_response, indent=2)


def _no_estimation_data_error(data: Any, endpoint: str) -> str:
    """Serialize the "no estimation data" error together with debug info about the raw response."""
    return json.dumps({
        "error": "No estimation data returned from Meta API",
        "raw_response": data,
        "debug_info": {
            "response_keys": list(data.keys()) if isinstance(data, dict) else "not_a_dict",
            "response_type": str(type(data)),
            "endpoint_used": endpoint
        }
    }, indent=2)


def _format_estimate_response(
    data: Any,
    account_id: str,
    targeting: Dict[str, Any],
    optimization_goal: str,
    endpoint: str
) -> str:
    """Format a non-error reachestimate response; unknown shapes yield the "no estimation data" error."""
    if not isinstance(data, dict) or "data" not in data:
        return _no_estimation_data_error(data, endpoint)

    response_data = data["data"]

    # Case 1: delivery_estimate-like list structure
    if isinstance(response_data, list) and response_data:
        return _format_delivery_estimate(
            response_data[0], data, account_id, targeting, optimization_goal
        )

    # Case 2: reachestimate dict structure with bounds
    if isinstance(response_data, dict):
        lower = response_data.get("users_lower_bound", response_data.get("estimate_mau_lower_bound"))
        upper = response_data.get("users_upper_bound", response_data.get("estimate_mau_upper_bound"))
        midpoint = 0
        if isinstance(lower, (int, float)) and isinstance(upper, (int, float)):
            try:
                midpoint = int((lower + upper) / 2)
            except (OverflowError, ValueError):
                # Non-finite bounds (inf/nan) cannot be converted; report 0 as before.
                midpoint = 0
        return json.dumps({
            "success": True,
            "account_id": account_id,
            "targeting": targeting,
            "optimization_goal": optimization_goal,
            "estimated_audience_size": midpoint,
            "estimate_details": {
                "users_lower_bound": lower,
                "users_upper_bound": upper,
                "estimate_ready": response_data.get("estimate_ready")
            },
            "raw_response": data
        }, indent=2)

    # Empty list, or a payload of an unexpected type: always return an explicit
    # error so the function never returns None.
    return _no_estimation_data_error(data, endpoint)


def _missing_location_error(data: Dict[str, Any], endpoint: str) -> Optional[str]:
    """Return the "missing location" error JSON if the API error matches subcode 1885364, else None."""
    err_wrapper = data.get("error")
    details_obj = err_wrapper.get("details") if isinstance(err_wrapper, dict) else None
    raw_err = details_obj.get("error") if isinstance(details_obj, dict) else None
    if not isinstance(raw_err, dict):
        return None

    is_missing_location = (
        raw_err.get("error_subcode") == 1885364
        or raw_err.get("error_user_title") == "Missing Target Audience Location"
    )
    if not is_missing_location:
        return None

    error_data = raw_err.get("error_data")
    return json.dumps({
        "error": "Missing target audience location",
        "details": raw_err.get("error_user_msg") or "Select at least one location, or choose a custom audience.",
        "endpoint_used": endpoint,
        "action_required": "Add geo_locations with at least one of countries/regions/cities/zips or include custom_audiences.",
        "blame_field_specs": error_data.get("blame_field_specs") if isinstance(error_data, dict) else None
    }, indent=2)


def _collect_interest_ids(targeting: Any) -> List[Any]:
    """Collect interest IDs from top-level "interests" and every flexible_spec entry, skipping malformed items."""
    if not isinstance(targeting, dict):
        return []

    groups = [targeting.get("interests")]
    flex = targeting.get("flexible_spec")
    if isinstance(flex, list):
        groups.extend(spec.get("interests") for spec in flex if isinstance(spec, dict))

    interest_ids: List[Any] = []
    for group in groups:
        if isinstance(group, list):
            interest_ids.extend(
                item["id"] for item in group if isinstance(item, dict) and item.get("id")
            )
    return interest_ids


@mcp_server.tool()
@meta_api_tool
async def search_behaviors(access_token: Optional[str] = None, limit: int = 50) -> str:
    """
    List behavior targeting options.

    Issues a ``search`` request with ``type=adTargetingCategory`` and
    ``class=behaviors``.

    Args:
        access_token: Meta API access token, forwarded to make_api_request (optional)
        limit: Maximum number of results to request (default: 50)

    Returns:
        JSON string with the API response, serialized with two-space indentation
    """
    behavior_params = {
        "type": "adTargetingCategory",
        "class": "behaviors",
        "limit": limit,
    }
    response = await make_api_request("search", access_token, behavior_params)
    return json.dumps(response, indent=2)


@mcp_server.tool()
@meta_api_tool
async def search_demographics(access_token: Optional[str] = None, demographic_class: str = "demographics", limit: int = 50) -> str:
    """
    List demographic targeting options for a given category class.

    Issues a ``search`` request with ``type=adTargetingCategory``; the
    ``demographic_class`` value is sent unchanged as the ``class`` parameter.

    Args:
        access_token: Meta API access token, forwarded to make_api_request (optional)
        demographic_class: Type of demographics to retrieve. Options: 'demographics', 'life_events', 
                          'industries', 'income', 'family_statuses', 'user_device', 'user_os' (default: 'demographics')
        limit: Maximum number of results to request (default: 50)

    Returns:
        JSON string with the API response, serialized with two-space indentation
    """
    category_params = {
        "type": "adTargetingCategory",
        "class": demographic_class,
        "limit": limit,
    }
    response = await make_api_request("search", access_token, category_params)
    return json.dumps(response, indent=2)


@mcp_server.tool()
@meta_api_tool
async def search_geo_locations(query: str, access_token: Optional[str] = None, 
                             location_types: Optional[List[str]] = None, limit: int = 25) -> str:
    """
    Look up geographic targeting locations matching a search term.

    Issues a ``search`` request with ``type=adgeolocation``. When
    ``location_types`` is given it is sent JSON-encoded; otherwise the
    parameter is omitted from the request.

    Args:
        query: Search term for locations (e.g., "New York", "California", "Japan")
        access_token: Meta API access token, forwarded to make_api_request (optional)
        location_types: Types of locations to search. Options: ['country', 'region', 'city', 'zip', 
                       'geo_market', 'electoral_district']. If not specified, no type filter is sent.
        limit: Maximum number of results to request (default: 25)

    Returns:
        JSON string with the API response, or a JSON ``{"error": ...}`` object
        when ``query`` is empty. No request is made in that case.
    """
    if not query:
        return json.dumps({"error": "No search query provided"}, indent=2)

    # The type filter is optional, so only add it when the caller supplied one.
    type_filter = {"location_types": json.dumps(location_types)} if location_types else {}
    geo_params = {
        "type": "adgeolocation",
        "q": query,
        "limit": limit,
        **type_filter,
    }

    response = await make_api_request("search", access_token, geo_params)
    return json.dumps(response, indent=2)
```

--------------------------------------------------------------------------------
/tests/test_estimate_audience_size_e2e.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
End-to-End Audience Estimation Test for Meta Ads MCP

This test validates that the new estimate_audience_size function correctly provides
comprehensive audience estimation and backwards compatibility for interest validation
through a pre-authenticated MCP server.

Usage:
    1. Start the server: uv run python -m meta_ads_mcp --transport streamable-http --port 8080
    2. Run test: uv run python tests/test_estimate_audience_size_e2e.py

Or with pytest (manual only):
    uv run python -m pytest tests/test_estimate_audience_size_e2e.py -v -m e2e

Test scenarios:
1. Comprehensive audience estimation with complex targeting
2. Backwards compatibility with simple interest validation
3. Error handling for invalid parameters
4. Different optimization goals
"""

import pytest
import requests
import json
import os
import sys
from typing import Dict, Any, List

# Load environment variables from .env file
# python-dotenv is treated as optional: when the import fails the script keeps
# going and relies on whatever is already set in the process environment.
# NOTE: this runs (and prints) at import time, not only when the tests execute.
try:
    from dotenv import load_dotenv
    load_dotenv()
    print("✅ Loaded environment variables from .env file")
except ImportError:
    # Only a missing package is tolerated; any other error still propagates.
    print("⚠️  python-dotenv not installed, using system environment variables only")

@pytest.mark.e2e
@pytest.mark.skip(reason="E2E test - run manually only")
class AudienceEstimationTester:
    """Test suite focused on audience estimation functionality"""
    
    def __init__(self, base_url: str = "http://localhost:8080"):
        self.base_url = base_url.rstrip('/')
        self.endpoint = f"{self.base_url}/mcp/"
        self.request_id = 1
        
        # Default account ID from workspace rules
        self.account_id = "act_701351919139047"
        
        # Test targeting specifications
        self.test_targeting_specs = {
            "simple_demographics": {
                "age_min": 25,
                "age_max": 65,
                "geo_locations": {"countries": ["US"]}
            },
            "demographics_with_interests": {
                "age_min": 18,
                "age_max": 35,
                "geo_locations": {"countries": ["PL"]},
                "flexible_spec": [
                    {"interests": [{"id": "6003371567474"}]}  # Business interest
                ]
            },
            "complex_targeting": {
                "age_min": 25,
                "age_max": 55,
                "geo_locations": {"countries": ["US"], "regions": [{"key": "3847"}]},  # California
                "flexible_spec": [
                    {"interests": [{"id": "6003371567474"}, {"id": "6003462346642"}]},  # Business + Technology
                    {"behaviors": [{"id": "6007101597783"}]}  # Business travelers
                ]
            },
            "mobile_app_targeting": {
                "age_min": 18,
                "age_max": 45,
                "geo_locations": {"countries": ["US"]},
                "user_device": ["mobile"],
                "user_os": ["iOS", "Android"],
                "flexible_spec": [
                    {"interests": [{"id": "6003139266461"}]}  # Mobile games
                ]
            }
        }
        
        # Test interest lists for backwards compatibility
        self.test_interests = {
            "valid_names": ["Japan", "Basketball", "Technology"],
            "mixed_validity": ["Japan", "invalidinterestname12345", "Basketball"],
            "valid_fbids": ["6003700426513", "6003397425735"],  # Japan, Tennis
            "invalid_fbids": ["999999999999", "000000000000"]
        }
        
    def _make_request(self, method: str, params: Dict[str, Any] = None,
                      headers: Dict[str, str] = None) -> Dict[str, Any]:
        """Send one JSON-RPC 2.0 request to the MCP endpoint and summarize the reply.

        Args:
            method: JSON-RPC method name, e.g. ``"tools/call"``.
            params: Optional ``params`` object; left out of the payload when falsy.
            headers: Optional extra HTTP headers, merged over the defaults.

        Returns:
            A dict with the keys ``status_code``, ``headers``, ``json``, ``text``
            and ``success``. ``success`` is True only for an HTTP 200 whose body
            decoded as JSON; ``json`` is the decoded body in that case and None
            otherwise. An ``error`` key is added when the request could not be
            sent (``status_code`` is then 0) or when a 200 body was not valid JSON.
        """
        request_headers = {
            "Content-Type": "application/json",
            "Accept": "application/json, text/event-stream",
            "User-Agent": "Audience-Estimation-Test-Client/1.0"
        }
        if headers:
            request_headers.update(headers)

        payload = {
            "jsonrpc": "2.0",
            "method": method,
            "id": self.request_id
        }
        if params:
            payload["params"] = params

        try:
            response = requests.post(
                self.endpoint,
                headers=request_headers,
                json=payload,
                timeout=20  # Increased timeout for delivery estimates
            )
        except requests.exceptions.RequestException as e:
            return {
                "status_code": 0,
                "headers": {},
                "json": None,
                "text": str(e),
                "success": False,
                "error": str(e)
            }

        # The id only advances once the server has actually answered.
        self.request_id += 1

        summary = {
            "status_code": response.status_code,
            "headers": dict(response.headers),
            "json": None,
            "text": response.text,
            "success": response.status_code == 200
        }

        if summary["success"]:
            try:
                summary["json"] = response.json()
            except ValueError as e:
                # A 200 with a non-JSON body is a failed call, not a transport
                # error: keep the real status code and body text so callers can
                # report them, instead of crashing or collapsing to status 0.
                summary["success"] = False
                summary["error"] = f"Response body is not valid JSON: {e}"

        return summary

    def _check_for_errors(self, parsed_content: Dict[str, Any]) -> Dict[str, Any]:
        """Properly handle both wrapped and direct error formats"""
        
        # Check for data wrapper format first
        if "data" in parsed_content:
            data = parsed_content["data"]
            
            # Handle case where data is already parsed (dict/list)
            if isinstance(data, dict) and 'error' in data:
                return {
                    "has_error": True,
                    "error_message": data['error'],
                    "error_details": data.get('details', ''),
                    "format": "wrapped_dict"
                }
            
            # Handle case where data is a JSON string that needs parsing
            if isinstance(data, str):
                try:
                    error_data = json.loads(data)
                    if 'error' in error_data:
                        return {
                            "has_error": True,
                            "error_message": error_data['error'],
                            "error_details": error_data.get('details', ''),
                            "format": "wrapped_json"
                        }
                except json.JSONDecodeError:
                    # Data field exists but isn't valid JSON
                    pass
        
        # Check for direct error format
        if 'error' in parsed_content:
            return {
                "has_error": True,
                "error_message": parsed_content['error'],
                "error_details": parsed_content.get('details', ''),
                "format": "direct"
            }
        
        return {"has_error": False}

    def _extract_data(self, parsed_content: Dict[str, Any]) -> Any:
        """Extract successful response data from various wrapper formats"""
        
        if "data" in parsed_content:
            data = parsed_content["data"]
            
            # Handle case where data is already parsed
            if isinstance(data, (list, dict)):
                return data
            
            # Handle case where data is a JSON string
            if isinstance(data, str):
                try:
                    return json.loads(data)
                except json.JSONDecodeError:
                    return None
        
        # Handle direct format (data at top level)
        if isinstance(parsed_content, (list, dict)):
            return parsed_content
        
        return None

    def test_pl_only_reachestimate_bounds(self) -> Dict[str, Any]:
        """Verify PL-only reachestimate returns expected bounds and midpoint.
        
        Prerequisite: Start server with fallback disabled so reachestimate is used directly.
        Example:
            export META_MCP_DISABLE_DELIVERY_FALLBACK=1
            uv run python -m meta_ads_mcp --transport streamable-http --port 8080
        """
        print(f"\n🇵🇱 Testing PL-only reachestimate bounds (fallback disabled)")
        local_account_id = "act_3182643988557192"
        targeting_spec = {"geo_locations": {"countries": ["PL"]}}
        expected_lower = 18600000
        expected_upper = 21900000
        expected_midpoint = 20250000

        result = self._make_request("tools/call", {
            "name": "estimate_audience_size",
            "arguments": {
                "account_id": local_account_id,
                "targeting": targeting_spec,
                "optimization_goal": "REACH"
            }
        })

        if not result["success"]:
            print(f"   ❌ Request failed: {result.get('text', 'Unknown error')}")
            return {"success": False, "error": result.get("text", "Unknown error")}

        response_data = result["json"]["result"]
        content = response_data.get("content", [{}])[0].get("text", "")
        try:
            parsed_content = json.loads(content)
        except json.JSONDecodeError:
            print(f"   ❌ Invalid JSON response")
            return {"success": False, "error": "Invalid JSON"}

        error_info = self._check_for_errors(parsed_content)
        if error_info["has_error"]:
            print(f"   ❌ API Error: {error_info['error_message']}")
            return {"success": False, "error": error_info["error_message"], "error_format": error_info["format"]}

        if not parsed_content.get("success", False):
            print(f"   ❌ Response indicates failure but no error message found")
            return {"success": False, "error": "Unexpected failure"}

        details = parsed_content.get("estimate_details", {}) or {}
        lower = details.get("users_lower_bound")
        upper = details.get("users_upper_bound")
        midpoint = parsed_content.get("estimated_audience_size")
        fallback_used = parsed_content.get("fallback_endpoint_used")

        ok = (
            lower == expected_lower and
            upper == expected_upper and
            midpoint == expected_midpoint and
            (fallback_used is None)
        )

        if ok:
            print(f"   ✅ Bounds: {lower:,}–{upper:,}; midpoint: {midpoint:,}")
            return {
                "success": True,
                "users_lower_bound": lower,
                "users_upper_bound": upper,
                "midpoint": midpoint
            }
        else:
            print(f"   ❌ Unexpected values: lower={lower}, upper={upper}, midpoint={midpoint}, fallback={fallback_used}")
            return {
                "success": False,
                "users_lower_bound": lower,
                "users_upper_bound": upper,
                "midpoint": midpoint,
                "fallback_endpoint_used": fallback_used
            }

    def test_comprehensive_audience_estimation(self) -> Dict[str, Any]:
        """Test comprehensive audience estimation with complex targeting"""
        
        print(f"\n🎯 Testing Comprehensive Audience Estimation")
        results = {}
        
        for spec_name, targeting_spec in self.test_targeting_specs.items():
            print(f"   📊 Testing targeting: '{spec_name}'")
            
            result = self._make_request("tools/call", {
                "name": "estimate_audience_size",
                "arguments": {
                    "account_id": self.account_id,
                    "targeting": targeting_spec,
                    "optimization_goal": "REACH"
                }
            })
            
            if not result["success"]:
                results[spec_name] = {
                    "success": False,
                    "error": result.get("text", "Unknown error")
                }
                print(f"   ❌ Failed: {result.get('text', 'Unknown error')}")
                continue
            
            # Parse response
            response_data = result["json"]["result"]
            content = response_data.get("content", [{}])[0].get("text", "")
            
            try:
                parsed_content = json.loads(content)
                
                # Check for errors using robust helper method
                error_info = self._check_for_errors(parsed_content)
                if error_info["has_error"]:
                    results[spec_name] = {
                        "success": False,
                        "error": error_info["error_message"],
                        "error_format": error_info["format"]
                    }
                    print(f"   ❌ API Error: {error_info['error_message']}")
                    continue
                
                # Check for expected fields in comprehensive estimation
                has_success = parsed_content.get("success", False)
                has_estimate = "estimated_audience_size" in parsed_content
                has_details = "estimate_details" in parsed_content
                
                results[spec_name] = {
                    "success": has_success and has_estimate,
                    "has_estimate": has_estimate,
                    "has_details": has_details,
                    "estimated_size": parsed_content.get("estimated_audience_size", 0),
                    "optimization_goal": parsed_content.get("optimization_goal"),
                    "raw_response": parsed_content
                }
                
                if has_success and has_estimate:
                    estimate_size = parsed_content.get("estimated_audience_size", 0)
                    print(f"   ✅ Estimated audience: {estimate_size:,} people")
                else:
                    print(f"   ⚠️  Incomplete response: success={has_success}, estimate={has_estimate}")
                
            except json.JSONDecodeError:
                results[spec_name] = {
                    "success": False,
                    "error": "Invalid JSON response",
                    "raw_content": content
                }
                print(f"   ❌ Invalid JSON: {content[:100]}...")
        
        return results

    def test_backwards_compatibility_interest_validation(self) -> Dict[str, Any]:
        """Test backwards compatibility with simple interest validation"""
        
        print(f"\n🔄 Testing Backwards Compatibility (Interest Validation)")
        results = {}
        
        # Test with interest names
        print(f"   📝 Testing interest name validation")
        
        result = self._make_request("tools/call", {
            "name": "estimate_audience_size",
            "arguments": {
                "interest_list": self.test_interests["mixed_validity"]
            }
        })
        
        if result["success"]:
            response_data = result["json"]["result"]
            content = response_data.get("content", [{}])[0].get("text", "")
            
            try:
                parsed_content = json.loads(content)
                
                # Check for errors first
                error_info = self._check_for_errors(parsed_content)
                if error_info["has_error"]:
                    results["interest_names"] = {
                        "success": False,
                        "error": error_info["error_message"],
                        "error_format": error_info["format"]
                    }
                    print(f"   ❌ API Error: {error_info['error_message']}")
                else:
                    # Extract data using robust helper method
                    validations = self._extract_data(parsed_content)
                    if validations and isinstance(validations, list):
                        results["interest_names"] = {
                            "success": True,
                            "count": len(validations),
                            "has_valid": any(v.get("valid", False) for v in validations),
                            "has_invalid": any(not v.get("valid", True) for v in validations),
                            "validations": validations
                        }
                        print(f"   ✅ Validated {len(validations)} interests")
                        for validation in validations:
                            status = "✅" if validation.get("valid") else "❌"
                            print(f"      {status} {validation.get('name', 'N/A')}")
                    else:
                        results["interest_names"] = {"success": False, "error": "No validation data"}
                        print(f"   ❌ No validation data returned")
                    
            except json.JSONDecodeError:
                results["interest_names"] = {"success": False, "error": "Invalid JSON"}
                print(f"   ❌ Invalid JSON response")
        else:
            results["interest_names"] = {"success": False, "error": result.get("text", "Request failed")}
            print(f"   ❌ Request failed: {result.get('text', 'Unknown error')}")
        
        # Test with interest FBIDs
        print(f"   🔢 Testing interest FBID validation")
        
        result = self._make_request("tools/call", {
            "name": "estimate_audience_size",
            "arguments": {
                "interest_fbid_list": self.test_interests["valid_fbids"]
            }
        })
        
        if result["success"]:
            response_data = result["json"]["result"]
            content = response_data.get("content", [{}])[0].get("text", "")
            
            try:
                parsed_content = json.loads(content)
                
                # Check for errors first
                error_info = self._check_for_errors(parsed_content)
                if error_info["has_error"]:
                    results["interest_fbids"] = {
                        "success": False,
                        "error": error_info["error_message"],
                        "error_format": error_info["format"]
                    }
                    print(f"   ❌ API Error: {error_info['error_message']}")
                else:
                    # Extract data using robust helper method
                    validations = self._extract_data(parsed_content)
                    if validations and isinstance(validations, list):
                        results["interest_fbids"] = {
                            "success": True,
                            "count": len(validations),
                            "all_valid": all(v.get("valid", False) for v in validations),
                            "validations": validations
                        }
                        print(f"   ✅ Validated {len(validations)} FBID interests")
                        for validation in validations:
                            status = "✅" if validation.get("valid") else "❌"
                            print(f"      {status} FBID: {validation.get('id', 'N/A')}")
                    else:
                        results["interest_fbids"] = {"success": False, "error": "No validation data"}
                        print(f"   ❌ No validation data returned")
                    
            except json.JSONDecodeError:
                results["interest_fbids"] = {"success": False, "error": "Invalid JSON"}
                print(f"   ❌ Invalid JSON response")
        else:
            results["interest_fbids"] = {"success": False, "error": result.get("text", "Request failed")}
            print(f"   ❌ Request failed: {result.get('text', 'Unknown error')}")
        
        return results

    def test_different_optimization_goals(self) -> Dict[str, Any]:
        """Test audience estimation with different optimization goals"""
        
        print(f"\n🎯 Testing Different Optimization Goals")
        results = {}
        
        optimization_goals = ["REACH", "LINK_CLICKS", "CONVERSIONS", "APP_INSTALLS"]
        base_targeting = self.test_targeting_specs["simple_demographics"]
        
        for goal in optimization_goals:
            print(f"   🎯 Testing optimization goal: '{goal}'")
            
            result = self._make_request("tools/call", {
                "name": "estimate_audience_size",
                "arguments": {
                    "account_id": self.account_id,
                    "targeting": base_targeting,
                    "optimization_goal": goal
                }
            })
            
            if result["success"]:
                response_data = result["json"]["result"]
                content = response_data.get("content", [{}])[0].get("text", "")
                
                try:
                    parsed_content = json.loads(content)
                    
                    # Check for errors first
                    error_info = self._check_for_errors(parsed_content)
                    if error_info["has_error"]:
                        results[goal] = {
                            "success": False,
                            "error": error_info["error_message"],
                            "error_format": error_info["format"]
                        }
                        print(f"   ❌ {goal}: {error_info['error_message']}")
                    elif parsed_content.get("success", False):
                        results[goal] = {
                            "success": True,
                            "estimated_size": parsed_content.get("estimated_audience_size", 0),
                            "goal_used": parsed_content.get("optimization_goal")
                        }
                        estimate_size = parsed_content.get("estimated_audience_size", 0)
                        print(f"   ✅ {goal}: {estimate_size:,} people")
                    else:
                        results[goal] = {
                            "success": False,
                            "error": "Response indicates failure but no error message found"
                        }
                        print(f"   ❌ {goal}: Response indicates failure but no error message found")
                        
                except json.JSONDecodeError:
                    results[goal] = {"success": False, "error": "Invalid JSON"}
                    print(f"   ❌ {goal}: Invalid JSON response")
            else:
                results[goal] = {"success": False, "error": result.get("text", "Request failed")}
                print(f"   ❌ {goal}: Request failed")
        
        return results

    def test_error_handling(self) -> Dict[str, Any]:
        """Test error handling for invalid parameters"""
        
        print(f"\n⚠️  Testing Error Handling")
        results = {}
        
        # Test 1: No parameters
        print(f"   🚫 Testing with no parameters")
        result = self._make_request("tools/call", {
            "name": "estimate_audience_size",
            "arguments": {}
        })
        
        results["no_params"] = self._parse_error_response(result, "Should require targeting or interest validation")
        
        # Test 2: Account ID without targeting
        print(f"   🚫 Testing account ID without targeting")
        result = self._make_request("tools/call", {
            "name": "estimate_audience_size",
            "arguments": {
                "account_id": self.account_id
            }
        })
        
        results["no_targeting"] = self._parse_error_response(result, "Should require targeting specification")
        
        # Test 3: Invalid targeting structure
        print(f"   🚫 Testing invalid targeting structure")
        result = self._make_request("tools/call", {
            "name": "estimate_audience_size",
            "arguments": {
                "account_id": self.account_id,
                "targeting": {"invalid": "structure"}
            }
        })
        
        results["invalid_targeting"] = self._parse_error_response(result, "Should handle invalid targeting")
        
        # Test 4: Missing location in targeting (no geo_locations or custom audiences)
        print(f"   🚫 Testing missing location in targeting")
        result = self._make_request("tools/call", {
            "name": "estimate_audience_size",
            "arguments": {
                "account_id": self.account_id,
                # Interests present but no geo_locations and no custom_audiences
                "targeting": {
                    "age_min": 18,
                    "age_max": 35,
                    "flexible_spec": [
                        {"interests": [{"id": "6003371567474"}]}
                    ]
                }
            }
        })
        results["missing_location"] = self._parse_error_response(result, "Should require a location or custom audience")
        
        return results
    
    def _parse_error_response(self, result: Dict[str, Any], description: str) -> Dict[str, Any]:
        """Helper to parse and validate error responses"""
        
        if not result["success"]:
            print(f"   ✅ {description}: Request failed as expected")
            return {"success": True, "error_type": "request_failure"}
        
        response_data = result["json"]["result"]
        content = response_data.get("content", [{}])[0].get("text", "")
        
        try:
            parsed_content = json.loads(content)
            
            # Use robust error checking helper method
            error_info = self._check_for_errors(parsed_content)
            if error_info["has_error"]:
                print(f"   ✅ {description}: {error_info['error_message']}")
                return {
                    "success": True, 
                    "error_message": error_info["error_message"],
                    "error_format": error_info["format"]
                }
            else:
                print(f"   ❌ {description}: No error returned when expected")
                return {"success": False, "unexpected_success": True}
                
        except json.JSONDecodeError:
            print(f"   ❌ {description}: Invalid JSON response")
            return {"success": False, "error": "Invalid JSON"}

    def run_audience_estimation_tests(self) -> bool:
        """Run every test phase against the live server and print a summary.

        The server is probed first; if it does not answer, nothing else runs.
        Five phases follow: PL-only bounds, comprehensive estimation,
        backwards compatibility, optimization goals and error handling.

        Returns:
            True when at least three of the five phases passed; False when
            fewer passed or the server could not be reached.
        """
        print("🚀 Meta Ads Audience Estimation End-to-End Test Suite")
        print("="*70)

        # Check server availability. Only request-level failures mean "server
        # not running"; a bare except here would also swallow Ctrl-C and
        # genuine programming errors.
        try:
            response = requests.get(f"{self.base_url}/", timeout=5)
            server_running = response.status_code in [200, 404]
        except requests.exceptions.RequestException:
            server_running = False

        if not server_running:
            print("❌ Server is not running at", self.base_url)
            print("   Please start the server with:")
            print("   python3 -m meta_ads_mcp --transport streamable-http --port 8080")
            return False

        print("✅ Server is running")
        print("🔐 Using implicit authentication from server")
        print(f"🏢 Using account ID: {self.account_id}")

        # Test 0: PL-only reachestimate bounds verification
        print("\n" + "="*70)
        print("📋 PHASE 0: PL-only reachestimate bounds verification (fallback disabled)")
        print("="*70)
        pl_only_results = self.test_pl_only_reachestimate_bounds()
        pl_only_success = pl_only_results.get("success", False)

        # Test 1: Comprehensive Audience Estimation
        print("\n" + "="*70)
        print("📋 PHASE 1: Testing Comprehensive Audience Estimation")
        print("="*70)

        comprehensive_results = self.test_comprehensive_audience_estimation()
        # "or 0" keeps a null estimate from raising in the comparison
        comprehensive_success = any(
            result.get("success") and (result.get("estimated_size") or 0) > 0
            for result in comprehensive_results.values()
        )

        # Test 2: Backwards Compatibility
        print("\n" + "="*70)
        print("📋 PHASE 2: Testing Backwards Compatibility")
        print("="*70)

        compat_results = self.test_backwards_compatibility_interest_validation()
        compat_success = (
            compat_results.get("interest_names", {}).get("success", False) and
            compat_results.get("interest_fbids", {}).get("success", False)
        )

        # Test 3: Different Optimization Goals
        print("\n" + "="*70)
        print("📋 PHASE 3: Testing Different Optimization Goals")
        print("="*70)

        goals_results = self.test_different_optimization_goals()
        goals_success = any(
            result.get("success") and (result.get("estimated_size") or 0) > 0
            for result in goals_results.values()
        )

        # Test 4: Error Handling
        print("\n" + "="*70)
        print("📋 PHASE 4: Testing Error Handling")
        print("="*70)

        error_results = self.test_error_handling()
        error_success = all(
            result.get("success", False) for result in error_results.values()
        )

        # Final assessment
        print("\n" + "="*70)
        print("📊 FINAL RESULTS")
        print("="*70)

        all_tests = [
            ("PL-only Reachestimate Bounds", pl_only_success),
            ("Comprehensive Estimation", comprehensive_success),
            ("Backwards Compatibility", compat_success),
            ("Optimization Goals", goals_success),
            ("Error Handling", error_success)
        ]

        passed_tests = sum(1 for _, success in all_tests if success)
        total_tests = len(all_tests)

        for test_name, success in all_tests:
            status = "✅ PASSED" if success else "❌ FAILED"
            print(f"   • {test_name}: {status}")

        # The suite is considered passing when at least 3 of the 5 phases pass.
        # NOTE(review): the threshold predates PHASE 0 — confirm 3 is still intended.
        overall_success = passed_tests >= 3

        if overall_success:
            print(f"\n✅ Audience estimation tests: SUCCESS ({passed_tests}/{total_tests} passed)")
            print("   • Comprehensive audience estimation is working")
            print("   • Backwards compatibility is maintained")
            print("   • Meta reachestimate API integration is functional")
            return True
        else:
            print(f"\n❌ Audience estimation tests: FAILED ({passed_tests}/{total_tests} passed)")
            print("   • Some audience estimation features are not working properly")
            return False


def main():
    """Run the audience estimation suite and exit with 0 on success, 1 otherwise."""
    suite = AudienceEstimationTester()
    passed = suite.run_audience_estimation_tests()

    verdict = (
        "\n🎉 All audience estimation tests passed!"
        if passed
        else "\n⚠️  Some audience estimation tests failed - see details above"
    )
    print(verdict)

    sys.exit(0 if passed else 1)


if __name__ == "__main__":
    main()
```

--------------------------------------------------------------------------------
/tests/test_dsa_beneficiary.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Unit tests for DSA (Digital Services Act) beneficiary functionality in Meta Ads MCP.

This module tests the implementation of DSA beneficiary field support for ad set creation,
including detection of DSA requirements, parameter validation, and error handling.
"""

import pytest
import json
from unittest.mock import AsyncMock, patch, MagicMock

from meta_ads_mcp.core.adsets import create_adset, get_adset_details
from meta_ads_mcp.core.accounts import get_account_info


class TestDSABeneficiaryDetection:
    """Checks on the account information used to detect DSA beneficiary requirements."""

    @staticmethod
    def _to_dict(result):
        """Return ``result`` as-is when it is a dict, otherwise decode it as JSON."""
        return result if isinstance(result, dict) else json.loads(result)

    @pytest.mark.asyncio
    async def test_dsa_requirement_detection_business_account(self):
        """Account info for a German business account exposes its id and country code."""
        api_payload = {
            "id": "act_701351919139047",
            "name": "Test European Business Account",
            "account_status": 1,
            "business_country_code": "DE",  # Germany - DSA compliant
            "business_city": "Berlin",
            "currency": "EUR"
        }

        with patch('meta_ads_mcp.core.accounts.make_api_request', new_callable=AsyncMock) as api_mock, \
                patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as auth_mock:
            auth_mock.return_value = "test_access_token"
            api_mock.return_value = api_payload

            account = self._to_dict(await get_account_info(account_id="act_701351919139047"))

            # The mocked account payload must be surfaced to the caller
            assert account["id"] == "act_701351919139047"
            assert account["business_country_code"] == "DE"

            # Either an explicit DSA flag or the country code must be exposed
            assert "dsa_required" in account or "business_country_code" in account

    @pytest.mark.asyncio
    async def test_dsa_requirement_detection_non_dsa_region(self):
        """Account info for a US account reports the US country code."""
        api_payload = {
            "id": "act_123456789",
            "name": "Test US Account",
            "account_status": 1,
            "business_country_code": "US",  # US - not DSA compliant
            "business_city": "New York",
            "currency": "USD"
        }

        with patch('meta_ads_mcp.core.accounts.make_api_request', new_callable=AsyncMock) as api_mock, \
                patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as auth_mock:
            auth_mock.return_value = "test_access_token"
            api_mock.return_value = api_payload

            account = self._to_dict(await get_account_info(account_id="act_123456789"))

            # The country code of the US account is passed through
            assert account["business_country_code"] == "US"

    @pytest.mark.asyncio
    async def test_dsa_requirement_detection_error_handling(self):
        """An exception raised by the API layer is reported as an ``error`` entry."""
        with patch('meta_ads_mcp.core.accounts.make_api_request', new_callable=AsyncMock) as api_mock, \
                patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as auth_mock:
            auth_mock.return_value = "test_access_token"
            api_mock.side_effect = Exception("API Error")

            outcome = self._to_dict(await get_account_info(account_id="act_invalid"))

            # The failure surfaces as data instead of propagating
            assert "error" in outcome

    @pytest.mark.asyncio
    async def test_account_info_requires_account_id(self):
        """Calling get_account_info with ``account_id=None`` yields a descriptive error."""
        with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as auth_mock:
            auth_mock.return_value = "test_access_token"

            # No account id supplied
            outcome = self._to_dict(await get_account_info(account_id=None))

            # The error must explain what is missing and how to fix it
            assert "error" in outcome
            error = outcome["error"]
            assert "Account ID is required" in error["message"]
            assert "Please specify an account_id parameter" in error["details"]
            assert "example" in error

    @pytest.mark.asyncio
    async def test_account_info_inaccessible_account_error(self):
        """A permission error on the account is turned into a helpful error listing alternatives."""
        # Response to the first API call: direct account access is denied
        permission_denied = {
            "error": {
                "message": "Insufficient access privileges",
                "type": "OAuthException",
                "code": 200
            }
        }

        # Response to the second API call: the accounts the user can reach
        reachable_accounts = {
            "data": [
                {"id": "act_123", "name": "Test Account 1"},
                {"id": "act_456", "name": "Test Account 2"}
            ]
        }

        with patch('meta_ads_mcp.core.accounts.make_api_request', new_callable=AsyncMock) as api_mock, \
                patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as auth_mock:
            auth_mock.return_value = "test_access_token"
            # Successive calls consume the responses in order
            api_mock.side_effect = [permission_denied, reachable_accounts]

            outcome = self._to_dict(await get_account_info(account_id="act_inaccessible"))

            # The error must name the problem and offer the reachable accounts
            assert "error" in outcome
            error = outcome["error"]
            assert "not accessible to your user account" in error["message"]
            assert "accessible_accounts" in error
            assert "suggestion" in error
            assert len(error["accessible_accounts"]) == 2


class TestDSABeneficiaryParameter:
    """Test cases for DSA beneficiary parameter support"""

    @staticmethod
    def _unwrap_response(result):
        """Decode a JSON tool result, unwrapping a nested JSON ``data`` field if present.

        The ``data`` wrapper is expected to come from the meta_api_tool decorator.
        """
        payload = json.loads(result)
        if "data" in payload:
            return json.loads(payload["data"])
        return payload

    @pytest.mark.asyncio
    async def test_create_adset_with_dsa_beneficiary_success(self):
        """Test successful ad set creation with DSA beneficiary parameter"""
        mock_response = {
            "id": "23842588888640185",
            "name": "Test Ad Set with DSA",
            "status": "PAUSED",
            "dsa_beneficiary": "Test Organization GmbH"
        }

        with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api, \
                patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
            mock_auth.return_value = "test_access_token"
            mock_api.return_value = mock_response

            result = await create_adset(
                account_id="act_701351919139047",
                campaign_id="23842588888640184",
                name="Test Ad Set with DSA",
                optimization_goal="LINK_CLICKS",
                billing_event="IMPRESSIONS",
                dsa_beneficiary="Test Organization GmbH"
            )

            # Verify the API was called with DSA beneficiary parameter
            mock_api.assert_called_once()
            assert "dsa_beneficiary" in str(mock_api.call_args)

            # Verify response contains ad set ID
            result_data = json.loads(result)
            assert "id" in result_data

    @pytest.mark.asyncio
    async def test_create_adset_with_dsa_beneficiary_validation_error(self):
        """Test that an API failure about a missing DSA beneficiary is reported to the caller"""
        with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api, \
                patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
            mock_auth.return_value = "test_access_token"
            mock_api.side_effect = Exception("DSA beneficiary required for European compliance")

            result = await create_adset(
                account_id="act_701351919139047",
                campaign_id="23842588888640184",
                name="Test Ad Set",
                optimization_goal="LINK_CLICKS",
                billing_event="IMPRESSIONS"
                # No DSA beneficiary provided
            )

            # Verify error message is clear and actionable
            actual_data = self._unwrap_response(result)
            assert "DSA beneficiary required" in actual_data.get("error", "")

    @pytest.mark.asyncio
    async def test_create_adset_without_dsa_beneficiary_dsa_required(self):
        """Test error when DSA beneficiary is required but not provided"""
        with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api, \
                patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
            mock_auth.return_value = "test_access_token"
            mock_api.side_effect = Exception("Enter the person or organization that benefits from ads in this ad set")

            result = await create_adset(
                account_id="act_701351919139047",
                campaign_id="23842588888640184",
                name="Test Ad Set",
                optimization_goal="LINK_CLICKS",
                billing_event="IMPRESSIONS"
                # No DSA beneficiary provided
            )

            # Verify error message is clear and actionable
            actual_data = self._unwrap_response(result)
            assert "benefits from ads" in actual_data.get("error", "")

    @pytest.mark.asyncio
    async def test_create_adset_dsa_beneficiary_in_targeting(self):
        """Test that DSA beneficiary reaches the API call when a targeting spec is also given"""
        with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api, \
                patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
            mock_auth.return_value = "test_access_token"
            mock_api.return_value = {"id": "23842588888640185"}

            await create_adset(
                account_id="act_701351919139047",
                campaign_id="23842588888640184",
                name="Test Ad Set",
                optimization_goal="LINK_CLICKS",
                billing_event="IMPRESSIONS",
                targeting={"geo_locations": {"countries": ["DE"]}},
                dsa_beneficiary="Test Organization GmbH"
            )

            # Verify the API was called
            mock_api.assert_called_once()

            # Verify the beneficiary name and key appear in the call arguments.
            # NOTE(review): this only proves presence in the call; it does not
            # prove the value is kept out of the targeting spec.
            call_str = str(mock_api.call_args)
            assert "dsa_beneficiary" in call_str
            assert "Test Organization GmbH" in call_str

    @pytest.mark.asyncio
    async def test_create_adset_dsa_beneficiary_parameter_formats(self):
        """Test different formats for DSA beneficiary parameter"""
        test_cases = [
            "Simple Organization",
            "Organization with Special Chars: GmbH & Co. KG",
            "Organization with Numbers: Test123 Inc.",
            "Very Long Organization Name That Exceeds Normal Limits But Should Still Work"
        ]

        for beneficiary_name in test_cases:
            # Fresh patches per name so assert_called_once() counts one call each
            with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api, \
                    patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
                mock_auth.return_value = "test_access_token"
                mock_api.return_value = {"id": "23842588888640185"}

                await create_adset(
                    account_id="act_701351919139047",
                    campaign_id="23842588888640184",
                    name="Test Ad Set",
                    optimization_goal="LINK_CLICKS",
                    billing_event="IMPRESSIONS",
                    dsa_beneficiary=beneficiary_name
                )

                # Verify the API was called with the beneficiary name
                mock_api.assert_called_once()
                assert beneficiary_name in str(mock_api.call_args)


class TestDSAPermissionHandling:
    """Ad set creation failures caused by permissions or API support for dsa_beneficiary."""

    @staticmethod
    def _decode(result):
        """Decode the JSON result and unwrap a nested JSON ``data`` field when one is present."""
        outer = json.loads(result)
        # Handle response wrapped in 'data' field by meta_api_tool decorator
        return json.loads(outer["data"]) if "data" in outer else outer

    @pytest.mark.asyncio
    async def test_dsa_beneficiary_missing_business_management_permission(self):
        """A permission failure from the API layer is reported in the error text."""
        with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as api_mock, \
                patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as auth_mock:
            auth_mock.return_value = "test_access_token"
            api_mock.side_effect = Exception("Permission denied: business_management permission required")

            raw = await create_adset(
                account_id="act_701351919139047",
                campaign_id="23842588888640184",
                name="Test Ad Set",
                optimization_goal="LINK_CLICKS",
                billing_event="IMPRESSIONS",
                dsa_beneficiary="Test Organization GmbH"
            )

            # The reported error mentions the permission problem
            error_text = self._decode(raw).get("error", "")
            assert "permission" in error_text.lower()

    @pytest.mark.asyncio
    async def test_dsa_beneficiary_api_limitation_handling(self):
        """An 'unsupported parameter' failure from the API layer is reported in the error text."""
        with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as api_mock, \
                patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as auth_mock:
            auth_mock.return_value = "test_access_token"
            api_mock.side_effect = Exception("Parameter dsa_beneficiary is not supported")

            raw = await create_adset(
                account_id="act_701351919139047",
                campaign_id="23842588888640184",
                name="Test Ad Set",
                optimization_goal="LINK_CLICKS",
                billing_event="IMPRESSIONS",
                dsa_beneficiary="Test Organization GmbH"
            )

            # The reported error mentions the missing support
            error_text = self._decode(raw).get("error", "")
            assert "not supported" in error_text.lower()


class TestDSARegionalCompliance:
    """Test cases for regional DSA compliance"""

    @staticmethod
    async def _fetch_account_info(account_id, api_response):
        """Call get_account_info against a mocked API and return the result as a dict.

        Args:
            account_id: Account id passed to ``get_account_info``.
            api_response: Value the mocked ``make_api_request`` returns.

        Returns:
            The result as a dict. A dict result is returned unchanged and a
            string result is decoded as JSON, mirroring the handling used by
            the other ``get_account_info`` tests in this module.
        """
        with patch('meta_ads_mcp.core.accounts.make_api_request', new_callable=AsyncMock) as mock_api, \
                patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
            mock_auth.return_value = "test_access_token"
            mock_api.return_value = api_response

            result = await get_account_info(account_id=account_id)

        if isinstance(result, dict):
            return result
        return json.loads(result)

    @pytest.mark.asyncio
    async def test_dsa_compliance_european_regions(self):
        """Test account info retrieval for accounts in European countries"""
        european_countries = ["DE", "FR", "IT", "ES", "NL", "BE", "AT", "IE", "DK", "SE", "FI", "NO"]

        for country_code in european_countries:
            account_id = f"act_{country_code.lower()}"
            mock_account_response = {
                "id": account_id,
                "name": f"Test {country_code} Account",
                "account_status": 1,
                "business_country_code": country_code,
                "currency": "EUR"
            }

            result_data = await self._fetch_account_info(account_id, mock_account_response)

            # Verify the country code needed for DSA detection is passed through
            assert result_data["business_country_code"] == country_code

    @pytest.mark.asyncio
    async def test_dsa_compliance_non_european_regions(self):
        """Test account info retrieval for accounts in non-European countries"""
        non_european_countries = ["US", "CA", "AU", "JP", "BR", "IN"]

        for country_code in non_european_countries:
            account_id = f"act_{country_code.lower()}"
            mock_account_response = {
                "id": account_id,
                "name": f"Test {country_code} Account",
                "account_status": 1,
                "business_country_code": country_code,
                "currency": "USD"
            }

            result_data = await self._fetch_account_info(account_id, mock_account_response)

            # Verify the country code needed for DSA detection is passed through
            assert result_data["business_country_code"] == country_code

    @pytest.mark.asyncio
    async def test_dsa_beneficiary_validation_by_region(self):
        """Test that a European and a US account each report their own country code"""
        # European account (should require DSA beneficiary)
        european_mock_response = {
            "id": "act_de",
            "name": "German Account",
            "business_country_code": "DE"
        }

        # US account (should not require DSA beneficiary)
        us_mock_response = {
            "id": "act_us",
            "name": "US Account",
            "business_country_code": "US"
        }

        # Test European account
        result_data = await self._fetch_account_info("act_de", european_mock_response)
        assert result_data["business_country_code"] == "DE"

        # Test US account
        result_data = await self._fetch_account_info("act_us", us_mock_response)
        assert result_data["business_country_code"] == "US"


class TestDSAErrorHandling:
    """End-to-end checks that DSA-related API failures are reported as errors."""

    @staticmethod
    def _decode(result):
        """Decode the JSON result and unwrap a nested JSON ``data`` field when one is present."""
        outer = json.loads(result)
        # Handle response wrapped in 'data' field by meta_api_tool decorator
        return json.loads(outer["data"]) if "data" in outer else outer

    @pytest.mark.asyncio
    async def test_dsa_beneficiary_clear_error_message(self):
        """Each simulated API failure must surface its key phrase in the reported error."""
        # (message raised by the mocked API, phrase expected in the reported error)
        scenarios = [
            ("DSA beneficiary required for European compliance", "DSA beneficiary"),
            ("Enter the person or organization that benefits from ads", "benefits from ads"),
            ("Permission denied: business_management required", "permission"),
            ("Parameter dsa_beneficiary is not supported", "not supported")
        ]

        for raised_message, keyword in scenarios:
            with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as api_mock, \
                    patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as auth_mock:
                auth_mock.return_value = "test_access_token"
                api_mock.side_effect = Exception(raised_message)

                raw = await create_adset(
                    account_id="act_701351919139047",
                    campaign_id="23842588888640184",
                    name="Test Ad Set",
                    optimization_goal="LINK_CLICKS",
                    billing_event="IMPRESSIONS"
                )

                # Case-insensitive match of the phrase against the error text
                error_text = self._decode(raw).get("error", "")
                assert keyword.lower() in error_text.lower()

    @pytest.mark.asyncio
    async def test_dsa_beneficiary_fallback_behavior(self):
        """An unrecognised failure still produces a result carrying an ``error`` entry."""
        with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as api_mock, \
                patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as auth_mock:
            auth_mock.return_value = "test_access_token"
            api_mock.side_effect = Exception("Unexpected DSA-related error")

            raw = await create_adset(
                account_id="act_701351919139047",
                campaign_id="23842588888640184",
                name="Test Ad Set",
                optimization_goal="LINK_CLICKS",
                billing_event="IMPRESSIONS"
            )

            # Fallback handling: some error must be reported
            assert "error" in self._decode(raw)


class TestDSABeneficiaryRetrieval:
    """Test cases for retrieving DSA beneficiary information from ad sets"""

    @staticmethod
    def _to_dict(result):
        """Return ``result`` as a dict, decoding it as JSON when it is not one already.

        Every test in this class parses results through this helper so that a
        dict result and a JSON string result are handled the same way.
        """
        if isinstance(result, dict):
            return result
        return json.loads(result)

    @pytest.mark.asyncio
    async def test_get_adset_details_with_dsa_beneficiary(self):
        """Test retrieving ad set details that include DSA beneficiary field"""
        mock_adset_response = {
            "id": "120229746629010183",
            "name": "Test Ad Set with DSA",
            "campaign_id": "120229656904980183",
            "status": "PAUSED",
            "daily_budget": "1000",
            "targeting": {
                "geo_locations": {"countries": ["US"]},
                "age_min": 25,
                "age_max": 65
            },
            "bid_amount": 200,
            "optimization_goal": "LINK_CLICKS",
            "billing_event": "IMPRESSIONS",
            "dsa_beneficiary": "Test Organization Inc"
        }

        with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api, \
                patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
            mock_auth.return_value = "test_access_token"
            mock_api.return_value = mock_adset_response

            result = await get_adset_details(adset_id="120229746629010183")
            result_data = self._to_dict(result)

            # Verify DSA beneficiary field is present and correct
            assert "dsa_beneficiary" in result_data
            assert result_data["dsa_beneficiary"] == "Test Organization Inc"
            assert result_data["id"] == "120229746629010183"

    @pytest.mark.asyncio
    async def test_get_adset_details_without_dsa_beneficiary(self):
        """Test retrieving ad set details that don't have DSA beneficiary field"""
        mock_adset_response = {
            "id": "120229746624860183",
            "name": "Test Ad Set without DSA",
            "campaign_id": "120229656904980183",
            "status": "PAUSED",
            "daily_budget": "1000",
            "targeting": {
                "geo_locations": {"countries": ["US"]},
                "age_min": 25,
                "age_max": 65
            },
            "bid_amount": 200,
            "optimization_goal": "LINK_CLICKS",
            "billing_event": "IMPRESSIONS"
            # No dsa_beneficiary field
        }

        with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api, \
                patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
            mock_auth.return_value = "test_access_token"
            mock_api.return_value = mock_adset_response

            result = await get_adset_details(adset_id="120229746624860183")
            result_data = self._to_dict(result)

            # Verify ad set details are retrieved correctly
            assert result_data["id"] == "120229746624860183"
            assert "dsa_beneficiary" not in result_data  # Should not be present

    @pytest.mark.asyncio
    async def test_get_adset_details_empty_dsa_beneficiary(self):
        """Test retrieving ad set details with empty DSA beneficiary field"""
        mock_adset_response = {
            "id": "120229746629010183",
            "name": "Test Ad Set with Empty DSA",
            "campaign_id": "120229656904980183",
            "status": "PAUSED",
            "daily_budget": "1000",
            "targeting": {
                "geo_locations": {"countries": ["US"]}
            },
            "bid_amount": 200,
            "optimization_goal": "LINK_CLICKS",
            "billing_event": "IMPRESSIONS",
            "dsa_beneficiary": ""  # Empty string
        }

        with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api, \
                patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
            mock_auth.return_value = "test_access_token"
            mock_api.return_value = mock_adset_response

            result = await get_adset_details(adset_id="120229746629010183")
            result_data = self._to_dict(result)

            # Verify empty DSA beneficiary field is handled correctly
            assert "dsa_beneficiary" in result_data
            assert result_data["dsa_beneficiary"] == ""

    @pytest.mark.asyncio
    async def test_get_adset_details_dsa_beneficiary_field_requested(self):
        """Test that the API request mentions dsa_beneficiary in its call arguments"""
        with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api, \
                patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
            mock_auth.return_value = "test_access_token"
            mock_api.return_value = {"id": "120229746629010183"}

            await get_adset_details(adset_id="120229746629010183")

            # Verify the API was called with dsa_beneficiary somewhere in its arguments
            mock_api.assert_called_once()
            assert "dsa_beneficiary" in str(mock_api.call_args)

    @pytest.mark.asyncio
    async def test_get_adset_details_error_handling(self):
        """Test error handling when retrieving ad set details fails"""
        with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api, \
                patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
            mock_auth.return_value = "test_access_token"
            mock_api.side_effect = Exception("Ad set not found")

            result = await get_adset_details(adset_id="invalid_adset_id")
            result_data = self._to_dict(result)

            # Verify error is properly handled
            assert "error" in result_data
```

--------------------------------------------------------------------------------
/tests/test_budget_update_e2e.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
End-to-End Budget Update Test for Meta Ads MCP

This test validates that the budget update functionality correctly updates
ad set budgets through the Meta Ads API through a pre-authenticated MCP server.

Test functions:
- update_adset (with daily_budget parameter)
- update_adset (with lifetime_budget parameter)
- update_adset (with both budget types)
- Error handling for invalid budgets
- Budget update with other parameters
"""

import requests
import json
import os
import sys
import time
from typing import Dict, Any, List

# Load environment variables from .env file.
# python-dotenv is optional: if the import fails, the ImportError is caught and
# the script continues with whatever is already in the process environment.
# The return value of load_dotenv() is ignored, so the "Loaded" message is
# printed whenever the package is importable.
try:
    from dotenv import load_dotenv
    load_dotenv()
    print("✅ Loaded environment variables from .env file")
except ImportError:
    print("⚠️  python-dotenv not installed, using system environment variables only")

class BudgetUpdateTester:
    """Test suite focused on budget update functionality"""
    
    def __init__(self, base_url: str = "http://localhost:8080"):
        self.base_url = base_url.rstrip('/')
        self.endpoint = f"{self.base_url}/mcp/"
        self.request_id = 1
        
        # Test data for validation
        self.test_budgets = {
            "daily_budgets": ["5000", "10000", "25000"],  # $50, $100, $250
            "lifetime_budgets": ["50000", "100000", "250000"],  # $500, $1000, $2500
            "invalid_budgets": ["-1000", "0", "invalid_budget", "999999999999"]
        }
        
        # Test ad set IDs specifically created for budget testing
        self.test_adset_ids = [
            "120229734413930183",
            "120229734413930183",
            "120229734413930183",
            "120229734413930183",
            "120229734413930183",
            "120229734413930183"
        ]
        
        # Rate limiting tracking
        self.rate_limit_hit = False
        self.last_rate_limit_time = 0
    
    def _wait_for_rate_limit(self, error_msg: str) -> bool:
        """Wait if we hit rate limiting, return True if we should retry"""
        if "rate limit" in error_msg.lower() or "too many changes" in error_msg.lower():
            if not self.rate_limit_hit:
                print(f"   ⏳ Rate limit hit! Waiting 1 hour before continuing...")
                print(f"      • Meta Ads API allows only 4 budget changes per hour")
                print(f"      • You can manually continue by pressing Enter when ready")
                self.rate_limit_hit = True
                self.last_rate_limit_time = time.time()
                
                # Wait for user input or 1 hour
                try:
                    input("   Press Enter when ready to continue (or wait 1 hour)...")
                    print("   ✅ Continuing with tests...")
                    return True
                except KeyboardInterrupt:
                    print("   ❌ Test interrupted by user")
                    return False
            else:
                print(f"   ⏳ Still rate limited, waiting...")
                return False
        return False

    def _make_request(self, method: str, params: Dict[str, Any] = None,
                     headers: Dict[str, str] = None) -> Dict[str, Any]:
        """POST one JSON-RPC 2.0 request to the MCP endpoint and summarize the reply.

        Args:
            method: JSON-RPC method name.
            params: Optional JSON-RPC params; omitted from the payload when falsy.
            headers: Optional extra HTTP headers that override the defaults.

        Returns:
            Dict with "status_code", "headers", "json", "text" and "success".
            "json" is only parsed for HTTP 200. When ``requests`` raises, the
            dict has status_code 0, success False and an extra "error" key.
        """
        request_headers = {
            "Content-Type": "application/json",
            "Accept": "application/json, text/event-stream",
            "User-Agent": "Budget-Update-Test-Client/1.0",
            **(headers or {}),
        }

        payload = {"jsonrpc": "2.0", "method": method, "id": self.request_id}
        if params:
            payload["params"] = params

        try:
            response = requests.post(
                self.endpoint,
                headers=request_headers,
                json=payload,
                timeout=15,
            )

            # The id only advances once the POST itself completed
            self.request_id += 1

            is_ok = response.status_code == 200
            return {
                "status_code": response.status_code,
                "headers": dict(response.headers),
                "json": response.json() if is_ok else None,
                "text": response.text,
                "success": is_ok,
            }
        except requests.exceptions.RequestException as e:
            failure_text = str(e)
            return {
                "status_code": 0,
                "headers": {},
                "json": None,
                "text": failure_text,
                "success": False,
                "error": failure_text,
            }

    def test_daily_budget_update(self) -> Dict[str, Any]:
        """Test daily budget update functionality"""
        
        print(f"\n💰 Testing daily budget update function")
        results = {}
        
        for budget in self.test_budgets["daily_budgets"]:
            print(f"   💰 Updating daily budget to: ${int(budget)/100:.2f}")
            
            # Retry logic for rate limiting
            max_retries = 3
            for attempt in range(max_retries):
                result = self._make_request("tools/call", {
                    "name": "update_adset",
                    "arguments": {
                        "adset_id": self.test_adset_ids[0],
                        "daily_budget": budget
                    }
                })
                
                if not result["success"]:
                    results[budget] = {
                        "success": False,
                        "error": result.get("text", "Unknown error")
                    }
                    print(f"   ❌ Failed: {result.get('text', 'Unknown error')}")
                    break
                
                # Parse response
                response_data = result["json"]["result"]
                content = response_data.get("content", [{}])[0].get("text", "")
                
                try:
                    parsed_content = json.loads(content)
                    
                    # Check for successful update indicators
                    has_id = "id" in parsed_content
                    has_daily_budget = "daily_budget" in parsed_content
                    has_success = "success" in parsed_content
                    has_error = "error" in parsed_content
                    
                    # Handle rate limiting and API errors
                    if has_error:
                        error_msg = parsed_content.get("error", "")
                        if "rate limit" in error_msg.lower() or "too many changes" in error_msg.lower():
                            if attempt < max_retries - 1:  # Don't retry on last attempt
                                if self._wait_for_rate_limit(error_msg):
                                    print(f"   🔄 Retrying after rate limit...")
                                    continue
                                else:
                                    break
                            else:
                                results[budget] = {
                                    "success": True,  # Rate limiting is expected behavior
                                    "has_success": False,
                                    "has_error": True,
                                    "rate_limited": True,
                                    "error_message": error_msg
                                }
                                print(f"   ⚠️  Rate limited (expected): {error_msg}")
                                break
                        else:
                            results[budget] = {
                                "success": False,
                                "has_error": True,
                                "error_message": error_msg
                            }
                            print(f"   ❌ API Error: {error_msg}")
                            break
                    
                    results[budget] = {
                        "success": True,
                        "has_id": has_id,
                        "has_daily_budget": has_daily_budget,
                        "has_success": has_success,
                        "updated_budget": parsed_content.get("daily_budget", "N/A"),
                        "adset_id": parsed_content.get("id", "N/A")
                    }
                    
                    print(f"   ✅ Updated daily budget to ${int(budget)/100:.2f}")
                    print(f"      • Ad Set ID: {parsed_content.get('id', 'N/A')}")
                    print(f"      • Success: {parsed_content.get('success', 'N/A')}")
                    print(f"      • Raw Response: {parsed_content}")
                    
                    # Note: Meta Ads API returns {"success": true} for updates
                    # The actual updated values can be verified by fetching ad set details
                    break  # Success, exit retry loop
                    
                except json.JSONDecodeError:
                    results[budget] = {
                        "success": False,
                        "error": "Invalid JSON response",
                        "raw_content": content
                    }
                    print(f"   ❌ Invalid JSON: {content}")
                    break
        
        return results

    def test_lifetime_budget_update(self) -> Dict[str, Any]:
        """Test lifetime budget update functionality"""
        
        print(f"\n💰 Testing lifetime budget update function")
        print(f"   ⚠️  Note: Meta Ads API may reject lifetime budget updates if ad set has daily budget")
        results = {}
        
        for budget in self.test_budgets["lifetime_budgets"]:
            print(f"   💰 Updating lifetime budget to: ${int(budget)/100:.2f}")
            
            # Retry logic for rate limiting
            max_retries = 3
            for attempt in range(max_retries):
                result = self._make_request("tools/call", {
                    "name": "update_adset",
                    "arguments": {
                        "adset_id": self.test_adset_ids[1],
                        "lifetime_budget": budget
                    }
                })
                
                if not result["success"]:
                    results[budget] = {
                        "success": False,
                        "error": result.get("text", "Unknown error")
                    }
                    print(f"   ❌ Failed: {result.get('text', 'Unknown error')}")
                    break
                
                # Parse response
                response_data = result["json"]["result"]
                content = response_data.get("content", [{}])[0].get("text", "")
                
                try:
                    parsed_content = json.loads(content)
                    
                    # Check for successful update indicators
                    has_id = "id" in parsed_content
                    has_lifetime_budget = "lifetime_budget" in parsed_content
                    has_success = "success" in parsed_content
                    has_error = "error" in parsed_content
                    
                    # Handle rate limiting and API errors
                    if has_error:
                        error_msg = parsed_content.get("error", "")
                        if "rate limit" in error_msg.lower() or "too many changes" in error_msg.lower():
                            if attempt < max_retries - 1:  # Don't retry on last attempt
                                if self._wait_for_rate_limit(error_msg):
                                    print(f"   🔄 Retrying after rate limit...")
                                    continue
                                else:
                                    break
                            else:
                                results[budget] = {
                                    "success": True,  # Rate limiting is expected behavior
                                    "has_success": False,
                                    "has_error": True,
                                    "rate_limited": True,
                                    "error_message": error_msg
                                }
                                print(f"   ⚠️  Rate limited (expected): {error_msg}")
                                break
                        elif "should be recurring budget" in error_msg.lower() or "cannot switch" in error_msg.lower():
                            results[budget] = {
                                "success": False,
                                "has_error": True,
                                "api_limitation": "Cannot switch from daily to lifetime budget",
                                "error_message": error_msg
                            }
                            print(f"   ⚠️  API Limitation: {error_msg}")
                            break
                        else:
                            results[budget] = {
                                "success": False,
                                "has_error": True,
                                "error_message": error_msg
                            }
                            print(f"   ❌ API Error: {error_msg}")
                            break
                    
                    results[budget] = {
                        "success": True,
                        "has_id": has_id,
                        "has_lifetime_budget": has_lifetime_budget,
                        "has_success": has_success,
                        "updated_budget": parsed_content.get("lifetime_budget", "N/A"),
                        "adset_id": parsed_content.get("id", "N/A")
                    }
                    
                    print(f"   ✅ Updated lifetime budget to ${int(budget)/100:.2f}")
                    print(f"      • Ad Set ID: {parsed_content.get('id', 'N/A')}")
                    print(f"      • Success: {parsed_content.get('success', 'N/A')}")
                    
                    # Note: Meta Ads API returns {"success": true} for updates
                    # The actual updated values can be verified by fetching ad set details
                    break  # Success, exit retry loop
                    
                except json.JSONDecodeError:
                    results[budget] = {
                        "success": False,
                        "error": "Invalid JSON response",
                        "raw_content": content
                    }
                    print(f"   ❌ Invalid JSON: {content}")
                    break
        
        return results

    def test_both_budget_types_update(self) -> Dict[str, Any]:
        """Test updating both daily and lifetime budget simultaneously"""
        
        print(f"\n💰 Testing both budget types update function")
        print(f"   ⚠️  Note: Meta Ads API may reject this if ad set has existing daily budget")
        
        daily_budget = "15000"  # $150
        lifetime_budget = "150000"  # $1500
        
        print(f"   💰 Updating both budgets - Daily: ${int(daily_budget)/100:.2f}, Lifetime: ${int(lifetime_budget)/100:.2f}")
        
        result = self._make_request("tools/call", {
            "name": "update_adset",
                                "arguments": {
                        "adset_id": self.test_adset_ids[2],
                        "daily_budget": daily_budget,
                        "lifetime_budget": lifetime_budget
                    }
        })
        
        if not result["success"]:
            return {
                "success": False,
                "error": result.get("text", "Unknown error")
            }
        
        # Parse response
        response_data = result["json"]["result"]
        content = response_data.get("content", [{}])[0].get("text", "")
        
        try:
            parsed_content = json.loads(content)
            
            if "error" in parsed_content:
                error_msg = parsed_content.get("error", "")
                if "rate limit" in error_msg.lower() or "too many changes" in error_msg.lower():
                    return {
                        "success": True,  # Rate limiting is expected behavior
                        "rate_limited": True,
                        "error_message": error_msg
                    }
                else:
                    return {
                        "success": False,
                        "error": error_msg,
                        "api_limitation": "Cannot have both daily and lifetime budgets"
                    }
            
            # Check for successful update indicators
            has_id = "id" in parsed_content
            has_daily_budget = "daily_budget" in parsed_content
            has_lifetime_budget = "lifetime_budget" in parsed_content
            has_success = "success" in parsed_content
            
            result_data = {
                "success": True,
                "has_id": has_id,
                "has_daily_budget": has_daily_budget,
                "has_lifetime_budget": has_lifetime_budget,
                "has_success": has_success,
                "daily_budget": parsed_content.get("daily_budget", "N/A"),
                "lifetime_budget": parsed_content.get("lifetime_budget", "N/A"),
                "adset_id": parsed_content.get("id", "N/A")
            }
            
            print(f"   ✅ Updated both budgets successfully")
            print(f"      • Ad Set ID: {parsed_content.get('id', 'N/A')}")
            print(f"      • Success: {parsed_content.get('success', 'N/A')}")
            
            # Note: Meta Ads API returns {"success": true} for updates
            # The actual updated values can be verified by fetching ad set details
            
            return result_data
            
        except json.JSONDecodeError:
            return {
                "success": False,
                "error": "Invalid JSON response",
                "raw_content": content
            }

    def test_budget_update_with_other_parameters(self) -> Dict[str, Any]:
        """Test budget update combined with other parameters"""
        
        print(f"\n💰 Testing budget update with other parameters")
        
        result = self._make_request("tools/call", {
            "name": "update_adset",
            "arguments": {
                "adset_id": self.test_adset_ids[3],
                "daily_budget": "7500",  # $75
                "status": "PAUSED",
                "bid_amount": 1000,
                "bid_strategy": "LOWEST_COST_WITH_BID_CAP"
            }
        })
        
        if not result["success"]:
            return {
                "success": False,
                "error": result.get("text", "Unknown error")
            }
        
        # Parse response
        response_data = result["json"]["result"]
        content = response_data.get("content", [{}])[0].get("text", "")
        
        try:
            parsed_content = json.loads(content)
            
            if "error" in parsed_content:
                error_msg = parsed_content.get("error", "")
                if "rate limit" in error_msg.lower() or "too many changes" in error_msg.lower():
                    return {
                        "success": True,  # Rate limiting is expected behavior
                        "rate_limited": True,
                        "error_message": error_msg
                    }
                else:
                    return {
                        "success": False,
                        "error": error_msg
                    }
            
            # Check for successful update indicators
            has_id = "id" in parsed_content
            has_daily_budget = "daily_budget" in parsed_content
            has_status = "status" in parsed_content
            has_success = "success" in parsed_content
            
            result_data = {
                "success": True,
                "has_id": has_id,
                "has_daily_budget": has_daily_budget,
                "has_status": has_status,
                "has_success": has_success,
                "daily_budget": parsed_content.get("daily_budget", "N/A"),
                "status": parsed_content.get("status", "N/A"),
                "adset_id": parsed_content.get("id", "N/A")
            }
            
            print(f"   ✅ Updated budget with other parameters successfully")
            print(f"      • Ad Set ID: {parsed_content.get('id', 'N/A')}")
            print(f"      • Success: {parsed_content.get('success', 'N/A')}")
            
            # Note: Meta Ads API returns {"success": true} for updates
            # The actual updated values can be verified by fetching ad set details
            
            return result_data
            
        except json.JSONDecodeError:
            return {
                "success": False,
                "error": "Invalid JSON response",
                "raw_content": content
            }

    def test_invalid_budget_handling(self) -> Dict[str, Any]:
        """Test error handling for invalid budget values"""
        
        print(f"\n💰 Testing invalid budget handling")
        results = {}
        
        for invalid_budget in self.test_budgets["invalid_budgets"]:
            print(f"   💰 Testing invalid budget: '{invalid_budget}'")
            
            result = self._make_request("tools/call", {
                "name": "update_adset",
                "arguments": {
                    "adset_id": self.test_adset_ids[4],
                    "daily_budget": invalid_budget
                }
            })
            
            if not result["success"]:
                results[invalid_budget] = {
                    "success": False,
                    "error": result.get("text", "Unknown error")
                }
                print(f"   ❌ Request failed: {result.get('text', 'Unknown error')}")
                continue
            
            # Parse response
            response_data = result["json"]["result"]
            content = response_data.get("content", [{}])[0].get("text", "")
            
            try:
                parsed_content = json.loads(content)
                
                # For invalid budgets, we expect an error response
                has_error = "error" in parsed_content or "data" in parsed_content
                has_details = "details" in parsed_content
                
                # Check if the error is a proper validation error (not a rate limit or other issue)
                error_msg = parsed_content.get("error", "")
                if not error_msg and "data" in parsed_content:
                    try:
                        data_content = json.loads(parsed_content.get("data", ""))
                        if "error" in data_content:
                            error_msg = data_content["error"].get("message", "")
                    except:
                        pass
                
                is_validation_error = any(keyword in error_msg.lower() for keyword in [
                    "must be a number", "greater than or equal to 0", "too high", "too low", "invalid parameter",
                    "budget is too low", "budget is too high", "decrease your ad set budget"
                ])
                
                results[invalid_budget] = {
                    "success": has_error and is_validation_error,  # Success if we got proper validation error
                    "has_error": has_error,
                    "has_details": has_details,
                    "is_validation_error": is_validation_error,
                    "error_message": error_msg or parsed_content.get("error", "No error field"),
                    "details": parsed_content.get("details", "No details field")
                }
                
                if has_error and is_validation_error:
                    print(f"   ✅ Properly handled invalid budget '{invalid_budget}'")
                    print(f"      • Error: {parsed_content.get('error', 'N/A')}")
                elif has_error:
                    print(f"   ⚠️  Got error but not validation error for '{invalid_budget}'")
                    print(f"      • Error: {parsed_content.get('error', 'N/A')}")
                else:
                    print(f"   ❌ Unexpected success for invalid budget '{invalid_budget}'")
                    print(f"      • Response: {parsed_content}")
                
            except json.JSONDecodeError:
                results[invalid_budget] = {
                    "success": False,
                    "error": "Invalid JSON response",
                    "raw_content": content
                }
                print(f"   ❌ Invalid JSON: {content}")
        
        return results

    def test_budget_update_with_targeting(self) -> Dict[str, Any]:
        """Test budget update combined with targeting update"""
        
        print(f"\n💰 Testing budget update with targeting")
        
        targeting = {
            "age_min": 25,
            "age_max": 45,
            "geo_locations": {"countries": ["US", "CA"]}
        }
        
        result = self._make_request("tools/call", {
            "name": "update_adset",
            "arguments": {
                "adset_id": self.test_adset_ids[5],
                "daily_budget": "8500",  # $85
                "targeting": targeting
            }
        })
        
        if not result["success"]:
            return {
                "success": False,
                "error": result.get("text", "Unknown error")
            }
        
        # Parse response
        response_data = result["json"]["result"]
        content = response_data.get("content", [{}])[0].get("text", "")
        
        try:
            parsed_content = json.loads(content)
            
            if "error" in parsed_content:
                error_msg = parsed_content.get("error", "")
                if "rate limit" in error_msg.lower() or "too many changes" in error_msg.lower():
                    return {
                        "success": True,  # Rate limiting is expected behavior
                        "rate_limited": True,
                        "error_message": error_msg
                    }
                else:
                    return {
                        "success": False,
                        "error": error_msg
                    }
            
            # Check for successful update indicators
            has_id = "id" in parsed_content
            has_daily_budget = "daily_budget" in parsed_content
            has_success = "success" in parsed_content
            
            result_data = {
                "success": True,
                "has_id": has_id,
                "has_daily_budget": has_daily_budget,
                "has_success": has_success,
                "daily_budget": parsed_content.get("daily_budget", "N/A"),
                "adset_id": parsed_content.get("id", "N/A")
            }
            
            print(f"   ✅ Updated budget with targeting successfully")
            print(f"      • Ad Set ID: {parsed_content.get('id', 'N/A')}")
            print(f"      • Success: {parsed_content.get('success', 'N/A')}")
            
            # Note: Meta Ads API returns {"success": true} for updates
            # The actual updated values can be verified by fetching ad set details
            
            return result_data
            
        except json.JSONDecodeError:
            return {
                "success": False,
                "error": "Invalid JSON response",
                "raw_content": content
            }

    def run_budget_update_tests(self) -> bool:
        """Run all six budget update phases and print a pass/fail summary.

        The server is probed first with a GET on the base URL. Any HTTP 200 or
        404 reply counts as "running". A ``requests`` error, or any other
        status, aborts the run.

        Returns:
            ``True`` when at least 4 of the 6 phases pass. ``False`` when fewer
            pass or the server is not reachable.
        """

        def banner(title: str) -> None:
            # Section header used between phases
            print("\n" + "="*60)
            print(title)
            print("="*60)

        print("🚀 Meta Ads Budget Update End-to-End Test Suite")
        print("="*60)

        # Check server availability. Only transport-level failures mean
        # "not running"; Ctrl-C and programming errors must not be swallowed.
        try:
            response = requests.get(f"{self.base_url}/", timeout=5)
            server_running = response.status_code in [200, 404]
        except requests.exceptions.RequestException:
            server_running = False

        if not server_running:
            print("❌ Server is not running at", self.base_url)
            print("   Please start the server with:")
            print("   python3 -m meta_ads_mcp --transport streamable-http --port 8080")
            return False

        print("✅ Server is running")
        print("🔐 Using implicit authentication from server")
        print("⚠️  Note: This test uses ad sets specifically created for budget testing")
        print("⚠️  Note: Campaign uses ad set level budgets - testing budget updates at ad set level")
        print("⚠️  Note: Meta Ads API allows only 4 budget changes per hour - test will wait if rate limited")

        # Test 1: Daily Budget Updates
        # Rate-limited outcomes are already recorded with success=True, so the
        # "success" flag alone decides the phase.
        banner("📋 PHASE 1: Testing Daily Budget Updates")
        daily_results = self.test_daily_budget_update()
        daily_success = any(
            result.get("success") for result in daily_results.values()
        )

        # Test 2: Lifetime Budget Updates
        # A documented API limitation also counts as a pass.
        banner("📋 PHASE 2: Testing Lifetime Budget Updates")
        lifetime_results = self.test_lifetime_budget_update()
        lifetime_success = any(
            result.get("success") or result.get("api_limitation")
            for result in lifetime_results.values()
        )

        # Test 3: Both Budget Types
        banner("📋 PHASE 3: Testing Both Budget Types")
        both_budgets_result = self.test_both_budget_types_update()
        both_budgets_success = bool(
            both_budgets_result.get("success")
            or both_budgets_result.get("rate_limited")
            or both_budgets_result.get("api_limitation")
        )

        # Test 4: Budget with Other Parameters
        banner("📋 PHASE 4: Testing Budget with Other Parameters")
        other_params_result = self.test_budget_update_with_other_parameters()
        other_params_success = bool(other_params_result.get("success"))

        # Test 5: Invalid Budget Handling
        banner("📋 PHASE 5: Testing Invalid Budget Handling")
        invalid_results = self.test_invalid_budget_handling()
        invalid_success = any(
            result.get("success") and result.get("is_validation_error")
            for result in invalid_results.values()
        )

        # Test 6: Budget with Targeting
        banner("📋 PHASE 6: Testing Budget with Targeting")
        targeting_result = self.test_budget_update_with_targeting()
        targeting_success = bool(targeting_result.get("success"))

        # Final assessment
        banner("📊 FINAL RESULTS")

        all_tests = [
            ("Daily Budget Updates", daily_success),
            ("Lifetime Budget Updates", lifetime_success),
            ("Both Budget Types", both_budgets_success),
            ("Budget with Other Parameters", other_params_success),
            ("Invalid Budget Handling", invalid_success),
            ("Budget with Targeting", targeting_success)
        ]

        passed_tests = sum(1 for _, success in all_tests if success)
        total_tests = len(all_tests)

        for test_name, success in all_tests:
            status = "✅ PASSED" if success else "❌ FAILED"
            print(f"   • {test_name}: {status}")

        overall_success = passed_tests >= 4  # At least 4 out of 6 tests should pass

        if overall_success:
            print(f"\n✅ Budget update tests: SUCCESS ({passed_tests}/{total_tests} passed)")
            print("   • Core budget update functionality is working")
            print("   • Meta Ads API integration is functional")
            print("   • Error handling is working properly")
            return True

        print(f"\n❌ Budget update tests: FAILED ({passed_tests}/{total_tests} passed)")
        print("   • Some budget update functions are not working properly")
        print("   • Check API permissions and ad set IDs")
        return False


def main():
    """Run the budget update suite and exit with a matching status code.

    The process exits with status 0 when ``run_budget_update_tests`` reports
    success and with status 1 otherwise.
    """
    passed = BudgetUpdateTester().run_budget_update_tests()

    summary = (
        "\n🎉 All budget update tests passed!"
        if passed
        else "\n⚠️  Some budget update tests failed - see details above"
    )
    print(summary)

    # Propagate the outcome to the shell / CI runner.
    sys.exit(0 if passed else 1)


# Run the suite only when this file is executed directly as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main() 
```

--------------------------------------------------------------------------------
/tests/test_dynamic_creatives.py:
--------------------------------------------------------------------------------

```python
"""Tests for dynamic creative features including multiple headlines and descriptions.

Tests for the enhanced create_ad_creative function that supports:
- Multiple headlines
- Multiple descriptions  
- Dynamic creative optimization settings
- Creative update functionality
"""

import pytest
import json
from unittest.mock import AsyncMock, patch
from meta_ads_mcp.core.ads import create_ad_creative, update_ad_creative


@pytest.mark.asyncio
class TestDynamicCreatives:
    """Test cases for dynamic creative features."""
    
    async def test_create_ad_creative_single_headline(self):
        """Verify a lone ``headline`` is sent via ``object_story_spec``.

        With the API request function mocked, the params of the first recorded
        call must carry the headline as ``link_data["name"]`` and must not
        contain ``asset_feed_spec``.
        """
        mocked_response = {"id": "123456789", "name": "Test Creative", "status": "ACTIVE"}
        creative_kwargs = {
            "access_token": "test_token",
            "account_id": "act_123456789",
            "name": "Test Creative",
            "image_hash": "abc123",
            "page_id": "987654321",
            "link_url": "https://example.com",
            "message": "Test message",
            "headline": "Single Headline",
            "call_to_action_type": "LEARN_MORE",
        }

        with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as api_mock:
            api_mock.return_value = mocked_response

            response = json.loads(await create_ad_creative(**creative_kwargs))
            assert response["success"] is True

            # The creation request is expected to be the first recorded call.
            recorded_calls = api_mock.call_args_list
            assert len(recorded_calls) >= 1

            # The request params travel as the third positional argument.
            sent_params = recorded_calls[0].args[2]

            # Simple creatives use object_story_spec/link_data, not asset_feed_spec.
            assert "object_story_spec" in sent_params
            story_spec = sent_params["object_story_spec"]
            assert "link_data" in story_spec
            assert story_spec["link_data"]["name"] == "Single Headline"
            assert "asset_feed_spec" not in sent_params
    
    async def test_create_ad_creative_single_description(self):
        """Verify a lone ``description`` is sent via ``object_story_spec``.

        With the API request function mocked, the params of the first recorded
        call must carry the text as ``link_data["description"]`` and must not
        contain ``asset_feed_spec``.
        """
        mocked_response = {"id": "123456789", "name": "Test Creative", "status": "ACTIVE"}
        creative_kwargs = {
            "access_token": "test_token",
            "account_id": "act_123456789",
            "name": "Test Creative",
            "image_hash": "abc123",
            "page_id": "987654321",
            "link_url": "https://example.com",
            "message": "Test message",
            "description": "Single Description",
            "call_to_action_type": "LEARN_MORE",
        }

        with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as api_mock:
            api_mock.return_value = mocked_response

            response = json.loads(await create_ad_creative(**creative_kwargs))
            assert response["success"] is True

            # The creation request is expected to be the first recorded call.
            recorded_calls = api_mock.call_args_list
            assert len(recorded_calls) >= 1

            # The request params travel as the third positional argument.
            sent_params = recorded_calls[0].args[2]

            # Simple creatives use object_story_spec/link_data, not asset_feed_spec.
            assert "object_story_spec" in sent_params
            story_spec = sent_params["object_story_spec"]
            assert "link_data" in story_spec
            assert story_spec["link_data"]["description"] == "Single Description"
            assert "asset_feed_spec" not in sent_params
    
    async def test_create_ad_creative_cannot_mix_headline_and_headlines(self):
        """Supplying both ``headline`` and ``headlines`` must produce an error payload."""
        raw_response = await create_ad_creative(
            access_token="test_token",
            account_id="act_123456789",
            name="Test Creative",
            image_hash="abc123",
            page_id="987654321",
            link_url="https://example.com",
            message="Test message",
            headline="Single Headline",
            headlines=["Headline 1", "Headline 2"],
            call_to_action_type="LEARN_MORE"
        )
        response = json.loads(raw_response)

        # The error is either JSON-encoded under "data" (MCP error response
        # format) or present directly at the top level.
        error_payload = json.loads(response["data"]) if "data" in response else response

        assert "error" in error_payload
        assert "Cannot specify both 'headline' and 'headlines'" in error_payload["error"]
    
    async def test_create_ad_creative_cannot_mix_description_and_descriptions(self):
        """Supplying both ``description`` and ``descriptions`` must produce an error payload."""
        raw_response = await create_ad_creative(
            access_token="test_token",
            account_id="act_123456789",
            name="Test Creative",
            image_hash="abc123",
            page_id="987654321",
            link_url="https://example.com",
            message="Test message",
            description="Single Description",
            descriptions=["Description 1", "Description 2"],
            call_to_action_type="LEARN_MORE"
        )
        response = json.loads(raw_response)

        # The error is either JSON-encoded under "data" (MCP error response
        # format) or present directly at the top level.
        error_payload = json.loads(response["data"]) if "data" in response else response

        assert "error" in error_payload
        assert "Cannot specify both 'description' and 'descriptions'" in error_payload["error"]

    
    async def test_create_ad_creative_multiple_headlines(self):
        """Verify several ``headlines`` are sent as ``asset_feed_spec`` entries.

        Each headline is expected in the first recorded request as a
        ``{"text": ...}`` item, in the order it was supplied.
        """
        mocked_response = {"id": "123456789", "name": "Test Creative", "status": "ACTIVE"}
        expected_headlines = [
            {"text": text} for text in ("Headline 1", "Headline 2", "Headline 3")
        ]

        with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as api_mock:
            api_mock.return_value = mocked_response

            raw_response = await create_ad_creative(
                access_token="test_token",
                account_id="act_123456789",
                name="Test Creative",
                image_hash="abc123",
                page_id="987654321",
                link_url="https://example.com",
                message="Test message",
                headlines=["Headline 1", "Headline 2", "Headline 3"],
                call_to_action_type="LEARN_MORE"
            )
            assert json.loads(raw_response)["success"] is True

            # Inspect the first call (creative creation); a later call may be
            # the details fetch.
            recorded_calls = api_mock.call_args_list
            assert len(recorded_calls) >= 1

            # The request params travel as the third positional argument.
            sent_params = recorded_calls[0].args[2]

            assert "asset_feed_spec" in sent_params
            feed_spec = sent_params["asset_feed_spec"]
            assert "headlines" in feed_spec
            assert feed_spec["headlines"] == expected_headlines
    
    async def test_create_ad_creative_multiple_descriptions(self):
        """Verify several ``descriptions`` are sent as ``asset_feed_spec`` entries.

        Each description is expected in the first recorded request as a
        ``{"text": ...}`` item, in the order it was supplied.
        """
        mocked_response = {"id": "123456789", "name": "Test Creative", "status": "ACTIVE"}
        expected_descriptions = [
            {"text": text} for text in ("Description 1", "Description 2")
        ]

        with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as api_mock:
            api_mock.return_value = mocked_response

            raw_response = await create_ad_creative(
                access_token="test_token",
                account_id="act_123456789",
                name="Test Creative",
                image_hash="abc123",
                page_id="987654321",
                link_url="https://example.com",
                message="Test message",
                descriptions=["Description 1", "Description 2"],
                call_to_action_type="LEARN_MORE"
            )
            assert json.loads(raw_response)["success"] is True

            # Inspect the first call (creative creation); a later call may be
            # the details fetch.
            recorded_calls = api_mock.call_args_list
            assert len(recorded_calls) >= 1

            # The request params travel as the third positional argument.
            sent_params = recorded_calls[0].args[2]

            assert "asset_feed_spec" in sent_params
            feed_spec = sent_params["asset_feed_spec"]
            assert "descriptions" in feed_spec
            assert feed_spec["descriptions"] == expected_descriptions
    
    async def test_create_ad_creative_dynamic_creative_spec(self):
        """Verify ``dynamic_creative_spec`` flags reach the creation request.

        Both optimization flags supplied by the caller must appear, as ``True``,
        in the params of the first recorded API call.
        """
        mocked_response = {"id": "123456789", "name": "Test Creative", "status": "ACTIVE"}
        optimization_settings = {
            "headline_optimization": True,
            "description_optimization": True
        }

        with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as api_mock:
            api_mock.return_value = mocked_response

            raw_response = await create_ad_creative(
                access_token="test_token",
                account_id="act_123456789",
                name="Test Creative",
                image_hash="abc123",
                page_id="987654321",
                link_url="https://example.com",
                message="Test message",
                headlines=["Headline 1", "Headline 2"],
                descriptions=["Description 1", "Description 2"],
                dynamic_creative_spec=optimization_settings,
                call_to_action_type="LEARN_MORE"
            )
            assert json.loads(raw_response)["success"] is True

            # Inspect the first call (creative creation); a later call may be
            # the details fetch.
            recorded_calls = api_mock.call_args_list
            assert len(recorded_calls) >= 1

            # The request params travel as the third positional argument.
            sent_params = recorded_calls[0].args[2]

            assert "dynamic_creative_spec" in sent_params
            sent_spec = sent_params["dynamic_creative_spec"]
            assert sent_spec["headline_optimization"] is True
            assert sent_spec["description_optimization"] is True
    
    async def test_create_ad_creative_multiple_headlines_and_descriptions(self):
        """Verify headlines, descriptions and dynamic spec are sent together.

        The first recorded request must contain an ``asset_feed_spec`` holding
        both ``headlines`` and ``descriptions``, plus a ``dynamic_creative_spec``.
        """
        mocked_response = {"id": "123456789", "name": "Test Creative", "status": "ACTIVE"}
        optimization_settings = {
            "headline_optimization": True,
            "description_optimization": True
        }

        with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as api_mock:
            api_mock.return_value = mocked_response

            raw_response = await create_ad_creative(
                access_token="test_token",
                account_id="act_123456789",
                name="Test Creative",
                image_hash="abc123",
                page_id="987654321",
                link_url="https://example.com",
                message="Test message",
                headlines=["Headline 1", "Headline 2", "Headline 3"],
                descriptions=["Description 1", "Description 2"],
                dynamic_creative_spec=optimization_settings,
                call_to_action_type="LEARN_MORE"
            )
            assert json.loads(raw_response)["success"] is True

            # Inspect the first call (creative creation); a later call may be
            # the details fetch.
            recorded_calls = api_mock.call_args_list
            assert len(recorded_calls) >= 1

            # The request params travel as the third positional argument.
            sent_params = recorded_calls[0].args[2]

            assert "asset_feed_spec" in sent_params
            feed_spec = sent_params["asset_feed_spec"]
            assert "headlines" in feed_spec
            assert "descriptions" in feed_spec
            assert "dynamic_creative_spec" in sent_params
    
    async def test_create_ad_creative_validation_max_headlines(self):
        """Six headlines must be rejected with a 'maximum' or 'limit' error."""
        # One more than the assumed limit of five headlines.
        excess_headlines = [f"Headline {i}" for i in range(6)]

        raw_response = await create_ad_creative(
            access_token="test_token",
            account_id="act_123456789",
            name="Test Creative",
            image_hash="abc123",
            page_id="987654321",
            headlines=excess_headlines
        )
        response = json.loads(raw_response)

        # The error may be JSON-encoded under a "data" field or sit at the top level.
        error_payload = json.loads(response["data"]) if "data" in response else response

        assert "error" in error_payload
        error_text = error_payload["error"].lower()
        assert "maximum" in error_text or "limit" in error_text
    
    async def test_create_ad_creative_validation_max_descriptions(self):
        """Six descriptions must be rejected with a 'maximum' or 'limit' error."""
        # One more than the assumed limit of five descriptions.
        excess_descriptions = [f"Description {i}" for i in range(6)]

        raw_response = await create_ad_creative(
            access_token="test_token",
            account_id="act_123456789",
            name="Test Creative",
            image_hash="abc123",
            page_id="987654321",
            descriptions=excess_descriptions
        )
        response = json.loads(raw_response)

        # The error may be JSON-encoded under a "data" field or sit at the top level.
        error_payload = json.loads(response["data"]) if "data" in response else response

        assert "error" in error_payload
        error_text = error_payload["error"].lower()
        assert "maximum" in error_text or "limit" in error_text
    

    
    async def test_update_ad_creative_add_headlines(self):
        """Verify an update with ``headlines`` sends them in ``asset_feed_spec``.

        Each headline is expected in the first recorded request as a
        ``{"text": ...}`` item, in the order it was supplied.
        """
        mocked_response = {"id": "123456789", "name": "Updated Creative", "status": "ACTIVE"}
        expected_headlines = [
            {"text": text}
            for text in ("New Headline 1", "New Headline 2", "New Headline 3")
        ]

        with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as api_mock:
            api_mock.return_value = mocked_response

            raw_response = await update_ad_creative(
                access_token="test_token",
                creative_id="123456789",
                headlines=["New Headline 1", "New Headline 2", "New Headline 3"]
            )
            assert json.loads(raw_response)["success"] is True

            # Inspect the first call (creative update); a later call may be
            # the details fetch.
            recorded_calls = api_mock.call_args_list
            assert len(recorded_calls) >= 1

            # The request params travel as the third positional argument.
            sent_params = recorded_calls[0].args[2]

            assert "asset_feed_spec" in sent_params
            feed_spec = sent_params["asset_feed_spec"]
            assert "headlines" in feed_spec
            assert feed_spec["headlines"] == expected_headlines
    
    async def test_update_ad_creative_add_descriptions(self):
        """Verify an update with ``descriptions`` sends them in ``asset_feed_spec``.

        Each description is expected in the first recorded request as a
        ``{"text": ...}`` item, in the order it was supplied.
        """
        mocked_response = {"id": "123456789", "name": "Updated Creative", "status": "ACTIVE"}
        expected_descriptions = [
            {"text": text} for text in ("New Description 1", "New Description 2")
        ]

        with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as api_mock:
            api_mock.return_value = mocked_response

            raw_response = await update_ad_creative(
                access_token="test_token",
                creative_id="123456789",
                descriptions=["New Description 1", "New Description 2"]
            )
            assert json.loads(raw_response)["success"] is True

            # Inspect the first call (creative update); a later call may be
            # the details fetch.
            recorded_calls = api_mock.call_args_list
            assert len(recorded_calls) >= 1

            # The request params travel as the third positional argument.
            sent_params = recorded_calls[0].args[2]

            assert "asset_feed_spec" in sent_params
            feed_spec = sent_params["asset_feed_spec"]
            assert "descriptions" in feed_spec
            assert feed_spec["descriptions"] == expected_descriptions
    
    async def test_update_ad_creative_update_dynamic_spec(self):
        """Verify an update forwards ``dynamic_creative_spec`` flags unchanged.

        The first recorded request must carry ``headline_optimization`` as
        ``True`` and ``description_optimization`` as ``False``.
        """
        mocked_response = {"id": "123456789", "name": "Updated Creative", "status": "ACTIVE"}
        optimization_settings = {
            "headline_optimization": True,
            "description_optimization": False
        }

        with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as api_mock:
            api_mock.return_value = mocked_response

            raw_response = await update_ad_creative(
                access_token="test_token",
                creative_id="123456789",
                dynamic_creative_spec=optimization_settings
            )
            assert json.loads(raw_response)["success"] is True

            # Inspect the first call (creative update); a later call may be
            # the details fetch.
            recorded_calls = api_mock.call_args_list
            assert len(recorded_calls) >= 1

            # The request params travel as the third positional argument.
            sent_params = recorded_calls[0].args[2]

            assert "dynamic_creative_spec" in sent_params
            sent_spec = sent_params["dynamic_creative_spec"]
            assert sent_spec["headline_optimization"] is True
            assert sent_spec["description_optimization"] is False
    
    async def test_update_ad_creative_no_creative_id(self):
        """An empty ``creative_id`` must yield an error mentioning 'creative id'."""
        raw_response = await update_ad_creative(
            creative_id="",  # required parameter supplied, but deliberately empty
            access_token="test_token",
            headlines=["New Headline"]
        )
        response = json.loads(raw_response)

        # The error may be JSON-encoded under a "data" field or sit at the top level.
        error_payload = json.loads(response["data"]) if "data" in response else response

        assert "error" in error_payload
        assert "creative id" in error_payload["error"].lower()
    
    async def test_update_ad_creative_cannot_mix_headline_and_headlines(self):
        """An update with both ``headline`` and ``headlines`` must produce an error payload."""
        raw_response = await update_ad_creative(
            access_token="test_token",
            creative_id="123456789",
            headline="Single Headline",
            headlines=["Headline 1", "Headline 2"]
        )
        response = json.loads(raw_response)

        # The error is either JSON-encoded under "data" (MCP error response
        # format) or present directly at the top level.
        error_payload = json.loads(response["data"]) if "data" in response else response

        assert "error" in error_payload
        assert "Cannot specify both 'headline' and 'headlines'" in error_payload["error"]
    
    async def test_update_ad_creative_cannot_mix_description_and_descriptions(self):
        """An update with both ``description`` and ``descriptions`` must produce an error payload."""
        raw_response = await update_ad_creative(
            access_token="test_token",
            creative_id="123456789",
            description="Single Description",
            descriptions=["Description 1", "Description 2"]
        )
        response = json.loads(raw_response)

        # The error is either JSON-encoded under "data" (MCP error response
        # format) or present directly at the top level.
        error_payload = json.loads(response["data"]) if "data" in response else response

        assert "error" in error_payload
        assert "Cannot specify both 'description' and 'descriptions'" in error_payload["error"]

    async def test_update_ad_creative_validation_max_headlines(self):
        """An update with six headlines must report 'maximum 5 headlines'."""
        # One more than the limit of five headlines.
        excess_headlines = [f"Headline {i}" for i in range(6)]

        raw_response = await update_ad_creative(
            access_token="test_token",
            creative_id="123456789",
            headlines=excess_headlines
        )
        response = json.loads(raw_response)

        # The error may be JSON-encoded under a "data" field or sit at the top level.
        error_payload = json.loads(response["data"]) if "data" in response else response

        assert "error" in error_payload
        assert "maximum 5 headlines" in error_payload["error"].lower()
    
    async def test_update_ad_creative_validation_max_descriptions(self):
        """An update with six descriptions must report 'maximum 5 descriptions'."""
        # One more than the limit of five descriptions.
        excess_descriptions = [f"Description {i}" for i in range(6)]

        raw_response = await update_ad_creative(
            access_token="test_token",
            creative_id="123456789",
            descriptions=excess_descriptions
        )
        response = json.loads(raw_response)

        # The error may be JSON-encoded under a "data" field or sit at the top level.
        error_payload = json.loads(response["data"]) if "data" in response else response

        assert "error" in error_payload
        assert "maximum 5 descriptions" in error_payload["error"].lower()
    
    async def test_update_ad_creative_validation_headline_length(self):
        """A 41-character headline must be rejected citing the 40 character limit."""
        # One character over the 40-character limit.
        oversized_headline = "A" * 41

        raw_response = await update_ad_creative(
            access_token="test_token",
            creative_id="123456789",
            headlines=[oversized_headline]
        )
        response = json.loads(raw_response)

        # The error may be JSON-encoded under a "data" field or sit at the top level.
        error_payload = json.loads(response["data"]) if "data" in response else response

        assert "error" in error_payload
        assert "40 character limit" in error_payload["error"]
    
    async def test_update_ad_creative_validation_description_length(self):
        """A 126-character description must be rejected citing the 125 character limit."""
        # One character over the 125-character limit.
        oversized_description = "A" * 126

        raw_response = await update_ad_creative(
            access_token="test_token",
            creative_id="123456789",
            descriptions=[oversized_description]
        )
        response = json.loads(raw_response)

        # The error may be JSON-encoded under a "data" field or sit at the top level.
        error_payload = json.loads(response["data"]) if "data" in response else response

        assert "error" in error_payload
        assert "125 character limit" in error_payload["error"]
    
    async def test_update_ad_creative_name_only(self):
        """Verify a name-only update sends ``name`` and no ``asset_feed_spec``."""
        mocked_response = {
            "id": "123456789",
            "name": "Updated Creative Name",
            "status": "ACTIVE",
        }

        with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as api_mock:
            api_mock.return_value = mocked_response

            raw_response = await update_ad_creative(
                access_token="test_token",
                creative_id="123456789",
                name="Updated Creative Name"
            )
            assert json.loads(raw_response)["success"] is True

            # The update request is expected to be the first recorded call.
            recorded_calls = api_mock.call_args_list
            assert len(recorded_calls) >= 1

            # The request params travel as the third positional argument.
            sent_params = recorded_calls[0].args[2]

            assert "name" in sent_params
            assert sent_params["name"] == "Updated Creative Name"
            # No dynamic content was supplied, so no asset_feed_spec is expected.
            assert "asset_feed_spec" not in sent_params
    
    async def test_update_ad_creative_message_only(self):
        """Verify a message-only update places the text in ``link_data["message"]``."""
        mocked_response = {"id": "123456789", "name": "Test Creative", "status": "ACTIVE"}

        with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as api_mock:
            api_mock.return_value = mocked_response

            raw_response = await update_ad_creative(
                access_token="test_token",
                creative_id="123456789",
                message="Updated message text"
            )
            assert json.loads(raw_response)["success"] is True

            # The update request is expected to be the first recorded call.
            recorded_calls = api_mock.call_args_list
            assert len(recorded_calls) >= 1

            # The request params travel as the third positional argument.
            sent_params = recorded_calls[0].args[2]

            assert "object_story_spec" in sent_params
            link_data = sent_params["object_story_spec"]["link_data"]
            assert link_data["message"] == "Updated message text"
    
    async def test_update_ad_creative_cta_with_dynamic_content(self):
        """Verify the CTA lands in ``asset_feed_spec`` when headlines are supplied.

        The first recorded request must list the call to action under
        ``asset_feed_spec["call_to_action_types"]`` as ``["SHOP_NOW"]``.
        """
        mocked_response = {"id": "123456789", "name": "Test Creative", "status": "ACTIVE"}

        with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as api_mock:
            api_mock.return_value = mocked_response

            raw_response = await update_ad_creative(
                access_token="test_token",
                creative_id="123456789",
                headlines=["Test Headline"],
                call_to_action_type="SHOP_NOW"
            )
            assert json.loads(raw_response)["success"] is True

            # The update request is expected to be the first recorded call.
            recorded_calls = api_mock.call_args_list
            assert len(recorded_calls) >= 1

            # The request params travel as the third positional argument.
            sent_params = recorded_calls[0].args[2]

            assert "asset_feed_spec" in sent_params
            feed_spec = sent_params["asset_feed_spec"]
            assert "call_to_action_types" in feed_spec
            assert feed_spec["call_to_action_types"] == ["SHOP_NOW"]
    
    async def test_update_ad_creative_cta_without_dynamic_content(self):
        """A CTA sent on its own must land in object_story_spec.link_data."""

        api_response = {
            "id": "123456789",
            "name": "Test Creative",
            "status": "ACTIVE"
        }

        with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api:
            mock_api.return_value = api_response

            raw_result = await update_ad_creative(
                access_token="test_token",
                creative_id="123456789",
                call_to_action_type="LEARN_MORE"
            )

            assert json.loads(raw_result)["success"] is True

            # At least one request must have been issued; the payload under
            # test is the third positional argument of the first request.
            recorded_calls = mock_api.call_args_list
            assert len(recorded_calls) >= 1
            sent_params = recorded_calls[0].args[2]

            # No headlines/descriptions were given, so the CTA is expected on
            # the link data of the story spec rather than in an asset feed.
            assert "object_story_spec" in sent_params
            link_data = sent_params["object_story_spec"]["link_data"]
            assert "call_to_action" in link_data
            assert link_data["call_to_action"]["type"] == "LEARN_MORE"
    
    async def test_update_ad_creative_combined_updates(self):
        """Name, message, headlines, descriptions and CTA sent in one update."""

        api_response = {
            "id": "123456789",
            "name": "Multi-Updated Creative",
            "status": "ACTIVE"
        }

        # What each asset_feed_spec entry must contain after the update.
        expected_feed = {
            "headlines": [{"text": "New Headline 1"}, {"text": "New Headline 2"}],
            "descriptions": [{"text": "New Description 1"}],
            "primary_texts": [{"text": "Updated message"}],
            "call_to_action_types": ["SIGN_UP"],
        }

        with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api:
            mock_api.return_value = api_response

            raw_result = await update_ad_creative(
                access_token="test_token",
                creative_id="123456789",
                name="Multi-Updated Creative",
                message="Updated message",
                headlines=["New Headline 1", "New Headline 2"],
                descriptions=["New Description 1"],
                call_to_action_type="SIGN_UP"
            )

            assert json.loads(raw_result)["success"] is True

            # The payload under test is the third positional argument of the
            # first recorded request.
            recorded_calls = mock_api.call_args_list
            assert len(recorded_calls) >= 1
            sent_params = recorded_calls[0].args[2]

            # The plain name travels at the top level of the payload.
            assert sent_params["name"] == "Multi-Updated Creative"

            # All dynamic content, including the message and the CTA, must be
            # present in the asset feed ...
            assert "asset_feed_spec" in sent_params
            feed_spec = sent_params["asset_feed_spec"]
            for feed_key in expected_feed:
                assert feed_key in feed_spec

            # ... and carry exactly the values that were passed in.
            for feed_key, expected_value in expected_feed.items():
                assert feed_spec[feed_key] == expected_value
    
    async def test_update_ad_creative_api_error_handling(self):
        """A failing API request must surface as a structured error payload."""

        with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api:
            mock_api.side_effect = Exception("API request failed")

            raw_result = await update_ad_creative(
                access_token="test_token",
                creative_id="123456789",
                name="Test Creative"
            )

            decoded = json.loads(raw_result)
            # The error may arrive directly or JSON-encoded under a 'data'
            # key; normalise both shapes before asserting on the content.
            error_payload = json.loads(decoded["data"]) if "data" in decoded else decoded

            assert "error" in error_payload
            assert error_payload["error"] == "Failed to update ad creative"
            assert "details" in error_payload
            assert "update_data_sent" in error_payload
 
```

--------------------------------------------------------------------------------
/tests/test_duplication_regression.py:
--------------------------------------------------------------------------------

```python
"""Comprehensive regression tests for duplication module."""

import os
import json
import pytest
import httpx
from unittest.mock import patch, AsyncMock, MagicMock
import importlib


class TestDuplicationFeatureToggle:
    """Regression tests for the META_ADS_ENABLE_DUPLICATION feature toggle."""

    @staticmethod
    def _reload_duplication():
        """Reload the duplication module so it picks up the patched environment."""
        from meta_ads_mcp.core import duplication
        importlib.reload(duplication)
        return duplication

    def test_feature_disabled_by_default(self):
        """With an empty environment the feature flag must be off."""
        with patch.dict(os.environ, {}, clear=True):
            module = self._reload_duplication()

            # Only the flag is checked: duplicate_* functions may linger in
            # the module namespace from reloads done by earlier tests.
            assert not module.ENABLE_DUPLICATION

    def test_feature_enabled_with_env_var(self):
        """Setting the variable to "1" turns the flag on and exposes the tools."""
        with patch.dict(os.environ, {"META_ADS_ENABLE_DUPLICATION": "1"}):
            module = self._reload_duplication()

            assert module.ENABLE_DUPLICATION
            for tool_name in (
                'duplicate_campaign',
                'duplicate_adset',
                'duplicate_ad',
                'duplicate_creative',
            ):
                assert hasattr(module, tool_name)

    def test_feature_enabled_with_various_truthy_values(self):
        """Every listed non-empty value must switch the feature on."""
        candidates = ("1", "true", "TRUE", "yes", "YES", "on", "ON", "enabled")

        for value in candidates:
            with patch.dict(os.environ, {"META_ADS_ENABLE_DUPLICATION": value}):
                module = self._reload_duplication()

                assert module.ENABLE_DUPLICATION, f"Value '{value}' should enable the feature"

    def test_feature_disabled_with_empty_string(self):
        """An empty value for the variable must leave the feature off."""
        with patch.dict(os.environ, {"META_ADS_ENABLE_DUPLICATION": ""}):
            module = self._reload_duplication()

            assert not module.ENABLE_DUPLICATION


class TestDuplicationDecorators:
    """Regression tests checking the decorators applied to the duplication tools."""

    @pytest.fixture(autouse=True)
    def enable_feature(self):
        """Yield the duplication module reloaded with the feature switched on.

        The environment stays patched for the whole duration of the test.
        """
        with patch.dict(os.environ, {"META_ADS_ENABLE_DUPLICATION": "1"}):
            from meta_ads_mcp.core import duplication
            importlib.reload(duplication)
            yield duplication

    def test_functions_have_meta_api_tool_decorator(self, enable_feature):
        """Every duplication tool must expose an optional access_token parameter."""
        import inspect

        module = enable_feature

        for tool_name in ('duplicate_campaign', 'duplicate_adset', 'duplicate_ad', 'duplicate_creative'):
            tool = getattr(module, tool_name)

            # The @meta_api_tool wrapper itself is not inspected; the test
            # relies on the access-token handling it is expected to add.
            assert callable(tool), f"{tool_name} should be callable"

            # The visible signature must carry access_token defaulting to None.
            parameters = inspect.signature(tool).parameters
            assert 'access_token' in parameters, f"{tool_name} should have access_token parameter"
            assert parameters['access_token'].default is None, f"{tool_name} access_token should default to None"

    @pytest.mark.asyncio
    async def test_functions_are_mcp_tools(self, enable_feature):
        """Every duplication tool must be listed by the MCP server."""
        # Exercises the effect of the @mcp_server.tool() registration.
        from meta_ads_mcp.core.server import mcp_server

        # list_tools is a coroutine, hence the await.
        registered_names = [tool.name for tool in await mcp_server.list_tools()]

        for tool_name in ('duplicate_campaign', 'duplicate_adset', 'duplicate_ad', 'duplicate_creative'):
            assert tool_name in registered_names, f"{tool_name} should be registered as an MCP tool"


class TestDuplicationAPIContract:
    """Test API contract to prevent regression in external API calls."""

    @pytest.fixture(autouse=True)
    def enable_feature(self):
        """Yield the duplication module reloaded with the feature switched on.

        The environment stays patched for the whole duration of the test.
        """
        with patch.dict(os.environ, {"META_ADS_ENABLE_DUPLICATION": "1"}):
            from meta_ads_mcp.core import duplication
            importlib.reload(duplication)
            yield duplication

    @staticmethod
    def _ok_response():
        """Build a stand-in for a successful HTTP response.

        A plain ``MagicMock`` is used on purpose: ``httpx.Response.json()`` is
        a synchronous method, whereas ``AsyncMock().json()`` would hand back
        an un-awaited coroutine instead of the payload.
        """
        response = MagicMock()
        response.status_code = 200
        response.json.return_value = {"success": True}
        return response

    @pytest.mark.asyncio
    async def test_api_endpoint_construction(self, enable_feature):
        """Test that API endpoints are constructed correctly."""
        duplication = enable_feature

        # (resource type, resource id, URL the request must be posted to)
        test_cases = [
            ("campaign", "123456789", "https://mcp.pipeboard.co/api/meta/duplicate/campaign/123456789"),
            ("adset", "987654321", "https://mcp.pipeboard.co/api/meta/duplicate/adset/987654321"),
            ("ad", "555666777", "https://mcp.pipeboard.co/api/meta/duplicate/ad/555666777"),
            ("creative", "111222333", "https://mcp.pipeboard.co/api/meta/duplicate/creative/111222333"),
        ]

        for resource_type, resource_id, expected_url in test_cases:
            # Fresh patches per case so each iteration records its own call.
            with patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration") as mock_auth, \
                 patch("meta_ads_mcp.core.duplication.httpx.AsyncClient") as mock_client:
                # Mock dual-header authentication
                mock_auth.get_pipeboard_token.return_value = "pipeboard_token"
                mock_auth.get_auth_token.return_value = "facebook_token"

                mock_post = mock_client.return_value.__aenter__.return_value.post
                mock_post.return_value = self._ok_response()

                await duplication._forward_duplication_request(
                    resource_type, resource_id, "test_token", {}
                )

                # The URL is the first positional argument of the POST call.
                actual_url = mock_post.call_args[0][0]
                assert actual_url == expected_url, f"Expected {expected_url}, got {actual_url}"

    @pytest.mark.asyncio
    async def test_request_headers_format(self, enable_feature):
        """Test that request headers are formatted correctly."""
        duplication = enable_feature

        with patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration") as mock_auth, \
             patch("meta_ads_mcp.core.duplication.httpx.AsyncClient") as mock_client:
            # Mock dual-header authentication
            mock_auth.get_pipeboard_token.return_value = "pipeboard_token_12345"
            mock_auth.get_auth_token.return_value = "facebook_token_67890"

            mock_post = mock_client.return_value.__aenter__.return_value.post
            mock_post.return_value = self._ok_response()

            await duplication._forward_duplication_request(
                "campaign", "123456789", "test_token_12345", {"name_suffix": " - Test"}
            )

            # Headers are passed to POST as a keyword argument.
            headers = mock_post.call_args[1]["headers"]

            # Check the dual-header authentication pattern
            assert headers["Authorization"] == "Bearer facebook_token_67890"  # Facebook token for Meta API
            assert headers["X-Pipeboard-Token"] == "pipeboard_token_12345"   # Pipeboard token for auth
            assert headers["Content-Type"] == "application/json"
            assert headers["User-Agent"] == "meta-ads-mcp/1.0"

    @pytest.mark.asyncio
    async def test_request_timeout_configuration(self, enable_feature):
        """Test that request timeout is configured correctly."""
        duplication = enable_feature

        with patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration") as mock_auth, \
             patch("meta_ads_mcp.core.duplication.httpx.AsyncClient") as mock_client:
            # Mock dual-header authentication
            mock_auth.get_pipeboard_token.return_value = "pipeboard_token"
            mock_auth.get_auth_token.return_value = "facebook_token"

            mock_client.return_value.__aenter__.return_value.post.return_value = self._ok_response()

            await duplication._forward_duplication_request(
                "campaign", "123456789", "test_token", {}
            )

            # Verify the client was constructed once, with a 30 second timeout
            mock_client.assert_called_once_with(timeout=30.0)


class TestDuplicationErrorHandling:
    """Test error handling to prevent regression in error scenarios."""

    @pytest.fixture(autouse=True)
    def enable_feature(self):
        """Yield the duplication module reloaded with the feature switched on.

        The environment stays patched for the whole duration of the test.
        """
        with patch.dict(os.environ, {"META_ADS_ENABLE_DUPLICATION": "1"}):
            from meta_ads_mcp.core import duplication
            importlib.reload(duplication)
            yield duplication

    @pytest.mark.asyncio
    async def test_missing_access_token_error(self, enable_feature):
        """Test error handling when authentication tokens are missing."""
        duplication = enable_feature

        # Test missing Pipeboard token (primary authentication failure)
        with patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration") as mock_auth:
            mock_auth.get_pipeboard_token.return_value = None  # No Pipeboard token
            mock_auth.get_auth_token.return_value = "facebook_token"  # Has Facebook token

            result = await duplication._forward_duplication_request("campaign", "123", None, {})
            result_json = json.loads(result)

            assert result_json["error"] == "authentication_required"
            assert "Pipeboard API token not found" in result_json["message"]

        # Test missing Facebook token (secondary authentication failure)
        with patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration") as mock_auth, \
             patch("meta_ads_mcp.core.auth.get_current_access_token") as mock_get_token:
            mock_auth.get_pipeboard_token.return_value = "pipeboard_token"  # Has Pipeboard token
            mock_auth.get_auth_token.return_value = None  # No Facebook token
            mock_get_token.return_value = None  # No fallback token

            result = await duplication._forward_duplication_request("campaign", "123", None, {})
            result_json = json.loads(result)

            assert result_json["error"] == "authentication_required"
            assert "Meta Ads access token not found" in result_json["message"]

    @pytest.mark.asyncio
    async def test_http_status_code_handling(self, enable_feature):
        """Test handling of various HTTP status codes."""
        duplication = enable_feature

        # Each scenario is:
        #   (status code,
        #    expected "error" value, or None when the call should succeed,
        #    JSON body returned by .json(), or None when .json() must raise,
        #    raw .text exposed when there is no JSON body)
        scenarios = [
            (200, None, {"success": True, "id": "new_123"}, None),
            (400, "validation_failed", {"errors": ["Invalid parameter"], "warnings": []}, None),
            (401, "authentication_error", None, "Unauthorized"),
            (402, "subscription_required", {
                "message": "This feature is not available in your current plan",
                "upgrade_url": "https://pipeboard.co/upgrade",
                "suggestion": "Please upgrade your account to access this feature"
            }, None),
            (403, "facebook_connection_required", {
                "message": "You need to connect your Facebook account first",
                "details": {
                    "login_flow_url": "/connections",
                    "auth_flow_url": "/api/meta/auth"
                }
            }, None),
            (404, "resource_not_found", None, "Not Found"),
            (429, "rate_limit_exceeded", None, "Rate limited"),
            (502, "meta_api_error", {"message": "Facebook API error"}, None),
            (500, "duplication_failed", None, "Error 500"),
        ]

        for status_code, expected_error, json_body, raw_text in scenarios:
            with patch("meta_ads_mcp.core.duplication.httpx.AsyncClient") as mock_client, \
                 patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration.get_pipeboard_token", return_value="test_pipeboard_token"), \
                 patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration.get_auth_token", return_value="test_facebook_token"):
                # MagicMock (not AsyncMock) so .json() behaves synchronously.
                mock_response = MagicMock()
                mock_response.status_code = status_code

                if json_body is not None:
                    mock_response.json.return_value = json_body
                else:
                    # Simulate a body that cannot be decoded as JSON.
                    mock_response.json.side_effect = Exception("No JSON")
                    mock_response.text = raw_text

                if status_code == 429:
                    # Any header lookup yields "60" (presumably read as a
                    # retry delay by the code under test; verify there).
                    mock_response.headers.get.return_value = "60"

                mock_client.return_value.__aenter__.return_value.post.return_value = mock_response

                result = await duplication._forward_duplication_request(
                    "campaign", "123", "token", {}
                )
                result_json = json.loads(result)

                if expected_error is None:
                    assert "success" in result_json or "id" in result_json, \
                        f"HTTP {status_code} should be passed through as a success payload"
                else:
                    assert result_json["error"] == expected_error, \
                        f"HTTP {status_code} should map to '{expected_error}'"

    @pytest.mark.asyncio
    async def test_network_error_handling(self, enable_feature):
        """Test handling of network errors."""
        duplication = enable_feature

        # (exception raised by the POST call, expected "error" value)
        network_errors = [
            (httpx.TimeoutException("Timeout"), "request_timeout"),
            (httpx.RequestError("Connection failed"), "network_error"),
            (Exception("Unexpected error"), "unexpected_error"),
        ]

        for exception, expected_error in network_errors:
            with patch("meta_ads_mcp.core.duplication.httpx.AsyncClient") as mock_client, \
                 patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration.get_pipeboard_token", return_value="test_pipeboard_token"), \
                 patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration.get_auth_token", return_value="test_facebook_token"):
                mock_client.return_value.__aenter__.return_value.post.side_effect = exception

                result = await duplication._forward_duplication_request(
                    "campaign", "123", "token", {}
                )
                result_json = json.loads(result)

                assert result_json["error"] == expected_error, \
                    f"{type(exception).__name__} should map to '{expected_error}'"


class TestDuplicationParameterHandling:
    """Test parameter handling to prevent regression in data processing."""

    @pytest.fixture(autouse=True)
    def enable_feature(self):
        """Yield the duplication module reloaded with the feature switched on.

        The environment stays patched for the whole duration of the test.
        """
        with patch.dict(os.environ, {"META_ADS_ENABLE_DUPLICATION": "1"}):
            from meta_ads_mcp.core import duplication
            importlib.reload(duplication)
            yield duplication

    @pytest.mark.asyncio
    async def test_none_values_filtered_from_options(self, enable_feature):
        """Test that None values are filtered from options."""
        duplication = enable_feature

        with patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration") as mock_auth, \
             patch("meta_ads_mcp.core.duplication.httpx.AsyncClient") as mock_client:
            # Mock dual-header authentication
            mock_auth.get_pipeboard_token.return_value = "pipeboard_token"
            mock_auth.get_auth_token.return_value = "facebook_token"

            # MagicMock rather than AsyncMock: httpx.Response.json() is a
            # synchronous method, and AsyncMock().json() would return an
            # un-awaited coroutine instead of the payload.
            mock_response = MagicMock()
            mock_response.status_code = 200
            mock_response.json.return_value = {"success": True}
            mock_post = mock_client.return_value.__aenter__.return_value.post
            mock_post.return_value = mock_response

            # Test with options containing None values
            options_with_none = {
                "name_suffix": " - Test",
                "new_daily_budget": None,
                "new_status": "PAUSED",
                "new_headline": None,
            }

            await duplication._forward_duplication_request(
                "campaign", "123", "token", options_with_none
            )

            # The request body is passed to POST as the ``json`` keyword.
            json_payload = mock_post.call_args[1]["json"]

            assert "name_suffix" in json_payload
            assert "new_status" in json_payload
            assert "new_daily_budget" not in json_payload
            assert "new_headline" not in json_payload

    @pytest.mark.asyncio
    async def test_campaign_duplication_parameter_forwarding(self, enable_feature):
        """Test that campaign duplication forwards all parameters correctly."""
        duplication = enable_feature

        with patch("meta_ads_mcp.core.duplication._forward_duplication_request") as mock_forward:
            mock_forward.return_value = '{"success": true}'

            # Call with every parameter set; only the forwarded arguments
            # matter here, so the return value is not inspected.
            await duplication.duplicate_campaign(
                campaign_id="123456789",
                access_token="test_token",
                name_suffix=" - New Copy",
                include_ad_sets=False,
                include_ads=True,
                include_creatives=False,
                copy_schedule=True,
                new_daily_budget=100.50,
                new_status="ACTIVE"
            )

            # Verify parameters were forwarded correctly
            mock_forward.assert_called_once_with(
                "campaign",
                "123456789",
                "test_token",
                {
                    "name_suffix": " - New Copy",
                    "include_ad_sets": False,
                    "include_ads": True,
                    "include_creatives": False,
                    "copy_schedule": True,
                    "new_daily_budget": 100.50,
                    "new_status": "ACTIVE"
                }
            )

    @pytest.mark.asyncio
    async def test_adset_duplication_parameter_forwarding(self, enable_feature):
        """Test that ad set duplication forwards all parameters correctly including new_targeting."""
        duplication = enable_feature

        # Nested targeting spec that must reach the forwarder unchanged.
        targeting = {
            "age_min": 25,
            "age_max": 45,
            "geo_locations": {
                "countries": ["US", "CA"]
            }
        }

        with patch("meta_ads_mcp.core.duplication._forward_duplication_request") as mock_forward:
            mock_forward.return_value = '{"success": true}'

            # Call with every parameter set, including new_targeting; the
            # return value is not inspected.
            await duplication.duplicate_adset(
                adset_id="987654321",
                access_token="test_token",
                target_campaign_id="campaign_123",
                name_suffix=" - Targeted Copy",
                include_ads=False,
                include_creatives=True,
                new_daily_budget=200.00,
                new_targeting=targeting,
                new_status="ACTIVE"
            )

            # Verify parameters were forwarded correctly. The expected
            # targeting is spelled out again so an in-place mutation of the
            # input dict cannot mask a regression.
            mock_forward.assert_called_once_with(
                "adset",
                "987654321",
                "test_token",
                {
                    "target_campaign_id": "campaign_123",
                    "name_suffix": " - Targeted Copy",
                    "include_ads": False,
                    "include_creatives": True,
                    "new_daily_budget": 200.00,
                    "new_targeting": {
                        "age_min": 25,
                        "age_max": 45,
                        "geo_locations": {
                            "countries": ["US", "CA"]
                        }
                    },
                    "new_status": "ACTIVE"
                }
            )

    def test_estimated_components_calculation(self, enable_feature):
        """Test that estimated components are calculated correctly."""
        duplication = enable_feature

        # (resource type, duplication options, expected component estimate)
        test_cases = [
            # Campaign with all components
            ("campaign", {"include_ad_sets": True, "include_ads": True, "include_creatives": True},
             {"campaigns": 1, "ad_sets": "3-5 (estimated)", "ads": "5-15 (estimated)", "creatives": "5-15 (estimated)"}),

            # Campaign with no sub-components
            ("campaign", {"include_ad_sets": False, "include_ads": False, "include_creatives": False},
             {"campaigns": 1}),

            # Ad set with ads
            ("adset", {"include_ads": True, "include_creatives": True},
             {"ad_sets": 1, "ads": "2-5 (estimated)", "creatives": "2-5 (estimated)"}),

            # Ad set without ads
            ("adset", {"include_ads": False, "include_creatives": False},
             {"ad_sets": 1}),

            # Single ad with creative
            ("ad", {"duplicate_creative": True},
             {"ads": 1, "creatives": 1}),

            # Single ad without creative
            ("ad", {"duplicate_creative": False},
             {"ads": 1}),

            # Single creative
            ("creative", {},
             {"creatives": 1}),
        ]

        for resource_type, options, expected in test_cases:
            result = duplication._get_estimated_components(resource_type, options)
            assert result == expected, f"Failed for {resource_type} with {options}"


class TestDuplicationIntegration:
    """Integration tests guarding the end-to-end `duplicate_campaign` flow.

    The auth integration and the HTTP client used by the duplication module
    are both patched, so each test only decides which status code and JSON
    payload the backend appears to return.
    """

    @pytest.fixture(autouse=True)
    def enable_feature(self):
        """Yield the duplication module reloaded with the feature flag set to "1"."""
        import importlib

        with patch.dict(os.environ, {"META_ADS_ENABLE_DUPLICATION": "1"}):
            from meta_ads_mcp.core import duplication

            importlib.reload(duplication)
            yield duplication

    @staticmethod
    def _stub_auth_tokens(auth_integration):
        """Configure the patched auth integration with both tokens (dual authentication)."""
        auth_integration.get_pipeboard_token.return_value = "pipeboard_token"
        auth_integration.get_auth_token.return_value = "facebook_token"

    @staticmethod
    def _canned_response(status_code, payload):
        """Return a response double exposing `status_code` and a `json()` result."""
        response = MagicMock()
        response.status_code = status_code
        response.json.return_value = payload
        return response

    @staticmethod
    def _decode_tool_result(raw_result):
        """Parse a tool result, unwrapping the "data" envelope when present.

        The @meta_api_tool decorator may wrap the JSON string in a "data"
        field; both the wrapped and the bare shape are accepted.
        """
        decoded = json.loads(raw_result)
        if "data" in decoded:
            return json.loads(decoded["data"])
        return decoded

    @pytest.mark.asyncio
    async def test_end_to_end_successful_duplication(self, enable_feature):
        """Test complete successful duplication flow."""
        duplication = enable_feature

        backend_payload = {
            "success": True,
            "original_campaign_id": "123456789",
            "new_campaign_id": "987654321",
            "duplicated_components": {
                "campaign": {"id": "987654321", "name": "Test Campaign - Copy"},
                "ad_sets": [{"id": "111", "name": "Ad Set 1 - Copy"}],
                "ads": [{"id": "222", "name": "Ad 1 - Copy"}],
                "creatives": [{"id": "333", "name": "Creative 1 - Copy"}],
            },
            "warnings": [],
            "subscription": {
                "status": "active",
            },
        }

        # Mock the auth system completely to bypass the @meta_api_tool decorator checks
        with patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration") as auth_integration:
            self._stub_auth_tokens(auth_integration)

            with patch("meta_ads_mcp.core.duplication.httpx.AsyncClient") as client_cls:
                # Successful (200) backend response
                http_client = client_cls.return_value.__aenter__.return_value
                http_client.post.return_value = self._canned_response(200, backend_payload)

                # Explicit token matching the mocked one
                raw_result = await duplication.duplicate_campaign(
                    campaign_id="123456789",
                    access_token="facebook_token",
                    name_suffix=" - Test Copy",
                )

                outcome = self._decode_tool_result(raw_result)

                assert outcome["success"] is True
                assert outcome["new_campaign_id"] == "987654321"
                assert "duplicated_components" in outcome

    @pytest.mark.asyncio
    async def test_facebook_connection_error_flow(self, enable_feature):
        """Test Facebook connection required error flow."""
        duplication = enable_feature

        backend_payload = {
            "message": "You need to connect your Facebook account first",
            "details": {
                "login_flow_url": "/connections",
                "auth_flow_url": "/api/meta/auth",
            },
        }

        # Mock the auth system completely to bypass the @meta_api_tool decorator checks
        with patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration") as auth_integration:
            self._stub_auth_tokens(auth_integration)

            with patch("meta_ads_mcp.core.duplication.httpx.AsyncClient") as client_cls:
                # 403 response (Facebook connection required)
                http_client = client_cls.return_value.__aenter__.return_value
                http_client.post.return_value = self._canned_response(403, backend_payload)

                raw_result = await duplication.duplicate_campaign(
                    campaign_id="123456789",
                    access_token="facebook_token",
                )

                outcome = self._decode_tool_result(raw_result)

                assert outcome["success"] is False
                assert outcome["error"] == "facebook_connection_required"
                assert outcome["message"] == "You need to connect your Facebook account first"
                assert "details" in outcome
                assert outcome["details"]["login_flow_url"] == "/connections"

    @pytest.mark.asyncio
    async def test_subscription_required_error_flow(self, enable_feature):
        """Test subscription required error flow."""
        duplication = enable_feature

        backend_payload = {
            "message": "This feature is not available in your current plan",
            "upgrade_url": "https://pipeboard.co/upgrade",
            "suggestion": "Please upgrade your account to access this feature",
        }

        # Mock the auth system completely to bypass the @meta_api_tool decorator checks
        with patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration") as auth_integration:
            self._stub_auth_tokens(auth_integration)

            with patch("meta_ads_mcp.core.duplication.httpx.AsyncClient") as client_cls:
                # 402 response (subscription required)
                http_client = client_cls.return_value.__aenter__.return_value
                http_client.post.return_value = self._canned_response(402, backend_payload)

                raw_result = await duplication.duplicate_campaign(
                    campaign_id="123456789",
                    access_token="facebook_token",
                )

                outcome = self._decode_tool_result(raw_result)

                assert outcome["success"] is False
                assert outcome["error"] == "subscription_required"
                assert outcome["message"] == "This feature is not available in your current plan"
                assert outcome["upgrade_url"] == "https://pipeboard.co/upgrade"
                assert outcome["suggestion"] == "Please upgrade your account to access this feature"


class TestDuplicationTokenHandling:
    """Guards against regressions in how an explicit access token is forwarded."""

    @pytest.fixture(autouse=True)
    def enable_feature(self):
        """Yield the duplication module reloaded with the feature flag set to "1"."""
        import importlib

        with patch.dict(os.environ, {"META_ADS_ENABLE_DUPLICATION": "1"}):
            from meta_ads_mcp.core import duplication

            importlib.reload(duplication)
            yield duplication

    @staticmethod
    def _assert_forwarded_token(forwarder, expected_token):
        """Assert the forwarder ran exactly once with `expected_token` as its 3rd positional argument."""
        forwarder.assert_called_once()
        positional_args = forwarder.call_args[0]
        assert positional_args[2] == expected_token  # access_token is 3rd argument

    @pytest.mark.asyncio
    async def test_meta_api_tool_decorator_token_handling(self, enable_feature):
        """Test that @meta_api_tool decorator properly handles explicit tokens."""
        duplication = enable_feature
        explicit_token = "explicit_token_12345"

        # With an explicit token the auth system should be bypassed entirely
        with patch("meta_ads_mcp.core.duplication._forward_duplication_request") as forwarder:
            forwarder.return_value = '{"success": true}'

            await duplication.duplicate_campaign(
                campaign_id="123456789",
                access_token=explicit_token,
            )

            # The explicit token must reach the forwarder untouched
            self._assert_forwarded_token(forwarder, explicit_token)

    @pytest.mark.asyncio
    async def test_explicit_token_overrides_injection(self, enable_feature):
        """Test that explicit token overrides auto-injection."""
        duplication = enable_feature
        explicit_token = "explicit_token_12345"

        with patch("meta_ads_mcp.core.auth.get_current_access_token") as token_source:
            token_source.return_value = "injected_token"

            with patch("meta_ads_mcp.core.duplication._forward_duplication_request") as forwarder:
                forwarder.return_value = '{"success": true}'

                await duplication.duplicate_campaign(
                    campaign_id="123456789",
                    access_token=explicit_token,
                )

                # The explicit token wins over the injected one
                self._assert_forwarded_token(forwarder, explicit_token)


class TestDuplicationRegressionEdgeCases:
    """Test edge cases that could cause regressions."""

    @pytest.fixture(autouse=True)
    def enable_feature(self):
        """Yield the duplication module reloaded with the feature flag set to "1"."""
        import importlib

        with patch.dict(os.environ, {"META_ADS_ENABLE_DUPLICATION": "1"}):
            from meta_ads_mcp.core import duplication

            importlib.reload(duplication)
            yield duplication

    @staticmethod
    async def _forward_and_capture_payload(duplication, options):
        """Run `_forward_duplication_request` and return the JSON body given to `post`.

        Args:
            duplication: The reloaded duplication module under test.
            options: Options dict forwarded as the fourth positional argument.

        Returns:
            The value of the `json` keyword argument of the mocked `post` call.
        """
        # Mock dual-header authentication
        with patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration") as mock_auth:
            mock_auth.get_pipeboard_token.return_value = "pipeboard_token"
            mock_auth.get_auth_token.return_value = "facebook_token"

            with patch("meta_ads_mcp.core.duplication.httpx.AsyncClient") as mock_client:
                # The response must be a plain MagicMock: httpx's Response.json()
                # is synchronous, and an AsyncMock would hand back an un-awaited
                # coroutine instead of the payload.
                mock_response = MagicMock()
                mock_response.status_code = 200
                mock_response.json.return_value = {"success": True}

                post = mock_client.return_value.__aenter__.return_value.post
                post.return_value = mock_response

                await duplication._forward_duplication_request(
                    "campaign", "123", "token", options
                )

                post.assert_called_once()
                return post.call_args[1]["json"]

    @pytest.mark.asyncio
    async def test_empty_string_parameters(self, enable_feature):
        """Test handling of empty string parameters."""
        duplication = enable_feature

        with patch("meta_ads_mcp.core.duplication._forward_duplication_request") as mock_forward:
            mock_forward.return_value = '{"success": true}'

            # Test with empty strings
            await duplication.duplicate_campaign(
                campaign_id="123456789",
                access_token="token",
                name_suffix="",  # Empty string
                new_status="",   # Empty string
            )

            # Fail with a clear message if the forwarder was never reached,
            # rather than with a TypeError from indexing a missing call record.
            mock_forward.assert_called_once()

            # Verify empty strings are preserved (not filtered like None)
            call_args = mock_forward.call_args[0]
            options = call_args[3]
            assert options["name_suffix"] == ""
            assert options["new_status"] == ""

    @pytest.mark.asyncio
    async def test_unicode_parameters(self, enable_feature):
        """Test handling of unicode parameters."""
        duplication = enable_feature

        # Test with unicode characters
        unicode_suffix = " - 复制版本 🚀"
        json_payload = await self._forward_and_capture_payload(
            duplication, {"name_suffix": unicode_suffix}
        )

        # Verify unicode is preserved in the request
        assert json_payload["name_suffix"] == unicode_suffix

    @pytest.mark.asyncio
    async def test_large_parameter_values(self, enable_feature):
        """Test handling of large parameter values."""
        duplication = enable_feature

        # Test with very large budget value
        large_budget = 999999999.99
        json_payload = await self._forward_and_capture_payload(
            duplication, {"new_daily_budget": large_budget}
        )

        # Verify large values are preserved
        assert json_payload["new_daily_budget"] == large_budget

    def test_module_reload_safety(self):
        """Test that module can be safely reloaded without side effects."""
        # This tests for common issues like global state pollution
        import importlib

        # Enable feature
        with patch.dict(os.environ, {"META_ADS_ENABLE_DUPLICATION": "1"}):
            from meta_ads_mcp.core import duplication

            importlib.reload(duplication)

            assert duplication.ENABLE_DUPLICATION
            assert hasattr(duplication, 'duplicate_campaign')

        # Disable feature and reload
        with patch.dict(os.environ, {"META_ADS_ENABLE_DUPLICATION": ""}):
            importlib.reload(duplication)

            assert not duplication.ENABLE_DUPLICATION
            # Note: Functions may still exist in the module namespace due to Python's
            # module loading behavior, but they won't be registered as MCP tools
            # This is expected behavior and not a problem for the feature toggle

        # Re-enable feature and reload
        with patch.dict(os.environ, {"META_ADS_ENABLE_DUPLICATION": "1"}):
            importlib.reload(duplication)

            assert duplication.ENABLE_DUPLICATION
            assert hasattr(duplication, 'duplicate_campaign')
```
Page 4/5 | First | Prev | Next | Last