This is page 4 of 5. Use http://codebase.md/nictuku/meta-ads-mcp?page={x} to view the full context.
# Directory Structure
```
├── .github
│ └── workflows
│ ├── publish-mcp.yml
│ ├── publish.yml
│ └── test.yml
├── .gitignore
├── .python-version
├── .uv.toml
├── CUSTOM_META_APP.md
├── Dockerfile
├── examples
│ ├── example_http_client.py
│ └── README.md
├── future_improvements.md
├── images
│ └── meta-ads-example.png
├── LICENSE
├── LOCAL_INSTALLATION.md
├── meta_ads_auth.sh
├── meta_ads_mcp
│ ├── __init__.py
│ ├── __main__.py
│ └── core
│ ├── __init__.py
│ ├── accounts.py
│ ├── ads_library.py
│ ├── ads.py
│ ├── adsets.py
│ ├── api.py
│ ├── auth.py
│ ├── authentication.py
│ ├── budget_schedules.py
│ ├── callback_server.py
│ ├── campaigns.py
│ ├── duplication.py
│ ├── http_auth_integration.py
│ ├── insights.py
│ ├── openai_deep_research.py
│ ├── pipeboard_auth.py
│ ├── reports.py
│ ├── resources.py
│ ├── server.py
│ ├── targeting.py
│ └── utils.py
├── META_API_NOTES.md
├── poetry.lock
├── pyproject.toml
├── README.md
├── RELEASE.md
├── requirements.txt
├── server.json
├── setup.py
├── smithery.yaml
├── STREAMABLE_HTTP_SETUP.md
└── tests
├── __init__.py
├── conftest.py
├── e2e_account_info_search_issue.py
├── README_REGRESSION_TESTS.md
├── README.md
├── test_account_info_access_fix.py
├── test_account_search.py
├── test_budget_update_e2e.py
├── test_budget_update.py
├── test_create_ad_creative_simple.py
├── test_create_simple_creative_e2e.py
├── test_dsa_beneficiary.py
├── test_dsa_integration.py
├── test_duplication_regression.py
├── test_duplication.py
├── test_dynamic_creatives.py
├── test_estimate_audience_size_e2e.py
├── test_estimate_audience_size.py
├── test_get_account_pages.py
├── test_get_ad_creatives_fix.py
├── test_get_ad_image_quality_improvements.py
├── test_get_ad_image_regression.py
├── test_http_transport.py
├── test_insights_actions_and_values_e2e.py
├── test_insights_pagination.py
├── test_integration_openai_mcp.py
├── test_is_dynamic_creative_adset.py
├── test_mobile_app_adset_creation.py
├── test_mobile_app_adset_issue.py
├── test_openai_mcp_deep_research.py
├── test_openai.py
├── test_page_discovery_integration.py
├── test_page_discovery.py
├── test_targeting_search_e2e.py
├── test_targeting.py
├── test_update_ad_creative_id.py
└── test_upload_ad_image.py
```
# Files
--------------------------------------------------------------------------------
/meta_ads_mcp/core/targeting.py:
--------------------------------------------------------------------------------
```python
"""Targeting search functionality for Meta Ads API."""
import json
from typing import Optional, List, Dict, Any
import os
from .api import meta_api_tool, make_api_request
from .server import mcp_server
@mcp_server.tool()
@meta_api_tool
async def search_interests(query: str, access_token: Optional[str] = None, limit: int = 25) -> str:
    """
    Search for interest targeting options by keyword.

    Args:
        query: Search term for interests (e.g., "baseball", "cooking", "travel")
        access_token: Meta API access token (optional - will use cached token if not provided)
        limit: Maximum number of results to return (default: 25)

    Returns:
        JSON string containing interest data with id, name, audience_size, and path fields
    """
    # Reject empty queries up front rather than hitting the API with no term.
    if not query:
        return json.dumps({"error": "No search query provided"}, indent=2)
    search_params = {"type": "adinterest", "q": query, "limit": limit}
    response = await make_api_request("search", access_token, search_params)
    return json.dumps(response, indent=2)
@mcp_server.tool()
@meta_api_tool
async def get_interest_suggestions(interest_list: List[str], access_token: Optional[str] = None, limit: int = 25) -> str:
    """
    Get interest suggestions based on existing interests.

    Args:
        interest_list: List of interest names to get suggestions for (e.g., ["Basketball", "Soccer"])
        access_token: Meta API access token (optional - will use cached token if not provided)
        limit: Maximum number of suggestions to return (default: 25)

    Returns:
        JSON string containing suggested interests with id, name, audience_size, and description fields
    """
    # An empty seed list cannot produce suggestions; bail out early.
    if not interest_list:
        return json.dumps({"error": "No interest list provided"}, indent=2)
    suggestion_params = {
        "type": "adinterestsuggestion",
        # The Graph API expects the list JSON-encoded as a single parameter.
        "interest_list": json.dumps(interest_list),
        "limit": limit,
    }
    response = await make_api_request("search", access_token, suggestion_params)
    return json.dumps(response, indent=2)
@mcp_server.tool()
@meta_api_tool
async def estimate_audience_size(
    access_token: Optional[str] = None,
    account_id: Optional[str] = None,
    targeting: Optional[Dict[str, Any]] = None,
    optimization_goal: str = "REACH",
    # Backwards compatibility for simple interest validation
    interest_list: Optional[List[str]] = None,
    interest_fbid_list: Optional[List[str]] = None
) -> str:
    """
    Estimate audience size for targeting specifications via Meta's reachestimate API,
    with an optional delivery_estimate fallback.

    This function provides comprehensive audience estimation for complex targeting combinations
    including demographics, geography, interests, and behaviors. It also maintains backwards
    compatibility for simple interest validation (the legacy `adinterestvalid` search).

    Args:
        access_token: Meta API access token (optional - will use cached token if not provided)
        account_id: Meta Ads account ID (format: act_XXXXXXXXX) - required for comprehensive estimation
        targeting: Complete targeting specification including demographics, geography, interests, etc.
            Example: {
                "age_min": 25,
                "age_max": 65,
                "geo_locations": {"countries": ["PL"]},
                "flexible_spec": [
                    {"interests": [{"id": "6003371567474"}]},
                    {"interests": [{"id": "6003462346642"}]}
                ]
            }
        optimization_goal: Optimization goal for estimation (default: "REACH").
            Options: "REACH", "LINK_CLICKS", "IMPRESSIONS", "CONVERSIONS", etc.
            Only used by the delivery_estimate fallback; reachestimate ignores it.
        interest_list: [DEPRECATED - for backwards compatibility] List of interest names to validate
        interest_fbid_list: [DEPRECATED - for backwards compatibility] List of interest IDs to validate

    Returns:
        JSON string with audience estimation results including estimated_audience_size,
        estimate_details, and targeting validation (or a structured error payload).
    """
    # --- Backwards-compatibility path: simple interest validation ---
    # Legacy mode is triggered when interest params are provided OR when the
    # caller supplied neither account_id nor targeting (the old call shape).
    is_backwards_compatible_call = (interest_list or interest_fbid_list) or (not account_id and not targeting)
    if is_backwards_compatible_call and not targeting:
        if not interest_list and not interest_fbid_list:
            return json.dumps({"error": "No interest list or FBID list provided"}, indent=2)
        endpoint = "search"
        params = {
            "type": "adinterestvalid"
        }
        # Graph API expects JSON-encoded arrays for these search parameters.
        if interest_list:
            params["interest_list"] = json.dumps(interest_list)
        if interest_fbid_list:
            params["interest_fbid_list"] = json.dumps(interest_fbid_list)
        data = await make_api_request(endpoint, access_token, params)
        return json.dumps(data, indent=2)
    # --- Comprehensive audience estimation path ---
    if not account_id:
        return json.dumps({
            "error": "account_id is required for comprehensive audience estimation",
            "details": "For simple interest validation, use interest_list or interest_fbid_list parameters"
        }, indent=2)
    if not targeting:
        return json.dumps({
            "error": "targeting specification is required for comprehensive audience estimation",
            "example": {
                "age_min": 25,
                "age_max": 65,
                "geo_locations": {"countries": ["US"]},
                "flexible_spec": [
                    {"interests": [{"id": "6003371567474"}]}
                ]
            }
        }, indent=2)

    # Preflight validation: require at least one location OR a custom audience
    def _has_location_or_custom_audience(t: Dict[str, Any]) -> bool:
        # True when the spec names at least one geo bucket, a top-level custom
        # audience, or a custom audience inside any flexible_spec entry.
        if not isinstance(t, dict):
            return False
        geo = t.get("geo_locations") or {}
        if isinstance(geo, dict):
            for key in [
                "countries",
                "regions",
                "cities",
                "zips",
                "geo_markets",
                "country_groups"
            ]:
                val = geo.get(key)
                if isinstance(val, list) and len(val) > 0:
                    return True
        # Top-level custom audiences
        ca = t.get("custom_audiences")
        if isinstance(ca, list) and len(ca) > 0:
            return True
        # Custom audiences within flexible_spec
        flex = t.get("flexible_spec")
        if isinstance(flex, list):
            for spec in flex:
                if isinstance(spec, dict):
                    ca_spec = spec.get("custom_audiences")
                    if isinstance(ca_spec, list) and len(ca_spec) > 0:
                        return True
        return False

    if not _has_location_or_custom_audience(targeting):
        return json.dumps({
            "error": "Missing target audience location",
            "details": "Select at least one location in targeting.geo_locations or include a custom audience.",
            "action_required": "Add geo_locations with countries/regions/cities/zips or include custom_audiences.",
            "example": {
                "geo_locations": {"countries": ["US"]},
                "age_min": 25,
                "age_max": 65
            }
        }, indent=2)

    # Build reach estimate request (using correct Meta API endpoint)
    endpoint = f"{account_id}/reachestimate"
    params = {
        "targeting_spec": targeting
    }
    # Note: reachestimate endpoint doesn't support optimization_goal or objective parameters
    try:
        data = await make_api_request(endpoint, access_token, params, method="GET")
        # Surface Graph API errors directly for better diagnostics.
        # If reachestimate fails, optionally attempt a fallback using delivery_estimate.
        if isinstance(data, dict) and "error" in data:
            # Special handling for Missing Target Audience Location error (subcode 1885364)
            try:
                err_wrapper = data.get("error", {})
                details_obj = err_wrapper.get("details", {})
                raw_err = details_obj.get("error", {}) if isinstance(details_obj, dict) else {}
                if (
                    isinstance(raw_err, dict) and (
                        raw_err.get("error_subcode") == 1885364 or
                        raw_err.get("error_user_title") == "Missing Target Audience Location"
                    )
                ):
                    return json.dumps({
                        "error": "Missing target audience location",
                        "details": raw_err.get("error_user_msg") or "Select at least one location, or choose a custom audience.",
                        "endpoint_used": f"{account_id}/reachestimate",
                        "action_required": "Add geo_locations with at least one of countries/regions/cities/zips or include custom_audiences.",
                        "blame_field_specs": raw_err.get("error_data", {}).get("blame_field_specs") if isinstance(raw_err.get("error_data"), dict) else None
                    }, indent=2)
            except Exception:
                # Error payload did not match the expected nesting; fall through
                # to the generic error handling below.
                pass
            # Allow disabling fallback via environment variable
            # Default: fallback disabled unless explicitly enabled by setting DISABLE flag to "0"
            disable_fallback = os.environ.get("META_MCP_DISABLE_DELIVERY_FALLBACK", "1") == "1"
            if disable_fallback:
                return json.dumps({
                    "error": "Graph API returned an error for reachestimate",
                    "details": data.get("error"),
                    "endpoint_used": f"{account_id}/reachestimate",
                    "request_params": {
                        "has_targeting_spec": bool(targeting),
                    },
                    "note": "delivery_estimate fallback disabled via META_MCP_DISABLE_DELIVERY_FALLBACK"
                }, indent=2)
            # Try fallback to delivery_estimate endpoint
            try:
                fallback_endpoint = f"{account_id}/delivery_estimate"
                fallback_params = {
                    # Unlike the reachestimate call above, targeting is JSON-encoded here.
                    "targeting_spec": json.dumps(targeting),
                    # Some API versions accept optimization_goal here
                    "optimization_goal": optimization_goal
                }
                fallback_data = await make_api_request(fallback_endpoint, access_token, fallback_params, method="GET")
                # If fallback returns usable data, format similarly
                if isinstance(fallback_data, dict) and "data" in fallback_data and len(fallback_data["data"]) > 0:
                    estimate_data = fallback_data["data"][0]
                    formatted_response = {
                        "success": True,
                        "account_id": account_id,
                        "targeting": targeting,
                        "optimization_goal": optimization_goal,
                        "estimated_audience_size": estimate_data.get("estimate_mau", 0),
                        "estimate_details": {
                            "monthly_active_users": estimate_data.get("estimate_mau", 0),
                            "daily_outcomes_curve": estimate_data.get("estimate_dau", []),
                            "bid_estimate": estimate_data.get("bid_estimates", {}),
                            "unsupported_targeting": estimate_data.get("unsupported_targeting", [])
                        },
                        "raw_response": fallback_data,
                        "fallback_endpoint_used": "delivery_estimate"
                    }
                    return json.dumps(formatted_response, indent=2)
                # Fallback returned but not in expected format
                return json.dumps({
                    "error": "Graph API returned an error for reachestimate; delivery_estimate fallback did not return usable data",
                    "reachestimate_error": data.get("error"),
                    "fallback_endpoint_used": "delivery_estimate",
                    "fallback_raw_response": fallback_data,
                    "endpoint_used": f"{account_id}/reachestimate",
                    "request_params": {
                        "has_targeting_spec": bool(targeting)
                    }
                }, indent=2)
            except Exception as _fallback_exc:
                return json.dumps({
                    "error": "Graph API returned an error for reachestimate; delivery_estimate fallback also failed",
                    "reachestimate_error": data.get("error"),
                    "fallback_endpoint_used": "delivery_estimate",
                    "fallback_exception": str(_fallback_exc),
                    "endpoint_used": f"{account_id}/reachestimate",
                    "request_params": {
                        "has_targeting_spec": bool(targeting)
                    }
                }, indent=2)
        # Format the response for easier consumption
        if "data" in data:
            response_data = data["data"]
            # Case 1: delivery_estimate-like list structure
            if isinstance(response_data, list) and len(response_data) > 0:
                estimate_data = response_data[0]
                formatted_response = {
                    "success": True,
                    "account_id": account_id,
                    "targeting": targeting,
                    "optimization_goal": optimization_goal,
                    "estimated_audience_size": estimate_data.get("estimate_mau", 0),
                    "estimate_details": {
                        "monthly_active_users": estimate_data.get("estimate_mau", 0),
                        "daily_outcomes_curve": estimate_data.get("estimate_dau", []),
                        "bid_estimate": estimate_data.get("bid_estimates", {}),
                        "unsupported_targeting": estimate_data.get("unsupported_targeting", [])
                    },
                    "raw_response": data
                }
                return json.dumps(formatted_response, indent=2)
            # Case 1b: explicit handling for empty list responses
            if isinstance(response_data, list) and len(response_data) == 0:
                return json.dumps({
                    "error": "No estimation data returned from Meta API",
                    "raw_response": data,
                    "debug_info": {
                        "response_keys": list(data.keys()) if isinstance(data, dict) else "not_a_dict",
                        "response_type": str(type(data)),
                        "endpoint_used": f"{account_id}/reachestimate"
                    }
                }, indent=2)
            # Case 2: reachestimate dict structure with bounds
            if isinstance(response_data, dict):
                # Accept either naming variant for the bound fields.
                lower = response_data.get("users_lower_bound", response_data.get("estimate_mau_lower_bound"))
                upper = response_data.get("users_upper_bound", response_data.get("estimate_mau_upper_bound"))
                estimate_ready = response_data.get("estimate_ready")
                midpoint = None
                try:
                    # Midpoint is only computed when both bounds are numeric.
                    if isinstance(lower, (int, float)) and isinstance(upper, (int, float)):
                        midpoint = int((lower + upper) / 2)
                except Exception:
                    midpoint = None
                formatted_response = {
                    "success": True,
                    "account_id": account_id,
                    "targeting": targeting,
                    "optimization_goal": optimization_goal,
                    "estimated_audience_size": midpoint if midpoint is not None else 0,
                    "estimate_details": {
                        "users_lower_bound": lower,
                        "users_upper_bound": upper,
                        "estimate_ready": estimate_ready
                    },
                    "raw_response": data
                }
                return json.dumps(formatted_response, indent=2)
        else:
            return json.dumps({
                "error": "No estimation data returned from Meta API",
                "raw_response": data,
                "debug_info": {
                    "response_keys": list(data.keys()) if isinstance(data, dict) else "not_a_dict",
                    "response_type": str(type(data)),
                    "endpoint_used": f"{account_id}/reachestimate"
                }
            }, indent=2)
    except Exception as e:
        # Try fallback to delivery_estimate first when an exception occurs (unless disabled)
        # Default: fallback disabled unless explicitly enabled by setting DISABLE flag to "0"
        disable_fallback = os.environ.get("META_MCP_DISABLE_DELIVERY_FALLBACK", "1") == "1"
        if not disable_fallback:
            try:
                fallback_endpoint = f"{account_id}/delivery_estimate"
                fallback_params = {
                    "targeting_spec": json.dumps(targeting) if isinstance(targeting, dict) else targeting,
                    "optimization_goal": optimization_goal
                }
                fallback_data = await make_api_request(fallback_endpoint, access_token, fallback_params, method="GET")
                if isinstance(fallback_data, dict) and "data" in fallback_data and len(fallback_data["data"]) > 0:
                    estimate_data = fallback_data["data"][0]
                    formatted_response = {
                        "success": True,
                        "account_id": account_id,
                        "targeting": targeting,
                        "optimization_goal": optimization_goal,
                        "estimated_audience_size": estimate_data.get("estimate_mau", 0),
                        "estimate_details": {
                            "monthly_active_users": estimate_data.get("estimate_mau", 0),
                            "daily_outcomes_curve": estimate_data.get("estimate_dau", []),
                            "bid_estimate": estimate_data.get("bid_estimates", {}),
                            "unsupported_targeting": estimate_data.get("unsupported_targeting", [])
                        },
                        "raw_response": fallback_data,
                        "fallback_endpoint_used": "delivery_estimate"
                    }
                    return json.dumps(formatted_response, indent=2)
            except Exception as _fallback_exc:
                # If fallback also fails, proceed to detailed error handling below
                pass
        # Check if this is the specific Business Manager system user permission error
        # NOTE(review): substring matching on "100" and "33" is a loose heuristic
        # for Graph API code 100 / subcode 33 — it can match unrelated messages.
        error_str = str(e)
        if "100" in error_str and "33" in error_str:
            # Try to provide fallback estimation using individual interests if available
            interests_found = []
            if targeting and "interests" in targeting:
                interests_found.extend([interest.get("id") for interest in targeting["interests"] if interest.get("id")])
            elif targeting and "flexible_spec" in targeting:
                for spec in targeting["flexible_spec"]:
                    if "interests" in spec:
                        interests_found.extend([interest.get("id") for interest in spec["interests"] if interest.get("id")])
            if interests_found:
                # Attempt to get individual interest data as fallback
                # (recursive call exercises the backwards-compatibility path).
                try:
                    fallback_result = await estimate_audience_size(
                        access_token=access_token,
                        interest_fbid_list=interests_found
                    )
                    fallback_data = json.loads(fallback_result)
                    return json.dumps({
                        "comprehensive_targeting_failed": True,
                        "error_code": "100-33",
                        "fallback_used": True,
                        "details": {
                            "issue": "reachestimate endpoint returned error - possibly due to targeting parameters or account limitations",
                            "solution": "Individual interest validation used as fallback - comprehensive targeting may have specific requirements",
                            "endpoint_used": f"{account_id}/reachestimate"
                        },
                        "individual_interest_data": fallback_data,
                        "note": "Individual interest audience sizes provided as fallback. Comprehensive targeting via reachestimate endpoint failed."
                    }, indent=2)
                except:
                    # NOTE(review): bare except silently swallows everything here
                    # (including KeyboardInterrupt); consider narrowing to Exception.
                    pass
            return json.dumps({
                "error": "reachestimate endpoint returned error (previously was incorrectly using delivery_estimate)",
                "error_code": "100-33",
                "details": {
                    "issue": "The endpoint returned an error, possibly due to targeting parameters or account limitations",
                    "endpoint_used": f"{account_id}/reachestimate",
                    "previous_issue": "Code was previously using non-existent delivery_estimate endpoint - now fixed",
                    "available_alternative": "Use interest_list or interest_fbid_list parameters for individual interest validation"
                },
                "raw_error": error_str
            }, indent=2)
        else:
            return json.dumps({
                "error": f"Failed to get audience estimation from reachestimate endpoint: {str(e)}",
                "details": "Check targeting parameters and account permissions",
                "error_type": "general_api_error",
                "endpoint_used": f"{account_id}/reachestimate"
            }, indent=2)
@mcp_server.tool()
@meta_api_tool
async def search_behaviors(access_token: Optional[str] = None, limit: int = 50) -> str:
    """
    Get all available behavior targeting options.

    Args:
        access_token: Meta API access token (optional - will use cached token if not provided)
        limit: Maximum number of results to return (default: 50)

    Returns:
        JSON string containing behavior targeting options with id, name, audience_size bounds, path, and description
    """
    # Behaviors are exposed through the generic adTargetingCategory search.
    behavior_params = {
        "type": "adTargetingCategory",
        "class": "behaviors",
        "limit": limit,
    }
    response = await make_api_request("search", access_token, behavior_params)
    return json.dumps(response, indent=2)
@mcp_server.tool()
@meta_api_tool
async def search_demographics(access_token: Optional[str] = None, demographic_class: str = "demographics", limit: int = 50) -> str:
    """
    Get demographic targeting options.

    Args:
        access_token: Meta API access token (optional - will use cached token if not provided)
        demographic_class: Type of demographics to retrieve. Options: 'demographics', 'life_events',
            'industries', 'income', 'family_statuses', 'user_device', 'user_os' (default: 'demographics')
        limit: Maximum number of results to return (default: 50)

    Returns:
        JSON string containing demographic targeting options with id, name, audience_size bounds, path, and description
    """
    # The demographic family to fetch is selected via the "class" parameter.
    demographic_params = {
        "type": "adTargetingCategory",
        "class": demographic_class,
        "limit": limit,
    }
    response = await make_api_request("search", access_token, demographic_params)
    return json.dumps(response, indent=2)
@mcp_server.tool()
@meta_api_tool
async def search_geo_locations(query: str, access_token: Optional[str] = None,
                               location_types: Optional[List[str]] = None, limit: int = 25) -> str:
    """
    Search for geographic targeting locations.

    Args:
        query: Search term for locations (e.g., "New York", "California", "Japan")
        access_token: Meta API access token (optional - will use cached token if not provided)
        location_types: Types of locations to search. Options: ['country', 'region', 'city', 'zip',
            'geo_market', 'electoral_district']. If not specified, searches all types.
        limit: Maximum number of results to return (default: 25)

    Returns:
        JSON string containing location data with key, name, type, and geographic hierarchy information
    """
    # Guard clause: a location search requires a non-empty query term.
    if not query:
        return json.dumps({"error": "No search query provided"}, indent=2)
    geo_params: Dict[str, Any] = {"type": "adgeolocation", "q": query, "limit": limit}
    if location_types:
        # Graph API expects the type filter as a JSON-encoded array.
        geo_params["location_types"] = json.dumps(location_types)
    response = await make_api_request("search", access_token, geo_params)
    return json.dumps(response, indent=2)
```
--------------------------------------------------------------------------------
/tests/test_estimate_audience_size_e2e.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
End-to-End Audience Estimation Test for Meta Ads MCP
This test validates that the new estimate_audience_size function correctly provides
comprehensive audience estimation and backwards compatibility for interest validation
through a pre-authenticated MCP server.
Usage:
1. Start the server: uv run python -m meta_ads_mcp --transport streamable-http --port 8080
2. Run test: uv run python tests/test_estimate_audience_size_e2e.py
Or with pytest (manual only):
uv run python -m pytest tests/test_estimate_audience_size_e2e.py -v -m e2e
Test scenarios:
1. Comprehensive audience estimation with complex targeting
2. Backwards compatibility with simple interest validation
3. Error handling for invalid parameters
4. Different optimization goals
"""
import pytest
import requests
import json
import os
import sys
from typing import Dict, Any, List
# Load environment variables from .env file.
# python-dotenv is an optional dependency: when it is missing the test falls
# back to whatever is already set in the process environment.
try:
    from dotenv import load_dotenv
    load_dotenv()
    print("✅ Loaded environment variables from .env file")
except ImportError:
    print("⚠️ python-dotenv not installed, using system environment variables only")
@pytest.mark.e2e
@pytest.mark.skip(reason="E2E test - run manually only")
class AudienceEstimationTester:
"""Test suite focused on audience estimation functionality"""
def __init__(self, base_url: str = "http://localhost:8080"):
self.base_url = base_url.rstrip('/')
self.endpoint = f"{self.base_url}/mcp/"
self.request_id = 1
# Default account ID from workspace rules
self.account_id = "act_701351919139047"
# Test targeting specifications
self.test_targeting_specs = {
"simple_demographics": {
"age_min": 25,
"age_max": 65,
"geo_locations": {"countries": ["US"]}
},
"demographics_with_interests": {
"age_min": 18,
"age_max": 35,
"geo_locations": {"countries": ["PL"]},
"flexible_spec": [
{"interests": [{"id": "6003371567474"}]} # Business interest
]
},
"complex_targeting": {
"age_min": 25,
"age_max": 55,
"geo_locations": {"countries": ["US"], "regions": [{"key": "3847"}]}, # California
"flexible_spec": [
{"interests": [{"id": "6003371567474"}, {"id": "6003462346642"}]}, # Business + Technology
{"behaviors": [{"id": "6007101597783"}]} # Business travelers
]
},
"mobile_app_targeting": {
"age_min": 18,
"age_max": 45,
"geo_locations": {"countries": ["US"]},
"user_device": ["mobile"],
"user_os": ["iOS", "Android"],
"flexible_spec": [
{"interests": [{"id": "6003139266461"}]} # Mobile games
]
}
}
# Test interest lists for backwards compatibility
self.test_interests = {
"valid_names": ["Japan", "Basketball", "Technology"],
"mixed_validity": ["Japan", "invalidinterestname12345", "Basketball"],
"valid_fbids": ["6003700426513", "6003397425735"], # Japan, Tennis
"invalid_fbids": ["999999999999", "000000000000"]
}
def _make_request(self, method: str, params: Dict[str, Any] = None,
headers: Dict[str, str] = None) -> Dict[str, Any]:
"""Make a JSON-RPC request to the MCP server"""
default_headers = {
"Content-Type": "application/json",
"Accept": "application/json, text/event-stream",
"User-Agent": "Audience-Estimation-Test-Client/1.0"
}
if headers:
default_headers.update(headers)
payload = {
"jsonrpc": "2.0",
"method": method,
"id": self.request_id
}
if params:
payload["params"] = params
try:
response = requests.post(
self.endpoint,
headers=default_headers,
json=payload,
timeout=20 # Increased timeout for delivery estimates
)
self.request_id += 1
return {
"status_code": response.status_code,
"headers": dict(response.headers),
"json": response.json() if response.status_code == 200 else None,
"text": response.text,
"success": response.status_code == 200
}
except requests.exceptions.RequestException as e:
return {
"status_code": 0,
"headers": {},
"json": None,
"text": str(e),
"success": False,
"error": str(e)
}
def _check_for_errors(self, parsed_content: Dict[str, Any]) -> Dict[str, Any]:
"""Properly handle both wrapped and direct error formats"""
# Check for data wrapper format first
if "data" in parsed_content:
data = parsed_content["data"]
# Handle case where data is already parsed (dict/list)
if isinstance(data, dict) and 'error' in data:
return {
"has_error": True,
"error_message": data['error'],
"error_details": data.get('details', ''),
"format": "wrapped_dict"
}
# Handle case where data is a JSON string that needs parsing
if isinstance(data, str):
try:
error_data = json.loads(data)
if 'error' in error_data:
return {
"has_error": True,
"error_message": error_data['error'],
"error_details": error_data.get('details', ''),
"format": "wrapped_json"
}
except json.JSONDecodeError:
# Data field exists but isn't valid JSON
pass
# Check for direct error format
if 'error' in parsed_content:
return {
"has_error": True,
"error_message": parsed_content['error'],
"error_details": parsed_content.get('details', ''),
"format": "direct"
}
return {"has_error": False}
def _extract_data(self, parsed_content: Dict[str, Any]) -> Any:
"""Extract successful response data from various wrapper formats"""
if "data" in parsed_content:
data = parsed_content["data"]
# Handle case where data is already parsed
if isinstance(data, (list, dict)):
return data
# Handle case where data is a JSON string
if isinstance(data, str):
try:
return json.loads(data)
except json.JSONDecodeError:
return None
# Handle direct format (data at top level)
if isinstance(parsed_content, (list, dict)):
return parsed_content
return None
    def test_pl_only_reachestimate_bounds(self) -> Dict[str, Any]:
        """Verify PL-only reachestimate returns expected bounds and midpoint.

        Prerequisite: Start server with fallback disabled so reachestimate is used directly.
        Example:
            export META_MCP_DISABLE_DELIVERY_FALLBACK=1
            uv run python -m meta_ads_mcp --transport streamable-http --port 8080
        """
        print(f"\n🇵🇱 Testing PL-only reachestimate bounds (fallback disabled)")
        # Dedicated account for this scenario (not the class default account).
        local_account_id = "act_3182643988557192"
        targeting_spec = {"geo_locations": {"countries": ["PL"]}}
        # Expected values pinned from a known-good run against this account;
        # midpoint is the integer average of the two bounds.
        expected_lower = 18600000
        expected_upper = 21900000
        expected_midpoint = 20250000
        result = self._make_request("tools/call", {
            "name": "estimate_audience_size",
            "arguments": {
                "account_id": local_account_id,
                "targeting": targeting_spec,
                "optimization_goal": "REACH"
            }
        })
        if not result["success"]:
            print(f" ❌ Request failed: {result.get('text', 'Unknown error')}")
            return {"success": False, "error": result.get("text", "Unknown error")}
        # The tool result is a JSON string inside the first content item.
        response_data = result["json"]["result"]
        content = response_data.get("content", [{}])[0].get("text", "")
        try:
            parsed_content = json.loads(content)
        except json.JSONDecodeError:
            print(f" ❌ Invalid JSON response")
            return {"success": False, "error": "Invalid JSON"}
        error_info = self._check_for_errors(parsed_content)
        if error_info["has_error"]:
            print(f" ❌ API Error: {error_info['error_message']}")
            return {"success": False, "error": error_info["error_message"], "error_format": error_info["format"]}
        if not parsed_content.get("success", False):
            print(f" ❌ Response indicates failure but no error message found")
            return {"success": False, "error": "Unexpected failure"}
        details = parsed_content.get("estimate_details", {}) or {}
        lower = details.get("users_lower_bound")
        upper = details.get("users_upper_bound")
        midpoint = parsed_content.get("estimated_audience_size")
        fallback_used = parsed_content.get("fallback_endpoint_used")
        # All four conditions must hold: exact bounds, exact midpoint, and
        # confirmation that the delivery_estimate fallback was NOT used.
        ok = (
            lower == expected_lower and
            upper == expected_upper and
            midpoint == expected_midpoint and
            (fallback_used is None)
        )
        if ok:
            print(f" ✅ Bounds: {lower:,}–{upper:,}; midpoint: {midpoint:,}")
            return {
                "success": True,
                "users_lower_bound": lower,
                "users_upper_bound": upper,
                "midpoint": midpoint
            }
        else:
            print(f" ❌ Unexpected values: lower={lower}, upper={upper}, midpoint={midpoint}, fallback={fallback_used}")
            return {
                "success": False,
                "users_lower_bound": lower,
                "users_upper_bound": upper,
                "midpoint": midpoint,
                "fallback_endpoint_used": fallback_used
            }
    def test_comprehensive_audience_estimation(self) -> Dict[str, Any]:
        """Test comprehensive audience estimation with complex targeting.

        Runs every canned targeting spec from self.test_targeting_specs against
        the default account and collects a per-spec result dict.
        """
        print(f"\n🎯 Testing Comprehensive Audience Estimation")
        results = {}
        for spec_name, targeting_spec in self.test_targeting_specs.items():
            print(f" 📊 Testing targeting: '{spec_name}'")
            result = self._make_request("tools/call", {
                "name": "estimate_audience_size",
                "arguments": {
                    "account_id": self.account_id,
                    "targeting": targeting_spec,
                    "optimization_goal": "REACH"
                }
            })
            if not result["success"]:
                results[spec_name] = {
                    "success": False,
                    "error": result.get("text", "Unknown error")
                }
                print(f" ❌ Failed: {result.get('text', 'Unknown error')}")
                continue
            # Parse response: the tool output is a JSON string in the first content item.
            response_data = result["json"]["result"]
            content = response_data.get("content", [{}])[0].get("text", "")
            try:
                parsed_content = json.loads(content)
                # Check for errors using robust helper method
                error_info = self._check_for_errors(parsed_content)
                if error_info["has_error"]:
                    results[spec_name] = {
                        "success": False,
                        "error": error_info["error_message"],
                        "error_format": error_info["format"]
                    }
                    print(f" ❌ API Error: {error_info['error_message']}")
                    continue
                # Check for expected fields in comprehensive estimation
                has_success = parsed_content.get("success", False)
                has_estimate = "estimated_audience_size" in parsed_content
                has_details = "estimate_details" in parsed_content
                results[spec_name] = {
                    "success": has_success and has_estimate,
                    "has_estimate": has_estimate,
                    "has_details": has_details,
                    "estimated_size": parsed_content.get("estimated_audience_size", 0),
                    "optimization_goal": parsed_content.get("optimization_goal"),
                    "raw_response": parsed_content
                }
                if has_success and has_estimate:
                    estimate_size = parsed_content.get("estimated_audience_size", 0)
                    print(f" ✅ Estimated audience: {estimate_size:,} people")
                else:
                    print(f" ⚠️ Incomplete response: success={has_success}, estimate={has_estimate}")
            except json.JSONDecodeError:
                results[spec_name] = {
                    "success": False,
                    "error": "Invalid JSON response",
                    "raw_content": content
                }
                print(f" ❌ Invalid JSON: {content[:100]}...")
        return results
def test_backwards_compatibility_interest_validation(self) -> Dict[str, Any]:
    """Check that the legacy interest-validation call forms still work.

    Before comprehensive estimation was added, estimate_audience_size could be
    called with only an interest list (human-readable names) or only an FBID
    list to validate interests. Both forms are exercised here without an
    account_id or targeting spec.

    Returns:
        Dict keyed by "interest_names" and "interest_fbids"; each value records
        success, per-interest validation results, or the error encountered.
    """
    print(f"\n🔄 Testing Backwards Compatibility (Interest Validation)")
    results = {}
    # Test with interest names
    print(f" 📝 Testing interest name validation")
    result = self._make_request("tools/call", {
        "name": "estimate_audience_size",
        "arguments": {
            # Mix of valid and invalid names so both outcomes are exercised.
            "interest_list": self.test_interests["mixed_validity"]
        }
    })
    if result["success"]:
        # MCP tool results wrap the payload as JSON text in content[0].text.
        response_data = result["json"]["result"]
        content = response_data.get("content", [{}])[0].get("text", "")
        try:
            parsed_content = json.loads(content)
            # Check for errors first
            error_info = self._check_for_errors(parsed_content)
            if error_info["has_error"]:
                results["interest_names"] = {
                    "success": False,
                    "error": error_info["error_message"],
                    "error_format": error_info["format"]
                }
                print(f" ❌ API Error: {error_info['error_message']}")
            else:
                # Extract data using robust helper method
                validations = self._extract_data(parsed_content)
                if validations and isinstance(validations, list):
                    # Mixed-validity input should yield both valid and
                    # invalid entries; record both flags for the summary.
                    results["interest_names"] = {
                        "success": True,
                        "count": len(validations),
                        "has_valid": any(v.get("valid", False) for v in validations),
                        "has_invalid": any(not v.get("valid", True) for v in validations),
                        "validations": validations
                    }
                    print(f" ✅ Validated {len(validations)} interests")
                    for validation in validations:
                        status = "✅" if validation.get("valid") else "❌"
                        print(f" {status} {validation.get('name', 'N/A')}")
                else:
                    results["interest_names"] = {"success": False, "error": "No validation data"}
                    print(f" ❌ No validation data returned")
        except json.JSONDecodeError:
            results["interest_names"] = {"success": False, "error": "Invalid JSON"}
            print(f" ❌ Invalid JSON response")
    else:
        results["interest_names"] = {"success": False, "error": result.get("text", "Request failed")}
        print(f" ❌ Request failed: {result.get('text', 'Unknown error')}")
    # Test with interest FBIDs
    print(f" 🔢 Testing interest FBID validation")
    result = self._make_request("tools/call", {
        "name": "estimate_audience_size",
        "arguments": {
            "interest_fbid_list": self.test_interests["valid_fbids"]
        }
    })
    if result["success"]:
        response_data = result["json"]["result"]
        content = response_data.get("content", [{}])[0].get("text", "")
        try:
            parsed_content = json.loads(content)
            # Check for errors first
            error_info = self._check_for_errors(parsed_content)
            if error_info["has_error"]:
                results["interest_fbids"] = {
                    "success": False,
                    "error": error_info["error_message"],
                    "error_format": error_info["format"]
                }
                print(f" ❌ API Error: {error_info['error_message']}")
            else:
                # Extract data using robust helper method
                validations = self._extract_data(parsed_content)
                if validations and isinstance(validations, list):
                    # These FBIDs are all expected to be valid, hence
                    # "all_valid" rather than the has_valid/has_invalid pair.
                    results["interest_fbids"] = {
                        "success": True,
                        "count": len(validations),
                        "all_valid": all(v.get("valid", False) for v in validations),
                        "validations": validations
                    }
                    print(f" ✅ Validated {len(validations)} FBID interests")
                    for validation in validations:
                        status = "✅" if validation.get("valid") else "❌"
                        print(f" {status} FBID: {validation.get('id', 'N/A')}")
                else:
                    results["interest_fbids"] = {"success": False, "error": "No validation data"}
                    print(f" ❌ No validation data returned")
        except json.JSONDecodeError:
            results["interest_fbids"] = {"success": False, "error": "Invalid JSON"}
            print(f" ❌ Invalid JSON response")
    else:
        results["interest_fbids"] = {"success": False, "error": result.get("text", "Request failed")}
        print(f" ❌ Request failed: {result.get('text', 'Unknown error')}")
    return results
def test_different_optimization_goals(self) -> Dict[str, Any]:
    """Exercise estimate_audience_size across several optimization goals.

    The same simple-demographics targeting spec is reused for every call so
    that the optimization_goal argument is the only variable between runs.

    Returns:
        Dict keyed by goal name, each value recording success and either the
        estimated audience size or the error encountered.
    """
    print(f"\n🎯 Testing Different Optimization Goals")
    outcomes: Dict[str, Any] = {}
    targeting_spec = self.test_targeting_specs["simple_demographics"]
    for goal in ["REACH", "LINK_CLICKS", "CONVERSIONS", "APP_INSTALLS"]:
        print(f" 🎯 Testing optimization goal: '{goal}'")
        call = self._make_request("tools/call", {
            "name": "estimate_audience_size",
            "arguments": {
                "account_id": self.account_id,
                "targeting": targeting_spec,
                "optimization_goal": goal
            }
        })
        # Transport-level failure: record and move on to the next goal.
        if not call["success"]:
            outcomes[goal] = {"success": False, "error": call.get("text", "Request failed")}
            print(f" ❌ {goal}: Request failed")
            continue
        payload_text = call["json"]["result"].get("content", [{}])[0].get("text", "")
        try:
            body = json.loads(payload_text)
        except json.JSONDecodeError:
            outcomes[goal] = {"success": False, "error": "Invalid JSON"}
            print(f" ❌ {goal}: Invalid JSON response")
            continue
        # Distinguish an explicit API error, a successful estimate, and a
        # "failed but no error reported" response.
        error_info = self._check_for_errors(body)
        if error_info["has_error"]:
            outcomes[goal] = {
                "success": False,
                "error": error_info["error_message"],
                "error_format": error_info["format"]
            }
            print(f" ❌ {goal}: {error_info['error_message']}")
        elif body.get("success", False):
            audience_size = body.get("estimated_audience_size", 0)
            outcomes[goal] = {
                "success": True,
                "estimated_size": audience_size,
                "goal_used": body.get("optimization_goal")
            }
            print(f" ✅ {goal}: {audience_size:,} people")
        else:
            outcomes[goal] = {
                "success": False,
                "error": "Response indicates failure but no error message found"
            }
            print(f" ❌ {goal}: Response indicates failure but no error message found")
    return outcomes
def test_error_handling(self) -> Dict[str, Any]:
    """Verify the tool rejects invalid or incomplete parameter combinations.

    Each scenario below is expected to produce an error (either a failed
    request or an error payload); _parse_error_response scores the outcome.

    Returns:
        Dict keyed by scenario name, each value describing whether the
        expected error was observed.
    """
    print(f"\n⚠️ Testing Error Handling")
    # (result key, banner to print, tool arguments, expectation description)
    scenarios = [
        (
            "no_params",
            " 🚫 Testing with no parameters",
            {},
            "Should require targeting or interest validation",
        ),
        (
            "no_targeting",
            " 🚫 Testing account ID without targeting",
            {"account_id": self.account_id},
            "Should require targeting specification",
        ),
        (
            "invalid_targeting",
            " 🚫 Testing invalid targeting structure",
            {"account_id": self.account_id, "targeting": {"invalid": "structure"}},
            "Should handle invalid targeting",
        ),
        (
            "missing_location",
            " 🚫 Testing missing location in targeting",
            {
                "account_id": self.account_id,
                # Interests present but no geo_locations and no custom_audiences
                "targeting": {
                    "age_min": 18,
                    "age_max": 35,
                    "flexible_spec": [
                        {"interests": [{"id": "6003371567474"}]}
                    ]
                }
            },
            "Should require a location or custom audience",
        ),
    ]
    results: Dict[str, Any] = {}
    for key, banner, arguments, expectation in scenarios:
        print(banner)
        response = self._make_request("tools/call", {
            "name": "estimate_audience_size",
            "arguments": arguments
        })
        results[key] = self._parse_error_response(response, expectation)
    return results
def _parse_error_response(self, result: Dict[str, Any], description: str) -> Dict[str, Any]:
    """Score a response that is expected to be an error.

    Args:
        result: Raw outcome of _make_request.
        description: Human-readable expectation, used only for log output.

    Returns:
        Dict with "success": True when an error was correctly reported
        (either a failed request or an error payload), False otherwise.
    """
    # A transport-level rejection already satisfies the expectation.
    if not result["success"]:
        print(f" ✅ {description}: Request failed as expected")
        return {"success": True, "error_type": "request_failure"}
    body_text = result["json"]["result"].get("content", [{}])[0].get("text", "")
    try:
        payload = json.loads(body_text)
    except json.JSONDecodeError:
        print(f" ❌ {description}: Invalid JSON response")
        return {"success": False, "error": "Invalid JSON"}
    # Use robust error checking helper method on the parsed payload.
    error_info = self._check_for_errors(payload)
    if not error_info["has_error"]:
        print(f" ❌ {description}: No error returned when expected")
        return {"success": False, "unexpected_success": True}
    print(f" ✅ {description}: {error_info['error_message']}")
    return {
        "success": True,
        "error_message": error_info["error_message"],
        "error_format": error_info["format"]
    }
def run_audience_estimation_tests(self) -> bool:
    """Run every audience-estimation phase and print a pass/fail summary.

    Phases: PL-only reachestimate bounds, comprehensive estimation,
    backwards compatibility, optimization goals, and error handling.

    Returns:
        True when at least 3 of the 5 phases pass; the threshold tolerates
        phases that depend on live Meta API behavior.
    """
    print("🚀 Meta Ads Audience Estimation End-to-End Test Suite")
    print("="*70)
    # Check server availability
    try:
        response = requests.get(f"{self.base_url}/", timeout=5)
        # A 404 still proves the server is up; the root path may simply
        # not be routed.
        server_running = response.status_code in [200, 404]
    except Exception:
        # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
        # abort the run instead of being reported as "server down".
        server_running = False
    if not server_running:
        print("❌ Server is not running at", self.base_url)
        print(" Please start the server with:")
        print(" python3 -m meta_ads_mcp --transport streamable-http --port 8080")
        return False
    print("✅ Server is running")
    print("🔐 Using implicit authentication from server")
    print(f"🏢 Using account ID: {self.account_id}")
    # Test 0: PL-only reachestimate bounds verification
    print("\n" + "="*70)
    print("📋 PHASE 0: PL-only reachestimate bounds verification (fallback disabled)")
    print("="*70)
    pl_only_results = self.test_pl_only_reachestimate_bounds()
    pl_only_success = pl_only_results.get("success", False)
    # Test 1: Comprehensive Audience Estimation
    print("\n" + "="*70)
    print("📋 PHASE 1: Testing Comprehensive Audience Estimation")
    print("="*70)
    comprehensive_results = self.test_comprehensive_audience_estimation()
    # The phase passes if any targeting spec produced a non-zero estimate.
    comprehensive_success = any(
        result.get("success") and result.get("estimated_size", 0) > 0
        for result in comprehensive_results.values()
    )
    # Test 2: Backwards Compatibility
    print("\n" + "="*70)
    print("📋 PHASE 2: Testing Backwards Compatibility")
    print("="*70)
    compat_results = self.test_backwards_compatibility_interest_validation()
    # Both legacy call forms (names and FBIDs) must work.
    compat_success = (
        compat_results.get("interest_names", {}).get("success", False) and
        compat_results.get("interest_fbids", {}).get("success", False)
    )
    # Test 3: Different Optimization Goals
    print("\n" + "="*70)
    print("📋 PHASE 3: Testing Different Optimization Goals")
    print("="*70)
    goals_results = self.test_different_optimization_goals()
    goals_success = any(
        result.get("success") and result.get("estimated_size", 0) > 0
        for result in goals_results.values()
    )
    # Test 4: Error Handling
    print("\n" + "="*70)
    print("📋 PHASE 4: Testing Error Handling")
    print("="*70)
    error_results = self.test_error_handling()
    # Every invalid-input scenario must have produced the expected error.
    error_success = all(
        result.get("success", False) for result in error_results.values()
    )
    # Final assessment
    print("\n" + "="*70)
    print("📊 FINAL RESULTS")
    print("="*70)
    all_tests = [
        ("PL-only Reachestimate Bounds", pl_only_success),
        ("Comprehensive Estimation", comprehensive_success),
        ("Backwards Compatibility", compat_success),
        ("Optimization Goals", goals_success),
        ("Error Handling", error_success)
    ]
    passed_tests = sum(1 for _, success in all_tests if success)
    total_tests = len(all_tests)
    for test_name, success in all_tests:
        status = "✅ PASSED" if success else "❌ FAILED"
        print(f" • {test_name}: {status}")
    overall_success = passed_tests >= 3  # At least 3 of the 5 phases must pass
    if overall_success:
        print(f"\n✅ Audience estimation tests: SUCCESS ({passed_tests}/{total_tests} passed)")
        print(" • Comprehensive audience estimation is working")
        print(" • Backwards compatibility is maintained")
        print(" • Meta reachestimate API integration is functional")
        return True
    else:
        print(f"\n❌ Audience estimation tests: FAILED ({passed_tests}/{total_tests} passed)")
        print(" • Some audience estimation features are not working properly")
        return False
def main():
    """Entry point: run the audience-estimation suite and exit non-zero on failure."""
    success = AudienceEstimationTester().run_audience_estimation_tests()
    if success:
        print("\n🎉 All audience estimation tests passed!")
    else:
        print("\n⚠️ Some audience estimation tests failed - see details above")
    sys.exit(0 if success else 1)


if __name__ == "__main__":
    main()
```
--------------------------------------------------------------------------------
/tests/test_dsa_beneficiary.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Unit tests for DSA (Digital Services Act) beneficiary functionality in Meta Ads MCP.
This module tests the implementation of DSA beneficiary field support for ad set creation,
including detection of DSA requirements, parameter validation, and error handling.
"""
import pytest
import json
from unittest.mock import AsyncMock, patch, MagicMock
from meta_ads_mcp.core.adsets import create_adset, get_adset_details
from meta_ads_mcp.core.accounts import get_account_info
class TestDSABeneficiaryDetection:
    """Test cases for detecting DSA beneficiary requirements."""

    @staticmethod
    def _as_dict(result):
        """Normalize a get_account_info result to a dict.

        The tool historically returned a JSON string and now may return a
        dict directly; accepting both keeps these assertions format-agnostic
        and removes the isinstance boilerplate repeated in every test.
        """
        return result if isinstance(result, dict) else json.loads(result)

    @pytest.mark.asyncio
    async def test_dsa_requirement_detection_business_account(self):
        """Test DSA requirement detection for European business accounts."""
        mock_account_response = {
            "id": "act_701351919139047",
            "name": "Test European Business Account",
            "account_status": 1,
            "business_country_code": "DE",  # Germany - DSA compliant
            "business_city": "Berlin",
            "currency": "EUR"
        }
        with patch('meta_ads_mcp.core.accounts.make_api_request', new_callable=AsyncMock) as mock_api:
            with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
                mock_auth.return_value = "test_access_token"
                mock_api.return_value = mock_account_response
                result = await get_account_info(account_id="act_701351919139047")
                result_data = self._as_dict(result)
                # Verify account info is retrieved
                assert result_data["id"] == "act_701351919139047"
                assert result_data["business_country_code"] == "DE"
                # Verify DSA requirement detection
                assert "dsa_required" in result_data or "business_country_code" in result_data

    @pytest.mark.asyncio
    async def test_dsa_requirement_detection_non_dsa_region(self):
        """Test detection for non-DSA compliant regions."""
        mock_account_response = {
            "id": "act_123456789",
            "name": "Test US Account",
            "account_status": 1,
            "business_country_code": "US",  # US - not DSA compliant
            "business_city": "New York",
            "currency": "USD"
        }
        with patch('meta_ads_mcp.core.accounts.make_api_request', new_callable=AsyncMock) as mock_api:
            with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
                mock_auth.return_value = "test_access_token"
                mock_api.return_value = mock_account_response
                result = await get_account_info(account_id="act_123456789")
                result_data = self._as_dict(result)
                # Verify no DSA requirement for US accounts
                assert result_data["business_country_code"] == "US"

    @pytest.mark.asyncio
    async def test_dsa_requirement_detection_error_handling(self):
        """Test error handling when account info cannot be retrieved."""
        with patch('meta_ads_mcp.core.accounts.make_api_request', new_callable=AsyncMock) as mock_api:
            with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
                mock_auth.return_value = "test_access_token"
                mock_api.side_effect = Exception("API Error")
                result = await get_account_info(account_id="act_invalid")
                result_data = self._as_dict(result)
                # Verify error is properly handled
                assert "error" in result_data

    @pytest.mark.asyncio
    async def test_account_info_requires_account_id(self):
        """Test that get_account_info requires an account_id parameter."""
        with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
            mock_auth.return_value = "test_access_token"
            # Test without account_id parameter
            result = await get_account_info(account_id=None)
            result_data = self._as_dict(result)
            # Verify error message for missing account_id
            assert "error" in result_data
            assert "Account ID is required" in result_data["error"]["message"]
            assert "Please specify an account_id parameter" in result_data["error"]["details"]
            assert "example" in result_data["error"]

    @pytest.mark.asyncio
    async def test_account_info_inaccessible_account_error(self):
        """Test that get_account_info provides helpful error for inaccessible accounts."""
        # Mock permission error for direct account access (first API call)
        mock_permission_error = {
            "error": {
                "message": "Insufficient access privileges",
                "type": "OAuthException",
                "code": 200
            }
        }
        # Mock accessible accounts response (second API call)
        mock_accessible_accounts = {
            "data": [
                {"id": "act_123", "name": "Test Account 1"},
                {"id": "act_456", "name": "Test Account 2"}
            ]
        }
        with patch('meta_ads_mcp.core.accounts.make_api_request', new_callable=AsyncMock) as mock_api:
            with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
                mock_auth.return_value = "test_access_token"
                # First call returns permission error, second call returns accessible accounts
                mock_api.side_effect = [mock_permission_error, mock_accessible_accounts]
                result = await get_account_info(account_id="act_inaccessible")
                result_data = self._as_dict(result)
                # Verify helpful error message for inaccessible account
                assert "error" in result_data
                assert "not accessible to your user account" in result_data["error"]["message"]
                assert "accessible_accounts" in result_data["error"]
                assert "suggestion" in result_data["error"]
                assert len(result_data["error"]["accessible_accounts"]) == 2
class TestDSABeneficiaryParameter:
    """Test cases for DSA beneficiary parameter support."""

    @staticmethod
    def _unwrap(result):
        """Parse a create_adset JSON result, unwrapping the envelope.

        The meta_api_tool decorator may wrap the payload in a 'data' field;
        this helper replaces the unwrap boilerplate repeated in each test.
        """
        result_data = json.loads(result)
        if "data" in result_data:
            return json.loads(result_data["data"])
        return result_data

    @pytest.mark.asyncio
    async def test_create_adset_with_dsa_beneficiary_success(self):
        """Test successful ad set creation with DSA beneficiary parameter."""
        mock_response = {
            "id": "23842588888640185",
            "name": "Test Ad Set with DSA",
            "status": "PAUSED",
            "dsa_beneficiary": "Test Organization GmbH"
        }
        with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api:
            with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
                mock_auth.return_value = "test_access_token"
                mock_api.return_value = mock_response
                result = await create_adset(
                    account_id="act_701351919139047",
                    campaign_id="23842588888640184",
                    name="Test Ad Set with DSA",
                    optimization_goal="LINK_CLICKS",
                    billing_event="IMPRESSIONS",
                    dsa_beneficiary="Test Organization GmbH"
                )
                # Verify the API was called with DSA beneficiary parameter
                mock_api.assert_called_once()
                call_args = mock_api.call_args
                assert "dsa_beneficiary" in str(call_args)
                # Verify response contains ad set ID
                result_data = json.loads(result)
                assert "id" in result_data

    @pytest.mark.asyncio
    async def test_create_adset_with_dsa_beneficiary_validation_error(self):
        """Test error handling when DSA beneficiary parameter is invalid."""
        # NOTE: the previous version built an unused mock error-response dict
        # here; the error is injected via side_effect below, so it was removed.
        with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api:
            with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
                mock_auth.return_value = "test_access_token"
                mock_api.side_effect = Exception("DSA beneficiary required for European compliance")
                result = await create_adset(
                    account_id="act_701351919139047",
                    campaign_id="23842588888640184",
                    name="Test Ad Set",
                    optimization_goal="LINK_CLICKS",
                    billing_event="IMPRESSIONS"
                    # No DSA beneficiary provided
                )
                # Verify error message is clear and actionable
                actual_data = self._unwrap(result)
                assert "DSA beneficiary required" in actual_data.get("error", "")

    @pytest.mark.asyncio
    async def test_create_adset_without_dsa_beneficiary_dsa_required(self):
        """Test error when DSA beneficiary is required but not provided."""
        with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api:
            with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
                mock_auth.return_value = "test_access_token"
                mock_api.side_effect = Exception("Enter the person or organization that benefits from ads in this ad set")
                result = await create_adset(
                    account_id="act_701351919139047",
                    campaign_id="23842588888640184",
                    name="Test Ad Set",
                    optimization_goal="LINK_CLICKS",
                    billing_event="IMPRESSIONS"
                    # No DSA beneficiary provided
                )
                # Verify error message is clear and actionable
                actual_data = self._unwrap(result)
                assert "benefits from ads" in actual_data.get("error", "")

    @pytest.mark.asyncio
    async def test_create_adset_dsa_beneficiary_in_targeting(self):
        """Test that DSA beneficiary is not added to targeting spec."""
        with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api:
            with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
                mock_auth.return_value = "test_access_token"
                mock_api.return_value = {"id": "23842588888640185"}
                result = await create_adset(
                    account_id="act_701351919139047",
                    campaign_id="23842588888640184",
                    name="Test Ad Set",
                    optimization_goal="LINK_CLICKS",
                    billing_event="IMPRESSIONS",
                    targeting={"geo_locations": {"countries": ["DE"]}},
                    dsa_beneficiary="Test Organization GmbH"
                )
                # Verify the API was called
                mock_api.assert_called_once()
                call_args = mock_api.call_args
                # Verify DSA beneficiary is sent as separate parameter, not in targeting
                call_str = str(call_args)
                assert "dsa_beneficiary" in call_str
                assert "Test Organization GmbH" in call_str

    @pytest.mark.asyncio
    async def test_create_adset_dsa_beneficiary_parameter_formats(self):
        """Test different formats for DSA beneficiary parameter."""
        test_cases = [
            "Simple Organization",
            "Organization with Special Chars: GmbH & Co. KG",
            "Organization with Numbers: Test123 Inc.",
            "Very Long Organization Name That Exceeds Normal Limits But Should Still Work"
        ]
        for beneficiary_name in test_cases:
            with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api:
                with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
                    mock_auth.return_value = "test_access_token"
                    mock_api.return_value = {"id": "23842588888640185"}
                    result = await create_adset(
                        account_id="act_701351919139047",
                        campaign_id="23842588888640184",
                        name="Test Ad Set",
                        optimization_goal="LINK_CLICKS",
                        billing_event="IMPRESSIONS",
                        dsa_beneficiary=beneficiary_name
                    )
                    # Verify the API was called with the beneficiary name
                    mock_api.assert_called_once()
                    call_args = mock_api.call_args
                    assert beneficiary_name in str(call_args)
class TestDSAPermissionHandling:
    """Test cases for permission-related DSA beneficiary issues."""

    @staticmethod
    def _unwrap(result):
        """Parse a create_adset JSON result, unwrapping the meta_api_tool
        'data' envelope when present (dedupes the boilerplate in each test)."""
        result_data = json.loads(result)
        if "data" in result_data:
            return json.loads(result_data["data"])
        return result_data

    @pytest.mark.asyncio
    async def test_dsa_beneficiary_missing_business_management_permission(self):
        """Test error handling when business_management permissions are missing."""
        with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api:
            with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
                mock_auth.return_value = "test_access_token"
                mock_api.side_effect = Exception("Permission denied: business_management permission required")
                result = await create_adset(
                    account_id="act_701351919139047",
                    campaign_id="23842588888640184",
                    name="Test Ad Set",
                    optimization_goal="LINK_CLICKS",
                    billing_event="IMPRESSIONS",
                    dsa_beneficiary="Test Organization GmbH"
                )
                # Verify permission error is handled
                actual_data = self._unwrap(result)
                assert "permission" in actual_data.get("error", "").lower()

    @pytest.mark.asyncio
    async def test_dsa_beneficiary_api_limitation_handling(self):
        """Test handling when API doesn't support dsa_beneficiary parameter."""
        with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api:
            with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
                mock_auth.return_value = "test_access_token"
                mock_api.side_effect = Exception("Parameter dsa_beneficiary is not supported")
                result = await create_adset(
                    account_id="act_701351919139047",
                    campaign_id="23842588888640184",
                    name="Test Ad Set",
                    optimization_goal="LINK_CLICKS",
                    billing_event="IMPRESSIONS",
                    dsa_beneficiary="Test Organization GmbH"
                )
                # Verify API limitation error is handled
                actual_data = self._unwrap(result)
                assert "not supported" in actual_data.get("error", "").lower()
class TestDSARegionalCompliance:
    """Test cases for regional DSA compliance."""

    @staticmethod
    def _as_dict(result):
        """Normalize a get_account_info result to a dict.

        FIX: these tests previously called json.loads(result) directly, which
        raises TypeError when get_account_info returns a dict (the "new return
        format" the sibling test classes already handle). Accept both formats.
        """
        return result if isinstance(result, dict) else json.loads(result)

    @pytest.mark.asyncio
    async def test_dsa_compliance_european_regions(self):
        """Test DSA compliance for European regions."""
        european_countries = ["DE", "FR", "IT", "ES", "NL", "BE", "AT", "IE", "DK", "SE", "FI", "NO"]
        for country_code in european_countries:
            mock_account_response = {
                "id": f"act_{country_code.lower()}",
                "name": f"Test {country_code} Account",
                "account_status": 1,
                "business_country_code": country_code,
                "currency": "EUR"
            }
            with patch('meta_ads_mcp.core.accounts.make_api_request', new_callable=AsyncMock) as mock_api:
                with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
                    mock_auth.return_value = "test_access_token"
                    mock_api.return_value = mock_account_response
                    result = await get_account_info(account_id=f"act_{country_code.lower()}")
                    result_data = self._as_dict(result)
                    # Verify European countries are detected as DSA compliant
                    assert result_data["business_country_code"] == country_code

    @pytest.mark.asyncio
    async def test_dsa_compliance_non_european_regions(self):
        """Test DSA compliance for non-European regions."""
        non_european_countries = ["US", "CA", "AU", "JP", "BR", "IN"]
        for country_code in non_european_countries:
            mock_account_response = {
                "id": f"act_{country_code.lower()}",
                "name": f"Test {country_code} Account",
                "account_status": 1,
                "business_country_code": country_code,
                "currency": "USD"
            }
            with patch('meta_ads_mcp.core.accounts.make_api_request', new_callable=AsyncMock) as mock_api:
                with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
                    mock_auth.return_value = "test_access_token"
                    mock_api.return_value = mock_account_response
                    result = await get_account_info(account_id=f"act_{country_code.lower()}")
                    result_data = self._as_dict(result)
                    # Verify non-European countries are not DSA compliant
                    assert result_data["business_country_code"] == country_code

    @pytest.mark.asyncio
    async def test_dsa_beneficiary_validation_by_region(self):
        """Test DSA beneficiary validation based on region."""
        # Test European account (should require DSA beneficiary)
        european_mock_response = {
            "id": "act_de",
            "name": "German Account",
            "business_country_code": "DE"
        }
        # Test US account (should not require DSA beneficiary)
        us_mock_response = {
            "id": "act_us",
            "name": "US Account",
            "business_country_code": "US"
        }
        # Test European account
        with patch('meta_ads_mcp.core.accounts.make_api_request', new_callable=AsyncMock) as mock_api:
            with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
                mock_auth.return_value = "test_access_token"
                mock_api.return_value = european_mock_response
                result = await get_account_info(account_id="act_de")
                result_data = self._as_dict(result)
                assert result_data["business_country_code"] == "DE"
        # Test US account
        with patch('meta_ads_mcp.core.accounts.make_api_request', new_callable=AsyncMock) as mock_api:
            with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
                mock_auth.return_value = "test_access_token"
                mock_api.return_value = us_mock_response
                result = await get_account_info(account_id="act_us")
                result_data = self._as_dict(result)
                assert result_data["business_country_code"] == "US"
class TestDSAErrorHandling:
    """Test cases for comprehensive DSA error handling."""

    @staticmethod
    def _unwrap(result):
        """Parse a create_adset JSON result, unwrapping the meta_api_tool
        'data' envelope when present (dedupes the boilerplate in each test)."""
        result_data = json.loads(result)
        if "data" in result_data:
            return json.loads(result_data["data"])
        return result_data

    @pytest.mark.asyncio
    async def test_dsa_beneficiary_clear_error_message(self):
        """Test that DSA-related errors provide clear, actionable messages."""
        error_scenarios = [
            ("DSA beneficiary required for European compliance", "DSA beneficiary"),
            ("Enter the person or organization that benefits from ads", "benefits from ads"),
            ("Permission denied: business_management required", "permission"),
            ("Parameter dsa_beneficiary is not supported", "not supported")
        ]
        for error_message, expected_keyword in error_scenarios:
            with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api:
                with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
                    mock_auth.return_value = "test_access_token"
                    mock_api.side_effect = Exception(error_message)
                    result = await create_adset(
                        account_id="act_701351919139047",
                        campaign_id="23842588888640184",
                        name="Test Ad Set",
                        optimization_goal="LINK_CLICKS",
                        billing_event="IMPRESSIONS"
                    )
                    # Verify error message contains expected keyword
                    actual_data = self._unwrap(result)
                    assert expected_keyword.lower() in actual_data.get("error", "").lower()

    @pytest.mark.asyncio
    async def test_dsa_beneficiary_fallback_behavior(self):
        """Test fallback behavior for unexpected DSA-related errors."""
        with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api:
            with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
                mock_auth.return_value = "test_access_token"
                mock_api.side_effect = Exception("Unexpected DSA-related error")
                result = await create_adset(
                    account_id="act_701351919139047",
                    campaign_id="23842588888640184",
                    name="Test Ad Set",
                    optimization_goal="LINK_CLICKS",
                    billing_event="IMPRESSIONS"
                )
                # Verify fallback error handling
                actual_data = self._unwrap(result)
                assert "error" in actual_data
class TestDSABeneficiaryRetrieval:
"""Test cases for retrieving DSA beneficiary information from ad sets"""
@pytest.mark.asyncio
async def test_get_adset_details_with_dsa_beneficiary(self):
    """Ad set details that carry a dsa_beneficiary field are returned intact."""
    api_payload = {
        "id": "120229746629010183",
        "name": "Test Ad Set with DSA",
        "campaign_id": "120229656904980183",
        "status": "PAUSED",
        "daily_budget": "1000",
        "targeting": {
            "geo_locations": {"countries": ["US"]},
            "age_min": 25,
            "age_max": 65
        },
        "bid_amount": 200,
        "optimization_goal": "LINK_CLICKS",
        "billing_event": "IMPRESSIONS",
        "dsa_beneficiary": "Test Organization Inc"
    }
    with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api, \
         patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
        mock_auth.return_value = "test_access_token"
        mock_api.return_value = api_payload
        details = json.loads(await get_adset_details(adset_id="120229746629010183"))
        # The beneficiary field must pass through unchanged.
        assert "dsa_beneficiary" in details
        assert details["dsa_beneficiary"] == "Test Organization Inc"
        assert details["id"] == "120229746629010183"
@pytest.mark.asyncio
async def test_get_adset_details_without_dsa_beneficiary(self):
    """Ad sets lacking a dsa_beneficiary field are returned without one."""
    api_payload = {
        "id": "120229746624860183",
        "name": "Test Ad Set without DSA",
        "campaign_id": "120229656904980183",
        "status": "PAUSED",
        "daily_budget": "1000",
        "targeting": {
            "geo_locations": {"countries": ["US"]},
            "age_min": 25,
            "age_max": 65
        },
        "bid_amount": 200,
        "optimization_goal": "LINK_CLICKS",
        "billing_event": "IMPRESSIONS"
        # No dsa_beneficiary field
    }
    with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api, \
         patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
        mock_auth.return_value = "test_access_token"
        mock_api.return_value = api_payload
        details = json.loads(await get_adset_details(adset_id="120229746624860183"))
        assert details["id"] == "120229746624860183"
        # No synthetic beneficiary must be injected by the tool.
        assert "dsa_beneficiary" not in details
@pytest.mark.asyncio
async def test_get_adset_details_empty_dsa_beneficiary(self):
"""Test retrieving ad set details with empty DSA beneficiary field"""
mock_adset_response = {
"id": "120229746629010183",
"name": "Test Ad Set with Empty DSA",
"campaign_id": "120229656904980183",
"status": "PAUSED",
"daily_budget": "1000",
"targeting": {
"geo_locations": {"countries": ["US"]}
},
"bid_amount": 200,
"optimization_goal": "LINK_CLICKS",
"billing_event": "IMPRESSIONS",
"dsa_beneficiary": "" # Empty string
}
with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api:
with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
mock_auth.return_value = "test_access_token"
mock_api.return_value = mock_adset_response
result = await get_adset_details(adset_id="120229746629010183")
result_data = json.loads(result)
# Verify empty DSA beneficiary field is handled correctly
assert "dsa_beneficiary" in result_data
assert result_data["dsa_beneficiary"] == ""
@pytest.mark.asyncio
async def test_get_adset_details_dsa_beneficiary_field_requested(self):
"""Test that the API request includes dsa_beneficiary in the fields parameter"""
with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api:
with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
mock_auth.return_value = "test_access_token"
mock_api.return_value = {"id": "120229746629010183"}
result = await get_adset_details(adset_id="120229746629010183")
# Verify the API was called with dsa_beneficiary in fields
mock_api.assert_called_once()
call_args = mock_api.call_args
assert "dsa_beneficiary" in str(call_args)
@pytest.mark.asyncio
async def test_get_adset_details_error_handling(self):
"""Test error handling when retrieving ad set details fails"""
with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api:
with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
mock_auth.return_value = "test_access_token"
mock_api.side_effect = Exception("Ad set not found")
result = await get_adset_details(adset_id="invalid_adset_id")
# Handle response format - could be dict or JSON string
if isinstance(result, dict):
result_data = result
else:
result_data = json.loads(result)
# Verify error is properly handled
assert "error" in result_data
```
--------------------------------------------------------------------------------
/tests/test_budget_update_e2e.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
End-to-End Budget Update Test for Meta Ads MCP
This test validates that the budget update functionality correctly updates
ad set budgets through the Meta Ads API through a pre-authenticated MCP server.
Test functions:
- update_adset (with daily_budget parameter)
- update_adset (with lifetime_budget parameter)
- update_adset (with both budget types)
- Error handling for invalid budgets
- Budget update with other parameters
"""
import requests
import json
import os
import sys
import time
from typing import Dict, Any, List
# Load environment variables from .env file
# Load settings from a local .env file when python-dotenv is available;
# otherwise fall back to the shell environment.
try:
    from dotenv import load_dotenv
except ImportError:
    print("⚠️ python-dotenv not installed, using system environment variables only")
else:
    load_dotenv()
    print("✅ Loaded environment variables from .env file")
class BudgetUpdateTester:
"""Test suite focused on budget update functionality"""
def __init__(self, base_url: str = "http://localhost:8080"):
self.base_url = base_url.rstrip('/')
self.endpoint = f"{self.base_url}/mcp/"
self.request_id = 1
# Test data for validation
self.test_budgets = {
"daily_budgets": ["5000", "10000", "25000"], # $50, $100, $250
"lifetime_budgets": ["50000", "100000", "250000"], # $500, $1000, $2500
"invalid_budgets": ["-1000", "0", "invalid_budget", "999999999999"]
}
# Test ad set IDs specifically created for budget testing
self.test_adset_ids = [
"120229734413930183",
"120229734413930183",
"120229734413930183",
"120229734413930183",
"120229734413930183",
"120229734413930183"
]
# Rate limiting tracking
self.rate_limit_hit = False
self.last_rate_limit_time = 0
def _wait_for_rate_limit(self, error_msg: str) -> bool:
"""Wait if we hit rate limiting, return True if we should retry"""
if "rate limit" in error_msg.lower() or "too many changes" in error_msg.lower():
if not self.rate_limit_hit:
print(f" ⏳ Rate limit hit! Waiting 1 hour before continuing...")
print(f" • Meta Ads API allows only 4 budget changes per hour")
print(f" • You can manually continue by pressing Enter when ready")
self.rate_limit_hit = True
self.last_rate_limit_time = time.time()
# Wait for user input or 1 hour
try:
input(" Press Enter when ready to continue (or wait 1 hour)...")
print(" ✅ Continuing with tests...")
return True
except KeyboardInterrupt:
print(" ❌ Test interrupted by user")
return False
else:
print(f" ⏳ Still rate limited, waiting...")
return False
return False
def _make_request(self, method: str, params: Dict[str, Any] = None,
                  headers: Dict[str, str] = None) -> Dict[str, Any]:
    """POST a JSON-RPC request to the MCP endpoint and summarize the response."""
    request_headers = {
        "Content-Type": "application/json",
        "Accept": "application/json, text/event-stream",
        "User-Agent": "Budget-Update-Test-Client/1.0",
    }
    if headers:
        request_headers.update(headers)
    payload = {"jsonrpc": "2.0", "method": method, "id": self.request_id}
    if params:
        payload["params"] = params
    try:
        response = requests.post(
            self.endpoint,
            headers=request_headers,
            json=payload,
            timeout=15,
        )
        self.request_id += 1
        ok = response.status_code == 200
        return {
            "status_code": response.status_code,
            "headers": dict(response.headers),
            "json": response.json() if ok else None,
            "text": response.text,
            "success": ok,
        }
    except requests.exceptions.RequestException as e:
        # Network-level failure: report it in the same summary shape.
        return {
            "status_code": 0,
            "headers": {},
            "json": None,
            "text": str(e),
            "success": False,
            "error": str(e),
        }
def test_daily_budget_update(self) -> Dict[str, Any]:
    """Exercise update_adset with each configured daily budget amount."""
    print(f"\n💰 Testing daily budget update function")
    outcomes = {}
    for amount in self.test_budgets["daily_budgets"]:
        print(f" 💰 Updating daily budget to: ${int(amount)/100:.2f}")
        # Retry loop: up to three attempts when rate limited.
        max_retries = 3
        for attempt in range(max_retries):
            response = self._make_request("tools/call", {
                "name": "update_adset",
                "arguments": {
                    "adset_id": self.test_adset_ids[0],
                    "daily_budget": amount,
                },
            })
            if not response["success"]:
                outcomes[amount] = {
                    "success": False,
                    "error": response.get("text", "Unknown error"),
                }
                print(f" ❌ Failed: {response.get('text', 'Unknown error')}")
                break
            # The tool result text carries a JSON payload.
            content = response["json"]["result"].get("content", [{}])[0].get("text", "")
            try:
                payload = json.loads(content)
            except json.JSONDecodeError:
                outcomes[amount] = {
                    "success": False,
                    "error": "Invalid JSON response",
                    "raw_content": content,
                }
                print(f" ❌ Invalid JSON: {content}")
                break
            if "error" in payload:
                error_msg = payload.get("error", "")
                lowered = error_msg.lower()
                if "rate limit" in lowered or "too many changes" in lowered:
                    if attempt < max_retries - 1:
                        if self._wait_for_rate_limit(error_msg):
                            print(f" 🔄 Retrying after rate limit...")
                            continue
                        break
                    # Final attempt: rate limiting counts as expected behavior.
                    outcomes[amount] = {
                        "success": True,  # Rate limiting is expected behavior
                        "has_success": False,
                        "has_error": True,
                        "rate_limited": True,
                        "error_message": error_msg,
                    }
                    print(f" ⚠️ Rate limited (expected): {error_msg}")
                else:
                    outcomes[amount] = {
                        "success": False,
                        "has_error": True,
                        "error_message": error_msg,
                    }
                    print(f" ❌ API Error: {error_msg}")
                break
            outcomes[amount] = {
                "success": True,
                "has_id": "id" in payload,
                "has_daily_budget": "daily_budget" in payload,
                "has_success": "success" in payload,
                "updated_budget": payload.get("daily_budget", "N/A"),
                "adset_id": payload.get("id", "N/A"),
            }
            print(f" ✅ Updated daily budget to ${int(amount)/100:.2f}")
            print(f" • Ad Set ID: {payload.get('id', 'N/A')}")
            print(f" • Success: {payload.get('success', 'N/A')}")
            print(f" • Raw Response: {payload}")
            # Note: Meta Ads API returns {"success": true} for updates;
            # verify actual values by fetching ad set details.
            break  # Success, exit retry loop
    return outcomes
def test_lifetime_budget_update(self) -> Dict[str, Any]:
    """Exercise update_adset with each configured lifetime budget amount."""
    print(f"\n💰 Testing lifetime budget update function")
    print(f" ⚠️ Note: Meta Ads API may reject lifetime budget updates if ad set has daily budget")
    outcomes = {}
    for amount in self.test_budgets["lifetime_budgets"]:
        print(f" 💰 Updating lifetime budget to: ${int(amount)/100:.2f}")
        # Retry loop: up to three attempts when rate limited.
        max_retries = 3
        for attempt in range(max_retries):
            response = self._make_request("tools/call", {
                "name": "update_adset",
                "arguments": {
                    "adset_id": self.test_adset_ids[1],
                    "lifetime_budget": amount,
                },
            })
            if not response["success"]:
                outcomes[amount] = {
                    "success": False,
                    "error": response.get("text", "Unknown error"),
                }
                print(f" ❌ Failed: {response.get('text', 'Unknown error')}")
                break
            content = response["json"]["result"].get("content", [{}])[0].get("text", "")
            try:
                payload = json.loads(content)
            except json.JSONDecodeError:
                outcomes[amount] = {
                    "success": False,
                    "error": "Invalid JSON response",
                    "raw_content": content,
                }
                print(f" ❌ Invalid JSON: {content}")
                break
            if "error" in payload:
                error_msg = payload.get("error", "")
                lowered = error_msg.lower()
                if "rate limit" in lowered or "too many changes" in lowered:
                    if attempt < max_retries - 1:
                        if self._wait_for_rate_limit(error_msg):
                            print(f" 🔄 Retrying after rate limit...")
                            continue
                        break
                    outcomes[amount] = {
                        "success": True,  # Rate limiting is expected behavior
                        "has_success": False,
                        "has_error": True,
                        "rate_limited": True,
                        "error_message": error_msg,
                    }
                    print(f" ⚠️ Rate limited (expected): {error_msg}")
                elif "should be recurring budget" in lowered or "cannot switch" in lowered:
                    # Known API restriction: can't flip daily -> lifetime budget.
                    outcomes[amount] = {
                        "success": False,
                        "has_error": True,
                        "api_limitation": "Cannot switch from daily to lifetime budget",
                        "error_message": error_msg,
                    }
                    print(f" ⚠️ API Limitation: {error_msg}")
                else:
                    outcomes[amount] = {
                        "success": False,
                        "has_error": True,
                        "error_message": error_msg,
                    }
                    print(f" ❌ API Error: {error_msg}")
                break
            outcomes[amount] = {
                "success": True,
                "has_id": "id" in payload,
                "has_lifetime_budget": "lifetime_budget" in payload,
                "has_success": "success" in payload,
                "updated_budget": payload.get("lifetime_budget", "N/A"),
                "adset_id": payload.get("id", "N/A"),
            }
            print(f" ✅ Updated lifetime budget to ${int(amount)/100:.2f}")
            print(f" • Ad Set ID: {payload.get('id', 'N/A')}")
            print(f" • Success: {payload.get('success', 'N/A')}")
            # Note: Meta Ads API returns {"success": true} for updates;
            # verify actual values by fetching ad set details.
            break  # Success, exit retry loop
    return outcomes
def test_both_budget_types_update(self) -> Dict[str, Any]:
    """Try setting daily and lifetime budgets in one update call."""
    print(f"\n💰 Testing both budget types update function")
    print(f" ⚠️ Note: Meta Ads API may reject this if ad set has existing daily budget")
    daily_budget = "15000"  # $150
    lifetime_budget = "150000"  # $1500
    print(f" 💰 Updating both budgets - Daily: ${int(daily_budget)/100:.2f}, Lifetime: ${int(lifetime_budget)/100:.2f}")
    response = self._make_request("tools/call", {
        "name": "update_adset",
        "arguments": {
            "adset_id": self.test_adset_ids[2],
            "daily_budget": daily_budget,
            "lifetime_budget": lifetime_budget,
        },
    })
    if not response["success"]:
        return {
            "success": False,
            "error": response.get("text", "Unknown error"),
        }
    content = response["json"]["result"].get("content", [{}])[0].get("text", "")
    try:
        payload = json.loads(content)
    except json.JSONDecodeError:
        return {
            "success": False,
            "error": "Invalid JSON response",
            "raw_content": content,
        }
    if "error" in payload:
        error_msg = payload.get("error", "")
        lowered = error_msg.lower()
        if "rate limit" in lowered or "too many changes" in lowered:
            return {
                "success": True,  # Rate limiting is expected behavior
                "rate_limited": True,
                "error_message": error_msg,
            }
        return {
            "success": False,
            "error": error_msg,
            "api_limitation": "Cannot have both daily and lifetime budgets",
        }
    summary = {
        "success": True,
        "has_id": "id" in payload,
        "has_daily_budget": "daily_budget" in payload,
        "has_lifetime_budget": "lifetime_budget" in payload,
        "has_success": "success" in payload,
        "daily_budget": payload.get("daily_budget", "N/A"),
        "lifetime_budget": payload.get("lifetime_budget", "N/A"),
        "adset_id": payload.get("id", "N/A"),
    }
    print(f" ✅ Updated both budgets successfully")
    print(f" • Ad Set ID: {payload.get('id', 'N/A')}")
    print(f" • Success: {payload.get('success', 'N/A')}")
    # Note: Meta Ads API returns {"success": true} for updates;
    # verify actual values by fetching ad set details.
    return summary
def test_budget_update_with_other_parameters(self) -> Dict[str, Any]:
    """Update the budget together with status, bid amount and bid strategy."""
    print(f"\n💰 Testing budget update with other parameters")
    response = self._make_request("tools/call", {
        "name": "update_adset",
        "arguments": {
            "adset_id": self.test_adset_ids[3],
            "daily_budget": "7500",  # $75
            "status": "PAUSED",
            "bid_amount": 1000,
            "bid_strategy": "LOWEST_COST_WITH_BID_CAP",
        },
    })
    if not response["success"]:
        return {
            "success": False,
            "error": response.get("text", "Unknown error"),
        }
    content = response["json"]["result"].get("content", [{}])[0].get("text", "")
    try:
        payload = json.loads(content)
    except json.JSONDecodeError:
        return {
            "success": False,
            "error": "Invalid JSON response",
            "raw_content": content,
        }
    if "error" in payload:
        error_msg = payload.get("error", "")
        if "rate limit" in error_msg.lower() or "too many changes" in error_msg.lower():
            return {
                "success": True,  # Rate limiting is expected behavior
                "rate_limited": True,
                "error_message": error_msg,
            }
        return {
            "success": False,
            "error": error_msg,
        }
    summary = {
        "success": True,
        "has_id": "id" in payload,
        "has_daily_budget": "daily_budget" in payload,
        "has_status": "status" in payload,
        "has_success": "success" in payload,
        "daily_budget": payload.get("daily_budget", "N/A"),
        "status": payload.get("status", "N/A"),
        "adset_id": payload.get("id", "N/A"),
    }
    print(f" ✅ Updated budget with other parameters successfully")
    print(f" • Ad Set ID: {payload.get('id', 'N/A')}")
    print(f" • Success: {payload.get('success', 'N/A')}")
    # Note: Meta Ads API returns {"success": true} for updates;
    # verify actual values by fetching ad set details.
    return summary
def test_invalid_budget_handling(self) -> Dict[str, Any]:
    """Verify that invalid budget values are rejected with validation errors.

    For each value in self.test_budgets["invalid_budgets"], the tool call is
    expected to come back with an error payload whose message reads like a
    budget validation failure. Returns a per-value result dict.
    """
    print(f"\n💰 Testing invalid budget handling")
    results = {}
    for invalid_budget in self.test_budgets["invalid_budgets"]:
        print(f" 💰 Testing invalid budget: '{invalid_budget}'")
        result = self._make_request("tools/call", {
            "name": "update_adset",
            "arguments": {
                "adset_id": self.test_adset_ids[4],
                "daily_budget": invalid_budget
            }
        })
        if not result["success"]:
            results[invalid_budget] = {
                "success": False,
                "error": result.get("text", "Unknown error")
            }
            print(f" ❌ Request failed: {result.get('text', 'Unknown error')}")
            continue
        # Parse the tool's JSON payload from the response text.
        response_data = result["json"]["result"]
        content = response_data.get("content", [{}])[0].get("text", "")
        try:
            parsed_content = json.loads(content)
        except json.JSONDecodeError:
            results[invalid_budget] = {
                "success": False,
                "error": "Invalid JSON response",
                "raw_content": content
            }
            print(f" ❌ Invalid JSON: {content}")
            continue
        # For invalid budgets, we expect an error response (possibly
        # nested under "data" by the MCP tool wrapper).
        has_error = "error" in parsed_content or "data" in parsed_content
        has_details = "details" in parsed_content
        error_msg = parsed_content.get("error", "")
        if not error_msg and "data" in parsed_content:
            # Best-effort extraction of a nested error message.  Only the
            # specific parse/shape failures are ignored here; a bare
            # `except:` would also swallow KeyboardInterrupt/SystemExit.
            try:
                data_content = json.loads(parsed_content.get("data", ""))
                if "error" in data_content:
                    error_msg = data_content["error"].get("message", "")
            except (json.JSONDecodeError, TypeError, AttributeError):
                pass
        # Accept only genuine budget-validation wording (not rate limits etc.).
        is_validation_error = any(keyword in error_msg.lower() for keyword in [
            "must be a number", "greater than or equal to 0", "too high", "too low", "invalid parameter",
            "budget is too low", "budget is too high", "decrease your ad set budget"
        ])
        results[invalid_budget] = {
            "success": has_error and is_validation_error,  # Success if we got proper validation error
            "has_error": has_error,
            "has_details": has_details,
            "is_validation_error": is_validation_error,
            "error_message": error_msg or parsed_content.get("error", "No error field"),
            "details": parsed_content.get("details", "No details field")
        }
        if has_error and is_validation_error:
            print(f" ✅ Properly handled invalid budget '{invalid_budget}'")
            print(f" • Error: {parsed_content.get('error', 'N/A')}")
        elif has_error:
            print(f" ⚠️ Got error but not validation error for '{invalid_budget}'")
            print(f" • Error: {parsed_content.get('error', 'N/A')}")
        else:
            print(f" ❌ Unexpected success for invalid budget '{invalid_budget}'")
            print(f" • Response: {parsed_content}")
    return results
def test_budget_update_with_targeting(self) -> Dict[str, Any]:
    """Update the budget and the targeting spec in the same call."""
    print(f"\n💰 Testing budget update with targeting")
    targeting = {
        "age_min": 25,
        "age_max": 45,
        "geo_locations": {"countries": ["US", "CA"]},
    }
    response = self._make_request("tools/call", {
        "name": "update_adset",
        "arguments": {
            "adset_id": self.test_adset_ids[5],
            "daily_budget": "8500",  # $85
            "targeting": targeting,
        },
    })
    if not response["success"]:
        return {
            "success": False,
            "error": response.get("text", "Unknown error"),
        }
    content = response["json"]["result"].get("content", [{}])[0].get("text", "")
    try:
        payload = json.loads(content)
    except json.JSONDecodeError:
        return {
            "success": False,
            "error": "Invalid JSON response",
            "raw_content": content,
        }
    if "error" in payload:
        error_msg = payload.get("error", "")
        if "rate limit" in error_msg.lower() or "too many changes" in error_msg.lower():
            return {
                "success": True,  # Rate limiting is expected behavior
                "rate_limited": True,
                "error_message": error_msg,
            }
        return {
            "success": False,
            "error": error_msg,
        }
    summary = {
        "success": True,
        "has_id": "id" in payload,
        "has_daily_budget": "daily_budget" in payload,
        "has_success": "success" in payload,
        "daily_budget": payload.get("daily_budget", "N/A"),
        "adset_id": payload.get("id", "N/A"),
    }
    print(f" ✅ Updated budget with targeting successfully")
    print(f" • Ad Set ID: {payload.get('id', 'N/A')}")
    print(f" • Success: {payload.get('success', 'N/A')}")
    # Note: Meta Ads API returns {"success": true} for updates;
    # verify actual values by fetching ad set details.
    return summary
def run_budget_update_tests(self) -> bool:
    """Run the full six-phase budget update suite against a local MCP server.

    Returns True when at least 4 of the 6 phases pass, False when the server
    is unreachable or too many phases fail.
    """
    print("🚀 Meta Ads Budget Update End-to-End Test Suite")
    print("="*60)
    # Check server availability.  Only network-level failures mean the server
    # is down; the previous bare `except:` also swallowed KeyboardInterrupt.
    try:
        response = requests.get(f"{self.base_url}/", timeout=5)
        server_running = response.status_code in [200, 404]
    except requests.exceptions.RequestException:
        server_running = False
    if not server_running:
        print("❌ Server is not running at", self.base_url)
        print(" Please start the server with:")
        print(" python3 -m meta_ads_mcp --transport streamable-http --port 8080")
        return False
    print("✅ Server is running")
    print("🔐 Using implicit authentication from server")
    print("⚠️ Note: This test uses ad sets specifically created for budget testing")
    print("⚠️ Note: Campaign uses ad set level budgets - testing budget updates at ad set level")
    print("⚠️ Note: Meta Ads API allows only 4 budget changes per hour - test will wait if rate limited")
    # Test 1: Daily Budget Updates.  Rate-limited outcomes already carry
    # success=True, so checking "success" alone suffices (the previous
    # `A or (A and B)` was redundant).
    print("\n" + "="*60)
    print("📋 PHASE 1: Testing Daily Budget Updates")
    print("="*60)
    daily_results = self.test_daily_budget_update()
    daily_success = any(result.get("success") for result in daily_results.values())
    # Test 2: Lifetime Budget Updates.  A documented API limitation (cannot
    # switch daily -> lifetime) also counts as a pass for this phase.
    print("\n" + "="*60)
    print("📋 PHASE 2: Testing Lifetime Budget Updates")
    print("="*60)
    lifetime_results = self.test_lifetime_budget_update()
    lifetime_success = any(
        bool(result.get("success") or result.get("api_limitation"))
        for result in lifetime_results.values()
    )
    # Test 3: Both Budget Types — success, rate limiting, or the documented
    # "cannot have both budgets" limitation all count as a pass.
    print("\n" + "="*60)
    print("📋 PHASE 3: Testing Both Budget Types")
    print("="*60)
    both_budgets_result = self.test_both_budget_types_update()
    both_budgets_success = bool(
        both_budgets_result.get("success")
        or both_budgets_result.get("rate_limited")
        or both_budgets_result.get("api_limitation")
    )
    # Test 4: Budget with Other Parameters
    print("\n" + "="*60)
    print("📋 PHASE 4: Testing Budget with Other Parameters")
    print("="*60)
    other_params_result = self.test_budget_update_with_other_parameters()
    other_params_success = bool(other_params_result.get("success"))
    # Test 5: Invalid Budget Handling — pass only on a real validation error.
    print("\n" + "="*60)
    print("📋 PHASE 5: Testing Invalid Budget Handling")
    print("="*60)
    invalid_results = self.test_invalid_budget_handling()
    invalid_success = any(
        result.get("success") and result.get("is_validation_error")
        for result in invalid_results.values()
    )
    # Test 6: Budget with Targeting
    print("\n" + "="*60)
    print("📋 PHASE 6: Testing Budget with Targeting")
    print("="*60)
    targeting_result = self.test_budget_update_with_targeting()
    targeting_success = bool(targeting_result.get("success"))
    # Final assessment
    print("\n" + "="*60)
    print("📊 FINAL RESULTS")
    print("="*60)
    all_tests = [
        ("Daily Budget Updates", daily_success),
        ("Lifetime Budget Updates", lifetime_success),
        ("Both Budget Types", both_budgets_success),
        ("Budget with Other Parameters", other_params_success),
        ("Invalid Budget Handling", invalid_success),
        ("Budget with Targeting", targeting_success)
    ]
    passed_tests = sum(1 for _, success in all_tests if success)
    total_tests = len(all_tests)
    for test_name, success in all_tests:
        status = "✅ PASSED" if success else "❌ FAILED"
        print(f" • {test_name}: {status}")
    overall_success = passed_tests >= 4  # At least 4 out of 6 tests should pass
    if overall_success:
        print(f"\n✅ Budget update tests: SUCCESS ({passed_tests}/{total_tests} passed)")
        print(" • Core budget update functionality is working")
        print(" • Meta Ads API integration is functional")
        print(" • Error handling is working properly")
        return True
    print(f"\n❌ Budget update tests: FAILED ({passed_tests}/{total_tests} passed)")
    print(" • Some budget update functions are not working properly")
    print(" • Check API permissions and ad set IDs")
    return False
def main():
    """Entry point: run the suite and exit non-zero on failure."""
    success = BudgetUpdateTester().run_budget_update_tests()
    if success:
        print("\n🎉 All budget update tests passed!")
    else:
        print("\n⚠️ Some budget update tests failed - see details above")
    sys.exit(0 if success else 1)


if __name__ == "__main__":
    main()
```
--------------------------------------------------------------------------------
/tests/test_dynamic_creatives.py:
--------------------------------------------------------------------------------
```python
"""Tests for dynamic creative features including multiple headlines and descriptions.
Tests for the enhanced create_ad_creative function that supports:
- Multiple headlines
- Multiple descriptions
- Dynamic creative optimization settings
- Creative update functionality
"""
import pytest
import json
from unittest.mock import AsyncMock, patch
from meta_ads_mcp.core.ads import create_ad_creative, update_ad_creative
@pytest.mark.asyncio
class TestDynamicCreatives:
"""Test cases for dynamic creative features."""
async def test_create_ad_creative_single_headline(self):
    """A lone `headline` must land in object_story_spec.link_data, not asset_feed_spec."""
    with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api:
        mock_api.return_value = {
            "id": "123456789",
            "name": "Test Creative",
            "status": "ACTIVE",
        }
        raw = await create_ad_creative(
            access_token="test_token",
            account_id="act_123456789",
            name="Test Creative",
            image_hash="abc123",
            page_id="987654321",
            link_url="https://example.com",
            message="Test message",
            headline="Single Headline",
            call_to_action_type="LEARN_MORE",
        )
    assert json.loads(raw)["success"] is True
    calls = mock_api.call_args_list
    assert len(calls) >= 1
    # The first call is the creative creation; params is its third positional arg.
    params = calls[0][0][2]
    assert "object_story_spec" in params
    assert "link_data" in params["object_story_spec"]
    assert params["object_story_spec"]["link_data"]["name"] == "Single Headline"
    assert "asset_feed_spec" not in params
async def test_create_ad_creative_single_description(self):
    """A lone `description` must land in object_story_spec.link_data, not asset_feed_spec."""
    with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api:
        mock_api.return_value = {
            "id": "123456789",
            "name": "Test Creative",
            "status": "ACTIVE",
        }
        raw = await create_ad_creative(
            access_token="test_token",
            account_id="act_123456789",
            name="Test Creative",
            image_hash="abc123",
            page_id="987654321",
            link_url="https://example.com",
            message="Test message",
            description="Single Description",
            call_to_action_type="LEARN_MORE",
        )
    assert json.loads(raw)["success"] is True
    calls = mock_api.call_args_list
    assert len(calls) >= 1
    # The first call is the creative creation; params is its third positional arg.
    params = calls[0][0][2]
    assert "object_story_spec" in params
    assert "link_data" in params["object_story_spec"]
    assert params["object_story_spec"]["link_data"]["description"] == "Single Description"
    assert "asset_feed_spec" not in params
async def test_create_ad_creative_cannot_mix_headline_and_headlines(self):
    """Passing both 'headline' and 'headlines' must be rejected."""
    raw = await create_ad_creative(
        access_token="test_token",
        account_id="act_123456789",
        name="Test Creative",
        image_hash="abc123",
        page_id="987654321",
        link_url="https://example.com",
        message="Test message",
        headline="Single Headline",
        headlines=["Headline 1", "Headline 2"],
        call_to_action_type="LEARN_MORE",
    )
    parsed = json.loads(raw)
    # MCP error responses may nest the real payload under "data".
    error_payload = json.loads(parsed["data"]) if "data" in parsed else parsed
    assert "error" in error_payload
    assert "Cannot specify both 'headline' and 'headlines'" in error_payload["error"]
async def test_create_ad_creative_cannot_mix_description_and_descriptions(self):
    """Passing both 'description' and 'descriptions' must be rejected."""
    raw = await create_ad_creative(
        access_token="test_token",
        account_id="act_123456789",
        name="Test Creative",
        image_hash="abc123",
        page_id="987654321",
        link_url="https://example.com",
        message="Test message",
        description="Single Description",
        descriptions=["Description 1", "Description 2"],
        call_to_action_type="LEARN_MORE",
    )
    parsed = json.loads(raw)
    # MCP error responses may nest the real payload under "data".
    error_payload = json.loads(parsed["data"]) if "data" in parsed else parsed
    assert "error" in error_payload
    assert "Cannot specify both 'description' and 'descriptions'" in error_payload["error"]
async def test_create_ad_creative_multiple_headlines(self):
    """Multiple headlines must be sent as an asset_feed_spec text array."""
    with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api:
        mock_api.return_value = {
            "id": "123456789",
            "name": "Test Creative",
            "status": "ACTIVE",
        }
        raw = await create_ad_creative(
            access_token="test_token",
            account_id="act_123456789",
            name="Test Creative",
            image_hash="abc123",
            page_id="987654321",
            link_url="https://example.com",
            message="Test message",
            headlines=["Headline 1", "Headline 2", "Headline 3"],
            call_to_action_type="LEARN_MORE",
        )
    assert json.loads(raw)["success"] is True
    # Inspect the creation call (the first one), not the later details fetch.
    calls = mock_api.call_args_list
    assert len(calls) >= 1
    params = calls[0][0][2]  # params is the third positional argument
    assert "asset_feed_spec" in params
    assert "headlines" in params["asset_feed_spec"]
    assert params["asset_feed_spec"]["headlines"] == [
        {"text": "Headline 1"},
        {"text": "Headline 2"},
        {"text": "Headline 3"},
    ]
async def test_create_ad_creative_multiple_descriptions(self):
    """Creating a creative with several descriptions must emit an asset_feed_spec."""
    api_response = {"id": "123456789", "name": "Test Creative", "status": "ACTIVE"}
    with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as api_mock:
        api_mock.return_value = api_response
        raw = await create_ad_creative(
            access_token="test_token",
            account_id="act_123456789",
            name="Test Creative",
            image_hash="abc123",
            page_id="987654321",
            link_url="https://example.com",
            message="Test message",
            descriptions=["Description 1", "Description 2"],
            call_to_action_type="LEARN_MORE"
        )
        payload = json.loads(raw)
        assert payload["success"] is True
        # Inspect the creation call only (a details fetch may follow it).
        assert len(api_mock.call_args_list) >= 1
        sent_params = api_mock.call_args_list[0][0][2]  # params is the third positional arg
        # Multiple descriptions use the asset_feed_spec {"text": ...} array format.
        assert "asset_feed_spec" in sent_params
        assert "descriptions" in sent_params["asset_feed_spec"]
        assert sent_params["asset_feed_spec"]["descriptions"] == [
            {"text": "Description 1"},
            {"text": "Description 2"},
        ]
async def test_create_ad_creative_dynamic_creative_spec(self):
    """dynamic_creative_spec optimization settings must be forwarded verbatim."""
    api_response = {"id": "123456789", "name": "Test Creative", "status": "ACTIVE"}
    with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as api_mock:
        api_mock.return_value = api_response
        raw = await create_ad_creative(
            access_token="test_token",
            account_id="act_123456789",
            name="Test Creative",
            image_hash="abc123",
            page_id="987654321",
            link_url="https://example.com",
            message="Test message",
            headlines=["Headline 1", "Headline 2"],
            descriptions=["Description 1", "Description 2"],
            dynamic_creative_spec={
                "headline_optimization": True,
                "description_optimization": True
            },
            call_to_action_type="LEARN_MORE"
        )
        payload = json.loads(raw)
        assert payload["success"] is True
        # Inspect the creation call only (a details fetch may follow it).
        assert len(api_mock.call_args_list) >= 1
        sent_params = api_mock.call_args_list[0][0][2]  # params is the third positional arg
        assert "dynamic_creative_spec" in sent_params
        spec = sent_params["dynamic_creative_spec"]
        assert spec["headline_optimization"] is True
        assert spec["description_optimization"] is True
async def test_create_ad_creative_multiple_headlines_and_descriptions(self):
    """Headlines, descriptions and the dynamic spec can all be combined in one call."""
    api_response = {"id": "123456789", "name": "Test Creative", "status": "ACTIVE"}
    with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as api_mock:
        api_mock.return_value = api_response
        raw = await create_ad_creative(
            access_token="test_token",
            account_id="act_123456789",
            name="Test Creative",
            image_hash="abc123",
            page_id="987654321",
            link_url="https://example.com",
            message="Test message",
            headlines=["Headline 1", "Headline 2", "Headline 3"],
            descriptions=["Description 1", "Description 2"],
            dynamic_creative_spec={
                "headline_optimization": True,
                "description_optimization": True
            },
            call_to_action_type="LEARN_MORE"
        )
        payload = json.loads(raw)
        assert payload["success"] is True
        # Inspect the creation call only (a details fetch may follow it).
        assert len(api_mock.call_args_list) >= 1
        sent_params = api_mock.call_args_list[0][0][2]  # params is the third positional arg
        # All dynamic-creative pieces must be present side by side.
        assert "asset_feed_spec" in sent_params
        assert "headlines" in sent_params["asset_feed_spec"]
        assert "descriptions" in sent_params["asset_feed_spec"]
        assert "dynamic_creative_spec" in sent_params
async def test_create_ad_creative_validation_max_headlines(self):
    """More than 5 headlines must be rejected with a limit error."""
    # Six headlines — one past the assumed limit of 5.
    excess_headlines = [f"Headline {i}" for i in range(6)]
    raw = await create_ad_creative(
        access_token="test_token",
        account_id="act_123456789",
        name="Test Creative",
        image_hash="abc123",
        page_id="987654321",
        headlines=excess_headlines
    )
    payload = json.loads(raw)
    # Errors may arrive wrapped in a "data" envelope (MCP error response format).
    err = json.loads(payload["data"]) if "data" in payload else payload
    assert "error" in err
    assert "maximum" in err["error"].lower() or "limit" in err["error"].lower()
async def test_create_ad_creative_validation_max_descriptions(self):
    """More than 5 descriptions must be rejected with a limit error."""
    # Six descriptions — one past the assumed limit of 5.
    excess_descriptions = [f"Description {i}" for i in range(6)]
    raw = await create_ad_creative(
        access_token="test_token",
        account_id="act_123456789",
        name="Test Creative",
        image_hash="abc123",
        page_id="987654321",
        descriptions=excess_descriptions
    )
    payload = json.loads(raw)
    # Errors may arrive wrapped in a "data" envelope (MCP error response format).
    err = json.loads(payload["data"]) if "data" in payload else payload
    assert "error" in err
    assert "maximum" in err["error"].lower() or "limit" in err["error"].lower()
async def test_update_ad_creative_add_headlines(self):
    """Updating a creative with new headlines must rebuild the asset_feed_spec."""
    api_response = {"id": "123456789", "name": "Updated Creative", "status": "ACTIVE"}
    with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as api_mock:
        api_mock.return_value = api_response
        raw = await update_ad_creative(
            access_token="test_token",
            creative_id="123456789",
            headlines=["New Headline 1", "New Headline 2", "New Headline 3"]
        )
        payload = json.loads(raw)
        assert payload["success"] is True
        # Inspect the update call only (a details fetch may follow it).
        assert len(api_mock.call_args_list) >= 1
        sent_params = api_mock.call_args_list[0][0][2]  # params is the third positional arg
        assert "asset_feed_spec" in sent_params
        assert "headlines" in sent_params["asset_feed_spec"]
        assert sent_params["asset_feed_spec"]["headlines"] == [
            {"text": "New Headline 1"},
            {"text": "New Headline 2"},
            {"text": "New Headline 3"},
        ]
async def test_update_ad_creative_add_descriptions(self):
    """Updating a creative with new descriptions must rebuild the asset_feed_spec."""
    api_response = {"id": "123456789", "name": "Updated Creative", "status": "ACTIVE"}
    with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as api_mock:
        api_mock.return_value = api_response
        raw = await update_ad_creative(
            access_token="test_token",
            creative_id="123456789",
            descriptions=["New Description 1", "New Description 2"]
        )
        payload = json.loads(raw)
        assert payload["success"] is True
        # Inspect the update call only (a details fetch may follow it).
        assert len(api_mock.call_args_list) >= 1
        sent_params = api_mock.call_args_list[0][0][2]  # params is the third positional arg
        assert "asset_feed_spec" in sent_params
        assert "descriptions" in sent_params["asset_feed_spec"]
        assert sent_params["asset_feed_spec"]["descriptions"] == [
            {"text": "New Description 1"},
            {"text": "New Description 2"},
        ]
async def test_update_ad_creative_update_dynamic_spec(self):
    """Updating dynamic_creative_spec must forward the flags verbatim."""
    api_response = {"id": "123456789", "name": "Updated Creative", "status": "ACTIVE"}
    with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as api_mock:
        api_mock.return_value = api_response
        raw = await update_ad_creative(
            access_token="test_token",
            creative_id="123456789",
            dynamic_creative_spec={
                "headline_optimization": True,
                "description_optimization": False
            }
        )
        payload = json.loads(raw)
        assert payload["success"] is True
        # Inspect the update call only (a details fetch may follow it).
        assert len(api_mock.call_args_list) >= 1
        sent_params = api_mock.call_args_list[0][0][2]  # params is the third positional arg
        assert "dynamic_creative_spec" in sent_params
        spec = sent_params["dynamic_creative_spec"]
        assert spec["headline_optimization"] is True
        assert spec["description_optimization"] is False
async def test_update_ad_creative_no_creative_id(self):
    """An empty creative_id must produce a descriptive error."""
    raw = await update_ad_creative(
        creative_id="",  # required parameter supplied, but deliberately empty
        access_token="test_token",
        headlines=["New Headline"]
    )
    payload = json.loads(raw)
    # Errors may arrive wrapped in a "data" envelope (MCP error response format).
    err = json.loads(payload["data"]) if "data" in payload else payload
    assert "error" in err
    assert "creative id" in err["error"].lower()
async def test_update_ad_creative_cannot_mix_headline_and_headlines(self):
    """Passing both singular and plural headline parameters must be rejected."""
    raw = await update_ad_creative(
        access_token="test_token",
        creative_id="123456789",
        headline="Single Headline",
        headlines=["Headline 1", "Headline 2"]
    )
    payload = json.loads(raw)
    # Errors may arrive wrapped in a "data" envelope (MCP error response format).
    err = json.loads(payload["data"]) if "data" in payload else payload
    assert "error" in err
    assert "Cannot specify both 'headline' and 'headlines'" in err["error"]
async def test_update_ad_creative_cannot_mix_description_and_descriptions(self):
    """Passing both singular and plural description parameters must be rejected."""
    raw = await update_ad_creative(
        access_token="test_token",
        creative_id="123456789",
        description="Single Description",
        descriptions=["Description 1", "Description 2"]
    )
    payload = json.loads(raw)
    # Errors may arrive wrapped in a "data" envelope (MCP error response format).
    err = json.loads(payload["data"]) if "data" in payload else payload
    assert "error" in err
    assert "Cannot specify both 'description' and 'descriptions'" in err["error"]
async def test_update_ad_creative_validation_max_headlines(self):
    """Update must reject more than the 5-headline limit."""
    # Six headlines — one past the limit.
    excess_headlines = [f"Headline {i}" for i in range(6)]
    raw = await update_ad_creative(
        access_token="test_token",
        creative_id="123456789",
        headlines=excess_headlines
    )
    payload = json.loads(raw)
    # Errors may arrive wrapped in a "data" envelope (MCP error response format).
    err = json.loads(payload["data"]) if "data" in payload else payload
    assert "error" in err
    assert "maximum 5 headlines" in err["error"].lower()
async def test_update_ad_creative_validation_max_descriptions(self):
    """Update must reject more than the 5-description limit."""
    # Six descriptions — one past the limit.
    excess_descriptions = [f"Description {i}" for i in range(6)]
    raw = await update_ad_creative(
        access_token="test_token",
        creative_id="123456789",
        descriptions=excess_descriptions
    )
    payload = json.loads(raw)
    # Errors may arrive wrapped in a "data" envelope (MCP error response format).
    err = json.loads(payload["data"]) if "data" in payload else payload
    assert "error" in err
    assert "maximum 5 descriptions" in err["error"].lower()
async def test_update_ad_creative_validation_headline_length(self):
    """Headlines longer than 40 characters must be rejected."""
    over_limit = "A" * 41  # one character past the 40-char limit
    raw = await update_ad_creative(
        access_token="test_token",
        creative_id="123456789",
        headlines=[over_limit]
    )
    payload = json.loads(raw)
    # Errors may arrive wrapped in a "data" envelope (MCP error response format).
    err = json.loads(payload["data"]) if "data" in payload else payload
    assert "error" in err
    assert "40 character limit" in err["error"]
async def test_update_ad_creative_validation_description_length(self):
    """Descriptions longer than 125 characters must be rejected."""
    over_limit = "A" * 126  # one character past the 125-char limit
    raw = await update_ad_creative(
        access_token="test_token",
        creative_id="123456789",
        descriptions=[over_limit]
    )
    payload = json.loads(raw)
    # Errors may arrive wrapped in a "data" envelope (MCP error response format).
    err = json.loads(payload["data"]) if "data" in payload else payload
    assert "error" in err
    assert "125 character limit" in err["error"]
async def test_update_ad_creative_name_only(self):
    """A name-only update must not build an asset_feed_spec."""
    api_response = {"id": "123456789", "name": "Updated Creative Name", "status": "ACTIVE"}
    with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as api_mock:
        api_mock.return_value = api_response
        raw = await update_ad_creative(
            access_token="test_token",
            creative_id="123456789",
            name="Updated Creative Name"
        )
        payload = json.loads(raw)
        assert payload["success"] is True
        # Inspect the update call only (a details fetch may follow it).
        assert len(api_mock.call_args_list) >= 1
        sent_params = api_mock.call_args_list[0][0][2]  # params is the third positional arg
        assert "name" in sent_params
        assert sent_params["name"] == "Updated Creative Name"
        # No dynamic content was passed, so no asset_feed_spec should appear.
        assert "asset_feed_spec" not in sent_params
async def test_update_ad_creative_message_only(self):
    """A message-only update must travel through object_story_spec.link_data."""
    api_response = {"id": "123456789", "name": "Test Creative", "status": "ACTIVE"}
    with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as api_mock:
        api_mock.return_value = api_response
        raw = await update_ad_creative(
            access_token="test_token",
            creative_id="123456789",
            message="Updated message text"
        )
        payload = json.loads(raw)
        assert payload["success"] is True
        # Inspect the update call only (a details fetch may follow it).
        assert len(api_mock.call_args_list) >= 1
        sent_params = api_mock.call_args_list[0][0][2]  # params is the third positional arg
        assert "object_story_spec" in sent_params
        assert sent_params["object_story_spec"]["link_data"]["message"] == "Updated message text"
async def test_update_ad_creative_cta_with_dynamic_content(self):
    """With dynamic content present, the CTA must land in asset_feed_spec."""
    api_response = {"id": "123456789", "name": "Test Creative", "status": "ACTIVE"}
    with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as api_mock:
        api_mock.return_value = api_response
        raw = await update_ad_creative(
            access_token="test_token",
            creative_id="123456789",
            headlines=["Test Headline"],
            call_to_action_type="SHOP_NOW"
        )
        payload = json.loads(raw)
        assert payload["success"] is True
        # Inspect the update call only (a details fetch may follow it).
        assert len(api_mock.call_args_list) >= 1
        sent_params = api_mock.call_args_list[0][0][2]  # params is the third positional arg
        assert "asset_feed_spec" in sent_params
        assert "call_to_action_types" in sent_params["asset_feed_spec"]
        assert sent_params["asset_feed_spec"]["call_to_action_types"] == ["SHOP_NOW"]
async def test_update_ad_creative_cta_without_dynamic_content(self):
    """Without dynamic content, the CTA must land in object_story_spec.link_data."""
    api_response = {"id": "123456789", "name": "Test Creative", "status": "ACTIVE"}
    with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as api_mock:
        api_mock.return_value = api_response
        raw = await update_ad_creative(
            access_token="test_token",
            creative_id="123456789",
            call_to_action_type="LEARN_MORE"
        )
        payload = json.loads(raw)
        assert payload["success"] is True
        # Inspect the update call only (a details fetch may follow it).
        assert len(api_mock.call_args_list) >= 1
        sent_params = api_mock.call_args_list[0][0][2]  # params is the third positional arg
        assert "object_story_spec" in sent_params
        link_data = sent_params["object_story_spec"]["link_data"]
        assert "call_to_action" in link_data
        assert link_data["call_to_action"]["type"] == "LEARN_MORE"
async def test_update_ad_creative_combined_updates(self):
    """Name, message, headlines, descriptions and CTA can be updated in one call."""
    api_response = {"id": "123456789", "name": "Multi-Updated Creative", "status": "ACTIVE"}
    with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as api_mock:
        api_mock.return_value = api_response
        raw = await update_ad_creative(
            access_token="test_token",
            creative_id="123456789",
            name="Multi-Updated Creative",
            message="Updated message",
            headlines=["New Headline 1", "New Headline 2"],
            descriptions=["New Description 1"],
            call_to_action_type="SIGN_UP"
        )
        payload = json.loads(raw)
        assert payload["success"] is True
        # Inspect the update call only (a details fetch may follow it).
        assert len(api_mock.call_args_list) >= 1
        sent_params = api_mock.call_args_list[0][0][2]  # params is the third positional arg
        # Plain name update travels alongside the dynamic content.
        assert sent_params["name"] == "Multi-Updated Creative"
        # With dynamic content present, everything funnels through asset_feed_spec.
        assert "asset_feed_spec" in sent_params
        feed = sent_params["asset_feed_spec"]
        assert "headlines" in feed
        assert "descriptions" in feed
        assert "primary_texts" in feed
        assert "call_to_action_types" in feed
        assert feed["headlines"] == [{"text": "New Headline 1"}, {"text": "New Headline 2"}]
        assert feed["descriptions"] == [{"text": "New Description 1"}]
        assert feed["primary_texts"] == [{"text": "Updated message"}]
        assert feed["call_to_action_types"] == ["SIGN_UP"]
async def test_update_ad_creative_api_error_handling(self):
    """An API exception must surface as a structured error payload."""
    with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as api_mock:
        api_mock.side_effect = Exception("API request failed")
        raw = await update_ad_creative(
            access_token="test_token",
            creative_id="123456789",
            name="Test Creative"
        )
        payload = json.loads(raw)
        # Errors may arrive wrapped in a "data" envelope (MCP error response format).
        err = json.loads(payload["data"]) if "data" in payload else payload
        assert "error" in err
        assert err["error"] == "Failed to update ad creative"
        # The response should echo diagnostics for debugging.
        assert "details" in err
        assert "update_data_sent" in err
```
--------------------------------------------------------------------------------
/tests/test_duplication_regression.py:
--------------------------------------------------------------------------------
```python
"""Comprehensive regression tests for duplication module."""
import os
import json
import pytest
import httpx
from unittest.mock import patch, AsyncMock, MagicMock
import importlib
class TestDuplicationFeatureToggle:
    """Test feature toggle functionality to prevent regression.

    The META_ADS_ENABLE_DUPLICATION flag is evaluated at import time, so every
    test patches the environment and reloads the module before asserting.
    `importlib` is imported once at module level; the previous per-method
    `import importlib` statements were redundant shadows and have been removed.
    """

    def test_feature_disabled_by_default(self):
        """Ensure duplication is disabled by default."""
        with patch.dict(os.environ, {}, clear=True):
            # Force reload to pick up environment changes
            from meta_ads_mcp.core import duplication
            importlib.reload(duplication)
            assert not duplication.ENABLE_DUPLICATION
            # Note: Functions may persist in module namespace due to previous test runs
            # The important thing is that ENABLE_DUPLICATION flag is False

    def test_feature_enabled_with_env_var(self):
        """Ensure duplication is enabled when environment variable is set."""
        with patch.dict(os.environ, {"META_ADS_ENABLE_DUPLICATION": "1"}):
            # Force reload to pick up environment changes
            from meta_ads_mcp.core import duplication
            importlib.reload(duplication)
            assert duplication.ENABLE_DUPLICATION
            # When enabled, all four duplication tools must be defined.
            assert hasattr(duplication, 'duplicate_campaign')
            assert hasattr(duplication, 'duplicate_adset')
            assert hasattr(duplication, 'duplicate_ad')
            assert hasattr(duplication, 'duplicate_creative')

    def test_feature_enabled_with_various_truthy_values(self):
        """Test that various truthy values enable the feature."""
        truthy_values = ["1", "true", "TRUE", "yes", "YES", "on", "ON", "enabled"]
        for value in truthy_values:
            with patch.dict(os.environ, {"META_ADS_ENABLE_DUPLICATION": value}):
                from meta_ads_mcp.core import duplication
                importlib.reload(duplication)
                assert duplication.ENABLE_DUPLICATION, f"Value '{value}' should enable the feature"

    def test_feature_disabled_with_empty_string(self):
        """Test that empty string disables the feature."""
        with patch.dict(os.environ, {"META_ADS_ENABLE_DUPLICATION": ""}):
            from meta_ads_mcp.core import duplication
            importlib.reload(duplication)
            assert not duplication.ENABLE_DUPLICATION
class TestDuplicationDecorators:
    """Test that decorators are applied correctly to prevent regression."""

    @pytest.fixture(autouse=True)
    def enable_feature(self):
        """Enable the duplication feature for these tests."""
        with patch.dict(os.environ, {"META_ADS_ENABLE_DUPLICATION": "1"}):
            import importlib
            from meta_ads_mcp.core import duplication
            importlib.reload(duplication)
            yield duplication

    def test_functions_have_meta_api_tool_decorator(self, enable_feature):
        """Ensure all duplication functions have @meta_api_tool decorator."""
        import inspect
        duplication = enable_feature
        for func_name in ('duplicate_campaign', 'duplicate_adset', 'duplicate_ad', 'duplicate_creative'):
            func = getattr(duplication, func_name)
            # meta_api_tool wraps each function with access-token handling.
            assert callable(func), f"{func_name} should be callable"
            params = inspect.signature(func).parameters
            assert 'access_token' in params, f"{func_name} should have access_token parameter"
            assert params['access_token'].default is None, f"{func_name} access_token should default to None"

    @pytest.mark.asyncio
    async def test_functions_are_mcp_tools(self, enable_feature):
        """Ensure all duplication functions are registered as MCP tools."""
        # Confirms the @mcp_server.tool() decorator registered each function.
        from meta_ads_mcp.core.server import mcp_server
        registered = {tool.name for tool in await mcp_server.list_tools()}
        for tool_name in ('duplicate_campaign', 'duplicate_adset', 'duplicate_ad', 'duplicate_creative'):
            assert tool_name in registered, f"{tool_name} should be registered as an MCP tool"
class TestDuplicationAPIContract:
    """Test API contract to prevent regression in external API calls."""

    @pytest.fixture(autouse=True)
    def enable_feature(self):
        """Enable the duplication feature for these tests."""
        with patch.dict(os.environ, {"META_ADS_ENABLE_DUPLICATION": "1"}):
            import importlib
            from meta_ads_mcp.core import duplication
            importlib.reload(duplication)
            # Yield the freshly reloaded module so each test uses the enabled state.
            yield duplication

    @pytest.mark.asyncio
    async def test_api_endpoint_construction(self, enable_feature):
        """Test that API endpoints are constructed correctly."""
        duplication = enable_feature
        # (resource_type, resource_id, expected proxy URL) triples.
        test_cases = [
            ("campaign", "123456789", "https://mcp.pipeboard.co/api/meta/duplicate/campaign/123456789"),
            ("adset", "987654321", "https://mcp.pipeboard.co/api/meta/duplicate/adset/987654321"),
            ("ad", "555666777", "https://mcp.pipeboard.co/api/meta/duplicate/ad/555666777"),
            ("creative", "111222333", "https://mcp.pipeboard.co/api/meta/duplicate/creative/111222333"),
        ]
        for resource_type, resource_id, expected_url in test_cases:
            # Mock dual-header authentication
            with patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration") as mock_auth:
                mock_auth.get_pipeboard_token.return_value = "pipeboard_token"
                mock_auth.get_auth_token.return_value = "facebook_token"
                with patch("meta_ads_mcp.core.duplication.httpx.AsyncClient") as mock_client:
                    mock_response = AsyncMock()
                    mock_response.status_code = 200
                    mock_response.json.return_value = {"success": True}
                    # The code under test does `async with httpx.AsyncClient(...)`,
                    # hence the __aenter__ hop before reaching .post.
                    mock_client.return_value.__aenter__.return_value.post.return_value = mock_response
                    await duplication._forward_duplication_request(
                        resource_type, resource_id, "test_token", {}
                    )
                    # Verify the correct URL was called
                    call_args = mock_client.return_value.__aenter__.return_value.post.call_args
                    actual_url = call_args[0][0]  # first positional argument of post()
                    assert actual_url == expected_url, f"Expected {expected_url}, got {actual_url}"

    @pytest.mark.asyncio
    async def test_request_headers_format(self, enable_feature):
        """Test that request headers are formatted correctly."""
        duplication = enable_feature
        # Mock dual-header authentication
        with patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration") as mock_auth:
            mock_auth.get_pipeboard_token.return_value = "pipeboard_token_12345"
            mock_auth.get_auth_token.return_value = "facebook_token_67890"
            with patch("meta_ads_mcp.core.duplication.httpx.AsyncClient") as mock_client:
                mock_response = AsyncMock()
                mock_response.status_code = 200
                mock_response.json.return_value = {"success": True}
                mock_client.return_value.__aenter__.return_value.post.return_value = mock_response
                await duplication._forward_duplication_request(
                    "campaign", "123456789", "test_token_12345", {"name_suffix": " - Test"}
                )
                # Verify dual headers are sent correctly
                call_args = mock_client.return_value.__aenter__.return_value.post.call_args
                headers = call_args[1]["headers"]  # keyword argument of post()
                # Check the dual-header authentication pattern
                assert headers["Authorization"] == "Bearer facebook_token_67890"  # Facebook token for Meta API
                assert headers["X-Pipeboard-Token"] == "pipeboard_token_12345"  # Pipeboard token for auth
                assert headers["Content-Type"] == "application/json"
                assert headers["User-Agent"] == "meta-ads-mcp/1.0"

    @pytest.mark.asyncio
    async def test_request_timeout_configuration(self, enable_feature):
        """Test that request timeout is configured correctly."""
        duplication = enable_feature
        # Mock dual-header authentication
        with patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration") as mock_auth:
            mock_auth.get_pipeboard_token.return_value = "pipeboard_token"
            mock_auth.get_auth_token.return_value = "facebook_token"
            with patch("meta_ads_mcp.core.duplication.httpx.AsyncClient") as mock_client:
                mock_response = AsyncMock()
                mock_response.status_code = 200
                mock_response.json.return_value = {"success": True}
                mock_client.return_value.__aenter__.return_value.post.return_value = mock_response
                await duplication._forward_duplication_request(
                    "campaign", "123456789", "test_token", {}
                )
                # Verify timeout is set to 30 seconds
                mock_client.assert_called_once_with(timeout=30.0)
class TestDuplicationErrorHandling:
"""Test error handling to prevent regression in error scenarios."""
@pytest.fixture(autouse=True)
def enable_feature(self):
    """Force-enable the duplication feature and yield the reloaded module."""
    env = {"META_ADS_ENABLE_DUPLICATION": "1"}
    with patch.dict(os.environ, env):
        # Reload so the import-time feature flag sees the patched environment.
        from meta_ads_mcp.core import duplication
        importlib.reload(duplication)
        yield duplication
@pytest.mark.asyncio
async def test_missing_access_token_error(self, enable_feature):
    """Each missing auth token must yield an authentication_required error."""
    duplication = enable_feature

    # Case 1: no Pipeboard token (primary authentication failure).
    with patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration") as mock_auth:
        mock_auth.get_pipeboard_token.return_value = None
        mock_auth.get_auth_token.return_value = "facebook_token"
        outcome = json.loads(
            await duplication._forward_duplication_request("campaign", "123", None, {})
        )
        assert outcome["error"] == "authentication_required"
        assert "Pipeboard API token not found" in outcome["message"]

    # Case 2: Pipeboard token present but no Facebook token (secondary failure).
    with patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration") as mock_auth:
        mock_auth.get_pipeboard_token.return_value = "pipeboard_token"
        mock_auth.get_auth_token.return_value = None
        with patch("meta_ads_mcp.core.auth.get_current_access_token") as mock_get_token:
            mock_get_token.return_value = None  # no fallback token either
            outcome = json.loads(
                await duplication._forward_duplication_request("campaign", "123", None, {})
            )
            assert outcome["error"] == "authentication_required"
            assert "Meta Ads access token not found" in outcome["message"]
@pytest.mark.asyncio
async def test_http_status_code_handling(self, enable_feature):
"""Test handling of various HTTP status codes."""
duplication = enable_feature
status_code_tests = [
(200, "success_response", "json"),
(400, "validation_failed", "error"),
(401, "authentication_error", "error"),
(402, "subscription_required", "error"),
(403, "facebook_connection_required", "error"),
(404, "resource_not_found", "error"),
(429, "rate_limit_exceeded", "error"),
(502, "meta_api_error", "error"),
(500, "duplication_failed", "error"),
]
for status_code, expected_error_type, response_type in status_code_tests:
with patch("meta_ads_mcp.core.duplication.httpx.AsyncClient") as mock_client, \
patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration.get_pipeboard_token", return_value="test_pipeboard_token"), \
patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration.get_auth_token", return_value="test_facebook_token"):
# Use MagicMock instead of AsyncMock for more predictable behavior
mock_response = MagicMock()
mock_response.status_code = status_code
if status_code == 200:
mock_response.json.return_value = {"success": True, "id": "new_123"}
elif status_code == 400:
mock_response.json.return_value = {"errors": ["Invalid parameter"], "warnings": []}
elif status_code == 401:
mock_response.json.side_effect = Exception("No JSON")
mock_response.text = "Unauthorized"
elif status_code == 402:
mock_response.json.return_value = {
"message": "This feature is not available in your current plan",
"upgrade_url": "https://pipeboard.co/upgrade",
"suggestion": "Please upgrade your account to access this feature"
}
elif status_code == 403:
mock_response.json.return_value = {
"message": "You need to connect your Facebook account first",
"details": {
"login_flow_url": "/connections",
"auth_flow_url": "/api/meta/auth"
}
}
elif status_code == 404:
mock_response.json.side_effect = Exception("No JSON")
mock_response.text = "Not Found"
elif status_code == 429:
mock_response.headers.get.return_value = "60"
mock_response.json.side_effect = Exception("No JSON")
mock_response.text = "Rate limited"
elif status_code == 502:
mock_response.json.return_value = {"message": "Facebook API error"}
else:
mock_response.json.side_effect = Exception("No JSON")
mock_response.text = f"Error {status_code}"
mock_client.return_value.__aenter__.return_value.post.return_value = mock_response
result = await duplication._forward_duplication_request(
"campaign", "123", "token", {}
)
result_json = json.loads(result)
if response_type == "error":
if status_code == 401:
assert result_json["error"] == expected_error_type
elif status_code == 403:
assert result_json["error"] == expected_error_type
elif status_code == 400:
assert result_json["error"] == expected_error_type
elif status_code == 404:
assert result_json["error"] == expected_error_type
elif status_code == 502:
assert result_json["error"] == expected_error_type
else:
assert result_json["error"] == expected_error_type
else:
assert "success" in result_json or "id" in result_json
@pytest.mark.asyncio
async def test_network_error_handling(self, enable_feature):
"""Test handling of network errors."""
duplication = enable_feature
network_errors = [
(httpx.TimeoutException("Timeout"), "request_timeout"),
(httpx.RequestError("Connection failed"), "network_error"),
(Exception("Unexpected error"), "unexpected_error"),
]
for exception, expected_error in network_errors:
with patch("meta_ads_mcp.core.duplication.httpx.AsyncClient") as mock_client, \
patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration.get_pipeboard_token", return_value="test_pipeboard_token"), \
patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration.get_auth_token", return_value="test_facebook_token"):
mock_client.return_value.__aenter__.return_value.post.side_effect = exception
result = await duplication._forward_duplication_request(
"campaign", "123", "token", {}
)
result_json = json.loads(result)
assert result_json["error"] == expected_error
class TestDuplicationParameterHandling:
    """Test parameter handling to prevent regression in data processing."""

    @pytest.fixture(autouse=True)
    def enable_feature(self):
        """Enable the duplication feature for these tests.

        Reloads the module so the META_ADS_ENABLE_DUPLICATION toggle is
        re-evaluated, then yields the fresh module object.
        """
        with patch.dict(os.environ, {"META_ADS_ENABLE_DUPLICATION": "1"}):
            import importlib
            from meta_ads_mcp.core import duplication
            importlib.reload(duplication)
            yield duplication

    @pytest.mark.asyncio
    async def test_none_values_filtered_from_options(self, enable_feature):
        """Test that None values are filtered from options."""
        duplication = enable_feature
        # Mock dual-header authentication
        with patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration") as mock_auth:
            mock_auth.get_pipeboard_token.return_value = "pipeboard_token"
            mock_auth.get_auth_token.return_value = "facebook_token"
            with patch("meta_ads_mcp.core.duplication.httpx.AsyncClient") as mock_client:
                # MagicMock, not AsyncMock: httpx.Response.json() is synchronous,
                # matching the convention used elsewhere in this file.
                mock_response = MagicMock()
                mock_response.status_code = 200
                mock_response.json.return_value = {"success": True}
                mock_client.return_value.__aenter__.return_value.post.return_value = mock_response
                # Test with options containing None values
                options_with_none = {
                    "name_suffix": " - Test",
                    "new_daily_budget": None,
                    "new_status": "PAUSED",
                    "new_headline": None,
                }
                await duplication._forward_duplication_request(
                    "campaign", "123", "token", options_with_none
                )
                # Verify None values were filtered out of the POST payload
                call_args = mock_client.return_value.__aenter__.return_value.post.call_args
                json_payload = call_args[1]["json"]
                assert "name_suffix" in json_payload
                assert "new_status" in json_payload
                assert "new_daily_budget" not in json_payload
                assert "new_headline" not in json_payload

    @pytest.mark.asyncio
    async def test_campaign_duplication_parameter_forwarding(self, enable_feature):
        """Test that campaign duplication forwards all parameters correctly."""
        duplication = enable_feature
        with patch("meta_ads_mcp.core.duplication._forward_duplication_request") as mock_forward:
            mock_forward.return_value = '{"success": true}'
            # Test with all parameters
            result = await duplication.duplicate_campaign(
                campaign_id="123456789",
                access_token="test_token",
                name_suffix=" - New Copy",
                include_ad_sets=False,
                include_ads=True,
                include_creatives=False,
                copy_schedule=True,
                new_daily_budget=100.50,
                new_status="ACTIVE"
            )
            # Verify parameters were forwarded correctly
            mock_forward.assert_called_once_with(
                "campaign",
                "123456789",
                "test_token",
                {
                    "name_suffix": " - New Copy",
                    "include_ad_sets": False,
                    "include_ads": True,
                    "include_creatives": False,
                    "copy_schedule": True,
                    "new_daily_budget": 100.50,
                    "new_status": "ACTIVE"
                }
            )

    @pytest.mark.asyncio
    async def test_adset_duplication_parameter_forwarding(self, enable_feature):
        """Test that ad set duplication forwards all parameters correctly including new_targeting."""
        duplication = enable_feature
        with patch("meta_ads_mcp.core.duplication._forward_duplication_request") as mock_forward:
            mock_forward.return_value = '{"success": true}'
            # Test with all parameters including new_targeting
            result = await duplication.duplicate_adset(
                adset_id="987654321",
                access_token="test_token",
                target_campaign_id="campaign_123",
                name_suffix=" - Targeted Copy",
                include_ads=False,
                include_creatives=True,
                new_daily_budget=200.00,
                new_targeting={
                    "age_min": 25,
                    "age_max": 45,
                    "geo_locations": {
                        "countries": ["US", "CA"]
                    }
                },
                new_status="ACTIVE"
            )
            # Verify parameters were forwarded correctly
            mock_forward.assert_called_once_with(
                "adset",
                "987654321",
                "test_token",
                {
                    "target_campaign_id": "campaign_123",
                    "name_suffix": " - Targeted Copy",
                    "include_ads": False,
                    "include_creatives": True,
                    "new_daily_budget": 200.00,
                    "new_targeting": {
                        "age_min": 25,
                        "age_max": 45,
                        "geo_locations": {
                            "countries": ["US", "CA"]
                        }
                    },
                    "new_status": "ACTIVE"
                }
            )

    def test_estimated_components_calculation(self, enable_feature):
        """Test that estimated components are calculated correctly."""
        duplication = enable_feature
        # (resource type, options, expected component-estimate dict)
        test_cases = [
            # Campaign with all components
            ("campaign", {"include_ad_sets": True, "include_ads": True, "include_creatives": True},
             {"campaigns": 1, "ad_sets": "3-5 (estimated)", "ads": "5-15 (estimated)", "creatives": "5-15 (estimated)"}),
            # Campaign with no sub-components
            ("campaign", {"include_ad_sets": False, "include_ads": False, "include_creatives": False},
             {"campaigns": 1}),
            # Ad set with ads
            ("adset", {"include_ads": True, "include_creatives": True},
             {"ad_sets": 1, "ads": "2-5 (estimated)", "creatives": "2-5 (estimated)"}),
            # Ad set without ads
            ("adset", {"include_ads": False, "include_creatives": False},
             {"ad_sets": 1}),
            # Single ad with creative
            ("ad", {"duplicate_creative": True},
             {"ads": 1, "creatives": 1}),
            # Single ad without creative
            ("ad", {"duplicate_creative": False},
             {"ads": 1}),
            # Single creative
            ("creative", {},
             {"creatives": 1}),
        ]
        for resource_type, options, expected in test_cases:
            result = duplication._get_estimated_components(resource_type, options)
            assert result == expected, f"Failed for {resource_type} with {options}"
class TestDuplicationIntegration:
    """Integration tests to prevent regression in end-to-end functionality."""

    @pytest.fixture(autouse=True)
    def enable_feature(self):
        """Enable the duplication feature for these tests.

        Reloads the module so the META_ADS_ENABLE_DUPLICATION toggle is
        re-evaluated, then yields the fresh module object.
        """
        with patch.dict(os.environ, {"META_ADS_ENABLE_DUPLICATION": "1"}):
            import importlib
            from meta_ads_mcp.core import duplication
            importlib.reload(duplication)
            yield duplication

    @staticmethod
    def _unwrap(result):
        """Parse a tool result, unwrapping the @meta_api_tool "data" envelope.

        The @meta_api_tool decorator may wrap the payload as JSON inside a
        top-level "data" field; return the inner payload dict either way.
        """
        result_json = json.loads(result)
        if "data" in result_json:
            return json.loads(result_json["data"])
        return result_json

    @pytest.mark.asyncio
    async def test_end_to_end_successful_duplication(self, enable_feature):
        """Test complete successful duplication flow."""
        duplication = enable_feature
        # Mock the auth system completely to bypass the @meta_api_tool decorator checks
        with patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration") as mock_auth_integration:
            # Mock dual authentication tokens
            mock_auth_integration.get_pipeboard_token.return_value = "pipeboard_token"
            mock_auth_integration.get_auth_token.return_value = "facebook_token"
            with patch("meta_ads_mcp.core.duplication.httpx.AsyncClient") as mock_client:
                # Mock successful response
                mock_response = MagicMock()
                mock_response.status_code = 200
                mock_response.json.return_value = {
                    "success": True,
                    "original_campaign_id": "123456789",
                    "new_campaign_id": "987654321",
                    "duplicated_components": {
                        "campaign": {"id": "987654321", "name": "Test Campaign - Copy"},
                        "ad_sets": [{"id": "111", "name": "Ad Set 1 - Copy"}],
                        "ads": [{"id": "222", "name": "Ad 1 - Copy"}],
                        "creatives": [{"id": "333", "name": "Creative 1 - Copy"}]
                    },
                    "warnings": [],
                    "subscription": {
                        "status": "active"
                    }
                }
                mock_client.return_value.__aenter__.return_value.post.return_value = mock_response
                # Call the function with explicit token
                result = await duplication.duplicate_campaign(
                    campaign_id="123456789",
                    access_token="facebook_token",  # Use the mocked token
                    name_suffix=" - Test Copy"
                )
                # Verify result - handle the @meta_api_tool wrapper uniformly
                actual_result = self._unwrap(result)
                assert actual_result["success"] is True
                assert actual_result["new_campaign_id"] == "987654321"
                assert "duplicated_components" in actual_result

    @pytest.mark.asyncio
    async def test_facebook_connection_error_flow(self, enable_feature):
        """Test Facebook connection required error flow."""
        duplication = enable_feature
        # Mock the auth system completely to bypass the @meta_api_tool decorator checks
        with patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration") as mock_auth_integration:
            # Mock dual authentication tokens
            mock_auth_integration.get_pipeboard_token.return_value = "pipeboard_token"
            mock_auth_integration.get_auth_token.return_value = "facebook_token"
            with patch("meta_ads_mcp.core.duplication.httpx.AsyncClient") as mock_client:
                # Mock 403 response (Facebook connection required)
                mock_response = MagicMock()
                mock_response.status_code = 403
                mock_response.json.return_value = {
                    "message": "You need to connect your Facebook account first",
                    "details": {
                        "login_flow_url": "/connections",
                        "auth_flow_url": "/api/meta/auth"
                    }
                }
                mock_client.return_value.__aenter__.return_value.post.return_value = mock_response
                result = await duplication.duplicate_campaign(
                    campaign_id="123456789",
                    access_token="facebook_token"  # Use the mocked token
                )
                actual_result = self._unwrap(result)
                assert actual_result["success"] is False
                assert actual_result["error"] == "facebook_connection_required"
                assert actual_result["message"] == "You need to connect your Facebook account first"
                assert "details" in actual_result
                assert actual_result["details"]["login_flow_url"] == "/connections"

    @pytest.mark.asyncio
    async def test_subscription_required_error_flow(self, enable_feature):
        """Test subscription required error flow."""
        duplication = enable_feature
        # Mock the auth system completely to bypass the @meta_api_tool decorator checks
        with patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration") as mock_auth_integration:
            # Mock dual authentication tokens
            mock_auth_integration.get_pipeboard_token.return_value = "pipeboard_token"
            mock_auth_integration.get_auth_token.return_value = "facebook_token"
            with patch("meta_ads_mcp.core.duplication.httpx.AsyncClient") as mock_client:
                # Mock 402 response (subscription required)
                mock_response = MagicMock()
                mock_response.status_code = 402
                mock_response.json.return_value = {
                    "message": "This feature is not available in your current plan",
                    "upgrade_url": "https://pipeboard.co/upgrade",
                    "suggestion": "Please upgrade your account to access this feature"
                }
                mock_client.return_value.__aenter__.return_value.post.return_value = mock_response
                result = await duplication.duplicate_campaign(
                    campaign_id="123456789",
                    access_token="facebook_token"  # Use the mocked token
                )
                actual_result = self._unwrap(result)
                assert actual_result["success"] is False
                assert actual_result["error"] == "subscription_required"
                assert actual_result["message"] == "This feature is not available in your current plan"
                assert actual_result["upgrade_url"] == "https://pipeboard.co/upgrade"
                assert actual_result["suggestion"] == "Please upgrade your account to access this feature"
class TestDuplicationTokenHandling:
    """Test access token handling to prevent auth regression."""

    @pytest.fixture(autouse=True)
    def enable_feature(self):
        """Reload the duplication module with the feature flag enabled and yield it."""
        with patch.dict(os.environ, {"META_ADS_ENABLE_DUPLICATION": "1"}):
            import importlib
            from meta_ads_mcp.core import duplication
            importlib.reload(duplication)
            yield duplication

    @pytest.mark.asyncio
    async def test_meta_api_tool_decorator_token_handling(self, enable_feature):
        """Test that @meta_api_tool decorator properly handles explicit tokens."""
        duplication = enable_feature
        # Supplying an explicit token should bypass the auth system entirely.
        with patch("meta_ads_mcp.core.duplication._forward_duplication_request") as forward_mock:
            forward_mock.return_value = '{"success": true}'
            await duplication.duplicate_campaign(
                campaign_id="123456789",
                access_token="explicit_token_12345",
            )
            forward_mock.assert_called_once()
            positional_args = forward_mock.call_args[0]
            # The access token travels as the third positional argument.
            assert positional_args[2] == "explicit_token_12345"

    @pytest.mark.asyncio
    async def test_explicit_token_overrides_injection(self, enable_feature):
        """Test that explicit token overrides auto-injection."""
        duplication = enable_feature
        # Even with a token available for auto-injection, the explicit one wins.
        with patch("meta_ads_mcp.core.auth.get_current_access_token") as token_source_mock:
            token_source_mock.return_value = "injected_token"
            with patch("meta_ads_mcp.core.duplication._forward_duplication_request") as forward_mock:
                forward_mock.return_value = '{"success": true}'
                await duplication.duplicate_campaign(
                    campaign_id="123456789",
                    access_token="explicit_token_12345",
                )
                forward_mock.assert_called_once()
                positional_args = forward_mock.call_args[0]
                # The access token travels as the third positional argument.
                assert positional_args[2] == "explicit_token_12345"
class TestDuplicationRegressionEdgeCases:
    """Test edge cases that could cause regressions."""

    @pytest.fixture(autouse=True)
    def enable_feature(self):
        """Enable the duplication feature for these tests.

        Reloads the module so the META_ADS_ENABLE_DUPLICATION toggle is
        re-evaluated, then yields the fresh module object.
        """
        with patch.dict(os.environ, {"META_ADS_ENABLE_DUPLICATION": "1"}):
            import importlib
            from meta_ads_mcp.core import duplication
            importlib.reload(duplication)
            yield duplication

    @pytest.mark.asyncio
    async def test_empty_string_parameters(self, enable_feature):
        """Test handling of empty string parameters."""
        duplication = enable_feature
        with patch("meta_ads_mcp.core.duplication._forward_duplication_request") as mock_forward:
            mock_forward.return_value = '{"success": true}'
            # Test with empty strings
            await duplication.duplicate_campaign(
                campaign_id="123456789",
                access_token="token",
                name_suffix="",  # Empty string
                new_status=""  # Empty string
            )
            # Verify empty strings are preserved (not filtered like None)
            call_args = mock_forward.call_args[0]
            options = call_args[3]
            assert options["name_suffix"] == ""
            assert options["new_status"] == ""

    @pytest.mark.asyncio
    async def test_unicode_parameters(self, enable_feature):
        """Test handling of unicode parameters."""
        duplication = enable_feature
        # Mock dual-header authentication
        with patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration") as mock_auth:
            mock_auth.get_pipeboard_token.return_value = "pipeboard_token"
            mock_auth.get_auth_token.return_value = "facebook_token"
            with patch("meta_ads_mcp.core.duplication.httpx.AsyncClient") as mock_client:
                # MagicMock, not AsyncMock: httpx.Response.json() is synchronous,
                # matching the convention used elsewhere in this file.
                mock_response = MagicMock()
                mock_response.status_code = 200
                mock_response.json.return_value = {"success": True}
                mock_client.return_value.__aenter__.return_value.post.return_value = mock_response
                # Test with unicode characters
                unicode_suffix = " - 复制版本 🚀"
                await duplication._forward_duplication_request(
                    "campaign", "123", "token", {"name_suffix": unicode_suffix}
                )
                # Verify unicode is preserved in the request
                call_args = mock_client.return_value.__aenter__.return_value.post.call_args
                json_payload = call_args[1]["json"]
                assert json_payload["name_suffix"] == unicode_suffix

    @pytest.mark.asyncio
    async def test_large_parameter_values(self, enable_feature):
        """Test handling of large parameter values."""
        duplication = enable_feature
        # Mock dual-header authentication
        with patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration") as mock_auth:
            mock_auth.get_pipeboard_token.return_value = "pipeboard_token"
            mock_auth.get_auth_token.return_value = "facebook_token"
            with patch("meta_ads_mcp.core.duplication.httpx.AsyncClient") as mock_client:
                # MagicMock, not AsyncMock: httpx.Response.json() is synchronous.
                mock_response = MagicMock()
                mock_response.status_code = 200
                mock_response.json.return_value = {"success": True}
                mock_client.return_value.__aenter__.return_value.post.return_value = mock_response
                # Test with very large budget value
                large_budget = 999999999.99
                await duplication._forward_duplication_request(
                    "campaign", "123", "token", {"new_daily_budget": large_budget}
                )
                # Verify large values are preserved
                call_args = mock_client.return_value.__aenter__.return_value.post.call_args
                json_payload = call_args[1]["json"]
                assert json_payload["new_daily_budget"] == large_budget

    def test_module_reload_safety(self):
        """Test that module can be safely reloaded without side effects."""
        # This tests for common issues like global state pollution
        # Enable feature
        with patch.dict(os.environ, {"META_ADS_ENABLE_DUPLICATION": "1"}):
            import importlib
            from meta_ads_mcp.core import duplication
            importlib.reload(duplication)
            assert duplication.ENABLE_DUPLICATION
            assert hasattr(duplication, 'duplicate_campaign')
        # Disable feature and reload
        with patch.dict(os.environ, {"META_ADS_ENABLE_DUPLICATION": ""}):
            importlib.reload(duplication)
            assert not duplication.ENABLE_DUPLICATION
            # Note: Functions may still exist in the module namespace due to Python's
            # module loading behavior, but they won't be registered as MCP tools
            # This is expected behavior and not a problem for the feature toggle
        # Re-enable feature and reload
        with patch.dict(os.environ, {"META_ADS_ENABLE_DUPLICATION": "1"}):
            importlib.reload(duplication)
            assert duplication.ENABLE_DUPLICATION
            assert hasattr(duplication, 'duplicate_campaign')
```