This is page 3 of 5. Use http://codebase.md/nictuku/meta-ads-mcp?page={x} to view the full context.
# Directory Structure
```
├── .github
│ └── workflows
│ ├── publish-mcp.yml
│ ├── publish.yml
│ └── test.yml
├── .gitignore
├── .python-version
├── .uv.toml
├── CUSTOM_META_APP.md
├── Dockerfile
├── examples
│ ├── example_http_client.py
│ └── README.md
├── future_improvements.md
├── images
│ └── meta-ads-example.png
├── LICENSE
├── LOCAL_INSTALLATION.md
├── meta_ads_auth.sh
├── meta_ads_mcp
│ ├── __init__.py
│ ├── __main__.py
│ └── core
│ ├── __init__.py
│ ├── accounts.py
│ ├── ads_library.py
│ ├── ads.py
│ ├── adsets.py
│ ├── api.py
│ ├── auth.py
│ ├── authentication.py
│ ├── budget_schedules.py
│ ├── callback_server.py
│ ├── campaigns.py
│ ├── duplication.py
│ ├── http_auth_integration.py
│ ├── insights.py
│ ├── openai_deep_research.py
│ ├── pipeboard_auth.py
│ ├── reports.py
│ ├── resources.py
│ ├── server.py
│ ├── targeting.py
│ └── utils.py
├── META_API_NOTES.md
├── poetry.lock
├── pyproject.toml
├── README.md
├── RELEASE.md
├── requirements.txt
├── server.json
├── setup.py
├── smithery.yaml
├── STREAMABLE_HTTP_SETUP.md
└── tests
├── __init__.py
├── conftest.py
├── e2e_account_info_search_issue.py
├── README_REGRESSION_TESTS.md
├── README.md
├── test_account_info_access_fix.py
├── test_account_search.py
├── test_budget_update_e2e.py
├── test_budget_update.py
├── test_create_ad_creative_simple.py
├── test_create_simple_creative_e2e.py
├── test_dsa_beneficiary.py
├── test_dsa_integration.py
├── test_duplication_regression.py
├── test_duplication.py
├── test_dynamic_creatives.py
├── test_estimate_audience_size_e2e.py
├── test_estimate_audience_size.py
├── test_get_account_pages.py
├── test_get_ad_creatives_fix.py
├── test_get_ad_image_quality_improvements.py
├── test_get_ad_image_regression.py
├── test_http_transport.py
├── test_insights_actions_and_values_e2e.py
├── test_insights_pagination.py
├── test_integration_openai_mcp.py
├── test_is_dynamic_creative_adset.py
├── test_mobile_app_adset_creation.py
├── test_mobile_app_adset_issue.py
├── test_openai_mcp_deep_research.py
├── test_openai.py
├── test_page_discovery_integration.py
├── test_page_discovery.py
├── test_targeting_search_e2e.py
├── test_targeting.py
├── test_update_ad_creative_id.py
└── test_upload_ad_image.py
```
# Files
--------------------------------------------------------------------------------
/tests/test_estimate_audience_size.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Unit tests for estimate_audience_size functionality in Meta Ads MCP.
This module tests the new estimate_audience_size function that replaces validate_interests
and provides comprehensive audience estimation using Meta's reachestimate API.
"""
import pytest
import json
from unittest.mock import AsyncMock, patch
from meta_ads_mcp.core.targeting import estimate_audience_size
class TestEstimateAudienceSize:
    """Test cases for the estimate_audience_size MCP tool.

    The function under test has two modes (both exercised below):
      * comprehensive mode (account_id + targeting) -> Meta's
        `act_<id>/reachestimate` endpoint;
      * backwards-compatibility mode (interest_list / interest_fbid_list)
        -> the legacy `search` endpoint with type=adinterestvalid.
    All Meta API traffic is mocked via make_api_request.
    """

    @pytest.mark.asyncio
    async def test_comprehensive_audience_estimation_success(self):
        """Test successful comprehensive audience estimation with complex targeting"""
        mock_response = {
            "data": [
                {
                    "estimate_mau": 1500000,
                    "estimate_dau": [
                        {"min_reach": 100000, "max_reach": 150000, "bid": 100},
                        {"min_reach": 200000, "max_reach": 250000, "bid": 200}
                    ],
                    "bid_estimates": {
                        "median": 150,
                        "min": 50,
                        "max": 300
                    },
                    "unsupported_targeting": []
                }
            ]
        }
        targeting_spec = {
            "age_min": 25,
            "age_max": 65,
            "geo_locations": {"countries": ["US"]},
            "flexible_spec": [
                {"interests": [{"id": "6003371567474"}]}
            ]
        }
        with patch('meta_ads_mcp.core.targeting.make_api_request', new_callable=AsyncMock) as mock_api:
            mock_api.return_value = mock_response
            result = await estimate_audience_size(
                access_token="test_token",
                account_id="act_123456789",
                targeting=targeting_spec,
                optimization_goal="REACH"
            )
            # Verify API call: reachestimate with only targeting_spec in params
            mock_api.assert_called_once_with(
                "act_123456789/reachestimate",
                "test_token",
                {
                    "targeting_spec": targeting_spec
                },
                method="GET"
            )
            # Verify response format (tool returns a JSON string)
            result_data = json.loads(result)
            assert result_data["success"] is True
            assert result_data["account_id"] == "act_123456789"
            assert result_data["targeting"] == targeting_spec
            assert result_data["optimization_goal"] == "REACH"
            assert result_data["estimated_audience_size"] == 1500000
            assert "estimate_details" in result_data
            assert result_data["estimate_details"]["monthly_active_users"] == 1500000

    @pytest.mark.asyncio
    async def test_different_optimization_goals(self):
        """Test audience estimation with different optimization goals (parameter is preserved in response)"""
        mock_response = {
            "data": [
                {
                    "estimate_mau": 800000,
                    "estimate_dau": [],
                    "bid_estimates": {},
                    "unsupported_targeting": []
                }
            ]
        }
        targeting_spec = {
            "age_min": 18,
            "age_max": 45,
            "geo_locations": {"countries": ["US"]}
        }
        # Test different optimization goals - they should all use the same reachestimate endpoint
        test_goals = ["REACH", "LINK_CLICKS", "LANDING_PAGE_VIEWS", "CONVERSIONS", "APP_INSTALLS"]
        with patch('meta_ads_mcp.core.targeting.make_api_request', new_callable=AsyncMock) as mock_api:
            mock_api.return_value = mock_response
            for optimization_goal in test_goals:
                # reset_mock so assert_called_once_with holds per iteration
                mock_api.reset_mock()
                result = await estimate_audience_size(
                    access_token="test_token",
                    account_id="act_123456789",
                    targeting=targeting_spec,
                    optimization_goal=optimization_goal
                )
                # Verify API call uses reachestimate endpoint with simplified parameters
                mock_api.assert_called_once_with(
                    "act_123456789/reachestimate",
                    "test_token",
                    {
                        "targeting_spec": targeting_spec
                    },
                    method="GET"
                )
                result_data = json.loads(result)
                assert result_data["success"] is True
                assert result_data["optimization_goal"] == optimization_goal

    @pytest.mark.asyncio
    async def test_backwards_compatibility_interest_names(self):
        """Test backwards compatibility with interest name validation"""
        mock_response = {
            "data": [
                {
                    "name": "Japan",
                    "valid": True,
                    "id": 6003700426513,
                    "audience_size": 68310258
                },
                {
                    "name": "invalidinterest",
                    "valid": False
                }
            ]
        }
        with patch('meta_ads_mcp.core.targeting.make_api_request', new_callable=AsyncMock) as mock_api:
            mock_api.return_value = mock_response
            result = await estimate_audience_size(
                access_token="test_token",
                interest_list=["Japan", "invalidinterest"]
            )
            # Verify it uses the old validation API (interest lists are JSON-encoded strings)
            mock_api.assert_called_once_with(
                "search",
                "test_token",
                {
                    "type": "adinterestvalid",
                    "interest_list": '["Japan", "invalidinterest"]'
                }
            )
            # Verify response matches old format (passed through unchanged)
            result_data = json.loads(result)
            assert result_data == mock_response
            assert result_data["data"][0]["valid"] is True
            assert result_data["data"][1]["valid"] is False

    @pytest.mark.asyncio
    async def test_backwards_compatibility_interest_fbids(self):
        """Test backwards compatibility with interest FBID validation"""
        mock_response = {
            "data": [
                {
                    "id": "6003700426513",
                    "valid": True,
                    "audience_size": 68310258
                }
            ]
        }
        with patch('meta_ads_mcp.core.targeting.make_api_request', new_callable=AsyncMock) as mock_api:
            mock_api.return_value = mock_response
            result = await estimate_audience_size(
                access_token="test_token",
                interest_fbid_list=["6003700426513"]
            )
            # Verify it uses the old validation API
            mock_api.assert_called_once_with(
                "search",
                "test_token",
                {
                    "type": "adinterestvalid",
                    "interest_fbid_list": '["6003700426513"]'
                }
            )
            # Verify response matches old format
            result_data = json.loads(result)
            assert result_data == mock_response
            assert result_data["data"][0]["valid"] is True

    @pytest.mark.asyncio
    async def test_backwards_compatibility_both_interest_params(self):
        """Test backwards compatibility with both interest names and FBIDs"""
        mock_response = {
            "data": [
                {"name": "Japan", "valid": True, "id": 6003700426513, "audience_size": 68310258},
                {"id": "6003397425735", "valid": True, "audience_size": 12345678}
            ]
        }
        with patch('meta_ads_mcp.core.targeting.make_api_request', new_callable=AsyncMock) as mock_api:
            mock_api.return_value = mock_response
            result = await estimate_audience_size(
                access_token="test_token",
                interest_list=["Japan"],
                interest_fbid_list=["6003397425735"]
            )
            # Verify both parameters are passed
            mock_api.assert_called_once_with(
                "search",
                "test_token",
                {
                    "type": "adinterestvalid",
                    "interest_list": '["Japan"]',
                    "interest_fbid_list": '["6003397425735"]'
                }
            )
            result_data = json.loads(result)
            assert result_data == mock_response

    @pytest.mark.asyncio
    async def test_error_no_account_id_for_comprehensive(self):
        """Test error when account_id missing for comprehensive estimation"""
        targeting_spec = {
            "age_min": 25,
            "age_max": 65,
            "geo_locations": {"countries": ["US"]}
        }
        result = await estimate_audience_size(
            access_token="test_token",
            targeting=targeting_spec
        )
        result_data = json.loads(result)
        # The @meta_api_tool decorator wraps errors in a 'data' field
        # (the wrapped payload is itself a JSON string, hence the nested loads)
        assert "data" in result_data
        nested_data = json.loads(result_data["data"])
        assert "error" in nested_data
        assert "account_id is required" in nested_data["error"]
        assert "details" in nested_data

    @pytest.mark.asyncio
    async def test_error_no_targeting_for_comprehensive(self):
        """Test error when targeting missing for comprehensive estimation"""
        result = await estimate_audience_size(
            access_token="test_token",
            account_id="act_123456789"
        )
        result_data = json.loads(result)
        # The @meta_api_tool decorator wraps errors in a 'data' field
        assert "data" in result_data
        nested_data = json.loads(result_data["data"])
        assert "error" in nested_data
        assert "targeting specification is required" in nested_data["error"]
        assert "example" in nested_data

    @pytest.mark.asyncio
    async def test_error_no_parameters(self):
        """Test error when no parameters provided"""
        # Since we're using the @meta_api_tool decorator, we need to simulate
        # its behavior for error handling
        with patch('meta_ads_mcp.core.auth.get_current_access_token') as mock_auth:
            mock_auth.return_value = "test_token"
            result = await estimate_audience_size()
            result_data = json.loads(result)
            # The @meta_api_tool decorator wraps errors in a 'data' field
            assert "data" in result_data
            nested_data = json.loads(result_data["data"])
            assert "error" in nested_data

    @pytest.mark.asyncio
    async def test_error_backwards_compatibility_no_interests(self):
        """Test error in backwards compatibility mode with no interest parameters"""
        result = await estimate_audience_size(
            access_token="test_token"
        )
        result_data = json.loads(result)
        # The @meta_api_tool decorator wraps errors in a 'data' field
        assert "data" in result_data
        nested_data = json.loads(result_data["data"])
        assert "error" in nested_data

    @pytest.mark.asyncio
    async def test_api_error_handling(self):
        """Test handling of API errors from reachestimate"""
        targeting_spec = {
            "age_min": 25,
            "age_max": 65,
            "geo_locations": {"countries": ["US"]}
        }
        # Simulate API exception
        with patch('meta_ads_mcp.core.targeting.make_api_request', new_callable=AsyncMock) as mock_api:
            mock_api.side_effect = Exception("API connection failed")
            result = await estimate_audience_size(
                access_token="test_token",
                account_id="act_123456789",
                targeting=targeting_spec
            )
            result_data = json.loads(result)
            # The @meta_api_tool decorator wraps errors in a 'data' field
            assert "data" in result_data
            nested_data = json.loads(result_data["data"])
            assert "error" in nested_data
            assert "Failed to get audience estimation from reachestimate endpoint" in nested_data["error"]
            assert "details" in nested_data

    @pytest.mark.asyncio
    async def test_empty_api_response(self):
        """Test handling of empty response from reachestimate API"""
        targeting_spec = {
            "age_min": 25,
            "age_max": 65,
            "geo_locations": {"countries": ["US"]}
        }
        # Simulate empty API response
        mock_response = {"data": []}
        with patch('meta_ads_mcp.core.targeting.make_api_request', new_callable=AsyncMock) as mock_api:
            mock_api.return_value = mock_response
            result = await estimate_audience_size(
                access_token="test_token",
                account_id="act_123456789",
                targeting=targeting_spec
            )
            result_data = json.loads(result)
            # The @meta_api_tool decorator wraps errors in a 'data' field
            assert "data" in result_data
            nested_data = json.loads(result_data["data"])
            assert "error" in nested_data
            assert "No estimation data returned" in nested_data["error"]
            assert "raw_response" in nested_data

    @pytest.mark.asyncio
    async def test_comprehensive_estimation_with_minimal_targeting(self):
        """Test comprehensive estimation with minimal targeting requirements"""
        mock_response = {
            "data": [
                {
                    "estimate_mau": 500000,
                    "estimate_dau": [],
                    "bid_estimates": {},
                    "unsupported_targeting": []
                }
            ]
        }
        minimal_targeting = {
            "age_min": 18,
            "age_max": 65,
            "geo_locations": {"countries": ["US"]}
        }
        with patch('meta_ads_mcp.core.targeting.make_api_request', new_callable=AsyncMock) as mock_api:
            mock_api.return_value = mock_response
            result = await estimate_audience_size(
                access_token="test_token",
                account_id="act_123456789",
                targeting=minimal_targeting
            )
            result_data = json.loads(result)
            assert result_data["success"] is True
            assert result_data["estimated_audience_size"] == 500000
            assert result_data["targeting"] == minimal_targeting

    @pytest.mark.asyncio
    async def test_estimate_details_structure(self):
        """Test that estimate_details contains expected structure"""
        mock_response = {
            "data": [
                {
                    "estimate_mau": 2000000,
                    "estimate_dau": [
                        {"min_reach": 150000, "max_reach": 200000, "bid": 120}
                    ],
                    "bid_estimates": {
                        "median": 180,
                        "min": 80,
                        "max": 350
                    },
                    "unsupported_targeting": ["custom_audiences"]
                }
            ]
        }
        targeting_spec = {
            "age_min": 25,
            "age_max": 45,
            "geo_locations": {"countries": ["US"]},
            "flexible_spec": [
                {"interests": [{"id": "6003371567474"}, {"id": "6003462346642"}]}
            ]
        }
        with patch('meta_ads_mcp.core.targeting.make_api_request', new_callable=AsyncMock) as mock_api:
            mock_api.return_value = mock_response
            result = await estimate_audience_size(
                access_token="test_token",
                account_id="act_123456789",
                targeting=targeting_spec,
                optimization_goal="CONVERSIONS"
            )
            result_data = json.loads(result)
            assert result_data["success"] is True
            # Check estimate_details structure (mapped from the raw reachestimate fields)
            details = result_data["estimate_details"]
            assert "monthly_active_users" in details
            assert "daily_outcomes_curve" in details
            assert "bid_estimate" in details
            assert "unsupported_targeting" in details
            assert details["monthly_active_users"] == 2000000
            assert len(details["daily_outcomes_curve"]) == 1
            assert details["bid_estimate"]["median"] == 180
            assert "custom_audiences" in details["unsupported_targeting"]

    @pytest.mark.asyncio
    async def test_function_registration(self):
        """Test that the function is properly registered as an MCP tool"""
        # This test verifies the function has the correct decorators
        assert hasattr(estimate_audience_size, '__wrapped__')  # From @meta_api_tool
        # Verify function signature
        import inspect
        sig = inspect.signature(estimate_audience_size)
        expected_params = [
            'access_token', 'account_id', 'targeting', 'optimization_goal',
            'interest_list', 'interest_fbid_list'
        ]
        for param in expected_params:
            assert param in sig.parameters
        # Verify default values
        assert sig.parameters['optimization_goal'].default == "REACH"
        assert sig.parameters['access_token'].default is None
        assert sig.parameters['targeting'].default is None
```
--------------------------------------------------------------------------------
/meta_ads_mcp/core/openai_deep_research.py:
--------------------------------------------------------------------------------
```python
"""OpenAI MCP Deep Research tools for Meta Ads API.
This module implements the required 'search' and 'fetch' tools for OpenAI's
ChatGPT Deep Research feature, providing access to Meta Ads data in the format
expected by ChatGPT.
The tools expose Meta Ads data (accounts, campaigns, ads, etc.) as searchable
and fetchable records for ChatGPT Deep Research analysis.
"""
import json
import re
from typing import List, Dict, Any, Optional
from .api import meta_api_tool, make_api_request
from .server import mcp_server
from .utils import logger
class MetaAdsDataManager:
    """Manages Meta Ads data for OpenAI MCP search and fetch operations.

    search_records() queries the Meta Graph API, builds OpenAI Deep
    Research-shaped records (id/type/title/text/metadata/raw_data), stores
    them in an in-memory cache keyed by "type:id", and returns the matching
    IDs. fetch_record() is a pure cache read, so fetch only succeeds for
    records a prior search in this process has cached.
    """

    def __init__(self):
        # record_id ("type:id") -> record dict; populated lazily by search_records
        self._cache = {}
        logger.debug("MetaAdsDataManager initialized")

    async def _get_ad_accounts(self, access_token: str, limit: int = 200) -> List[Dict[str, Any]]:
        """Get ad accounts data.

        Returns the account list from me/adaccounts, or [] on any error
        (errors are logged, never raised, so search stays best-effort).
        """
        try:
            endpoint = "me/adaccounts"
            params = {
                "fields": "id,name,account_id,account_status,amount_spent,balance,currency,business_city,business_country_code",
                "limit": limit
            }
            data = await make_api_request(endpoint, access_token, params)
            if "data" in data:
                return data["data"]
            return []
        except Exception as e:
            logger.error(f"Error fetching ad accounts: {e}")
            return []

    async def _get_campaigns(self, access_token: str, account_id: str, limit: int = 25) -> List[Dict[str, Any]]:
        """Get campaigns data for an account; [] on error (logged, not raised)."""
        try:
            endpoint = f"{account_id}/campaigns"
            params = {
                "fields": "id,name,status,objective,daily_budget,lifetime_budget,start_time,stop_time,created_time,updated_time",
                "limit": limit
            }
            data = await make_api_request(endpoint, access_token, params)
            if "data" in data:
                return data["data"]
            return []
        except Exception as e:
            logger.error(f"Error fetching campaigns for {account_id}: {e}")
            return []

    async def _get_ads(self, access_token: str, account_id: str, limit: int = 25) -> List[Dict[str, Any]]:
        """Get ads data for an account; [] on error (logged, not raised)."""
        try:
            endpoint = f"{account_id}/ads"
            params = {
                "fields": "id,name,status,creative,targeting,bid_amount,created_time,updated_time",
                "limit": limit
            }
            data = await make_api_request(endpoint, access_token, params)
            if "data" in data:
                return data["data"]
            return []
        except Exception as e:
            logger.error(f"Error fetching ads for {account_id}: {e}")
            return []

    async def _get_pages_for_account(self, access_token: str, account_id: str) -> List[Dict[str, Any]]:
        """Get pages associated with an account.

        Delegates to the page-discovery helper in the ads module and wraps
        its single result in a one-element list (or [] on failure/error).
        """
        try:
            # Import the page discovery function from ads module
            from .ads import _discover_pages_for_account
            # Ensure account_id has the 'act_' prefix
            if not account_id.startswith("act_"):
                account_id = f"act_{account_id}"
            page_discovery_result = await _discover_pages_for_account(account_id, access_token)
            if not page_discovery_result.get("success"):
                return []
            # Return page data in a consistent format
            return [{
                "id": page_discovery_result["page_id"],
                "name": page_discovery_result.get("page_name", "Unknown"),
                "source": page_discovery_result.get("source", "unknown"),
                "account_id": account_id
            }]
        except Exception as e:
            logger.error(f"Error fetching pages for {account_id}: {e}")
            return []

    async def _get_businesses(self, access_token: str, user_id: str = "me", limit: int = 25) -> List[Dict[str, Any]]:
        """Get businesses accessible by the current user; [] on error (logged)."""
        try:
            endpoint = f"{user_id}/businesses"
            params = {
                "fields": "id,name,created_time,verification_status",
                "limit": limit
            }
            data = await make_api_request(endpoint, access_token, params)
            if "data" in data:
                return data["data"]
            return []
        except Exception as e:
            logger.error(f"Error fetching businesses: {e}")
            return []

    async def search_records(self, query: str, access_token: str) -> List[str]:
        """Search Meta Ads data and return matching record IDs.

        Matching is a case-insensitive substring test: a record matches when
        ANY word of the query appears in its concatenated text fields.
        Accounts are always searched; campaigns only for matching accounts;
        ads/pages/businesses only when the query contains trigger words
        (e.g. "ads", "page", "business"). Every match is cached for fetch.

        Args:
            query: Search query string
            access_token: Meta API access token

        Returns:
            List of record IDs that match the query (capped at 50);
            [] if an unexpected error occurs mid-search.
        """
        logger.info(f"Searching Meta Ads data with query: {query}")
        # Normalize query for matching
        query_lower = query.lower()
        query_terms = re.findall(r'\w+', query_lower)
        matching_ids = []
        try:
            # Search ad accounts
            accounts = await self._get_ad_accounts(access_token, limit=200)
            for account in accounts:
                account_text = f"{account.get('name', '')} {account.get('id', '')} {account.get('account_status', '')} {account.get('business_city', '')} {account.get('business_country_code', '')}".lower()
                if any(term in account_text for term in query_terms):
                    record_id = f"account:{account['id']}"
                    matching_ids.append(record_id)
                    # Cache the account data
                    self._cache[record_id] = {
                        "id": record_id,
                        "type": "account",
                        "title": f"Ad Account: {account.get('name', 'Unnamed Account')}",
                        "text": f"Meta Ads Account {account.get('name', 'Unnamed')} (ID: {account.get('id', 'N/A')}) - Status: {account.get('account_status', 'Unknown')}, Currency: {account.get('currency', 'Unknown')}, Spent: ${account.get('amount_spent', 0)}, Balance: ${account.get('balance', 0)}",
                        "metadata": {
                            "account_id": account.get('id'),
                            "account_name": account.get('name'),
                            "status": account.get('account_status'),
                            "currency": account.get('currency'),
                            "business_location": f"{account.get('business_city', '')}, {account.get('business_country_code', '')}".strip(', '),
                            "data_type": "meta_ads_account"
                        },
                        "raw_data": account
                    }
                    # Also search campaigns for this account if it matches
                    campaigns = await self._get_campaigns(access_token, account['id'], limit=10)
                    for campaign in campaigns:
                        campaign_text = f"{campaign.get('name', '')} {campaign.get('objective', '')} {campaign.get('status', '')}".lower()
                        if any(term in campaign_text for term in query_terms):
                            campaign_record_id = f"campaign:{campaign['id']}"
                            matching_ids.append(campaign_record_id)
                            # Cache the campaign data
                            self._cache[campaign_record_id] = {
                                "id": campaign_record_id,
                                "type": "campaign",
                                "title": f"Campaign: {campaign.get('name', 'Unnamed Campaign')}",
                                "text": f"Meta Ads Campaign {campaign.get('name', 'Unnamed')} (ID: {campaign.get('id', 'N/A')}) - Objective: {campaign.get('objective', 'Unknown')}, Status: {campaign.get('status', 'Unknown')}, Daily Budget: ${campaign.get('daily_budget', 'Not set')}, Account: {account.get('name', 'Unknown')}",
                                "metadata": {
                                    "campaign_id": campaign.get('id'),
                                    "campaign_name": campaign.get('name'),
                                    "objective": campaign.get('objective'),
                                    "status": campaign.get('status'),
                                    "account_id": account.get('id'),
                                    "account_name": account.get('name'),
                                    "data_type": "meta_ads_campaign"
                                },
                                "raw_data": campaign
                            }
            # If query specifically mentions "ads" or "ad", also search individual ads
            if any(term in ['ad', 'ads', 'advertisement', 'creative'] for term in query_terms):
                for account in accounts[:3]:  # Limit to first 3 accounts for performance
                    ads = await self._get_ads(access_token, account['id'], limit=10)
                    for ad in ads:
                        ad_text = f"{ad.get('name', '')} {ad.get('status', '')}".lower()
                        if any(term in ad_text for term in query_terms):
                            ad_record_id = f"ad:{ad['id']}"
                            matching_ids.append(ad_record_id)
                            # Cache the ad data
                            self._cache[ad_record_id] = {
                                "id": ad_record_id,
                                "type": "ad",
                                "title": f"Ad: {ad.get('name', 'Unnamed Ad')}",
                                "text": f"Meta Ad {ad.get('name', 'Unnamed')} (ID: {ad.get('id', 'N/A')}) - Status: {ad.get('status', 'Unknown')}, Bid Amount: ${ad.get('bid_amount', 'Not set')}, Account: {account.get('name', 'Unknown')}",
                                "metadata": {
                                    "ad_id": ad.get('id'),
                                    "ad_name": ad.get('name'),
                                    "status": ad.get('status'),
                                    "account_id": account.get('id'),
                                    "account_name": account.get('name'),
                                    "data_type": "meta_ads_ad"
                                },
                                "raw_data": ad
                            }
            # If query specifically mentions "page" or "pages", also search pages
            if any(term in ['page', 'pages', 'facebook page'] for term in query_terms):
                for account in accounts[:5]:  # Limit to first 5 accounts for performance
                    pages = await self._get_pages_for_account(access_token, account['id'])
                    for page in pages:
                        page_text = f"{page.get('name', '')} {page.get('source', '')}".lower()
                        if any(term in page_text for term in query_terms):
                            page_record_id = f"page:{page['id']}"
                            matching_ids.append(page_record_id)
                            # Cache the page data
                            self._cache[page_record_id] = {
                                "id": page_record_id,
                                "type": "page",
                                "title": f"Facebook Page: {page.get('name', 'Unnamed Page')}",
                                "text": f"Facebook Page {page.get('name', 'Unnamed')} (ID: {page.get('id', 'N/A')}) - Source: {page.get('source', 'Unknown')}, Account: {account.get('name', 'Unknown')}",
                                "metadata": {
                                    "page_id": page.get('id'),
                                    "page_name": page.get('name'),
                                    "source": page.get('source'),
                                    "account_id": account.get('id'),
                                    "account_name": account.get('name'),
                                    "data_type": "meta_ads_page"
                                },
                                "raw_data": page
                            }
            # If query specifically mentions "business" or "businesses", also search businesses
            if any(term in ['business', 'businesses', 'company', 'companies'] for term in query_terms):
                businesses = await self._get_businesses(access_token, limit=25)
                for business in businesses:
                    business_text = f"{business.get('name', '')} {business.get('verification_status', '')}".lower()
                    if any(term in business_text for term in query_terms):
                        business_record_id = f"business:{business['id']}"
                        matching_ids.append(business_record_id)
                        # Cache the business data
                        self._cache[business_record_id] = {
                            "id": business_record_id,
                            "type": "business",
                            "title": f"Business: {business.get('name', 'Unnamed Business')}",
                            "text": f"Meta Business {business.get('name', 'Unnamed')} (ID: {business.get('id', 'N/A')}) - Created: {business.get('created_time', 'Unknown')}, Verification: {business.get('verification_status', 'Unknown')}",
                            "metadata": {
                                "business_id": business.get('id'),
                                "business_name": business.get('name'),
                                "created_time": business.get('created_time'),
                                "verification_status": business.get('verification_status'),
                                "data_type": "meta_ads_business"
                            },
                            "raw_data": business
                        }
        except Exception as e:
            logger.error(f"Error during search operation: {e}")
            # Return empty list on error, but don't raise exception
            return []
        logger.info(f"Search completed. Found {len(matching_ids)} matching records")
        return matching_ids[:50]  # Limit to 50 results for performance

    def fetch_record(self, record_id: str) -> Optional[Dict[str, Any]]:
        """Fetch a cached record by ID.

        Pure cache lookup; performs no API calls.

        Args:
            record_id: The record ID to fetch

        Returns:
            Record data or None if not found
        """
        logger.info(f"Fetching record: {record_id}")
        record = self._cache.get(record_id)
        if record:
            logger.debug(f"Record found in cache: {record['type']}")
            return record
        else:
            logger.warning(f"Record not found in cache: {record_id}")
            return None
# Global data manager instance shared by the `search` and `fetch` tools below.
# Its in-memory cache is written by search() and only read by fetch(), so
# fetch() can only return records a prior search in this process has cached.
_data_manager = MetaAdsDataManager()
@mcp_server.tool()
@meta_api_tool
async def search(
    query: str,
    access_token: Optional[str] = None
) -> str:
    """
    Search through Meta Ads data and return matching record IDs.
    It searches across ad accounts, campaigns, ads, pages, and businesses to find relevant records
    based on the provided query.
    Args:
        query: Search query string to find relevant Meta Ads records
        access_token: Meta API access token (optional - will use cached token if not provided)
    Returns:
        JSON response with list of matching record IDs
    Example Usage:
        search(query="active campaigns")
        search(query="account spending")
        search(query="facebook ads performance")
        search(query="facebook pages")
        search(query="user businesses")
    """
    # Guard clause: an empty query cannot be searched.
    if not query:
        return json.dumps({
            "error": "query parameter is required",
            "ids": []
        }, indent=2)

    try:
        # Delegate the actual lookup (and record caching) to the data manager.
        matching_ids = await _data_manager.search_records(query, access_token)
    except Exception as e:
        error_msg = str(e)
        logger.error(f"Error in search tool: {error_msg}")
        return json.dumps({
            "error": "Failed to search Meta Ads data",
            "details": error_msg,
            "ids": [],
            "query": query
        }, indent=2)

    logger.info(f"Search successful. Query: '{query}', Results: {len(matching_ids)}")
    return json.dumps({
        "ids": matching_ids,
        "query": query,
        "total_results": len(matching_ids)
    }, indent=2)
@mcp_server.tool()
async def fetch(
    id: str
) -> str:
    """
    Fetch complete record data by ID.
    It retrieves the full data for a specific record identified by its ID.
    Args:
        id: The record ID to fetch (format: "type:id", e.g., "account:act_123456")
    Returns:
        JSON response with complete record data including id, title, text, and metadata
    Example Usage:
        fetch(id="account:act_123456789")
        fetch(id="campaign:23842588888640185")
        fetch(id="ad:23842614006130185")
        fetch(id="page:123456789")
    """
    # Guard clause: a record ID is mandatory.
    if not id:
        return json.dumps({
            "error": "id parameter is required"
        }, indent=2)

    try:
        # Cache-only lookup via the shared data manager.
        record = _data_manager.fetch_record(id)
    except Exception as e:
        error_msg = str(e)
        logger.error(f"Error in fetch tool: {error_msg}")
        return json.dumps({
            "error": "Failed to fetch record",
            "details": error_msg,
            "id": id
        }, indent=2)

    if not record:
        logger.warning(f"Record not found: {id}")
        return json.dumps({
            "error": f"Record not found: {id}",
            "id": id
        }, indent=2)

    logger.info(f"Record fetched successfully: {id}")
    return json.dumps(record, indent=2)
```
--------------------------------------------------------------------------------
/meta_ads_mcp/core/duplication.py:
--------------------------------------------------------------------------------
```python
"""Duplication functionality for Meta Ads API."""
import json
import os
import httpx
from typing import Optional, Dict, Any, List, Union
from .server import mcp_server
from .api import meta_api_tool
from . import auth
from .http_auth_integration import FastMCPAuthIntegration
# Only register the duplication functions if the environment variable is set
ENABLE_DUPLICATION = bool(os.environ.get("META_ADS_ENABLE_DUPLICATION", ""))
if ENABLE_DUPLICATION:
@mcp_server.tool()
@meta_api_tool
async def duplicate_campaign(
campaign_id: str,
access_token: Optional[str] = None,
name_suffix: Optional[str] = " - Copy",
include_ad_sets: bool = True,
include_ads: bool = True,
include_creatives: bool = True,
copy_schedule: bool = False,
new_daily_budget: Optional[float] = None,
new_status: Optional[str] = "PAUSED"
) -> str:
"""
Duplicate a Meta Ads campaign with all its ad sets and ads.
Recommended: Use this to run robust experiments.
Args:
campaign_id: Meta Ads campaign ID to duplicate
name_suffix: Suffix to add to the duplicated campaign name
include_ad_sets: Whether to duplicate ad sets within the campaign
include_ads: Whether to duplicate ads within ad sets
include_creatives: Whether to duplicate ad creatives
copy_schedule: Whether to copy the campaign schedule
new_daily_budget: Override the daily budget for the new campaign
new_status: Status for the new campaign (ACTIVE or PAUSED)
"""
return await _forward_duplication_request(
"campaign",
campaign_id,
access_token,
{
"name_suffix": name_suffix,
"include_ad_sets": include_ad_sets,
"include_ads": include_ads,
"include_creatives": include_creatives,
"copy_schedule": copy_schedule,
"new_daily_budget": new_daily_budget,
"new_status": new_status
}
)
@mcp_server.tool()
@meta_api_tool
async def duplicate_adset(
adset_id: str,
access_token: Optional[str] = None,
target_campaign_id: Optional[str] = None,
name_suffix: Optional[str] = " - Copy",
include_ads: bool = True,
include_creatives: bool = True,
new_daily_budget: Optional[float] = None,
new_targeting: Optional[Dict[str, Any]] = None,
new_status: Optional[str] = "PAUSED"
) -> str:
"""
Duplicate a Meta Ads ad set with its ads.
Recommended: Use this to run robust experiments.
Args:
adset_id: Meta Ads ad set ID to duplicate
target_campaign_id: Campaign ID to move the duplicated ad set to (optional)
name_suffix: Suffix to add to the duplicated ad set name
include_ads: Whether to duplicate ads within the ad set
include_creatives: Whether to duplicate ad creatives
new_daily_budget: Override the daily budget for the new ad set
new_targeting: Override targeting settings for the new ad set
new_status: Status for the new ad set (ACTIVE or PAUSED)
"""
return await _forward_duplication_request(
"adset",
adset_id,
access_token,
{
"target_campaign_id": target_campaign_id,
"name_suffix": name_suffix,
"include_ads": include_ads,
"include_creatives": include_creatives,
"new_daily_budget": new_daily_budget,
"new_targeting": new_targeting,
"new_status": new_status
}
)
@mcp_server.tool()
@meta_api_tool
async def duplicate_ad(
ad_id: str,
access_token: Optional[str] = None,
target_adset_id: Optional[str] = None,
name_suffix: Optional[str] = " - Copy",
duplicate_creative: bool = True,
new_creative_name: Optional[str] = None,
new_status: Optional[str] = "PAUSED"
) -> str:
"""
Duplicate a Meta Ads ad.
Recommended: Use this to run robust experiments.
Args:
ad_id: Meta Ads ad ID to duplicate
target_adset_id: Ad set ID to move the duplicated ad to (optional)
name_suffix: Suffix to add to the duplicated ad name
duplicate_creative: Whether to duplicate the ad creative
new_creative_name: Override name for the duplicated creative
new_status: Status for the new ad (ACTIVE or PAUSED)
"""
return await _forward_duplication_request(
"ad",
ad_id,
access_token,
{
"target_adset_id": target_adset_id,
"name_suffix": name_suffix,
"duplicate_creative": duplicate_creative,
"new_creative_name": new_creative_name,
"new_status": new_status
}
)
@mcp_server.tool()
@meta_api_tool
async def duplicate_creative(
    creative_id: str,
    access_token: Optional[str] = None,
    name_suffix: Optional[str] = " - Copy",
    new_primary_text: Optional[str] = None,
    new_headline: Optional[str] = None,
    new_description: Optional[str] = None,
    new_cta_type: Optional[str] = None,
    new_destination_url: Optional[str] = None
) -> str:
    """
    Duplicate a Meta Ads creative.
    Recommended: Use this to run robust experiments.
    Args:
        creative_id: Meta Ads creative ID to duplicate
        name_suffix: Suffix to add to the duplicated creative name
        new_primary_text: Override the primary text for the new creative
        new_headline: Override the headline for the new creative
        new_description: Override the description for the new creative
        new_cta_type: Override the call-to-action type for the new creative
        new_destination_url: Override the destination URL for the new creative
    """
    # Bundle the creative-specific overrides; the shared forwarder drops
    # None values before posting them to the cloud duplication service.
    duplication_options = {
        "name_suffix": name_suffix,
        "new_primary_text": new_primary_text,
        "new_headline": new_headline,
        "new_description": new_description,
        "new_cta_type": new_cta_type,
        "new_destination_url": new_destination_url
    }
    return await _forward_duplication_request("creative", creative_id, access_token, duplication_options)
async def _forward_duplication_request(resource_type: str, resource_id: str, access_token: Optional[str], options: Dict[str, Any]) -> str:
    """
    Forward duplication request to the cloud-hosted MCP API using dual-header authentication.

    This implements the dual-header authentication pattern for MCP server callbacks:
    - Authorization: Bearer <facebook_token> - Facebook access token for Meta API calls
    - X-Pipeboard-Token: <pipeboard_token> - Pipeboard API token for authentication

    Args:
        resource_type: Type of resource to duplicate (campaign, adset, ad, creative)
        resource_id: ID of the resource to duplicate
        access_token: Meta API access token (optional, will use context if not provided)
        options: Duplication options; None values are stripped before sending

    Returns:
        A JSON string: either the service's success payload or a structured
        error object (never raises to the caller).
    """
    try:
        # Get tokens from the request context that were set by the HTTP auth middleware.
        # In the dual-header authentication pattern:
        # - Pipeboard token comes from X-Pipeboard-Token header (for authentication)
        # - Facebook token comes from Authorization header (for Meta API calls)
        pipeboard_token = FastMCPAuthIntegration.get_pipeboard_token()
        facebook_token = FastMCPAuthIntegration.get_auth_token()
        # Use provided access_token parameter if no Facebook token found in context;
        # fall back to the stored session token as a last resort.
        if not facebook_token:
            facebook_token = access_token if access_token else await auth.get_current_access_token()
        # Validate we have both required tokens before making any network call
        if not pipeboard_token:
            return json.dumps({
                "error": "authentication_required",
                "message": "Pipeboard API token not found",
                "details": {
                    "required": "Valid Pipeboard token via X-Pipeboard-Token header",
                    "received_headers": "Check that the MCP server is forwarding the X-Pipeboard-Token header"
                }
            }, indent=2)
        if not facebook_token:
            return json.dumps({
                "error": "authentication_required",
                "message": "Meta Ads access token not found",
                "details": {
                    "required": "Valid Meta access token from authenticated session",
                    "check": "Ensure Facebook account is connected and token is valid"
                }
            }, indent=2)
        # Construct the API endpoint
        base_url = "https://mcp.pipeboard.co"
        endpoint = f"{base_url}/api/meta/duplicate/{resource_type}/{resource_id}"
        # Prepare the dual-header authentication as per API documentation
        headers = {
            "Authorization": f"Bearer {facebook_token}",  # Facebook token for Meta API
            "X-Pipeboard-Token": pipeboard_token,  # Pipeboard token for auth
            "Content-Type": "application/json",
            "User-Agent": "meta-ads-mcp/1.0"
        }
        # Remove None values from options so defaults apply server-side
        clean_options = {k: v for k, v in options.items() if v is not None}
        # Make the request to the cloud service
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await client.post(
                endpoint,
                headers=headers,
                json=clean_options
            )
            if response.status_code == 200:
                result = response.json()
                return json.dumps(result, indent=2)
            elif response.status_code == 400:
                # Validation failed
                try:
                    error_data = response.json()
                    return json.dumps({
                        "success": False,
                        "error": "validation_failed",
                        "errors": error_data.get("errors", [response.text]),
                        "warnings": error_data.get("warnings", [])
                    }, indent=2)
                except Exception:
                    # Body was not valid JSON; fall back to the raw text
                    return json.dumps({
                        "success": False,
                        "error": "validation_failed",
                        "errors": [response.text],
                        "warnings": []
                    }, indent=2)
            elif response.status_code == 401:
                return json.dumps({
                    "success": False,
                    "error": "authentication_error",
                    "message": "Invalid or expired API token"
                }, indent=2)
            elif response.status_code == 402:
                try:
                    error_data = response.json()
                    return json.dumps({
                        "success": False,
                        "error": "subscription_required",
                        "message": error_data.get("message", "This feature is not available in your current plan"),
                        "upgrade_url": error_data.get("upgrade_url", "https://pipeboard.co/upgrade"),
                        "suggestion": error_data.get("suggestion", "Please upgrade your account to access this feature")
                    }, indent=2)
                except Exception:
                    # Body was not valid JSON; return the generic upgrade message
                    return json.dumps({
                        "success": False,
                        "error": "subscription_required",
                        "message": "This feature is not available in your current plan",
                        "upgrade_url": "https://pipeboard.co/upgrade",
                        "suggestion": "Please upgrade your account to access this feature"
                    }, indent=2)
            elif response.status_code == 403:
                try:
                    error_data = response.json()
                    # Check if this is a premium feature error
                    if error_data.get("error") == "premium_feature":
                        return json.dumps({
                            "success": False,
                            "error": "premium_feature_required",
                            "message": error_data.get("message", "This is a premium feature that requires subscription"),
                            "details": error_data.get("details", {
                                "upgrade_url": "https://pipeboard.co/upgrade",
                                "suggestion": "Please upgrade your account to access this feature"
                            })
                        }, indent=2)
                    else:
                        # Default to facebook connection required
                        return json.dumps({
                            "success": False,
                            "error": "facebook_connection_required",
                            "message": error_data.get("message", "You need to connect your Facebook account first"),
                            "details": error_data.get("details", {
                                "login_flow_url": "/connections",
                                "auth_flow_url": "/api/meta/auth"
                            })
                        }, indent=2)
                except Exception:
                    # Body was not valid JSON; assume the common 403 cause
                    return json.dumps({
                        "success": False,
                        "error": "facebook_connection_required",
                        "message": "You need to connect your Facebook account first",
                        "details": {
                            "login_flow_url": "/connections",
                            "auth_flow_url": "/api/meta/auth"
                        }
                    }, indent=2)
            elif response.status_code == 404:
                return json.dumps({
                    "success": False,
                    "error": "resource_not_found",
                    "message": f"{resource_type.title()} not found or access denied",
                    "suggestion": f"Verify the {resource_type} ID and your Facebook account permissions"
                }, indent=2)
            elif response.status_code == 429:
                return json.dumps({
                    "error": "rate_limit_exceeded",
                    "message": "Meta API rate limit exceeded",
                    "details": {
                        "suggestion": "Please wait before retrying",
                        "retry_after": response.headers.get("Retry-After", "60")
                    }
                }, indent=2)
            elif response.status_code == 502:
                try:
                    error_data = response.json()
                    return json.dumps({
                        "success": False,
                        "error": "meta_api_error",
                        "message": error_data.get("message", "Facebook API error"),
                        "recoverable": True,
                        "suggestion": "Please wait 5 minutes before retrying"
                    }, indent=2)
                except Exception:
                    # Body was not valid JSON; return the generic upstream error
                    return json.dumps({
                        "success": False,
                        "error": "meta_api_error",
                        "message": "Facebook API error",
                        "recoverable": True,
                        "suggestion": "Please wait 5 minutes before retrying"
                    }, indent=2)
            else:
                # Any other status: try to extract a message, else use raw text
                error_detail = response.text
                try:
                    error_json = response.json()
                    error_detail = error_json.get("message", error_detail)
                except Exception:
                    pass
                return json.dumps({
                    "error": "duplication_failed",
                    "message": f"Failed to duplicate {resource_type}",
                    "details": {
                        "status_code": response.status_code,
                        "error_detail": error_detail,
                        "resource_type": resource_type,
                        "resource_id": resource_id
                    }
                }, indent=2)
    except httpx.TimeoutException:
        return json.dumps({
            "error": "request_timeout",
            "message": "Request to duplication service timed out",
            "details": {
                "suggestion": "Please try again later",
                "timeout": "30 seconds"
            }
        }, indent=2)
    except httpx.RequestError as e:
        return json.dumps({
            "error": "network_error",
            "message": "Failed to connect to duplication service",
            "details": {
                "error": str(e),
                "suggestion": "Check your internet connection and try again"
            }
        }, indent=2)
    except Exception as e:
        # Catch-all boundary: this helper must always return a JSON string
        return json.dumps({
            "error": "unexpected_error",
            "message": f"Unexpected error during {resource_type} duplication",
            "details": {
                "error": str(e),
                "resource_type": resource_type,
                "resource_id": resource_id
            }
        }, indent=2)
def _get_estimated_components(resource_type: str, options: Dict[str, Any]) -> Dict[str, Any]:
"""Get estimated components that would be duplicated."""
if resource_type == "campaign":
components = {"campaigns": 1}
if options.get("include_ad_sets", True):
components["ad_sets"] = "3-5 (estimated)"
if options.get("include_ads", True):
components["ads"] = "5-15 (estimated)"
if options.get("include_creatives", True):
components["creatives"] = "5-15 (estimated)"
return components
elif resource_type == "adset":
components = {"ad_sets": 1}
if options.get("include_ads", True):
components["ads"] = "2-5 (estimated)"
if options.get("include_creatives", True):
components["creatives"] = "2-5 (estimated)"
return components
elif resource_type == "ad":
components = {"ads": 1}
if options.get("duplicate_creative", True):
components["creatives"] = 1
return components
elif resource_type == "creative":
return {"creatives": 1}
return {}
```
--------------------------------------------------------------------------------
/tests/test_get_ad_image_quality_improvements.py:
--------------------------------------------------------------------------------
```python
"""Tests for get_ad_image quality improvements.
These tests verify that the get_ad_image function now correctly prioritizes
high-quality ad creative images over profile thumbnails.
Key improvements tested:
1. Prioritizes image_urls_for_viewing over thumbnail_url
2. Uses image_url as second priority
3. Uses object_story_spec.link_data.picture as third priority
4. Only uses thumbnail_url as last resort
5. Better logging to show which image source is being used
"""
import pytest
import json
from unittest.mock import AsyncMock, patch, MagicMock
from meta_ads_mcp.core.ads import get_ad_image
from meta_ads_mcp.core.utils import extract_creative_image_urls
class TestGetAdImageQualityImprovements:
    """Test cases for image quality improvements in get_ad_image function.

    Shared pattern across the async tests: make_api_request is mocked with a
    side_effect list whose order matches get_ad_image's call order (ad lookup
    first, then creative details), get_ad_creatives returns a canned JSON
    string, and download_image/PIL are stubbed so no network or image work
    happens. Each test then asserts which URL download_image received.
    """

    @pytest.mark.asyncio
    async def test_prioritizes_image_urls_for_viewing_over_thumbnail(self):
        """Test that image_urls_for_viewing is prioritized over thumbnail_url."""
        # Mock responses for creative with both high-quality and thumbnail URLs
        mock_ad_data = {
            "account_id": "act_123456789",
            "creative": {"id": "creative_123456789"}
        }
        mock_creative_details = {
            "id": "creative_123456789",
            "name": "Test Creative"
            # No image_hash - triggers fallback
        }
        # Mock get_ad_creatives response with both URLs
        mock_get_ad_creatives_response = json.dumps({
            "data": [
                {
                    "id": "creative_123456789",
                    "name": "Test Creative",
                    "status": "ACTIVE",
                    "thumbnail_url": "https://example.com/thumbnail_64x64.jpg",  # Low quality
                    "image_url": "https://example.com/full_image.jpg",  # Medium quality
                    "image_urls_for_viewing": [
                        "https://example.com/high_quality_image.jpg",  # Highest quality
                        "https://example.com/alt_high_quality.jpg"
                    ],
                    "object_story_spec": {
                        "link_data": {
                            "picture": "https://example.com/object_story_picture.jpg"
                        }
                    }
                }
            ]
        })
        # Mock PIL Image processing
        mock_pil_image = MagicMock()
        mock_pil_image.mode = "RGB"
        mock_pil_image.convert.return_value = mock_pil_image
        mock_byte_stream = MagicMock()
        mock_byte_stream.getvalue.return_value = b"fake_jpeg_data"
        with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api, \
             patch('meta_ads_mcp.core.ads.get_ad_creatives', new_callable=AsyncMock) as mock_get_creatives, \
             patch('meta_ads_mcp.core.ads.download_image', new_callable=AsyncMock) as mock_download, \
             patch('meta_ads_mcp.core.ads.PILImage.open') as mock_pil_open, \
             patch('meta_ads_mcp.core.ads.io.BytesIO') as mock_bytesio:
            # side_effect order: ad lookup first, then creative details
            mock_api.side_effect = [mock_ad_data, mock_creative_details]
            mock_get_creatives.return_value = mock_get_ad_creatives_response
            mock_download.return_value = b"fake_image_bytes"
            mock_pil_open.return_value = mock_pil_image
            mock_bytesio.return_value = mock_byte_stream
            # This should prioritize image_urls_for_viewing[0]
            result = await get_ad_image(access_token="test_token", ad_id="test_ad_id")
            # Verify it used the highest quality URL
            assert result is not None
            mock_download.assert_called_once_with("https://example.com/high_quality_image.jpg")

    @pytest.mark.asyncio
    async def test_falls_back_to_image_url_when_image_urls_for_viewing_unavailable(self):
        """Test fallback to image_url when image_urls_for_viewing is not available."""
        # Mock responses for creative without image_urls_for_viewing
        mock_ad_data = {
            "account_id": "act_123456789",
            "creative": {"id": "creative_123456789"}
        }
        mock_creative_details = {
            "id": "creative_123456789",
            "name": "Test Creative"
        }
        # Mock get_ad_creatives response without image_urls_for_viewing
        mock_get_ad_creatives_response = json.dumps({
            "data": [
                {
                    "id": "creative_123456789",
                    "name": "Test Creative",
                    "status": "ACTIVE",
                    "thumbnail_url": "https://example.com/thumbnail.jpg",
                    "image_url": "https://example.com/full_image.jpg",  # Should be used
                    "object_story_spec": {
                        "link_data": {
                            "picture": "https://example.com/object_story_picture.jpg"
                        }
                    }
                }
            ]
        })
        # Mock PIL Image processing
        mock_pil_image = MagicMock()
        mock_pil_image.mode = "RGB"
        mock_pil_image.convert.return_value = mock_pil_image
        mock_byte_stream = MagicMock()
        mock_byte_stream.getvalue.return_value = b"fake_jpeg_data"
        with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api, \
             patch('meta_ads_mcp.core.ads.get_ad_creatives', new_callable=AsyncMock) as mock_get_creatives, \
             patch('meta_ads_mcp.core.ads.download_image', new_callable=AsyncMock) as mock_download, \
             patch('meta_ads_mcp.core.ads.PILImage.open') as mock_pil_open, \
             patch('meta_ads_mcp.core.ads.io.BytesIO') as mock_bytesio:
            mock_api.side_effect = [mock_ad_data, mock_creative_details]
            mock_get_creatives.return_value = mock_get_ad_creatives_response
            mock_download.return_value = b"fake_image_bytes"
            mock_pil_open.return_value = mock_pil_image
            mock_bytesio.return_value = mock_byte_stream
            # This should fall back to image_url
            result = await get_ad_image(access_token="test_token", ad_id="test_ad_id")
            # Verify it used image_url
            assert result is not None
            mock_download.assert_called_once_with("https://example.com/full_image.jpg")

    @pytest.mark.asyncio
    async def test_falls_back_to_object_story_spec_picture_when_image_url_unavailable(self):
        """Test fallback to object_story_spec.link_data.picture when image_url is not available."""
        # Mock responses for creative without image_url
        mock_ad_data = {
            "account_id": "act_123456789",
            "creative": {"id": "creative_123456789"}
        }
        mock_creative_details = {
            "id": "creative_123456789",
            "name": "Test Creative"
        }
        # Mock get_ad_creatives response without image_url
        mock_get_ad_creatives_response = json.dumps({
            "data": [
                {
                    "id": "creative_123456789",
                    "name": "Test Creative",
                    "status": "ACTIVE",
                    "thumbnail_url": "https://example.com/thumbnail.jpg",
                    "object_story_spec": {
                        "link_data": {
                            "picture": "https://example.com/object_story_picture.jpg"  # Should be used
                        }
                    }
                }
            ]
        })
        # Mock PIL Image processing
        mock_pil_image = MagicMock()
        mock_pil_image.mode = "RGB"
        mock_pil_image.convert.return_value = mock_pil_image
        mock_byte_stream = MagicMock()
        mock_byte_stream.getvalue.return_value = b"fake_jpeg_data"
        with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api, \
             patch('meta_ads_mcp.core.ads.get_ad_creatives', new_callable=AsyncMock) as mock_get_creatives, \
             patch('meta_ads_mcp.core.ads.download_image', new_callable=AsyncMock) as mock_download, \
             patch('meta_ads_mcp.core.ads.PILImage.open') as mock_pil_open, \
             patch('meta_ads_mcp.core.ads.io.BytesIO') as mock_bytesio:
            mock_api.side_effect = [mock_ad_data, mock_creative_details]
            mock_get_creatives.return_value = mock_get_ad_creatives_response
            mock_download.return_value = b"fake_image_bytes"
            mock_pil_open.return_value = mock_pil_image
            mock_bytesio.return_value = mock_byte_stream
            # This should fall back to object_story_spec.link_data.picture
            result = await get_ad_image(access_token="test_token", ad_id="test_ad_id")
            # Verify it used object_story_spec.link_data.picture
            assert result is not None
            mock_download.assert_called_once_with("https://example.com/object_story_picture.jpg")

    @pytest.mark.asyncio
    async def test_uses_thumbnail_url_only_as_last_resort(self):
        """Test that thumbnail_url is only used when no other options are available."""
        # Mock responses for creative with only thumbnail_url
        mock_ad_data = {
            "account_id": "act_123456789",
            "creative": {"id": "creative_123456789"}
        }
        mock_creative_details = {
            "id": "creative_123456789",
            "name": "Test Creative"
        }
        # Mock get_ad_creatives response with only thumbnail_url
        mock_get_ad_creatives_response = json.dumps({
            "data": [
                {
                    "id": "creative_123456789",
                    "name": "Test Creative",
                    "status": "ACTIVE",
                    "thumbnail_url": "https://example.com/thumbnail_only.jpg"  # Only option
                }
            ]
        })
        # Mock PIL Image processing
        mock_pil_image = MagicMock()
        mock_pil_image.mode = "RGB"
        mock_pil_image.convert.return_value = mock_pil_image
        mock_byte_stream = MagicMock()
        mock_byte_stream.getvalue.return_value = b"fake_jpeg_data"
        with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api, \
             patch('meta_ads_mcp.core.ads.get_ad_creatives', new_callable=AsyncMock) as mock_get_creatives, \
             patch('meta_ads_mcp.core.ads.download_image', new_callable=AsyncMock) as mock_download, \
             patch('meta_ads_mcp.core.ads.PILImage.open') as mock_pil_open, \
             patch('meta_ads_mcp.core.ads.io.BytesIO') as mock_bytesio:
            mock_api.side_effect = [mock_ad_data, mock_creative_details]
            mock_get_creatives.return_value = mock_get_ad_creatives_response
            mock_download.return_value = b"fake_image_bytes"
            mock_pil_open.return_value = mock_pil_image
            mock_bytesio.return_value = mock_byte_stream
            # This should use thumbnail_url as last resort
            result = await get_ad_image(access_token="test_token", ad_id="test_ad_id")
            # Verify it used thumbnail_url
            assert result is not None
            mock_download.assert_called_once_with("https://example.com/thumbnail_only.jpg")

    def test_extract_creative_image_urls_prioritizes_quality(self):
        """Test that extract_creative_image_urls correctly prioritizes image quality."""
        # Test creative with multiple image URLs
        test_creative = {
            "id": "creative_123456789",
            "name": "Test Creative",
            "thumbnail_url": "https://example.com/thumbnail.jpg",  # Lowest priority
            "image_url": "https://example.com/image.jpg",  # Medium priority
            "image_urls_for_viewing": [
                "https://example.com/high_quality_1.jpg",  # Highest priority
                "https://example.com/high_quality_2.jpg"
            ],
            "object_story_spec": {
                "link_data": {
                    "picture": "https://example.com/object_story_picture.jpg"  # High priority
                }
            }
        }
        # Extract URLs
        urls = extract_creative_image_urls(test_creative)
        # Verify correct priority order: viewing URLs first, thumbnail last
        assert len(urls) >= 4
        assert urls[0] == "https://example.com/high_quality_1.jpg"  # First priority
        assert urls[1] == "https://example.com/high_quality_2.jpg"  # Second priority
        assert "https://example.com/image.jpg" in urls  # Medium priority
        assert "https://example.com/object_story_picture.jpg" in urls  # High priority
        assert urls[-1] == "https://example.com/thumbnail.jpg"  # Last priority

    def test_extract_creative_image_urls_handles_missing_fields(self):
        """Test that extract_creative_image_urls handles missing fields gracefully."""
        # Test creative with minimal fields
        test_creative = {
            "id": "creative_123456789",
            "name": "Minimal Creative",
            "thumbnail_url": "https://example.com/thumbnail.jpg"
        }
        # Extract URLs
        urls = extract_creative_image_urls(test_creative)
        # Should still work with only thumbnail_url
        assert len(urls) == 1
        assert urls[0] == "https://example.com/thumbnail.jpg"

    def test_extract_creative_image_urls_removes_duplicates(self):
        """Test that extract_creative_image_urls removes duplicate URLs."""
        # Test creative with duplicate URLs
        test_creative = {
            "id": "creative_123456789",
            "name": "Duplicate URLs Creative",
            "thumbnail_url": "https://example.com/same_url.jpg",
            "image_url": "https://example.com/same_url.jpg",  # Duplicate
            "image_urls_for_viewing": [
                "https://example.com/same_url.jpg",  # Duplicate
                "https://example.com/unique_url.jpg"
            ]
        }
        # Extract URLs
        urls = extract_creative_image_urls(test_creative)
        # Should remove duplicates while preserving order
        assert len(urls) == 2
        assert urls[0] == "https://example.com/same_url.jpg"  # First occurrence
        assert urls[1] == "https://example.com/unique_url.jpg"

    @pytest.mark.asyncio
    async def test_get_ad_image_with_real_world_example(self):
        """Test with a real-world example that mimics the actual API response structure."""
        # Mock responses based on real API data
        mock_ad_data = {
            "account_id": "act_15975950",
            "creative": {"id": "606995022142818"}
        }
        mock_creative_details = {
            "id": "606995022142818",
            "name": "Test Creative"
        }
        # Mock get_ad_creatives response based on real data.
        # thumbnail_url is a 64x64 external-proxy URL (note p64x64 in its stp
        # param); image_url and image_urls_for_viewing[0] are the full-size
        # scontent CDN asset.
        mock_get_ad_creatives_response = json.dumps({
            "data": [
                {
                    "id": "606995022142818",
                    "name": "Test Creative",
                    "status": "ACTIVE",
                    "thumbnail_url": "https://external.fbsb6-1.fna.fbcdn.net/emg1/v/t13/13476424677788553381?url=https%3A%2F%2Fwww.facebook.com%2Fads%2Fimage%2F%3Fd%3DAQLuJ5l4AROBvIUchp4g4JXxIT5uAZiAsgHQkD8Iw7BeVtkXNUUfs3leWpqQplJCJdixVIg3mq9KichJ64eRfM-r8aY4GtVQp8TvS_HBByJ8fGg_Cs7Kb8YkN4IDwJ4iQIIkMx30LycCKzuYtp9M-vOk&fb_obo=1&utld=facebook.com&stp=c0.5000x0.5000f_dst-emg0_p64x64_q75_tt6&edm=AEuWsiQEAAAA&_nc_gid=_QBCRbZxDq-i1ZiGEXxW2w&_nc_eui2=AeEbQXzmAdoqWLIXjuTDJ0xAoThZu47BlQqhOFm7jsGVCloP48Ep6Y_qIA5tcqrcSDff5f_k8xGzFIpD7PnUws8c&_nc_oc=Adn3GeYlXxbfEeY0wCBSgNdlwO80wXt5R5bgY2NozdroZ6CRSaXIaOSjVSK9S1LsqsY4GL_0dVzU80RY8QMucEkZ&ccb=13-1&oh=06_Q3-1AcBKUD0rfLGATAveIM5hMSWG9c7DsJzq2arvOl8W4Bpn&oe=688C87B2&_nc_sid=58080a",
                    "image_url": "https://scontent.fbsb6-1.fna.fbcdn.net/v/t45.1600-4/518574136_1116014047008737_2492837958169838537_n.png?stp=dst-jpg_tt6&_nc_cat=109&ccb=1-7&_nc_sid=890911&_nc_eui2=AeHbHqoiAUgF0QeX-tvUoDjYeTyJad_QEPF5PIlp39AQ8dP8cvOlHwiJjny8AUv7xxAlYyy5BGCqFU_oVM9CI7ln&_nc_ohc=VTTYlMOAWZoQ7kNvwGjLMW5&_nc_oc=AdnYDrpNrLovWZC_RG4tvoICGPjBNfzNJimhx-4SKW4BU2i_yzL00dX0-OiYEYokq394g8xR-1a-OuVDAm4HsSJy&_nc_zt=1&_nc_ht=scontent.fbsb6-1.fna&edm=AEuWsiQEAAAA&_nc_gid=_QBCRbZxDq-i1ZiGEXxW2w&oh=00_AfTujKmF365FnGgcokkkdWnK-vmnzQK8Icvlk0kB8SKM3g&oe=68906FC4",
                    "image_urls_for_viewing": [
                        "https://scontent.fbsb6-1.fna.fbcdn.net/v/t45.1600-4/518574136_1116014047008737_2492837958169838537_n.png?stp=dst-jpg_tt6&_nc_cat=109&ccb=1-7&_nc_sid=890911&_nc_eui2=AeHbHqoiAUgF0QeX-tvUoDjYeTyJad_QEPF5PIlp39AQ8dP8cvOlHwiJjny8AUv7xxAlYyy5BGCqFU_oVM9CI7ln&_nc_ohc=VTTYlMOAWZoQ7kNvwGjLMW5&_nc_oc=AdnYDrpNrLovWZC_RG4tvoICGPjBNfzNJimhx-4SKW4BU2i_yzL00dX0-OiYEYokq394g8xR-1a-OuVDAm4HsSJy&_nc_zt=1&_nc_ht=scontent.fbsb6-1.fna&edm=AEuWsiQEAAAA&_nc_gid=_QBCRbZxDq-i1ZiGEXxW2w&oh=00_AfTujKmF365FnGgcokkkdWnK-vmnzQK8Icvlk0kB8SKM3g&oe=68906FC4",
                        "https://external.fbsb6-1.fna.fbcdn.net/emg1/v/t13/13476424677788553381?url=https%3A%2F%2Fwww.facebook.com%2Fads%2Fimage%2F%3Fd%3DAQLuJ5l4AROBvIUchp4g4JXxIT5uAZiAsgHQkD8Iw7BeVtkXNUUfs3leWpqQplJCJdixVIg3mq9KichJ64eRfM-r8aY4GtVQp8TvS_HBByJ8fGg_Cs7Kb8YkN4IDwJ4iQIIkMx30LycCKzuYtp9M-vOk&fb_obo=1&utld=facebook.com&stp=c0.5000x0.5000f_dst-emg0_p64x64_q75_tt6&edm=AEuWsiQEAAAA&_nc_gid=_QBCRbZxDq-i1ZiGEXxW2w&_nc_eui2=AeEbQXzmAdoqWLIXjuTDJ0xAoThZu47BlQqhOFm7jsGVCloP48Ep6Y_qIA5tcqrcSDff5f_k8xGzFIpD7PnUws8c&_nc_oc=Adn3GeYlXxbfEeY0wCBSgNdlwO80wXt5R5bgY2NozdroZ6CRSaXIaOSjVSK9S1LsqsY4GL_0dVzU80RY8QMucEkZ&ccb=13-1&oh=06_Q3-1AcBKUD0rfLGATAveIM5hMSWG9c7DsJzq2arvOl8W4Bpn&oe=688C87B2&_nc_sid=58080a"
                    ]
                }
            ]
        })
        # Mock PIL Image processing
        mock_pil_image = MagicMock()
        mock_pil_image.mode = "RGB"
        mock_pil_image.convert.return_value = mock_pil_image
        mock_byte_stream = MagicMock()
        mock_byte_stream.getvalue.return_value = b"fake_jpeg_data"
        with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api, \
             patch('meta_ads_mcp.core.ads.get_ad_creatives', new_callable=AsyncMock) as mock_get_creatives, \
             patch('meta_ads_mcp.core.ads.download_image', new_callable=AsyncMock) as mock_download, \
             patch('meta_ads_mcp.core.ads.PILImage.open') as mock_pil_open, \
             patch('meta_ads_mcp.core.ads.io.BytesIO') as mock_bytesio:
            mock_api.side_effect = [mock_ad_data, mock_creative_details]
            mock_get_creatives.return_value = mock_get_ad_creatives_response
            mock_download.return_value = b"fake_image_bytes"
            mock_pil_open.return_value = mock_pil_image
            mock_bytesio.return_value = mock_byte_stream
            # This should use the first image_urls_for_viewing URL (high quality)
            result = await get_ad_image(access_token="test_token", ad_id="test_ad_id")
            # Verify it used the high-quality URL (not the thumbnail)
            assert result is not None
            expected_url = "https://scontent.fbsb6-1.fna.fbcdn.net/v/t45.1600-4/518574136_1116014047008737_2492837958169838537_n.png?stp=dst-jpg_tt6&_nc_cat=109&ccb=1-7&_nc_sid=890911&_nc_eui2=AeHbHqoiAUgF0QeX-tvUoDjYeTyJad_QEPF5PIlp39AQ8dP8cvOlHwiJjny8AUv7xxAlYyy5BGCqFU_oVM9CI7ln&_nc_ohc=VTTYlMOAWZoQ7kNvwGjLMW5&_nc_oc=AdnYDrpNrLovWZC_RG4tvoICGPjBNfzNJimhx-4SKW4BU2i_yzL00dX0-OiYEYokq394g8xR-1a-OuVDAm4HsSJy&_nc_zt=1&_nc_ht=scontent.fbsb6-1.fna&edm=AEuWsiQEAAAA&_nc_gid=_QBCRbZxDq-i1ZiGEXxW2w&oh=00_AfTujKmF365FnGgcokkkdWnK-vmnzQK8Icvlk0kB8SKM3g&oe=68906FC4"
            mock_download.assert_called_once_with(expected_url)
```
--------------------------------------------------------------------------------
/tests/test_get_ad_image_regression.py:
--------------------------------------------------------------------------------
```python
"""Regression tests for get_ad_image function fixes.
Tests for multiple issues that were fixed:
1. JSON Parsing Error: 'TypeError: the JSON object must be str, bytes or bytearray, not dict'
- Caused by wrong parameter order and incorrect JSON parsing
- Fixed by correcting parameter order and JSON parsing logic
2. Missing Image Hash Support: "Error: No image hashes found in creative"
- Many modern creatives don't have image_hash but have direct URLs
- Fixed by adding direct URL fallback using image_urls_for_viewing and thumbnail_url
3. Image Quality Issue: Function was returning profile thumbnails instead of ad creative images
- Fixed by prioritizing image_urls_for_viewing over thumbnail_url
- Added proper fallback hierarchy: image_urls_for_viewing > image_url > object_story_spec.picture > thumbnail_url
The fixes enable get_ad_image to work with both traditional hash-based and modern URL-based creatives,
and ensure high-quality images are returned instead of thumbnails.
"""
import pytest
import json
from unittest.mock import AsyncMock, patch, MagicMock
from meta_ads_mcp.core.ads import get_ad_image
@pytest.mark.asyncio
class TestGetAdImageRegressionFix:
"""Regression test cases for the get_ad_image JSON parsing bug fix."""
async def test_get_ad_image_json_parsing_regression_fix(self):
"""Regression test: ensure get_ad_image doesn't throw JSON parsing error."""
# Mock responses for the main API flow
mock_ad_data = {
"account_id": "act_123456789",
"creative": {"id": "creative_123456789"}
}
mock_creative_details = {
"id": "creative_123456789",
"name": "Test Creative",
"image_hash": "test_hash_123"
}
mock_image_data = {
"data": [{
"hash": "test_hash_123",
"url": "https://example.com/image.jpg",
"width": 1200,
"height": 628,
"name": "test_image.jpg",
"status": "ACTIVE"
}]
}
# Mock PIL Image processing to return a valid Image object
mock_pil_image = MagicMock()
mock_pil_image.mode = "RGB"
mock_pil_image.convert.return_value = mock_pil_image
mock_byte_stream = MagicMock()
mock_byte_stream.getvalue.return_value = b"fake_jpeg_data"
with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api, \
patch('meta_ads_mcp.core.ads.download_image', new_callable=AsyncMock) as mock_download, \
patch('meta_ads_mcp.core.ads.PILImage.open') as mock_pil_open, \
patch('meta_ads_mcp.core.ads.io.BytesIO') as mock_bytesio:
mock_api.side_effect = [mock_ad_data, mock_creative_details, mock_image_data]
mock_download.return_value = b"fake_image_bytes"
mock_pil_open.return_value = mock_pil_image
mock_bytesio.return_value = mock_byte_stream
# This should NOT raise "the JSON object must be str, bytes or bytearray, not dict"
# Previously this would fail with: TypeError: the JSON object must be str, bytes or bytearray, not dict
result = await get_ad_image(access_token="test_token", ad_id="120228922871870272")
# Verify we get an Image object (success) - the exact test depends on the mocking
# The key is that we don't get the JSON parsing error
assert result is not None
# The main regression check: if we got here without an exception, the JSON parsing is fixed
# We might get different results based on mocking, but the critical JSON parsing should work
async def test_get_ad_image_fallback_path_json_parsing(self):
"""Test the fallback path that calls get_ad_creatives handles JSON parsing correctly."""
# Mock responses that trigger the fallback path (no direct image hash)
mock_ad_data = {
"account_id": "act_123456789",
"creative": {"id": "creative_123456789"}
}
mock_creative_details = {
"id": "creative_123456789",
"name": "Test Creative"
# No image_hash - this will trigger the fallback
}
# Mock get_ad_creatives response (wrapped format that caused the original bug)
mock_get_ad_creatives_response = json.dumps({
"data": json.dumps({
"data": [
{
"id": "creative_123456789",
"name": "Test Creative",
"object_story_spec": {
"link_data": {
"image_hash": "fallback_hash_123"
}
}
}
]
})
})
mock_image_data = {
"data": [{
"hash": "fallback_hash_123",
"url": "https://example.com/fallback_image.jpg",
"width": 1200,
"height": 628
}]
}
# Mock PIL Image processing
mock_pil_image = MagicMock()
mock_pil_image.mode = "RGB"
mock_pil_image.convert.return_value = mock_pil_image
mock_byte_stream = MagicMock()
mock_byte_stream.getvalue.return_value = b"fake_jpeg_data"
with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api, \
patch('meta_ads_mcp.core.ads.get_ad_creatives', new_callable=AsyncMock) as mock_get_creatives, \
patch('meta_ads_mcp.core.ads.download_image', new_callable=AsyncMock) as mock_download, \
patch('meta_ads_mcp.core.ads.PILImage.open') as mock_pil_open, \
patch('meta_ads_mcp.core.ads.io.BytesIO') as mock_bytesio:
mock_api.side_effect = [mock_ad_data, mock_creative_details, mock_image_data]
mock_get_creatives.return_value = mock_get_ad_creatives_response
mock_download.return_value = b"fake_image_bytes"
mock_pil_open.return_value = mock_pil_image
mock_bytesio.return_value = mock_byte_stream
# This should handle the wrapped JSON response correctly
# Previously would fail: TypeError: the JSON object must be str, bytes or bytearray, not dict
result = await get_ad_image(access_token="test_token", ad_id="120228922871870272")
# Verify the fallback path worked - key is no JSON parsing exception
assert result is not None
# Verify get_ad_creatives was called (fallback path was triggered)
mock_get_creatives.assert_called_once()
async def test_get_ad_image_no_ad_id(self):
"""Test get_ad_image with no ad_id provided."""
result = await get_ad_image(access_token="test_token", ad_id=None)
# Should return error string, not throw JSON parsing error
assert isinstance(result, str)
assert "Error: No ad ID provided" in result
async def test_get_ad_image_parameter_order_regression(self):
"""Regression test: ensure get_ad_creatives is called with correct parameter order."""
# This test ensures we don't regress to calling get_ad_creatives(ad_id, "", access_token)
# which was the original bug
mock_ad_data = {
"account_id": "act_123456789",
"creative": {"id": "creative_123456789"}
}
mock_creative_details = {
"id": "creative_123456789",
"name": "Test Creative"
# No image_hash to trigger fallback
}
with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api, \
patch('meta_ads_mcp.core.ads.get_ad_creatives', new_callable=AsyncMock) as mock_get_creatives:
mock_api.side_effect = [mock_ad_data, mock_creative_details]
mock_get_creatives.return_value = json.dumps({"data": json.dumps({"data": []})})
# Call get_ad_image - it should reach the fallback path
result = await get_ad_image(access_token="test_token", ad_id="test_ad_id")
# Verify get_ad_creatives was called with correct parameter names (not positional)
mock_get_creatives.assert_called_once_with(ad_id="test_ad_id", access_token="test_token")
# The key regression test: this should not have raised a JSON parsing error
    async def test_get_ad_image_direct_url_fallback_with_image_urls_for_viewing(self):
        """Test direct URL fallback using image_urls_for_viewing when no image_hash found."""
        # Mock responses for modern creative without image_hash
        mock_ad_data = {
            "account_id": "act_123456789",
            "creative": {"id": "creative_123456789"}
        }
        mock_creative_details = {
            "id": "creative_123456789",
            "name": "Modern Creative"
            # No image_hash - this will trigger fallback
        }
        # Mock get_ad_creatives response with direct URLs; the first entry of
        # image_urls_for_viewing is expected to win over thumbnail_url.
        mock_get_ad_creatives_response = json.dumps({
            "data": [
                {
                    "id": "creative_123456789",
                    "name": "Modern Creative",
                    "status": "ACTIVE",
                    "thumbnail_url": "https://example.com/thumb.jpg",
                    "image_urls_for_viewing": [
                        "https://example.com/full_image.jpg",
                        "https://example.com/alt_image.jpg"
                    ]
                }
            ]
        })
        # Mock PIL Image processing
        mock_pil_image = MagicMock()
        mock_pil_image.mode = "RGB"
        mock_pil_image.convert.return_value = mock_pil_image
        mock_byte_stream = MagicMock()
        mock_byte_stream.getvalue.return_value = b"fake_jpeg_data"
        with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api, \
             patch('meta_ads_mcp.core.ads.get_ad_creatives', new_callable=AsyncMock) as mock_get_creatives, \
             patch('meta_ads_mcp.core.ads.download_image', new_callable=AsyncMock) as mock_download, \
             patch('meta_ads_mcp.core.ads.PILImage.open') as mock_pil_open, \
             patch('meta_ads_mcp.core.ads.io.BytesIO') as mock_bytesio:
            # side_effect order matters: first API call fetches the ad, the
            # second fetches the creative details.
            mock_api.side_effect = [mock_ad_data, mock_creative_details]
            mock_get_creatives.return_value = mock_get_ad_creatives_response
            mock_download.return_value = b"fake_image_bytes"
            mock_pil_open.return_value = mock_pil_image
            mock_bytesio.return_value = mock_byte_stream
            # This should use direct URL fallback successfully
            result = await get_ad_image(access_token="test_token", ad_id="test_ad_id")
            # Verify it used the direct URL approach
            assert result is not None
            mock_get_creatives.assert_called_once()
            mock_download.assert_called_once_with("https://example.com/full_image.jpg")
    async def test_get_ad_image_direct_url_fallback_with_thumbnail_url_only(self):
        """Test direct URL fallback using thumbnail_url when image_urls_for_viewing not available."""
        # Mock responses for creative with only thumbnail_url
        mock_ad_data = {
            "account_id": "act_123456789",
            "creative": {"id": "creative_123456789"}
        }
        mock_creative_details = {
            "id": "creative_123456789",
            "name": "Thumbnail Only Creative"
            # No image_hash
        }
        # Mock get_ad_creatives response with only thumbnail_url; with no
        # image_urls_for_viewing present the thumbnail is the last resort.
        mock_get_ad_creatives_response = json.dumps({
            "data": [
                {
                    "id": "creative_123456789",
                    "name": "Thumbnail Only Creative",
                    "status": "ACTIVE",
                    "thumbnail_url": "https://example.com/thumb_only.jpg"
                    # No image_urls_for_viewing
                }
            ]
        })
        # Mock PIL Image processing
        mock_pil_image = MagicMock()
        mock_pil_image.mode = "RGB"
        mock_pil_image.convert.return_value = mock_pil_image
        mock_byte_stream = MagicMock()
        mock_byte_stream.getvalue.return_value = b"fake_jpeg_data"
        with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api, \
             patch('meta_ads_mcp.core.ads.get_ad_creatives', new_callable=AsyncMock) as mock_get_creatives, \
             patch('meta_ads_mcp.core.ads.download_image', new_callable=AsyncMock) as mock_download, \
             patch('meta_ads_mcp.core.ads.PILImage.open') as mock_pil_open, \
             patch('meta_ads_mcp.core.ads.io.BytesIO') as mock_bytesio:
            # side_effect order matters: ad lookup first, creative details second.
            mock_api.side_effect = [mock_ad_data, mock_creative_details]
            mock_get_creatives.return_value = mock_get_ad_creatives_response
            mock_download.return_value = b"fake_image_bytes"
            mock_pil_open.return_value = mock_pil_image
            mock_bytesio.return_value = mock_byte_stream
            # This should fall back to thumbnail_url
            result = await get_ad_image(access_token="test_token", ad_id="test_ad_id")
            # Verify it used the thumbnail URL
            assert result is not None
            mock_download.assert_called_once_with("https://example.com/thumb_only.jpg")
async def test_get_ad_image_no_direct_urls_available(self):
"""Test error handling when no direct URLs are available."""
# Mock responses for creative without any URLs
mock_ad_data = {
"account_id": "act_123456789",
"creative": {"id": "creative_123456789"}
}
mock_creative_details = {
"id": "creative_123456789",
"name": "No URLs Creative"
# No image_hash
}
# Mock get_ad_creatives response without URLs
mock_get_ad_creatives_response = json.dumps({
"data": [
{
"id": "creative_123456789",
"name": "No URLs Creative",
"status": "ACTIVE"
# No thumbnail_url or image_urls_for_viewing
}
]
})
with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api, \
patch('meta_ads_mcp.core.ads.get_ad_creatives', new_callable=AsyncMock) as mock_get_creatives:
mock_api.side_effect = [mock_ad_data, mock_creative_details]
mock_get_creatives.return_value = mock_get_ad_creatives_response
# This should return appropriate error
result = await get_ad_image(access_token="test_token", ad_id="test_ad_id")
# Should get error about no URLs
assert isinstance(result, str)
assert "No image URLs found" in result
async def test_get_ad_image_direct_url_download_failure(self):
"""Test error handling when direct URL download fails."""
# Mock responses for creative with URLs but download failure
mock_ad_data = {
"account_id": "act_123456789",
"creative": {"id": "creative_123456789"}
}
mock_creative_details = {
"id": "creative_123456789",
"name": "Download Fail Creative"
}
mock_get_ad_creatives_response = json.dumps({
"data": [
{
"id": "creative_123456789",
"name": "Download Fail Creative",
"image_urls_for_viewing": ["https://example.com/broken_image.jpg"]
}
]
})
with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api, \
patch('meta_ads_mcp.core.ads.get_ad_creatives', new_callable=AsyncMock) as mock_get_creatives, \
patch('meta_ads_mcp.core.ads.download_image', new_callable=AsyncMock) as mock_download:
mock_api.side_effect = [mock_ad_data, mock_creative_details]
mock_get_creatives.return_value = mock_get_ad_creatives_response
mock_download.return_value = None # Simulate download failure
# This should return download error
result = await get_ad_image(access_token="test_token", ad_id="test_ad_id")
# Should get error about download failure
assert isinstance(result, str)
assert "Failed to download image from direct URL" in result
async def test_get_ad_image_quality_improvement_prioritizes_high_quality(self):
"""Test that the image quality improvement correctly prioritizes high-quality images over thumbnails."""
# Mock responses for creative with both high-quality and thumbnail URLs
mock_ad_data = {
"account_id": "act_123456789",
"creative": {"id": "creative_123456789"}
}
mock_creative_details = {
"id": "creative_123456789",
"name": "Quality Test Creative"
}
# Mock get_ad_creatives response with both URLs
mock_get_ad_creatives_response = json.dumps({
"data": [
{
"id": "creative_123456789",
"name": "Quality Test Creative",
"status": "ACTIVE",
"thumbnail_url": "https://example.com/thumbnail_64x64.jpg", # Low quality thumbnail
"image_url": "https://example.com/full_image.jpg", # Medium quality
"image_urls_for_viewing": [
"https://example.com/high_quality_image.jpg", # Highest quality
"https://example.com/alt_high_quality.jpg"
],
"object_story_spec": {
"link_data": {
"picture": "https://example.com/object_story_picture.jpg"
}
}
}
]
})
# Mock PIL Image processing
mock_pil_image = MagicMock()
mock_pil_image.mode = "RGB"
mock_pil_image.convert.return_value = mock_pil_image
mock_byte_stream = MagicMock()
mock_byte_stream.getvalue.return_value = b"fake_jpeg_data"
with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api, \
patch('meta_ads_mcp.core.ads.get_ad_creatives', new_callable=AsyncMock) as mock_get_creatives, \
patch('meta_ads_mcp.core.ads.download_image', new_callable=AsyncMock) as mock_download, \
patch('meta_ads_mcp.core.ads.PILImage.open') as mock_pil_open, \
patch('meta_ads_mcp.core.ads.io.BytesIO') as mock_bytesio:
mock_api.side_effect = [mock_ad_data, mock_creative_details]
mock_get_creatives.return_value = mock_get_ad_creatives_response
mock_download.return_value = b"fake_image_bytes"
mock_pil_open.return_value = mock_pil_image
mock_bytesio.return_value = mock_byte_stream
# This should prioritize image_urls_for_viewing[0] over thumbnail_url
result = await get_ad_image(access_token="test_token", ad_id="test_ad_id")
# Verify it used the highest quality URL, not the thumbnail
assert result is not None
mock_download.assert_called_once_with("https://example.com/high_quality_image.jpg")
# Verify it did NOT use the thumbnail URL
# Check that the call was made with the high-quality URL, not the thumbnail
mock_download.assert_called_once_with("https://example.com/high_quality_image.jpg")
```
--------------------------------------------------------------------------------
/tests/test_dsa_integration.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Integration test for DSA beneficiary functionality.
This test demonstrates the complete DSA beneficiary implementation working end-to-end,
including detection, parameter support, and error handling.
"""
import pytest
import json
from unittest.mock import AsyncMock, patch
from meta_ads_mcp.core.adsets import create_adset, get_adset_details
from meta_ads_mcp.core.accounts import get_account_info
class TestDSAIntegration:
    """Integration tests for DSA beneficiary functionality.

    Covers account-level DSA detection, ad set creation with/without a
    dsa_beneficiary, error handling, and regional compliance behavior.
    All Meta API traffic is mocked.
    """

    @pytest.mark.asyncio
    async def test_dsa_beneficiary_complete_workflow(self):
        """Test complete DSA beneficiary workflow from account detection to ad set creation"""
        # Step 1: Get account info and detect DSA requirement
        mock_account_response = {
            "id": "act_701351919139047",
            "name": "Test European Account",
            "account_status": 1,
            "business_country_code": "DE",  # Germany - DSA compliant
            "business_city": "Berlin",
            "currency": "EUR"
        }
        with patch('meta_ads_mcp.core.accounts.make_api_request', new_callable=AsyncMock) as mock_account_api:
            with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
                mock_auth.return_value = "test_access_token"
                mock_account_api.return_value = mock_account_response
                # Get account info and verify DSA detection
                result = await get_account_info(account_id="act_701351919139047")
                # Handle new return format (dictionary instead of JSON string)
                if isinstance(result, dict):
                    result_data = result
                else:
                    result_data = json.loads(result)
                # Verify DSA requirement is detected
                assert result_data["business_country_code"] == "DE"
                assert result_data["dsa_required"] is True
                assert "DSA (Digital Services Act)" in result_data["dsa_compliance_note"]
        # Step 2: Create ad set with DSA beneficiary
        mock_adset_response = {
            "id": "23842588888640185",
            "name": "Test European Ad Set",
            "status": "PAUSED",
            "dsa_beneficiary": "Test Organization GmbH"
        }
        with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_adset_api:
            with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
                mock_auth.return_value = "test_access_token"
                mock_adset_api.return_value = mock_adset_response
                # Create ad set with DSA beneficiary
                result = await create_adset(
                    account_id="act_701351919139047",
                    campaign_id="23842588888640184",
                    name="Test European Ad Set",
                    optimization_goal="LINK_CLICKS",
                    billing_event="IMPRESSIONS",
                    dsa_beneficiary="Test Organization GmbH"
                )
                result_data = json.loads(result)
                # Verify successful creation with DSA beneficiary
                assert result_data["id"] == "23842588888640185"
                assert result_data["dsa_beneficiary"] == "Test Organization GmbH"
                # Verify API call included DSA beneficiary parameter
                mock_adset_api.assert_called_once()
                call_args = mock_adset_api.call_args
                params = call_args[0][2]  # Third argument is params
                assert "dsa_beneficiary" in params
                assert params["dsa_beneficiary"] == "Test Organization GmbH"

    @pytest.mark.asyncio
    async def test_dsa_beneficiary_error_handling_integration(self):
        """Test DSA beneficiary error handling in real-world scenarios"""
        # Test 1: Missing DSA beneficiary for European account
        mock_error_response = {
            "error": {
                "message": "Enter the person or organization that benefits from ads in this ad set",
                "type": "OAuthException",
                "code": 100
            }
        }
        with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api:
            with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
                mock_auth.return_value = "test_access_token"
                mock_api.side_effect = Exception(json.dumps(mock_error_response))
                result = await create_adset(
                    account_id="act_701351919139047",
                    campaign_id="23842588888640184",
                    name="Test European Ad Set",
                    optimization_goal="LINK_CLICKS",
                    billing_event="IMPRESSIONS"
                    # No DSA beneficiary provided
                )
                result_data = json.loads(result)
                # Handle response wrapped in 'data' field by meta_api_tool decorator
                if "data" in result_data:
                    actual_data = json.loads(result_data["data"])
                else:
                    actual_data = result_data
                # Verify error is properly handled
                assert "error" in actual_data
                assert "benefits from ads" in actual_data["error"]
        # Test 2: Permission error for DSA beneficiary
        with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api:
            with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
                mock_auth.return_value = "test_access_token"
                mock_api.side_effect = Exception("Permission denied: business_management permission required")
                result = await create_adset(
                    account_id="act_701351919139047",
                    campaign_id="23842588888640184",
                    name="Test European Ad Set",
                    optimization_goal="LINK_CLICKS",
                    billing_event="IMPRESSIONS",
                    dsa_beneficiary="Test Organization GmbH"
                )
                result_data = json.loads(result)
                # Handle response wrapped in 'data' field by meta_api_tool decorator
                if "data" in result_data:
                    actual_data = json.loads(result_data["data"])
                else:
                    actual_data = result_data
                # Verify permission error is handled
                assert "error" in actual_data
                assert "permission" in actual_data["error"].lower()

    @pytest.mark.asyncio
    async def test_dsa_beneficiary_regional_compliance_integration(self):
        """Test DSA beneficiary compliance across different regions"""
        # Test 1: European account (DSA required)
        mock_de_account_response = {
            "id": "act_de",
            "name": "German Account",
            "business_country_code": "DE",
            "currency": "EUR"
        }
        with patch('meta_ads_mcp.core.accounts.make_api_request', new_callable=AsyncMock) as mock_api:
            with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
                mock_auth.return_value = "test_access_token"
                mock_api.return_value = mock_de_account_response
                result = await get_account_info(account_id="act_de")
                # Handle new return format (dictionary instead of JSON string)
                if isinstance(result, dict):
                    result_data = result
                else:
                    result_data = json.loads(result)
                # Verify DSA requirement for German account
                assert result_data["business_country_code"] == "DE"
                assert result_data["dsa_required"] is True
        # Test 2: US account (DSA not required)
        mock_us_account_response = {
            "id": "act_us",
            "name": "US Account",
            "business_country_code": "US",
            "currency": "USD"
        }
        with patch('meta_ads_mcp.core.accounts.make_api_request', new_callable=AsyncMock) as mock_api:
            with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
                mock_auth.return_value = "test_access_token"
                mock_api.return_value = mock_us_account_response
                result = await get_account_info(account_id="act_us")
                # Handle new return format (dictionary instead of JSON string)
                if isinstance(result, dict):
                    result_data = result
                else:
                    result_data = json.loads(result)
                # Verify no DSA requirement for US account
                assert result_data["business_country_code"] == "US"
                assert result_data["dsa_required"] is False

    @pytest.mark.asyncio
    async def test_dsa_beneficiary_parameter_formats_integration(self):
        """Test different DSA beneficiary parameter formats in real scenarios"""
        test_cases = [
            "Test Organization GmbH",
            "Test Organization, Inc.",
            "Test Organization Ltd.",
            "Test Organization AG",
            "Test Organization BV",
            "Test Organization SARL"
        ]
        for beneficiary_name in test_cases:
            mock_response = {
                "id": "23842588888640185",
                "name": "Test Ad Set",
                "status": "PAUSED",
                "dsa_beneficiary": beneficiary_name
            }
            with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api:
                with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
                    mock_auth.return_value = "test_access_token"
                    mock_api.return_value = mock_response
                    result = await create_adset(
                        account_id="act_701351919139047",
                        campaign_id="23842588888640184",
                        name="Test Ad Set",
                        optimization_goal="LINK_CLICKS",
                        billing_event="IMPRESSIONS",
                        dsa_beneficiary=beneficiary_name
                    )
                    result_data = json.loads(result)
                    # Verify successful creation with different formats
                    assert result_data["id"] == "23842588888640185"
                    assert result_data["dsa_beneficiary"] == beneficiary_name

    @pytest.mark.asyncio
    async def test_dsa_beneficiary_retrieval_integration(self):
        """Test complete workflow including retrieving DSA beneficiary information"""
        # Step 1: Create ad set with DSA beneficiary
        mock_create_response = {
            "id": "120229746629010183",
            "name": "Test Ad Set with DSA",
            "status": "PAUSED"
        }
        with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api:
            with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
                mock_auth.return_value = "test_access_token"
                mock_api.return_value = mock_create_response
                # Create ad set
                result = await create_adset(
                    account_id="act_701351919139047",
                    campaign_id="120229656904980183",
                    name="Test Ad Set with DSA",
                    optimization_goal="LINK_CLICKS",
                    billing_event="IMPRESSIONS",
                    dsa_beneficiary="Test Organization Inc"
                )
                result_data = json.loads(result)
                adset_id = result_data["id"]
                # Verify creation was successful
                assert adset_id == "120229746629010183"
        # Step 2: Retrieve ad set details including DSA beneficiary
        mock_details_response = {
            "id": "120229746629010183",
            "name": "Test Ad Set with DSA",
            "campaign_id": "120229656904980183",
            "status": "PAUSED",
            "daily_budget": "1000",
            "targeting": {
                "geo_locations": {"countries": ["US"]},
                "age_min": 25,
                "age_max": 65
            },
            "bid_amount": 200,
            "optimization_goal": "LINK_CLICKS",
            "billing_event": "IMPRESSIONS",
            "dsa_beneficiary": "Test Organization Inc"
        }
        with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api:
            with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
                mock_auth.return_value = "test_access_token"
                mock_api.return_value = mock_details_response
                # Retrieve ad set details
                result = await get_adset_details(adset_id=adset_id)
                result_data = json.loads(result)
                # Verify DSA beneficiary field is retrieved correctly
                assert result_data["id"] == "120229746629010183"
                assert "dsa_beneficiary" in result_data
                assert result_data["dsa_beneficiary"] == "Test Organization Inc"
                # Verify API call included dsa_beneficiary in fields
                mock_api.assert_called_once()
                call_args = mock_api.call_args
                assert "dsa_beneficiary" in str(call_args)

    @pytest.mark.asyncio
    async def test_dsa_beneficiary_us_account_integration(self):
        """Test DSA beneficiary behavior for US accounts (optional parameter)"""
        # Step 1: Verify US account doesn't require DSA
        mock_us_account_response = {
            "id": "act_701351919139047",
            "name": "US Business Account",
            "business_country_code": "US",
            "account_status": 1
        }
        with patch('meta_ads_mcp.core.accounts.make_api_request', new_callable=AsyncMock) as mock_api:
            with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
                mock_auth.return_value = "test_access_token"
                mock_api.return_value = mock_us_account_response
                result = await get_account_info(account_id="act_701351919139047")
                # Handle new return format (dictionary instead of JSON string)
                # -- added for consistency with the other account-info tests.
                if isinstance(result, dict):
                    result_data = result
                else:
                    result_data = json.loads(result)
                # Verify US account doesn't require DSA
                assert result_data["business_country_code"] == "US"
                assert result_data["dsa_required"] is False
        # Step 2: Create ad set without DSA beneficiary (should work for US)
        mock_create_response = {
            "id": "120229746624860183",
            "name": "Test US Ad Set",
            "status": "PAUSED"
        }
        with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api:
            with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
                mock_auth.return_value = "test_access_token"
                mock_api.return_value = mock_create_response
                result = await create_adset(
                    account_id="act_701351919139047",
                    campaign_id="120229656904980183",
                    name="Test US Ad Set",
                    optimization_goal="LINK_CLICKS",
                    billing_event="IMPRESSIONS"
                    # No DSA beneficiary provided
                )
                result_data = json.loads(result)
                # Verify creation was successful without DSA beneficiary
                assert result_data["id"] == "120229746624860183"
        # Step 3: Retrieve ad set details (should not have dsa_beneficiary field)
        mock_details_response = {
            "id": "120229746624860183",
            "name": "Test US Ad Set",
            "campaign_id": "120229656904980183",
            "status": "PAUSED",
            "daily_budget": "1000",
            "targeting": {
                "geo_locations": {"countries": ["US"]}
            },
            "bid_amount": 200,
            "optimization_goal": "LINK_CLICKS",
            "billing_event": "IMPRESSIONS"
            # No dsa_beneficiary field
        }
        with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api:
            with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
                mock_auth.return_value = "test_access_token"
                mock_api.return_value = mock_details_response
                result = await get_adset_details(adset_id="120229746624860183")
                result_data = json.loads(result)
                # Verify ad set details are retrieved correctly
                assert result_data["id"] == "120229746624860183"
                assert "dsa_beneficiary" not in result_data  # Should not be present for US accounts

    @pytest.mark.asyncio
    async def test_account_info_requires_account_id(self):
        """Test that get_account_info requires an account_id parameter"""
        with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
            mock_auth.return_value = "test_access_token"
            # Test without account_id parameter
            result = await get_account_info(account_id=None)
            # Handle new return format (dictionary instead of JSON string)
            if isinstance(result, dict):
                result_data = result
            else:
                result_data = json.loads(result)
            # Verify error message for missing account_id
            assert "error" in result_data
            assert "Account ID is required" in result_data["error"]["message"]
            assert "Please specify an account_id parameter" in result_data["error"]["details"]
            assert "example" in result_data["error"]

    @pytest.mark.asyncio
    async def test_account_info_inaccessible_account_error(self):
        """Test that get_account_info provides helpful error for inaccessible accounts"""
        # Mock permission error for direct account access (first API call)
        mock_permission_error = {
            "error": {
                "message": "Insufficient access privileges",
                "type": "OAuthException",
                "code": 200
            }
        }
        # Mock accessible accounts response (second API call)
        mock_accessible_accounts = {
            "data": [
                {"id": "act_123", "name": "Test Account 1"},
                {"id": "act_456", "name": "Test Account 2"}
            ]
        }
        with patch('meta_ads_mcp.core.accounts.make_api_request', new_callable=AsyncMock) as mock_api:
            with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
                mock_auth.return_value = "test_access_token"
                # First call returns permission error, second call returns accessible accounts
                mock_api.side_effect = [mock_permission_error, mock_accessible_accounts]
                result = await get_account_info(account_id="act_inaccessible")
                # Handle new return format (dictionary instead of JSON string)
                if isinstance(result, dict):
                    result_data = result
                else:
                    result_data = json.loads(result)
                # Verify helpful error message for inaccessible account
                assert "error" in result_data
                assert "not accessible to your user account" in result_data["error"]["message"]
                assert "accessible_accounts" in result_data["error"]
                assert "suggestion" in result_data["error"]
                assert len(result_data["error"]["accessible_accounts"]) == 2
```
--------------------------------------------------------------------------------
/tests/test_mobile_app_adset_creation.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Unit Tests for Mobile App Adset Creation Functionality
This test suite validates the mobile app parameters implementation for the
create_adset function in meta_ads_mcp/core/adsets.py.
Test cases cover:
- Mobile app adset creation success scenarios
- promoted_object parameter validation and formatting
- destination_type parameter validation
- Mobile app specific error handling
- Cross-platform mobile app support (iOS, Android)
- Integration with APP_INSTALLS optimization goal
Usage:
uv run python -m pytest tests/test_mobile_app_adset_creation.py -v
Related to Issue #008: Missing Mobile App Parameters in create_adset Function
"""
import pytest
import json
import asyncio
from unittest.mock import AsyncMock, patch, MagicMock
from typing import Dict, Any, List
# Import the function to test
from meta_ads_mcp.core.adsets import create_adset
class TestMobileAppAdsetCreation:
"""Test suite for mobile app adset creation functionality"""
@pytest.fixture
def mock_api_request(self):
"""Mock for the make_api_request function"""
with patch('meta_ads_mcp.core.adsets.make_api_request') as mock:
mock.return_value = {
"id": "test_mobile_adset_id",
"name": "Test Mobile App Adset",
"optimization_goal": "APP_INSTALLS",
"promoted_object": {
"application_id": "123456789012345",
"object_store_url": "https://apps.apple.com/app/id123456789"
},
"destination_type": "APP_STORE"
}
yield mock
@pytest.fixture
def mock_auth_manager(self):
"""Mock for the authentication manager"""
with patch('meta_ads_mcp.core.api.auth_manager') as mock, \
patch('meta_ads_mcp.core.auth.get_current_access_token') as mock_get_token:
# Mock a valid access token
mock.get_current_access_token.return_value = "test_access_token"
mock.is_token_valid.return_value = True
mock.app_id = "test_app_id"
mock_get_token.return_value = "test_access_token"
yield mock
@pytest.fixture
def valid_mobile_app_params(self):
"""Valid mobile app parameters for testing"""
return {
"account_id": "act_123456789",
"campaign_id": "campaign_123456789",
"name": "Test Mobile App Adset",
"optimization_goal": "APP_INSTALLS",
"billing_event": "IMPRESSIONS",
"targeting": {
"age_min": 18,
"age_max": 65,
"app_install_state": "not_installed",
"geo_locations": {"countries": ["US"]},
"user_device": ["Android_Smartphone", "iPhone"],
"user_os": ["Android", "iOS"]
}
}
@pytest.fixture
def ios_promoted_object(self):
"""Valid iOS app promoted object"""
return {
"application_id": "123456789012345",
"object_store_url": "https://apps.apple.com/app/id123456789",
"custom_event_type": "APP_INSTALL"
}
@pytest.fixture
def android_promoted_object(self):
"""Valid Android app promoted object"""
return {
"application_id": "987654321098765",
"object_store_url": "https://play.google.com/store/apps/details?id=com.example.app",
"custom_event_type": "APP_INSTALL"
}
@pytest.fixture
def promoted_object_with_pixel(self):
"""Promoted object with Facebook pixel for tracking"""
return {
"application_id": "123456789012345",
"object_store_url": "https://apps.apple.com/app/id123456789",
"custom_event_type": "APP_INSTALL",
"pixel_id": "pixel_123456789"
}
# Test: Mobile App Adset Creation Success
    @pytest.mark.asyncio
    async def test_mobile_app_adset_creation_success_ios(
        self, mock_api_request, mock_auth_manager, valid_mobile_app_params, ios_promoted_object
    ):
        """Test successful iOS mobile app adset creation"""
        result = await create_adset(
            **valid_mobile_app_params,
            promoted_object=ios_promoted_object,
            destination_type="APP_STORE"
        )
        # Parse the result
        result_data = json.loads(result)
        # Verify the API was called with correct parameters
        mock_api_request.assert_called_once()
        call_args = mock_api_request.call_args
        # Check endpoint (first argument)
        assert call_args[0][0] == f"{valid_mobile_app_params['account_id']}/adsets"
        # Check parameters (third argument)
        params = call_args[0][2]
        assert 'promoted_object' in params
        assert 'destination_type' in params
        # Verify promoted_object is properly JSON-encoded; it may arrive as a
        # JSON string or as a dict depending on how create_adset serializes it.
        promoted_obj_param = json.loads(params['promoted_object']) if isinstance(params['promoted_object'], str) else params['promoted_object']
        assert promoted_obj_param['application_id'] == ios_promoted_object['application_id']
        assert promoted_obj_param['object_store_url'] == ios_promoted_object['object_store_url']
        # Verify destination_type
        assert params['destination_type'] == "APP_STORE"
        # Verify response structure
        assert 'id' in result_data
        assert result_data['optimization_goal'] == "APP_INSTALLS"
    @pytest.mark.asyncio
    async def test_mobile_app_adset_creation_success_android(
        self, mock_api_request, mock_auth_manager, valid_mobile_app_params, android_promoted_object
    ):
        """Test successful Android mobile app adset creation"""
        result = await create_adset(
            **valid_mobile_app_params,
            promoted_object=android_promoted_object,
            destination_type="APP_STORE"
        )
        # Parse the result
        result_data = json.loads(result)
        # Verify the API was called
        mock_api_request.assert_called_once()
        call_args = mock_api_request.call_args
        params = call_args[0][2]
        # Verify Android-specific promoted_object (JSON string or dict form)
        promoted_obj_param = json.loads(params['promoted_object']) if isinstance(params['promoted_object'], str) else params['promoted_object']
        assert promoted_obj_param['application_id'] == android_promoted_object['application_id']
        # Play Store URL marks this as an Android app
        assert "play.google.com" in promoted_obj_param['object_store_url']
        # Verify response
        assert 'id' in result_data
@pytest.mark.asyncio
async def test_mobile_app_adset_with_pixel_tracking(
self, mock_api_request, mock_auth_manager, valid_mobile_app_params, promoted_object_with_pixel
):
"""Test mobile app adset creation with Facebook pixel tracking"""
result = await create_adset(
**valid_mobile_app_params,
promoted_object=promoted_object_with_pixel,
destination_type="APP_STORE"
)
# Verify pixel_id is included
call_args = mock_api_request.call_args
params = call_args[0][2]
promoted_obj_param = json.loads(params['promoted_object']) if isinstance(params['promoted_object'], str) else params['promoted_object']
assert 'pixel_id' in promoted_obj_param
assert promoted_obj_param['pixel_id'] == promoted_object_with_pixel['pixel_id']
# Test: Parameter Validation

def _assert_validation_error(self, result, *needles):
    """Assert *result* (a JSON string) carries a validation error mentioning every needle.

    Handles both response shapes the tool layer produces: a bare error object,
    or an error object wrapped in a {"data": "<json>"} envelope. Extracted to
    remove the four-way duplication of this unwrap/assert logic.
    """
    payload = json.loads(result)
    if "data" in payload:
        payload = json.loads(payload["data"])
    assert 'error' in payload
    message = payload['error'].lower()
    for needle in needles:
        assert needle in message

@pytest.mark.asyncio
async def test_invalid_promoted_object_missing_application_id(
    self, mock_api_request, mock_auth_manager, valid_mobile_app_params
):
    """Test validation error for promoted_object missing application_id"""
    invalid_promoted_object = {
        "object_store_url": "https://apps.apple.com/app/id123456789",
        "custom_event_type": "APP_INSTALL"
    }
    result = await create_adset(
        **valid_mobile_app_params,
        promoted_object=invalid_promoted_object,
        destination_type="APP_STORE"
    )
    self._assert_validation_error(result, 'application_id')

@pytest.mark.asyncio
async def test_invalid_promoted_object_missing_store_url(
    self, mock_api_request, mock_auth_manager, valid_mobile_app_params
):
    """Test validation error for promoted_object missing object_store_url"""
    invalid_promoted_object = {
        "application_id": "123456789012345",
        "custom_event_type": "APP_INSTALL"
    }
    result = await create_adset(
        **valid_mobile_app_params,
        promoted_object=invalid_promoted_object,
        destination_type="APP_STORE"
    )
    self._assert_validation_error(result, 'object_store_url')

@pytest.mark.asyncio
async def test_invalid_destination_type(
    self, mock_api_request, mock_auth_manager, valid_mobile_app_params, ios_promoted_object
):
    """Test validation error for invalid destination_type value"""
    result = await create_adset(
        **valid_mobile_app_params,
        promoted_object=ios_promoted_object,
        destination_type="INVALID_TYPE"
    )
    self._assert_validation_error(result, 'destination_type')

@pytest.mark.asyncio
async def test_app_installs_requires_promoted_object(
    self, mock_api_request, mock_auth_manager, valid_mobile_app_params
):
    """Test that APP_INSTALLS optimization goal requires promoted_object"""
    result = await create_adset(
        **valid_mobile_app_params,
        # Missing promoted_object
        destination_type="APP_STORE"
    )
    self._assert_validation_error(result, 'promoted_object', 'app_installs')
# Test: Cross-platform Support

@pytest.mark.asyncio
async def test_ios_app_store_url_validation(
    self, mock_api_request, mock_auth_manager, valid_mobile_app_params
):
    """Test iOS App Store URL format validation"""
    apple_promoted = {
        "application_id": "123456789012345",
        "object_store_url": "https://apps.apple.com/app/id123456789",
        "custom_event_type": "APP_INSTALL"
    }
    response = await create_adset(
        **valid_mobile_app_params,
        promoted_object=apple_promoted,
        destination_type="APP_STORE"
    )
    parsed = json.loads(response)
    # A valid iOS URL must not produce an error payload (absent or null).
    assert parsed.get('error') is None

@pytest.mark.asyncio
async def test_google_play_url_validation(
    self, mock_api_request, mock_auth_manager, valid_mobile_app_params
):
    """Test Google Play Store URL format validation"""
    android_promoted = {
        "application_id": "987654321098765",
        "object_store_url": "https://play.google.com/store/apps/details?id=com.example.app",
        "custom_event_type": "APP_INSTALL"
    }
    response = await create_adset(
        **valid_mobile_app_params,
        promoted_object=android_promoted,
        destination_type="APP_STORE"
    )
    parsed = json.loads(response)
    # A valid Google Play URL must not produce an error payload.
    assert parsed.get('error') is None

@pytest.mark.asyncio
async def test_invalid_store_url_format(
    self, mock_api_request, mock_auth_manager, valid_mobile_app_params
):
    """Test validation error for invalid app store URL format"""
    bogus_promoted = {
        "application_id": "123456789012345",
        "object_store_url": "https://example.com/invalid-url",
        "custom_event_type": "APP_INSTALL"
    }
    response = await create_adset(
        **valid_mobile_app_params,
        promoted_object=bogus_promoted,
        destination_type="APP_STORE"
    )
    parsed = json.loads(response)
    # Unwrap the optional {"data": "<json>"} envelope before inspecting.
    payload = json.loads(parsed["data"]) if "data" in parsed else parsed
    assert 'error' in payload
    message = payload['error'].lower()
    assert 'store url' in message or 'object_store_url' in message
# Test: Destination Type Variations

@pytest.mark.asyncio
async def test_deeplink_destination_type(
    self, mock_api_request, mock_auth_manager, valid_mobile_app_params, ios_promoted_object
):
    """Test that DEEPLINK destination_type is forwarded to the Meta API."""
    await create_adset(
        **valid_mobile_app_params,
        promoted_object=ios_promoted_object,
        destination_type="DEEPLINK"
    )
    # Assert the call happened first: a missing call would otherwise surface
    # as a confusing TypeError on call_args[0] instead of a clean failure.
    mock_api_request.assert_called_once()
    params = mock_api_request.call_args[0][2]
    assert params['destination_type'] == "DEEPLINK"

@pytest.mark.asyncio
async def test_app_install_destination_type(
    self, mock_api_request, mock_auth_manager, valid_mobile_app_params, ios_promoted_object
):
    """Test that APP_INSTALL destination_type is forwarded to the Meta API."""
    await create_adset(
        **valid_mobile_app_params,
        promoted_object=ios_promoted_object,
        destination_type="APP_INSTALL"
    )
    mock_api_request.assert_called_once()
    params = mock_api_request.call_args[0][2]
    assert params['destination_type'] == "APP_INSTALL"

@pytest.mark.asyncio
async def test_on_ad_destination_type_for_lead_generation(
    self, mock_api_request, mock_auth_manager, valid_mobile_app_params
):
    """Test ON_AD destination_type for lead generation campaigns (Issue #009 fix)"""
    # Lead-generation configuration (no promoted_object: this is lead gen,
    # not a mobile-app adset).
    lead_gen_params = valid_mobile_app_params.copy()
    lead_gen_params.update({
        "optimization_goal": "LEAD_GENERATION",
        "billing_event": "IMPRESSIONS"
    })
    await create_adset(
        **lead_gen_params,
        destination_type="ON_AD"
    )
    # Should pass validation and include destination_type in the API call.
    mock_api_request.assert_called_once()
    params = mock_api_request.call_args[0][2]
    assert params['destination_type'] == "ON_AD"
@pytest.mark.asyncio
async def test_on_ad_validation_passes(
    self, mock_api_request, mock_auth_manager, valid_mobile_app_params
):
    """Test that ON_AD destination_type passes validation (Issue #009 regression test)"""
    # Parameters that work with ON_AD (lead generation, not mobile app).
    lead_gen_params = dict(valid_mobile_app_params)
    lead_gen_params["optimization_goal"] = "LEAD_GENERATION"
    lead_gen_params["billing_event"] = "IMPRESSIONS"
    response = await create_adset(**lead_gen_params, destination_type="ON_AD")
    parsed = json.loads(response)
    # Unwrap the optional {"data": "<json>"} envelope.
    payload = json.loads(parsed["data"]) if "data" in parsed else parsed
    # Before the fix this returned: "Invalid destination_type: ON_AD".
    assert "error" not in payload or "destination_type" not in payload.get("error", "").lower()

# Test: Error Handling
@pytest.mark.asyncio
async def test_meta_api_error_handling(
    self, mock_auth_manager, valid_mobile_app_params, ios_promoted_object
):
    """Test handling of Meta API errors for mobile app adsets"""
    with patch('meta_ads_mcp.core.adsets.make_api_request') as mock_api:
        # Simulate the Graph API rejecting the request.
        mock_api.side_effect = Exception("HTTP Error: 400 - Select a dataset and conversion event for your ad set")
        response = await create_adset(
            **valid_mobile_app_params,
            promoted_object=ios_promoted_object,
            destination_type="APP_STORE"
        )
        parsed = json.loads(response)
        payload = json.loads(parsed["data"]) if "data" in parsed else parsed
        # The failure must be surfaced gracefully as an error object.
        assert 'error' in payload
        error_text = payload.get('error', '').lower()
        details_text = payload.get('details', '').lower()
        assert any(
            keyword in error_text or keyword in details_text
            for keyword in ('dataset', 'conversion event')
        )
# Test: Backward Compatibility

@pytest.mark.asyncio
async def test_backward_compatibility_non_mobile_campaigns(
    self, mock_api_request, mock_auth_manager
):
    """Test that non-mobile campaigns still work without mobile app parameters"""
    web_adset_params = {
        "account_id": "act_123456789",
        "campaign_id": "campaign_123456789",
        "name": "Test Web Adset",
        "optimization_goal": "LINK_CLICKS",
        "billing_event": "LINK_CLICKS",
        "targeting": {
            "age_min": 18,
            "age_max": 65,
            "geo_locations": {"countries": ["US"]}
        }
    }
    await create_adset(**web_adset_params)
    # The request goes through without any mobile-app parameters...
    mock_api_request.assert_called_once()
    sent = mock_api_request.call_args[0][2]
    # ...and none are invented on the way to the API.
    assert 'promoted_object' not in sent
    assert 'destination_type' not in sent

@pytest.mark.asyncio
async def test_optional_mobile_parameters(
    self, mock_api_request, mock_auth_manager, valid_mobile_app_params, ios_promoted_object
):
    """Test that mobile app parameters are optional for non-APP_INSTALLS campaigns"""
    reach_params = dict(valid_mobile_app_params)
    reach_params['optimization_goal'] = "REACH"
    await create_adset(
        **reach_params,
        # Mobile app parameters should be optional for non-APP_INSTALLS
        promoted_object=ios_promoted_object,
        destination_type="APP_STORE"
    )
    mock_api_request.assert_called_once()
    sent = mock_api_request.call_args[0][2]
    # When provided, the mobile parameters are still forwarded.
    assert 'promoted_object' in sent
    assert 'destination_type' in sent
```
--------------------------------------------------------------------------------
/tests/test_targeting_search_e2e.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
End-to-End Targeting Search Test for Meta Ads MCP
This test validates that the targeting search tools correctly find and return
targeting options (interests, behaviors, demographics, geo locations) from the
Meta Ads API through a pre-authenticated MCP server.
Test functions:
- search_interests
- get_interest_suggestions
- validate_interests
- search_behaviors
- search_demographics
- search_geo_locations
"""
import requests
import json
import os
import sys
from typing import Dict, Any, List
# Load environment variables from .env file
# python-dotenv is an optional dependency: when installed, values from a local
# .env file are merged into the process environment before any test code
# reads them; otherwise the shell's exported variables are used as-is.
try:
    from dotenv import load_dotenv
    load_dotenv()
    print("✅ Loaded environment variables from .env file")
except ImportError:
    print("⚠️ python-dotenv not installed, using system environment variables only")
class TargetingSearchTester:
"""Test suite focused on targeting search functionality"""
def __init__(self, base_url: str = "http://localhost:8080"):
self.base_url = base_url.rstrip('/')
self.endpoint = f"{self.base_url}/mcp/"
self.request_id = 1
# Test data for validation
self.test_queries = {
"interests": ["baseball", "cooking", "travel"],
"geo_locations": ["New York", "California", "Japan"],
"interest_suggestions": ["Basketball", "Soccer"],
"demographics": ["life_events", "industries", "family_statuses"]
}
def _make_request(self, method: str, params: Dict[str, Any] = None,
                  headers: Dict[str, str] = None) -> Dict[str, Any]:
    """Make a JSON-RPC request to the MCP server.

    Returns a uniform dict with status_code/headers/json/text/success keys;
    on a network-level failure status_code is 0 and an "error" key is added.
    """
    # Baseline headers; callers may override or extend them via *headers*.
    default_headers = {
        "Content-Type": "application/json",
        "Accept": "application/json, text/event-stream",
        "User-Agent": "Targeting-Search-Test-Client/1.0"
    }
    if headers:
        default_headers.update(headers)
    payload = {
        "jsonrpc": "2.0",
        "method": method,
        "id": self.request_id
    }
    if params:
        payload["params"] = params
    try:
        response = requests.post(
            self.endpoint,
            headers=default_headers,
            json=payload,
            timeout=15
        )
        # Increment only after a response arrives; the except path below
        # deliberately reuses the same id on retry.
        self.request_id += 1
        return {
            "status_code": response.status_code,
            "headers": dict(response.headers),
            # Body is only parsed as JSON on HTTP 200; otherwise None.
            "json": response.json() if response.status_code == 200 else None,
            "text": response.text,
            "success": response.status_code == 200
        }
    except requests.exceptions.RequestException as e:
        # Network-level failure (DNS, refused connection, timeout): surface
        # the same dict shape so callers need only check "success".
        return {
            "status_code": 0,
            "headers": {},
            "json": None,
            "text": str(e),
            "success": False,
            "error": str(e)
        }
def test_search_interests(self) -> Dict[str, Any]:
    """Test search_interests functionality"""
    print(f"\n🔍 Testing search_interests function")
    outcomes: Dict[str, Any] = {}
    for term in self.test_queries["interests"]:
        print(f" 🔎 Searching for interests: '{term}'")
        reply = self._make_request("tools/call", {
            "name": "search_interests",
            "arguments": {
                "query": term,
                "limit": 5
            }
        })
        if not reply["success"]:
            outcomes[term] = {"success": False, "error": reply.get("text", "Unknown error")}
            print(f" ❌ Failed: {reply.get('text', 'Unknown error')}")
            continue
        # Tool output is JSON embedded in the first content item's text.
        body = reply["json"]["result"].get("content", [{}])[0].get("text", "")
        try:
            data = json.loads(body)
        except json.JSONDecodeError:
            outcomes[term] = {
                "success": False,
                "error": "Invalid JSON response",
                "raw_content": body
            }
            print(f" ❌ Invalid JSON: {body}")
            continue
        if "error" in data:
            outcomes[term] = {"success": False, "error": data["error"]}
            print(f" ❌ API Error: {data['error']}")
            continue
        found = data.get("data", [])
        outcomes[term] = {
            "success": True,
            "count": len(found),
            "interests": found[:3],  # Keep first 3 for display
            "has_required_fields": all(
                "id" in entry and "name" in entry for entry in found
            )
        }
        print(f" ✅ Found {len(found)} interests")
        for entry in found[:3]:
            print(f" • {entry.get('name', 'N/A')} (ID: {entry.get('id', 'N/A')})")
    return outcomes
def test_get_interest_suggestions(self) -> Dict[str, Any]:
    """Test get_interest_suggestions functionality"""
    print(f"\n🔍 Testing get_interest_suggestions function")
    seeds = self.test_queries["interest_suggestions"]
    print(f" 🔎 Getting suggestions for: {seeds}")
    reply = self._make_request("tools/call", {
        "name": "get_interest_suggestions",
        "arguments": {
            "interest_list": seeds,
            "limit": 5
        }
    })
    if not reply["success"]:
        return {"success": False, "error": reply.get("text", "Unknown error")}
    body = reply["json"]["result"].get("content", [{}])[0].get("text", "")
    try:
        data = json.loads(body)
    except json.JSONDecodeError:
        return {"success": False, "error": "Invalid JSON response", "raw_content": body}
    if "error" in data:
        return {"success": False, "error": data["error"]}
    found = data.get("data", [])
    summary = {
        "success": True,
        "count": len(found),
        "suggestions": found[:3],  # Keep first 3 for display
        "has_required_fields": all(
            "id" in entry and "name" in entry for entry in found
        )
    }
    print(f" ✅ Found {len(found)} suggestions")
    for entry in found[:3]:
        print(f" • {entry.get('name', 'N/A')} (ID: {entry.get('id', 'N/A')})")
    return summary

def test_validate_interests(self) -> Dict[str, Any]:
    """Test validate_interests functionality"""
    print(f"\n🔍 Testing validate_interests function")
    # Test with known valid and invalid interest names
    candidates = ["Japan", "Basketball", "invalidinterestname12345"]
    print(f" 🔎 Validating interests: {candidates}")
    reply = self._make_request("tools/call", {
        "name": "validate_interests",
        "arguments": {
            "interest_list": candidates
        }
    })
    if not reply["success"]:
        return {"success": False, "error": reply.get("text", "Unknown error")}
    body = reply["json"]["result"].get("content", [{}])[0].get("text", "")
    try:
        data = json.loads(body)
    except json.JSONDecodeError:
        return {"success": False, "error": "Invalid JSON response", "raw_content": body}
    if "error" in data:
        return {"success": False, "error": data["error"]}
    checks = data.get("data", [])
    summary = {
        "success": True,
        "count": len(checks),
        "validations": checks,
        # The suite expects a mix: at least one valid and one invalid name.
        "has_valid_interests": any(c.get("valid", False) for c in checks),
        "has_invalid_interests": any(not c.get("valid", True) for c in checks)
    }
    print(f" ✅ Validated {len(checks)} interests")
    for c in checks:
        marker = "✅" if c.get("valid") else "❌"
        print(f" {marker} {c.get('name', 'N/A')}")
    return summary
def test_search_behaviors(self) -> Dict[str, Any]:
    """Test search_behaviors functionality"""
    print(f"\n🔍 Testing search_behaviors function")
    reply = self._make_request("tools/call", {
        "name": "search_behaviors",
        "arguments": {
            "limit": 5
        }
    })
    if not reply["success"]:
        return {"success": False, "error": reply.get("text", "Unknown error")}
    body = reply["json"]["result"].get("content", [{}])[0].get("text", "")
    try:
        data = json.loads(body)
    except json.JSONDecodeError:
        return {"success": False, "error": "Invalid JSON response", "raw_content": body}
    if "error" in data:
        return {"success": False, "error": data["error"]}
    found = data.get("data", [])
    summary = {
        "success": True,
        "count": len(found),
        "behaviors": found[:3],  # Keep first 3 for display
        "has_required_fields": all(
            "id" in entry and "name" in entry for entry in found
        )
    }
    print(f" ✅ Found {len(found)} behaviors")
    for entry in found[:3]:
        print(f" • {entry.get('name', 'N/A')} (ID: {entry.get('id', 'N/A')})")
    return summary
def test_search_demographics(self) -> Dict[str, Any]:
    """Test search_demographics functionality"""
    print(f"\n🔍 Testing search_demographics function")
    outcomes: Dict[str, Any] = {}
    for bucket in self.test_queries["demographics"]:
        print(f" 🔎 Searching demographics class: '{bucket}'")
        reply = self._make_request("tools/call", {
            "name": "search_demographics",
            "arguments": {
                "demographic_class": bucket,
                "limit": 3
            }
        })
        if not reply["success"]:
            outcomes[bucket] = {"success": False, "error": reply.get("text", "Unknown error")}
            print(f" ❌ Failed: {reply.get('text', 'Unknown error')}")
            continue
        body = reply["json"]["result"].get("content", [{}])[0].get("text", "")
        try:
            data = json.loads(body)
        except json.JSONDecodeError:
            outcomes[bucket] = {
                "success": False,
                "error": "Invalid JSON response",
                "raw_content": body
            }
            print(f" ❌ Invalid JSON: {body}")
            continue
        if "error" in data:
            outcomes[bucket] = {"success": False, "error": data["error"]}
            print(f" ❌ API Error: {data['error']}")
            continue
        rows = data.get("data", [])
        outcomes[bucket] = {
            "success": True,
            "count": len(rows),
            "demographics": rows[:2],  # Keep first 2 for display
            "has_required_fields": all(
                "id" in row and "name" in row for row in rows
            )
        }
        print(f" ✅ Found {len(rows)} {bucket}")
        for row in rows[:2]:
            print(f" • {row.get('name', 'N/A')} (ID: {row.get('id', 'N/A')})")
    return outcomes

def test_search_geo_locations(self) -> Dict[str, Any]:
    """Test search_geo_locations functionality"""
    print(f"\n🔍 Testing search_geo_locations function")
    outcomes: Dict[str, Any] = {}
    for place in self.test_queries["geo_locations"]:
        print(f" 🔎 Searching for locations: '{place}'")
        reply = self._make_request("tools/call", {
            "name": "search_geo_locations",
            "arguments": {
                "query": place,
                "location_types": ["country", "region", "city"],
                "limit": 3
            }
        })
        if not reply["success"]:
            outcomes[place] = {"success": False, "error": reply.get("text", "Unknown error")}
            print(f" ❌ Failed: {reply.get('text', 'Unknown error')}")
            continue
        body = reply["json"]["result"].get("content", [{}])[0].get("text", "")
        try:
            data = json.loads(body)
        except json.JSONDecodeError:
            outcomes[place] = {
                "success": False,
                "error": "Invalid JSON response",
                "raw_content": body
            }
            print(f" ❌ Invalid JSON: {body}")
            continue
        if "error" in data:
            outcomes[place] = {"success": False, "error": data["error"]}
            print(f" ❌ API Error: {data['error']}")
            continue
        spots = data.get("data", [])
        outcomes[place] = {
            "success": True,
            "count": len(spots),
            "locations": spots[:3],  # Keep first 3 for display
            # Geo results use "key" (not "id") plus a location "type".
            "has_required_fields": all(
                "key" in spot and "name" in spot and "type" in spot
                for spot in spots
            )
        }
        print(f" ✅ Found {len(spots)} locations")
        for spot in spots[:3]:
            print(f" • {spot.get('name', 'N/A')} ({spot.get('type', 'N/A')}, Key: {spot.get('key', 'N/A')})")
    return outcomes
def run_targeting_search_tests(self) -> bool:
    """Run comprehensive targeting search tests.

    Executes all six targeting-search phases against the live server and
    returns True when at least 4 of them pass.
    """

    def banner(title: str) -> None:
        # Shared phase-header formatting (was copy-pasted six times).
        print("\n" + "=" * 60)
        print(title)
        print("=" * 60)

    print("🚀 Meta Ads Targeting Search End-to-End Test Suite")
    print("=" * 60)
    # Check server availability
    try:
        response = requests.get(f"{self.base_url}/", timeout=5)
        # 404 is acceptable: the root path may not be routed, but the
        # server is clearly up and answering.
        server_running = response.status_code in [200, 404]
    except requests.exceptions.RequestException:
        # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
        # still propagate instead of being reported as "server down".
        server_running = False
    if not server_running:
        print("❌ Server is not running at", self.base_url)
        print(" Please start the server with:")
        print(" python3 -m meta_ads_mcp --transport streamable-http --port 8080")
        return False
    print("✅ Server is running")
    print("🔐 Using implicit authentication from server")

    banner("📋 PHASE 1: Testing Interest Search")
    interests_results = self.test_search_interests()
    interests_success = any(
        result.get("success") and result.get("count", 0) > 0
        for result in interests_results.values()
    )

    banner("📋 PHASE 2: Testing Interest Suggestions")
    suggestions_result = self.test_get_interest_suggestions()
    suggestions_success = suggestions_result.get("success") and suggestions_result.get("count", 0) > 0

    banner("📋 PHASE 3: Testing Interest Validation")
    validation_result = self.test_validate_interests()
    # Validation only counts as passing when it distinguishes valid from
    # invalid names in the same run.
    validation_success = (validation_result.get("success") and
                          validation_result.get("has_valid_interests") and
                          validation_result.get("has_invalid_interests"))

    banner("📋 PHASE 4: Testing Behavior Search")
    behaviors_result = self.test_search_behaviors()
    behaviors_success = behaviors_result.get("success") and behaviors_result.get("count", 0) > 0

    banner("📋 PHASE 5: Testing Demographics Search")
    demographics_results = self.test_search_demographics()
    demographics_success = any(
        result.get("success") and result.get("count", 0) > 0
        for result in demographics_results.values()
    )

    banner("📋 PHASE 6: Testing Geo Location Search")
    geo_results = self.test_search_geo_locations()
    geo_success = any(
        result.get("success") and result.get("count", 0) > 0
        for result in geo_results.values()
    )

    banner("📊 FINAL RESULTS")
    all_tests = [
        ("Interest Search", interests_success),
        ("Interest Suggestions", suggestions_success),
        ("Interest Validation", validation_success),
        ("Behavior Search", behaviors_success),
        ("Demographics Search", demographics_success),
        ("Geo Location Search", geo_success)
    ]
    passed_tests = sum(1 for _, success in all_tests if success)
    total_tests = len(all_tests)
    for test_name, success in all_tests:
        status = "✅ PASSED" if success else "❌ FAILED"
        print(f" • {test_name}: {status}")
    overall_success = passed_tests >= 4  # At least 4 out of 6 tests should pass
    if overall_success:
        print(f"\n✅ Targeting search tests: SUCCESS ({passed_tests}/{total_tests} passed)")
        print(" • Core targeting search functionality is working")
        print(" • Meta Ads API integration is functional")
        return True
    else:
        print(f"\n❌ Targeting search tests: FAILED ({passed_tests}/{total_tests} passed)")
        print(" • Some targeting search functions are not working properly")
        return False
def main():
    """Entry point: run the targeting-search suite and exit 0 on success, 1 otherwise."""
    success = TargetingSearchTester().run_targeting_search_tests()
    if success:
        print("\n🎉 All targeting search tests passed!")
    else:
        print("\n⚠️ Some targeting search tests failed - see details above")
    sys.exit(0 if success else 1)


if __name__ == "__main__":
    main()
```
--------------------------------------------------------------------------------
/tests/test_insights_actions_and_values_e2e.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Unit Tests for Insights Actions and Action Values Functionality
This test suite validates the actions and action_values field implementation for the
get_insights function in meta_ads_mcp/core/insights.py.
Test cases cover:
- Actions and action_values field inclusion in API requests
- Different levels of aggregation (ad, adset, campaign, account)
- Time range handling with actions and action_values
- Error handling and validation
- Purchase data extraction from actions and action_values
"""
import pytest
import json
import asyncio
import requests
from unittest.mock import AsyncMock, patch, MagicMock
from typing import Dict, Any, List
# Import the function to test
from meta_ads_mcp.core.insights import get_insights
class TestInsightsActionsAndValues:
"""Test suite for actions and action_values in insights"""
@pytest.fixture
def mock_api_request(self):
    """Mock for the make_api_request function.

    Patches the insights module's API entry point so no network call is made,
    and yields the mock pre-loaded with a Graph-API-shaped response: one
    campaign row carrying actions, action_values and cost_per_action_type
    (all string-valued, exactly as the real API returns them).
    """
    with patch('meta_ads_mcp.core.insights.make_api_request') as mock:
        mock.return_value = {
            "data": [
                {
                    "campaign_id": "test_campaign_id",
                    "campaign_name": "Test Campaign",
                    "impressions": "1000",
                    "clicks": "50",
                    "spend": "100.00",
                    "actions": [
                        {"action_type": "purchase", "value": "5"},
                        {"action_type": "lead", "value": "3"},
                        {"action_type": "view_content", "value": "20"}
                    ],
                    "action_values": [
                        {"action_type": "purchase", "value": "500.00"},
                        {"action_type": "lead", "value": "150.00"},
                        {"action_type": "view_content", "value": "0.00"}
                    ],
                    "cost_per_action_type": [
                        {"action_type": "purchase", "value": "20.00"},
                        {"action_type": "lead", "value": "33.33"}
                    ]
                }
            ],
            "paging": {}
        }
        yield mock

@pytest.fixture
def mock_auth_manager(self):
    """Mock for the authentication manager.

    Patches both the auth-manager object and the token helper so the code
    under test never attempts a real OAuth flow.
    """
    with patch('meta_ads_mcp.core.api.auth_manager') as mock, \
        patch('meta_ads_mcp.core.auth.get_current_access_token') as mock_get_token:
        # Mock a valid access token
        mock.get_current_access_token.return_value = "test_access_token"
        mock.is_token_valid.return_value = True
        mock.app_id = "test_app_id"
        mock_get_token.return_value = "test_access_token"
        yield mock

@pytest.fixture
def valid_campaign_id(self):
    """Valid campaign ID for testing"""
    return "123456789"

@pytest.fixture
def valid_account_id(self):
    """Valid account ID for testing"""
    # Account IDs are prefixed with "act_" in the Graph API.
    return "act_701351919139047"
@pytest.mark.asyncio
async def test_actions_and_action_values_included_in_fields(self, mock_api_request, mock_auth_manager, valid_campaign_id):
    """Test that actions and action_values are included in the fields parameter"""
    raw = await get_insights(
        object_id=valid_campaign_id,
        time_range="last_30d",
        level="campaign"
    )
    payload = json.loads(raw)
    mock_api_request.assert_called_once()
    positional = mock_api_request.call_args[0]
    # Endpoint is the first positional argument.
    assert positional[0] == f"{valid_campaign_id}/insights"
    # Third positional argument is the params dict sent to the API.
    sent = positional[2]
    assert 'fields' in sent
    requested = sent['fields']
    for field in ('actions', 'action_values', 'cost_per_action_type'):
        assert field in requested
    # Response structure carries the action data through.
    assert 'data' in payload
    assert len(payload['data']) > 0
    first_row = payload['data'][0]
    assert 'actions' in first_row
    assert 'action_values' in first_row

@pytest.mark.asyncio
async def test_purchase_data_extraction(self, mock_api_request, mock_auth_manager, valid_campaign_id):
    """Test that purchase data can be extracted from actions and action_values"""
    raw = await get_insights(
        object_id=valid_campaign_id,
        time_range="last_30d",
        level="campaign"
    )
    row = json.loads(raw)['data'][0]
    purchase_actions = [
        entry for entry in row.get('actions', [])
        if entry.get('action_type') == 'purchase'
    ]
    purchase_values = [
        entry for entry in row.get('action_values', [])
        if entry.get('action_type') == 'purchase'
    ]
    assert len(purchase_actions) > 0, "No purchase actions found"
    assert len(purchase_values) > 0, "No purchase action_values found"
    purchase_count = purchase_actions[0].get('value')
    purchase_value = purchase_values[0].get('value')
    assert purchase_count == "5", f"Expected purchase count 5, got {purchase_count}"
    assert purchase_value == "500.00", f"Expected purchase value 500.00, got {purchase_value}"
@pytest.mark.asyncio
async def test_actions_at_adset_level(self, mock_api_request, mock_auth_manager, valid_campaign_id):
    """Test actions and action_values at adset level"""
    raw = await get_insights(
        object_id=valid_campaign_id,
        time_range="last_30d",
        level="adset"
    )
    payload = json.loads(raw)
    mock_api_request.assert_called_once()
    sent = mock_api_request.call_args[0][2]
    # The aggregation level must be forwarded unchanged.
    assert sent['level'] == 'adset'
    assert 'data' in payload

@pytest.mark.asyncio
async def test_actions_at_ad_level(self, mock_api_request, mock_auth_manager, valid_campaign_id):
    """Test actions and action_values at ad level"""
    raw = await get_insights(
        object_id=valid_campaign_id,
        time_range="last_30d",
        level="ad"
    )
    payload = json.loads(raw)
    mock_api_request.assert_called_once()
    sent = mock_api_request.call_args[0][2]
    assert sent['level'] == 'ad'
    assert 'data' in payload
@pytest.mark.asyncio
async def test_actions_with_custom_time_range(self, mock_api_request, mock_auth_manager, valid_campaign_id):
    """Test actions and action_values with custom time range"""
    window = {"since": "2024-01-01", "until": "2024-01-31"}
    raw = await get_insights(
        object_id=valid_campaign_id,
        time_range=window,
        level="campaign"
    )
    payload = json.loads(raw)
    mock_api_request.assert_called_once()
    sent = mock_api_request.call_args[0][2]
    assert 'time_range' in sent
    # The dict is JSON-serialized before being forwarded to the API.
    assert sent['time_range'] == json.dumps(window)
    assert 'data' in payload

@pytest.mark.asyncio
async def test_actions_with_breakdown(self, mock_api_request, mock_auth_manager, valid_campaign_id):
    """Test actions and action_values with breakdown dimension"""
    raw = await get_insights(
        object_id=valid_campaign_id,
        time_range="last_30d",
        level="campaign",
        breakdown="age"
    )
    payload = json.loads(raw)
    mock_api_request.assert_called_once()
    sent = mock_api_request.call_args[0][2]
    # Note the singular->plural rename: the API param is "breakdowns".
    assert 'breakdowns' in sent
    assert sent['breakdowns'] == 'age'
    assert 'data' in payload
@pytest.mark.asyncio
async def test_actions_without_object_id(self, mock_api_request, mock_auth_manager):
"""Test error handling when no object_id is provided"""
result = await get_insights(
time_range="last_30d",
level="campaign"
)
# Parse the result. The decorator returns a dict error for missing required args
if isinstance(result, dict):
result_data = result
else:
result_data = json.loads(result)
assert 'error' in result_data
assert "missing 1 required positional argument: 'object_id'" in result_data['error']
# Verify API was not called
mock_api_request.assert_not_called()
@pytest.mark.asyncio
async def test_actions_with_invalid_time_range(self, mock_api_request, mock_auth_manager, valid_campaign_id):
    """A time range missing 'until' is rejected before any API call is made."""
    bad_range = {"since": "2024-01-01"}  # Missing "until"
    raw = await get_insights(
        object_id=valid_campaign_id,
        time_range=bad_range,
        level="campaign"
    )
    payload = json.loads(raw)

    # The error may arrive either wrapped in a JSON-encoded 'data' field
    # or directly at the top level; accept both shapes.
    if 'data' in payload:
        inner = json.loads(payload['data'])
        assert 'error' in inner
        assert 'since' in inner['error'] and 'until' in inner['error']
    else:
        assert 'error' in payload
        assert 'since' in payload['error'] and 'until' in payload['error']

    # Validation must short-circuit before the API layer.
    mock_api_request.assert_not_called()
@pytest.mark.asyncio
async def test_actions_api_error_handling(self, mock_api_request, mock_auth_manager, valid_campaign_id):
    """Exceptions raised by the API layer surface as error responses."""
    mock_api_request.side_effect = Exception("API Error")

    raw = await get_insights(
        object_id=valid_campaign_id,
        time_range="last_30d",
        level="campaign"
    )
    # API-layer failures may come back as a dict rather than a JSON string.
    payload = raw if isinstance(raw, dict) else json.loads(raw)

    assert 'error' in payload
    assert 'API Error' in payload['error']
@pytest.mark.asyncio
async def test_actions_fields_completeness(self, mock_api_request, mock_auth_manager, valid_campaign_id):
    """Every field needed for actions/action_values reporting is requested."""
    await get_insights(
        object_id=valid_campaign_id,
        time_range="last_30d",
        level="campaign"
    )

    mock_api_request.assert_called_once()
    requested = mock_api_request.call_args[0][2]['fields']

    # Fields the insights request must always ask for.
    expected = [
        'account_id', 'account_name', 'campaign_id', 'campaign_name',
        'adset_id', 'adset_name', 'ad_id', 'ad_name',
        'impressions', 'clicks', 'spend', 'cpc', 'cpm', 'ctr',
        'reach', 'frequency', 'actions', 'action_values', 'conversions',
        'unique_clicks', 'cost_per_action_type'
    ]
    for name in expected:
        assert name in requested, f"Field '{name}' not found in fields parameter"
@pytest.mark.asyncio
async def test_multiple_action_types(self, mock_api_request, mock_auth_manager, valid_campaign_id):
    """The mocked response exposes several action types with matching values."""
    raw = await get_insights(
        object_id=valid_campaign_id,
        time_range="last_30d",
        level="campaign"
    )
    first_row = json.loads(raw)['data'][0]

    # Actions should span multiple conversion types.
    reported_types = [entry.get('action_type') for entry in first_row.get('actions', [])]
    assert 'purchase' in reported_types, "Purchase action type not found"
    assert 'lead' in reported_types, "Lead action type not found"
    assert 'view_content' in reported_types, "View content action type not found"

    # action_values must carry the monetary counterparts.
    value_types = [entry.get('action_type') for entry in first_row.get('action_values', [])]
    assert 'purchase' in value_types, "Purchase action_value type not found"
    assert 'lead' in value_types, "Lead action_value type not found"
@pytest.mark.e2e
@pytest.mark.skip(reason="E2E test - run manually only")
class TestInsightsActionsAndValuesE2E:
    """E2E tests for actions and action_values via MCP HTTP server.

    NOTE(review): pytest does not collect test classes that define __init__,
    so even without the skip marker these methods would not run under pytest.
    The class is driven manually through main() below — confirm this is the
    intended usage.
    """

    def __init__(self, base_url: str = "http://localhost:8080"):
        self.base_url = base_url.rstrip('/')
        self.endpoint = f"{self.base_url}/mcp/"
        # Monotonically increasing JSON-RPC request id.
        self.request_id = 1
        # Default account from workspace rules
        self.account_id = "act_701351919139047"

    def _make_request(self, method: str, params: Dict[str, Any] = None) -> Dict[str, Any]:
        """POST one JSON-RPC request to the MCP endpoint and summarize the reply.

        Returns a dict with status_code / json / text / success keys; network
        failures are reported as success=False with status_code 0 rather than
        raised.
        """
        headers = {
            "Content-Type": "application/json",
            "Accept": "application/json, text/event-stream",
            "User-Agent": "Insights-E2E-Test-Client/1.0"
        }
        payload = {
            "jsonrpc": "2.0",
            "method": method,
            "id": self.request_id
        }
        if params:
            payload["params"] = params
        try:
            resp = requests.post(self.endpoint, headers=headers, json=payload, timeout=30)
            self.request_id += 1
            return {
                "status_code": resp.status_code,
                # Only attempt JSON decoding on HTTP 200 responses.
                "json": resp.json() if resp.status_code == 200 else None,
                "text": resp.text,
                "success": resp.status_code == 200
            }
        except requests.exceptions.RequestException as e:
            return {"status_code": 0, "json": None, "text": str(e), "success": False, "error": str(e)}

    def _check_for_errors(self, parsed_content: Dict[str, Any]) -> Dict[str, Any]:
        """Detect an error payload in any of the known response shapes.

        Handles three formats: error dict nested under 'data', error JSON
        string nested under 'data', and a top-level 'error' key.
        """
        if "data" in parsed_content:
            data = parsed_content["data"]
            # Shape 1: {'data': {'error': ...}}
            if isinstance(data, dict) and 'error' in data:
                return {"has_error": True, "error_message": data['error'], "format": "wrapped_dict"}
            # Shape 2: {'data': '{"error": ...}'} (JSON string)
            if isinstance(data, str):
                try:
                    error_data = json.loads(data)
                    if 'error' in error_data:
                        return {"has_error": True, "error_message": error_data['error'], "format": "wrapped_json"}
                except json.JSONDecodeError:
                    pass
        # Shape 3: {'error': ...} at the top level.
        if 'error' in parsed_content:
            return {"has_error": True, "error_message": parsed_content['error'], "format": "direct"}
        return {"has_error": False}

    def test_tool_exists_and_has_required_fields(self):
        """get_insights must be listed and expose the expected parameters."""
        result = self._make_request("tools/list", {})
        assert result["success"], f"tools/list failed: {result.get('text', '')}"
        tools = result["json"]["result"].get("tools", [])
        tool = next((t for t in tools if t["name"] == "get_insights"), None)
        assert tool is not None, "get_insights tool not found"
        props = tool.get("inputSchema", {}).get("properties", {})
        for req in ["object_id", "time_range", "breakdown", "level"]:
            assert req in props, f"Missing parameter in schema: {req}"

    def test_get_insights_account_level(self):
        """An account-level insights call must return a parsable payload."""
        params = {
            "name": "get_insights",
            "arguments": {
                "object_id": self.account_id,
                "time_range": "last_30d",
                "level": "account"
            }
        }
        result = self._make_request("tools/call", params)
        assert result["success"], f"tools/call failed: {result.get('text', '')}"
        response_data = result["json"]["result"]
        content = response_data.get("content", [{}])[0].get("text", "")
        parsed = json.loads(content)
        err = self._check_for_errors(parsed)
        # Don't fail if auth or permissions block; just assert structure is parsable
        if err["has_error"]:
            assert isinstance(err["error_message"], (str, dict))
        else:
            # Expect data list on success
            data = parsed.get("data") if isinstance(parsed, dict) else None
            assert data is not None
def main():
    """Run the manual E2E smoke checks against a local MCP server."""
    runner = TestInsightsActionsAndValuesE2E()
    print("🚀 Insights Actions E2E (manual)")
    # Run each check in order, stopping on the first failure.
    steps = (
        (runner.test_tool_exists_and_has_required_fields, "✅ Tool schema ok"),
        (runner.test_get_insights_account_level, "✅ Account-level insights request executed"),
    )
    try:
        for step, success_msg in steps:
            step()
            print(success_msg)
    except AssertionError as e:
        print(f"❌ Assertion failed: {e}")
    except Exception as e:
        print(f"❌ Error: {e}")
# Allow running this module directly as a manual E2E smoke test.
if __name__ == "__main__":
    main()
def extract_purchase_data(insights_data):
    """
    Pull purchase metrics out of the first row of an insights response.

    Args:
        insights_data: The data array from an insights response.

    Returns:
        dict: {"purchase_count": int, "purchase_value": float}; both are
        zero when the data array is empty or holds no purchase entries.
    """
    if not insights_data:
        return {"purchase_count": 0, "purchase_value": 0.0}

    row = insights_data[0]

    def _first_purchase(entries):
        # First entry whose action_type is 'purchase', or None.
        for entry in entries:
            if entry.get('action_type') == 'purchase':
                return entry
        return None

    count_entry = _first_purchase(row.get('actions', []))
    value_entry = _first_purchase(row.get('action_values', []))

    return {
        "purchase_count": int(count_entry.get('value', 0)) if count_entry else 0,
        "purchase_value": float(value_entry.get('value', 0)) if value_entry else 0.0,
    }
class TestPurchaseDataExtraction:
    """Unit tests for the extract_purchase_data helper."""

    def test_extract_purchase_data_with_purchases(self):
        """A row containing purchase entries yields its count and value."""
        rows = [{
            "actions": [
                {"action_type": "purchase", "value": "5"},
                {"action_type": "lead", "value": "3"}
            ],
            "action_values": [
                {"action_type": "purchase", "value": "500.00"},
                {"action_type": "lead", "value": "150.00"}
            ]
        }]
        extracted = extract_purchase_data(rows)
        assert extracted["purchase_count"] == 5
        assert extracted["purchase_value"] == 500.0

    def test_extract_purchase_data_without_purchases(self):
        """Rows without purchase entries report zeroed metrics."""
        rows = [{
            "actions": [
                {"action_type": "lead", "value": "3"},
                {"action_type": "view_content", "value": "20"}
            ],
            "action_values": [
                {"action_type": "lead", "value": "150.00"},
                {"action_type": "view_content", "value": "0.00"}
            ]
        }]
        extracted = extract_purchase_data(rows)
        assert extracted["purchase_count"] == 0
        assert extracted["purchase_value"] == 0.0

    def test_extract_purchase_data_empty_data(self):
        """An empty data array reports zeroed metrics."""
        extracted = extract_purchase_data([])
        assert extracted["purchase_count"] == 0
        assert extracted["purchase_value"] == 0.0
```
--------------------------------------------------------------------------------
/meta_ads_mcp/core/auth.py:
--------------------------------------------------------------------------------
```python
"""Authentication related functionality for Meta Ads API."""
from typing import Any, Dict, Optional
import time
import platform
import pathlib
import os
import webbrowser
import asyncio
import json
from .utils import logger
import requests
# Import from the new callback server module
from .callback_server import (
start_callback_server,
shutdown_callback_server,
token_container,
callback_server_port
)
# Import the new Pipeboard authentication
from .pipeboard_auth import pipeboard_auth_manager
# Auth constants
# Scope includes pages_show_list and pages_read_engagement to fix issue #16
# where get_account_pages failed for regular users due to missing page permissions
AUTH_SCOPE = "business_management,public_profile,pages_show_list,pages_read_engagement"
AUTH_REDIRECT_URI = "http://localhost:8888/callback"
AUTH_RESPONSE_TYPE = "token"
# Log important configuration information
logger.info("Authentication module initialized")
logger.info(f"Auth scope: {AUTH_SCOPE}")
logger.info(f"Default redirect URI: {AUTH_REDIRECT_URI}")
# Global flag for authentication state
needs_authentication = False
# Meta configuration singleton
class MetaConfig:
    """Process-wide singleton holding the Meta App ID used for API calls."""

    _instance = None

    def __new__(cls):
        # Lazily build the single shared instance on first construction,
        # seeding the app_id from the environment (or the bundled default).
        if cls._instance is None:
            logger.debug("Creating new MetaConfig instance")
            instance = super(MetaConfig, cls).__new__(cls)
            instance.app_id = os.environ.get("META_APP_ID", "779761636818489")
            logger.info(f"MetaConfig initialized with app_id from env/default: {instance.app_id}")
            cls._instance = instance
        return cls._instance

    def set_app_id(self, app_id):
        """Set the Meta App ID for API calls."""
        logger.info(f"Setting Meta App ID: {app_id}")
        self.app_id = app_id
        # Keep the environment variable in sync for modules that read it directly.
        os.environ["META_APP_ID"] = app_id
        logger.debug(f"Updated META_APP_ID environment variable: {os.environ.get('META_APP_ID')}")

    def get_app_id(self):
        """Return the current Meta App ID, falling back to the environment."""
        instance_app_id = getattr(self, 'app_id', None)
        if instance_app_id:
            logger.debug(f"Using app_id from instance: {instance_app_id}")
            return instance_app_id

        env_app_id = os.environ.get("META_APP_ID", "")
        if env_app_id:
            logger.debug(f"Using app_id from environment: {env_app_id}")
            # Remember it on the instance for future lookups.
            self.app_id = env_app_id
            return env_app_id

        logger.warning("No app_id found in instance or environment variables")
        return ""

    def is_configured(self):
        """Return True when a non-empty app_id is available."""
        current = self.get_app_id()
        result = bool(current)
        logger.debug(f"MetaConfig.is_configured() = {result} (app_id: {current})")
        return result
# Create singleton instance shared by the rest of the package.
meta_config = MetaConfig()
class TokenInfo:
    """Holds an access token plus enough metadata to judge its expiry."""

    def __init__(self, access_token: str, expires_in: Optional[int] = None, user_id: Optional[str] = None):
        self.access_token = access_token
        self.expires_in = expires_in
        self.user_id = user_id
        # Record creation time so expiry is computed relative to it.
        self.created_at = int(time.time())
        logger.debug(f"TokenInfo created. Expires in: {expires_in if expires_in else 'Not specified'}")

    def is_expired(self) -> bool:
        """Return True once created_at + expires_in lies in the past."""
        if not self.expires_in:
            # No expiration supplied: treat the token as never expiring.
            return False
        return int(time.time()) > (self.created_at + self.expires_in)

    def serialize(self) -> Dict[str, Any]:
        """Convert to a plain dict suitable for JSON caching."""
        return {
            "access_token": self.access_token,
            "expires_in": self.expires_in,
            "user_id": self.user_id,
            "created_at": self.created_at,
        }

    @classmethod
    def deserialize(cls, data: Dict[str, Any]) -> 'TokenInfo':
        """Rebuild a TokenInfo from a dict produced by serialize()."""
        restored = cls(
            access_token=data.get("access_token", ""),
            expires_in=data.get("expires_in"),
            user_id=data.get("user_id"),
        )
        restored.created_at = data.get("created_at", int(time.time()))
        return restored
class AuthManager:
    """Manages authentication with Meta APIs.

    Prefers Pipeboard-based auth when PIPEBOARD_API_TOKEN is set; otherwise
    runs the Facebook OAuth implicit flow via a local callback server and
    caches the resulting token on disk.
    """

    def __init__(self, app_id: str, redirect_uri: str = AUTH_REDIRECT_URI):
        self.app_id = app_id
        self.redirect_uri = redirect_uri
        self.token_info = None
        # Pipeboard auth takes precedence; the on-disk cache is skipped then.
        self.use_pipeboard = bool(os.environ.get("PIPEBOARD_API_TOKEN", ""))
        if not self.use_pipeboard:
            self._load_cached_token()

    def _get_token_cache_path(self) -> pathlib.Path:
        """Return the platform-specific path for the token cache file."""
        if platform.system() == "Windows":
            base_path = pathlib.Path(os.environ.get("APPDATA", ""))
        elif platform.system() == "Darwin":  # macOS
            base_path = pathlib.Path.home() / "Library" / "Application Support"
        else:  # Assume Linux/Unix
            base_path = pathlib.Path.home() / ".config"

        # Create directory if it doesn't exist
        cache_dir = base_path / "meta-ads-mcp"
        cache_dir.mkdir(parents=True, exist_ok=True)
        return cache_dir / "token_cache.json"

    def _load_cached_token(self) -> bool:
        """Load a token from cache, pruning expired/old/corrupt cache files.

        Returns:
            True when a usable token was loaded, False otherwise.
        """
        cache_path = self._get_token_cache_path()
        if not cache_path.exists():
            return False

        try:
            with open(cache_path, "r") as f:
                data = json.load(f)

            # Validate the cached data structure
            required_fields = ["access_token", "created_at"]
            if not all(field in data for field in required_fields):
                logger.warning("Cached token data is missing required fields")
                return False

            # Check if the token looks valid (basic format check)
            if not data.get("access_token") or len(data["access_token"]) < 20:
                logger.warning("Cached token appears malformed")
                return False

            self.token_info = TokenInfo.deserialize(data)

            # Check if token is expired
            if self.token_info.is_expired():
                logger.info("Cached token is expired, removing cache file")
                try:
                    cache_path.unlink()
                    logger.info(f"Removed expired token cache: {cache_path}")
                except Exception as e:
                    logger.warning(f"Could not remove expired cache file: {e}")
                self.token_info = None
                return False

            # Additional validation: check if token is too old (more than 60 days)
            current_time = int(time.time())
            if self.token_info.created_at and (current_time - self.token_info.created_at) > (60 * 24 * 3600):
                logger.warning("Cached token is too old (more than 60 days), removing cache file")
                try:
                    cache_path.unlink()
                    logger.info(f"Removed old token cache: {cache_path}")
                except Exception as e:
                    logger.warning(f"Could not remove old cache file: {e}")
                self.token_info = None
                return False

            # Fix: tokens cached without an expires_in previously crashed the
            # log line below with a TypeError (None arithmetic); the broad
            # except then treated that as a corrupt cache and deleted a
            # perfectly valid token file.
            if self.token_info.expires_in:
                remaining = (self.token_info.created_at + self.token_info.expires_in) - int(time.time())
                logger.info(f"Loaded cached token (expires in {remaining} seconds)")
            else:
                logger.info("Loaded cached token (no expiration recorded)")
            return True
        except Exception as e:
            logger.error(f"Error loading cached token: {e}")
            # If there's any error reading the cache, try to remove the corrupted file
            try:
                cache_path.unlink()
                logger.info(f"Removed corrupted token cache: {cache_path}")
            except Exception as cleanup_error:
                logger.warning(f"Could not remove corrupted cache file: {cleanup_error}")
            return False

    def _save_token_to_cache(self) -> None:
        """Persist the current token to the cache file (no-op without a token)."""
        if not self.token_info:
            return

        cache_path = self._get_token_cache_path()
        try:
            with open(cache_path, "w") as f:
                json.dump(self.token_info.serialize(), f)
            logger.info(f"Token cached at: {cache_path}")
        except Exception as e:
            logger.error(f"Error saving token to cache: {e}")

    def get_auth_url(self) -> str:
        """Generate the Facebook OAuth URL for the desktop (implicit) flow."""
        return (
            f"https://www.facebook.com/v22.0/dialog/oauth?"
            f"client_id={self.app_id}&"
            f"redirect_uri={self.redirect_uri}&"
            f"scope={AUTH_SCOPE}&"
            f"response_type={AUTH_RESPONSE_TYPE}"
        )

    def authenticate(self, force_refresh: bool = False) -> Optional[str]:
        """
        Authenticate with Meta APIs

        Args:
            force_refresh: Force token refresh even if cached token exists

        Returns:
            Access token if successful, None otherwise. Note that None is
            also returned when the browser-based OAuth flow has merely been
            started — the callback server delivers the token asynchronously.
        """
        # If Pipeboard auth is available, use that instead
        if self.use_pipeboard:
            logger.info("Using Pipeboard authentication")
            return pipeboard_auth_manager.get_access_token(force_refresh=force_refresh)

        # Otherwise, use the original OAuth flow.
        # Check if we already have a valid token
        if not force_refresh and self.token_info and not self.token_info.is_expired():
            return self.token_info.access_token

        # Start the callback server if not already running
        try:
            port = start_callback_server()
            # Update redirect URI with the actual port
            self.redirect_uri = f"http://localhost:{port}/callback"
            # Generate the auth URL
            auth_url = self.get_auth_url()
            # Open browser with auth URL
            logger.info(f"Opening browser with URL: {auth_url}")
            webbrowser.open(auth_url)
            # We don't wait for the token here; the callback server processes
            # it asynchronously, so None signals "flow started".
            return None
        except Exception as e:
            logger.error(f"Failed to start callback server: {e}")
            logger.info("Callback server disabled. OAuth authentication flow cannot be used.")
            return None

    def get_access_token(self) -> Optional[str]:
        """
        Get the current access token, refreshing if necessary

        Returns:
            Access token if available, None otherwise
        """
        # If using Pipeboard, always delegate to the Pipeboard auth manager
        if self.use_pipeboard:
            return pipeboard_auth_manager.get_access_token()

        if not self.token_info or self.token_info.is_expired():
            return None
        return self.token_info.access_token

    def invalidate_token(self) -> None:
        """Invalidate the current token, usually because it has expired or is invalid."""
        # If using Pipeboard, delegate to the Pipeboard auth manager
        if self.use_pipeboard:
            pipeboard_auth_manager.invalidate_token()
            return

        if self.token_info:
            logger.info(f"Invalidating token: {self.token_info.access_token[:10]}...")
            self.token_info = None

        # Signal that authentication is needed
        global needs_authentication
        needs_authentication = True

        # Remove the cached token file
        try:
            cache_path = self._get_token_cache_path()
            if cache_path.exists():
                os.remove(cache_path)
                logger.info(f"Removed cached token file: {cache_path}")
        except Exception as e:
            logger.error(f"Error removing cached token file: {e}")

    def clear_token(self) -> None:
        """Alias for invalidate_token for consistency with other APIs"""
        self.invalidate_token()
def process_token_response(token_container):
    """Process the token response from Facebook.

    Attempts to upgrade the short-lived OAuth token to a long-lived one,
    stores the result on the module-level auth_manager, persists it to the
    on-disk cache, and clears the needs_authentication flag.

    Args:
        token_container: Dict produced by the callback server; expected keys
            are 'token' and optionally 'expires_in'.

    Returns:
        True when a token was stored, False when the container held no token.
    """
    global needs_authentication, auth_manager
    if token_container and token_container.get('token'):
        logger.info("Processing token response from Facebook OAuth")
        # Exchange the short-lived token for a long-lived token
        short_lived_token = token_container['token']
        long_lived_token_info = exchange_token_for_long_lived(short_lived_token)
        if long_lived_token_info:
            logger.info(f"Successfully exchanged for long-lived token (expires in {long_lived_token_info.expires_in} seconds)")
            try:
                auth_manager.token_info = long_lived_token_info
                logger.info(f"Long-lived token info set in auth_manager, expires in {long_lived_token_info.expires_in} seconds")
            except NameError:
                # auth_manager is created at module bottom; guard against
                # being called before module initialization completes.
                logger.error("auth_manager not defined when trying to process token")
            try:
                logger.info("Attempting to save long-lived token to cache")
                auth_manager._save_token_to_cache()
                logger.info(f"Long-lived token successfully saved to cache at {auth_manager._get_token_cache_path()}")
            except Exception as e:
                logger.error(f"Error saving token to cache: {e}")
            needs_authentication = False
            return True
        else:
            # Fall back to the short-lived token if exchange fails
            logger.warning("Failed to exchange for long-lived token, using short-lived token instead")
            token_info = TokenInfo(
                access_token=short_lived_token,
                expires_in=token_container.get('expires_in', 0)
            )
            try:
                auth_manager.token_info = token_info
                logger.info(f"Short-lived token info set in auth_manager, expires in {token_info.expires_in} seconds")
            except NameError:
                logger.error("auth_manager not defined when trying to process token")
            try:
                logger.info("Attempting to save token to cache")
                auth_manager._save_token_to_cache()
                logger.info(f"Token successfully saved to cache at {auth_manager._get_token_cache_path()}")
            except Exception as e:
                logger.error(f"Error saving token to cache: {e}")
            needs_authentication = False
            return True
    else:
        # No token was delivered — flag that (re-)authentication is required.
        logger.warning("Received empty token in process_token_response")
        needs_authentication = True
        return False
def exchange_token_for_long_lived(short_lived_token):
    """
    Exchange a short-lived token for a long-lived token (60 days validity).

    Uses the Graph API fb_exchange_token grant, which requires both the app
    ID (from meta_config) and the META_APP_SECRET environment variable.

    Args:
        short_lived_token: The short-lived access token received from OAuth flow

    Returns:
        TokenInfo object with the long-lived token, or None if exchange failed
    """
    logger.info("Attempting to exchange short-lived token for long-lived token")
    try:
        # Get the app ID from the configuration
        app_id = meta_config.get_app_id()

        # Get the app secret - this should be securely stored
        app_secret = os.environ.get("META_APP_SECRET", "")

        if not app_id or not app_secret:
            logger.error("Missing app_id or app_secret for token exchange")
            return None

        # Make the API request to exchange the token
        url = "https://graph.facebook.com/v22.0/oauth/access_token"
        params = {
            "grant_type": "fb_exchange_token",
            "client_id": app_id,
            "client_secret": app_secret,
            "fb_exchange_token": short_lived_token
        }

        logger.debug(f"Making token exchange request to {url}")
        response = requests.get(url, params=params)

        if response.status_code == 200:
            data = response.json()
            logger.debug(f"Token exchange response: {data}")

            # The response includes access_token and expires_in (in seconds)
            new_token = data.get("access_token")
            expires_in = data.get("expires_in")

            if new_token:
                # Fix: expires_in may be absent from the exchange response;
                # the old f-string did None // 86400 and the resulting
                # TypeError silently discarded a valid long-lived token.
                if isinstance(expires_in, int):
                    logger.info(f"Received long-lived token, expires in {expires_in} seconds (~{expires_in//86400} days)")
                else:
                    logger.info("Received long-lived token (no expires_in in response)")
                return TokenInfo(
                    access_token=new_token,
                    expires_in=expires_in
                )
            else:
                logger.error("No access_token in exchange response")
                return None
        else:
            logger.error(f"Token exchange failed with status {response.status_code}: {response.text}")
            return None
    except Exception as e:
        logger.error(f"Error exchanging token: {e}")
        return None
async def get_current_access_token() -> Optional[str]:
    """Get the current access token from auth manager.

    Precedence: the META_ACCESS_TOKEN environment variable, then the
    singleton auth_manager (which may delegate to Pipeboard). Returns None
    when no valid token can be produced; failure causes are logged in detail.
    """
    # Check for environment variable first - this takes highest precedence
    env_token = os.environ.get("META_ACCESS_TOKEN")
    if env_token:
        logger.debug("Using access token from META_ACCESS_TOKEN environment variable")
        # Basic validation
        if len(env_token) < 20:  # Most Meta tokens are much longer
            logger.error(f"TOKEN VALIDATION FAILED: Token from environment variable appears malformed (length: {len(env_token)})")
            return None
        return env_token

    # Use the singleton auth manager
    global auth_manager

    # Log the function call and current app ID
    logger.debug("get_current_access_token() called")
    app_id = meta_config.get_app_id()
    logger.debug(f"Current app_id: {app_id}")

    # Check if using Pipeboard authentication
    using_pipeboard = auth_manager.use_pipeboard

    # Check if app_id is valid - but only if not using Pipeboard authentication
    # (Pipeboard does not need a locally configured app_id).
    if not app_id and not using_pipeboard:
        logger.error("TOKEN VALIDATION FAILED: No valid app_id configured")
        logger.error("Please set META_APP_ID environment variable or configure via meta_config.set_app_id()")
        return None

    # Attempt to get access token
    try:
        token = auth_manager.get_access_token()
        if token:
            # Add basic token validation - check if it looks like a valid token
            if len(token) < 20:  # Most Meta tokens are much longer
                logger.error(f"TOKEN VALIDATION FAILED: Token appears malformed (length: {len(token)})")
                auth_manager.invalidate_token()
                return None
            logger.debug(f"Access token found in auth_manager (starts with: {token[:10]}...)")
            return token
        else:
            logger.warning("No valid access token available in auth_manager")
            # The rest of this branch is purely diagnostic logging that
            # explains *why* no token was returned.
            if hasattr(auth_manager, 'token_info') and auth_manager.token_info:
                if auth_manager.token_info.is_expired():
                    logger.error("TOKEN VALIDATION FAILED: Token is expired")
                    # Add expiration details
                    if hasattr(auth_manager.token_info, 'expires_in') and auth_manager.token_info.expires_in:
                        expiry_time = auth_manager.token_info.created_at + auth_manager.token_info.expires_in
                        current_time = int(time.time())
                        expired_seconds_ago = current_time - expiry_time
                        logger.error(f"Token expired {expired_seconds_ago} seconds ago")
                elif not auth_manager.token_info.access_token:
                    logger.error("TOKEN VALIDATION FAILED: Token object exists but access_token is empty")
                else:
                    logger.error("TOKEN VALIDATION FAILED: Token exists but was rejected for unknown reason")
            else:
                logger.error("TOKEN VALIDATION FAILED: No token information available")

            # Suggest next steps for troubleshooting
            logger.error("To fix: Try re-authenticating or check if your token has been revoked")
            return None
    except Exception as e:
        logger.error(f"Error getting access token: {str(e)}")
        import traceback
        logger.error(f"Token validation stacktrace: {traceback.format_exc()}")
        return None
def login():
    """
    Start the login flow to authenticate with Meta

    Opens a browser at the OAuth URL and then polls the callback server's
    token_container for up to five minutes. Intended for interactive use;
    all feedback is printed to stdout rather than logged.
    """
    print("Starting Meta Ads authentication flow...")

    try:
        # Start the callback server first
        try:
            port = start_callback_server()
        except Exception as callback_error:
            print(f"Error: {callback_error}")
            print("Callback server is disabled. Please use alternative authentication methods:")
            print("- Set PIPEBOARD_API_TOKEN environment variable for Pipeboard authentication")
            print("- Or provide a direct META_ACCESS_TOKEN environment variable")
            return

        # Get the auth URL and open the browser
        auth_url = auth_manager.get_auth_url()
        print(f"Opening browser with URL: {auth_url}")
        webbrowser.open(auth_url)

        # Wait for token to be received — poll the shared token_container
        # that the callback server fills in.
        print("Waiting for authentication to complete...")
        max_wait = 300  # 5 minutes
        wait_interval = 2  # 2 seconds

        for _ in range(max_wait // wait_interval):
            if token_container["token"]:
                token = token_container["token"]
                print("Authentication successful!")
                # Verify token works by getting basic user info
                try:
                    from .api import make_api_request
                    result = asyncio.run(make_api_request("me", token, {}))
                    print(f"Authenticated as: {result.get('name', 'Unknown')} (ID: {result.get('id', 'Unknown')})")
                    return
                except Exception as e:
                    # Verification failure is non-fatal: the token was received.
                    print(f"Warning: Could not verify token: {e}")
                    return
            time.sleep(wait_interval)

        print("Authentication timed out. Please try again.")
    except Exception as e:
        print(f"Error during authentication: {e}")
        print(f"Direct authentication URL: {auth_manager.get_auth_url()}")
        print("You can manually open this URL in your browser to complete authentication.")
# Initialize auth manager with a placeholder - will be updated at runtime
META_APP_ID = os.environ.get("META_APP_ID", "YOUR_META_APP_ID")

# Create the module-level singleton consumed throughout this module
# (process_token_response, get_current_access_token, login).
auth_manager = AuthManager(META_APP_ID)
```
--------------------------------------------------------------------------------
/meta_ads_mcp/core/pipeboard_auth.py:
--------------------------------------------------------------------------------
```python
"""Authentication with Meta Ads API via pipeboard.co."""
import os
import json
import time
import requests
from pathlib import Path
import platform
from typing import Optional, Dict, Any
from .utils import logger
# Enable more detailed logging
# NOTE(review): forcing DEBUG on a shared library logger at import time
# overrides the host application's logging configuration — confirm this is
# intentional rather than leftover debugging.
import logging
logger.setLevel(logging.DEBUG)

# Base URL for pipeboard API
PIPEBOARD_API_BASE = "https://pipeboard.co/api"

# Debug message about API base URL
logger.info(f"Pipeboard API base URL: {PIPEBOARD_API_BASE}")
class TokenInfo:
    """Stores token information including expiration."""

    def __init__(self, access_token: str, expires_at: Optional[str] = None, token_type: Optional[str] = None):
        self.access_token = access_token
        # expires_at is an ISO-8601 string, e.g. "2023-12-31T23:59:59.999Z"
        self.expires_at = expires_at
        self.token_type = token_type
        self.created_at = int(time.time())
        logger.debug(f"TokenInfo created. Expires at: {expires_at if expires_at else 'Not specified'}")

    def is_expired(self) -> bool:
        """Check if the token is expired.

        Returns False when no expiry is recorded or the expires_at string
        cannot be parsed — attempting the API call is preferred to rejecting
        a possibly valid token.
        """
        if not self.expires_at:
            logger.debug("No expiration date set for token, assuming not expired")
            return False  # If no expiration is set, assume it's not expired

        # Parse ISO 8601 date format to timestamp
        try:
            # Format is like "2023-12-31T23:59:59.999Z" or "2023-12-31T23:59:59.999+00:00"
            from datetime import datetime, timezone

            # Remove the Z suffix if present
            expires_at_str = self.expires_at
            if expires_at_str.endswith('Z'):
                expires_at_str = expires_at_str[:-1]  # Remove Z

            # Handle microseconds if present
            if '.' in expires_at_str:
                datetime_format = "%Y-%m-%dT%H:%M:%S.%f"
            else:
                datetime_format = "%Y-%m-%dT%H:%M:%S"

            # Strip any explicit "+HH:MM" suffix; timestamps are taken as UTC.
            if "+" in expires_at_str:
                expires_at_str = expires_at_str.split("+")[0]

            # Parse the datetime without timezone info
            expires_datetime = datetime.strptime(expires_at_str, datetime_format)

            # Fix: calling .timestamp() on a *naive* datetime interprets it in
            # the host's local timezone, which skewed expiry checks by the
            # local UTC offset. Attach UTC explicitly before converting.
            expires_timestamp = expires_datetime.replace(tzinfo=timezone.utc).timestamp()
            current_time = time.time()

            # Check if token is expired and log result
            is_expired = current_time > expires_timestamp
            time_diff = expires_timestamp - current_time

            if is_expired:
                logger.debug(f"Token is expired! Current time: {datetime.fromtimestamp(current_time)}, "
                             f"Expires at: {datetime.fromtimestamp(expires_timestamp)}, "
                             f"Expired {abs(time_diff):.0f} seconds ago")
            else:
                logger.debug(f"Token is still valid. Expires at: {datetime.fromtimestamp(expires_timestamp)}, "
                             f"Time remaining: {time_diff:.0f} seconds")

            return is_expired
        except Exception as e:
            logger.error(f"Error parsing expiration date: {e}")
            # Log the actual value to help diagnose format issues
            logger.error(f"Invalid expires_at value: '{self.expires_at}'")
            import traceback
            logger.error(f"Traceback: {traceback.format_exc()}")
            return False  # If we can't parse the date, assume it's not expired

    def serialize(self) -> Dict[str, Any]:
        """Convert to a dictionary for storage"""
        return {
            "access_token": self.access_token,
            "expires_at": self.expires_at,
            "token_type": self.token_type,
            "created_at": self.created_at
        }

    @classmethod
    def deserialize(cls, data: Dict[str, Any]) -> 'TokenInfo':
        """Create from a stored dictionary"""
        logger.debug(f"Deserializing token data with keys: {', '.join(data.keys())}")
        if 'expires_at' in data:
            logger.debug(f"Token expires_at from cache: {data['expires_at']}")
        token = cls(
            access_token=data.get("access_token", ""),
            expires_at=data.get("expires_at"),
            token_type=data.get("token_type")
        )
        token.created_at = data.get("created_at", int(time.time()))
        return token
class PipeboardAuthManager:
    """Manages authentication with Meta APIs via pipeboard.co.

    Exchanges a Pipeboard API token (read from the PIPEBOARD_API_TOKEN
    environment variable) for a Meta access token over the Pipeboard HTTP
    API. File-based cache helpers exist, but get_access_token() always
    fetches a fresh token from Pipeboard (caching intentionally disabled).
    """

    def __init__(self):
        # The Pipeboard API token is a secret: only a short prefix is ever logged.
        self.api_token = os.environ.get("PIPEBOARD_API_TOKEN", "")
        logger.debug(f"PipeboardAuthManager initialized with API token: {self.api_token[:5]}..." if self.api_token else "No API token")
        if self.api_token:
            logger.info("Pipeboard authentication enabled. Will use pipeboard.co for Meta authentication.")
        else:
            logger.info("Pipeboard authentication not enabled. Set PIPEBOARD_API_TOKEN environment variable to enable.")
        self.token_info = None
        # Note: Token caching is disabled to always fetch fresh tokens from Pipeboard

    def _get_token_cache_path(self) -> Path:
        """Get the platform-specific path for token cache file.

        Creates the parent directory if needed and returns the full path of
        pipeboard_token_cache.json under the per-user application data dir.
        """
        if platform.system() == "Windows":
            base_path = Path(os.environ.get("APPDATA", ""))
        elif platform.system() == "Darwin":  # macOS
            base_path = Path.home() / "Library" / "Application Support"
        else:  # Assume Linux/Unix
            base_path = Path.home() / ".config"
        # Create directory if it doesn't exist
        cache_dir = base_path / "meta-ads-mcp"
        cache_dir.mkdir(parents=True, exist_ok=True)
        cache_path = cache_dir / "pipeboard_token_cache.json"
        logger.debug(f"Token cache path: {cache_path}")
        return cache_path

    def _load_cached_token(self) -> bool:
        """Load token from cache if available.

        Returns:
            True if a structurally valid, unexpired token was loaded into
            self.token_info; False otherwise. Expired, stale (> 60 days) or
            corrupted cache files are deleted as a side effect.
        """
        cache_path = self._get_token_cache_path()
        if not cache_path.exists():
            logger.debug(f"Token cache file not found at {cache_path}")
            return False
        try:
            with open(cache_path, "r") as f:
                logger.debug(f"Reading token cache from {cache_path}")
                data = json.load(f)
            # Validate the cached data structure
            required_fields = ["access_token"]
            if not all(field in data for field in required_fields):
                logger.warning("Cached token data is missing required fields")
                return False
            # Check if the token looks valid (basic format check)
            if not data.get("access_token") or len(data["access_token"]) < 20:
                logger.warning("Cached token appears malformed")
                return False
            self.token_info = TokenInfo.deserialize(data)
            # Log token details (partial token for security)
            masked_token = self.token_info.access_token[:10] + "..." + self.token_info.access_token[-5:] if self.token_info.access_token else "None"
            logger.debug(f"Loaded token: {masked_token}")
            # Check if token is expired
            if self.token_info.is_expired():
                logger.info("Cached token is expired, removing cache file")
                # Remove the expired cache file
                try:
                    cache_path.unlink()
                    logger.info(f"Removed expired token cache: {cache_path}")
                except Exception as e:
                    logger.warning(f"Could not remove expired cache file: {e}")
                self.token_info = None
                return False
            # Additional validation: check if token is too old (more than 60 days)
            current_time = int(time.time())
            if self.token_info.created_at and (current_time - self.token_info.created_at) > (60 * 24 * 3600):
                logger.warning("Cached token is too old (more than 60 days), removing cache file")
                try:
                    cache_path.unlink()
                    logger.info(f"Removed old token cache: {cache_path}")
                except Exception as e:
                    logger.warning(f"Could not remove old cache file: {e}")
                self.token_info = None
                return False
            logger.info(f"Loaded cached token (expires at {self.token_info.expires_at})")
            return True
        except json.JSONDecodeError as e:
            logger.error(f"Error parsing token cache file: {e}")
            logger.debug("Token cache file might be corrupted, trying to read raw content")
            try:
                with open(cache_path, "r") as f:
                    raw_content = f.read()
                logger.debug(f"Raw cache file content (first 100 chars): {raw_content[:100]}")
            except Exception as e2:
                logger.error(f"Could not read raw cache file: {e2}")
            # If there's any error reading the cache, try to remove the corrupted file
            try:
                cache_path.unlink()
                logger.info(f"Removed corrupted token cache: {cache_path}")
            except Exception as cleanup_error:
                logger.warning(f"Could not remove corrupted cache file: {cleanup_error}")
            return False
        except Exception as e:
            logger.error(f"Error loading cached token: {e}")
            # If there's any error reading the cache, try to remove the corrupted file
            try:
                cache_path.unlink()
                logger.info(f"Removed corrupted token cache: {cache_path}")
            except Exception as cleanup_error:
                logger.warning(f"Could not remove corrupted cache file: {cleanup_error}")
            return False

    def _save_token_to_cache(self) -> None:
        """Save token to cache file (no-op when self.token_info is unset)."""
        if not self.token_info:
            logger.debug("No token to save to cache")
            return
        cache_path = self._get_token_cache_path()
        try:
            token_data = self.token_info.serialize()
            logger.debug(f"Saving token to cache. Expires at: {token_data.get('expires_at')}")
            with open(cache_path, "w") as f:
                json.dump(token_data, f)
            logger.info(f"Token cached at: {cache_path}")
        except Exception as e:
            logger.error(f"Error saving token to cache: {e}")

    def initiate_auth_flow(self) -> Dict[str, str]:
        """
        Initiate the Meta OAuth flow via pipeboard.co

        Returns:
            Dict with loginUrl and status info

        Raises:
            ValueError: if no API token is configured, the endpoint is
                missing/unauthorized, or the response is not valid JSON.
            requests.exceptions.RequestException: on network failures.
        """
        if not self.api_token:
            logger.error("No PIPEBOARD_API_TOKEN environment variable set")
            raise ValueError("No PIPEBOARD_API_TOKEN environment variable set")
        # Exactly match the format used in meta_auth_test.sh
        url = f"{PIPEBOARD_API_BASE}/meta/auth?api_token={self.api_token}"
        headers = {
            "Content-Type": "application/json"
        }
        # Security fix: log only the endpoint, never the URL with the embedded API token.
        logger.info(f"Initiating auth flow with POST request to {PIPEBOARD_API_BASE}/meta/auth")
        try:
            # Make the POST request exactly as in the working meta_auth_test.sh script.
            # Fix: a timeout is required for the Timeout handler below to ever fire;
            # without it a dead connection would hang this call indefinitely.
            response = requests.post(url, headers=headers, timeout=10)
            logger.info(f"Auth flow response status: {response.status_code}")
            # Better error handling
            if response.status_code != 200:
                logger.error(f"Auth flow error: HTTP {response.status_code}")
                error_text = response.text if response.text else "No response content"
                logger.error(f"Response content: {error_text}")
                if response.status_code == 404:
                    raise ValueError(f"Pipeboard API endpoint not found. Check if the server is running at {PIPEBOARD_API_BASE}")
                elif response.status_code == 401:
                    raise ValueError(f"Unauthorized: Invalid API token. Check your PIPEBOARD_API_TOKEN.")
                response.raise_for_status()
            # Parse the response
            try:
                data = response.json()
                logger.info(f"Received response keys: {', '.join(data.keys())}")
            except json.JSONDecodeError:
                logger.error(f"Could not parse JSON response: {response.text}")
                raise ValueError(f"Invalid JSON response from auth endpoint: {response.text[:100]}")
            # Log auth flow response (without sensitive information)
            if 'loginUrl' in data:
                logger.info(f"Auth flow initiated successfully with login URL: {data['loginUrl'][:30]}...")
            else:
                logger.warning(f"Auth flow response missing loginUrl field. Response keys: {', '.join(data.keys())}")
            return data
        except requests.exceptions.ConnectionError as e:
            logger.error(f"Connection error to Pipeboard: {e}")
            logger.debug(f"Attempting to connect to: {PIPEBOARD_API_BASE}")
            raise
        except requests.exceptions.Timeout as e:
            logger.error(f"Timeout connecting to Pipeboard: {e}")
            raise
        except requests.exceptions.RequestException as e:
            logger.error(f"Error initiating auth flow: {e}")
            raise
        except Exception as e:
            logger.error(f"Unexpected error initiating auth flow: {e}")
            raise

    def get_access_token(self, force_refresh: bool = False) -> Optional[str]:
        """
        Get the current access token, refreshing if necessary or if forced

        Args:
            force_refresh: Force token refresh even if cached token exists.
                Currently has no effect: caching is disabled, so a fresh
                token is always fetched from Pipeboard.

        Returns:
            Access token if available, None otherwise
        """
        # First check if API token is configured
        if not self.api_token:
            logger.error("TOKEN VALIDATION FAILED: No Pipeboard API token configured")
            logger.error("Please set PIPEBOARD_API_TOKEN environment variable")
            return None
        logger.info("Getting fresh token from Pipeboard (caching disabled)")
        # If force refresh or no token/expired token, get a new one from Pipeboard
        try:
            # Make a request to get the token, using the same URL format as initiate_auth_flow
            url = f"{PIPEBOARD_API_BASE}/meta/token?api_token={self.api_token}"
            headers = {
                "Content-Type": "application/json"
            }
            # Security fix: log only the endpoint, never the URL with the embedded API token.
            logger.info(f"Requesting token from {PIPEBOARD_API_BASE}/meta/token")
            # Add timeout for better error messages
            try:
                response = requests.get(url, headers=headers, timeout=10)
            except requests.exceptions.Timeout:
                logger.error("TOKEN VALIDATION FAILED: Timeout while connecting to Pipeboard API")
                logger.error(f"Could not connect to {PIPEBOARD_API_BASE} within 10 seconds")
                return None
            except requests.exceptions.ConnectionError:
                logger.error("TOKEN VALIDATION FAILED: Connection error with Pipeboard API")
                logger.error(f"Could not connect to {PIPEBOARD_API_BASE} - check if service is running")
                return None
            logger.info(f"Token request response status: {response.status_code}")
            # Better error handling with response content
            if response.status_code != 200:
                logger.error(f"TOKEN VALIDATION FAILED: HTTP error {response.status_code}")
                error_text = response.text if response.text else "No response content"
                logger.error(f"Response content: {error_text}")
                # Add more specific error messages for common status codes
                if response.status_code == 401:
                    logger.error("Authentication failed: Invalid Pipeboard API token")
                elif response.status_code == 404:
                    logger.error("Endpoint not found: Check if Pipeboard API service is running correctly")
                elif response.status_code == 400:
                    logger.error("Bad request: The request to Pipeboard API was malformed")
                response.raise_for_status()
            try:
                data = response.json()
                logger.info(f"Received token response with keys: {', '.join(data.keys())}")
            except json.JSONDecodeError:
                logger.error("TOKEN VALIDATION FAILED: Invalid JSON response from Pipeboard API")
                logger.error(f"Response content (first 100 chars): {response.text[:100]}")
                return None
            # Validate response data
            if "access_token" not in data:
                logger.error("TOKEN VALIDATION FAILED: No access_token in Pipeboard API response")
                logger.error(f"Response keys: {', '.join(data.keys())}")
                if "error" in data:
                    logger.error(f"Error details: {data['error']}")
                else:
                    logger.error("No error information available in response")
                return None
            # Create new token info
            self.token_info = TokenInfo(
                access_token=data.get("access_token"),
                expires_at=data.get("expires_at"),
                token_type=data.get("token_type", "bearer")
            )
            # Note: Token caching is disabled
            masked_token = self.token_info.access_token[:10] + "..." + self.token_info.access_token[-5:] if self.token_info.access_token else "None"
            logger.info(f"Successfully retrieved access token: {masked_token}")
            return self.token_info.access_token
        except requests.RequestException as e:
            # Fix: compare against None rather than relying on truthiness —
            # requests.Response.__bool__ is False for 4xx/5xx, which previously
            # masked the status code on exactly the errors we branch on below.
            has_response = hasattr(e, 'response') and e.response is not None
            status_code = e.response.status_code if has_response else None
            response_text = e.response.text if has_response else "No response"
            if status_code == 401:
                logger.error(f"Unauthorized: Check your PIPEBOARD_API_TOKEN. Response: {response_text}")
            elif status_code == 404:
                logger.error(f"No token available: You might need to complete authorization first. Response: {response_text}")
                # Return None so caller can handle the auth flow
                return None
            else:
                logger.error(f"Error getting access token (status {status_code}): {e}")
                logger.error(f"Response content: {response_text}")
            return None
        except Exception as e:
            logger.error(f"Unexpected error getting access token: {e}")
            return None

    def invalidate_token(self) -> None:
        """Invalidate the current token, usually because it has expired or is invalid"""
        if self.token_info:
            logger.info(f"Invalidating token: {self.token_info.access_token[:10]}...")
            self.token_info = None
            # Remove the cached token file
            try:
                cache_path = self._get_token_cache_path()
                if cache_path.exists():
                    # pathlib idiom, consistent with the unlink() calls elsewhere in the class
                    cache_path.unlink()
                    logger.info(f"Removed cached token file: {cache_path}")
                else:
                    logger.debug(f"No token cache file to remove: {cache_path}")
            except Exception as e:
                logger.error(f"Error removing cached token file: {e}")
        else:
            logger.debug("No token to invalidate")

    def test_token_validity(self) -> bool:
        """
        Test if the current token is valid with the Meta Graph API

        Returns:
            True if valid, False otherwise
        """
        if not self.token_info or not self.token_info.access_token:
            logger.debug("No token to test")
            logger.error("TOKEN VALIDATION FAILED: Missing token to test")
            return False
        # Log token details for debugging (partial token for security)
        masked_token = self.token_info.access_token[:5] + "..." + self.token_info.access_token[-5:] if self.token_info.access_token else "None"
        token_type = self.token_info.token_type if hasattr(self.token_info, 'token_type') and self.token_info.token_type else "bearer"
        logger.debug(f"Testing token validity (token: {masked_token}, type: {token_type})")
        try:
            # Make a simple request to the /me endpoint to test the token
            META_GRAPH_API_VERSION = "v22.0"
            url = f"https://graph.facebook.com/{META_GRAPH_API_VERSION}/me"
            headers = {"Authorization": f"Bearer {self.token_info.access_token}"}
            logger.debug(f"Testing token validity with request to {url}")
            # Add timeout and better error handling
            try:
                response = requests.get(url, headers=headers, timeout=10)
            except requests.exceptions.Timeout:
                logger.error("TOKEN VALIDATION FAILED: Timeout while connecting to Meta API")
                logger.error("The Graph API did not respond within 10 seconds")
                return False
            except requests.exceptions.ConnectionError:
                logger.error("TOKEN VALIDATION FAILED: Connection error with Meta API")
                logger.error("Could not establish connection to Graph API - check network connectivity")
                return False
            if response.status_code == 200:
                data = response.json()
                logger.debug(f"Token is valid. User ID: {data.get('id')}")
                # Add more useful user information for debugging
                user_info = f"User ID: {data.get('id')}"
                if 'name' in data:
                    user_info += f", Name: {data.get('name')}"
                logger.info(f"Meta API token validated successfully ({user_info})")
                return True
            else:
                logger.error(f"TOKEN VALIDATION FAILED: API returned status {response.status_code}")
                # Try to parse the error response for more detailed information
                try:
                    error_data = response.json()
                    if 'error' in error_data:
                        error_obj = error_data.get('error', {})
                        error_code = error_obj.get('code', 'unknown')
                        error_message = error_obj.get('message', 'Unknown error')
                        logger.error(f"Meta API error: Code {error_code} - {error_message}")
                        # Add specific guidance for common error codes
                        if error_code == 190:
                            logger.error("Error indicates the token is invalid or has expired")
                        elif error_code == 4:
                            logger.error("Error indicates rate limiting - too many requests")
                        elif error_code == 200:
                            logger.error("Error indicates API permissions or configuration issue")
                    else:
                        logger.error(f"No error object in response: {error_data}")
                except json.JSONDecodeError:
                    logger.error(f"Could not parse error response: {response.text[:200]}")
                return False
        except Exception as e:
            logger.error(f"TOKEN VALIDATION FAILED: Unexpected error: {str(e)}")
            # Add stack trace for debugging complex issues
            import traceback
            logger.error(f"Stack trace: {traceback.format_exc()}")
            return False
# Module-level singleton, created at import time so all importers share one
# manager (and therefore one in-memory token state).
pipeboard_auth_manager = PipeboardAuthManager()
```