#
tokens: 48176/50000 11/82 files (page 3/5)
lines: off (toggle) GitHub
raw markdown copy
This is page 3 of 5. Use http://codebase.md/nictuku/meta-ads-mcp?lines=false&page={x} to view the full context.

# Directory Structure

```
├── .github
│   └── workflows
│       ├── publish-mcp.yml
│       ├── publish.yml
│       └── test.yml
├── .gitignore
├── .python-version
├── .uv.toml
├── CUSTOM_META_APP.md
├── Dockerfile
├── examples
│   ├── example_http_client.py
│   └── README.md
├── future_improvements.md
├── images
│   └── meta-ads-example.png
├── LICENSE
├── LOCAL_INSTALLATION.md
├── meta_ads_auth.sh
├── meta_ads_mcp
│   ├── __init__.py
│   ├── __main__.py
│   └── core
│       ├── __init__.py
│       ├── accounts.py
│       ├── ads_library.py
│       ├── ads.py
│       ├── adsets.py
│       ├── api.py
│       ├── auth.py
│       ├── authentication.py
│       ├── budget_schedules.py
│       ├── callback_server.py
│       ├── campaigns.py
│       ├── duplication.py
│       ├── http_auth_integration.py
│       ├── insights.py
│       ├── openai_deep_research.py
│       ├── pipeboard_auth.py
│       ├── reports.py
│       ├── resources.py
│       ├── server.py
│       ├── targeting.py
│       └── utils.py
├── META_API_NOTES.md
├── poetry.lock
├── pyproject.toml
├── README.md
├── RELEASE.md
├── requirements.txt
├── server.json
├── setup.py
├── smithery.yaml
├── STREAMABLE_HTTP_SETUP.md
└── tests
    ├── __init__.py
    ├── conftest.py
    ├── e2e_account_info_search_issue.py
    ├── README_REGRESSION_TESTS.md
    ├── README.md
    ├── test_account_info_access_fix.py
    ├── test_account_search.py
    ├── test_budget_update_e2e.py
    ├── test_budget_update.py
    ├── test_create_ad_creative_simple.py
    ├── test_create_simple_creative_e2e.py
    ├── test_dsa_beneficiary.py
    ├── test_dsa_integration.py
    ├── test_duplication_regression.py
    ├── test_duplication.py
    ├── test_dynamic_creatives.py
    ├── test_estimate_audience_size_e2e.py
    ├── test_estimate_audience_size.py
    ├── test_get_account_pages.py
    ├── test_get_ad_creatives_fix.py
    ├── test_get_ad_image_quality_improvements.py
    ├── test_get_ad_image_regression.py
    ├── test_http_transport.py
    ├── test_insights_actions_and_values_e2e.py
    ├── test_insights_pagination.py
    ├── test_integration_openai_mcp.py
    ├── test_is_dynamic_creative_adset.py
    ├── test_mobile_app_adset_creation.py
    ├── test_mobile_app_adset_issue.py
    ├── test_openai_mcp_deep_research.py
    ├── test_openai.py
    ├── test_page_discovery_integration.py
    ├── test_page_discovery.py
    ├── test_targeting_search_e2e.py
    ├── test_targeting.py
    ├── test_update_ad_creative_id.py
    └── test_upload_ad_image.py
```

# Files

--------------------------------------------------------------------------------
/tests/test_estimate_audience_size.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Unit tests for estimate_audience_size functionality in Meta Ads MCP.

This module tests the new estimate_audience_size function that replaces validate_interests
and provides comprehensive audience estimation using Meta's reachestimate API.
"""

import pytest
import json
from unittest.mock import AsyncMock, patch

from meta_ads_mcp.core.targeting import estimate_audience_size


class TestEstimateAudienceSize:
    """Unit tests covering estimate_audience_size.

    Every network interaction is replaced by an ``AsyncMock`` patched over
    ``make_api_request`` in the targeting module, so the tests only check how
    the function builds its request and shapes its JSON response.
    """

    # Dotted path of the API helper that the tests replace with a mock.
    _API_PATCH_TARGET = 'meta_ads_mcp.core.targeting.make_api_request'

    @staticmethod
    def _targeting(age_min, age_max, interest_ids=None):
        """Build a fresh US-only targeting spec, optionally with interests."""
        spec = {
            "age_min": age_min,
            "age_max": age_max,
            "geo_locations": {"countries": ["US"]},
        }
        if interest_ids:
            spec["flexible_spec"] = [
                {"interests": [{"id": interest_id} for interest_id in interest_ids]}
            ]
        return spec

    @staticmethod
    def _estimate_payload(mau, dau=None, bids=None, unsupported=None):
        """Build a mocked reachestimate response holding one estimate row."""
        return {
            "data": [
                {
                    "estimate_mau": mau,
                    "estimate_dau": dau or [],
                    "bid_estimates": bids or {},
                    "unsupported_targeting": unsupported or [],
                }
            ]
        }

    @classmethod
    def _mock_api(cls):
        """Return a patcher that swaps make_api_request for an AsyncMock."""
        return patch(cls._API_PATCH_TARGET, new_callable=AsyncMock)

    @staticmethod
    def _wrapped_error(result):
        """Decode an error reply that arrives nested inside a 'data' field.

        Asserts that the outer JSON has a 'data' key and that the JSON
        document stored under it has an 'error' key, then returns the
        inner document for further assertions.
        """
        outer = json.loads(result)
        # The @meta_api_tool decorator wraps errors in a 'data' field
        assert "data" in outer
        inner = json.loads(outer["data"])
        assert "error" in inner
        return inner

    @pytest.mark.asyncio
    async def test_comprehensive_audience_estimation_success(self):
        """Complex targeting produces a successful estimate via one GET call"""
        spec = self._targeting(25, 65, interest_ids=["6003371567474"])
        payload = self._estimate_payload(
            1500000,
            dau=[
                {"min_reach": 100000, "max_reach": 150000, "bid": 100},
                {"min_reach": 200000, "max_reach": 250000, "bid": 200},
            ],
            bids={"median": 150, "min": 50, "max": 300},
        )

        with self._mock_api() as api:
            api.return_value = payload

            raw = await estimate_audience_size(
                access_token="test_token",
                account_id="act_123456789",
                targeting=spec,
                optimization_goal="REACH",
            )

            # Exactly one request, aimed at the account's reachestimate edge
            api.assert_called_once_with(
                "act_123456789/reachestimate",
                "test_token",
                {"targeting_spec": spec},
                method="GET",
            )

            # Shape of the JSON handed back to the caller
            parsed = json.loads(raw)
            assert parsed["success"] is True
            assert parsed["account_id"] == "act_123456789"
            assert parsed["targeting"] == spec
            assert parsed["optimization_goal"] == "REACH"
            assert parsed["estimated_audience_size"] == 1500000
            assert "estimate_details" in parsed
            assert parsed["estimate_details"]["monthly_active_users"] == 1500000

    @pytest.mark.asyncio
    async def test_different_optimization_goals(self):
        """Each optimization goal is echoed back while the request stays the same"""
        spec = self._targeting(18, 45)
        payload = self._estimate_payload(800000)

        # Every goal is expected to hit the same reachestimate endpoint
        goals = ("REACH", "LINK_CLICKS", "LANDING_PAGE_VIEWS", "CONVERSIONS", "APP_INSTALLS")

        with self._mock_api() as api:
            api.return_value = payload

            for goal in goals:
                # Forget the previous iteration's call before asserting again
                api.reset_mock()

                raw = await estimate_audience_size(
                    access_token="test_token",
                    account_id="act_123456789",
                    targeting=spec,
                    optimization_goal=goal,
                )

                # The goal must not leak into the request parameters
                api.assert_called_once_with(
                    "act_123456789/reachestimate",
                    "test_token",
                    {"targeting_spec": spec},
                    method="GET",
                )

                parsed = json.loads(raw)
                assert parsed["success"] is True
                assert parsed["optimization_goal"] == goal

    @pytest.mark.asyncio
    async def test_backwards_compatibility_interest_names(self):
        """Passing only interest names falls back to the adinterestvalid search"""
        payload = {
            "data": [
                {
                    "name": "Japan",
                    "valid": True,
                    "id": 6003700426513,
                    "audience_size": 68310258,
                },
                {"name": "invalidinterest", "valid": False},
            ]
        }

        with self._mock_api() as api:
            api.return_value = payload

            raw = await estimate_audience_size(
                access_token="test_token",
                interest_list=["Japan", "invalidinterest"],
            )

            # Legacy path: the names are sent JSON-encoded to the search endpoint
            api.assert_called_once_with(
                "search",
                "test_token",
                {
                    "type": "adinterestvalid",
                    "interest_list": '["Japan", "invalidinterest"]',
                },
            )

            # The mocked payload is returned unchanged
            parsed = json.loads(raw)
            assert parsed == payload
            assert parsed["data"][0]["valid"] is True
            assert parsed["data"][1]["valid"] is False

    @pytest.mark.asyncio
    async def test_backwards_compatibility_interest_fbids(self):
        """Passing only interest FBIDs falls back to the adinterestvalid search"""
        payload = {
            "data": [
                {"id": "6003700426513", "valid": True, "audience_size": 68310258}
            ]
        }

        with self._mock_api() as api:
            api.return_value = payload

            raw = await estimate_audience_size(
                access_token="test_token",
                interest_fbid_list=["6003700426513"],
            )

            # Legacy path: the FBIDs are sent JSON-encoded to the search endpoint
            api.assert_called_once_with(
                "search",
                "test_token",
                {
                    "type": "adinterestvalid",
                    "interest_fbid_list": '["6003700426513"]',
                },
            )

            # The mocked payload is returned unchanged
            parsed = json.loads(raw)
            assert parsed == payload
            assert parsed["data"][0]["valid"] is True

    @pytest.mark.asyncio
    async def test_backwards_compatibility_both_interest_params(self):
        """Interest names and FBIDs given together are both forwarded"""
        payload = {
            "data": [
                {"name": "Japan", "valid": True, "id": 6003700426513, "audience_size": 68310258},
                {"id": "6003397425735", "valid": True, "audience_size": 12345678},
            ]
        }

        with self._mock_api() as api:
            api.return_value = payload

            raw = await estimate_audience_size(
                access_token="test_token",
                interest_list=["Japan"],
                interest_fbid_list=["6003397425735"],
            )

            # Both lists must appear in the single validation request
            api.assert_called_once_with(
                "search",
                "test_token",
                {
                    "type": "adinterestvalid",
                    "interest_list": '["Japan"]',
                    "interest_fbid_list": '["6003397425735"]',
                },
            )

            assert json.loads(raw) == payload

    @pytest.mark.asyncio
    async def test_error_no_account_id_for_comprehensive(self):
        """Targeting without an account_id yields a wrapped error"""
        raw = await estimate_audience_size(
            access_token="test_token",
            targeting=self._targeting(25, 65),
        )

        failure = self._wrapped_error(raw)
        assert "account_id is required" in failure["error"]
        assert "details" in failure

    @pytest.mark.asyncio
    async def test_error_no_targeting_for_comprehensive(self):
        """An account_id without targeting yields a wrapped error"""
        raw = await estimate_audience_size(
            access_token="test_token",
            account_id="act_123456789",
        )

        failure = self._wrapped_error(raw)
        assert "targeting specification is required" in failure["error"]
        assert "example" in failure

    @pytest.mark.asyncio
    async def test_error_no_parameters(self):
        """Calling with no arguments at all yields a wrapped error"""
        # The function sits behind @meta_api_tool, so stub the token lookup
        # the same way the original test does to reach the error handling.
        with patch('meta_ads_mcp.core.auth.get_current_access_token') as token_lookup:
            token_lookup.return_value = "test_token"

            raw = await estimate_audience_size()

            self._wrapped_error(raw)

    @pytest.mark.asyncio
    async def test_error_backwards_compatibility_no_interests(self):
        """A token alone, with no interest parameters, yields a wrapped error"""
        raw = await estimate_audience_size(access_token="test_token")

        self._wrapped_error(raw)

    @pytest.mark.asyncio
    async def test_api_error_handling(self):
        """An exception raised by the API helper is reported as a wrapped error"""
        spec = self._targeting(25, 65)

        with self._mock_api() as api:
            # Make the mocked request blow up
            api.side_effect = Exception("API connection failed")

            raw = await estimate_audience_size(
                access_token="test_token",
                account_id="act_123456789",
                targeting=spec,
            )

            failure = self._wrapped_error(raw)
            assert "Failed to get audience estimation from reachestimate endpoint" in failure["error"]
            assert "details" in failure

    @pytest.mark.asyncio
    async def test_empty_api_response(self):
        """An empty 'data' list from the API is reported as a wrapped error"""
        spec = self._targeting(25, 65)

        with self._mock_api() as api:
            # The API answers, but with no estimate rows
            api.return_value = {"data": []}

            raw = await estimate_audience_size(
                access_token="test_token",
                account_id="act_123456789",
                targeting=spec,
            )

            failure = self._wrapped_error(raw)
            assert "No estimation data returned" in failure["error"]
            assert "raw_response" in failure

    @pytest.mark.asyncio
    async def test_comprehensive_estimation_with_minimal_targeting(self):
        """Age range plus country is enough for a successful estimate"""
        spec = self._targeting(18, 65)

        with self._mock_api() as api:
            api.return_value = self._estimate_payload(500000)

            raw = await estimate_audience_size(
                access_token="test_token",
                account_id="act_123456789",
                targeting=spec,
            )

            parsed = json.loads(raw)
            assert parsed["success"] is True
            assert parsed["estimated_audience_size"] == 500000
            assert parsed["targeting"] == spec

    @pytest.mark.asyncio
    async def test_estimate_details_structure(self):
        """estimate_details exposes the expected keys and mapped values"""
        spec = self._targeting(
            25, 45, interest_ids=["6003371567474", "6003462346642"]
        )
        payload = self._estimate_payload(
            2000000,
            dau=[{"min_reach": 150000, "max_reach": 200000, "bid": 120}],
            bids={"median": 180, "min": 80, "max": 350},
            unsupported=["custom_audiences"],
        )

        with self._mock_api() as api:
            api.return_value = payload

            raw = await estimate_audience_size(
                access_token="test_token",
                account_id="act_123456789",
                targeting=spec,
                optimization_goal="CONVERSIONS",
            )

            parsed = json.loads(raw)
            assert parsed["success"] is True

            # Keys that must be present in the details section
            details = parsed["estimate_details"]
            assert "monthly_active_users" in details
            assert "daily_outcomes_curve" in details
            assert "bid_estimate" in details
            assert "unsupported_targeting" in details

            # Values carried over from the mocked API row
            assert details["monthly_active_users"] == 2000000
            assert len(details["daily_outcomes_curve"]) == 1
            assert details["bid_estimate"]["median"] == 180
            assert "custom_audiences" in details["unsupported_targeting"]

    @pytest.mark.asyncio
    async def test_function_registration(self):
        """The decorated function keeps its wrapper marker, parameters and defaults"""
        # A decorator built with functools.wraps leaves __wrapped__ behind
        assert hasattr(estimate_audience_size, '__wrapped__')  # From @meta_api_tool

        import inspect
        parameters = inspect.signature(estimate_audience_size).parameters

        # Every documented parameter must be part of the signature
        for name in (
            'access_token', 'account_id', 'targeting', 'optimization_goal',
            'interest_list', 'interest_fbid_list',
        ):
            assert name in parameters

        # Defaults that callers rely on
        assert parameters['optimization_goal'].default == "REACH"
        assert parameters['access_token'].default is None
        assert parameters['targeting'].default is None

--------------------------------------------------------------------------------
/meta_ads_mcp/core/openai_deep_research.py:
--------------------------------------------------------------------------------

```python
"""OpenAI MCP Deep Research tools for Meta Ads API.

This module implements the required 'search' and 'fetch' tools for OpenAI's 
ChatGPT Deep Research feature, providing access to Meta Ads data in the format 
expected by ChatGPT.

The tools expose Meta Ads data (accounts, campaigns, ads, etc.) as searchable 
and fetchable records for ChatGPT Deep Research analysis.
"""

import json
import re
from typing import List, Dict, Any, Optional
from .api import meta_api_tool, make_api_request
from .server import mcp_server
from .utils import logger


class MetaAdsDataManager:
    """Manages Meta Ads data for OpenAI MCP search and fetch operations"""
    
    def __init__(self):
        """Create the manager with an empty record cache and log its creation."""
        # Starts empty; entries are added under string record IDs elsewhere in the class.
        self._cache = dict()
        logger.debug("MetaAdsDataManager initialized")
    
    async def _get_ad_accounts(self, access_token: str, limit: int = 200) -> List[Dict[str, Any]]:
        """Fetch ad accounts from the ``me/adaccounts`` endpoint.

        Args:
            access_token: Meta API access token passed to ``make_api_request``.
            limit: Value sent as the ``limit`` request parameter.

        Returns:
            The ``data`` entry of the response when it is present, otherwise
            an empty list. Any exception is logged and also produces an
            empty list instead of propagating.
        """
        try:
            response = await make_api_request(
                "me/adaccounts",
                access_token,
                {
                    "fields": "id,name,account_id,account_status,amount_spent,balance,currency,business_city,business_country_code",
                    "limit": limit,
                },
            )
            # A response without a "data" key is treated as "no accounts"
            return response["data"] if "data" in response else []
        except Exception as e:
            logger.error(f"Error fetching ad accounts: {e}")
            return []
    
    async def _get_campaigns(self, access_token: str, account_id: str, limit: int = 25) -> List[Dict[str, Any]]:
        """Fetch campaigns from the ``{account_id}/campaigns`` endpoint.

        Args:
            access_token: Meta API access token passed to ``make_api_request``.
            account_id: Identifier interpolated into the endpoint path as given.
            limit: Value sent as the ``limit`` request parameter.

        Returns:
            The ``data`` entry of the response when it is present, otherwise
            an empty list. Any exception is logged and also produces an
            empty list instead of propagating.
        """
        try:
            response = await make_api_request(
                f"{account_id}/campaigns",
                access_token,
                {
                    "fields": "id,name,status,objective,daily_budget,lifetime_budget,start_time,stop_time,created_time,updated_time",
                    "limit": limit,
                },
            )
            # A response without a "data" key is treated as "no campaigns"
            return response["data"] if "data" in response else []
        except Exception as e:
            logger.error(f"Error fetching campaigns for {account_id}: {e}")
            return []
    
    async def _get_ads(self, access_token: str, account_id: str, limit: int = 25) -> List[Dict[str, Any]]:
        """Fetch ads from the ``{account_id}/ads`` endpoint.

        Args:
            access_token: Meta API access token passed to ``make_api_request``.
            account_id: Identifier interpolated into the endpoint path as given.
            limit: Value sent as the ``limit`` request parameter.

        Returns:
            The ``data`` entry of the response when it is present, otherwise
            an empty list. Any exception is logged and also produces an
            empty list instead of propagating.
        """
        try:
            response = await make_api_request(
                f"{account_id}/ads",
                access_token,
                {
                    "fields": "id,name,status,creative,targeting,bid_amount,created_time,updated_time",
                    "limit": limit,
                },
            )
            # A response without a "data" key is treated as "no ads"
            return response["data"] if "data" in response else []
        except Exception as e:
            logger.error(f"Error fetching ads for {account_id}: {e}")
            return []
    
    async def _get_pages_for_account(self, access_token: str, account_id: str) -> List[Dict[str, Any]]:
        """Look up the page reported by page discovery for an ad account.

        Delegates to ``_discover_pages_for_account`` from the ads module and
        reshapes its result into a list holding at most one page record.

        Args:
            access_token: Meta API access token forwarded to page discovery.
            account_id: Ad account identifier; ``act_`` is prepended when it
                is missing.

        Returns:
            A one-element list with ``id``, ``name``, ``source`` and
            ``account_id`` keys when discovery reports success, otherwise an
            empty list. Any exception is logged and also produces an empty
            list instead of propagating.
        """
        try:
            # Imported here rather than at module level, as in the original code
            from .ads import _discover_pages_for_account

            # Rebind account_id itself so the log line below sees the same value
            account_id = account_id if account_id.startswith("act_") else f"act_{account_id}"

            discovery = await _discover_pages_for_account(account_id, access_token)
            if discovery.get("success"):
                # Only "page_id" is mandatory; the other fields fall back to defaults
                page_record = {
                    "id": discovery["page_id"],
                    "name": discovery.get("page_name", "Unknown"),
                    "source": discovery.get("source", "unknown"),
                    "account_id": account_id
                }
                return [page_record]
            return []
        except Exception as e:
            logger.error(f"Error fetching pages for {account_id}: {e}")
            return []
    
    async def _get_businesses(self, access_token: str, user_id: str = "me", limit: int = 25) -> List[Dict[str, Any]]:
        """Fetch businesses from the ``{user_id}/businesses`` endpoint.

        Args:
            access_token: Meta API access token passed to ``make_api_request``.
            user_id: Identifier interpolated into the endpoint path; defaults
                to ``"me"``.
            limit: Value sent as the ``limit`` request parameter.

        Returns:
            The ``data`` entry of the response when it is present, otherwise
            an empty list. Any exception is logged and also produces an
            empty list instead of propagating.
        """
        try:
            response = await make_api_request(
                f"{user_id}/businesses",
                access_token,
                {
                    "fields": "id,name,created_time,verification_status",
                    "limit": limit,
                },
            )
            # A response without a "data" key is treated as "no businesses"
            return response["data"] if "data" in response else []
        except Exception as e:
            logger.error(f"Error fetching businesses: {e}")
            return []
    
    async def search_records(self, query: str, access_token: str) -> List[str]:
        """Search Meta Ads data and return matching record IDs
        
        Args:
            query: Search query string
            access_token: Meta API access token
            
        Returns:
            List of record IDs that match the query
        """
        logger.info(f"Searching Meta Ads data with query: {query}")
        
        # Normalize query for matching
        query_lower = query.lower()
        query_terms = re.findall(r'\w+', query_lower)
        
        matching_ids = []
        
        try:
            # Search ad accounts
            accounts = await self._get_ad_accounts(access_token, limit=200)
            for account in accounts:
                account_text = f"{account.get('name', '')} {account.get('id', '')} {account.get('account_status', '')} {account.get('business_city', '')} {account.get('business_country_code', '')}".lower()
                
                if any(term in account_text for term in query_terms):
                    record_id = f"account:{account['id']}"
                    matching_ids.append(record_id)
                    
                    # Cache the account data
                    self._cache[record_id] = {
                        "id": record_id,
                        "type": "account",
                        "title": f"Ad Account: {account.get('name', 'Unnamed Account')}",
                        "text": f"Meta Ads Account {account.get('name', 'Unnamed')} (ID: {account.get('id', 'N/A')}) - Status: {account.get('account_status', 'Unknown')}, Currency: {account.get('currency', 'Unknown')}, Spent: ${account.get('amount_spent', 0)}, Balance: ${account.get('balance', 0)}",
                        "metadata": {
                            "account_id": account.get('id'),
                            "account_name": account.get('name'),
                            "status": account.get('account_status'),
                            "currency": account.get('currency'),
                            "business_location": f"{account.get('business_city', '')}, {account.get('business_country_code', '')}".strip(', '),
                            "data_type": "meta_ads_account"
                        },
                        "raw_data": account
                    }
                    
                    # Also search campaigns for this account if it matches
                    campaigns = await self._get_campaigns(access_token, account['id'], limit=10)
                    for campaign in campaigns:
                        campaign_text = f"{campaign.get('name', '')} {campaign.get('objective', '')} {campaign.get('status', '')}".lower()
                        
                        if any(term in campaign_text for term in query_terms):
                            campaign_record_id = f"campaign:{campaign['id']}"
                            matching_ids.append(campaign_record_id)
                            
                            # Cache the campaign data
                            self._cache[campaign_record_id] = {
                                "id": campaign_record_id,
                                "type": "campaign",
                                "title": f"Campaign: {campaign.get('name', 'Unnamed Campaign')}",
                                "text": f"Meta Ads Campaign {campaign.get('name', 'Unnamed')} (ID: {campaign.get('id', 'N/A')}) - Objective: {campaign.get('objective', 'Unknown')}, Status: {campaign.get('status', 'Unknown')}, Daily Budget: ${campaign.get('daily_budget', 'Not set')}, Account: {account.get('name', 'Unknown')}",
                                "metadata": {
                                    "campaign_id": campaign.get('id'),
                                    "campaign_name": campaign.get('name'),
                                    "objective": campaign.get('objective'),
                                    "status": campaign.get('status'),
                                    "account_id": account.get('id'),
                                    "account_name": account.get('name'),
                                    "data_type": "meta_ads_campaign"
                                },
                                "raw_data": campaign
                            }
            
            # If query specifically mentions "ads" or "ad", also search individual ads
            if any(term in ['ad', 'ads', 'advertisement', 'creative'] for term in query_terms):
                for account in accounts[:3]:  # Limit to first 3 accounts for performance
                    ads = await self._get_ads(access_token, account['id'], limit=10)
                    for ad in ads:
                        ad_text = f"{ad.get('name', '')} {ad.get('status', '')}".lower()
                        
                        if any(term in ad_text for term in query_terms):
                            ad_record_id = f"ad:{ad['id']}"
                            matching_ids.append(ad_record_id)
                            
                            # Cache the ad data
                            self._cache[ad_record_id] = {
                                "id": ad_record_id,
                                "type": "ad",
                                "title": f"Ad: {ad.get('name', 'Unnamed Ad')}",
                                "text": f"Meta Ad {ad.get('name', 'Unnamed')} (ID: {ad.get('id', 'N/A')}) - Status: {ad.get('status', 'Unknown')}, Bid Amount: ${ad.get('bid_amount', 'Not set')}, Account: {account.get('name', 'Unknown')}",
                                "metadata": {
                                    "ad_id": ad.get('id'),
                                    "ad_name": ad.get('name'),
                                    "status": ad.get('status'),
                                    "account_id": account.get('id'),
                                    "account_name": account.get('name'),
                                    "data_type": "meta_ads_ad"
                                },
                                "raw_data": ad
                            }
            
            # If query specifically mentions "page" or "pages", also search pages
            if any(term in ['page', 'pages', 'facebook page'] for term in query_terms):
                for account in accounts[:5]:  # Limit to first 5 accounts for performance
                    pages = await self._get_pages_for_account(access_token, account['id'])
                    for page in pages:
                        page_text = f"{page.get('name', '')} {page.get('source', '')}".lower()
                        
                        if any(term in page_text for term in query_terms):
                            page_record_id = f"page:{page['id']}"
                            matching_ids.append(page_record_id)
                            
                            # Cache the page data
                            self._cache[page_record_id] = {
                                "id": page_record_id,
                                "type": "page",
                                "title": f"Facebook Page: {page.get('name', 'Unnamed Page')}",
                                "text": f"Facebook Page {page.get('name', 'Unnamed')} (ID: {page.get('id', 'N/A')}) - Source: {page.get('source', 'Unknown')}, Account: {account.get('name', 'Unknown')}",
                                "metadata": {
                                    "page_id": page.get('id'),
                                    "page_name": page.get('name'),
                                    "source": page.get('source'),
                                    "account_id": account.get('id'),
                                    "account_name": account.get('name'),
                                    "data_type": "meta_ads_page"
                                },
                                "raw_data": page
                            }
            
            # If query specifically mentions "business" or "businesses", also search businesses
            if any(term in ['business', 'businesses', 'company', 'companies'] for term in query_terms):
                businesses = await self._get_businesses(access_token, limit=25)
                for business in businesses:
                    business_text = f"{business.get('name', '')} {business.get('verification_status', '')}".lower()
                    
                    if any(term in business_text for term in query_terms):
                        business_record_id = f"business:{business['id']}"
                        matching_ids.append(business_record_id)
                        
                        # Cache the business data
                        self._cache[business_record_id] = {
                            "id": business_record_id,
                            "type": "business",
                            "title": f"Business: {business.get('name', 'Unnamed Business')}",
                            "text": f"Meta Business {business.get('name', 'Unnamed')} (ID: {business.get('id', 'N/A')}) - Created: {business.get('created_time', 'Unknown')}, Verification: {business.get('verification_status', 'Unknown')}",
                            "metadata": {
                                "business_id": business.get('id'),
                                "business_name": business.get('name'),
                                "created_time": business.get('created_time'),
                                "verification_status": business.get('verification_status'),
                                "data_type": "meta_ads_business"
                            },
                            "raw_data": business
                        }
        
        except Exception as e:
            logger.error(f"Error during search operation: {e}")
            # Return empty list on error, but don't raise exception
            return []
        
        logger.info(f"Search completed. Found {len(matching_ids)} matching records")
        return matching_ids[:50]  # Limit to 50 results for performance
    
    def fetch_record(self, record_id: str) -> Optional[Dict[str, Any]]:
        """Look up a record in the in-memory cache.

        Args:
            record_id: Cache key of the record to return

        Returns:
            The cached record, or None when the cache holds no (truthy)
            entry under that key
        """
        logger.info(f"Fetching record: {record_id}")

        cached = self._cache.get(record_id)
        if not cached:
            # Missing key and empty entry are both reported as "not found"
            logger.warning(f"Record not found in cache: {record_id}")
            return None

        logger.debug(f"Record found in cache: {cached['type']}")
        return cached


# Global data manager instance.
# Created once at import time and shared by the `search` and `fetch` tools
# below, so records cached during a search are the ones fetch can return.
_data_manager = MetaAdsDataManager()


@mcp_server.tool()
@meta_api_tool
async def search(
    query: str,
    access_token: Optional[str] = None
) -> str:
    """
    Search through Meta Ads data and return matching record IDs.
    It searches across ad accounts, campaigns, ads, pages, and businesses to find relevant records
    based on the provided query.

    Args:
        query: Search query string to find relevant Meta Ads records
        access_token: Meta API access token (optional - will use cached token if not provided)

    Returns:
        JSON response with list of matching record IDs

    Example Usage:
        search(query="active campaigns")
        search(query="account spending")
        search(query="facebook ads performance")
        search(query="facebook pages")
        search(query="user businesses")
    """
    # Reject an empty query up front; the error payload still carries "ids"
    if not query:
        missing_query = {
            "error": "query parameter is required",
            "ids": []
        }
        return json.dumps(missing_query, indent=2)

    try:
        # The shared data manager performs the lookup and caches what it finds
        found_ids = await _data_manager.search_records(query, access_token)
        result_count = len(found_ids)

        payload = {
            "ids": found_ids,
            "query": query,
            "total_results": result_count
        }

        logger.info(f"Search successful. Query: '{query}', Results: {result_count}")
        return json.dumps(payload, indent=2)

    except Exception as exc:
        # Report the failure as JSON rather than raising to the caller
        reason = str(exc)
        logger.error(f"Error in search tool: {reason}")

        failure = {
            "error": "Failed to search Meta Ads data",
            "details": reason,
            "ids": [],
            "query": query
        }
        return json.dumps(failure, indent=2)


@mcp_server.tool()
async def fetch(
    id: str
) -> str:
    """
    Fetch complete record data by ID.
    It retrieves the full data for a specific record identified by its ID.

    Args:
        id: The record ID to fetch (format: "type:id", e.g., "account:act_123456")

    Returns:
        JSON response with complete record data including id, title, text, and metadata

    Example Usage:
        fetch(id="account:act_123456789")
        fetch(id="campaign:23842588888640185")
        fetch(id="ad:23842614006130185")
        fetch(id="page:123456789")
    """
    # `id` shadows the builtin, but it is the tool's public parameter name
    if not id:
        missing_id = {
            "error": "id parameter is required"
        }
        return json.dumps(missing_id, indent=2)

    try:
        # Records are served from the data manager's cache only
        record = _data_manager.fetch_record(id)

        if not record:
            logger.warning(f"Record not found: {id}")
            not_found = {
                "error": f"Record not found: {id}",
                "id": id
            }
            return json.dumps(not_found, indent=2)

        logger.info(f"Record fetched successfully: {id}")
        return json.dumps(record, indent=2)

    except Exception as exc:
        # Report the failure as JSON rather than raising to the caller
        reason = str(exc)
        logger.error(f"Error in fetch tool: {reason}")

        failure = {
            "error": "Failed to fetch record",
            "details": reason,
            "id": id
        }
        return json.dumps(failure, indent=2)
```

--------------------------------------------------------------------------------
/meta_ads_mcp/core/duplication.py:
--------------------------------------------------------------------------------

```python
"""Duplication functionality for Meta Ads API."""

import json
import os
import httpx
from typing import Optional, Dict, Any, List, Union
from .server import mcp_server
from .api import meta_api_tool
from . import auth
from .http_auth_integration import FastMCPAuthIntegration


# Only register the duplication functions if the environment variable is set.
# NOTE: any non-empty value enables the feature - including "0" or "false" -
# because the raw string is passed straight to bool().
ENABLE_DUPLICATION = bool(os.environ.get("META_ADS_ENABLE_DUPLICATION", ""))

if ENABLE_DUPLICATION:
    @mcp_server.tool()
    @meta_api_tool
    async def duplicate_campaign(
        campaign_id: str,
        access_token: Optional[str] = None,
        name_suffix: Optional[str] = " - Copy",
        include_ad_sets: bool = True,
        include_ads: bool = True,
        include_creatives: bool = True,
        copy_schedule: bool = False,
        new_daily_budget: Optional[float] = None,
        new_status: Optional[str] = "PAUSED"
    ) -> str:
        """
        Duplicate a Meta Ads campaign with all its ad sets and ads.

        Recommended: Use this to run robust experiments.

        Args:
            campaign_id: Meta Ads campaign ID to duplicate
            name_suffix: Suffix to add to the duplicated campaign name
            include_ad_sets: Whether to duplicate ad sets within the campaign
            include_ads: Whether to duplicate ads within ad sets
            include_creatives: Whether to duplicate ad creatives
            copy_schedule: Whether to copy the campaign schedule
            new_daily_budget: Override the daily budget for the new campaign
            new_status: Status for the new campaign (ACTIVE or PAUSED)
        """
        # Every tunable is handed to the forwarder, which drops the None
        # entries before sending the request.
        campaign_options = {
            "name_suffix": name_suffix,
            "include_ad_sets": include_ad_sets,
            "include_ads": include_ads,
            "include_creatives": include_creatives,
            "copy_schedule": copy_schedule,
            "new_daily_budget": new_daily_budget,
            "new_status": new_status
        }
        return await _forward_duplication_request(
            "campaign", campaign_id, access_token, campaign_options
        )

    @mcp_server.tool()
    @meta_api_tool
    async def duplicate_adset(
        adset_id: str,
        access_token: Optional[str] = None,
        target_campaign_id: Optional[str] = None,
        name_suffix: Optional[str] = " - Copy",
        include_ads: bool = True,
        include_creatives: bool = True,
        new_daily_budget: Optional[float] = None,
        new_targeting: Optional[Dict[str, Any]] = None,
        new_status: Optional[str] = "PAUSED"
    ) -> str:
        """
        Duplicate a Meta Ads ad set with its ads.

        Recommended: Use this to run robust experiments.

        Args:
            adset_id: Meta Ads ad set ID to duplicate
            target_campaign_id: Campaign ID to move the duplicated ad set to (optional)
            name_suffix: Suffix to add to the duplicated ad set name
            include_ads: Whether to duplicate ads within the ad set
            include_creatives: Whether to duplicate ad creatives
            new_daily_budget: Override the daily budget for the new ad set
            new_targeting: Override targeting settings for the new ad set
            new_status: Status for the new ad set (ACTIVE or PAUSED)
        """
        # Every tunable is handed to the forwarder, which drops the None
        # entries before sending the request.
        adset_options = {
            "target_campaign_id": target_campaign_id,
            "name_suffix": name_suffix,
            "include_ads": include_ads,
            "include_creatives": include_creatives,
            "new_daily_budget": new_daily_budget,
            "new_targeting": new_targeting,
            "new_status": new_status
        }
        return await _forward_duplication_request(
            "adset", adset_id, access_token, adset_options
        )

    @mcp_server.tool()
    @meta_api_tool
    async def duplicate_ad(
        ad_id: str,
        access_token: Optional[str] = None,
        target_adset_id: Optional[str] = None,
        name_suffix: Optional[str] = " - Copy",
        duplicate_creative: bool = True,
        new_creative_name: Optional[str] = None,
        new_status: Optional[str] = "PAUSED"
    ) -> str:
        """
        Duplicate a Meta Ads ad.

        Recommended: Use this to run robust experiments.

        Args:
            ad_id: Meta Ads ad ID to duplicate
            target_adset_id: Ad set ID to move the duplicated ad to (optional)
            name_suffix: Suffix to add to the duplicated ad name
            duplicate_creative: Whether to duplicate the ad creative
            new_creative_name: Override name for the duplicated creative
            new_status: Status for the new ad (ACTIVE or PAUSED)
        """
        # Every tunable is handed to the forwarder, which drops the None
        # entries before sending the request.
        ad_options = {
            "target_adset_id": target_adset_id,
            "name_suffix": name_suffix,
            "duplicate_creative": duplicate_creative,
            "new_creative_name": new_creative_name,
            "new_status": new_status
        }
        return await _forward_duplication_request(
            "ad", ad_id, access_token, ad_options
        )

    @mcp_server.tool()
    @meta_api_tool
    async def duplicate_creative(
        creative_id: str,
        access_token: Optional[str] = None,
        name_suffix: Optional[str] = " - Copy",
        new_primary_text: Optional[str] = None,
        new_headline: Optional[str] = None,
        new_description: Optional[str] = None,
        new_cta_type: Optional[str] = None,
        new_destination_url: Optional[str] = None
    ) -> str:
        """
        Duplicate a Meta Ads creative.

        Recommended: Use this to run robust experiments.

        Args:
            creative_id: Meta Ads creative ID to duplicate
            name_suffix: Suffix to add to the duplicated creative name
            new_primary_text: Override the primary text for the new creative
            new_headline: Override the headline for the new creative
            new_description: Override the description for the new creative
            new_cta_type: Override the call-to-action type for the new creative
            new_destination_url: Override the destination URL for the new creative
        """
        # Every tunable is handed to the forwarder, which drops the None
        # entries before sending the request.
        creative_options = {
            "name_suffix": name_suffix,
            "new_primary_text": new_primary_text,
            "new_headline": new_headline,
            "new_description": new_description,
            "new_cta_type": new_cta_type,
            "new_destination_url": new_destination_url
        }
        return await _forward_duplication_request(
            "creative", creative_id, access_token, creative_options
        )


async def _forward_duplication_request(resource_type: str, resource_id: str, access_token: Optional[str], options: Dict[str, Any]) -> str:
    """
    Forward duplication request to the cloud-hosted MCP API using dual-header authentication.

    This implements the dual-header authentication pattern for MCP server callbacks:
    - Authorization: Bearer <facebook_token> - Facebook access token for Meta API calls
    - X-Pipeboard-Token: <pipeboard_token> - Pipeboard API token for authentication

    Args:
        resource_type: Type of resource to duplicate (campaign, adset, ad, creative)
        resource_id: ID of the resource to duplicate
        access_token: Fallback Meta API access token. It is used only when the
            request context carries no Facebook token; if it is empty as well,
            auth.get_current_access_token() is awaited.
        options: Duplication options; entries whose value is None are not sent

    Returns:
        A JSON string (indent=2). On HTTP 200 it is the service's JSON body;
        every other outcome, including timeouts, network errors and unexpected
        exceptions, is reported as a JSON error object instead of raising.
    """
    try:
        # Get tokens from the request context that were set by the HTTP auth middleware
        # In the dual-header authentication pattern:
        # - Pipeboard token comes from X-Pipeboard-Token header (for authentication)
        # - Facebook token comes from Authorization header (for Meta API calls)
        pipeboard_token = FastMCPAuthIntegration.get_pipeboard_token()
        facebook_token = FastMCPAuthIntegration.get_auth_token()

        # The context token wins; the explicit argument and then the cached
        # token are fallbacks only.
        if not facebook_token:
            facebook_token = access_token if access_token else await auth.get_current_access_token()

        # Validate we have both required tokens
        if not pipeboard_token:
            return json.dumps({
                "error": "authentication_required",
                "message": "Pipeboard API token not found",
                "details": {
                    "required": "Valid Pipeboard token via X-Pipeboard-Token header",
                    "received_headers": "Check that the MCP server is forwarding the X-Pipeboard-Token header"
                }
            }, indent=2)

        if not facebook_token:
            return json.dumps({
                "error": "authentication_required",
                "message": "Meta Ads access token not found",
                "details": {
                    "required": "Valid Meta access token from authenticated session",
                    "check": "Ensure Facebook account is connected and token is valid"
                }
            }, indent=2)

        # Construct the API endpoint
        base_url = "https://mcp.pipeboard.co"
        endpoint = f"{base_url}/api/meta/duplicate/{resource_type}/{resource_id}"

        # Prepare the dual-header authentication as per API documentation
        headers = {
            "Authorization": f"Bearer {facebook_token}",  # Facebook token for Meta API
            "X-Pipeboard-Token": pipeboard_token,         # Pipeboard token for auth
            "Content-Type": "application/json",
            "User-Agent": "meta-ads-mcp/1.0"
        }

        # Remove None values from options
        clean_options = {k: v for k, v in options.items() if v is not None}

        # Make the request to the cloud service
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.post(
                endpoint,
                headers=headers,
                json=clean_options
            )
            status = response.status_code

            if status == 200:
                return json.dumps(response.json(), indent=2)

            if status == 400:
                # Validation failed; fall back to the raw body when the
                # service sent no structured error list.
                error_data = _parse_error_payload(response)
                return json.dumps({
                    "success": False,
                    "error": "validation_failed",
                    "errors": error_data.get("errors", [response.text]),
                    "warnings": error_data.get("warnings", [])
                }, indent=2)

            if status == 401:
                return json.dumps({
                    "success": False,
                    "error": "authentication_error",
                    "message": "Invalid or expired API token"
                }, indent=2)

            if status == 402:
                error_data = _parse_error_payload(response)
                return json.dumps({
                    "success": False,
                    "error": "subscription_required",
                    "message": error_data.get("message", "This feature is not available in your current plan"),
                    "upgrade_url": error_data.get("upgrade_url", "https://pipeboard.co/upgrade"),
                    "suggestion": error_data.get("suggestion", "Please upgrade your account to access this feature")
                }, indent=2)

            if status == 403:
                error_data = _parse_error_payload(response)
                # Check if this is a premium feature error
                if error_data.get("error") == "premium_feature":
                    return json.dumps({
                        "success": False,
                        "error": "premium_feature_required",
                        "message": error_data.get("message", "This is a premium feature that requires subscription"),
                        "details": error_data.get("details", {
                            "upgrade_url": "https://pipeboard.co/upgrade",
                            "suggestion": "Please upgrade your account to access this feature"
                        })
                    }, indent=2)

                # Default to facebook connection required (also used when the
                # body could not be parsed)
                return json.dumps({
                    "success": False,
                    "error": "facebook_connection_required",
                    "message": error_data.get("message", "You need to connect your Facebook account first"),
                    "details": error_data.get("details", {
                        "login_flow_url": "/connections",
                        "auth_flow_url": "/api/meta/auth"
                    })
                }, indent=2)

            if status == 404:
                return json.dumps({
                    "success": False,
                    "error": "resource_not_found",
                    "message": f"{resource_type.title()} not found or access denied",
                    "suggestion": f"Verify the {resource_type} ID and your Facebook account permissions"
                }, indent=2)

            if status == 429:
                return json.dumps({
                    "error": "rate_limit_exceeded",
                    "message": "Meta API rate limit exceeded",
                    "details": {
                        "suggestion": "Please wait before retrying",
                        "retry_after": response.headers.get("Retry-After", "60")
                    }
                }, indent=2)

            if status == 502:
                error_data = _parse_error_payload(response)
                return json.dumps({
                    "success": False,
                    "error": "meta_api_error",
                    "message": error_data.get("message", "Facebook API error"),
                    "recoverable": True,
                    "suggestion": "Please wait 5 minutes before retrying"
                }, indent=2)

            # Any other status: surface the service's message when it has one,
            # otherwise the raw response text.
            error_detail = _parse_error_payload(response).get("message", response.text)
            return json.dumps({
                "error": "duplication_failed",
                "message": f"Failed to duplicate {resource_type}",
                "details": {
                    "status_code": status,
                    "error_detail": error_detail,
                    "resource_type": resource_type,
                    "resource_id": resource_id
                }
            }, indent=2)

    except httpx.TimeoutException:
        # Must precede the RequestError handler so timeouts get their own message
        return json.dumps({
            "error": "request_timeout",
            "message": "Request to duplication service timed out",
            "details": {
                "suggestion": "Please try again later",
                "timeout": "30 seconds"
            }
        }, indent=2)

    except httpx.RequestError as e:
        return json.dumps({
            "error": "network_error",
            "message": "Failed to connect to duplication service",
            "details": {
                "error": str(e),
                "suggestion": "Check your internet connection and try again"
            }
        }, indent=2)

    except Exception as e:
        return json.dumps({
            "error": "unexpected_error",
            "message": f"Unexpected error during {resource_type} duplication",
            "details": {
                "error": str(e),
                "resource_type": resource_type,
                "resource_id": resource_id
            }
        }, indent=2)


def _parse_error_payload(response: Any) -> Dict[str, Any]:
    """Return the response's JSON body when it is a JSON object, else an empty dict."""
    try:
        payload = response.json()
    except Exception:
        # Error bodies are often not JSON; callers then use their default
        # messages. `Exception` rather than a bare `except`, so task
        # cancellation and KeyboardInterrupt still propagate.
        return {}
    return payload if isinstance(payload, dict) else {}


def _get_estimated_components(resource_type: str, options: Dict[str, Any]) -> Dict[str, Any]:
    """Describe roughly which components a duplication would create.

    Args:
        resource_type: One of "campaign", "adset", "ad" or "creative"
        options: Duplication options; a flag that is absent counts as enabled

    Returns:
        Mapping of component name to a count or an "(estimated)" range string;
        an empty dict for an unrecognised resource type
    """
    # Component that is always produced (exactly one) for each resource type
    primary_component = {
        "campaign": "campaigns",
        "adset": "ad_sets",
        "ad": "ads",
        "creative": "creatives",
    }
    # (option flag, component name, estimate) rows, in output order
    optional_components = {
        "campaign": (
            ("include_ad_sets", "ad_sets", "3-5 (estimated)"),
            ("include_ads", "ads", "5-15 (estimated)"),
            ("include_creatives", "creatives", "5-15 (estimated)"),
        ),
        "adset": (
            ("include_ads", "ads", "2-5 (estimated)"),
            ("include_creatives", "creatives", "2-5 (estimated)"),
        ),
        "ad": (
            ("duplicate_creative", "creatives", 1),
        ),
        "creative": (),
    }

    if resource_type not in primary_component:
        return {}

    estimate = {primary_component[resource_type]: 1}
    for flag, component, amount in optional_components[resource_type]:
        if options.get(flag, True):
            estimate[component] = amount
    return estimate
```

--------------------------------------------------------------------------------
/tests/test_get_ad_image_quality_improvements.py:
--------------------------------------------------------------------------------

```python
"""Tests for get_ad_image quality improvements.

These tests verify that the get_ad_image function now correctly prioritizes
high-quality ad creative images over profile thumbnails.

Key improvements tested:
1. Prioritizes image_urls_for_viewing over thumbnail_url
2. Uses image_url as second priority
3. Uses object_story_spec.link_data.picture as third priority
4. Only uses thumbnail_url as last resort
5. Better logging to show which image source is being used
"""

import pytest
import json
from unittest.mock import AsyncMock, patch, MagicMock
from meta_ads_mcp.core.ads import get_ad_image
from meta_ads_mcp.core.utils import extract_creative_image_urls


class TestGetAdImageQualityImprovements:
    """Test cases for image quality improvements in get_ad_image function."""

    @staticmethod
    async def _run_get_ad_image(creative, account_id="act_123456789"):
        """Call get_ad_image with every collaborator in meta_ads_mcp.core.ads mocked.

        Args:
            creative: The single creative entry that the mocked get_ad_creatives
                returns (JSON-encoded under a top-level "data" list). Its "id"
                and "name" are also used for the mocked ad and creative-detail
                API responses.
            account_id: Account id placed in the mocked ad lookup response.

        Returns:
            A ``(result, mock_download)`` tuple: the value returned by
            get_ad_image and the AsyncMock that replaced download_image, so the
            caller can assert which URL was requested.
        """
        mock_ad_data = {
            "account_id": account_id,
            "creative": {"id": creative["id"]}
        }

        mock_creative_details = {
            "id": creative["id"],
            "name": creative["name"]
            # No image_hash - triggers fallback
        }

        mock_get_ad_creatives_response = json.dumps({"data": [creative]})

        # Mock PIL Image processing
        mock_pil_image = MagicMock()
        mock_pil_image.mode = "RGB"
        mock_pil_image.convert.return_value = mock_pil_image

        mock_byte_stream = MagicMock()
        mock_byte_stream.getvalue.return_value = b"fake_jpeg_data"

        with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api, \
             patch('meta_ads_mcp.core.ads.get_ad_creatives', new_callable=AsyncMock) as mock_get_creatives, \
             patch('meta_ads_mcp.core.ads.download_image', new_callable=AsyncMock) as mock_download, \
             patch('meta_ads_mcp.core.ads.PILImage.open') as mock_pil_open, \
             patch('meta_ads_mcp.core.ads.io.BytesIO') as mock_bytesio:

            # make_api_request is answered first with the ad, then with the creative details
            mock_api.side_effect = [mock_ad_data, mock_creative_details]
            mock_get_creatives.return_value = mock_get_ad_creatives_response
            mock_download.return_value = b"fake_image_bytes"
            mock_pil_open.return_value = mock_pil_image
            mock_bytesio.return_value = mock_byte_stream

            result = await get_ad_image(access_token="test_token", ad_id="test_ad_id")

        # The mock keeps its recorded calls after the patch is undone
        return result, mock_download

    @pytest.mark.asyncio
    async def test_prioritizes_image_urls_for_viewing_over_thumbnail(self):
        """Test that image_urls_for_viewing is prioritized over thumbnail_url."""

        # Creative with both high-quality and thumbnail URLs
        creative = {
            "id": "creative_123456789",
            "name": "Test Creative",
            "status": "ACTIVE",
            "thumbnail_url": "https://example.com/thumbnail_64x64.jpg",  # Low quality
            "image_url": "https://example.com/full_image.jpg",  # Medium quality
            "image_urls_for_viewing": [
                "https://example.com/high_quality_image.jpg",  # Highest quality
                "https://example.com/alt_high_quality.jpg"
            ],
            "object_story_spec": {
                "link_data": {
                    "picture": "https://example.com/object_story_picture.jpg"
                }
            }
        }

        # This should prioritize image_urls_for_viewing[0]
        result, mock_download = await self._run_get_ad_image(creative)

        # Verify it used the highest quality URL
        assert result is not None
        mock_download.assert_called_once_with("https://example.com/high_quality_image.jpg")

    @pytest.mark.asyncio
    async def test_falls_back_to_image_url_when_image_urls_for_viewing_unavailable(self):
        """Test fallback to image_url when image_urls_for_viewing is not available."""

        # Creative without image_urls_for_viewing
        creative = {
            "id": "creative_123456789",
            "name": "Test Creative",
            "status": "ACTIVE",
            "thumbnail_url": "https://example.com/thumbnail.jpg",
            "image_url": "https://example.com/full_image.jpg",  # Should be used
            "object_story_spec": {
                "link_data": {
                    "picture": "https://example.com/object_story_picture.jpg"
                }
            }
        }

        # This should fall back to image_url
        result, mock_download = await self._run_get_ad_image(creative)

        # Verify it used image_url
        assert result is not None
        mock_download.assert_called_once_with("https://example.com/full_image.jpg")

    @pytest.mark.asyncio
    async def test_falls_back_to_object_story_spec_picture_when_image_url_unavailable(self):
        """Test fallback to object_story_spec.link_data.picture when image_url is not available."""

        # Creative without image_url
        creative = {
            "id": "creative_123456789",
            "name": "Test Creative",
            "status": "ACTIVE",
            "thumbnail_url": "https://example.com/thumbnail.jpg",
            "object_story_spec": {
                "link_data": {
                    "picture": "https://example.com/object_story_picture.jpg"  # Should be used
                }
            }
        }

        # This should fall back to object_story_spec.link_data.picture
        result, mock_download = await self._run_get_ad_image(creative)

        # Verify it used object_story_spec.link_data.picture
        assert result is not None
        mock_download.assert_called_once_with("https://example.com/object_story_picture.jpg")

    @pytest.mark.asyncio
    async def test_uses_thumbnail_url_only_as_last_resort(self):
        """Test that thumbnail_url is only used when no other options are available."""

        # Creative with only thumbnail_url
        creative = {
            "id": "creative_123456789",
            "name": "Test Creative",
            "status": "ACTIVE",
            "thumbnail_url": "https://example.com/thumbnail_only.jpg"  # Only option
        }

        # This should use thumbnail_url as last resort
        result, mock_download = await self._run_get_ad_image(creative)

        # Verify it used thumbnail_url
        assert result is not None
        mock_download.assert_called_once_with("https://example.com/thumbnail_only.jpg")

    def test_extract_creative_image_urls_prioritizes_quality(self):
        """Test that extract_creative_image_urls correctly prioritizes image quality."""

        # Test creative with multiple image URLs
        test_creative = {
            "id": "creative_123456789",
            "name": "Test Creative",
            "thumbnail_url": "https://example.com/thumbnail.jpg",  # Lowest priority
            "image_url": "https://example.com/image.jpg",  # Medium priority
            "image_urls_for_viewing": [
                "https://example.com/high_quality_1.jpg",  # Highest priority
                "https://example.com/high_quality_2.jpg"
            ],
            "object_story_spec": {
                "link_data": {
                    "picture": "https://example.com/object_story_picture.jpg"  # High priority
                }
            }
        }

        # Extract URLs
        urls = extract_creative_image_urls(test_creative)

        # Verify correct priority order: viewing URLs lead, thumbnail comes last,
        # the remaining sources only need to be present somewhere in between
        assert len(urls) >= 4
        assert urls[0] == "https://example.com/high_quality_1.jpg"  # First priority
        assert urls[1] == "https://example.com/high_quality_2.jpg"  # Second priority
        assert "https://example.com/image.jpg" in urls  # Medium priority
        assert "https://example.com/object_story_picture.jpg" in urls  # High priority
        assert urls[-1] == "https://example.com/thumbnail.jpg"  # Last priority

    def test_extract_creative_image_urls_handles_missing_fields(self):
        """Test that extract_creative_image_urls handles missing fields gracefully."""

        # Test creative with minimal fields
        test_creative = {
            "id": "creative_123456789",
            "name": "Minimal Creative",
            "thumbnail_url": "https://example.com/thumbnail.jpg"
        }

        # Extract URLs
        urls = extract_creative_image_urls(test_creative)

        # Should still work with only thumbnail_url
        assert len(urls) == 1
        assert urls[0] == "https://example.com/thumbnail.jpg"

    def test_extract_creative_image_urls_removes_duplicates(self):
        """Test that extract_creative_image_urls removes duplicate URLs."""

        # Test creative with duplicate URLs
        test_creative = {
            "id": "creative_123456789",
            "name": "Duplicate URLs Creative",
            "thumbnail_url": "https://example.com/same_url.jpg",
            "image_url": "https://example.com/same_url.jpg",  # Duplicate
            "image_urls_for_viewing": [
                "https://example.com/same_url.jpg",  # Duplicate
                "https://example.com/unique_url.jpg"
            ]
        }

        # Extract URLs
        urls = extract_creative_image_urls(test_creative)

        # Should remove duplicates while preserving order
        assert len(urls) == 2
        assert urls[0] == "https://example.com/same_url.jpg"  # First occurrence
        assert urls[1] == "https://example.com/unique_url.jpg"

    @pytest.mark.asyncio
    async def test_get_ad_image_with_real_world_example(self):
        """Test with a real-world example that mimics the actual API response structure."""

        # Full-size creative image. In the sample data it is both image_url and
        # the first image_urls_for_viewing entry, so it is bound once and reused.
        creative_image_url = "https://scontent.fbsb6-1.fna.fbcdn.net/v/t45.1600-4/518574136_1116014047008737_2492837958169838537_n.png?stp=dst-jpg_tt6&_nc_cat=109&ccb=1-7&_nc_sid=890911&_nc_eui2=AeHbHqoiAUgF0QeX-tvUoDjYeTyJad_QEPF5PIlp39AQ8dP8cvOlHwiJjny8AUv7xxAlYyy5BGCqFU_oVM9CI7ln&_nc_ohc=VTTYlMOAWZoQ7kNvwGjLMW5&_nc_oc=AdnYDrpNrLovWZC_RG4tvoICGPjBNfzNJimhx-4SKW4BU2i_yzL00dX0-OiYEYokq394g8xR-1a-OuVDAm4HsSJy&_nc_zt=1&_nc_ht=scontent.fbsb6-1.fna&edm=AEuWsiQEAAAA&_nc_gid=_QBCRbZxDq-i1ZiGEXxW2w&oh=00_AfTujKmF365FnGgcokkkdWnK-vmnzQK8Icvlk0kB8SKM3g&oe=68906FC4"

        # Thumbnail. In the sample data it is both thumbnail_url and the second
        # image_urls_for_viewing entry.
        thumbnail_url = "https://external.fbsb6-1.fna.fbcdn.net/emg1/v/t13/13476424677788553381?url=https%3A%2F%2Fwww.facebook.com%2Fads%2Fimage%2F%3Fd%3DAQLuJ5l4AROBvIUchp4g4JXxIT5uAZiAsgHQkD8Iw7BeVtkXNUUfs3leWpqQplJCJdixVIg3mq9KichJ64eRfM-r8aY4GtVQp8TvS_HBByJ8fGg_Cs7Kb8YkN4IDwJ4iQIIkMx30LycCKzuYtp9M-vOk&fb_obo=1&utld=facebook.com&stp=c0.5000x0.5000f_dst-emg0_p64x64_q75_tt6&edm=AEuWsiQEAAAA&_nc_gid=_QBCRbZxDq-i1ZiGEXxW2w&_nc_eui2=AeEbQXzmAdoqWLIXjuTDJ0xAoThZu47BlQqhOFm7jsGVCloP48Ep6Y_qIA5tcqrcSDff5f_k8xGzFIpD7PnUws8c&_nc_oc=Adn3GeYlXxbfEeY0wCBSgNdlwO80wXt5R5bgY2NozdroZ6CRSaXIaOSjVSK9S1LsqsY4GL_0dVzU80RY8QMucEkZ&ccb=13-1&oh=06_Q3-1AcBKUD0rfLGATAveIM5hMSWG9c7DsJzq2arvOl8W4Bpn&oe=688C87B2&_nc_sid=58080a"

        # Creative entry based on real API data
        creative = {
            "id": "606995022142818",
            "name": "Test Creative",
            "status": "ACTIVE",
            "thumbnail_url": thumbnail_url,
            "image_url": creative_image_url,
            "image_urls_for_viewing": [
                creative_image_url,
                thumbnail_url
            ]
        }

        # This should use the first image_urls_for_viewing URL (high quality)
        result, mock_download = await self._run_get_ad_image(creative, account_id="act_15975950")

        # Verify it used the high-quality URL (not the thumbnail)
        assert result is not None
        mock_download.assert_called_once_with(creative_image_url)
```

--------------------------------------------------------------------------------
/tests/test_get_ad_image_regression.py:
--------------------------------------------------------------------------------

```python
"""Regression tests for get_ad_image function fixes.

Tests for multiple issues that were fixed:

1. JSON Parsing Error: 'TypeError: the JSON object must be str, bytes or bytearray, not dict'
   - Caused by wrong parameter order and incorrect JSON parsing
   - Fixed by correcting parameter order and JSON parsing logic

2. Missing Image Hash Support: "Error: No image hashes found in creative"
   - Many modern creatives don't have image_hash but have direct URLs
   - Fixed by adding direct URL fallback using image_urls_for_viewing and thumbnail_url

3. Image Quality Issue: Function was returning profile thumbnails instead of ad creative images
   - Fixed by prioritizing image_urls_for_viewing over thumbnail_url
   - Added proper fallback hierarchy: image_urls_for_viewing > image_url > object_story_spec.picture > thumbnail_url

The fixes enable get_ad_image to work with both traditional hash-based and modern URL-based creatives,
and ensure high-quality images are returned instead of thumbnails.
"""

import pytest
import json
from unittest.mock import AsyncMock, patch, MagicMock
from meta_ads_mcp.core.ads import get_ad_image


@pytest.mark.asyncio
class TestGetAdImageRegressionFix:
    """Regression test cases for the get_ad_image JSON parsing bug fix."""
    
    async def test_get_ad_image_json_parsing_regression_fix(self):
        """Regression test: the hash-based flow must not raise a JSON parsing error."""
        creative_id = "creative_123456789"
        image_hash = "test_hash_123"

        # Answers handed out, in order, to successive make_api_request calls:
        # the ad, the creative details, then the image lookup for the hash
        api_responses = [
            {
                "account_id": "act_123456789",
                "creative": {"id": creative_id}
            },
            {
                "id": creative_id,
                "name": "Test Creative",
                "image_hash": image_hash
            },
            {
                "data": [{
                    "hash": image_hash,
                    "url": "https://example.com/image.jpg",
                    "width": 1200,
                    "height": 628,
                    "name": "test_image.jpg",
                    "status": "ACTIVE"
                }]
            },
        ]

        # Stand-in for the image object returned by PILImage.open
        fake_image = MagicMock()
        fake_image.mode = "RGB"
        fake_image.convert.return_value = fake_image

        # Stand-in for the io.BytesIO buffer
        fake_stream = MagicMock()
        fake_stream.getvalue.return_value = b"fake_jpeg_data"

        with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock, side_effect=api_responses), \
             patch('meta_ads_mcp.core.ads.download_image', new_callable=AsyncMock, return_value=b"fake_image_bytes"), \
             patch('meta_ads_mcp.core.ads.PILImage.open', return_value=fake_image), \
             patch('meta_ads_mcp.core.ads.io.BytesIO', return_value=fake_stream):

            # Previously this call failed with:
            # TypeError: the JSON object must be str, bytes or bytearray, not dict
            outcome = await get_ad_image(access_token="test_token", ad_id="120228922871870272")

        # The exact value depends on the mocking; the regression check is simply
        # that the call completed without the JSON parsing exception
        assert outcome is not None
            
    async def test_get_ad_image_fallback_path_json_parsing(self):
        """Check that the get_ad_creatives fallback path copes with a wrapped JSON response."""
        creative_id = "creative_123456789"
        fallback_hash = "fallback_hash_123"

        # Answers for successive make_api_request calls
        api_responses = [
            {
                "account_id": "act_123456789",
                "creative": {"id": creative_id}
            },
            {
                "id": creative_id,
                "name": "Test Creative"
                # No image_hash - this will trigger the fallback
            },
            {
                "data": [{
                    "hash": fallback_hash,
                    "url": "https://example.com/fallback_image.jpg",
                    "width": 1200,
                    "height": 628
                }]
            },
        ]

        # Wrapped format that caused the original bug: the payload under "data"
        # is itself a JSON-encoded string
        inner_payload = json.dumps({
            "data": [
                {
                    "id": creative_id,
                    "name": "Test Creative",
                    "object_story_spec": {
                        "link_data": {
                            "image_hash": fallback_hash
                        }
                    }
                }
            ]
        })
        wrapped_response = json.dumps({"data": inner_payload})

        # Stand-in for the image object returned by PILImage.open
        fake_image = MagicMock()
        fake_image.mode = "RGB"
        fake_image.convert.return_value = fake_image

        # Stand-in for the io.BytesIO buffer
        fake_stream = MagicMock()
        fake_stream.getvalue.return_value = b"fake_jpeg_data"

        with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock, side_effect=api_responses), \
             patch('meta_ads_mcp.core.ads.get_ad_creatives', new_callable=AsyncMock, return_value=wrapped_response) as mock_get_creatives, \
             patch('meta_ads_mcp.core.ads.download_image', new_callable=AsyncMock, return_value=b"fake_image_bytes"), \
             patch('meta_ads_mcp.core.ads.PILImage.open', return_value=fake_image), \
             patch('meta_ads_mcp.core.ads.io.BytesIO', return_value=fake_stream):

            # Previously would fail: TypeError: the JSON object must be str, bytes or bytearray, not dict
            outcome = await get_ad_image(access_token="test_token", ad_id="120228922871870272")

        # Key point: no JSON parsing exception was raised
        assert outcome is not None
        # The fallback path was actually taken
        mock_get_creatives.assert_called_once()
    
    async def test_get_ad_image_no_ad_id(self):
        """Check that calling get_ad_image with ad_id=None returns an error string."""
        outcome = await get_ad_image(access_token="test_token", ad_id=None)

        # The failure must come back as a message, not as a raised JSON parsing error
        assert isinstance(outcome, str)
        assert "Error: No ad ID provided" in outcome
    
    async def test_get_ad_image_parameter_order_regression(self):
        """Regression test: get_ad_creatives must be called with keyword arguments.

        Guards against a return to the original bug, where the call was
        get_ad_creatives(ad_id, "", access_token).
        """
        # Answers for successive make_api_request calls
        api_responses = [
            {
                "account_id": "act_123456789",
                "creative": {"id": "creative_123456789"}
            },
            {
                "id": "creative_123456789",
                "name": "Test Creative"
                # No image_hash to trigger fallback
            },
        ]
        empty_wrapped_response = json.dumps({"data": json.dumps({"data": []})})

        with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock, side_effect=api_responses), \
             patch('meta_ads_mcp.core.ads.get_ad_creatives', new_callable=AsyncMock, return_value=empty_wrapped_response) as mock_get_creatives:

            # Should reach the fallback path without raising a JSON parsing error
            await get_ad_image(access_token="test_token", ad_id="test_ad_id")

        # Verify get_ad_creatives was called with correct parameter names (not positional)
        mock_get_creatives.assert_called_once_with(ad_id="test_ad_id", access_token="test_token")
    
    async def test_get_ad_image_direct_url_fallback_with_image_urls_for_viewing(self):
        """Check the direct URL fallback picks image_urls_for_viewing when there is no image_hash."""
        creative_id = "creative_123456789"
        creative_name = "Modern Creative"

        # Answers for successive make_api_request calls
        api_responses = [
            {
                "account_id": "act_123456789",
                "creative": {"id": creative_id}
            },
            {
                "id": creative_id,
                "name": creative_name
                # No image_hash - this will trigger fallback
            },
        ]

        # get_ad_creatives answer carrying direct URLs
        creatives_response = json.dumps({
            "data": [
                {
                    "id": creative_id,
                    "name": creative_name,
                    "status": "ACTIVE",
                    "thumbnail_url": "https://example.com/thumb.jpg",
                    "image_urls_for_viewing": [
                        "https://example.com/full_image.jpg",
                        "https://example.com/alt_image.jpg"
                    ]
                }
            ]
        })

        # Stand-in for the image object returned by PILImage.open
        fake_image = MagicMock()
        fake_image.mode = "RGB"
        fake_image.convert.return_value = fake_image

        # Stand-in for the io.BytesIO buffer
        fake_stream = MagicMock()
        fake_stream.getvalue.return_value = b"fake_jpeg_data"

        with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock, side_effect=api_responses), \
             patch('meta_ads_mcp.core.ads.get_ad_creatives', new_callable=AsyncMock, return_value=creatives_response) as mock_get_creatives, \
             patch('meta_ads_mcp.core.ads.download_image', new_callable=AsyncMock, return_value=b"fake_image_bytes") as mock_download, \
             patch('meta_ads_mcp.core.ads.PILImage.open', return_value=fake_image), \
             patch('meta_ads_mcp.core.ads.io.BytesIO', return_value=fake_stream):

            outcome = await get_ad_image(access_token="test_token", ad_id="test_ad_id")

        # The direct URL approach was used, starting from the first viewing URL
        assert outcome is not None
        mock_get_creatives.assert_called_once()
        mock_download.assert_called_once_with("https://example.com/full_image.jpg")
    
    async def test_get_ad_image_direct_url_fallback_with_thumbnail_url_only(self):
        """Check the direct URL fallback uses thumbnail_url when image_urls_for_viewing is absent."""
        creative_id = "creative_123456789"
        creative_name = "Thumbnail Only Creative"

        # Answers for successive make_api_request calls
        api_responses = [
            {
                "account_id": "act_123456789",
                "creative": {"id": creative_id}
            },
            {
                "id": creative_id,
                "name": creative_name
                # No image_hash
            },
        ]

        # get_ad_creatives answer whose only URL is the thumbnail
        creatives_response = json.dumps({
            "data": [
                {
                    "id": creative_id,
                    "name": creative_name,
                    "status": "ACTIVE",
                    "thumbnail_url": "https://example.com/thumb_only.jpg"
                    # No image_urls_for_viewing
                }
            ]
        })

        # Stand-in for the image object returned by PILImage.open
        fake_image = MagicMock()
        fake_image.mode = "RGB"
        fake_image.convert.return_value = fake_image

        # Stand-in for the io.BytesIO buffer
        fake_stream = MagicMock()
        fake_stream.getvalue.return_value = b"fake_jpeg_data"

        with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock, side_effect=api_responses), \
             patch('meta_ads_mcp.core.ads.get_ad_creatives', new_callable=AsyncMock, return_value=creatives_response), \
             patch('meta_ads_mcp.core.ads.download_image', new_callable=AsyncMock, return_value=b"fake_image_bytes") as mock_download, \
             patch('meta_ads_mcp.core.ads.PILImage.open', return_value=fake_image), \
             patch('meta_ads_mcp.core.ads.io.BytesIO', return_value=fake_stream):

            outcome = await get_ad_image(access_token="test_token", ad_id="test_ad_id")

        # The thumbnail URL was the one downloaded
        assert outcome is not None
        mock_download.assert_called_once_with("https://example.com/thumb_only.jpg")
    
    async def test_get_ad_image_no_direct_urls_available(self):
        """Verify an error string is returned when the creative exposes no image URL."""

        creative_id = "creative_123456789"
        creative_name = "No URLs Creative"

        # make_api_request replies, in call order: the ad record, then the
        # creative details (no image_hash present).
        api_replies = [
            {"account_id": "act_123456789", "creative": {"id": creative_id}},
            {"id": creative_id, "name": creative_name},
        ]

        # get_ad_creatives payload with neither thumbnail_url nor
        # image_urls_for_viewing.
        creatives_json = json.dumps({
            "data": [
                {"id": creative_id, "name": creative_name, "status": "ACTIVE"}
            ]
        })

        with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock, side_effect=api_replies), \
             patch('meta_ads_mcp.core.ads.get_ad_creatives', new_callable=AsyncMock, return_value=creatives_json):
            outcome = await get_ad_image(access_token="test_token", ad_id="test_ad_id")

        # The failure is reported as a plain string mentioning the missing URLs
        assert isinstance(outcome, str)
        assert "No image URLs found" in outcome
    
    async def test_get_ad_image_direct_url_download_failure(self):
        """Verify an error string is returned when the direct-URL download yields nothing."""

        creative_id = "creative_123456789"
        creative_name = "Download Fail Creative"

        # make_api_request replies, in call order: the ad record, then the
        # creative details.
        api_replies = [
            {"account_id": "act_123456789", "creative": {"id": creative_id}},
            {"id": creative_id, "name": creative_name},
        ]

        # get_ad_creatives payload that does offer a viewing URL
        creatives_json = json.dumps({
            "data": [
                {
                    "id": creative_id,
                    "name": creative_name,
                    "image_urls_for_viewing": ["https://example.com/broken_image.jpg"]
                }
            ]
        })

        # download_image returning None simulates a failed download
        with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock, side_effect=api_replies), \
             patch('meta_ads_mcp.core.ads.get_ad_creatives', new_callable=AsyncMock, return_value=creatives_json), \
             patch('meta_ads_mcp.core.ads.download_image', new_callable=AsyncMock, return_value=None):
            outcome = await get_ad_image(access_token="test_token", ad_id="test_ad_id")

        # The failure is reported as a plain string describing the download problem
        assert isinstance(outcome, str)
        assert "Failed to download image from direct URL" in outcome
    
    async def test_get_ad_image_quality_improvement_prioritizes_high_quality(self):
        """Test that the image quality improvement correctly prioritizes high-quality images over thumbnails.

        The mocked creative offers several candidate URLs at once
        (thumbnail_url, image_url, image_urls_for_viewing and an
        object_story_spec picture). The test expects exactly one download,
        for the first image_urls_for_viewing entry, and no download of the
        thumbnail.
        """

        thumbnail_url = "https://example.com/thumbnail_64x64.jpg"
        high_quality_url = "https://example.com/high_quality_image.jpg"

        # Mock responses for creative with both high-quality and thumbnail URLs
        mock_ad_data = {
            "account_id": "act_123456789",
            "creative": {"id": "creative_123456789"}
        }

        mock_creative_details = {
            "id": "creative_123456789",
            "name": "Quality Test Creative"
        }

        # Mock get_ad_creatives response with both URLs
        mock_get_ad_creatives_response = json.dumps({
            "data": [
                {
                    "id": "creative_123456789",
                    "name": "Quality Test Creative",
                    "status": "ACTIVE",
                    "thumbnail_url": thumbnail_url,  # Low quality thumbnail
                    "image_url": "https://example.com/full_image.jpg",  # Medium quality
                    "image_urls_for_viewing": [
                        high_quality_url,  # Highest quality
                        "https://example.com/alt_high_quality.jpg"
                    ],
                    "object_story_spec": {
                        "link_data": {
                            "picture": "https://example.com/object_story_picture.jpg"
                        }
                    }
                }
            ]
        })

        # Mock PIL Image processing
        mock_pil_image = MagicMock()
        mock_pil_image.mode = "RGB"
        mock_pil_image.convert.return_value = mock_pil_image

        mock_byte_stream = MagicMock()
        mock_byte_stream.getvalue.return_value = b"fake_jpeg_data"

        with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api, \
             patch('meta_ads_mcp.core.ads.get_ad_creatives', new_callable=AsyncMock) as mock_get_creatives, \
             patch('meta_ads_mcp.core.ads.download_image', new_callable=AsyncMock) as mock_download, \
             patch('meta_ads_mcp.core.ads.PILImage.open') as mock_pil_open, \
             patch('meta_ads_mcp.core.ads.io.BytesIO') as mock_bytesio:

            mock_api.side_effect = [mock_ad_data, mock_creative_details]
            mock_get_creatives.return_value = mock_get_ad_creatives_response
            mock_download.return_value = b"fake_image_bytes"
            mock_pil_open.return_value = mock_pil_image
            mock_bytesio.return_value = mock_byte_stream

            # This should prioritize image_urls_for_viewing[0] over thumbnail_url
            result = await get_ad_image(access_token="test_token", ad_id="test_ad_id")

            # Verify it used the highest quality URL, not the thumbnail
            assert result is not None
            mock_download.assert_called_once_with(high_quality_url)

            # Verify it did NOT use the thumbnail URL: inspect every recorded
            # download call instead of repeating the positive assertion above.
            for recorded_call in mock_download.call_args_list:
                assert thumbnail_url not in recorded_call.args
                assert thumbnail_url not in recorded_call.kwargs.values()
```

--------------------------------------------------------------------------------
/tests/test_dsa_integration.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Integration test for DSA beneficiary functionality.

This test demonstrates the complete DSA beneficiary implementation working end-to-end,
including detection, parameter support, and error handling.
"""

import pytest
import json
from unittest.mock import AsyncMock, patch

from meta_ads_mcp.core.adsets import create_adset, get_adset_details
from meta_ads_mcp.core.accounts import get_account_info


class TestDSAIntegration:
    """Integration tests for DSA beneficiary functionality.

    The tests replace ``make_api_request`` (in the module under test) and
    ``meta_ads_mcp.core.auth.get_current_access_token`` with ``AsyncMock``
    objects, then call the tool functions and inspect their results.
    """

    @staticmethod
    def _as_dict(result):
        """Return *result* as a dict, decoding it with ``json.loads`` unless it already is one.

        ``get_account_info`` results are handled through this helper because
        the tests accept both the dictionary return format and a JSON string.
        """
        if isinstance(result, dict):
            return result
        return json.loads(result)

    @staticmethod
    def _unwrap_data(result_data):
        """Return the payload of a result that may be wrapped in a 'data' field.

        When *result_data* has a 'data' key its value is decoded as JSON;
        otherwise *result_data* itself is returned unchanged.
        """
        if "data" in result_data:
            return json.loads(result_data["data"])
        return result_data

    @pytest.mark.asyncio
    async def test_dsa_beneficiary_complete_workflow(self):
        """Test complete DSA beneficiary workflow from account detection to ad set creation"""

        # Step 1: Get account info and detect DSA requirement
        mock_account_response = {
            "id": "act_701351919139047",
            "name": "Test European Account",
            "account_status": 1,
            "business_country_code": "DE",  # Germany - DSA compliant
            "business_city": "Berlin",
            "currency": "EUR"
        }

        with patch('meta_ads_mcp.core.accounts.make_api_request', new_callable=AsyncMock) as mock_account_api, \
             patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
            mock_auth.return_value = "test_access_token"
            mock_account_api.return_value = mock_account_response

            # Get account info and verify DSA detection
            result = await get_account_info(account_id="act_701351919139047")
            result_data = self._as_dict(result)

            # Verify DSA requirement is detected
            assert result_data["business_country_code"] == "DE"
            assert result_data["dsa_required"] is True
            assert "DSA (Digital Services Act)" in result_data["dsa_compliance_note"]

        # Step 2: Create ad set with DSA beneficiary
        mock_adset_response = {
            "id": "23842588888640185",
            "name": "Test European Ad Set",
            "status": "PAUSED",
            "dsa_beneficiary": "Test Organization GmbH"
        }

        with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_adset_api, \
             patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
            mock_auth.return_value = "test_access_token"
            mock_adset_api.return_value = mock_adset_response

            # Create ad set with DSA beneficiary
            result = await create_adset(
                account_id="act_701351919139047",
                campaign_id="23842588888640184",
                name="Test European Ad Set",
                optimization_goal="LINK_CLICKS",
                billing_event="IMPRESSIONS",
                dsa_beneficiary="Test Organization GmbH"
            )

            result_data = json.loads(result)

            # Verify successful creation with DSA beneficiary
            assert result_data["id"] == "23842588888640185"
            assert result_data["dsa_beneficiary"] == "Test Organization GmbH"

            # Verify API call included DSA beneficiary parameter
            mock_adset_api.assert_called_once()
            call_args = mock_adset_api.call_args
            params = call_args[0][2]  # Third positional argument is params
            assert "dsa_beneficiary" in params
            assert params["dsa_beneficiary"] == "Test Organization GmbH"

    @pytest.mark.asyncio
    async def test_dsa_beneficiary_error_handling_integration(self):
        """Test DSA beneficiary error handling in real-world scenarios"""

        # Test 1: Missing DSA beneficiary for European account
        mock_error_response = {
            "error": {
                "message": "Enter the person or organization that benefits from ads in this ad set",
                "type": "OAuthException",
                "code": 100
            }
        }

        with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api, \
             patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
            mock_auth.return_value = "test_access_token"
            mock_api.side_effect = Exception(json.dumps(mock_error_response))

            result = await create_adset(
                account_id="act_701351919139047",
                campaign_id="23842588888640184",
                name="Test European Ad Set",
                optimization_goal="LINK_CLICKS",
                billing_event="IMPRESSIONS"
                # No DSA beneficiary provided
            )

            # Handle response wrapped in 'data' field by meta_api_tool decorator
            actual_data = self._unwrap_data(json.loads(result))

            # Verify error is properly handled
            assert "error" in actual_data
            assert "benefits from ads" in actual_data["error"]

        # Test 2: Permission error for DSA beneficiary
        with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api, \
             patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
            mock_auth.return_value = "test_access_token"
            mock_api.side_effect = Exception("Permission denied: business_management permission required")

            result = await create_adset(
                account_id="act_701351919139047",
                campaign_id="23842588888640184",
                name="Test European Ad Set",
                optimization_goal="LINK_CLICKS",
                billing_event="IMPRESSIONS",
                dsa_beneficiary="Test Organization GmbH"
            )

            # Handle response wrapped in 'data' field by meta_api_tool decorator
            actual_data = self._unwrap_data(json.loads(result))

            # Verify permission error is handled
            assert "error" in actual_data
            assert "permission" in actual_data["error"].lower()

    @pytest.mark.asyncio
    async def test_dsa_beneficiary_regional_compliance_integration(self):
        """Test DSA beneficiary compliance across different regions"""

        # Test 1: European account (DSA required)
        mock_de_account_response = {
            "id": "act_de",
            "name": "German Account",
            "business_country_code": "DE",
            "currency": "EUR"
        }

        with patch('meta_ads_mcp.core.accounts.make_api_request', new_callable=AsyncMock) as mock_api, \
             patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
            mock_auth.return_value = "test_access_token"
            mock_api.return_value = mock_de_account_response

            result = await get_account_info(account_id="act_de")
            result_data = self._as_dict(result)

            # Verify DSA requirement for German account
            assert result_data["business_country_code"] == "DE"
            assert result_data["dsa_required"] is True

        # Test 2: US account (DSA not required)
        mock_us_account_response = {
            "id": "act_us",
            "name": "US Account",
            "business_country_code": "US",
            "currency": "USD"
        }

        with patch('meta_ads_mcp.core.accounts.make_api_request', new_callable=AsyncMock) as mock_api, \
             patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
            mock_auth.return_value = "test_access_token"
            mock_api.return_value = mock_us_account_response

            result = await get_account_info(account_id="act_us")
            result_data = self._as_dict(result)

            # Verify no DSA requirement for US account
            assert result_data["business_country_code"] == "US"
            assert result_data["dsa_required"] is False

    @pytest.mark.asyncio
    async def test_dsa_beneficiary_parameter_formats_integration(self):
        """Test different DSA beneficiary parameter formats in real scenarios"""

        test_cases = [
            "Test Organization GmbH",
            "Test Organization, Inc.",
            "Test Organization Ltd.",
            "Test Organization AG",
            "Test Organization BV",
            "Test Organization SARL"
        ]

        for beneficiary_name in test_cases:
            # The mocked API echoes the beneficiary name back in its response
            mock_response = {
                "id": "23842588888640185",
                "name": "Test Ad Set",
                "status": "PAUSED",
                "dsa_beneficiary": beneficiary_name
            }

            with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api, \
                 patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
                mock_auth.return_value = "test_access_token"
                mock_api.return_value = mock_response

                result = await create_adset(
                    account_id="act_701351919139047",
                    campaign_id="23842588888640184",
                    name="Test Ad Set",
                    optimization_goal="LINK_CLICKS",
                    billing_event="IMPRESSIONS",
                    dsa_beneficiary=beneficiary_name
                )

                result_data = json.loads(result)

                # Verify successful creation with different formats
                assert result_data["id"] == "23842588888640185"
                assert result_data["dsa_beneficiary"] == beneficiary_name

    @pytest.mark.asyncio
    async def test_dsa_beneficiary_retrieval_integration(self):
        """Test complete workflow including retrieving DSA beneficiary information"""

        # Step 1: Create ad set with DSA beneficiary
        mock_create_response = {
            "id": "120229746629010183",
            "name": "Test Ad Set with DSA",
            "status": "PAUSED"
        }

        with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api, \
             patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
            mock_auth.return_value = "test_access_token"
            mock_api.return_value = mock_create_response

            # Create ad set
            result = await create_adset(
                account_id="act_701351919139047",
                campaign_id="120229656904980183",
                name="Test Ad Set with DSA",
                optimization_goal="LINK_CLICKS",
                billing_event="IMPRESSIONS",
                dsa_beneficiary="Test Organization Inc"
            )

            result_data = json.loads(result)
            adset_id = result_data["id"]

            # Verify creation was successful
            assert adset_id == "120229746629010183"

        # Step 2: Retrieve ad set details including DSA beneficiary
        mock_details_response = {
            "id": "120229746629010183",
            "name": "Test Ad Set with DSA",
            "campaign_id": "120229656904980183",
            "status": "PAUSED",
            "daily_budget": "1000",
            "targeting": {
                "geo_locations": {"countries": ["US"]},
                "age_min": 25,
                "age_max": 65
            },
            "bid_amount": 200,
            "optimization_goal": "LINK_CLICKS",
            "billing_event": "IMPRESSIONS",
            "dsa_beneficiary": "Test Organization Inc"
        }

        with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api, \
             patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
            mock_auth.return_value = "test_access_token"
            mock_api.return_value = mock_details_response

            # Retrieve ad set details using the id returned by the creation step
            result = await get_adset_details(adset_id=adset_id)
            result_data = json.loads(result)

            # Verify DSA beneficiary field is retrieved correctly
            assert result_data["id"] == "120229746629010183"
            assert "dsa_beneficiary" in result_data
            assert result_data["dsa_beneficiary"] == "Test Organization Inc"

            # Verify API call included dsa_beneficiary in fields
            mock_api.assert_called_once()
            call_args = mock_api.call_args
            assert "dsa_beneficiary" in str(call_args)

    @pytest.mark.asyncio
    async def test_dsa_beneficiary_us_account_integration(self):
        """Test DSA beneficiary behavior for US accounts (optional parameter)"""

        # Step 1: Verify US account doesn't require DSA
        mock_us_account_response = {
            "id": "act_701351919139047",
            "name": "US Business Account",
            "business_country_code": "US",
            "account_status": 1
        }

        with patch('meta_ads_mcp.core.accounts.make_api_request', new_callable=AsyncMock) as mock_api, \
             patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
            mock_auth.return_value = "test_access_token"
            mock_api.return_value = mock_us_account_response

            result = await get_account_info(account_id="act_701351919139047")
            # Accept the dictionary return format as the other
            # get_account_info tests in this class do; json.loads alone
            # would raise TypeError on a dict.
            result_data = self._as_dict(result)

            # Verify US account doesn't require DSA
            assert result_data["business_country_code"] == "US"
            assert result_data["dsa_required"] is False

        # Step 2: Create ad set without DSA beneficiary (should work for US)
        mock_create_response = {
            "id": "120229746624860183",
            "name": "Test US Ad Set",
            "status": "PAUSED"
        }

        with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api, \
             patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
            mock_auth.return_value = "test_access_token"
            mock_api.return_value = mock_create_response

            result = await create_adset(
                account_id="act_701351919139047",
                campaign_id="120229656904980183",
                name="Test US Ad Set",
                optimization_goal="LINK_CLICKS",
                billing_event="IMPRESSIONS"
                # No DSA beneficiary provided
            )

            result_data = json.loads(result)

            # Verify creation was successful without DSA beneficiary
            assert result_data["id"] == "120229746624860183"

        # Step 3: Retrieve ad set details (should not have dsa_beneficiary field)
        mock_details_response = {
            "id": "120229746624860183",
            "name": "Test US Ad Set",
            "campaign_id": "120229656904980183",
            "status": "PAUSED",
            "daily_budget": "1000",
            "targeting": {
                "geo_locations": {"countries": ["US"]}
            },
            "bid_amount": 200,
            "optimization_goal": "LINK_CLICKS",
            "billing_event": "IMPRESSIONS"
            # No dsa_beneficiary field
        }

        with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api, \
             patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
            mock_auth.return_value = "test_access_token"
            mock_api.return_value = mock_details_response

            result = await get_adset_details(adset_id="120229746624860183")
            result_data = json.loads(result)

            # Verify ad set details are retrieved correctly
            assert result_data["id"] == "120229746624860183"
            assert "dsa_beneficiary" not in result_data  # Should not be present for US accounts

    @pytest.mark.asyncio
    async def test_account_info_requires_account_id(self):
        """Test that get_account_info requires an account_id parameter"""

        with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
            mock_auth.return_value = "test_access_token"

            # Test without account_id parameter
            result = await get_account_info(account_id=None)
            result_data = self._as_dict(result)

            # Verify error message for missing account_id
            assert "error" in result_data
            assert "Account ID is required" in result_data["error"]["message"]
            assert "Please specify an account_id parameter" in result_data["error"]["details"]
            assert "example" in result_data["error"]

    @pytest.mark.asyncio
    async def test_account_info_inaccessible_account_error(self):
        """Test that get_account_info provides helpful error for inaccessible accounts"""

        # Mock permission error for direct account access (first API call)
        mock_permission_error = {
            "error": {
                "message": "Insufficient access privileges",
                "type": "OAuthException",
                "code": 200
            }
        }

        # Mock accessible accounts response (second API call)
        mock_accessible_accounts = {
            "data": [
                {"id": "act_123", "name": "Test Account 1"},
                {"id": "act_456", "name": "Test Account 2"}
            ]
        }

        with patch('meta_ads_mcp.core.accounts.make_api_request', new_callable=AsyncMock) as mock_api, \
             patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
            mock_auth.return_value = "test_access_token"
            # First call returns permission error, second call returns accessible accounts
            mock_api.side_effect = [mock_permission_error, mock_accessible_accounts]

            result = await get_account_info(account_id="act_inaccessible")
            result_data = self._as_dict(result)

            # Verify helpful error message for inaccessible account
            assert "error" in result_data
            assert "not accessible to your user account" in result_data["error"]["message"]
            assert "accessible_accounts" in result_data["error"]
            assert "suggestion" in result_data["error"]
            assert len(result_data["error"]["accessible_accounts"]) == 2
```

--------------------------------------------------------------------------------
/tests/test_mobile_app_adset_creation.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Unit Tests for Mobile App Adset Creation Functionality

This test suite validates the mobile app parameters implementation for the
create_adset function in meta_ads_mcp/core/adsets.py.

Test cases cover:
- Mobile app adset creation success scenarios
- promoted_object parameter validation and formatting
- destination_type parameter validation
- Mobile app specific error handling
- Cross-platform mobile app support (iOS, Android)
- Integration with APP_INSTALLS optimization goal

Usage:
    uv run python -m pytest tests/test_mobile_app_adset_creation.py -v

Related to Issue #008: Missing Mobile App Parameters in create_adset Function
"""

import pytest
import json
import asyncio
from unittest.mock import AsyncMock, patch, MagicMock
from typing import Dict, Any, List

# Import the function to test
from meta_ads_mcp.core.adsets import create_adset


class TestMobileAppAdsetCreation:
    """Test suite for mobile app adset creation functionality.

    Every test calls ``create_adset`` with ``make_api_request`` patched out and
    then inspects either the parameters handed to the patched API call or the
    JSON string that ``create_adset`` returned. Error responses are accepted
    both bare (``{"error": ...}``) and wrapped in a ``{"data": "<json string>"}``
    envelope; ``_unwrap`` normalises the two so each test asserts on one shape.
    """

    # ------------------------------------------------------------------
    # Private helpers (underscore-prefixed, so pytest does not collect them)
    # ------------------------------------------------------------------
    @staticmethod
    def _unwrap(result: str) -> Dict[str, Any]:
        """Decode a create_adset response, looking inside the "data" envelope if present.

        Args:
            result: JSON string returned by ``create_adset``.

        Returns:
            The decoded inner payload when the response is ``{"data": "<json>"}``,
            otherwise the decoded response itself.
        """
        payload = json.loads(result)
        if isinstance(payload, dict) and isinstance(payload.get("data"), str):
            return json.loads(payload["data"])
        return payload

    @classmethod
    def _error_message(cls, result: str) -> str:
        """Return the lower-cased error string of a response, asserting that one exists."""
        payload = cls._unwrap(result)
        assert 'error' in payload
        return payload['error'].lower()

    @staticmethod
    def _sent_params(mock_api) -> Dict[str, Any]:
        """Return the params dict (third positional argument) of the last patched API call."""
        return mock_api.call_args[0][2]

    @staticmethod
    def _decoded_promoted_object(params: Dict[str, Any]) -> Dict[str, Any]:
        """Return params['promoted_object'] as a dict, decoding it when sent as a JSON string."""
        raw = params['promoted_object']
        return json.loads(raw) if isinstance(raw, str) else raw

    # ------------------------------------------------------------------
    # Fixtures
    # ------------------------------------------------------------------
    @pytest.fixture
    def mock_api_request(self):
        """Mock for the make_api_request function"""
        with patch('meta_ads_mcp.core.adsets.make_api_request') as mock:
            mock.return_value = {
                "id": "test_mobile_adset_id",
                "name": "Test Mobile App Adset",
                "optimization_goal": "APP_INSTALLS",
                "promoted_object": {
                    "application_id": "123456789012345",
                    "object_store_url": "https://apps.apple.com/app/id123456789"
                },
                "destination_type": "APP_STORE"
            }
            yield mock

    @pytest.fixture
    def mock_auth_manager(self):
        """Mock for the authentication manager"""
        with patch('meta_ads_mcp.core.api.auth_manager') as mock, \
             patch('meta_ads_mcp.core.auth.get_current_access_token') as mock_get_token:
            # Mock a valid access token
            mock.get_current_access_token.return_value = "test_access_token"
            mock.is_token_valid.return_value = True
            mock.app_id = "test_app_id"
            mock_get_token.return_value = "test_access_token"
            yield mock

    @pytest.fixture
    def valid_mobile_app_params(self):
        """Valid mobile app parameters for testing"""
        return {
            "account_id": "act_123456789",
            "campaign_id": "campaign_123456789",
            "name": "Test Mobile App Adset",
            "optimization_goal": "APP_INSTALLS",
            "billing_event": "IMPRESSIONS",
            "targeting": {
                "age_min": 18,
                "age_max": 65,
                "app_install_state": "not_installed",
                "geo_locations": {"countries": ["US"]},
                "user_device": ["Android_Smartphone", "iPhone"],
                "user_os": ["Android", "iOS"]
            }
        }

    @pytest.fixture
    def ios_promoted_object(self):
        """Valid iOS app promoted object"""
        return {
            "application_id": "123456789012345",
            "object_store_url": "https://apps.apple.com/app/id123456789",
            "custom_event_type": "APP_INSTALL"
        }

    @pytest.fixture
    def android_promoted_object(self):
        """Valid Android app promoted object"""
        return {
            "application_id": "987654321098765",
            "object_store_url": "https://play.google.com/store/apps/details?id=com.example.app",
            "custom_event_type": "APP_INSTALL"
        }

    @pytest.fixture
    def promoted_object_with_pixel(self):
        """Promoted object with Facebook pixel for tracking"""
        return {
            "application_id": "123456789012345",
            "object_store_url": "https://apps.apple.com/app/id123456789",
            "custom_event_type": "APP_INSTALL",
            "pixel_id": "pixel_123456789"
        }

    # Test: Mobile App Adset Creation Success
    @pytest.mark.asyncio
    async def test_mobile_app_adset_creation_success_ios(
        self, mock_api_request, mock_auth_manager, valid_mobile_app_params, ios_promoted_object
    ):
        """Test successful iOS mobile app adset creation"""

        result = await create_adset(
            **valid_mobile_app_params,
            promoted_object=ios_promoted_object,
            destination_type="APP_STORE"
        )

        # Parse the result
        result_data = json.loads(result)

        # Verify the API was called with correct parameters
        mock_api_request.assert_called_once()
        call_args = mock_api_request.call_args

        # Check endpoint (first argument)
        assert call_args[0][0] == f"{valid_mobile_app_params['account_id']}/adsets"

        # Check parameters (third argument)
        params = call_args[0][2]
        assert 'promoted_object' in params
        assert 'destination_type' in params

        # Verify promoted_object survives whether it was sent as a dict or a JSON string
        promoted_obj_param = self._decoded_promoted_object(params)
        assert promoted_obj_param['application_id'] == ios_promoted_object['application_id']
        assert promoted_obj_param['object_store_url'] == ios_promoted_object['object_store_url']

        # Verify destination_type
        assert params['destination_type'] == "APP_STORE"

        # Verify response structure
        assert 'id' in result_data
        assert result_data['optimization_goal'] == "APP_INSTALLS"

    @pytest.mark.asyncio
    async def test_mobile_app_adset_creation_success_android(
        self, mock_api_request, mock_auth_manager, valid_mobile_app_params, android_promoted_object
    ):
        """Test successful Android mobile app adset creation"""

        result = await create_adset(
            **valid_mobile_app_params,
            promoted_object=android_promoted_object,
            destination_type="APP_STORE"
        )

        # Parse the result
        result_data = json.loads(result)

        # Verify the API was called
        mock_api_request.assert_called_once()
        params = self._sent_params(mock_api_request)

        # Verify Android-specific promoted_object
        promoted_obj_param = self._decoded_promoted_object(params)
        assert promoted_obj_param['application_id'] == android_promoted_object['application_id']
        assert "play.google.com" in promoted_obj_param['object_store_url']

        # Verify response
        assert 'id' in result_data

    @pytest.mark.asyncio
    async def test_mobile_app_adset_with_pixel_tracking(
        self, mock_api_request, mock_auth_manager, valid_mobile_app_params, promoted_object_with_pixel
    ):
        """Test mobile app adset creation with Facebook pixel tracking"""

        await create_adset(
            **valid_mobile_app_params,
            promoted_object=promoted_object_with_pixel,
            destination_type="APP_STORE"
        )

        # Verify pixel_id is included
        promoted_obj_param = self._decoded_promoted_object(self._sent_params(mock_api_request))

        assert 'pixel_id' in promoted_obj_param
        assert promoted_obj_param['pixel_id'] == promoted_object_with_pixel['pixel_id']

    # Test: Parameter Validation
    @pytest.mark.asyncio
    async def test_invalid_promoted_object_missing_application_id(
        self, mock_api_request, mock_auth_manager, valid_mobile_app_params
    ):
        """Test validation error for promoted_object missing application_id"""

        invalid_promoted_object = {
            "object_store_url": "https://apps.apple.com/app/id123456789",
            "custom_event_type": "APP_INSTALL"
        }

        result = await create_adset(
            **valid_mobile_app_params,
            promoted_object=invalid_promoted_object,
            destination_type="APP_STORE"
        )

        # Should return validation error (bare or inside the data wrapper)
        assert 'application_id' in self._error_message(result)

    @pytest.mark.asyncio
    async def test_invalid_promoted_object_missing_store_url(
        self, mock_api_request, mock_auth_manager, valid_mobile_app_params
    ):
        """Test validation error for promoted_object missing object_store_url"""

        invalid_promoted_object = {
            "application_id": "123456789012345",
            "custom_event_type": "APP_INSTALL"
        }

        result = await create_adset(
            **valid_mobile_app_params,
            promoted_object=invalid_promoted_object,
            destination_type="APP_STORE"
        )

        # Should return validation error (bare or inside the data wrapper)
        assert 'object_store_url' in self._error_message(result)

    @pytest.mark.asyncio
    async def test_invalid_destination_type(
        self, mock_api_request, mock_auth_manager, valid_mobile_app_params, ios_promoted_object
    ):
        """Test validation error for invalid destination_type value"""

        result = await create_adset(
            **valid_mobile_app_params,
            promoted_object=ios_promoted_object,
            destination_type="INVALID_TYPE"
        )

        # Should return validation error (bare or inside the data wrapper)
        assert 'destination_type' in self._error_message(result)

    @pytest.mark.asyncio
    async def test_app_installs_requires_promoted_object(
        self, mock_api_request, mock_auth_manager, valid_mobile_app_params
    ):
        """Test that APP_INSTALLS optimization goal requires promoted_object"""

        result = await create_adset(
            **valid_mobile_app_params,
            # Missing promoted_object
            destination_type="APP_STORE"
        )

        # Should return validation error naming both the parameter and the goal
        message = self._error_message(result)
        assert 'promoted_object' in message
        assert 'app_installs' in message

    # Test: Cross-platform Support
    @pytest.mark.asyncio
    async def test_ios_app_store_url_validation(
        self, mock_api_request, mock_auth_manager, valid_mobile_app_params
    ):
        """Test iOS App Store URL format validation"""

        ios_promoted_object = {
            "application_id": "123456789012345",
            "object_store_url": "https://apps.apple.com/app/id123456789",
            "custom_event_type": "APP_INSTALL"
        }

        result = await create_adset(
            **valid_mobile_app_params,
            promoted_object=ios_promoted_object,
            destination_type="APP_STORE"
        )

        # Should succeed for valid iOS URL. Unwrap first: an error hidden inside
        # the data wrapper must fail this test rather than slip through.
        assert self._unwrap(result).get('error') is None

    @pytest.mark.asyncio
    async def test_google_play_url_validation(
        self, mock_api_request, mock_auth_manager, valid_mobile_app_params
    ):
        """Test Google Play Store URL format validation"""

        android_promoted_object = {
            "application_id": "987654321098765",
            "object_store_url": "https://play.google.com/store/apps/details?id=com.example.app",
            "custom_event_type": "APP_INSTALL"
        }

        result = await create_adset(
            **valid_mobile_app_params,
            promoted_object=android_promoted_object,
            destination_type="APP_STORE"
        )

        # Should succeed for valid Google Play URL. Unwrap first: an error hidden
        # inside the data wrapper must fail this test rather than slip through.
        assert self._unwrap(result).get('error') is None

    @pytest.mark.asyncio
    async def test_invalid_store_url_format(
        self, mock_api_request, mock_auth_manager, valid_mobile_app_params
    ):
        """Test validation error for invalid app store URL format"""

        invalid_promoted_object = {
            "application_id": "123456789012345",
            "object_store_url": "https://example.com/invalid-url",
            "custom_event_type": "APP_INSTALL"
        }

        result = await create_adset(
            **valid_mobile_app_params,
            promoted_object=invalid_promoted_object,
            destination_type="APP_STORE"
        )

        # Should return validation error for invalid URL (bare or inside the data wrapper)
        message = self._error_message(result)
        assert 'store url' in message or 'object_store_url' in message

    # Test: Destination Type Variations
    @pytest.mark.asyncio
    async def test_deeplink_destination_type(
        self, mock_api_request, mock_auth_manager, valid_mobile_app_params, ios_promoted_object
    ):
        """Test DEEPLINK destination_type"""

        await create_adset(
            **valid_mobile_app_params,
            promoted_object=ios_promoted_object,
            destination_type="DEEPLINK"
        )

        assert self._sent_params(mock_api_request)['destination_type'] == "DEEPLINK"

    @pytest.mark.asyncio
    async def test_app_install_destination_type(
        self, mock_api_request, mock_auth_manager, valid_mobile_app_params, ios_promoted_object
    ):
        """Test APP_INSTALL destination_type"""

        await create_adset(
            **valid_mobile_app_params,
            promoted_object=ios_promoted_object,
            destination_type="APP_INSTALL"
        )

        assert self._sent_params(mock_api_request)['destination_type'] == "APP_INSTALL"

    @pytest.mark.asyncio
    async def test_on_ad_destination_type_for_lead_generation(
        self, mock_api_request, mock_auth_manager, valid_mobile_app_params
    ):
        """Test ON_AD destination_type for lead generation campaigns (Issue #009 fix)"""

        # Create a lead generation adset configuration (without promoted_object since it's for lead gen, not mobile apps)
        lead_gen_params = valid_mobile_app_params.copy()
        lead_gen_params.update({
            "optimization_goal": "LEAD_GENERATION",
            "billing_event": "IMPRESSIONS"
        })

        await create_adset(
            **lead_gen_params,
            destination_type="ON_AD"
        )

        # Should pass validation and include destination_type in API call
        assert self._sent_params(mock_api_request)['destination_type'] == "ON_AD"

    @pytest.mark.asyncio
    async def test_on_ad_validation_passes(
        self, mock_api_request, mock_auth_manager, valid_mobile_app_params
    ):
        """Test that ON_AD destination_type passes validation (Issue #009 regression test)"""

        # Use parameters that work with ON_AD (lead generation, not mobile app)
        lead_gen_params = valid_mobile_app_params.copy()
        lead_gen_params.update({
            "optimization_goal": "LEAD_GENERATION",
            "billing_event": "IMPRESSIONS"
        })

        result = await create_adset(
            **lead_gen_params,
            destination_type="ON_AD"
        )

        # Should NOT return a validation error about destination_type
        # Before the fix, this would return: "Invalid destination_type: ON_AD"
        payload = self._unwrap(result)
        assert "error" not in payload or "destination_type" not in payload.get("error", "").lower()

    # Test: Error Handling
    @pytest.mark.asyncio
    async def test_meta_api_error_handling(
        self, mock_auth_manager, valid_mobile_app_params, ios_promoted_object
    ):
        """Test handling of Meta API errors for mobile app adsets"""

        with patch('meta_ads_mcp.core.adsets.make_api_request') as mock_api:
            # Mock Meta API error response
            mock_api.side_effect = Exception("HTTP Error: 400 - Select a dataset and conversion event for your ad set")

            result = await create_adset(
                **valid_mobile_app_params,
                promoted_object=ios_promoted_object,
                destination_type="APP_STORE"
            )

            # Should handle the error gracefully (bare or inside the data wrapper)
            payload = self._unwrap(result)
            assert 'error' in payload

            # The upstream message may surface in either the error or the details field
            error_text = payload.get('error', '').lower()
            details_text = payload.get('details', '').lower()
            assert 'dataset' in error_text or 'conversion event' in error_text or \
                   'dataset' in details_text or 'conversion event' in details_text

    # Test: Backward Compatibility
    @pytest.mark.asyncio
    async def test_backward_compatibility_non_mobile_campaigns(
        self, mock_api_request, mock_auth_manager
    ):
        """Test that non-mobile campaigns still work without mobile app parameters"""

        non_mobile_params = {
            "account_id": "act_123456789",
            "campaign_id": "campaign_123456789",
            "name": "Test Web Adset",
            "optimization_goal": "LINK_CLICKS",
            "billing_event": "LINK_CLICKS",
            "targeting": {
                "age_min": 18,
                "age_max": 65,
                "geo_locations": {"countries": ["US"]}
            }
        }

        await create_adset(**non_mobile_params)

        # Should work without mobile app parameters
        mock_api_request.assert_called_once()
        params = self._sent_params(mock_api_request)

        # Should not include mobile app parameters
        assert 'promoted_object' not in params
        assert 'destination_type' not in params

    @pytest.mark.asyncio
    async def test_optional_mobile_parameters(
        self, mock_api_request, mock_auth_manager, valid_mobile_app_params, ios_promoted_object
    ):
        """Test that mobile app parameters are optional for non-APP_INSTALLS campaigns"""

        non_app_install_params = valid_mobile_app_params.copy()
        non_app_install_params['optimization_goal'] = "REACH"

        await create_adset(
            **non_app_install_params,
            # Mobile app parameters should be optional for non-APP_INSTALLS
            promoted_object=ios_promoted_object,
            destination_type="APP_STORE"
        )

        # Should work and include mobile parameters if provided
        mock_api_request.assert_called_once()
        params = self._sent_params(mock_api_request)

        # Mobile parameters should be included if provided
        assert 'promoted_object' in params
        assert 'destination_type' in params
```

--------------------------------------------------------------------------------
/tests/test_targeting_search_e2e.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
End-to-End Targeting Search Test for Meta Ads MCP

This test validates that the targeting search tools correctly find and return
targeting options (interests, behaviors, demographics, geo locations) from the
Meta Ads API through a pre-authenticated MCP server.

Test functions:
- search_interests
- get_interest_suggestions  
- validate_interests
- search_behaviors
- search_demographics
- search_geo_locations
"""

import requests
import json
import os
import sys
from typing import Dict, Any, List

# Load environment variables from .env file
# python-dotenv is treated as optional: when the import fails the script only
# prints a warning and carries on with whatever the process environment holds.
try:
    from dotenv import load_dotenv
    load_dotenv()
    print("✅ Loaded environment variables from .env file")
except ImportError:
    # Raised when the dotenv package cannot be imported; not fatal.
    print("⚠️  python-dotenv not installed, using system environment variables only")

class TargetingSearchTester:
    """Test suite focused on targeting search functionality"""
    
    def __init__(self, base_url: str = "http://localhost:8080"):
        self.base_url = base_url.rstrip('/')
        self.endpoint = f"{self.base_url}/mcp/"
        self.request_id = 1
        
        # Test data for validation
        self.test_queries = {
            "interests": ["baseball", "cooking", "travel"],
            "geo_locations": ["New York", "California", "Japan"],
            "interest_suggestions": ["Basketball", "Soccer"],
            "demographics": ["life_events", "industries", "family_statuses"]
        }
        
    def _make_request(self, method: str, params: Dict[str, Any] = None,
                     headers: Dict[str, str] = None) -> Dict[str, Any]:
        """Send one JSON-RPC 2.0 request to the MCP endpoint and summarise the reply.

        Args:
            method: JSON-RPC method name, e.g. "tools/call".
            params: Optional JSON-RPC params; left out of the payload when falsy.
            headers: Optional extra HTTP headers, merged over the defaults.

        Returns:
            A dict with "status_code", "headers", "json", "text" and "success".
            "success" is True only for an HTTP 200 whose body parsed as JSON.
            On a transport failure "status_code" is 0 and "error" holds the
            exception text. On a 200 whose body is not valid JSON the real
            status code and raw text are kept, "json" is None and "error"
            describes the parse failure.
        """

        request_headers = {
            "Content-Type": "application/json",
            "Accept": "application/json, text/event-stream",
            "User-Agent": "Targeting-Search-Test-Client/1.0"
        }

        if headers:
            request_headers.update(headers)

        payload = {
            "jsonrpc": "2.0",
            "method": method,
            "id": self.request_id
        }

        if params:
            payload["params"] = params

        # Consume the id up front so a failed attempt never shares its id with
        # the next request.
        self.request_id += 1

        try:
            response = requests.post(
                self.endpoint,
                headers=request_headers,
                json=payload,
                timeout=15
            )
        except requests.exceptions.RequestException as e:
            return {
                "status_code": 0,
                "headers": {},
                "json": None,
                "text": str(e),
                "success": False,
                "error": str(e)
            }

        outcome = {
            "status_code": response.status_code,
            "headers": dict(response.headers),
            "json": None,
            "text": response.text,
            "success": False
        }

        if response.status_code == 200:
            try:
                outcome["json"] = response.json()
                outcome["success"] = True
            except ValueError as e:
                # The Accept header allows text/event-stream, so a 200 body is
                # not guaranteed to be JSON. Report that without discarding the
                # real status code and body text.
                outcome["error"] = f"Invalid JSON in response body: {e}"

        return outcome

    def test_search_interests(self) -> Dict[str, Any]:
        """Test search_interests functionality"""
        
        print(f"\n🔍 Testing search_interests function")
        results = {}
        
        for query in self.test_queries["interests"]:
            print(f"   🔎 Searching for interests: '{query}'")
            
            result = self._make_request("tools/call", {
                "name": "search_interests",
                "arguments": {
                    "query": query,
                    "limit": 5
                }
            })
            
            if not result["success"]:
                results[query] = {
                    "success": False,
                    "error": result.get("text", "Unknown error")
                }
                print(f"   ❌ Failed: {result.get('text', 'Unknown error')}")
                continue
            
            # Parse response
            response_data = result["json"]["result"]
            content = response_data.get("content", [{}])[0].get("text", "")
            
            try:
                parsed_content = json.loads(content)
                
                if "error" in parsed_content:
                    results[query] = {
                        "success": False,
                        "error": parsed_content["error"]
                    }
                    print(f"   ❌ API Error: {parsed_content['error']}")
                    continue
                
                interests = parsed_content.get("data", [])
                
                results[query] = {
                    "success": True,
                    "count": len(interests),
                    "interests": interests[:3],  # Keep first 3 for display
                    "has_required_fields": all(
                        "id" in interest and "name" in interest 
                        for interest in interests
                    )
                }
                
                print(f"   ✅ Found {len(interests)} interests")
                for interest in interests[:3]:
                    print(f"      • {interest.get('name', 'N/A')} (ID: {interest.get('id', 'N/A')})")
                
            except json.JSONDecodeError:
                results[query] = {
                    "success": False,
                    "error": "Invalid JSON response",
                    "raw_content": content
                }
                print(f"   ❌ Invalid JSON: {content}")
        
        return results

    def test_get_interest_suggestions(self) -> Dict[str, Any]:
        """Test get_interest_suggestions functionality"""
        
        print(f"\n🔍 Testing get_interest_suggestions function")
        
        interest_list = self.test_queries["interest_suggestions"]
        print(f"   🔎 Getting suggestions for: {interest_list}")
        
        result = self._make_request("tools/call", {
            "name": "get_interest_suggestions",
            "arguments": {
                "interest_list": interest_list,
                "limit": 5
            }
        })
        
        if not result["success"]:
            return {
                "success": False,
                "error": result.get("text", "Unknown error")
            }
        
        # Parse response
        response_data = result["json"]["result"]
        content = response_data.get("content", [{}])[0].get("text", "")
        
        try:
            parsed_content = json.loads(content)
            
            if "error" in parsed_content:
                return {
                    "success": False,
                    "error": parsed_content["error"]
                }
            
            suggestions = parsed_content.get("data", [])
            
            result_data = {
                "success": True,
                "count": len(suggestions),
                "suggestions": suggestions[:3],  # Keep first 3 for display
                "has_required_fields": all(
                    "id" in suggestion and "name" in suggestion 
                    for suggestion in suggestions
                )
            }
            
            print(f"   ✅ Found {len(suggestions)} suggestions")
            for suggestion in suggestions[:3]:
                print(f"      • {suggestion.get('name', 'N/A')} (ID: {suggestion.get('id', 'N/A')})")
            
            return result_data
            
        except json.JSONDecodeError:
            return {
                "success": False,
                "error": "Invalid JSON response",
                "raw_content": content
            }

    def test_validate_interests(self) -> Dict[str, Any]:
        """Test validate_interests functionality"""
        
        print(f"\n🔍 Testing validate_interests function")
        
        # Test with known valid and invalid interest names
        test_interests = ["Japan", "Basketball", "invalidinterestname12345"]
        print(f"   🔎 Validating interests: {test_interests}")
        
        result = self._make_request("tools/call", {
            "name": "validate_interests",
            "arguments": {
                "interest_list": test_interests
            }
        })
        
        if not result["success"]:
            return {
                "success": False,
                "error": result.get("text", "Unknown error")
            }
        
        # Parse response
        response_data = result["json"]["result"]
        content = response_data.get("content", [{}])[0].get("text", "")
        
        try:
            parsed_content = json.loads(content)
            
            if "error" in parsed_content:
                return {
                    "success": False,
                    "error": parsed_content["error"]
                }
            
            validations = parsed_content.get("data", [])
            
            result_data = {
                "success": True,
                "count": len(validations),
                "validations": validations,
                "has_valid_interests": any(
                    validation.get("valid", False) for validation in validations
                ),
                "has_invalid_interests": any(
                    not validation.get("valid", True) for validation in validations
                )
            }
            
            print(f"   ✅ Validated {len(validations)} interests")
            for validation in validations:
                status = "✅" if validation.get("valid") else "❌"
                print(f"      {status} {validation.get('name', 'N/A')}")
            
            return result_data
            
        except json.JSONDecodeError:
            return {
                "success": False,
                "error": "Invalid JSON response",
                "raw_content": content
            }

    def test_search_behaviors(self) -> Dict[str, Any]:
        """Test search_behaviors functionality"""
        
        print(f"\n🔍 Testing search_behaviors function")
        
        result = self._make_request("tools/call", {
            "name": "search_behaviors",
            "arguments": {
                "limit": 5
            }
        })
        
        if not result["success"]:
            return {
                "success": False,
                "error": result.get("text", "Unknown error")
            }
        
        # Parse response
        response_data = result["json"]["result"]
        content = response_data.get("content", [{}])[0].get("text", "")
        
        try:
            parsed_content = json.loads(content)
            
            if "error" in parsed_content:
                return {
                    "success": False,
                    "error": parsed_content["error"]
                }
            
            behaviors = parsed_content.get("data", [])
            
            result_data = {
                "success": True,
                "count": len(behaviors),
                "behaviors": behaviors[:3],  # Keep first 3 for display
                "has_required_fields": all(
                    "id" in behavior and "name" in behavior 
                    for behavior in behaviors
                )
            }
            
            print(f"   ✅ Found {len(behaviors)} behaviors")
            for behavior in behaviors[:3]:
                print(f"      • {behavior.get('name', 'N/A')} (ID: {behavior.get('id', 'N/A')})")
            
            return result_data
            
        except json.JSONDecodeError:
            return {
                "success": False,
                "error": "Invalid JSON response",
                "raw_content": content
            }

    def test_search_demographics(self) -> Dict[str, Any]:
        """Test search_demographics functionality"""
        
        print(f"\n🔍 Testing search_demographics function")
        results = {}
        
        for demo_class in self.test_queries["demographics"]:
            print(f"   🔎 Searching demographics class: '{demo_class}'")
            
            result = self._make_request("tools/call", {
                "name": "search_demographics",
                "arguments": {
                    "demographic_class": demo_class,
                    "limit": 3
                }
            })
            
            if not result["success"]:
                results[demo_class] = {
                    "success": False,
                    "error": result.get("text", "Unknown error")
                }
                print(f"   ❌ Failed: {result.get('text', 'Unknown error')}")
                continue
            
            # Parse response
            response_data = result["json"]["result"]
            content = response_data.get("content", [{}])[0].get("text", "")
            
            try:
                parsed_content = json.loads(content)
                
                if "error" in parsed_content:
                    results[demo_class] = {
                        "success": False,
                        "error": parsed_content["error"]
                    }
                    print(f"   ❌ API Error: {parsed_content['error']}")
                    continue
                
                demographics = parsed_content.get("data", [])
                
                results[demo_class] = {
                    "success": True,
                    "count": len(demographics),
                    "demographics": demographics[:2],  # Keep first 2 for display
                    "has_required_fields": all(
                        "id" in demo and "name" in demo 
                        for demo in demographics
                    )
                }
                
                print(f"   ✅ Found {len(demographics)} {demo_class}")
                for demo in demographics[:2]:
                    print(f"      • {demo.get('name', 'N/A')} (ID: {demo.get('id', 'N/A')})")
                
            except json.JSONDecodeError:
                results[demo_class] = {
                    "success": False,
                    "error": "Invalid JSON response",
                    "raw_content": content
                }
                print(f"   ❌ Invalid JSON: {content}")
        
        return results

    def test_search_geo_locations(self) -> Dict[str, Any]:
        """Test search_geo_locations functionality"""
        
        print(f"\n🔍 Testing search_geo_locations function")
        results = {}
        
        for query in self.test_queries["geo_locations"]:
            print(f"   🔎 Searching for locations: '{query}'")
            
            result = self._make_request("tools/call", {
                "name": "search_geo_locations",
                "arguments": {
                    "query": query,
                    "location_types": ["country", "region", "city"],
                    "limit": 3
                }
            })
            
            if not result["success"]:
                results[query] = {
                    "success": False,
                    "error": result.get("text", "Unknown error")
                }
                print(f"   ❌ Failed: {result.get('text', 'Unknown error')}")
                continue
            
            # Parse response
            response_data = result["json"]["result"]
            content = response_data.get("content", [{}])[0].get("text", "")
            
            try:
                parsed_content = json.loads(content)
                
                if "error" in parsed_content:
                    results[query] = {
                        "success": False,
                        "error": parsed_content["error"]
                    }
                    print(f"   ❌ API Error: {parsed_content['error']}")
                    continue
                
                locations = parsed_content.get("data", [])
                
                results[query] = {
                    "success": True,
                    "count": len(locations),
                    "locations": locations[:3],  # Keep first 3 for display
                    "has_required_fields": all(
                        "key" in location and "name" in location and "type" in location
                        for location in locations
                    )
                }
                
                print(f"   ✅ Found {len(locations)} locations")
                for location in locations[:3]:
                    print(f"      • {location.get('name', 'N/A')} ({location.get('type', 'N/A')}, Key: {location.get('key', 'N/A')})")
                
            except json.JSONDecodeError:
                results[query] = {
                    "success": False,
                    "error": "Invalid JSON response",
                    "raw_content": content
                }
                print(f"   ❌ Invalid JSON: {content}")
        
        return results

    def run_targeting_search_tests(self) -> bool:
        """Run comprehensive targeting search tests.

        Probes ``self.base_url`` first and returns False immediately when the
        probe fails or answers with a status other than 200/404. Otherwise
        runs the six search phases in order, prints a per-phase verdict and
        an overall verdict.

        Returns:
            True when at least four of the six phases passed, False otherwise
            (including when the server probe fails).
        """

        def announce(title: str) -> None:
            # Section banner shared by every phase and the final summary.
            print("\n" + "="*60)
            print(title)
            print("="*60)

        print("🚀 Meta Ads Targeting Search End-to-End Test Suite")
        print("="*60)

        # Check server availability. Only request-level failures mean "not
        # running"; a bare ``except`` would also swallow KeyboardInterrupt,
        # SystemExit and genuine programming errors.
        try:
            response = requests.get(f"{self.base_url}/", timeout=5)
            server_running = response.status_code in [200, 404]
        except requests.exceptions.RequestException:
            server_running = False

        if not server_running:
            print("❌ Server is not running at", self.base_url)
            print("   Please start the server with:")
            print("   python3 -m meta_ads_mcp --transport streamable-http --port 8080")
            return False

        print("✅ Server is running")
        print("🔐 Using implicit authentication from server")

        # Test 1: Search Interests
        announce("📋 PHASE 1: Testing Interest Search")

        interests_results = self.test_search_interests()
        interests_success = any(
            result.get("success") and result.get("count", 0) > 0
            for result in interests_results.values()
        )

        # Test 2: Interest Suggestions
        announce("📋 PHASE 2: Testing Interest Suggestions")

        suggestions_result = self.test_get_interest_suggestions()
        suggestions_success = suggestions_result.get("success") and suggestions_result.get("count", 0) > 0

        # Test 3: Interest Validation
        # Passing requires both a valid and an invalid interest to be reported.
        announce("📋 PHASE 3: Testing Interest Validation")

        validation_result = self.test_validate_interests()
        validation_success = (validation_result.get("success") and
                            validation_result.get("has_valid_interests") and
                            validation_result.get("has_invalid_interests"))

        # Test 4: Behavior Search
        announce("📋 PHASE 4: Testing Behavior Search")

        behaviors_result = self.test_search_behaviors()
        behaviors_success = behaviors_result.get("success") and behaviors_result.get("count", 0) > 0

        # Test 5: Demographics Search
        announce("📋 PHASE 5: Testing Demographics Search")

        demographics_results = self.test_search_demographics()
        demographics_success = any(
            result.get("success") and result.get("count", 0) > 0
            for result in demographics_results.values()
        )

        # Test 6: Geo Location Search
        announce("📋 PHASE 6: Testing Geo Location Search")

        geo_results = self.test_search_geo_locations()
        geo_success = any(
            result.get("success") and result.get("count", 0) > 0
            for result in geo_results.values()
        )

        # Final assessment
        announce("📊 FINAL RESULTS")

        all_tests = [
            ("Interest Search", interests_success),
            ("Interest Suggestions", suggestions_success),
            ("Interest Validation", validation_success),
            ("Behavior Search", behaviors_success),
            ("Demographics Search", demographics_success),
            ("Geo Location Search", geo_success)
        ]

        passed_tests = sum(1 for _, success in all_tests if success)
        total_tests = len(all_tests)

        for test_name, success in all_tests:
            status = "✅ PASSED" if success else "❌ FAILED"
            print(f"   • {test_name}: {status}")

        overall_success = passed_tests >= 4  # At least 4 out of 6 tests should pass

        if overall_success:
            print(f"\n✅ Targeting search tests: SUCCESS ({passed_tests}/{total_tests} passed)")
            print("   • Core targeting search functionality is working")
            print("   • Meta Ads API integration is functional")
            return True
        else:
            print(f"\n❌ Targeting search tests: FAILED ({passed_tests}/{total_tests} passed)")
            print("   • Some targeting search functions are not working properly")
            return False


def main():
    """Run the targeting search suite and exit 0 on success, 1 on failure."""
    success = TargetingSearchTester().run_targeting_search_tests()

    if not success:
        print("\n⚠️  Some targeting search tests failed - see details above")
        exit_code = 1
    else:
        print("\n🎉 All targeting search tests passed!")
        exit_code = 0

    sys.exit(exit_code)


# Script entry point: run the suite only when this file is executed directly,
# not when it is imported.
if __name__ == "__main__":
    main() 
```

--------------------------------------------------------------------------------
/tests/test_insights_actions_and_values_e2e.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Unit Tests for Insights Actions and Action Values Functionality

This test suite validates the actions and action_values field implementation for the
get_insights function in meta_ads_mcp/core/insights.py.

Test cases cover:
- Actions and action_values field inclusion in API requests
- Different levels of aggregation (ad, adset, campaign, account)
- Time range handling with actions and action_values
- Error handling and validation
- Purchase data extraction from actions and action_values
"""

import pytest
import json
import asyncio
import requests
from unittest.mock import AsyncMock, patch, MagicMock
from typing import Dict, Any, List

# Import the function to test
from meta_ads_mcp.core.insights import get_insights


class TestInsightsActionsAndValues:
    """Unit tests for actions / action_values handling in get_insights."""

    @pytest.fixture
    def mock_api_request(self):
        """Patch make_api_request in the insights module with a canned reply."""
        canned_row = {
            "campaign_id": "test_campaign_id",
            "campaign_name": "Test Campaign",
            "impressions": "1000",
            "clicks": "50",
            "spend": "100.00",
            "actions": [
                {"action_type": "purchase", "value": "5"},
                {"action_type": "lead", "value": "3"},
                {"action_type": "view_content", "value": "20"},
            ],
            "action_values": [
                {"action_type": "purchase", "value": "500.00"},
                {"action_type": "lead", "value": "150.00"},
                {"action_type": "view_content", "value": "0.00"},
            ],
            "cost_per_action_type": [
                {"action_type": "purchase", "value": "20.00"},
                {"action_type": "lead", "value": "33.33"},
            ],
        }
        with patch('meta_ads_mcp.core.insights.make_api_request') as patched:
            patched.return_value = {"data": [canned_row], "paging": {}}
            yield patched

    @pytest.fixture
    def mock_auth_manager(self):
        """Patch the auth manager and token getter so a token is always present."""
        with patch('meta_ads_mcp.core.api.auth_manager') as manager, \
             patch('meta_ads_mcp.core.auth.get_current_access_token') as token_getter:
            token = "test_access_token"
            manager.get_current_access_token.return_value = token
            manager.is_token_valid.return_value = True
            manager.app_id = "test_app_id"
            token_getter.return_value = token
            yield manager

    @pytest.fixture
    def valid_campaign_id(self):
        """Campaign ID used as object_id by the tests."""
        return "123456789"

    @pytest.fixture
    def valid_account_id(self):
        """Account ID available to tests that need one."""
        return "act_701351919139047"

    @pytest.mark.asyncio
    async def test_actions_and_action_values_included_in_fields(self, mock_api_request, mock_auth_manager, valid_campaign_id):
        """The request's fields parameter names the action-related fields."""
        raw = await get_insights(
            object_id=valid_campaign_id,
            time_range="last_30d",
            level="campaign",
        )
        payload = json.loads(raw)

        # Exactly one API call, aimed at the insights edge of the object.
        mock_api_request.assert_called_once()
        positional = mock_api_request.call_args[0]
        assert positional[0] == f"{valid_campaign_id}/insights"

        # The third positional argument carries the request params.
        sent_params = positional[2]
        assert 'fields' in sent_params

        requested = sent_params['fields']
        for name in ('actions', 'action_values', 'cost_per_action_type'):
            assert name in requested

        # The canned reply is passed through to the caller.
        assert 'data' in payload
        assert len(payload['data']) > 0
        first_row = payload['data'][0]
        assert 'actions' in first_row
        assert 'action_values' in first_row

    @pytest.mark.asyncio
    async def test_purchase_data_extraction(self, mock_api_request, mock_auth_manager, valid_campaign_id):
        """Purchase count and value can be read back from the response."""
        raw = await get_insights(
            object_id=valid_campaign_id,
            time_range="last_30d",
            level="campaign",
        )
        first_row = json.loads(raw)['data'][0]

        purchase_actions = [
            entry for entry in first_row.get('actions', [])
            if entry.get('action_type') == 'purchase'
        ]
        purchase_values = [
            entry for entry in first_row.get('action_values', [])
            if entry.get('action_type') == 'purchase'
        ]

        assert len(purchase_actions) > 0, "No purchase actions found"
        assert len(purchase_values) > 0, "No purchase action_values found"

        purchase_count = purchase_actions[0].get('value')
        purchase_value = purchase_values[0].get('value')

        assert purchase_count == "5", f"Expected purchase count 5, got {purchase_count}"
        assert purchase_value == "500.00", f"Expected purchase value 500.00, got {purchase_value}"

    @pytest.mark.asyncio
    async def test_actions_at_adset_level(self, mock_api_request, mock_auth_manager, valid_campaign_id):
        """level="adset" is forwarded in the request params."""
        raw = await get_insights(
            object_id=valid_campaign_id,
            time_range="last_30d",
            level="adset",
        )
        payload = json.loads(raw)

        mock_api_request.assert_called_once()
        sent_params = mock_api_request.call_args[0][2]
        assert sent_params['level'] == 'adset'

        assert 'data' in payload

    @pytest.mark.asyncio
    async def test_actions_at_ad_level(self, mock_api_request, mock_auth_manager, valid_campaign_id):
        """level="ad" is forwarded in the request params."""
        raw = await get_insights(
            object_id=valid_campaign_id,
            time_range="last_30d",
            level="ad",
        )
        payload = json.loads(raw)

        mock_api_request.assert_called_once()
        sent_params = mock_api_request.call_args[0][2]
        assert sent_params['level'] == 'ad'

        assert 'data' in payload

    @pytest.mark.asyncio
    async def test_actions_with_custom_time_range(self, mock_api_request, mock_auth_manager, valid_campaign_id):
        """A since/until dict is sent as its JSON serialisation."""
        custom_time_range = {"since": "2024-01-01", "until": "2024-01-31"}

        raw = await get_insights(
            object_id=valid_campaign_id,
            time_range=custom_time_range,
            level="campaign",
        )
        payload = json.loads(raw)

        mock_api_request.assert_called_once()
        sent_params = mock_api_request.call_args[0][2]
        assert 'time_range' in sent_params
        assert sent_params['time_range'] == json.dumps(custom_time_range)

        assert 'data' in payload

    @pytest.mark.asyncio
    async def test_actions_with_breakdown(self, mock_api_request, mock_auth_manager, valid_campaign_id):
        """The breakdown argument is sent as the 'breakdowns' param."""
        raw = await get_insights(
            object_id=valid_campaign_id,
            time_range="last_30d",
            level="campaign",
            breakdown="age",
        )
        payload = json.loads(raw)

        mock_api_request.assert_called_once()
        sent_params = mock_api_request.call_args[0][2]
        assert 'breakdowns' in sent_params
        assert sent_params['breakdowns'] == 'age'

        assert 'data' in payload

    @pytest.mark.asyncio
    async def test_actions_without_object_id(self, mock_api_request, mock_auth_manager):
        """Omitting object_id yields an error and no API call."""
        outcome = await get_insights(
            time_range="last_30d",
            level="campaign",
        )

        # The error may come back as a dict or as a JSON string.
        error_payload = outcome if isinstance(outcome, dict) else json.loads(outcome)

        assert 'error' in error_payload
        assert "missing 1 required positional argument: 'object_id'" in error_payload['error']

        mock_api_request.assert_not_called()

    @pytest.mark.asyncio
    async def test_actions_with_invalid_time_range(self, mock_api_request, mock_auth_manager, valid_campaign_id):
        """A time_range dict without "until" is rejected before any API call."""
        invalid_time_range = {"since": "2024-01-01"}  # Missing "until"

        raw = await get_insights(
            object_id=valid_campaign_id,
            time_range=invalid_time_range,
            level="campaign",
        )
        payload = json.loads(raw)

        # The error is either nested as a JSON string under 'data' or sits
        # directly on the payload.
        error_holder = json.loads(payload['data']) if 'data' in payload else payload
        assert 'error' in error_holder
        assert 'since' in error_holder['error'] and 'until' in error_holder['error']

        mock_api_request.assert_not_called()

    @pytest.mark.asyncio
    async def test_actions_api_error_handling(self, mock_api_request, mock_auth_manager, valid_campaign_id):
        """An exception from the API layer surfaces as an error payload."""
        mock_api_request.side_effect = Exception("API Error")

        outcome = await get_insights(
            object_id=valid_campaign_id,
            time_range="last_30d",
            level="campaign",
        )

        # The error may come back as a dict or as a JSON string.
        error_payload = outcome if isinstance(outcome, dict) else json.loads(outcome)

        assert 'error' in error_payload
        assert 'API Error' in error_payload['error']

    @pytest.mark.asyncio
    async def test_actions_fields_completeness(self, mock_api_request, mock_auth_manager, valid_campaign_id):
        """Every expected insights field appears in the fields parameter."""
        await get_insights(
            object_id=valid_campaign_id,
            time_range="last_30d",
            level="campaign",
        )

        mock_api_request.assert_called_once()
        fields = mock_api_request.call_args[0][2]['fields']

        required_fields = (
            'account_id', 'account_name', 'campaign_id', 'campaign_name',
            'adset_id', 'adset_name', 'ad_id', 'ad_name',
            'impressions', 'clicks', 'spend', 'cpc', 'cpm', 'ctr',
            'reach', 'frequency', 'actions', 'action_values', 'conversions',
            'unique_clicks', 'cost_per_action_type',
        )

        for field in required_fields:
            assert field in fields, f"Field '{field}' not found in fields parameter"

    @pytest.mark.asyncio
    async def test_multiple_action_types(self, mock_api_request, mock_auth_manager, valid_campaign_id):
        """All action types from the canned reply are present in the result."""
        raw = await get_insights(
            object_id=valid_campaign_id,
            time_range="last_30d",
            level="campaign",
        )
        first_row = json.loads(raw)['data'][0]

        action_types = {entry.get('action_type') for entry in first_row.get('actions', [])}

        assert 'purchase' in action_types, "Purchase action type not found"
        assert 'lead' in action_types, "Lead action type not found"
        assert 'view_content' in action_types, "View content action type not found"

        action_value_types = {
            entry.get('action_type') for entry in first_row.get('action_values', [])
        }

        assert 'purchase' in action_value_types, "Purchase action_value type not found"
        assert 'lead' in action_value_types, "Lead action_value type not found"


@pytest.mark.e2e
@pytest.mark.skip(reason="E2E test - run manually only")
class TestInsightsActionsAndValuesE2E:
    """E2E tests for actions and action_values via MCP HTTP server.

    The class is skipped under pytest and is meant to be instantiated and
    driven by hand against a running server, which is why it has an explicit
    constructor.
    """

    def __init__(self, base_url: str = "http://localhost:8080"):
        """Store the server address and initialise the JSON-RPC request counter.

        Args:
            base_url: Root URL of the MCP HTTP server; a trailing slash is
                stripped before ``/mcp/`` is appended.
        """
        self.base_url = base_url.rstrip('/')
        self.endpoint = f"{self.base_url}/mcp/"
        self.request_id = 1
        # Default account from workspace rules
        self.account_id = "act_701351919139047"

    def _make_request(self, method: str, params: Dict[str, Any] = None) -> Dict[str, Any]:
        """POST one JSON-RPC 2.0 request to the MCP endpoint.

        Args:
            method: JSON-RPC method name.
            params: Optional parameters; omitted from the payload when falsy
                (``None`` or an empty dict).

        Returns:
            A dict with ``status_code``, ``json`` (decoded body, only for HTTP
            200), ``text`` and ``success``. Request failures are reported with
            ``status_code`` 0 and an extra ``error`` key instead of raising.
        """
        headers = {
            "Content-Type": "application/json",
            "Accept": "application/json, text/event-stream",
            "User-Agent": "Insights-E2E-Test-Client/1.0"
        }
        payload = {
            "jsonrpc": "2.0",
            "method": method,
            "id": self.request_id
        }
        if params:
            payload["params"] = params
        try:
            resp = requests.post(self.endpoint, headers=headers, json=payload, timeout=30)
            # The id only advances once the POST itself went through.
            self.request_id += 1
            return {
                "status_code": resp.status_code,
                "json": resp.json() if resp.status_code == 200 else None,
                "text": resp.text,
                "success": resp.status_code == 200
            }
        except requests.exceptions.RequestException as e:
            return {"status_code": 0, "json": None, "text": str(e), "success": False, "error": str(e)}

    def _check_for_errors(self, parsed_content: Dict[str, Any]) -> Dict[str, Any]:
        """Look for an error report in a decoded tool reply.

        Three layouts are recognised, checked in this order: an ``error`` key
        inside a dict under ``data`` (``wrapped_dict``), an ``error`` key
        inside a JSON string under ``data`` (``wrapped_json``), and a
        top-level ``error`` key (``direct``).

        Args:
            parsed_content: The decoded tool text.

        Returns:
            ``{"has_error": True, "error_message": ..., "format": ...}`` when
            an error is found, otherwise ``{"has_error": False}``.
        """
        # A payload that is not a JSON object cannot carry an "error" key;
        # indexing it by 'error' would raise instead of answering.
        if not isinstance(parsed_content, dict):
            return {"has_error": False}

        data = parsed_content.get("data")
        if isinstance(data, dict) and 'error' in data:
            return {"has_error": True, "error_message": data['error'], "format": "wrapped_dict"}
        if isinstance(data, str):
            try:
                error_data = json.loads(data)
            except json.JSONDecodeError:
                error_data = None
            # The string may decode to a scalar, null or list; only an
            # object can hold an "error" member.
            if isinstance(error_data, dict) and 'error' in error_data:
                return {"has_error": True, "error_message": error_data['error'], "format": "wrapped_json"}
        if 'error' in parsed_content:
            return {"has_error": True, "error_message": parsed_content['error'], "format": "direct"}
        return {"has_error": False}

    def test_tool_exists_and_has_required_fields(self):
        """The server lists get_insights and its schema exposes the expected parameters."""
        result = self._make_request("tools/list", {})
        assert result["success"], f"tools/list failed: {result.get('text', '')}"
        tools = result["json"]["result"].get("tools", [])
        tool = next((t for t in tools if t["name"] == "get_insights"), None)
        assert tool is not None, "get_insights tool not found"
        props = tool.get("inputSchema", {}).get("properties", {})
        for req in ["object_id", "time_range", "breakdown", "level"]:
            assert req in props, f"Missing parameter in schema: {req}"

    def test_get_insights_account_level(self):
        """An account-level get_insights call returns a parsable reply."""
        params = {
            "name": "get_insights",
            "arguments": {
                "object_id": self.account_id,
                "time_range": "last_30d",
                "level": "account"
            }
        }
        result = self._make_request("tools/call", params)
        assert result["success"], f"tools/call failed: {result.get('text', '')}"
        response_data = result["json"]["result"]
        content = response_data.get("content", [{}])[0].get("text", "")
        parsed = json.loads(content)
        err = self._check_for_errors(parsed)
        # Don't fail if auth or permissions block; just assert structure is parsable
        if err["has_error"]:
            assert isinstance(err["error_message"], (str, dict))
        else:
            # Expect data list on success
            data = parsed.get("data") if isinstance(parsed, dict) else None
            assert data is not None

def main():
    """Manual smoke run of the E2E checks; failures are printed, not raised."""
    tester = TestInsightsActionsAndValuesE2E()
    print("🚀 Insights Actions E2E (manual)")
    # Basic smoke run: the first failing step stops the sequence.
    try:
        steps = (
            (tester.test_tool_exists_and_has_required_fields, "✅ Tool schema ok"),
            (tester.test_get_insights_account_level, "✅ Account-level insights request executed"),
        )
        for run_step, ok_message in steps:
            run_step()
            print(ok_message)
    except AssertionError as failure:
        print(f"❌ Assertion failed: {failure}")
    except Exception as problem:
        print(f"❌ Error: {problem}")

# Script entry point for the manual smoke run. When the file is executed
# directly, main() runs here, before the definitions further down the module
# have been bound; main() itself does not reference them.
if __name__ == "__main__":
    main()


def extract_purchase_data(insights_data):
    """
    Helper function to extract purchase data from insights response.

    Only the first element of ``insights_data`` is inspected, and within it
    only the first entry whose ``action_type`` is ``"purchase"``. Missing or
    null ``actions`` / ``action_values`` lists and a missing, null or empty
    ``value`` are treated as "no purchases" rather than raising.

    Args:
        insights_data: The data array from insights response

    Returns:
        dict: Dictionary with purchase count and value
            (``purchase_count`` as int, ``purchase_value`` as float); both
            are zero when there is no data or no purchase entry.
    """
    if not insights_data:
        return {"purchase_count": 0, "purchase_value": 0.0}

    data_point = insights_data[0]

    def first_purchase_value(field):
        # `or []` tolerates a key that is present but explicitly null.
        for entry in data_point.get(field) or []:
            if entry.get('action_type') == 'purchase':
                # A null/empty value counts as zero instead of breaking int()/float().
                return entry.get('value') or 0
        return 0

    return {
        "purchase_count": int(first_purchase_value('actions')),
        "purchase_value": float(first_purchase_value('action_values'))
    }


class TestPurchaseDataExtraction:
    """Unit tests for the extract_purchase_data helper."""

    def test_extract_purchase_data_with_purchases(self):
        """A purchase entry in both lists yields its count and its value."""
        row = {
            "actions": [
                {"action_type": "purchase", "value": "5"},
                {"action_type": "lead", "value": "3"},
            ],
            "action_values": [
                {"action_type": "purchase", "value": "500.00"},
                {"action_type": "lead", "value": "150.00"},
            ],
        }

        extracted = extract_purchase_data([row])

        assert extracted["purchase_count"] == 5
        assert extracted["purchase_value"] == 500.0

    def test_extract_purchase_data_without_purchases(self):
        """Rows that carry only non-purchase actions produce zeros."""
        row = {
            "actions": [
                {"action_type": "lead", "value": "3"},
                {"action_type": "view_content", "value": "20"},
            ],
            "action_values": [
                {"action_type": "lead", "value": "150.00"},
                {"action_type": "view_content", "value": "0.00"},
            ],
        }

        extracted = extract_purchase_data([row])

        assert extracted["purchase_count"] == 0
        assert extracted["purchase_value"] == 0.0

    def test_extract_purchase_data_empty_data(self):
        """An empty data array produces zeros."""
        extracted = extract_purchase_data([])

        assert extracted["purchase_count"] == 0
        assert extracted["purchase_value"] == 0.0
```

--------------------------------------------------------------------------------
/meta_ads_mcp/core/auth.py:
--------------------------------------------------------------------------------

```python
"""Authentication related functionality for Meta Ads API."""

from typing import Any, Dict, Optional
import time
import platform
import pathlib
import os
import webbrowser
import asyncio
import json
from .utils import logger
import requests

# Import from the new callback server module
from .callback_server import (
    start_callback_server,
    shutdown_callback_server,
    token_container,
    callback_server_port
)

# Import the new Pipeboard authentication
from .pipeboard_auth import pipeboard_auth_manager

# Auth constants
# Scope includes pages_show_list and pages_read_engagement to fix issue #16
# where get_account_pages failed for regular users due to missing page permissions
AUTH_SCOPE = "business_management,public_profile,pages_show_list,pages_read_engagement"
# Default redirect target used when an AuthManager is constructed.
# AuthManager.authenticate() replaces its own redirect_uri with the port
# returned by start_callback_server(), so 8888 is only the initial value.
AUTH_REDIRECT_URI = "http://localhost:8888/callback"
# Sent as the response_type query parameter by AuthManager.get_auth_url().
AUTH_RESPONSE_TYPE = "token"

# Log important configuration information
# (runs once, as a side effect of importing this module)
logger.info("Authentication module initialized")
logger.info(f"Auth scope: {AUTH_SCOPE}")
logger.info(f"Default redirect URI: {AUTH_REDIRECT_URI}")

# Global flag for authentication state
# Set to True by AuthManager.invalidate_token() and by process_token_response()
# when it receives no token; reset to False once a token has been processed.
needs_authentication = False

# Meta configuration singleton
class MetaConfig:
    """Process-wide holder for the Meta App ID.

    Instantiating the class always yields the same object. The first
    instantiation seeds ``app_id`` from the META_APP_ID environment variable,
    falling back to a built-in default ID.
    """
    _instance = None

    def __new__(cls):
        # Reuse the existing object if one was already built.
        if cls._instance is not None:
            return cls._instance

        logger.debug("Creating new MetaConfig instance")
        cls._instance = super().__new__(cls)
        cls._instance.app_id = os.environ.get("META_APP_ID", "779761636818489")
        logger.info(f"MetaConfig initialized with app_id from env/default: {cls._instance.app_id}")
        return cls._instance

    def set_app_id(self, app_id):
        """Store a new Meta App ID and mirror it into META_APP_ID."""
        logger.info(f"Setting Meta App ID: {app_id}")
        self.app_id = app_id
        # Keep the environment in sync for code that reads the variable directly
        os.environ["META_APP_ID"] = app_id
        logger.debug(f"Updated META_APP_ID environment variable: {os.environ.get('META_APP_ID')}")

    def get_app_id(self):
        """Return the App ID from the instance, else from the environment, else ""."""
        stored_app_id = getattr(self, 'app_id', None)
        if stored_app_id:
            logger.debug(f"Using app_id from instance: {self.app_id}")
            return self.app_id

        # Nothing stored on the instance: consult the environment
        env_app_id = os.environ.get("META_APP_ID", "")
        if not env_app_id:
            logger.warning("No app_id found in instance or environment variables")
            return ""

        logger.debug(f"Using app_id from environment: {env_app_id}")
        # Remember it so the next lookup is served from the instance
        self.app_id = env_app_id
        return env_app_id

    def is_configured(self):
        """Report whether a non-empty App ID is available."""
        app_id = self.get_app_id()
        configured = bool(app_id)
        logger.debug(f"MetaConfig.is_configured() = {configured} (app_id: {app_id})")
        return configured

# Create singleton instance
# MetaConfig.__new__ returns the same object on every call, so this name is the
# shared configuration read by exchange_token_for_long_lived() and
# get_current_access_token() below.
meta_config = MetaConfig()

class TokenInfo:
    """An access token plus the metadata needed to tell whether it expired.

    ``created_at`` is stamped with the current epoch second at construction;
    ``expires_in`` is a lifetime in seconds relative to ``created_at``.
    """
    def __init__(self, access_token: str, expires_in: Optional[int] = None, user_id: Optional[str] = None):
        self.created_at = int(time.time())
        self.access_token = access_token
        self.expires_in = expires_in
        self.user_id = user_id
        logger.debug(f"TokenInfo created. Expires in: {expires_in if expires_in else 'Not specified'}")

    def is_expired(self) -> bool:
        """Return True once the current time is past created_at + expires_in."""
        if self.expires_in:
            now = int(time.time())
            return now > (self.created_at + self.expires_in)
        # A missing (or zero) lifetime is treated as "does not expire"
        return False

    def serialize(self) -> Dict[str, Any]:
        """Return the token as a plain dictionary suitable for storage."""
        field_names = ("access_token", "expires_in", "user_id", "created_at")
        return {name: getattr(self, name) for name in field_names}

    @classmethod
    def deserialize(cls, data: Dict[str, Any]) -> 'TokenInfo':
        """Rebuild a TokenInfo from a dictionary produced by serialize()."""
        restored = cls(
            data.get("access_token", ""),
            data.get("expires_in"),
            data.get("user_id"),
        )
        # Prefer the stored creation time; otherwise stamp it with "now"
        restored.created_at = data.get("created_at", int(time.time()))
        return restored


class AuthManager:
    """Manages authentication with Meta APIs.

    When the PIPEBOARD_API_TOKEN environment variable is set at construction
    time, token handling is delegated to ``pipeboard_auth_manager``. Otherwise
    the manager keeps a ``TokenInfo`` in ``self.token_info``, persisted to a
    JSON cache file, and can start the browser-based OAuth flow.
    """
    def __init__(self, app_id: str, redirect_uri: str = AUTH_REDIRECT_URI):
        """
        Args:
            app_id: Meta App ID used as client_id in the OAuth URL
            redirect_uri: Initial OAuth redirect URI; replaced in authenticate()
                once the callback server port is known
        """
        self.app_id = app_id
        self.redirect_uri = redirect_uri
        self.token_info = None
        # Check for Pipeboard token first
        self.use_pipeboard = bool(os.environ.get("PIPEBOARD_API_TOKEN", ""))
        if not self.use_pipeboard:
            self._load_cached_token()

    def _get_token_cache_path(self) -> pathlib.Path:
        """Get the platform-specific path for token cache file.

        The containing directory is created if it does not exist yet.
        """
        if platform.system() == "Windows":
            base_path = pathlib.Path(os.environ.get("APPDATA", ""))
        elif platform.system() == "Darwin":  # macOS
            base_path = pathlib.Path.home() / "Library" / "Application Support"
        else:  # Assume Linux/Unix
            base_path = pathlib.Path.home() / ".config"

        # Create directory if it doesn't exist
        cache_dir = base_path / "meta-ads-mcp"
        cache_dir.mkdir(parents=True, exist_ok=True)

        return cache_dir / "token_cache.json"

    def _discard_cache_file(self, cache_path: pathlib.Path, label: str) -> None:
        """Best-effort removal of the cache file; failures are logged, not raised.

        Args:
            cache_path: File to delete
            label: Word describing why it is removed (used in the log lines)
        """
        try:
            cache_path.unlink()
            logger.info(f"Removed {label} token cache: {cache_path}")
        except Exception as e:
            logger.warning(f"Could not remove {label} cache file: {e}")

    def _load_cached_token(self) -> bool:
        """Load token from cache if available.

        The cache file is deleted when the token it holds is expired, older
        than 60 days, or when the file cannot be read/parsed.

        Returns:
            True if a usable token was loaded into ``self.token_info``,
            False otherwise (``self.token_info`` is then None unless the file
            simply failed the structural checks before being deserialized)
        """
        cache_path = self._get_token_cache_path()

        if not cache_path.exists():
            return False

        try:
            with open(cache_path, "r") as f:
                data = json.load(f)

            # Validate the cached data structure
            required_fields = ["access_token", "created_at"]
            if not all(field in data for field in required_fields):
                logger.warning("Cached token data is missing required fields")
                return False

            # Check if the token looks valid (basic format check)
            if not data.get("access_token") or len(data["access_token"]) < 20:
                logger.warning("Cached token appears malformed")
                return False

            self.token_info = TokenInfo.deserialize(data)

            # Check if token is expired
            if self.token_info.is_expired():
                logger.info("Cached token is expired, removing cache file")
                self._discard_cache_file(cache_path, "expired")
                self.token_info = None
                return False

            # Additional validation: check if token is too old (more than 60 days)
            current_time = int(time.time())
            if self.token_info.created_at and (current_time - self.token_info.created_at) > (60 * 24 * 3600):
                logger.warning("Cached token is too old (more than 60 days), removing cache file")
                self._discard_cache_file(cache_path, "old")
                self.token_info = None
                return False

            # expires_in may legitimately be absent (None) or 0 in the cache.
            # Doing arithmetic on it unconditionally used to raise here and
            # caused a perfectly valid cache file to be deleted as "corrupted".
            if self.token_info.expires_in:
                remaining = (self.token_info.created_at + self.token_info.expires_in) - current_time
                logger.info(f"Loaded cached token (expires in {remaining} seconds)")
            else:
                logger.info("Loaded cached token (no expiration recorded)")
            return True
        except Exception as e:
            logger.error(f"Error loading cached token: {e}")
            # Do not keep a half-validated token around when reporting failure
            self.token_info = None
            # If there's any error reading the cache, try to remove the corrupted file
            self._discard_cache_file(cache_path, "corrupted")
            return False

    def _save_token_to_cache(self) -> None:
        """Save token to cache file.

        Does nothing when no token is held. Write errors are logged and
        swallowed so that a failing cache never breaks authentication.
        """
        if not self.token_info:
            return

        cache_path = self._get_token_cache_path()

        try:
            with open(cache_path, "w") as f:
                json.dump(self.token_info.serialize(), f)
            logger.info(f"Token cached at: {cache_path}")
        except Exception as e:
            logger.error(f"Error saving token to cache: {e}")

    def get_auth_url(self) -> str:
        """Generate the Facebook OAuth URL for desktop app flow.

        Built from ``self.app_id``, the current ``self.redirect_uri`` and the
        module-level AUTH_SCOPE / AUTH_RESPONSE_TYPE constants.
        """
        return (
            f"https://www.facebook.com/v22.0/dialog/oauth?"
            f"client_id={self.app_id}&"
            f"redirect_uri={self.redirect_uri}&"
            f"scope={AUTH_SCOPE}&"
            f"response_type={AUTH_RESPONSE_TYPE}"
        )

    def authenticate(self, force_refresh: bool = False) -> Optional[str]:
        """
        Authenticate with Meta APIs

        Args:
            force_refresh: Force token refresh even if cached token exists

        Returns:
            Access token if one is already available (or Pipeboard returns
            one). None when the browser flow was merely started, or when it
            could not be started.
        """
        # If Pipeboard auth is available, use that instead
        if self.use_pipeboard:
            logger.info("Using Pipeboard authentication")
            return pipeboard_auth_manager.get_access_token(force_refresh=force_refresh)

        # Otherwise, use the original OAuth flow
        # Check if we already have a valid token
        if not force_refresh and self.token_info and not self.token_info.is_expired():
            return self.token_info.access_token

        # Start the callback server if not already running
        try:
            port = start_callback_server()

            # Update redirect URI with the actual port
            self.redirect_uri = f"http://localhost:{port}/callback"

            # Generate the auth URL
            auth_url = self.get_auth_url()

            # Open browser with auth URL
            logger.info(f"Opening browser with URL: {auth_url}")
            webbrowser.open(auth_url)

            # We don't wait for the token here anymore
            # The token will be processed by the callback server
            # Just return None to indicate we've started the flow
            return None
        except Exception as e:
            logger.error(f"Failed to start callback server: {e}")
            logger.info("Callback server disabled. OAuth authentication flow cannot be used.")
            return None

    def get_access_token(self) -> Optional[str]:
        """
        Get the current access token without starting any new flow

        Returns:
            Access token if available and not expired, None otherwise
        """
        # If using Pipeboard, always delegate to the Pipeboard auth manager
        if self.use_pipeboard:
            return pipeboard_auth_manager.get_access_token()

        if not self.token_info or self.token_info.is_expired():
            return None

        return self.token_info.access_token

    def invalidate_token(self) -> None:
        """Invalidate the current token, usually because it has expired or is invalid.

        Clears the in-memory token, raises the module-level
        ``needs_authentication`` flag and deletes the cache file. Nothing
        happens when no token is currently held.
        """
        global needs_authentication

        # If using Pipeboard, delegate to the Pipeboard auth manager
        if self.use_pipeboard:
            pipeboard_auth_manager.invalidate_token()
            return

        if self.token_info:
            logger.info(f"Invalidating token: {self.token_info.access_token[:10]}...")
            self.token_info = None

            # Signal that authentication is needed
            needs_authentication = True

            # Remove the cached token file
            try:
                cache_path = self._get_token_cache_path()
                if cache_path.exists():
                    os.remove(cache_path)
                    logger.info(f"Removed cached token file: {cache_path}")
            except Exception as e:
                logger.error(f"Error removing cached token file: {e}")

    def clear_token(self) -> None:
        """Alias for invalidate_token for consistency with other APIs"""
        self.invalidate_token()


def process_token_response(token_container):
    """Turn the token delivered by the OAuth callback into the active token.

    The short-lived token is exchanged for a long-lived one when possible;
    if the exchange fails the short-lived token is used as-is. The result is
    stored on ``auth_manager`` and written to its cache file.

    Args:
        token_container: Mapping holding the received 'token' and, optionally,
            'expires_in'

    Returns:
        True if a token was present and processed, False if none was present
        (in which case ``needs_authentication`` is set to True)
    """
    global needs_authentication, auth_manager

    # Guard: nothing to do without a token
    if not (token_container and token_container.get('token')):
        logger.warning("Received empty token in process_token_response")
        needs_authentication = True
        return False

    logger.info("Processing token response from Facebook OAuth")

    # Exchange the short-lived token for a long-lived token
    short_lived_token = token_container['token']
    exchanged_info = exchange_token_for_long_lived(short_lived_token)

    if exchanged_info:
        logger.info(f"Successfully exchanged for long-lived token (expires in {exchanged_info.expires_in} seconds)")
        active_info = exchanged_info
        stored_label = "Long-lived"
        attempt_message = "Attempting to save long-lived token to cache"
        saved_label = "Long-lived token"
    else:
        # Fall back to the short-lived token if exchange fails
        logger.warning("Failed to exchange for long-lived token, using short-lived token instead")
        active_info = TokenInfo(
            access_token=short_lived_token,
            expires_in=token_container.get('expires_in', 0)
        )
        stored_label = "Short-lived"
        attempt_message = "Attempting to save token to cache"
        saved_label = "Token"

    # Both outcomes share the same store-then-persist sequence; only the
    # wording of the log lines differs.
    try:
        auth_manager.token_info = active_info
        logger.info(f"{stored_label} token info set in auth_manager, expires in {active_info.expires_in} seconds")
    except NameError:
        logger.error("auth_manager not defined when trying to process token")

    try:
        logger.info(attempt_message)
        auth_manager._save_token_to_cache()
        logger.info(f"{saved_label} successfully saved to cache at {auth_manager._get_token_cache_path()}")
    except Exception as e:
        logger.error(f"Error saving token to cache: {e}")

    needs_authentication = False
    return True


def exchange_token_for_long_lived(short_lived_token):
    """
    Exchange a short-lived token for a long-lived token (60 days validity).

    The app ID comes from ``meta_config`` and the app secret from the
    META_APP_SECRET environment variable; the exchange is skipped when either
    is missing. Every failure is logged and reported as None, never raised.

    Args:
        short_lived_token: The short-lived access token received from OAuth flow

    Returns:
        TokenInfo object with the long-lived token, or None if exchange failed.
        ``expires_in`` on the result is None when the response does not carry
        a usable value.
    """
    logger.info("Attempting to exchange short-lived token for long-lived token")

    try:
        # Get the app ID from the configuration
        app_id = meta_config.get_app_id()

        # Get the app secret - this should be securely stored
        app_secret = os.environ.get("META_APP_SECRET", "")

        if not app_id or not app_secret:
            logger.error("Missing app_id or app_secret for token exchange")
            return None

        # Make the API request to exchange the token
        url = "https://graph.facebook.com/v22.0/oauth/access_token"
        params = {
            "grant_type": "fb_exchange_token",
            "client_id": app_id,
            "client_secret": app_secret,
            "fb_exchange_token": short_lived_token
        }

        logger.debug(f"Making token exchange request to {url}")
        # Without a timeout a stalled connection would block the caller
        # forever; a timeout error is handled by the except clause below.
        response = requests.get(url, params=params, timeout=30)

        if response.status_code != 200:
            logger.error(f"Token exchange failed with status {response.status_code}: {response.text}")
            return None

        data = response.json()
        # Log only the field names: the payload contains the access token
        # itself, which must not end up in log output.
        logger.debug(f"Token exchange response fields: {sorted(data)}")

        # Create TokenInfo from the response
        # The response includes access_token and, usually, expires_in (in seconds)
        new_token = data.get("access_token")
        if not new_token:
            logger.error("No access_token in exchange response")
            return None

        # expires_in can be absent; normalise it so that a missing or
        # non-numeric value does not discard an otherwise valid token.
        raw_expires_in = data.get("expires_in")
        try:
            expires_in = int(raw_expires_in) if raw_expires_in is not None else None
        except (TypeError, ValueError):
            expires_in = None

        if expires_in is not None:
            logger.info(f"Received long-lived token, expires in {expires_in} seconds (~{expires_in//86400} days)")
        else:
            logger.info("Received long-lived token without an expires_in value")

        return TokenInfo(
            access_token=new_token,
            expires_in=expires_in
        )
    except Exception as e:
        logger.error(f"Error exchanging token: {e}")
        return None


async def get_current_access_token() -> Optional[str]:
    """Resolve the access token to use for API calls.

    Resolution order: the META_ACCESS_TOKEN environment variable, then the
    module-level ``auth_manager``. A token shorter than 20 characters is
    rejected as malformed. When no token can be returned, the reason is
    logged.

    Returns:
        The access token, or None when none is available or it looks malformed
    """
    # Use the singleton auth manager
    global auth_manager

    # The environment variable takes highest precedence
    token_from_env = os.environ.get("META_ACCESS_TOKEN")
    if token_from_env:
        logger.debug("Using access token from META_ACCESS_TOKEN environment variable")
        # Basic validation - most Meta tokens are much longer
        if len(token_from_env) >= 20:
            return token_from_env
        logger.error(f"TOKEN VALIDATION FAILED: Token from environment variable appears malformed (length: {len(token_from_env)})")
        return None

    # Log the function call and current app ID
    logger.debug("get_current_access_token() called")
    app_id = meta_config.get_app_id()
    logger.debug(f"Current app_id: {app_id}")

    # An app_id is only mandatory when Pipeboard authentication is not in use
    using_pipeboard = auth_manager.use_pipeboard
    if not (app_id or using_pipeboard):
        logger.error("TOKEN VALIDATION FAILED: No valid app_id configured")
        logger.error("Please set META_APP_ID environment variable or configure via meta_config.set_app_id()")
        return None

    try:
        token = auth_manager.get_access_token()

        if token:
            # Basic validation - most Meta tokens are much longer
            if len(token) < 20:
                logger.error(f"TOKEN VALIDATION FAILED: Token appears malformed (length: {len(token)})")
                auth_manager.invalidate_token()
                return None

            logger.debug(f"Access token found in auth_manager (starts with: {token[:10]}...)")
            return token

        logger.warning("No valid access token available in auth_manager")

        # Work out why the token is missing, purely for diagnostics
        held_info = getattr(auth_manager, 'token_info', None)
        if not held_info:
            logger.error("TOKEN VALIDATION FAILED: No token information available")
        elif held_info.is_expired():
            logger.error("TOKEN VALIDATION FAILED: Token is expired")
            # Add expiration details
            lifetime = getattr(held_info, 'expires_in', None)
            if lifetime:
                expiry_time = held_info.created_at + held_info.expires_in
                expired_seconds_ago = int(time.time()) - expiry_time
                logger.error(f"Token expired {expired_seconds_ago} seconds ago")
        elif not held_info.access_token:
            logger.error("TOKEN VALIDATION FAILED: Token object exists but access_token is empty")
        else:
            logger.error("TOKEN VALIDATION FAILED: Token exists but was rejected for unknown reason")

        # Suggest next steps for troubleshooting
        logger.error("To fix: Try re-authenticating or check if your token has been revoked")
        return None
    except Exception as e:
        logger.error(f"Error getting access token: {str(e)}")
        import traceback
        logger.error(f"Token validation stacktrace: {traceback.format_exc()}")
        return None


def login():
    """
    Start the login flow to authenticate with Meta

    Starts the local callback server, opens the OAuth dialog in the browser
    and then polls ``token_container`` for up to five minutes. Progress and
    errors are reported on stdout; nothing is returned or raised.
    """
    print("Starting Meta Ads authentication flow...")

    try:
        # Start the callback server first
        try:
            port = start_callback_server()
        except Exception as callback_error:
            print(f"Error: {callback_error}")
            print("Callback server is disabled. Please use alternative authentication methods:")
            print("- Set PIPEBOARD_API_TOKEN environment variable for Pipeboard authentication")
            print("- Or provide a direct META_ACCESS_TOKEN environment variable")
            return

        # Point the redirect at the port the callback server actually uses,
        # exactly as AuthManager.authenticate() does. Without this the URL
        # keeps the default port and the browser redirect can miss the server.
        auth_manager.redirect_uri = f"http://localhost:{port}/callback"

        # Get the auth URL and open the browser
        auth_url = auth_manager.get_auth_url()
        print(f"Opening browser with URL: {auth_url}")
        webbrowser.open(auth_url)

        # Wait for token to be received
        print("Waiting for authentication to complete...")
        max_wait = 300  # 5 minutes
        wait_interval = 2  # 2 seconds

        for _ in range(max_wait // wait_interval):
            if token_container["token"]:
                token = token_container["token"]
                print("Authentication successful!")
                # Verify token works by getting basic user info
                try:
                    # Imported here rather than at module level
                    from .api import make_api_request
                    result = asyncio.run(make_api_request("me", token, {}))
                    print(f"Authenticated as: {result.get('name', 'Unknown')} (ID: {result.get('id', 'Unknown')})")
                    return
                except Exception as e:
                    # Verification is best-effort: the token was still received
                    print(f"Warning: Could not verify token: {e}")
                    return
            time.sleep(wait_interval)

        print("Authentication timed out. Please try again.")
    except Exception as e:
        print(f"Error during authentication: {e}")
        print(f"Direct authentication URL: {auth_manager.get_auth_url()}")
        print("You can manually open this URL in your browser to complete authentication.")

# Initialize auth manager with a placeholder - will be updated at runtime
# NOTE(review): when META_APP_ID is unset this is the literal string
# "YOUR_META_APP_ID", whereas MetaConfig falls back to a real default ID.
# get_auth_url() uses AuthManager.app_id, so confirm it is overwritten before
# the OAuth URL is built.
META_APP_ID = os.environ.get("META_APP_ID", "YOUR_META_APP_ID")

# Create the auth manager
# Construction has a side effect: unless PIPEBOARD_API_TOKEN is set, __init__
# immediately tries to load a cached token from disk.
auth_manager = AuthManager(META_APP_ID)
```

--------------------------------------------------------------------------------
/meta_ads_mcp/core/pipeboard_auth.py:
--------------------------------------------------------------------------------

```python
"""Authentication with Meta Ads API via pipeboard.co."""

import os
import json
import time
import requests
from pathlib import Path
import platform
from typing import Optional, Dict, Any
from .utils import logger

# Enable more detailed logging
# NOTE: `logger` is the object imported from .utils above, so importing this
# module switches that shared logger to DEBUG, not just this module's output.
import logging
logger.setLevel(logging.DEBUG)

# Base URL for pipeboard API
PIPEBOARD_API_BASE = "https://pipeboard.co/api"

# Debug message about API base URL (emitted once, at import time)
logger.info(f"Pipeboard API base URL: {PIPEBOARD_API_BASE}")

class TokenInfo:
    """Stores token information including expiration.

    Unlike a relative lifetime, ``expires_at`` is an absolute ISO 8601
    timestamp string (or None when the token carries no expiration).
    """
    def __init__(self, access_token: str, expires_at: Optional[str] = None, token_type: Optional[str] = None):
        self.access_token = access_token
        self.expires_at = expires_at
        self.token_type = token_type
        self.created_at = int(time.time())
        logger.debug(f"TokenInfo created. Expires at: {expires_at if expires_at else 'Not specified'}")

    def is_expired(self) -> bool:
        """Check if the token is expired.

        Accepts timestamps such as "2023-12-31T23:59:59.999Z",
        "2023-12-31T23:59:59+00:00" or "2023-12-31T18:59:59-05:00". A value
        without any timezone designator is interpreted as UTC.

        Returns:
            True if ``expires_at`` lies in the past. False if it lies in the
            future, is not set, or cannot be parsed (the parse error is
            logged).
        """
        if not self.expires_at:
            logger.debug("No expiration date set for token, assuming not expired")
            return False  # If no expiration is set, assume it's not expired

        try:
            from datetime import datetime, timezone

            expires_at_str = self.expires_at.strip()

            # %z understands "Z" as well as "+HH:MM" / "-HH:MM" offsets, so
            # the offset is honoured instead of being stripped and ignored.
            candidate_formats = (
                "%Y-%m-%dT%H:%M:%S.%f%z",
                "%Y-%m-%dT%H:%M:%S%z",
                "%Y-%m-%dT%H:%M:%S.%f",
                "%Y-%m-%dT%H:%M:%S",
            )
            expires_datetime = None
            for datetime_format in candidate_formats:
                try:
                    expires_datetime = datetime.strptime(expires_at_str, datetime_format)
                    break
                except ValueError:
                    continue
            if expires_datetime is None:
                raise ValueError(f"Unsupported expires_at format: {self.expires_at!r}")

            # A naive datetime's timestamp() would be computed in the local
            # timezone; pin it to UTC so the result is machine-independent.
            if expires_datetime.tzinfo is None:
                expires_datetime = expires_datetime.replace(tzinfo=timezone.utc)

            expires_timestamp = expires_datetime.timestamp()
            current_time = time.time()

            # Check if token is expired and log result
            is_expired = current_time > expires_timestamp
            time_diff = expires_timestamp - current_time
            if is_expired:
                logger.debug(f"Token is expired! Current time: {datetime.fromtimestamp(current_time)}, "
                             f"Expires at: {datetime.fromtimestamp(expires_timestamp)}, "
                             f"Expired {abs(time_diff):.0f} seconds ago")
            else:
                logger.debug(f"Token is still valid. Expires at: {datetime.fromtimestamp(expires_timestamp)}, "
                             f"Time remaining: {time_diff:.0f} seconds")

            return is_expired
        except Exception as e:
            logger.error(f"Error parsing expiration date: {e}")
            # Log the actual value to help diagnose format issues
            logger.error(f"Invalid expires_at value: '{self.expires_at}'")
            # Log detailed error information
            import traceback
            logger.error(f"Traceback: {traceback.format_exc()}")
            return False  # If we can't parse the date, assume it's not expired

    def serialize(self) -> Dict[str, Any]:
        """Convert to a dictionary for storage"""
        return {
            "access_token": self.access_token,
            "expires_at": self.expires_at,
            "token_type": self.token_type,
            "created_at": self.created_at
        }

    @classmethod
    def deserialize(cls, data: Dict[str, Any]) -> 'TokenInfo':
        """Create from a stored dictionary.

        Missing keys fall back to an empty access token, no expiration, no
        token type and a creation time of "now".
        """
        logger.debug(f"Deserializing token data with keys: {', '.join(data.keys())}")
        if 'expires_at' in data:
            logger.debug(f"Token expires_at from cache: {data['expires_at']}")

        token = cls(
            access_token=data.get("access_token", ""),
            expires_at=data.get("expires_at"),
            token_type=data.get("token_type")
        )
        token.created_at = data.get("created_at", int(time.time()))
        return token


class PipeboardAuthManager:
    """Manages authentication with Meta APIs via pipeboard.co"""
    def __init__(self):
        """Read PIPEBOARD_API_TOKEN from the environment and start without a Meta token.

        ``api_token`` is the empty string when the variable is unset, which
        leaves Pipeboard authentication disabled.
        """
        self.api_token = os.environ.get("PIPEBOARD_API_TOKEN", "")

        if self.api_token:
            # Only the first five characters of the API token are logged.
            token_prefix = self.api_token[:5]
            logger.debug(f"PipeboardAuthManager initialized with API token: {token_prefix}...")
            logger.info("Pipeboard authentication enabled. Will use pipeboard.co for Meta authentication.")
        else:
            logger.debug("No API token")
            logger.info("Pipeboard authentication not enabled. Set PIPEBOARD_API_TOKEN environment variable to enable.")

        # Token caching is disabled: a fresh token is always fetched from Pipeboard.
        self.token_info = None
    
    def _get_token_cache_path(self) -> Path:
        """Get the platform-specific path for token cache file

        The cache lives in a ``meta-ads-mcp`` directory under ``%APPDATA%`` on
        Windows, ``~/Library/Application Support`` on macOS and ``~/.config``
        on every other platform. The directory is created if it is missing.

        Returns:
            Path of ``pipeboard_token_cache.json``; the file itself may not exist.
        """
        system_name = platform.system()
        if system_name == "Windows":
            appdata = os.environ.get("APPDATA")
            # An unset or empty APPDATA would turn into Path(""), i.e. the
            # current working directory, and scatter token caches wherever the
            # process happens to run. Fall back to the usual roaming profile.
            base_path = Path(appdata) if appdata else Path.home() / "AppData" / "Roaming"
        elif system_name == "Darwin":  # macOS
            base_path = Path.home() / "Library" / "Application Support"
        else:  # Assume Linux/Unix
            base_path = Path.home() / ".config"

        # Create directory if it doesn't exist
        cache_dir = base_path / "meta-ads-mcp"
        cache_dir.mkdir(parents=True, exist_ok=True)

        cache_path = cache_dir / "pipeboard_token_cache.json"
        logger.debug(f"Token cache path: {cache_path}")
        return cache_path
    
    def _load_cached_token(self) -> bool:
        """Load token from cache if available

        Reads the JSON cache file, validates it and, on success, stores the
        resulting TokenInfo in ``self.token_info``. The cache file is deleted
        when its token is expired, when the token was created more than 60
        days ago, or when the file cannot be read or parsed. A file that is
        merely missing the ``access_token`` field or holds a token shorter
        than 20 characters is rejected but left in place.

        Returns:
            True if a usable token was loaded into ``self.token_info``,
            False otherwise.
        """
        cache_path = self._get_token_cache_path()

        if not cache_path.exists():
            logger.debug(f"Token cache file not found at {cache_path}")
            return False

        try:
            # Read and close the file before validating: the cleanup paths
            # below delete the file, and deleting a file that is still open
            # fails on Windows.
            with open(cache_path, "r") as f:
                logger.debug(f"Reading token cache from {cache_path}")
                data = json.load(f)

            # Validate the cached data structure
            required_fields = ["access_token"]
            if not all(field in data for field in required_fields):
                logger.warning("Cached token data is missing required fields")
                return False

            # Check if the token looks valid (basic format check)
            if not data.get("access_token") or len(data["access_token"]) < 20:
                logger.warning("Cached token appears malformed")
                return False

            # Keep the token local until it has passed every check, so an
            # exception below cannot leave a half-validated token on self.
            token = TokenInfo.deserialize(data)

            # Log token details (partial token for security)
            masked_token = token.access_token[:10] + "..." + token.access_token[-5:] if token.access_token else "None"
            logger.debug(f"Loaded token: {masked_token}")

            # Check if token is expired
            if token.is_expired():
                logger.info("Cached token is expired, removing cache file")
                self._discard_cache_file(cache_path, "expired")
                self.token_info = None
                return False

            # Additional validation: check if token is too old (more than 60 days)
            current_time = int(time.time())
            if token.created_at and (current_time - token.created_at) > (60 * 24 * 3600):
                logger.warning("Cached token is too old (more than 60 days), removing cache file")
                self._discard_cache_file(cache_path, "old")
                self.token_info = None
                return False

            self.token_info = token
            logger.info(f"Loaded cached token (expires at {self.token_info.expires_at})")
            return True
        except json.JSONDecodeError as e:
            logger.error(f"Error parsing token cache file: {e}")
            logger.debug("Token cache file might be corrupted, trying to read raw content")
            try:
                with open(cache_path, "r") as f:
                    raw_content = f.read()
                logger.debug(f"Raw cache file content (first 100 chars): {raw_content[:100]}")
            except Exception as e2:
                logger.error(f"Could not read raw cache file: {e2}")
            # If there's any error reading the cache, try to remove the corrupted file
            self._discard_cache_file(cache_path, "corrupted")
            return False
        except Exception as e:
            logger.error(f"Error loading cached token: {e}")
            # If there's any error reading the cache, try to remove the corrupted file
            self._discard_cache_file(cache_path, "corrupted")
            return False

    def _discard_cache_file(self, cache_path: Path, reason: str) -> None:
        """Best-effort removal of the cache file; ``reason`` only labels the log lines."""
        try:
            cache_path.unlink()
            logger.info(f"Removed {reason} token cache: {cache_path}")
        except Exception as cleanup_error:
            logger.warning(f"Could not remove {reason} cache file: {cleanup_error}")
    
    def _save_token_to_cache(self) -> None:
        """Save token to cache file

        Writes ``self.token_info.serialize()`` as JSON to the token cache
        path. Does nothing when no token is held. Because the file contains a
        live access token it is written with owner-only permissions (0o600).
        Any failure is logged and not raised.
        """
        if not self.token_info:
            logger.debug("No token to save to cache")
            return

        cache_path = self._get_token_cache_path()

        try:
            token_data = self.token_info.serialize()
            logger.debug(f"Saving token to cache. Expires at: {token_data.get('expires_at')}")

            # A plain open() would create the file with umask-dependent
            # permissions (commonly world-readable); request 0o600 up front.
            fd = os.open(cache_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
            with os.fdopen(fd, "w") as f:
                # The mode given to os.open only applies to newly created
                # files, so tighten a pre-existing cache file as well before
                # the token is written into it.
                os.chmod(cache_path, 0o600)
                json.dump(token_data, f)
            logger.info(f"Token cached at: {cache_path}")
        except Exception as e:
            logger.error(f"Error saving token to cache: {e}")
    
    def initiate_auth_flow(self) -> Dict[str, str]:
        """
        Initiate the Meta OAuth flow via pipeboard.co

        Sends a POST to ``{PIPEBOARD_API_BASE}/meta/auth`` with the API token
        as a query parameter and returns the decoded JSON body.

        Returns:
            Dict with loginUrl and status info

        Raises:
            ValueError: If no API token is configured, the endpoint answers
                404 or 401, or the response body is not valid JSON.
            requests.exceptions.RequestException: On connection errors,
                timeouts, and other HTTP error statuses rejected by
                ``raise_for_status()``.
        """
        if not self.api_token:
            logger.error("No PIPEBOARD_API_TOKEN environment variable set")
            raise ValueError("No PIPEBOARD_API_TOKEN environment variable set")

        # Exactly match the format used in meta_auth_test.sh
        endpoint = f"{PIPEBOARD_API_BASE}/meta/auth"
        url = f"{endpoint}?api_token={self.api_token}"
        headers = {
            "Content-Type": "application/json"
        }

        # Log the endpoint only: the full URL embeds the API token and must
        # not end up in log files.
        logger.info(f"Initiating auth flow with POST request to {endpoint}")

        try:
            # requests applies no timeout by default, so without one a stalled
            # server would block this call forever. 10 seconds matches the
            # other requests made by this class.
            response = requests.post(url, headers=headers, timeout=10)
            logger.info(f"Auth flow response status: {response.status_code}")

            # Better error handling
            if response.status_code != 200:
                logger.error(f"Auth flow error: HTTP {response.status_code}")
                error_text = response.text if response.text else "No response content"
                logger.error(f"Response content: {error_text}")
                if response.status_code == 404:
                    raise ValueError(f"Pipeboard API endpoint not found. Check if the server is running at {PIPEBOARD_API_BASE}")
                elif response.status_code == 401:
                    raise ValueError("Unauthorized: Invalid API token. Check your PIPEBOARD_API_TOKEN.")

                response.raise_for_status()

            # Parse the response. ValueError is the common base of the JSON
            # decode errors from both the stdlib json module and simplejson,
            # whichever backend requests is using.
            try:
                data = response.json()
            except ValueError as decode_error:
                logger.error(f"Could not parse JSON response: {response.text}")
                raise ValueError(f"Invalid JSON response from auth endpoint: {response.text[:100]}") from decode_error
            logger.info(f"Received response keys: {', '.join(data.keys())}")

            # Log auth flow response (without sensitive information)
            if 'loginUrl' in data:
                logger.info(f"Auth flow initiated successfully with login URL: {data['loginUrl'][:30]}...")
            else:
                logger.warning(f"Auth flow response missing loginUrl field. Response keys: {', '.join(data.keys())}")

            return data
        except requests.exceptions.ConnectionError as e:
            logger.error(f"Connection error to Pipeboard: {e}")
            logger.debug(f"Attempting to connect to: {PIPEBOARD_API_BASE}")
            raise
        except requests.exceptions.Timeout as e:
            logger.error(f"Timeout connecting to Pipeboard: {e}")
            raise
        except requests.exceptions.RequestException as e:
            logger.error(f"Error initiating auth flow: {e}")
            raise
        except Exception as e:
            logger.error(f"Unexpected error initiating auth flow: {e}")
            raise
    
    def get_access_token(self, force_refresh: bool = False) -> Optional[str]:
        """
        Get the current access token, refreshing if necessary or if forced

        Token caching is disabled, so every call requests a fresh token from
        ``{PIPEBOARD_API_BASE}/meta/token`` and ``force_refresh`` currently has
        no effect. On success the token is also kept in ``self.token_info``.
        Request, HTTP and parsing failures are logged and reported as None.

        Args:
            force_refresh: Force token refresh even if cached token exists

        Returns:
            Access token if available, None otherwise
        """
        # First check if API token is configured
        if not self.api_token:
            logger.error("TOKEN VALIDATION FAILED: No Pipeboard API token configured")
            logger.error("Please set PIPEBOARD_API_TOKEN environment variable")
            return None

        logger.info("Getting fresh token from Pipeboard (caching disabled)")

        try:
            # Same URL format as initiate_auth_flow: API token as a query parameter
            endpoint = f"{PIPEBOARD_API_BASE}/meta/token"
            url = f"{endpoint}?api_token={self.api_token}"
            headers = {
                "Content-Type": "application/json"
            }

            # Log the endpoint only: the full URL embeds the API token and
            # must not end up in log files.
            logger.info(f"Requesting token from {endpoint}")

            # Add timeout for better error messages
            try:
                response = requests.get(url, headers=headers, timeout=10)
            except requests.exceptions.Timeout:
                logger.error("TOKEN VALIDATION FAILED: Timeout while connecting to Pipeboard API")
                logger.error(f"Could not connect to {PIPEBOARD_API_BASE} within 10 seconds")
                return None
            except requests.exceptions.ConnectionError:
                logger.error("TOKEN VALIDATION FAILED: Connection error with Pipeboard API")
                logger.error(f"Could not connect to {PIPEBOARD_API_BASE} - check if service is running")
                return None

            logger.info(f"Token request response status: {response.status_code}")

            # Better error handling with response content
            if response.status_code != 200:
                logger.error(f"TOKEN VALIDATION FAILED: HTTP error {response.status_code}")
                error_text = response.text if response.text else "No response content"
                logger.error(f"Response content: {error_text}")

                # Add more specific error messages for common status codes
                if response.status_code == 401:
                    logger.error("Authentication failed: Invalid Pipeboard API token")
                elif response.status_code == 404:
                    logger.error("Endpoint not found: Check if Pipeboard API service is running correctly")
                elif response.status_code == 400:
                    logger.error("Bad request: The request to Pipeboard API was malformed")

                response.raise_for_status()

            # ValueError is the common base of the JSON decode errors from
            # both the stdlib json module and simplejson, whichever backend
            # requests is using.
            try:
                data = response.json()
            except ValueError:
                logger.error("TOKEN VALIDATION FAILED: Invalid JSON response from Pipeboard API")
                logger.error(f"Response content (first 100 chars): {response.text[:100]}")
                return None
            logger.info(f"Received token response with keys: {', '.join(data.keys())}")

            # Validate response data
            if "access_token" not in data:
                logger.error("TOKEN VALIDATION FAILED: No access_token in Pipeboard API response")
                logger.error(f"Response keys: {', '.join(data.keys())}")
                if "error" in data:
                    logger.error(f"Error details: {data['error']}")
                else:
                    logger.error("No error information available in response")
                return None

            # Create new token info
            self.token_info = TokenInfo(
                access_token=data.get("access_token"),
                expires_at=data.get("expires_at"),
                token_type=data.get("token_type", "bearer")
            )

            # Note: Token caching is disabled

            masked_token = self.token_info.access_token[:10] + "..." + self.token_info.access_token[-5:] if self.token_info.access_token else "None"
            logger.info(f"Successfully retrieved access token: {masked_token}")
            return self.token_info.access_token
        except requests.RequestException as e:
            # Compare against None explicitly: a Response is falsy for 4xx/5xx
            # statuses, so a truthiness test would discard exactly the error
            # responses inspected here and the 401/404 branches would never run.
            error_response = getattr(e, 'response', None)
            if error_response is not None:
                status_code = error_response.status_code
                response_text = error_response.text
            else:
                status_code = None
                response_text = "No response"

            if status_code == 401:
                logger.error(f"Unauthorized: Check your PIPEBOARD_API_TOKEN. Response: {response_text}")
            elif status_code == 404:
                # Caller is expected to handle None by starting the auth flow
                logger.error(f"No token available: You might need to complete authorization first. Response: {response_text}")
            else:
                logger.error(f"Error getting access token (status {status_code}): {e}")
                logger.error(f"Response content: {response_text}")
            return None
        except Exception as e:
            logger.error(f"Unexpected error getting access token: {e}")
            return None
        
    def invalidate_token(self) -> None:
        """Invalidate the current token, usually because it has expired or is invalid

        Clears ``self.token_info`` and removes the token cache file if one
        exists. Errors while removing the file are logged and not raised.
        Does nothing (beyond a debug log) when no token is held.
        """
        if not self.token_info:
            logger.debug("No token to invalidate")
            return

        # access_token may be None; slicing it would raise before the token
        # is cleared, leaving the invalid token in place.
        access_token = self.token_info.access_token
        token_prefix = access_token[:10] if access_token is not None else "None"
        logger.info(f"Invalidating token: {token_prefix}...")
        self.token_info = None

        # Remove the cached token file
        try:
            cache_path = self._get_token_cache_path()
            if cache_path.exists():
                os.remove(cache_path)
                logger.info(f"Removed cached token file: {cache_path}")
            else:
                logger.debug(f"No token cache file to remove: {cache_path}")
        except Exception as e:
            logger.error(f"Error removing cached token file: {e}")

    def test_token_validity(self) -> bool:
        """
        Check the held token against the Meta Graph API ``/me`` endpoint

        Every failure (no token, timeout, connection error, non-200 status,
        unexpected exception) is logged and reported as False.

        Returns:
            True if the Graph API answered 200 for this token, False otherwise
        """
        token_info = self.token_info
        if not token_info or not token_info.access_token:
            logger.debug("No token to test")
            logger.error("TOKEN VALIDATION FAILED: Missing token to test")
            return False

        # Only the first and last five characters of the token are logged
        access_token = token_info.access_token
        masked_token = access_token[:5] + "..." + access_token[-5:]
        token_type = getattr(token_info, 'token_type', None) or "bearer"
        logger.debug(f"Testing token validity (token: {masked_token}, type: {token_type})")

        try:
            # A simple request to /me is enough to find out whether the token works
            graph_version = "v22.0"
            me_url = f"https://graph.facebook.com/{graph_version}/me"
            auth_headers = {"Authorization": f"Bearer {access_token}"}

            logger.debug(f"Testing token validity with request to {me_url}")

            try:
                response = requests.get(me_url, headers=auth_headers, timeout=10)
            except requests.exceptions.Timeout:
                logger.error("TOKEN VALIDATION FAILED: Timeout while connecting to Meta API")
                logger.error("The Graph API did not respond within 10 seconds")
                return False
            except requests.exceptions.ConnectionError:
                logger.error("TOKEN VALIDATION FAILED: Connection error with Meta API")
                logger.error("Could not establish connection to Graph API - check network connectivity")
                return False

            if response.status_code != 200:
                logger.error(f"TOKEN VALIDATION FAILED: API returned status {response.status_code}")

                # Extra guidance logged for error codes this method knows about
                code_hints = (
                    (190, "Error indicates the token is invalid or has expired"),
                    (4, "Error indicates rate limiting - too many requests"),
                    (200, "Error indicates API permissions or configuration issue"),
                )

                # Try to pull a structured error out of the response body
                try:
                    error_data = response.json()
                    if 'error' not in error_data:
                        logger.error(f"No error object in response: {error_data}")
                    else:
                        error_obj = error_data.get('error', {})
                        error_code = error_obj.get('code', 'unknown')
                        error_message = error_obj.get('message', 'Unknown error')
                        logger.error(f"Meta API error: Code {error_code} - {error_message}")

                        for known_code, hint in code_hints:
                            if error_code == known_code:
                                logger.error(hint)
                                break
                except json.JSONDecodeError:
                    logger.error(f"Could not parse error response: {response.text[:200]}")

                return False

            data = response.json()
            logger.debug(f"Token is valid. User ID: {data.get('id')}")

            # Include the user's name in the success log when the API returned one
            user_info = f"User ID: {data.get('id')}"
            if 'name' in data:
                user_info += f", Name: {data.get('name')}"
            logger.info(f"Meta API token validated successfully ({user_info})")
            return True
        except Exception as e:
            logger.error(f"TOKEN VALIDATION FAILED: Unexpected error: {e!s}")

            # Stack trace helps when debugging complex issues
            import traceback
            logger.error(f"Stack trace: {traceback.format_exc()}")

            return False


# Create singleton instance at import time. PipeboardAuthManager.__init__ reads
# PIPEBOARD_API_TOKEN from the environment right here, so the variable has to be
# set before this module is imported for the manager to pick it up.
pipeboard_auth_manager = PipeboardAuthManager() 
```
Page 3/5 | First | Prev | Next | Last