#
tokens: 49031/50000 6/82 files (page 5/6)
lines: on (toggle) GitHub
raw markdown copy reset
This is page 5 of 6. To view the other pages, open http://codebase.md/nictuku/meta-ads-mcp?lines=true&page={x}, replacing {x} with a page number from 1 to 6.

# Directory Structure

```
├── .github
│   └── workflows
│       ├── publish-mcp.yml
│       ├── publish.yml
│       └── test.yml
├── .gitignore
├── .python-version
├── .uv.toml
├── CUSTOM_META_APP.md
├── Dockerfile
├── examples
│   ├── example_http_client.py
│   └── README.md
├── future_improvements.md
├── images
│   └── meta-ads-example.png
├── LICENSE
├── LOCAL_INSTALLATION.md
├── meta_ads_auth.sh
├── meta_ads_mcp
│   ├── __init__.py
│   ├── __main__.py
│   └── core
│       ├── __init__.py
│       ├── accounts.py
│       ├── ads_library.py
│       ├── ads.py
│       ├── adsets.py
│       ├── api.py
│       ├── auth.py
│       ├── authentication.py
│       ├── budget_schedules.py
│       ├── callback_server.py
│       ├── campaigns.py
│       ├── duplication.py
│       ├── http_auth_integration.py
│       ├── insights.py
│       ├── openai_deep_research.py
│       ├── pipeboard_auth.py
│       ├── reports.py
│       ├── resources.py
│       ├── server.py
│       ├── targeting.py
│       └── utils.py
├── META_API_NOTES.md
├── poetry.lock
├── pyproject.toml
├── README.md
├── RELEASE.md
├── requirements.txt
├── server.json
├── setup.py
├── smithery.yaml
├── STREAMABLE_HTTP_SETUP.md
└── tests
    ├── __init__.py
    ├── conftest.py
    ├── e2e_account_info_search_issue.py
    ├── README_REGRESSION_TESTS.md
    ├── README.md
    ├── test_account_info_access_fix.py
    ├── test_account_search.py
    ├── test_budget_update_e2e.py
    ├── test_budget_update.py
    ├── test_create_ad_creative_simple.py
    ├── test_create_simple_creative_e2e.py
    ├── test_dsa_beneficiary.py
    ├── test_dsa_integration.py
    ├── test_duplication_regression.py
    ├── test_duplication.py
    ├── test_dynamic_creatives.py
    ├── test_estimate_audience_size_e2e.py
    ├── test_estimate_audience_size.py
    ├── test_get_account_pages.py
    ├── test_get_ad_creatives_fix.py
    ├── test_get_ad_image_quality_improvements.py
    ├── test_get_ad_image_regression.py
    ├── test_http_transport.py
    ├── test_insights_actions_and_values_e2e.py
    ├── test_insights_pagination.py
    ├── test_integration_openai_mcp.py
    ├── test_is_dynamic_creative_adset.py
    ├── test_mobile_app_adset_creation.py
    ├── test_mobile_app_adset_issue.py
    ├── test_openai_mcp_deep_research.py
    ├── test_openai.py
    ├── test_page_discovery_integration.py
    ├── test_page_discovery.py
    ├── test_targeting_search_e2e.py
    ├── test_targeting.py
    ├── test_update_ad_creative_id.py
    └── test_upload_ad_image.py
```

# Files

--------------------------------------------------------------------------------
/meta_ads_mcp/core/targeting.py:
--------------------------------------------------------------------------------

```python
  1 | """Targeting search functionality for Meta Ads API."""
  2 | 
  3 | import json
  4 | from typing import Optional, List, Dict, Any
  5 | import os
  6 | from .api import meta_api_tool, make_api_request
  7 | from .server import mcp_server
  8 | 
  9 | 
 10 | @mcp_server.tool()
 11 | @meta_api_tool
 12 | async def search_interests(query: str, access_token: Optional[str] = None, limit: int = 25) -> str:
 13 |     """
 14 |     Search for interest targeting options by keyword.
 15 |     
 16 |     Args:
 17 |         query: Search term for interests (e.g., "baseball", "cooking", "travel")
 18 |         access_token: Meta API access token (optional - will use cached token if not provided)
 19 |         limit: Maximum number of results to return (default: 25)
 20 |     
 21 |     Returns:
 22 |         JSON string containing interest data with id, name, audience_size, and path fields
 23 |     """
 24 |     if not query:
 25 |         return json.dumps({"error": "No search query provided"}, indent=2)
 26 |     
 27 |     endpoint = "search"
 28 |     params = {
 29 |         "type": "adinterest",
 30 |         "q": query,
 31 |         "limit": limit
 32 |     }
 33 |     
 34 |     data = await make_api_request(endpoint, access_token, params)
 35 |     
 36 |     return json.dumps(data, indent=2)
 37 | 
 38 | 
 39 | @mcp_server.tool()
 40 | @meta_api_tool
 41 | async def get_interest_suggestions(interest_list: List[str], access_token: Optional[str] = None, limit: int = 25) -> str:
 42 |     """
 43 |     Get interest suggestions based on existing interests.
 44 |     
 45 |     Args:
 46 |         interest_list: List of interest names to get suggestions for (e.g., ["Basketball", "Soccer"])
 47 |         access_token: Meta API access token (optional - will use cached token if not provided)  
 48 |         limit: Maximum number of suggestions to return (default: 25)
 49 |     
 50 |     Returns:
 51 |         JSON string containing suggested interests with id, name, audience_size, and description fields
 52 |     """
 53 |     if not interest_list:
 54 |         return json.dumps({"error": "No interest list provided"}, indent=2)
 55 |     
 56 |     endpoint = "search"
 57 |     params = {
 58 |         "type": "adinterestsuggestion", 
 59 |         "interest_list": json.dumps(interest_list),
 60 |         "limit": limit
 61 |     }
 62 |     
 63 |     data = await make_api_request(endpoint, access_token, params)
 64 |     
 65 |     return json.dumps(data, indent=2)
 66 | 
 67 | 
 68 | @mcp_server.tool()
 69 | @meta_api_tool
 70 | async def estimate_audience_size(
 71 |     access_token: Optional[str] = None,
 72 |     account_id: Optional[str] = None,
 73 |     targeting: Optional[Dict[str, Any]] = None,
 74 |     optimization_goal: str = "REACH",
 75 |     # Backwards compatibility for simple interest validation
 76 |     interest_list: Optional[List[str]] = None,
 77 |     interest_fbid_list: Optional[List[str]] = None
 78 | ) -> str:
 79 |     """
 80 |     Estimate audience size for targeting specifications using Meta's delivery_estimate API.
 81 |     
 82 |     This function provides comprehensive audience estimation for complex targeting combinations
 83 |     including demographics, geography, interests, and behaviors. It also maintains backwards
 84 |     compatibility for simple interest validation.
 85 |     
 86 |     Args:
 87 |         access_token: Meta API access token (optional - will use cached token if not provided)
 88 |         account_id: Meta Ads account ID (format: act_XXXXXXXXX) - required for comprehensive estimation
 89 |         targeting: Complete targeting specification including demographics, geography, interests, etc.
 90 |                   Example: {
 91 |                       "age_min": 25,
 92 |                       "age_max": 65,
 93 |                       "geo_locations": {"countries": ["PL"]},
 94 |                       "flexible_spec": [
 95 |                           {"interests": [{"id": "6003371567474"}]},
 96 |                           {"interests": [{"id": "6003462346642"}]}
 97 |                       ]
 98 |                   }
 99 |         optimization_goal: Optimization goal for estimation (default: "REACH"). 
100 |                           Options: "REACH", "LINK_CLICKS", "IMPRESSIONS", "CONVERSIONS", etc.
101 |         interest_list: [DEPRECATED - for backwards compatibility] List of interest names to validate
102 |         interest_fbid_list: [DEPRECATED - for backwards compatibility] List of interest IDs to validate
103 |     
104 |     Returns:
105 |         JSON string with audience estimation results including estimated_audience_size,
106 |         reach_estimate, and targeting validation
107 |     """
108 |     # Handle backwards compatibility - simple interest validation
109 |     # Check if we're in backwards compatibility mode (interest params provided OR no comprehensive params)
110 |     is_backwards_compatible_call = (interest_list or interest_fbid_list) or (not account_id and not targeting)
111 |     
112 |     if is_backwards_compatible_call and not targeting:
113 |         if not interest_list and not interest_fbid_list:
114 |             return json.dumps({"error": "No interest list or FBID list provided"}, indent=2)
115 |         
116 |         endpoint = "search"
117 |         params = {
118 |             "type": "adinterestvalid"
119 |         }
120 |         
121 |         if interest_list:
122 |             params["interest_list"] = json.dumps(interest_list)
123 |         
124 |         if interest_fbid_list:
125 |             params["interest_fbid_list"] = json.dumps(interest_fbid_list)
126 |         
127 |         data = await make_api_request(endpoint, access_token, params)
128 |         
129 |         return json.dumps(data, indent=2)
130 |     
131 |     # Comprehensive audience estimation using delivery_estimate API
132 |     if not account_id:
133 |         return json.dumps({
134 |             "error": "account_id is required for comprehensive audience estimation",
135 |             "details": "For simple interest validation, use interest_list or interest_fbid_list parameters"
136 |         }, indent=2)
137 |     
138 |     if not targeting:
139 |         return json.dumps({
140 |             "error": "targeting specification is required for comprehensive audience estimation",
141 |             "example": {
142 |                 "age_min": 25,
143 |                 "age_max": 65,
144 |                 "geo_locations": {"countries": ["US"]},
145 |                 "flexible_spec": [
146 |                     {"interests": [{"id": "6003371567474"}]}
147 |                 ]
148 |             }
149 |         }, indent=2)
150 |     
151 |     # Preflight validation: require at least one location OR a custom audience
152 |     def _has_location_or_custom_audience(t: Dict[str, Any]) -> bool:
153 |         if not isinstance(t, dict):
154 |             return False
155 |         geo = t.get("geo_locations") or {}
156 |         if isinstance(geo, dict):
157 |             for key in [
158 |                 "countries",
159 |                 "regions",
160 |                 "cities",
161 |                 "zips",
162 |                 "geo_markets",
163 |                 "country_groups"
164 |             ]:
165 |                 val = geo.get(key)
166 |                 if isinstance(val, list) and len(val) > 0:
167 |                     return True
168 |         # Top-level custom audiences
169 |         ca = t.get("custom_audiences")
170 |         if isinstance(ca, list) and len(ca) > 0:
171 |             return True
172 |         # Custom audiences within flexible_spec
173 |         flex = t.get("flexible_spec")
174 |         if isinstance(flex, list):
175 |             for spec in flex:
176 |                 if isinstance(spec, dict):
177 |                     ca_spec = spec.get("custom_audiences")
178 |                     if isinstance(ca_spec, list) and len(ca_spec) > 0:
179 |                         return True
180 |         return False
181 | 
182 |     if not _has_location_or_custom_audience(targeting):
183 |         return json.dumps({
184 |             "error": "Missing target audience location",
185 |             "details": "Select at least one location in targeting.geo_locations or include a custom audience.",
186 |             "action_required": "Add geo_locations with countries/regions/cities/zips or include custom_audiences.",
187 |             "example": {
188 |                 "geo_locations": {"countries": ["US"]},
189 |                 "age_min": 25,
190 |                 "age_max": 65
191 |             }
192 |         }, indent=2)
193 |     
194 |     # Build reach estimate request (using correct Meta API endpoint)
195 |     endpoint = f"{account_id}/reachestimate"
196 |     params = {
197 |         "targeting_spec": targeting
198 |     }
199 |     
200 |     # Note: reachestimate endpoint doesn't support optimization_goal or objective parameters
201 |     
202 |     try:
203 |         data = await make_api_request(endpoint, access_token, params, method="GET")
204 |         
205 |         # Surface Graph API errors directly for better diagnostics.
206 |         # If reachestimate fails, optionally attempt a fallback using delivery_estimate.
207 |         if isinstance(data, dict) and "error" in data:
208 |             # Special handling for Missing Target Audience Location error (subcode 1885364)
209 |             try:
210 |                 err_wrapper = data.get("error", {})
211 |                 details_obj = err_wrapper.get("details", {})
212 |                 raw_err = details_obj.get("error", {}) if isinstance(details_obj, dict) else {}
213 |                 if (
214 |                     isinstance(raw_err, dict) and (
215 |                         raw_err.get("error_subcode") == 1885364 or
216 |                         raw_err.get("error_user_title") == "Missing Target Audience Location"
217 |                     )
218 |                 ):
219 |                     return json.dumps({
220 |                         "error": "Missing target audience location",
221 |                         "details": raw_err.get("error_user_msg") or "Select at least one location, or choose a custom audience.",
222 |                         "endpoint_used": f"{account_id}/reachestimate",
223 |                         "action_required": "Add geo_locations with at least one of countries/regions/cities/zips or include custom_audiences.",
224 |                         "blame_field_specs": raw_err.get("error_data", {}).get("blame_field_specs") if isinstance(raw_err.get("error_data"), dict) else None
225 |                     }, indent=2)
226 |             except Exception:
227 |                 pass
228 |             # Allow disabling fallback via environment variable
229 |             # Default: fallback disabled unless explicitly enabled by setting DISABLE flag to "0"
230 |             disable_fallback = os.environ.get("META_MCP_DISABLE_DELIVERY_FALLBACK", "1") == "1"
231 |             if disable_fallback:
232 |                 return json.dumps({
233 |                     "error": "Graph API returned an error for reachestimate",
234 |                     "details": data.get("error"),
235 |                     "endpoint_used": f"{account_id}/reachestimate",
236 |                     "request_params": {
237 |                         "has_targeting_spec": bool(targeting),
238 |                     },
239 |                     "note": "delivery_estimate fallback disabled via META_MCP_DISABLE_DELIVERY_FALLBACK"
240 |                 }, indent=2)
241 | 
242 |             # Try fallback to delivery_estimate endpoint
243 |             try:
244 |                 fallback_endpoint = f"{account_id}/delivery_estimate"
245 |                 fallback_params = {
246 |                     "targeting_spec": json.dumps(targeting),
247 |                     # Some API versions accept optimization_goal here
248 |                     "optimization_goal": optimization_goal
249 |                 }
250 |                 fallback_data = await make_api_request(fallback_endpoint, access_token, fallback_params, method="GET")
251 |                 
252 |                 # If fallback returns usable data, format similarly
253 |                 if isinstance(fallback_data, dict) and "data" in fallback_data and len(fallback_data["data"]) > 0:
254 |                     estimate_data = fallback_data["data"][0]
255 |                     formatted_response = {
256 |                         "success": True,
257 |                         "account_id": account_id,
258 |                         "targeting": targeting,
259 |                         "optimization_goal": optimization_goal,
260 |                         "estimated_audience_size": estimate_data.get("estimate_mau", 0),
261 |                         "estimate_details": {
262 |                             "monthly_active_users": estimate_data.get("estimate_mau", 0),
263 |                             "daily_outcomes_curve": estimate_data.get("estimate_dau", []),
264 |                             "bid_estimate": estimate_data.get("bid_estimates", {}),
265 |                             "unsupported_targeting": estimate_data.get("unsupported_targeting", [])
266 |                         },
267 |                         "raw_response": fallback_data,
268 |                         "fallback_endpoint_used": "delivery_estimate"
269 |                     }
270 |                     return json.dumps(formatted_response, indent=2)
271 |                 
272 |                 # Fallback returned but not in expected format
273 |                 return json.dumps({
274 |                     "error": "Graph API returned an error for reachestimate; delivery_estimate fallback did not return usable data",
275 |                     "reachestimate_error": data.get("error"),
276 |                     "fallback_endpoint_used": "delivery_estimate",
277 |                     "fallback_raw_response": fallback_data,
278 |                     "endpoint_used": f"{account_id}/reachestimate",
279 |                     "request_params": {
280 |                         "has_targeting_spec": bool(targeting)
281 |                     }
282 |                 }, indent=2)
283 |             except Exception as _fallback_exc:
284 |                 return json.dumps({
285 |                     "error": "Graph API returned an error for reachestimate; delivery_estimate fallback also failed",
286 |                     "reachestimate_error": data.get("error"),
287 |                     "fallback_endpoint_used": "delivery_estimate",
288 |                     "fallback_exception": str(_fallback_exc),
289 |                     "endpoint_used": f"{account_id}/reachestimate",
290 |                     "request_params": {
291 |                         "has_targeting_spec": bool(targeting)
292 |                     }
293 |                 }, indent=2)
294 | 
295 |         # Format the response for easier consumption
296 |         if "data" in data:
297 |             response_data = data["data"]
298 |             # Case 1: delivery_estimate-like list structure
299 |             if isinstance(response_data, list) and len(response_data) > 0:
300 |                 estimate_data = response_data[0]
301 |                 formatted_response = {
302 |                     "success": True,
303 |                     "account_id": account_id,
304 |                     "targeting": targeting,
305 |                     "optimization_goal": optimization_goal,
306 |                     "estimated_audience_size": estimate_data.get("estimate_mau", 0),
307 |                     "estimate_details": {
308 |                         "monthly_active_users": estimate_data.get("estimate_mau", 0),
309 |                         "daily_outcomes_curve": estimate_data.get("estimate_dau", []),
310 |                         "bid_estimate": estimate_data.get("bid_estimates", {}),
311 |                         "unsupported_targeting": estimate_data.get("unsupported_targeting", [])
312 |                     },
313 |                     "raw_response": data
314 |                 }
315 |                 return json.dumps(formatted_response, indent=2)
316 |             # Case 1b: explicit handling for empty list responses
317 |             if isinstance(response_data, list) and len(response_data) == 0:
318 |                 return json.dumps({
319 |                     "error": "No estimation data returned from Meta API",
320 |                     "raw_response": data,
321 |                     "debug_info": {
322 |                         "response_keys": list(data.keys()) if isinstance(data, dict) else "not_a_dict",
323 |                         "response_type": str(type(data)),
324 |                         "endpoint_used": f"{account_id}/reachestimate"
325 |                     }
326 |                 }, indent=2)
327 |             # Case 2: reachestimate dict structure with bounds
328 |             if isinstance(response_data, dict):
329 |                 lower = response_data.get("users_lower_bound", response_data.get("estimate_mau_lower_bound"))
330 |                 upper = response_data.get("users_upper_bound", response_data.get("estimate_mau_upper_bound"))
331 |                 estimate_ready = response_data.get("estimate_ready")
332 |                 midpoint = None
333 |                 try:
334 |                     if isinstance(lower, (int, float)) and isinstance(upper, (int, float)):
335 |                         midpoint = int((lower + upper) / 2)
336 |                 except Exception:
337 |                     midpoint = None
338 |                 formatted_response = {
339 |                     "success": True,
340 |                     "account_id": account_id,
341 |                     "targeting": targeting,
342 |                     "optimization_goal": optimization_goal,
343 |                     "estimated_audience_size": midpoint if midpoint is not None else 0,
344 |                     "estimate_details": {
345 |                         "users_lower_bound": lower,
346 |                         "users_upper_bound": upper,
347 |                         "estimate_ready": estimate_ready
348 |                     },
349 |                     "raw_response": data
350 |                 }
351 |                 return json.dumps(formatted_response, indent=2)
352 |         else:
353 |             return json.dumps({
354 |                 "error": "No estimation data returned from Meta API",
355 |                 "raw_response": data,
356 |                 "debug_info": {
357 |                     "response_keys": list(data.keys()) if isinstance(data, dict) else "not_a_dict",
358 |                     "response_type": str(type(data)),
359 |                     "endpoint_used": f"{account_id}/reachestimate"
360 |                 }
361 |             }, indent=2)
362 |     
363 |     except Exception as e:
364 |         # Try fallback to delivery_estimate first when an exception occurs (unless disabled)
365 |         # Default: fallback disabled unless explicitly enabled by setting DISABLE flag to "0"
366 |         disable_fallback = os.environ.get("META_MCP_DISABLE_DELIVERY_FALLBACK", "1") == "1"
367 |         if not disable_fallback:
368 |             try:
369 |                 fallback_endpoint = f"{account_id}/delivery_estimate"
370 |                 fallback_params = {
371 |                 "targeting_spec": json.dumps(targeting) if isinstance(targeting, dict) else targeting,
372 |                 "optimization_goal": optimization_goal
373 |             }
374 |                 fallback_data = await make_api_request(fallback_endpoint, access_token, fallback_params, method="GET")
375 |             
376 |                 if isinstance(fallback_data, dict) and "data" in fallback_data and len(fallback_data["data"]) > 0:
377 |                     estimate_data = fallback_data["data"][0]
378 |                     formatted_response = {
379 |                         "success": True,
380 |                         "account_id": account_id,
381 |                         "targeting": targeting,
382 |                         "optimization_goal": optimization_goal,
383 |                         "estimated_audience_size": estimate_data.get("estimate_mau", 0),
384 |                         "estimate_details": {
385 |                             "monthly_active_users": estimate_data.get("estimate_mau", 0),
386 |                             "daily_outcomes_curve": estimate_data.get("estimate_dau", []),
387 |                             "bid_estimate": estimate_data.get("bid_estimates", {}),
388 |                             "unsupported_targeting": estimate_data.get("unsupported_targeting", [])
389 |                         },
390 |                         "raw_response": fallback_data,
391 |                         "fallback_endpoint_used": "delivery_estimate"
392 |                     }
393 |                     return json.dumps(formatted_response, indent=2)
394 |             except Exception as _fallback_exc:
395 |                 # If fallback also fails, proceed to detailed error handling below
396 |                 pass
397 | 
398 |         # Check if this is the specific Business Manager system user permission error
399 |         error_str = str(e)
400 |         if "100" in error_str and "33" in error_str:
401 |             # Try to provide fallback estimation using individual interests if available
402 |             interests_found = []
403 |             if targeting and "interests" in targeting:
404 |                 interests_found.extend([interest.get("id") for interest in targeting["interests"] if interest.get("id")])
405 |             elif targeting and "flexible_spec" in targeting:
406 |                 for spec in targeting["flexible_spec"]:
407 |                     if "interests" in spec:
408 |                         interests_found.extend([interest.get("id") for interest in spec["interests"] if interest.get("id")])
409 |             
410 |             if interests_found:
411 |                 # Attempt to get individual interest data as fallback
412 |                 try:
413 |                     fallback_result = await estimate_audience_size(
414 |                         access_token=access_token,
415 |                         interest_fbid_list=interests_found
416 |                     )
417 |                     fallback_data = json.loads(fallback_result)
418 |                     
419 |                     return json.dumps({
420 |                         "comprehensive_targeting_failed": True,
421 |                         "error_code": "100-33",
422 |                         "fallback_used": True,
423 |                         "details": {
424 |                             "issue": "reachestimate endpoint returned error - possibly due to targeting parameters or account limitations",
425 |                             "solution": "Individual interest validation used as fallback - comprehensive targeting may have specific requirements",
426 |                             "endpoint_used": f"{account_id}/reachestimate"
427 |                         },
428 |                         "individual_interest_data": fallback_data,
429 |                         "note": "Individual interest audience sizes provided as fallback. Comprehensive targeting via reachestimate endpoint failed."
430 |                     }, indent=2)
431 |                 except:
432 |                     pass
433 |             
434 |             return json.dumps({
435 |                 "error": "reachestimate endpoint returned error (previously was incorrectly using delivery_estimate)",
436 |                 "error_code": "100-33", 
437 |                 "details": {
438 |                     "issue": "The endpoint returned an error, possibly due to targeting parameters or account limitations",
439 |                     "endpoint_used": f"{account_id}/reachestimate",
440 |                     "previous_issue": "Code was previously using non-existent delivery_estimate endpoint - now fixed",
441 |                     "available_alternative": "Use interest_list or interest_fbid_list parameters for individual interest validation"
442 |                 },
443 |                 "raw_error": error_str
444 |             }, indent=2)
445 |         else:
446 |             return json.dumps({
447 |                 "error": f"Failed to get audience estimation from reachestimate endpoint: {str(e)}",
448 |                 "details": "Check targeting parameters and account permissions",
449 |                 "error_type": "general_api_error",
450 |                 "endpoint_used": f"{account_id}/reachestimate"
451 |             }, indent=2)
452 | 
453 | 
@mcp_server.tool()
@meta_api_tool
async def search_behaviors(access_token: Optional[str] = None, limit: int = 50) -> str:
    """
    List the behavior targeting options that are available.

    Args:
        access_token: Meta API access token (optional - will use cached token if not provided)
        limit: Maximum number of results to return (default: 50)

    Returns:
        JSON string containing behavior targeting options with id, name, audience_size bounds, path, and description
    """
    # Behaviors are one class of the generic targeting-category search.
    behaviors = await make_api_request(
        "search",
        access_token,
        {"type": "adTargetingCategory", "class": "behaviors", "limit": limit},
    )
    return json.dumps(behaviors, indent=2)
477 | 
478 | 
@mcp_server.tool()
@meta_api_tool
async def search_demographics(access_token: Optional[str] = None, demographic_class: str = "demographics", limit: int = 50) -> str:
    """
    List demographic targeting options for a given category class.

    Args:
        access_token: Meta API access token (optional - will use cached token if not provided)
        demographic_class: Type of demographics to retrieve. Options: 'demographics', 'life_events',
                          'industries', 'income', 'family_statuses', 'user_device', 'user_os' (default: 'demographics')
        limit: Maximum number of results to return (default: 50)

    Returns:
        JSON string containing demographic targeting options with id, name, audience_size bounds, path, and description
    """
    # demographic_class is forwarded as-is as the category "class" filter.
    options = await make_api_request(
        "search",
        access_token,
        {"type": "adTargetingCategory", "class": demographic_class, "limit": limit},
    )
    return json.dumps(options, indent=2)
504 | 
505 | 
@mcp_server.tool()
@meta_api_tool
async def search_geo_locations(query: str, access_token: Optional[str] = None,
                             location_types: Optional[List[str]] = None, limit: int = 25) -> str:
    """
    Find geographic targeting locations that match a search term.

    Args:
        query: Search term for locations (e.g., "New York", "California", "Japan")
        access_token: Meta API access token (optional - will use cached token if not provided)
        location_types: Types of locations to search. Options: ['country', 'region', 'city', 'zip',
                       'geo_market', 'electoral_district']. If not specified, searches all types.
        limit: Maximum number of results to return (default: 25)

    Returns:
        JSON string containing location data with key, name, type, and geographic hierarchy
        information, or a JSON error object when no query is given
    """
    if query:
        search_params = {"type": "adgeolocation", "q": query, "limit": limit}
        # The type filter is only sent when the caller supplied one, as a JSON-encoded array.
        if location_types:
            search_params.update(location_types=json.dumps(location_types))

        locations = await make_api_request("search", access_token, search_params)
        return json.dumps(locations, indent=2)

    return json.dumps({"error": "No search query provided"}, indent=2)
```

--------------------------------------------------------------------------------
/tests/test_estimate_audience_size_e2e.py:
--------------------------------------------------------------------------------

```python
  1 | #!/usr/bin/env python3
  2 | """
  3 | End-to-End Audience Estimation Test for Meta Ads MCP
  4 | 
  5 | This test validates that the new estimate_audience_size function correctly provides
  6 | comprehensive audience estimation and backwards compatibility for interest validation
  7 | through a pre-authenticated MCP server.
  8 | 
  9 | Usage:
 10 |     1. Start the server: uv run python -m meta_ads_mcp --transport streamable-http --port 8080
 11 |     2. Run test: uv run python tests/test_estimate_audience_size_e2e.py
 12 | 
 13 | Or with pytest (manual only):
 14 |     uv run python -m pytest tests/test_estimate_audience_size_e2e.py -v -m e2e
 15 | 
 16 | Test scenarios:
 17 | 1. Comprehensive audience estimation with complex targeting
 18 | 2. Backwards compatibility with simple interest validation
 19 | 3. Error handling for invalid parameters
 20 | 4. Different optimization goals
 21 | """
 22 | 
 23 | import pytest
 24 | import requests
 25 | import json
 26 | import os
 27 | import sys
 28 | from typing import Dict, Any, List
 29 | 
 30 | # Load environment variables from .env file
 31 | try:
 32 |     from dotenv import load_dotenv
 33 |     load_dotenv()
 34 |     print("✅ Loaded environment variables from .env file")
 35 | except ImportError:
 36 |     print("⚠️  python-dotenv not installed, using system environment variables only")
 37 | 
 38 | @pytest.mark.e2e
 39 | @pytest.mark.skip(reason="E2E test - run manually only")
 40 | class AudienceEstimationTester:
 41 |     """Test suite focused on audience estimation functionality"""
 42 |     
 43 |     def __init__(self, base_url: str = "http://localhost:8080"):
 44 |         self.base_url = base_url.rstrip('/')
 45 |         self.endpoint = f"{self.base_url}/mcp/"
 46 |         self.request_id = 1
 47 |         
 48 |         # Default account ID from workspace rules
 49 |         self.account_id = "act_701351919139047"
 50 |         
 51 |         # Test targeting specifications
 52 |         self.test_targeting_specs = {
 53 |             "simple_demographics": {
 54 |                 "age_min": 25,
 55 |                 "age_max": 65,
 56 |                 "geo_locations": {"countries": ["US"]}
 57 |             },
 58 |             "demographics_with_interests": {
 59 |                 "age_min": 18,
 60 |                 "age_max": 35,
 61 |                 "geo_locations": {"countries": ["PL"]},
 62 |                 "flexible_spec": [
 63 |                     {"interests": [{"id": "6003371567474"}]}  # Business interest
 64 |                 ]
 65 |             },
 66 |             "complex_targeting": {
 67 |                 "age_min": 25,
 68 |                 "age_max": 55,
 69 |                 "geo_locations": {"countries": ["US"], "regions": [{"key": "3847"}]},  # California
 70 |                 "flexible_spec": [
 71 |                     {"interests": [{"id": "6003371567474"}, {"id": "6003462346642"}]},  # Business + Technology
 72 |                     {"behaviors": [{"id": "6007101597783"}]}  # Business travelers
 73 |                 ]
 74 |             },
 75 |             "mobile_app_targeting": {
 76 |                 "age_min": 18,
 77 |                 "age_max": 45,
 78 |                 "geo_locations": {"countries": ["US"]},
 79 |                 "user_device": ["mobile"],
 80 |                 "user_os": ["iOS", "Android"],
 81 |                 "flexible_spec": [
 82 |                     {"interests": [{"id": "6003139266461"}]}  # Mobile games
 83 |                 ]
 84 |             }
 85 |         }
 86 |         
 87 |         # Test interest lists for backwards compatibility
 88 |         self.test_interests = {
 89 |             "valid_names": ["Japan", "Basketball", "Technology"],
 90 |             "mixed_validity": ["Japan", "invalidinterestname12345", "Basketball"],
 91 |             "valid_fbids": ["6003700426513", "6003397425735"],  # Japan, Tennis
 92 |             "invalid_fbids": ["999999999999", "000000000000"]
 93 |         }
 94 |         
 95 |     def _make_request(self, method: str, params: Dict[str, Any] = None, 
 96 |                      headers: Dict[str, str] = None) -> Dict[str, Any]:
 97 |         """Make a JSON-RPC request to the MCP server"""
 98 |         
 99 |         default_headers = {
100 |             "Content-Type": "application/json",
101 |             "Accept": "application/json, text/event-stream",
102 |             "User-Agent": "Audience-Estimation-Test-Client/1.0"
103 |         }
104 |         
105 |         if headers:
106 |             default_headers.update(headers)
107 |         
108 |         payload = {
109 |             "jsonrpc": "2.0",
110 |             "method": method,
111 |             "id": self.request_id
112 |         }
113 |         
114 |         if params:
115 |             payload["params"] = params
116 |         
117 |         try:
118 |             response = requests.post(
119 |                 self.endpoint,
120 |                 headers=default_headers,
121 |                 json=payload,
122 |                 timeout=20  # Increased timeout for delivery estimates
123 |             )
124 |             
125 |             self.request_id += 1
126 |             
127 |             return {
128 |                 "status_code": response.status_code,
129 |                 "headers": dict(response.headers),
130 |                 "json": response.json() if response.status_code == 200 else None,
131 |                 "text": response.text,
132 |                 "success": response.status_code == 200
133 |             }
134 |             
135 |         except requests.exceptions.RequestException as e:
136 |             return {
137 |                 "status_code": 0,
138 |                 "headers": {},
139 |                 "json": None,
140 |                 "text": str(e),
141 |                 "success": False,
142 |                 "error": str(e)
143 |             }
144 | 
145 |     def _check_for_errors(self, parsed_content: Dict[str, Any]) -> Dict[str, Any]:
146 |         """Properly handle both wrapped and direct error formats"""
147 |         
148 |         # Check for data wrapper format first
149 |         if "data" in parsed_content:
150 |             data = parsed_content["data"]
151 |             
152 |             # Handle case where data is already parsed (dict/list)
153 |             if isinstance(data, dict) and 'error' in data:
154 |                 return {
155 |                     "has_error": True,
156 |                     "error_message": data['error'],
157 |                     "error_details": data.get('details', ''),
158 |                     "format": "wrapped_dict"
159 |                 }
160 |             
161 |             # Handle case where data is a JSON string that needs parsing
162 |             if isinstance(data, str):
163 |                 try:
164 |                     error_data = json.loads(data)
165 |                     if 'error' in error_data:
166 |                         return {
167 |                             "has_error": True,
168 |                             "error_message": error_data['error'],
169 |                             "error_details": error_data.get('details', ''),
170 |                             "format": "wrapped_json"
171 |                         }
172 |                 except json.JSONDecodeError:
173 |                     # Data field exists but isn't valid JSON
174 |                     pass
175 |         
176 |         # Check for direct error format
177 |         if 'error' in parsed_content:
178 |             return {
179 |                 "has_error": True,
180 |                 "error_message": parsed_content['error'],
181 |                 "error_details": parsed_content.get('details', ''),
182 |                 "format": "direct"
183 |             }
184 |         
185 |         return {"has_error": False}
186 | 
187 |     def _extract_data(self, parsed_content: Dict[str, Any]) -> Any:
188 |         """Extract successful response data from various wrapper formats"""
189 |         
190 |         if "data" in parsed_content:
191 |             data = parsed_content["data"]
192 |             
193 |             # Handle case where data is already parsed
194 |             if isinstance(data, (list, dict)):
195 |                 return data
196 |             
197 |             # Handle case where data is a JSON string
198 |             if isinstance(data, str):
199 |                 try:
200 |                     return json.loads(data)
201 |                 except json.JSONDecodeError:
202 |                     return None
203 |         
204 |         # Handle direct format (data at top level)
205 |         if isinstance(parsed_content, (list, dict)):
206 |             return parsed_content
207 |         
208 |         return None
209 | 
210 |     def test_pl_only_reachestimate_bounds(self) -> Dict[str, Any]:
211 |         """Verify PL-only reachestimate returns expected bounds and midpoint.
212 |         
213 |         Prerequisite: Start server with fallback disabled so reachestimate is used directly.
214 |         Example:
215 |             export META_MCP_DISABLE_DELIVERY_FALLBACK=1
216 |             uv run python -m meta_ads_mcp --transport streamable-http --port 8080
217 |         """
218 |         print(f"\n🇵🇱 Testing PL-only reachestimate bounds (fallback disabled)")
219 |         local_account_id = "act_3182643988557192"
220 |         targeting_spec = {"geo_locations": {"countries": ["PL"]}}
221 |         expected_lower = 18600000
222 |         expected_upper = 21900000
223 |         expected_midpoint = 20250000
224 | 
225 |         result = self._make_request("tools/call", {
226 |             "name": "estimate_audience_size",
227 |             "arguments": {
228 |                 "account_id": local_account_id,
229 |                 "targeting": targeting_spec,
230 |                 "optimization_goal": "REACH"
231 |             }
232 |         })
233 | 
234 |         if not result["success"]:
235 |             print(f"   ❌ Request failed: {result.get('text', 'Unknown error')}")
236 |             return {"success": False, "error": result.get("text", "Unknown error")}
237 | 
238 |         response_data = result["json"]["result"]
239 |         content = response_data.get("content", [{}])[0].get("text", "")
240 |         try:
241 |             parsed_content = json.loads(content)
242 |         except json.JSONDecodeError:
243 |             print(f"   ❌ Invalid JSON response")
244 |             return {"success": False, "error": "Invalid JSON"}
245 | 
246 |         error_info = self._check_for_errors(parsed_content)
247 |         if error_info["has_error"]:
248 |             print(f"   ❌ API Error: {error_info['error_message']}")
249 |             return {"success": False, "error": error_info["error_message"], "error_format": error_info["format"]}
250 | 
251 |         if not parsed_content.get("success", False):
252 |             print(f"   ❌ Response indicates failure but no error message found")
253 |             return {"success": False, "error": "Unexpected failure"}
254 | 
255 |         details = parsed_content.get("estimate_details", {}) or {}
256 |         lower = details.get("users_lower_bound")
257 |         upper = details.get("users_upper_bound")
258 |         midpoint = parsed_content.get("estimated_audience_size")
259 |         fallback_used = parsed_content.get("fallback_endpoint_used")
260 | 
261 |         ok = (
262 |             lower == expected_lower and
263 |             upper == expected_upper and
264 |             midpoint == expected_midpoint and
265 |             (fallback_used is None)
266 |         )
267 | 
268 |         if ok:
269 |             print(f"   ✅ Bounds: {lower:,}–{upper:,}; midpoint: {midpoint:,}")
270 |             return {
271 |                 "success": True,
272 |                 "users_lower_bound": lower,
273 |                 "users_upper_bound": upper,
274 |                 "midpoint": midpoint
275 |             }
276 |         else:
277 |             print(f"   ❌ Unexpected values: lower={lower}, upper={upper}, midpoint={midpoint}, fallback={fallback_used}")
278 |             return {
279 |                 "success": False,
280 |                 "users_lower_bound": lower,
281 |                 "users_upper_bound": upper,
282 |                 "midpoint": midpoint,
283 |                 "fallback_endpoint_used": fallback_used
284 |             }
285 | 
286 |     def test_comprehensive_audience_estimation(self) -> Dict[str, Any]:
287 |         """Test comprehensive audience estimation with complex targeting"""
288 |         
289 |         print(f"\n🎯 Testing Comprehensive Audience Estimation")
290 |         results = {}
291 |         
292 |         for spec_name, targeting_spec in self.test_targeting_specs.items():
293 |             print(f"   📊 Testing targeting: '{spec_name}'")
294 |             
295 |             result = self._make_request("tools/call", {
296 |                 "name": "estimate_audience_size",
297 |                 "arguments": {
298 |                     "account_id": self.account_id,
299 |                     "targeting": targeting_spec,
300 |                     "optimization_goal": "REACH"
301 |                 }
302 |             })
303 |             
304 |             if not result["success"]:
305 |                 results[spec_name] = {
306 |                     "success": False,
307 |                     "error": result.get("text", "Unknown error")
308 |                 }
309 |                 print(f"   ❌ Failed: {result.get('text', 'Unknown error')}")
310 |                 continue
311 |             
312 |             # Parse response
313 |             response_data = result["json"]["result"]
314 |             content = response_data.get("content", [{}])[0].get("text", "")
315 |             
316 |             try:
317 |                 parsed_content = json.loads(content)
318 |                 
319 |                 # Check for errors using robust helper method
320 |                 error_info = self._check_for_errors(parsed_content)
321 |                 if error_info["has_error"]:
322 |                     results[spec_name] = {
323 |                         "success": False,
324 |                         "error": error_info["error_message"],
325 |                         "error_format": error_info["format"]
326 |                     }
327 |                     print(f"   ❌ API Error: {error_info['error_message']}")
328 |                     continue
329 |                 
330 |                 # Check for expected fields in comprehensive estimation
331 |                 has_success = parsed_content.get("success", False)
332 |                 has_estimate = "estimated_audience_size" in parsed_content
333 |                 has_details = "estimate_details" in parsed_content
334 |                 
335 |                 results[spec_name] = {
336 |                     "success": has_success and has_estimate,
337 |                     "has_estimate": has_estimate,
338 |                     "has_details": has_details,
339 |                     "estimated_size": parsed_content.get("estimated_audience_size", 0),
340 |                     "optimization_goal": parsed_content.get("optimization_goal"),
341 |                     "raw_response": parsed_content
342 |                 }
343 |                 
344 |                 if has_success and has_estimate:
345 |                     estimate_size = parsed_content.get("estimated_audience_size", 0)
346 |                     print(f"   ✅ Estimated audience: {estimate_size:,} people")
347 |                 else:
348 |                     print(f"   ⚠️  Incomplete response: success={has_success}, estimate={has_estimate}")
349 |                 
350 |             except json.JSONDecodeError:
351 |                 results[spec_name] = {
352 |                     "success": False,
353 |                     "error": "Invalid JSON response",
354 |                     "raw_content": content
355 |                 }
356 |                 print(f"   ❌ Invalid JSON: {content[:100]}...")
357 |         
358 |         return results
359 | 
360 |     def test_backwards_compatibility_interest_validation(self) -> Dict[str, Any]:
361 |         """Test backwards compatibility with simple interest validation"""
362 |         
363 |         print(f"\n🔄 Testing Backwards Compatibility (Interest Validation)")
364 |         results = {}
365 |         
366 |         # Test with interest names
367 |         print(f"   📝 Testing interest name validation")
368 |         
369 |         result = self._make_request("tools/call", {
370 |             "name": "estimate_audience_size",
371 |             "arguments": {
372 |                 "interest_list": self.test_interests["mixed_validity"]
373 |             }
374 |         })
375 |         
376 |         if result["success"]:
377 |             response_data = result["json"]["result"]
378 |             content = response_data.get("content", [{}])[0].get("text", "")
379 |             
380 |             try:
381 |                 parsed_content = json.loads(content)
382 |                 
383 |                 # Check for errors first
384 |                 error_info = self._check_for_errors(parsed_content)
385 |                 if error_info["has_error"]:
386 |                     results["interest_names"] = {
387 |                         "success": False,
388 |                         "error": error_info["error_message"],
389 |                         "error_format": error_info["format"]
390 |                     }
391 |                     print(f"   ❌ API Error: {error_info['error_message']}")
392 |                 else:
393 |                     # Extract data using robust helper method
394 |                     validations = self._extract_data(parsed_content)
395 |                     if validations and isinstance(validations, list):
396 |                         results["interest_names"] = {
397 |                             "success": True,
398 |                             "count": len(validations),
399 |                             "has_valid": any(v.get("valid", False) for v in validations),
400 |                             "has_invalid": any(not v.get("valid", True) for v in validations),
401 |                             "validations": validations
402 |                         }
403 |                         print(f"   ✅ Validated {len(validations)} interests")
404 |                         for validation in validations:
405 |                             status = "✅" if validation.get("valid") else "❌"
406 |                             print(f"      {status} {validation.get('name', 'N/A')}")
407 |                     else:
408 |                         results["interest_names"] = {"success": False, "error": "No validation data"}
409 |                         print(f"   ❌ No validation data returned")
410 |                     
411 |             except json.JSONDecodeError:
412 |                 results["interest_names"] = {"success": False, "error": "Invalid JSON"}
413 |                 print(f"   ❌ Invalid JSON response")
414 |         else:
415 |             results["interest_names"] = {"success": False, "error": result.get("text", "Request failed")}
416 |             print(f"   ❌ Request failed: {result.get('text', 'Unknown error')}")
417 |         
418 |         # Test with interest FBIDs
419 |         print(f"   🔢 Testing interest FBID validation")
420 |         
421 |         result = self._make_request("tools/call", {
422 |             "name": "estimate_audience_size",
423 |             "arguments": {
424 |                 "interest_fbid_list": self.test_interests["valid_fbids"]
425 |             }
426 |         })
427 |         
428 |         if result["success"]:
429 |             response_data = result["json"]["result"]
430 |             content = response_data.get("content", [{}])[0].get("text", "")
431 |             
432 |             try:
433 |                 parsed_content = json.loads(content)
434 |                 
435 |                 # Check for errors first
436 |                 error_info = self._check_for_errors(parsed_content)
437 |                 if error_info["has_error"]:
438 |                     results["interest_fbids"] = {
439 |                         "success": False,
440 |                         "error": error_info["error_message"],
441 |                         "error_format": error_info["format"]
442 |                     }
443 |                     print(f"   ❌ API Error: {error_info['error_message']}")
444 |                 else:
445 |                     # Extract data using robust helper method
446 |                     validations = self._extract_data(parsed_content)
447 |                     if validations and isinstance(validations, list):
448 |                         results["interest_fbids"] = {
449 |                             "success": True,
450 |                             "count": len(validations),
451 |                             "all_valid": all(v.get("valid", False) for v in validations),
452 |                             "validations": validations
453 |                         }
454 |                         print(f"   ✅ Validated {len(validations)} FBID interests")
455 |                         for validation in validations:
456 |                             status = "✅" if validation.get("valid") else "❌"
457 |                             print(f"      {status} FBID: {validation.get('id', 'N/A')}")
458 |                     else:
459 |                         results["interest_fbids"] = {"success": False, "error": "No validation data"}
460 |                         print(f"   ❌ No validation data returned")
461 |                     
462 |             except json.JSONDecodeError:
463 |                 results["interest_fbids"] = {"success": False, "error": "Invalid JSON"}
464 |                 print(f"   ❌ Invalid JSON response")
465 |         else:
466 |             results["interest_fbids"] = {"success": False, "error": result.get("text", "Request failed")}
467 |             print(f"   ❌ Request failed: {result.get('text', 'Unknown error')}")
468 |         
469 |         return results
470 | 
471 |     def test_different_optimization_goals(self) -> Dict[str, Any]:
472 |         """Test audience estimation with different optimization goals"""
473 |         
474 |         print(f"\n🎯 Testing Different Optimization Goals")
475 |         results = {}
476 |         
477 |         optimization_goals = ["REACH", "LINK_CLICKS", "CONVERSIONS", "APP_INSTALLS"]
478 |         base_targeting = self.test_targeting_specs["simple_demographics"]
479 |         
480 |         for goal in optimization_goals:
481 |             print(f"   🎯 Testing optimization goal: '{goal}'")
482 |             
483 |             result = self._make_request("tools/call", {
484 |                 "name": "estimate_audience_size",
485 |                 "arguments": {
486 |                     "account_id": self.account_id,
487 |                     "targeting": base_targeting,
488 |                     "optimization_goal": goal
489 |                 }
490 |             })
491 |             
492 |             if result["success"]:
493 |                 response_data = result["json"]["result"]
494 |                 content = response_data.get("content", [{}])[0].get("text", "")
495 |                 
496 |                 try:
497 |                     parsed_content = json.loads(content)
498 |                     
499 |                     # Check for errors first
500 |                     error_info = self._check_for_errors(parsed_content)
501 |                     if error_info["has_error"]:
502 |                         results[goal] = {
503 |                             "success": False,
504 |                             "error": error_info["error_message"],
505 |                             "error_format": error_info["format"]
506 |                         }
507 |                         print(f"   ❌ {goal}: {error_info['error_message']}")
508 |                     elif parsed_content.get("success", False):
509 |                         results[goal] = {
510 |                             "success": True,
511 |                             "estimated_size": parsed_content.get("estimated_audience_size", 0),
512 |                             "goal_used": parsed_content.get("optimization_goal")
513 |                         }
514 |                         estimate_size = parsed_content.get("estimated_audience_size", 0)
515 |                         print(f"   ✅ {goal}: {estimate_size:,} people")
516 |                     else:
517 |                         results[goal] = {
518 |                             "success": False,
519 |                             "error": "Response indicates failure but no error message found"
520 |                         }
521 |                         print(f"   ❌ {goal}: Response indicates failure but no error message found")
522 |                         
523 |                 except json.JSONDecodeError:
524 |                     results[goal] = {"success": False, "error": "Invalid JSON"}
525 |                     print(f"   ❌ {goal}: Invalid JSON response")
526 |             else:
527 |                 results[goal] = {"success": False, "error": result.get("text", "Request failed")}
528 |                 print(f"   ❌ {goal}: Request failed")
529 |         
530 |         return results
531 | 
532 |     def test_error_handling(self) -> Dict[str, Any]:
533 |         """Test error handling for invalid parameters"""
534 |         
535 |         print(f"\n⚠️  Testing Error Handling")
536 |         results = {}
537 |         
538 |         # Test 1: No parameters
539 |         print(f"   🚫 Testing with no parameters")
540 |         result = self._make_request("tools/call", {
541 |             "name": "estimate_audience_size",
542 |             "arguments": {}
543 |         })
544 |         
545 |         results["no_params"] = self._parse_error_response(result, "Should require targeting or interest validation")
546 |         
547 |         # Test 2: Account ID without targeting
548 |         print(f"   🚫 Testing account ID without targeting")
549 |         result = self._make_request("tools/call", {
550 |             "name": "estimate_audience_size",
551 |             "arguments": {
552 |                 "account_id": self.account_id
553 |             }
554 |         })
555 |         
556 |         results["no_targeting"] = self._parse_error_response(result, "Should require targeting specification")
557 |         
558 |         # Test 3: Invalid targeting structure
559 |         print(f"   🚫 Testing invalid targeting structure")
560 |         result = self._make_request("tools/call", {
561 |             "name": "estimate_audience_size",
562 |             "arguments": {
563 |                 "account_id": self.account_id,
564 |                 "targeting": {"invalid": "structure"}
565 |             }
566 |         })
567 |         
568 |         results["invalid_targeting"] = self._parse_error_response(result, "Should handle invalid targeting")
569 |         
570 |         # Test 4: Missing location in targeting (no geo_locations or custom audiences)
571 |         print(f"   🚫 Testing missing location in targeting")
572 |         result = self._make_request("tools/call", {
573 |             "name": "estimate_audience_size",
574 |             "arguments": {
575 |                 "account_id": self.account_id,
576 |                 # Interests present but no geo_locations and no custom_audiences
577 |                 "targeting": {
578 |                     "age_min": 18,
579 |                     "age_max": 35,
580 |                     "flexible_spec": [
581 |                         {"interests": [{"id": "6003371567474"}]}
582 |                     ]
583 |                 }
584 |             }
585 |         })
586 |         results["missing_location"] = self._parse_error_response(result, "Should require a location or custom audience")
587 |         
588 |         return results
589 |     
590 |     def _parse_error_response(self, result: Dict[str, Any], description: str) -> Dict[str, Any]:
591 |         """Helper to parse and validate error responses"""
592 |         
593 |         if not result["success"]:
594 |             print(f"   ✅ {description}: Request failed as expected")
595 |             return {"success": True, "error_type": "request_failure"}
596 |         
597 |         response_data = result["json"]["result"]
598 |         content = response_data.get("content", [{}])[0].get("text", "")
599 |         
600 |         try:
601 |             parsed_content = json.loads(content)
602 |             
603 |             # Use robust error checking helper method
604 |             error_info = self._check_for_errors(parsed_content)
605 |             if error_info["has_error"]:
606 |                 print(f"   ✅ {description}: {error_info['error_message']}")
607 |                 return {
608 |                     "success": True, 
609 |                     "error_message": error_info["error_message"],
610 |                     "error_format": error_info["format"]
611 |                 }
612 |             else:
613 |                 print(f"   ❌ {description}: No error returned when expected")
614 |                 return {"success": False, "unexpected_success": True}
615 |                 
616 |         except json.JSONDecodeError:
617 |             print(f"   ❌ {description}: Invalid JSON response")
618 |             return {"success": False, "error": "Invalid JSON"}
619 | 
620 |     def run_audience_estimation_tests(self) -> bool:
621 |         """Run comprehensive audience estimation tests"""
622 |         
623 |         print("🚀 Meta Ads Audience Estimation End-to-End Test Suite")
624 |         print("="*70)
625 |         
626 |         # Check server availability
627 |         try:
628 |             response = requests.get(f"{self.base_url}/", timeout=5)
629 |             server_running = response.status_code in [200, 404]
630 |         except:
631 |             server_running = False
632 |         
633 |         if not server_running:
634 |             print("❌ Server is not running at", self.base_url)
635 |             print("   Please start the server with:")
636 |             print("   python3 -m meta_ads_mcp --transport streamable-http --port 8080")
637 |             return False
638 |         
639 |         print("✅ Server is running")
640 |         print("🔐 Using implicit authentication from server")
641 |         print(f"🏢 Using account ID: {self.account_id}")
642 |         
643 |         # Test 0: PL-only reachestimate bounds verification
644 |         print("\n" + "="*70)
645 |         print("📋 PHASE 0: PL-only reachestimate bounds verification (fallback disabled)")
646 |         print("="*70)
647 |         pl_only_results = self.test_pl_only_reachestimate_bounds()
648 |         pl_only_success = pl_only_results.get("success", False)
649 | 
650 |         # Test 1: Comprehensive Audience Estimation
651 |         print("\n" + "="*70)
652 |         print("📋 PHASE 1: Testing Comprehensive Audience Estimation")
653 |         print("="*70)
654 |         
655 |         comprehensive_results = self.test_comprehensive_audience_estimation()
656 |         comprehensive_success = any(
657 |             result.get("success") and result.get("estimated_size", 0) > 0
658 |             for result in comprehensive_results.values()
659 |         )
660 |         
661 |         # Test 2: Backwards Compatibility
662 |         print("\n" + "="*70)
663 |         print("📋 PHASE 2: Testing Backwards Compatibility")
664 |         print("="*70)
665 |         
666 |         compat_results = self.test_backwards_compatibility_interest_validation()
667 |         compat_success = (
668 |             compat_results.get("interest_names", {}).get("success", False) and
669 |             compat_results.get("interest_fbids", {}).get("success", False)
670 |         )
671 |         
672 |         # Test 3: Different Optimization Goals
673 |         print("\n" + "="*70)
674 |         print("📋 PHASE 3: Testing Different Optimization Goals")
675 |         print("="*70)
676 |         
677 |         goals_results = self.test_different_optimization_goals()
678 |         goals_success = any(
679 |             result.get("success") and result.get("estimated_size", 0) > 0
680 |             for result in goals_results.values()
681 |         )
682 |         
683 |         # Test 4: Error Handling
684 |         print("\n" + "="*70)
685 |         print("📋 PHASE 4: Testing Error Handling")
686 |         print("="*70)
687 |         
688 |         error_results = self.test_error_handling()
689 |         error_success = all(
690 |             result.get("success", False) for result in error_results.values()
691 |         )
692 |         
693 |         # Final assessment
694 |         print("\n" + "="*70)
695 |         print("📊 FINAL RESULTS")
696 |         print("="*70)
697 |         
698 |         all_tests = [
699 |             ("PL-only Reachestimate Bounds", pl_only_success),
700 |             ("Comprehensive Estimation", comprehensive_success),
701 |             ("Backwards Compatibility", compat_success),
702 |             ("Optimization Goals", goals_success),
703 |             ("Error Handling", error_success)
704 |         ]
705 |         
706 |         passed_tests = sum(1 for _, success in all_tests if success)
707 |         total_tests = len(all_tests)
708 |         
709 |         for test_name, success in all_tests:
710 |             status = "✅ PASSED" if success else "❌ FAILED"
711 |             print(f"   • {test_name}: {status}")
712 |         
713 |         overall_success = passed_tests >= 3  # At least 3 out of 5 tests should pass
714 |         
715 |         if overall_success:
716 |             print(f"\n✅ Audience estimation tests: SUCCESS ({passed_tests}/{total_tests} passed)")
717 |             print("   • Comprehensive audience estimation is working")
718 |             print("   • Backwards compatibility is maintained")
719 |             print("   • Meta reachestimate API integration is functional")
720 |             return True
721 |         else:
722 |             print(f"\n❌ Audience estimation tests: FAILED ({passed_tests}/{total_tests} passed)")
723 |             print("   • Some audience estimation features are not working properly")
724 |             return False
725 | 
726 | 
727 | def main():
728 |     """Main test execution"""
729 |     tester = AudienceEstimationTester()
730 |     success = tester.run_audience_estimation_tests()
731 |     
732 |     if success:
733 |         print("\n🎉 All audience estimation tests passed!")
734 |     else:
735 |         print("\n⚠️  Some audience estimation tests failed - see details above")
736 |     
737 |     sys.exit(0 if success else 1)
738 | 
739 | 
740 | if __name__ == "__main__":
741 |     main()
```

--------------------------------------------------------------------------------
/tests/test_dsa_beneficiary.py:
--------------------------------------------------------------------------------

```python
  1 | #!/usr/bin/env python3
  2 | """
  3 | Unit tests for DSA (Digital Services Act) beneficiary functionality in Meta Ads MCP.
  4 | 
  5 | This module tests the implementation of DSA beneficiary field support for ad set creation,
  6 | including detection of DSA requirements, parameter validation, and error handling.
  7 | """
  8 | 
  9 | import pytest
 10 | import json
 11 | from unittest.mock import AsyncMock, patch, MagicMock
 12 | 
 13 | from meta_ads_mcp.core.adsets import create_adset, get_adset_details
 14 | from meta_ads_mcp.core.accounts import get_account_info
 15 | 
 16 | 
 17 | class TestDSABeneficiaryDetection:
 18 |     """Test cases for detecting DSA beneficiary requirements"""
 19 |     
 20 |     @pytest.mark.asyncio
 21 |     async def test_dsa_requirement_detection_business_account(self):
 22 |         """Test DSA requirement detection for European business accounts"""
 23 |         mock_account_response = {
 24 |             "id": "act_701351919139047",
 25 |             "name": "Test European Business Account",
 26 |             "account_status": 1,
 27 |             "business_country_code": "DE",  # Germany - DSA applies
 28 |             "business_city": "Berlin",
 29 |             "currency": "EUR"
 30 |         }
 31 |         
 32 |         with patch('meta_ads_mcp.core.accounts.make_api_request', new_callable=AsyncMock) as mock_api:
 33 |             with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
 34 |                 mock_auth.return_value = "test_access_token"
 35 |                 mock_api.return_value = mock_account_response
 36 |                 
 37 |                 result = await get_account_info(account_id="act_701351919139047")
 38 |                 
 39 |                 # Handle new return format (dictionary instead of JSON string)
 40 |                 if isinstance(result, dict):
 41 |                     result_data = result
 42 |                 else:
 43 |                     result_data = json.loads(result)
 44 |                 
 45 |                 # Verify account info is retrieved
 46 |                 assert result_data["id"] == "act_701351919139047"
 47 |                 assert result_data["business_country_code"] == "DE"
 48 |                 
 49 |                 # Verify DSA requirement detection
 50 |                 assert "dsa_required" in result_data or "business_country_code" in result_data
 51 |     
 52 |     @pytest.mark.asyncio
 53 |     async def test_dsa_requirement_detection_non_dsa_region(self):
 54 |         """Test detection for non-DSA compliant regions"""
 55 |         mock_account_response = {
 56 |             "id": "act_123456789",
 57 |             "name": "Test US Account",
 58 |             "account_status": 1,
 59 |             "business_country_code": "US",  # US - DSA does not apply
 60 |             "business_city": "New York",
 61 |             "currency": "USD"
 62 |         }
 63 |         
 64 |         with patch('meta_ads_mcp.core.accounts.make_api_request', new_callable=AsyncMock) as mock_api:
 65 |             with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
 66 |                 mock_auth.return_value = "test_access_token"
 67 |                 mock_api.return_value = mock_account_response
 68 |                 
 69 |                 result = await get_account_info(account_id="act_123456789")
 70 |                 
 71 |                 # Handle new return format (dictionary instead of JSON string)
 72 |                 if isinstance(result, dict):
 73 |                     result_data = result
 74 |                 else:
 75 |                     result_data = json.loads(result)
 76 |                 
 77 |                 # Verify no DSA requirement for US accounts
 78 |                 assert result_data["business_country_code"] == "US"
 79 |     
 80 |     @pytest.mark.asyncio
 81 |     async def test_dsa_requirement_detection_error_handling(self):
 82 |         """Test error handling when account info cannot be retrieved"""
 83 |         with patch('meta_ads_mcp.core.accounts.make_api_request', new_callable=AsyncMock) as mock_api:
 84 |             with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
 85 |                 mock_auth.return_value = "test_access_token"
 86 |                 mock_api.side_effect = Exception("API Error")
 87 |                 
 88 |                 result = await get_account_info(account_id="act_invalid")
 89 |                 
 90 |                 # Handle new return format (dictionary instead of JSON string)
 91 |                 if isinstance(result, dict):
 92 |                     result_data = result
 93 |                 else:
 94 |                     result_data = json.loads(result)
 95 |                 
 96 |                 # Verify error is properly handled
 97 |                 assert "error" in result_data
 98 |     
 99 |     @pytest.mark.asyncio
100 |     async def test_account_info_requires_account_id(self):
101 |         """Test that get_account_info requires an account_id parameter"""
102 |         
103 |         with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
104 |             mock_auth.return_value = "test_access_token"
105 |             
106 |             # Test without account_id parameter
107 |             result = await get_account_info(account_id=None)
108 |             
109 |             # Handle new return format (dictionary instead of JSON string)
110 |             if isinstance(result, dict):
111 |                 result_data = result
112 |             else:
113 |                 result_data = json.loads(result)
114 |             
115 |             # Verify error message for missing account_id
116 |             assert "error" in result_data
117 |             assert "Account ID is required" in result_data["error"]["message"]
118 |             assert "Please specify an account_id parameter" in result_data["error"]["details"]
119 |             assert "example" in result_data["error"]
120 |     
121 |     @pytest.mark.asyncio
122 |     async def test_account_info_inaccessible_account_error(self):
123 |         """Test that get_account_info provides helpful error for inaccessible accounts"""
124 |         
125 |         # Mock permission error for direct account access (first API call)
126 |         mock_permission_error = {
127 |             "error": {
128 |                 "message": "Insufficient access privileges",
129 |                 "type": "OAuthException",
130 |                 "code": 200
131 |             }
132 |         }
133 |         
134 |         # Mock accessible accounts response (second API call)
135 |         mock_accessible_accounts = {
136 |             "data": [
137 |                 {"id": "act_123", "name": "Test Account 1"},
138 |                 {"id": "act_456", "name": "Test Account 2"}
139 |             ]
140 |         }
141 |         
142 |         with patch('meta_ads_mcp.core.accounts.make_api_request', new_callable=AsyncMock) as mock_api:
143 |             with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
144 |                 mock_auth.return_value = "test_access_token"
145 |                 # First call returns permission error, second call returns accessible accounts
146 |                 mock_api.side_effect = [mock_permission_error, mock_accessible_accounts]
147 |                 
148 |                 result = await get_account_info(account_id="act_inaccessible")
149 |                 
150 |                 # Handle new return format (dictionary instead of JSON string)
151 |                 if isinstance(result, dict):
152 |                     result_data = result
153 |                 else:
154 |                     result_data = json.loads(result)
155 |                 
156 |                 # Verify helpful error message for inaccessible account
157 |                 assert "error" in result_data
158 |                 assert "not accessible to your user account" in result_data["error"]["message"]
159 |                 assert "accessible_accounts" in result_data["error"]
160 |                 assert "suggestion" in result_data["error"]
161 |                 assert len(result_data["error"]["accessible_accounts"]) == 2
162 | 
163 | 
164 | class TestDSABeneficiaryParameter:
165 |     """Test cases for DSA beneficiary parameter support"""
166 |     
167 |     @pytest.mark.asyncio
168 |     async def test_create_adset_with_dsa_beneficiary_success(self):
169 |         """Test successful ad set creation with DSA beneficiary parameter"""
170 |         mock_response = {
171 |             "id": "23842588888640185",
172 |             "name": "Test Ad Set with DSA",
173 |             "status": "PAUSED",
174 |             "dsa_beneficiary": "Test Organization GmbH"
175 |         }
176 |         
177 |         with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api:
178 |             with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
179 |                 mock_auth.return_value = "test_access_token"
180 |                 mock_api.return_value = mock_response
181 |                 
182 |                 result = await create_adset(
183 |                     account_id="act_701351919139047",
184 |                     campaign_id="23842588888640184",
185 |                     name="Test Ad Set with DSA",
186 |                     optimization_goal="LINK_CLICKS",
187 |                     billing_event="IMPRESSIONS",
188 |                     dsa_beneficiary="Test Organization GmbH"
189 |                 )
190 |                 
191 |                 # Verify the API was called with DSA beneficiary parameter
192 |                 mock_api.assert_called_once()
193 |                 call_args = mock_api.call_args
194 |                 assert "dsa_beneficiary" in str(call_args)
195 |                 
196 |                 # Verify response contains ad set ID
197 |                 result_data = json.loads(result)
198 |                 assert "id" in result_data
199 |     
200 |     @pytest.mark.asyncio
201 |     async def test_create_adset_with_dsa_beneficiary_validation_error(self):
202 |         """Test error handling when the API rejects an ad set created without a DSA beneficiary"""
203 |         mock_error_response = {
204 |             "error": {
205 |                 "message": "DSA beneficiary required for European compliance",
206 |                 "type": "OAuthException",
207 |                 "code": 100
208 |             }
209 |         }
210 |         
211 |         with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api:
212 |             with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
213 |                 mock_auth.return_value = "test_access_token"
214 |                 mock_api.side_effect = Exception("DSA beneficiary required for European compliance")
215 |                 
216 |                 result = await create_adset(
217 |                     account_id="act_701351919139047",
218 |                     campaign_id="23842588888640184",
219 |                     name="Test Ad Set",
220 |                     optimization_goal="LINK_CLICKS",
221 |                     billing_event="IMPRESSIONS"
222 |                     # No DSA beneficiary provided
223 |                 )
224 |                 
225 |                 # Verify error message is clear and actionable
226 |                 result_data = json.loads(result)
227 |                 
228 |                 # Handle response wrapped in 'data' field by meta_api_tool decorator
229 |                 if "data" in result_data:
230 |                     actual_data = json.loads(result_data["data"])
231 |                 else:
232 |                     actual_data = result_data
233 |                 
234 |                 assert "DSA beneficiary required" in actual_data.get("error", "")
235 |     
236 |     @pytest.mark.asyncio
237 |     async def test_create_adset_without_dsa_beneficiary_dsa_required(self):
238 |         """Test error when DSA beneficiary is required but not provided"""
239 |         with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api:
240 |             with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
241 |                 mock_auth.return_value = "test_access_token"
242 |                 mock_api.side_effect = Exception("Enter the person or organization that benefits from ads in this ad set")
243 |                 
244 |                 result = await create_adset(
245 |                     account_id="act_701351919139047",
246 |                     campaign_id="23842588888640184",
247 |                     name="Test Ad Set",
248 |                     optimization_goal="LINK_CLICKS",
249 |                     billing_event="IMPRESSIONS"
250 |                     # No DSA beneficiary provided
251 |                 )
252 |                 
253 |                 # Verify error message is clear and actionable
254 |                 result_data = json.loads(result)
255 |                 
256 |                 # Handle response wrapped in 'data' field by meta_api_tool decorator
257 |                 if "data" in result_data:
258 |                     actual_data = json.loads(result_data["data"])
259 |                 else:
260 |                     actual_data = result_data
261 |                 
262 |                 assert "benefits from ads" in actual_data.get("error", "")
263 |     
264 |     @pytest.mark.asyncio
265 |     async def test_create_adset_dsa_beneficiary_in_targeting(self):
266 |         """Test that DSA beneficiary is still sent to the API when a targeting spec is provided"""
267 |         with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api:
268 |             with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
269 |                 mock_auth.return_value = "test_access_token"
270 |                 mock_api.return_value = {"id": "23842588888640185"}
271 |                 
272 |                 result = await create_adset(
273 |                     account_id="act_701351919139047",
274 |                     campaign_id="23842588888640184",
275 |                     name="Test Ad Set",
276 |                     optimization_goal="LINK_CLICKS",
277 |                     billing_event="IMPRESSIONS",
278 |                     targeting={"geo_locations": {"countries": ["DE"]}},
279 |                     dsa_beneficiary="Test Organization GmbH"
280 |                 )
281 |                 
282 |                 # Verify the API was called
283 |                 mock_api.assert_called_once()
284 |                 call_args = mock_api.call_args
285 |                 
286 |                 # Verify DSA beneficiary key and value appear in the call args (placement outside targeting is not asserted)
287 |                 call_str = str(call_args)
288 |                 assert "dsa_beneficiary" in call_str
289 |                 assert "Test Organization GmbH" in call_str
290 |     
291 |     @pytest.mark.asyncio
292 |     async def test_create_adset_dsa_beneficiary_parameter_formats(self):
293 |         """Test different formats for DSA beneficiary parameter"""
294 |         test_cases = [
295 |             "Simple Organization",
296 |             "Organization with Special Chars: GmbH & Co. KG",
297 |             "Organization with Numbers: Test123 Inc.",
298 |             "Very Long Organization Name That Exceeds Normal Limits But Should Still Work"
299 |         ]
300 |         
301 |         for beneficiary_name in test_cases:
302 |             with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api:
303 |                 with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
304 |                     mock_auth.return_value = "test_access_token"
305 |                     mock_api.return_value = {"id": "23842588888640185"}
306 |                     
307 |                     result = await create_adset(
308 |                         account_id="act_701351919139047",
309 |                         campaign_id="23842588888640184",
310 |                         name="Test Ad Set",
311 |                         optimization_goal="LINK_CLICKS",
312 |                         billing_event="IMPRESSIONS",
313 |                         dsa_beneficiary=beneficiary_name
314 |                     )
315 |                     
316 |                     # Verify the API was called with the beneficiary name
317 |                     mock_api.assert_called_once()
318 |                     call_args = mock_api.call_args
319 |                     assert beneficiary_name in str(call_args)
320 | 
321 | 
322 | class TestDSAPermissionHandling:
323 |     """Test cases for permission-related DSA beneficiary issues"""
324 |     
325 |     @pytest.mark.asyncio
326 |     async def test_dsa_beneficiary_missing_business_management_permission(self):
327 |         """Test error handling when business_management permissions are missing"""
328 |         with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api:
329 |             with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
330 |                 mock_auth.return_value = "test_access_token"
331 |                 mock_api.side_effect = Exception("Permission denied: business_management permission required")
332 |                 
333 |                 result = await create_adset(
334 |                     account_id="act_701351919139047",
335 |                     campaign_id="23842588888640184",
336 |                     name="Test Ad Set",
337 |                     optimization_goal="LINK_CLICKS",
338 |                     billing_event="IMPRESSIONS",
339 |                     dsa_beneficiary="Test Organization GmbH"
340 |                 )
341 |                 
342 |                 # Verify permission error is handled
343 |                 result_data = json.loads(result)
344 |                 
345 |                 # Handle response wrapped in 'data' field by meta_api_tool decorator
346 |                 if "data" in result_data:
347 |                     actual_data = json.loads(result_data["data"])
348 |                 else:
349 |                     actual_data = result_data
350 |                 
351 |                 assert "permission" in actual_data.get("error", "").lower()
352 |     
353 |     @pytest.mark.asyncio
354 |     async def test_dsa_beneficiary_api_limitation_handling(self):
355 |         """Test handling when API doesn't support dsa_beneficiary parameter"""
356 |         with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api:
357 |             with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
358 |                 mock_auth.return_value = "test_access_token"
359 |                 mock_api.side_effect = Exception("Parameter dsa_beneficiary is not supported")
360 |                 
361 |                 result = await create_adset(
362 |                     account_id="act_701351919139047",
363 |                     campaign_id="23842588888640184",
364 |                     name="Test Ad Set",
365 |                     optimization_goal="LINK_CLICKS",
366 |                     billing_event="IMPRESSIONS",
367 |                     dsa_beneficiary="Test Organization GmbH"
368 |                 )
369 |                 
370 |                 # Verify API limitation error is handled
371 |                 result_data = json.loads(result)
372 |                 
373 |                 # Handle response wrapped in 'data' field by meta_api_tool decorator
374 |                 if "data" in result_data:
375 |                     actual_data = json.loads(result_data["data"])
376 |                 else:
377 |                     actual_data = result_data
378 |                 
379 |                 assert "not supported" in actual_data.get("error", "").lower()
380 | 
381 | 
382 | class TestDSARegionalCompliance:
383 |     """Test cases for regional DSA compliance"""
384 |     
385 |     @pytest.mark.asyncio
386 |     async def test_dsa_compliance_european_regions(self):
387 |         """Test DSA compliance for European regions"""
388 |         european_countries = ["DE", "FR", "IT", "ES", "NL", "BE", "AT", "IE", "DK", "SE", "FI", "NO"]
389 |         
390 |         for country_code in european_countries:
391 |             mock_account_response = {
392 |                 "id": f"act_{country_code.lower()}",
393 |                 "name": f"Test {country_code} Account",
394 |                 "account_status": 1,
395 |                 "business_country_code": country_code,
396 |                 "currency": "EUR"
397 |             }
398 |             
399 |             with patch('meta_ads_mcp.core.accounts.make_api_request', new_callable=AsyncMock) as mock_api:
400 |                 with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
401 |                     mock_auth.return_value = "test_access_token"
402 |                     mock_api.return_value = mock_account_response
403 |                     
404 |                     result = await get_account_info(account_id=f"act_{country_code.lower()}")
405 |                     result_data = json.loads(result)
406 |                     
407 |                     # Verify the European country code is returned unchanged (no DSA flag is asserted here)
408 |                     assert result_data["business_country_code"] == country_code
409 |     
410 |     @pytest.mark.asyncio
411 |     async def test_dsa_compliance_non_european_regions(self):
412 |         """Test DSA compliance for non-European regions"""
413 |         non_european_countries = ["US", "CA", "AU", "JP", "BR", "IN"]
414 |         
415 |         for country_code in non_european_countries:
416 |             mock_account_response = {
417 |                 "id": f"act_{country_code.lower()}",
418 |                 "name": f"Test {country_code} Account",
419 |                 "account_status": 1,
420 |                 "business_country_code": country_code,
421 |                 "currency": "USD"
422 |             }
423 |             
424 |             with patch('meta_ads_mcp.core.accounts.make_api_request', new_callable=AsyncMock) as mock_api:
425 |                 with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
426 |                     mock_auth.return_value = "test_access_token"
427 |                     mock_api.return_value = mock_account_response
428 |                     
429 |                     result = await get_account_info(account_id=f"act_{country_code.lower()}")
430 |                     result_data = json.loads(result)
431 |                     
432 |                     # Verify the non-European country code is returned unchanged (no DSA flag is asserted here)
433 |                     assert result_data["business_country_code"] == country_code
434 |     
435 |     @pytest.mark.asyncio
436 |     async def test_dsa_beneficiary_validation_by_region(self):
437 |         """Test DSA beneficiary validation based on region"""
438 |         # Test European account (should require DSA beneficiary)
439 |         european_mock_response = {
440 |             "id": "act_de",
441 |             "name": "German Account",
442 |             "business_country_code": "DE"
443 |         }
444 |         
445 |         # Test US account (should not require DSA beneficiary)
446 |         us_mock_response = {
447 |             "id": "act_us",
448 |             "name": "US Account",
449 |             "business_country_code": "US"
450 |         }
451 |         
452 |         # Test European account
453 |         with patch('meta_ads_mcp.core.accounts.make_api_request', new_callable=AsyncMock) as mock_api:
454 |             with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
455 |                 mock_auth.return_value = "test_access_token"
456 |                 mock_api.return_value = european_mock_response
457 |                 
458 |                 result = await get_account_info(account_id="act_de")
459 |                 result_data = json.loads(result)
460 |                 
461 |                 assert result_data["business_country_code"] == "DE"
462 |         
463 |         # Test US account
464 |         with patch('meta_ads_mcp.core.accounts.make_api_request', new_callable=AsyncMock) as mock_api:
465 |             with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
466 |                 mock_auth.return_value = "test_access_token"
467 |                 mock_api.return_value = us_mock_response
468 |                 
469 |                 result = await get_account_info(account_id="act_us")
470 |                 result_data = json.loads(result)
471 |                 
472 |                 assert result_data["business_country_code"] == "US"
473 | 
474 | 
475 | class TestDSAErrorHandling:
476 |     """Test cases for comprehensive DSA error handling"""
477 |     
478 |     @pytest.mark.asyncio
479 |     async def test_dsa_beneficiary_clear_error_message(self):
480 |         """Test that DSA-related errors provide clear, actionable messages"""
481 |         error_scenarios = [
482 |             ("DSA beneficiary required for European compliance", "DSA beneficiary"),
483 |             ("Enter the person or organization that benefits from ads", "benefits from ads"),
484 |             ("Permission denied: business_management required", "permission"),
485 |             ("Parameter dsa_beneficiary is not supported", "not supported")
486 |         ]
487 |         
488 |         for error_message, expected_keyword in error_scenarios:
489 |             with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api:
490 |                 with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
491 |                     mock_auth.return_value = "test_access_token"
492 |                     mock_api.side_effect = Exception(error_message)
493 |                     
494 |                     result = await create_adset(
495 |                         account_id="act_701351919139047",
496 |                         campaign_id="23842588888640184",
497 |                         name="Test Ad Set",
498 |                         optimization_goal="LINK_CLICKS",
499 |                         billing_event="IMPRESSIONS"
500 |                     )
501 |                     
502 |                     # Verify error message contains expected keyword
503 |                     result_data = json.loads(result)
504 |                     
505 |                     # Handle response wrapped in 'data' field by meta_api_tool decorator
506 |                     if "data" in result_data:
507 |                         actual_data = json.loads(result_data["data"])
508 |                     else:
509 |                         actual_data = result_data
510 |                     
511 |                     assert expected_keyword.lower() in actual_data.get("error", "").lower()
512 |     
513 |     @pytest.mark.asyncio
514 |     async def test_dsa_beneficiary_fallback_behavior(self):
515 |         """Test fallback behavior for unexpected DSA-related errors"""
516 |         with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api:
517 |             with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
518 |                 mock_auth.return_value = "test_access_token"
519 |                 mock_api.side_effect = Exception("Unexpected DSA-related error")
520 |                 
521 |                 result = await create_adset(
522 |                     account_id="act_701351919139047",
523 |                     campaign_id="23842588888640184",
524 |                     name="Test Ad Set",
525 |                     optimization_goal="LINK_CLICKS",
526 |                     billing_event="IMPRESSIONS"
527 |                 )
528 |                 
529 |                 # Verify fallback error handling
530 |                 result_data = json.loads(result)
531 |                 
532 |                 # Handle response wrapped in 'data' field by meta_api_tool decorator
533 |                 if "data" in result_data:
534 |                     actual_data = json.loads(result_data["data"])
535 |                 else:
536 |                     actual_data = result_data
537 |                 
538 |                 assert "error" in actual_data
539 | 
540 | 
class TestDSABeneficiaryRetrieval:
    """Checks how get_adset_details reports the dsa_beneficiary field of an ad set."""

    @pytest.mark.asyncio
    async def test_get_adset_details_with_dsa_beneficiary(self):
        """A populated dsa_beneficiary value is passed through unchanged."""
        adset_id = "120229746629010183"
        api_payload = {
            "id": adset_id,
            "name": "Test Ad Set with DSA",
            "campaign_id": "120229656904980183",
            "status": "PAUSED",
            "daily_budget": "1000",
            "targeting": {
                "geo_locations": {"countries": ["US"]},
                "age_min": 25,
                "age_max": 65
            },
            "bid_amount": 200,
            "optimization_goal": "LINK_CLICKS",
            "billing_event": "IMPRESSIONS",
            "dsa_beneficiary": "Test Organization Inc"
        }

        api_patcher = patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock)
        auth_patcher = patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock)
        with api_patcher as mock_api, auth_patcher as mock_auth:
            mock_auth.return_value = "test_access_token"
            mock_api.return_value = api_payload

            details = json.loads(await get_adset_details(adset_id=adset_id))

            # The beneficiary must be present with the mocked value
            assert "dsa_beneficiary" in details
            assert details["dsa_beneficiary"] == "Test Organization Inc"
            assert details["id"] == adset_id

    @pytest.mark.asyncio
    async def test_get_adset_details_without_dsa_beneficiary(self):
        """When the API omits dsa_beneficiary, the result must omit it too."""
        adset_id = "120229746624860183"
        # Deliberately has no dsa_beneficiary key
        api_payload = {
            "id": adset_id,
            "name": "Test Ad Set without DSA",
            "campaign_id": "120229656904980183",
            "status": "PAUSED",
            "daily_budget": "1000",
            "targeting": {
                "geo_locations": {"countries": ["US"]},
                "age_min": 25,
                "age_max": 65
            },
            "bid_amount": 200,
            "optimization_goal": "LINK_CLICKS",
            "billing_event": "IMPRESSIONS"
        }

        api_patcher = patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock)
        auth_patcher = patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock)
        with api_patcher as mock_api, auth_patcher as mock_auth:
            mock_auth.return_value = "test_access_token"
            mock_api.return_value = api_payload

            details = json.loads(await get_adset_details(adset_id=adset_id))

            assert details["id"] == adset_id
            assert "dsa_beneficiary" not in details  # Should not be present

    @pytest.mark.asyncio
    async def test_get_adset_details_empty_dsa_beneficiary(self):
        """An empty-string dsa_beneficiary is kept rather than dropped."""
        adset_id = "120229746629010183"
        api_payload = {
            "id": adset_id,
            "name": "Test Ad Set with Empty DSA",
            "campaign_id": "120229656904980183",
            "status": "PAUSED",
            "daily_budget": "1000",
            "targeting": {
                "geo_locations": {"countries": ["US"]}
            },
            "bid_amount": 200,
            "optimization_goal": "LINK_CLICKS",
            "billing_event": "IMPRESSIONS",
            "dsa_beneficiary": ""  # Empty string
        }

        api_patcher = patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock)
        auth_patcher = patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock)
        with api_patcher as mock_api, auth_patcher as mock_auth:
            mock_auth.return_value = "test_access_token"
            mock_api.return_value = api_payload

            details = json.loads(await get_adset_details(adset_id=adset_id))

            assert "dsa_beneficiary" in details
            assert details["dsa_beneficiary"] == ""

    @pytest.mark.asyncio
    async def test_get_adset_details_dsa_beneficiary_field_requested(self):
        """The outgoing API call must mention dsa_beneficiary somewhere in its arguments."""
        api_patcher = patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock)
        auth_patcher = patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock)
        with api_patcher as mock_api, auth_patcher as mock_auth:
            mock_auth.return_value = "test_access_token"
            mock_api.return_value = {"id": "120229746629010183"}

            # Only the recorded call matters here, not the returned value
            await get_adset_details(adset_id="120229746629010183")

            mock_api.assert_called_once()
            # Stringifying the call covers both positional and keyword arguments
            assert "dsa_beneficiary" in str(mock_api.call_args)

    @pytest.mark.asyncio
    async def test_get_adset_details_error_handling(self):
        """A failing API call is reported as an error payload."""
        api_patcher = patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock)
        auth_patcher = patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock)
        with api_patcher as mock_api, auth_patcher as mock_auth:
            mock_auth.return_value = "test_access_token"
            mock_api.side_effect = Exception("Ad set not found")

            outcome = await get_adset_details(adset_id="invalid_adset_id")

            # The tool may hand back either a dict or a JSON string
            details = outcome if isinstance(outcome, dict) else json.loads(outcome)

            assert "error" in details
```

--------------------------------------------------------------------------------
/tests/test_budget_update_e2e.py:
--------------------------------------------------------------------------------

```python
  1 | #!/usr/bin/env python3
  2 | """
  3 | End-to-End Budget Update Test for Meta Ads MCP
  4 | 
  5 | This test validates that the budget update functionality correctly updates
  6 | ad set budgets in the Meta Ads API via a pre-authenticated MCP server.
  7 | 
  8 | Test functions:
  9 | - update_adset (with daily_budget parameter)
 10 | - update_adset (with lifetime_budget parameter)
 11 | - update_adset (with both budget types)
 12 | - Error handling for invalid budgets
 13 | - Budget update with other parameters
 14 | """
 15 | 
 16 | import requests
 17 | import json
 18 | import os
 19 | import sys
 20 | import time
 21 | from typing import Dict, Any, List
 22 | 
 23 | # Load environment variables from .env file
 24 | try:
 25 |     from dotenv import load_dotenv
 26 |     load_dotenv()
 27 |     print("✅ Loaded environment variables from .env file")
 28 | except ImportError:
 29 |     print("⚠️  python-dotenv not installed, using system environment variables only")
 30 | 
 31 | class BudgetUpdateTester:
 32 |     """Test suite focused on budget update functionality"""
 33 |     
 34 |     def __init__(self, base_url: str = "http://localhost:8080"):
 35 |         self.base_url = base_url.rstrip('/')
 36 |         self.endpoint = f"{self.base_url}/mcp/"
 37 |         self.request_id = 1
 38 |         
 39 |         # Test data for validation
 40 |         self.test_budgets = {
 41 |             "daily_budgets": ["5000", "10000", "25000"],  # $50, $100, $250
 42 |             "lifetime_budgets": ["50000", "100000", "250000"],  # $500, $1000, $2500
 43 |             "invalid_budgets": ["-1000", "0", "invalid_budget", "999999999999"]
 44 |         }
 45 |         
 46 |         # Test ad set IDs specifically created for budget testing
 47 |         self.test_adset_ids = [
 48 |             "120229734413930183",
 49 |             "120229734413930183",
 50 |             "120229734413930183",
 51 |             "120229734413930183",
 52 |             "120229734413930183",
 53 |             "120229734413930183"
 54 |         ]
 55 |         
 56 |         # Rate limiting tracking
 57 |         self.rate_limit_hit = False
 58 |         self.last_rate_limit_time = 0
 59 |     
 60 |     def _wait_for_rate_limit(self, error_msg: str) -> bool:
 61 |         """Wait if we hit rate limiting, return True if we should retry"""
 62 |         if "rate limit" in error_msg.lower() or "too many changes" in error_msg.lower():
 63 |             if not self.rate_limit_hit:
 64 |                 print(f"   ⏳ Rate limit hit! Waiting 1 hour before continuing...")
 65 |                 print(f"      • Meta Ads API allows only 4 budget changes per hour")
 66 |                 print(f"      • You can manually continue by pressing Enter when ready")
 67 |                 self.rate_limit_hit = True
 68 |                 self.last_rate_limit_time = time.time()
 69 |                 
 70 |                 # Wait for user input or 1 hour
 71 |                 try:
 72 |                     input("   Press Enter when ready to continue (or wait 1 hour)...")
 73 |                     print("   ✅ Continuing with tests...")
 74 |                     return True
 75 |                 except KeyboardInterrupt:
 76 |                     print("   ❌ Test interrupted by user")
 77 |                     return False
 78 |             else:
 79 |                 print(f"   ⏳ Still rate limited, waiting...")
 80 |                 return False
 81 |         return False
 82 | 
 83 |     def _make_request(self, method: str, params: Dict[str, Any] = None, 
 84 |                      headers: Dict[str, str] = None) -> Dict[str, Any]:
 85 |         """Make a JSON-RPC request to the MCP server"""
 86 |         
 87 |         default_headers = {
 88 |             "Content-Type": "application/json",
 89 |             "Accept": "application/json, text/event-stream",
 90 |             "User-Agent": "Budget-Update-Test-Client/1.0"
 91 |         }
 92 |         
 93 |         if headers:
 94 |             default_headers.update(headers)
 95 |         
 96 |         payload = {
 97 |             "jsonrpc": "2.0",
 98 |             "method": method,
 99 |             "id": self.request_id
100 |         }
101 |         
102 |         if params:
103 |             payload["params"] = params
104 |         
105 |         try:
106 |             response = requests.post(
107 |                 self.endpoint,
108 |                 headers=default_headers,
109 |                 json=payload,
110 |                 timeout=15
111 |             )
112 |             
113 |             self.request_id += 1
114 |             
115 |             return {
116 |                 "status_code": response.status_code,
117 |                 "headers": dict(response.headers),
118 |                 "json": response.json() if response.status_code == 200 else None,
119 |                 "text": response.text,
120 |                 "success": response.status_code == 200
121 |             }
122 |             
123 |         except requests.exceptions.RequestException as e:
124 |             return {
125 |                 "status_code": 0,
126 |                 "headers": {},
127 |                 "json": None,
128 |                 "text": str(e),
129 |                 "success": False,
130 |                 "error": str(e)
131 |             }
132 | 
133 |     def test_daily_budget_update(self) -> Dict[str, Any]:
134 |         """Test daily budget update functionality"""
135 |         
136 |         print(f"\n💰 Testing daily budget update function")
137 |         results = {}
138 |         
139 |         for budget in self.test_budgets["daily_budgets"]:
140 |             print(f"   💰 Updating daily budget to: ${int(budget)/100:.2f}")
141 |             
142 |             # Retry logic for rate limiting
143 |             max_retries = 3
144 |             for attempt in range(max_retries):
145 |                 result = self._make_request("tools/call", {
146 |                     "name": "update_adset",
147 |                     "arguments": {
148 |                         "adset_id": self.test_adset_ids[0],
149 |                         "daily_budget": budget
150 |                     }
151 |                 })
152 |                 
153 |                 if not result["success"]:
154 |                     results[budget] = {
155 |                         "success": False,
156 |                         "error": result.get("text", "Unknown error")
157 |                     }
158 |                     print(f"   ❌ Failed: {result.get('text', 'Unknown error')}")
159 |                     break
160 |                 
161 |                 # Parse response
162 |                 response_data = result["json"]["result"]
163 |                 content = response_data.get("content", [{}])[0].get("text", "")
164 |                 
165 |                 try:
166 |                     parsed_content = json.loads(content)
167 |                     
168 |                     # Check for successful update indicators
169 |                     has_id = "id" in parsed_content
170 |                     has_daily_budget = "daily_budget" in parsed_content
171 |                     has_success = "success" in parsed_content
172 |                     has_error = "error" in parsed_content
173 |                     
174 |                     # Handle rate limiting and API errors
175 |                     if has_error:
176 |                         error_msg = parsed_content.get("error", "")
177 |                         if "rate limit" in error_msg.lower() or "too many changes" in error_msg.lower():
178 |                             if attempt < max_retries - 1:  # Don't retry on last attempt
179 |                                 if self._wait_for_rate_limit(error_msg):
180 |                                     print(f"   🔄 Retrying after rate limit...")
181 |                                     continue
182 |                                 else:
183 |                                     break
184 |                             else:
185 |                                 results[budget] = {
186 |                                     "success": True,  # Rate limiting is expected behavior
187 |                                     "has_success": False,
188 |                                     "has_error": True,
189 |                                     "rate_limited": True,
190 |                                     "error_message": error_msg
191 |                                 }
192 |                                 print(f"   ⚠️  Rate limited (expected): {error_msg}")
193 |                                 break
194 |                         else:
195 |                             results[budget] = {
196 |                                 "success": False,
197 |                                 "has_error": True,
198 |                                 "error_message": error_msg
199 |                             }
200 |                             print(f"   ❌ API Error: {error_msg}")
201 |                             break
202 |                     
203 |                     results[budget] = {
204 |                         "success": True,
205 |                         "has_id": has_id,
206 |                         "has_daily_budget": has_daily_budget,
207 |                         "has_success": has_success,
208 |                         "updated_budget": parsed_content.get("daily_budget", "N/A"),
209 |                         "adset_id": parsed_content.get("id", "N/A")
210 |                     }
211 |                     
212 |                     print(f"   ✅ Updated daily budget to ${int(budget)/100:.2f}")
213 |                     print(f"      • Ad Set ID: {parsed_content.get('id', 'N/A')}")
214 |                     print(f"      • Success: {parsed_content.get('success', 'N/A')}")
215 |                     print(f"      • Raw Response: {parsed_content}")
216 |                     
217 |                     # Note: Meta Ads API returns {"success": true} for updates
218 |                     # The actual updated values can be verified by fetching ad set details
219 |                     break  # Success, exit retry loop
220 |                     
221 |                 except json.JSONDecodeError:
222 |                     results[budget] = {
223 |                         "success": False,
224 |                         "error": "Invalid JSON response",
225 |                         "raw_content": content
226 |                     }
227 |                     print(f"   ❌ Invalid JSON: {content}")
228 |                     break
229 |         
230 |         return results
231 | 
232 |     def test_lifetime_budget_update(self) -> Dict[str, Any]:
233 |         """Test lifetime budget update functionality"""
234 |         
235 |         print(f"\n💰 Testing lifetime budget update function")
236 |         print(f"   ⚠️  Note: Meta Ads API may reject lifetime budget updates if ad set has daily budget")
237 |         results = {}
238 |         
239 |         for budget in self.test_budgets["lifetime_budgets"]:
240 |             print(f"   💰 Updating lifetime budget to: ${int(budget)/100:.2f}")
241 |             
242 |             # Retry logic for rate limiting
243 |             max_retries = 3
244 |             for attempt in range(max_retries):
245 |                 result = self._make_request("tools/call", {
246 |                     "name": "update_adset",
247 |                     "arguments": {
248 |                         "adset_id": self.test_adset_ids[1],
249 |                         "lifetime_budget": budget
250 |                     }
251 |                 })
252 |                 
253 |                 if not result["success"]:
254 |                     results[budget] = {
255 |                         "success": False,
256 |                         "error": result.get("text", "Unknown error")
257 |                     }
258 |                     print(f"   ❌ Failed: {result.get('text', 'Unknown error')}")
259 |                     break
260 |                 
261 |                 # Parse response
262 |                 response_data = result["json"]["result"]
263 |                 content = response_data.get("content", [{}])[0].get("text", "")
264 |                 
265 |                 try:
266 |                     parsed_content = json.loads(content)
267 |                     
268 |                     # Check for successful update indicators
269 |                     has_id = "id" in parsed_content
270 |                     has_lifetime_budget = "lifetime_budget" in parsed_content
271 |                     has_success = "success" in parsed_content
272 |                     has_error = "error" in parsed_content
273 |                     
274 |                     # Handle rate limiting and API errors
275 |                     if has_error:
276 |                         error_msg = parsed_content.get("error", "")
277 |                         if "rate limit" in error_msg.lower() or "too many changes" in error_msg.lower():
278 |                             if attempt < max_retries - 1:  # Don't retry on last attempt
279 |                                 if self._wait_for_rate_limit(error_msg):
280 |                                     print(f"   🔄 Retrying after rate limit...")
281 |                                     continue
282 |                                 else:
283 |                                     break
284 |                             else:
285 |                                 results[budget] = {
286 |                                     "success": True,  # Rate limiting is expected behavior
287 |                                     "has_success": False,
288 |                                     "has_error": True,
289 |                                     "rate_limited": True,
290 |                                     "error_message": error_msg
291 |                                 }
292 |                                 print(f"   ⚠️  Rate limited (expected): {error_msg}")
293 |                                 break
294 |                         elif "should be recurring budget" in error_msg.lower() or "cannot switch" in error_msg.lower():
295 |                             results[budget] = {
296 |                                 "success": False,
297 |                                 "has_error": True,
298 |                                 "api_limitation": "Cannot switch from daily to lifetime budget",
299 |                                 "error_message": error_msg
300 |                             }
301 |                             print(f"   ⚠️  API Limitation: {error_msg}")
302 |                             break
303 |                         else:
304 |                             results[budget] = {
305 |                                 "success": False,
306 |                                 "has_error": True,
307 |                                 "error_message": error_msg
308 |                             }
309 |                             print(f"   ❌ API Error: {error_msg}")
310 |                             break
311 |                     
312 |                     results[budget] = {
313 |                         "success": True,
314 |                         "has_id": has_id,
315 |                         "has_lifetime_budget": has_lifetime_budget,
316 |                         "has_success": has_success,
317 |                         "updated_budget": parsed_content.get("lifetime_budget", "N/A"),
318 |                         "adset_id": parsed_content.get("id", "N/A")
319 |                     }
320 |                     
321 |                     print(f"   ✅ Updated lifetime budget to ${int(budget)/100:.2f}")
322 |                     print(f"      • Ad Set ID: {parsed_content.get('id', 'N/A')}")
323 |                     print(f"      • Success: {parsed_content.get('success', 'N/A')}")
324 |                     
325 |                     # Note: Meta Ads API returns {"success": true} for updates
326 |                     # The actual updated values can be verified by fetching ad set details
327 |                     break  # Success, exit retry loop
328 |                     
329 |                 except json.JSONDecodeError:
330 |                     results[budget] = {
331 |                         "success": False,
332 |                         "error": "Invalid JSON response",
333 |                         "raw_content": content
334 |                     }
335 |                     print(f"   ❌ Invalid JSON: {content}")
336 |                     break
337 |         
338 |         return results
339 | 
340 |     def test_both_budget_types_update(self) -> Dict[str, Any]:
341 |         """Test updating both daily and lifetime budget simultaneously"""
342 |         
343 |         print(f"\n💰 Testing both budget types update function")
344 |         print(f"   ⚠️  Note: Meta Ads API may reject this if ad set has existing daily budget")
345 |         
346 |         daily_budget = "15000"  # $150
347 |         lifetime_budget = "150000"  # $1500
348 |         
349 |         print(f"   💰 Updating both budgets - Daily: ${int(daily_budget)/100:.2f}, Lifetime: ${int(lifetime_budget)/100:.2f}")
350 |         
351 |         result = self._make_request("tools/call", {
352 |             "name": "update_adset",
353 |                                 "arguments": {
354 |                         "adset_id": self.test_adset_ids[2],
355 |                         "daily_budget": daily_budget,
356 |                         "lifetime_budget": lifetime_budget
357 |                     }
358 |         })
359 |         
360 |         if not result["success"]:
361 |             return {
362 |                 "success": False,
363 |                 "error": result.get("text", "Unknown error")
364 |             }
365 |         
366 |         # Parse response
367 |         response_data = result["json"]["result"]
368 |         content = response_data.get("content", [{}])[0].get("text", "")
369 |         
370 |         try:
371 |             parsed_content = json.loads(content)
372 |             
373 |             if "error" in parsed_content:
374 |                 error_msg = parsed_content.get("error", "")
375 |                 if "rate limit" in error_msg.lower() or "too many changes" in error_msg.lower():
376 |                     return {
377 |                         "success": True,  # Rate limiting is expected behavior
378 |                         "rate_limited": True,
379 |                         "error_message": error_msg
380 |                     }
381 |                 else:
382 |                     return {
383 |                         "success": False,
384 |                         "error": error_msg,
385 |                         "api_limitation": "Cannot have both daily and lifetime budgets"
386 |                     }
387 |             
388 |             # Check for successful update indicators
389 |             has_id = "id" in parsed_content
390 |             has_daily_budget = "daily_budget" in parsed_content
391 |             has_lifetime_budget = "lifetime_budget" in parsed_content
392 |             has_success = "success" in parsed_content
393 |             
394 |             result_data = {
395 |                 "success": True,
396 |                 "has_id": has_id,
397 |                 "has_daily_budget": has_daily_budget,
398 |                 "has_lifetime_budget": has_lifetime_budget,
399 |                 "has_success": has_success,
400 |                 "daily_budget": parsed_content.get("daily_budget", "N/A"),
401 |                 "lifetime_budget": parsed_content.get("lifetime_budget", "N/A"),
402 |                 "adset_id": parsed_content.get("id", "N/A")
403 |             }
404 |             
405 |             print(f"   ✅ Updated both budgets successfully")
406 |             print(f"      • Ad Set ID: {parsed_content.get('id', 'N/A')}")
407 |             print(f"      • Success: {parsed_content.get('success', 'N/A')}")
408 |             
409 |             # Note: Meta Ads API returns {"success": true} for updates
410 |             # The actual updated values can be verified by fetching ad set details
411 |             
412 |             return result_data
413 |             
414 |         except json.JSONDecodeError:
415 |             return {
416 |                 "success": False,
417 |                 "error": "Invalid JSON response",
418 |                 "raw_content": content
419 |             }
420 | 
421 |     def test_budget_update_with_other_parameters(self) -> Dict[str, Any]:
422 |         """Test budget update combined with other parameters"""
423 |         
424 |         print(f"\n💰 Testing budget update with other parameters")
425 |         
426 |         result = self._make_request("tools/call", {
427 |             "name": "update_adset",
428 |             "arguments": {
429 |                 "adset_id": self.test_adset_ids[3],
430 |                 "daily_budget": "7500",  # $75
431 |                 "status": "PAUSED",
432 |                 "bid_amount": 1000,
433 |                 "bid_strategy": "LOWEST_COST_WITH_BID_CAP"
434 |             }
435 |         })
436 |         
437 |         if not result["success"]:
438 |             return {
439 |                 "success": False,
440 |                 "error": result.get("text", "Unknown error")
441 |             }
442 |         
443 |         # Parse response
444 |         response_data = result["json"]["result"]
445 |         content = response_data.get("content", [{}])[0].get("text", "")
446 |         
447 |         try:
448 |             parsed_content = json.loads(content)
449 |             
450 |             if "error" in parsed_content:
451 |                 error_msg = parsed_content.get("error", "")
452 |                 if "rate limit" in error_msg.lower() or "too many changes" in error_msg.lower():
453 |                     return {
454 |                         "success": True,  # Rate limiting is expected behavior
455 |                         "rate_limited": True,
456 |                         "error_message": error_msg
457 |                     }
458 |                 else:
459 |                     return {
460 |                         "success": False,
461 |                         "error": error_msg
462 |                     }
463 |             
464 |             # Check for successful update indicators
465 |             has_id = "id" in parsed_content
466 |             has_daily_budget = "daily_budget" in parsed_content
467 |             has_status = "status" in parsed_content
468 |             has_success = "success" in parsed_content
469 |             
470 |             result_data = {
471 |                 "success": True,
472 |                 "has_id": has_id,
473 |                 "has_daily_budget": has_daily_budget,
474 |                 "has_status": has_status,
475 |                 "has_success": has_success,
476 |                 "daily_budget": parsed_content.get("daily_budget", "N/A"),
477 |                 "status": parsed_content.get("status", "N/A"),
478 |                 "adset_id": parsed_content.get("id", "N/A")
479 |             }
480 |             
481 |             print(f"   ✅ Updated budget with other parameters successfully")
482 |             print(f"      • Ad Set ID: {parsed_content.get('id', 'N/A')}")
483 |             print(f"      • Success: {parsed_content.get('success', 'N/A')}")
484 |             
485 |             # Note: Meta Ads API returns {"success": true} for updates
486 |             # The actual updated values can be verified by fetching ad set details
487 |             
488 |             return result_data
489 |             
490 |         except json.JSONDecodeError:
491 |             return {
492 |                 "success": False,
493 |                 "error": "Invalid JSON response",
494 |                 "raw_content": content
495 |             }
496 | 
497 |     def test_invalid_budget_handling(self) -> Dict[str, Any]:
498 |         """Test error handling for invalid budget values"""
499 |         
500 |         print(f"\n💰 Testing invalid budget handling")
501 |         results = {}
502 |         
503 |         for invalid_budget in self.test_budgets["invalid_budgets"]:
504 |             print(f"   💰 Testing invalid budget: '{invalid_budget}'")
505 |             
506 |             result = self._make_request("tools/call", {
507 |                 "name": "update_adset",
508 |                 "arguments": {
509 |                     "adset_id": self.test_adset_ids[4],
510 |                     "daily_budget": invalid_budget
511 |                 }
512 |             })
513 |             
514 |             if not result["success"]:
515 |                 results[invalid_budget] = {
516 |                     "success": False,
517 |                     "error": result.get("text", "Unknown error")
518 |                 }
519 |                 print(f"   ❌ Request failed: {result.get('text', 'Unknown error')}")
520 |                 continue
521 |             
522 |             # Parse response
523 |             response_data = result["json"]["result"]
524 |             content = response_data.get("content", [{}])[0].get("text", "")
525 |             
526 |             try:
527 |                 parsed_content = json.loads(content)
528 |                 
529 |                 # For invalid budgets, we expect an error response
530 |                 has_error = "error" in parsed_content or "data" in parsed_content
531 |                 has_details = "details" in parsed_content
532 |                 
533 |                 # Check if the error is a proper validation error (not a rate limit or other issue)
534 |                 error_msg = parsed_content.get("error", "")
535 |                 if not error_msg and "data" in parsed_content:
536 |                     try:
537 |                         data_content = json.loads(parsed_content.get("data", ""))
538 |                         if "error" in data_content:
539 |                             error_msg = data_content["error"].get("message", "")
540 |                     except:
541 |                         pass
542 |                 
543 |                 is_validation_error = any(keyword in error_msg.lower() for keyword in [
544 |                     "must be a number", "greater than or equal to 0", "too high", "too low", "invalid parameter",
545 |                     "budget is too low", "budget is too high", "decrease your ad set budget"
546 |                 ])
547 |                 
548 |                 results[invalid_budget] = {
549 |                     "success": has_error and is_validation_error,  # Success if we got proper validation error
550 |                     "has_error": has_error,
551 |                     "has_details": has_details,
552 |                     "is_validation_error": is_validation_error,
553 |                     "error_message": error_msg or parsed_content.get("error", "No error field"),
554 |                     "details": parsed_content.get("details", "No details field")
555 |                 }
556 |                 
557 |                 if has_error and is_validation_error:
558 |                     print(f"   ✅ Properly handled invalid budget '{invalid_budget}'")
559 |                     print(f"      • Error: {parsed_content.get('error', 'N/A')}")
560 |                 elif has_error:
561 |                     print(f"   ⚠️  Got error but not validation error for '{invalid_budget}'")
562 |                     print(f"      • Error: {parsed_content.get('error', 'N/A')}")
563 |                 else:
564 |                     print(f"   ❌ Unexpected success for invalid budget '{invalid_budget}'")
565 |                     print(f"      • Response: {parsed_content}")
566 |                 
567 |             except json.JSONDecodeError:
568 |                 results[invalid_budget] = {
569 |                     "success": False,
570 |                     "error": "Invalid JSON response",
571 |                     "raw_content": content
572 |                 }
573 |                 print(f"   ❌ Invalid JSON: {content}")
574 |         
575 |         return results
576 | 
577 |     def test_budget_update_with_targeting(self) -> Dict[str, Any]:
578 |         """Test budget update combined with targeting update"""
579 |         
580 |         print(f"\n💰 Testing budget update with targeting")
581 |         
582 |         targeting = {
583 |             "age_min": 25,
584 |             "age_max": 45,
585 |             "geo_locations": {"countries": ["US", "CA"]}
586 |         }
587 |         
588 |         result = self._make_request("tools/call", {
589 |             "name": "update_adset",
590 |             "arguments": {
591 |                 "adset_id": self.test_adset_ids[5],
592 |                 "daily_budget": "8500",  # $85
593 |                 "targeting": targeting
594 |             }
595 |         })
596 |         
597 |         if not result["success"]:
598 |             return {
599 |                 "success": False,
600 |                 "error": result.get("text", "Unknown error")
601 |             }
602 |         
603 |         # Parse response
604 |         response_data = result["json"]["result"]
605 |         content = response_data.get("content", [{}])[0].get("text", "")
606 |         
607 |         try:
608 |             parsed_content = json.loads(content)
609 |             
610 |             if "error" in parsed_content:
611 |                 error_msg = parsed_content.get("error", "")
612 |                 if "rate limit" in error_msg.lower() or "too many changes" in error_msg.lower():
613 |                     return {
614 |                         "success": True,  # Rate limiting is expected behavior
615 |                         "rate_limited": True,
616 |                         "error_message": error_msg
617 |                     }
618 |                 else:
619 |                     return {
620 |                         "success": False,
621 |                         "error": error_msg
622 |                     }
623 |             
624 |             # Check for successful update indicators
625 |             has_id = "id" in parsed_content
626 |             has_daily_budget = "daily_budget" in parsed_content
627 |             has_success = "success" in parsed_content
628 |             
629 |             result_data = {
630 |                 "success": True,
631 |                 "has_id": has_id,
632 |                 "has_daily_budget": has_daily_budget,
633 |                 "has_success": has_success,
634 |                 "daily_budget": parsed_content.get("daily_budget", "N/A"),
635 |                 "adset_id": parsed_content.get("id", "N/A")
636 |             }
637 |             
638 |             print(f"   ✅ Updated budget with targeting successfully")
639 |             print(f"      • Ad Set ID: {parsed_content.get('id', 'N/A')}")
640 |             print(f"      • Success: {parsed_content.get('success', 'N/A')}")
641 |             
642 |             # Note: Meta Ads API returns {"success": true} for updates
643 |             # The actual updated values can be verified by fetching ad set details
644 |             
645 |             return result_data
646 |             
647 |         except json.JSONDecodeError:
648 |             return {
649 |                 "success": False,
650 |                 "error": "Invalid JSON response",
651 |                 "raw_content": content
652 |             }
653 | 
654 |     def run_budget_update_tests(self) -> bool:
655 |         """Run comprehensive budget update tests"""
656 |         
657 |         print("🚀 Meta Ads Budget Update End-to-End Test Suite")
658 |         print("="*60)
659 |         
660 |         # Check server availability
661 |         try:
662 |             response = requests.get(f"{self.base_url}/", timeout=5)
663 |             server_running = response.status_code in [200, 404]
664 |         except:
665 |             server_running = False
666 |         
667 |         if not server_running:
668 |             print("❌ Server is not running at", self.base_url)
669 |             print("   Please start the server with:")
670 |             print("   python3 -m meta_ads_mcp --transport streamable-http --port 8080")
671 |             return False
672 |         
673 |         print("✅ Server is running")
674 |         print("🔐 Using implicit authentication from server")
675 |         print("⚠️  Note: This test uses ad sets specifically created for budget testing")
676 |         print("⚠️  Note: Campaign uses ad set level budgets - testing budget updates at ad set level")
677 |         print("⚠️  Note: Meta Ads API allows only 4 budget changes per hour - test will wait if rate limited")
678 |         
679 |         # Test 1: Daily Budget Updates
680 |         print("\n" + "="*60)
681 |         print("📋 PHASE 1: Testing Daily Budget Updates")
682 |         print("="*60)
683 |         
684 |         daily_results = self.test_daily_budget_update()
685 |         daily_success = any(
686 |             result.get("success") or 
687 |             (result.get("success") and result.get("rate_limited"))
688 |             for result in daily_results.values()
689 |         )
690 |         
691 |         # Test 2: Lifetime Budget Updates
692 |         print("\n" + "="*60)
693 |         print("📋 PHASE 2: Testing Lifetime Budget Updates")
694 |         print("="*60)
695 |         
696 |         lifetime_results = self.test_lifetime_budget_update()
697 |         lifetime_success = any(
698 |             result.get("success") or 
699 |             (result.get("success") and result.get("rate_limited")) or
700 |             (not result.get("success") and result.get("api_limitation"))
701 |             for result in lifetime_results.values()
702 |         )
703 |         
704 |         # Test 3: Both Budget Types
705 |         print("\n" + "="*60)
706 |         print("📋 PHASE 3: Testing Both Budget Types")
707 |         print("="*60)
708 |         
709 |         both_budgets_result = self.test_both_budget_types_update()
710 |         both_budgets_success = (both_budgets_result.get("success") or
711 |                               (not both_budgets_result.get("success") and 
712 |                                both_budgets_result.get("rate_limited")) or
713 |                               (not both_budgets_result.get("success") and 
714 |                                both_budgets_result.get("api_limitation")))
715 |         
716 |         # Test 4: Budget with Other Parameters
717 |         print("\n" + "="*60)
718 |         print("📋 PHASE 4: Testing Budget with Other Parameters")
719 |         print("="*60)
720 |         
721 |         other_params_result = self.test_budget_update_with_other_parameters()
722 |         other_params_success = (other_params_result.get("success") or
723 |                               (other_params_result.get("success") and 
724 |                                other_params_result.get("rate_limited")))
725 |         
726 |         # Test 5: Invalid Budget Handling
727 |         print("\n" + "="*60)
728 |         print("📋 PHASE 5: Testing Invalid Budget Handling")
729 |         print("="*60)
730 |         
731 |         invalid_results = self.test_invalid_budget_handling()
732 |         invalid_success = any(
733 |             result.get("success") and result.get("is_validation_error") 
734 |             for result in invalid_results.values()
735 |         )
736 |         
737 |         # Test 6: Budget with Targeting
738 |         print("\n" + "="*60)
739 |         print("📋 PHASE 6: Testing Budget with Targeting")
740 |         print("="*60)
741 |         
742 |         targeting_result = self.test_budget_update_with_targeting()
743 |         targeting_success = (targeting_result.get("success") or
744 |                            (targeting_result.get("success") and 
745 |                             targeting_result.get("rate_limited")))
746 |         
747 |         # Final assessment
748 |         print("\n" + "="*60)
749 |         print("📊 FINAL RESULTS")
750 |         print("="*60)
751 |         
752 |         all_tests = [
753 |             ("Daily Budget Updates", daily_success),
754 |             ("Lifetime Budget Updates", lifetime_success),
755 |             ("Both Budget Types", both_budgets_success),
756 |             ("Budget with Other Parameters", other_params_success),
757 |             ("Invalid Budget Handling", invalid_success),
758 |             ("Budget with Targeting", targeting_success)
759 |         ]
760 |         
761 |         passed_tests = sum(1 for _, success in all_tests if success)
762 |         total_tests = len(all_tests)
763 |         
764 |         for test_name, success in all_tests:
765 |             status = "✅ PASSED" if success else "❌ FAILED"
766 |             print(f"   • {test_name}: {status}")
767 |         
768 |         overall_success = passed_tests >= 4  # At least 4 out of 6 tests should pass
769 |         
770 |         if overall_success:
771 |             print(f"\n✅ Budget update tests: SUCCESS ({passed_tests}/{total_tests} passed)")
772 |             print("   • Core budget update functionality is working")
773 |             print("   • Meta Ads API integration is functional")
774 |             print("   • Error handling is working properly")
775 |             return True
776 |         else:
777 |             print(f"\n❌ Budget update tests: FAILED ({passed_tests}/{total_tests} passed)")
778 |             print("   • Some budget update functions are not working properly")
779 |             print("   • Check API permissions and ad set IDs")
780 |             return False
781 | 
782 | 
def main():
    """Run the budget update end-to-end suite and exit with its status.

    Exits the process with status 0 when ``run_budget_update_tests`` returns
    True and with status 1 otherwise.
    """
    tester = BudgetUpdateTester()
    success = tester.run_budget_update_tests()

    if success:
        # The suite passes at a threshold (4 of 6 phases), so a passing run
        # does not mean every phase passed; the message must not claim it.
        print("\n🎉 Budget update test suite passed - see per-test results above")
    else:
        print("\n⚠️  Some budget update tests failed - see details above")

    sys.exit(0 if success else 1)


if __name__ == "__main__":
    main()
```

--------------------------------------------------------------------------------
/tests/test_dynamic_creatives.py:
--------------------------------------------------------------------------------

```python
  1 | """Tests for dynamic creative features including multiple headlines and descriptions.
  2 | 
  3 | Tests for the enhanced create_ad_creative function that supports:
  4 | - Multiple headlines
  5 | - Multiple descriptions  
  6 | - Dynamic creative optimization settings
  7 | - Creative update functionality
  8 | """
  9 | 
 10 | import pytest
 11 | import json
 12 | from unittest.mock import AsyncMock, patch
 13 | from meta_ads_mcp.core.ads import create_ad_creative, update_ad_creative
 14 | 
 15 | 
 16 | @pytest.mark.asyncio
 17 | class TestDynamicCreatives:
 18 |     """Test cases for dynamic creative features."""
 19 |     
 20 |     async def test_create_ad_creative_single_headline(self):
 21 |         """Test creating ad creative with single headline (simple case)."""
 22 |         
 23 |         sample_creative_data = {
 24 |             "id": "123456789",
 25 |             "name": "Test Creative",
 26 |             "status": "ACTIVE"
 27 |         }
 28 |         
 29 |         with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api:
 30 |             mock_api.return_value = sample_creative_data
 31 |             
 32 |             result = await create_ad_creative(
 33 |                 access_token="test_token",
 34 |                 account_id="act_123456789",
 35 |                 name="Test Creative",
 36 |                 image_hash="abc123",
 37 |                 page_id="987654321",
 38 |                 link_url="https://example.com",
 39 |                 message="Test message",
 40 |                 headline="Single Headline",
 41 |                 call_to_action_type="LEARN_MORE"
 42 |             )
 43 |             
 44 |             result_data = json.loads(result)
 45 |             assert result_data["success"] is True
 46 |             
 47 |             # Verify the API call was made with single headline in object_story_spec
 48 |             call_args_list = mock_api.call_args_list
 49 |             assert len(call_args_list) >= 1
 50 |             
 51 |             # First call should be the creative creation
 52 |             first_call = call_args_list[0]
 53 |             creative_data = first_call[0][2]  # params is the third argument
 54 |             
 55 |             # Should use object_story_spec with link_data for simple creatives (not asset_feed_spec)
 56 |             assert "object_story_spec" in creative_data
 57 |             assert "link_data" in creative_data["object_story_spec"]
 58 |             assert creative_data["object_story_spec"]["link_data"]["name"] == "Single Headline"
 59 |             assert "asset_feed_spec" not in creative_data
 60 |     
 61 |     async def test_create_ad_creative_single_description(self):
 62 |         """Test creating ad creative with single description (simple case)."""
 63 |         
 64 |         sample_creative_data = {
 65 |             "id": "123456789",
 66 |             "name": "Test Creative",
 67 |             "status": "ACTIVE"
 68 |         }
 69 |         
 70 |         with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api:
 71 |             mock_api.return_value = sample_creative_data
 72 |             
 73 |             result = await create_ad_creative(
 74 |                 access_token="test_token",
 75 |                 account_id="act_123456789",
 76 |                 name="Test Creative",
 77 |                 image_hash="abc123",
 78 |                 page_id="987654321",
 79 |                 link_url="https://example.com",
 80 |                 message="Test message",
 81 |                 description="Single Description",
 82 |                 call_to_action_type="LEARN_MORE"
 83 |             )
 84 |             
 85 |             result_data = json.loads(result)
 86 |             assert result_data["success"] is True
 87 |             
 88 |             # Verify the API call was made with single description in object_story_spec
 89 |             call_args_list = mock_api.call_args_list
 90 |             assert len(call_args_list) >= 1
 91 |             
 92 |             # First call should be the creative creation
 93 |             first_call = call_args_list[0]
 94 |             creative_data = first_call[0][2]  # params is the third argument
 95 |             
 96 |             # Should use object_story_spec with link_data for simple creatives (not asset_feed_spec)
 97 |             assert "object_story_spec" in creative_data
 98 |             assert "link_data" in creative_data["object_story_spec"]
 99 |             assert creative_data["object_story_spec"]["link_data"]["description"] == "Single Description"
100 |             assert "asset_feed_spec" not in creative_data
101 |     
102 |     async def test_create_ad_creative_cannot_mix_headline_and_headlines(self):
103 |         """Test that mixing headline and headlines parameters raises error."""
104 |         
105 |         result = await create_ad_creative(
106 |             access_token="test_token",
107 |             account_id="act_123456789",
108 |             name="Test Creative",
109 |             image_hash="abc123",
110 |             page_id="987654321",
111 |             link_url="https://example.com",
112 |             message="Test message",
113 |             headline="Single Headline",
114 |             headlines=["Headline 1", "Headline 2"],
115 |             call_to_action_type="LEARN_MORE"
116 |         )
117 |         
118 |         result_data = json.loads(result)
119 |         
120 |         # Check if error is wrapped in "data" field (MCP error response format)
121 |         if "data" in result_data:
122 |             error_data = json.loads(result_data["data"])
123 |             assert "error" in error_data
124 |             assert "Cannot specify both 'headline' and 'headlines'" in error_data["error"]
125 |         else:
126 |             assert "error" in result_data
127 |             assert "Cannot specify both 'headline' and 'headlines'" in result_data["error"]
128 |     
129 |     async def test_create_ad_creative_cannot_mix_description_and_descriptions(self):
130 |         """Test that mixing description and descriptions parameters raises error."""
131 |         
132 |         result = await create_ad_creative(
133 |             access_token="test_token",
134 |             account_id="act_123456789",
135 |             name="Test Creative",
136 |             image_hash="abc123",
137 |             page_id="987654321",
138 |             link_url="https://example.com",
139 |             message="Test message",
140 |             description="Single Description",
141 |             descriptions=["Description 1", "Description 2"],
142 |             call_to_action_type="LEARN_MORE"
143 |         )
144 |         
145 |         result_data = json.loads(result)
146 |         
147 |         # Check if error is wrapped in "data" field (MCP error response format)
148 |         if "data" in result_data:
149 |             error_data = json.loads(result_data["data"])
150 |             assert "error" in error_data
151 |             assert "Cannot specify both 'description' and 'descriptions'" in error_data["error"]
152 |         else:
153 |             assert "error" in result_data
154 |             assert "Cannot specify both 'description' and 'descriptions'" in result_data["error"]
155 | 
156 |     
157 |     async def test_create_ad_creative_multiple_headlines(self):
158 |         """Test creating ad creative with multiple headlines."""
159 |         
160 |         sample_creative_data = {
161 |             "id": "123456789",
162 |             "name": "Test Creative",
163 |             "status": "ACTIVE"
164 |         }
165 |         
166 |         with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api:
167 |             mock_api.return_value = sample_creative_data
168 |             
169 |             result = await create_ad_creative(
170 |                 access_token="test_token",
171 |                 account_id="act_123456789",
172 |                 name="Test Creative",
173 |                 image_hash="abc123",
174 |                 page_id="987654321",
175 |                 link_url="https://example.com",
176 |                 message="Test message",
177 |                 headlines=["Headline 1", "Headline 2", "Headline 3"],
178 |                 call_to_action_type="LEARN_MORE"
179 |             )
180 |             
181 |             result_data = json.loads(result)
182 |             assert result_data["success"] is True
183 |             
184 |             # Verify the API call was made with multiple headlines
185 |             # We need to check the first call (creative creation), not the second call (details fetch)
186 |             call_args_list = mock_api.call_args_list
187 |             assert len(call_args_list) >= 1
188 |             
189 |             # First call should be the creative creation
190 |             first_call = call_args_list[0]
191 |             creative_data = first_call[0][2]  # params is the third argument
192 |             
193 |             # Should use asset_feed_spec with headlines array format
194 |             assert "asset_feed_spec" in creative_data
195 |             assert "headlines" in creative_data["asset_feed_spec"]
196 |             assert creative_data["asset_feed_spec"]["headlines"] == [
197 |                 {"text": "Headline 1"}, 
198 |                 {"text": "Headline 2"}, 
199 |                 {"text": "Headline 3"}
200 |             ]
201 |     
202 |     async def test_create_ad_creative_multiple_descriptions(self):
203 |         """Test creating ad creative with multiple descriptions."""
204 |         
205 |         sample_creative_data = {
206 |             "id": "123456789",
207 |             "name": "Test Creative",
208 |             "status": "ACTIVE"
209 |         }
210 |         
211 |         with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api:
212 |             mock_api.return_value = sample_creative_data
213 |             
214 |             result = await create_ad_creative(
215 |                 access_token="test_token",
216 |                 account_id="act_123456789",
217 |                 name="Test Creative",
218 |                 image_hash="abc123",
219 |                 page_id="987654321",
220 |                 link_url="https://example.com",
221 |                 message="Test message",
222 |                 descriptions=["Description 1", "Description 2"],
223 |                 call_to_action_type="LEARN_MORE"
224 |             )
225 |             
226 |             result_data = json.loads(result)
227 |             assert result_data["success"] is True
228 |             
229 |             # Verify the API call was made with multiple descriptions
230 |             # We need to check the first call (creative creation), not the second call (details fetch)
231 |             call_args_list = mock_api.call_args_list
232 |             assert len(call_args_list) >= 1
233 |             
234 |             # First call should be the creative creation
235 |             first_call = call_args_list[0]
236 |             creative_data = first_call[0][2]  # params is the third argument
237 |             
238 |             # Should use asset_feed_spec with descriptions array format
239 |             assert "asset_feed_spec" in creative_data
240 |             assert "descriptions" in creative_data["asset_feed_spec"]
241 |             assert creative_data["asset_feed_spec"]["descriptions"] == [
242 |                 {"text": "Description 1"}, 
243 |                 {"text": "Description 2"}
244 |             ]
245 |     
246 |     async def test_create_ad_creative_dynamic_creative_spec(self):
247 |         """Test creating ad creative with dynamic_creative_spec optimization settings."""
248 |         
249 |         sample_creative_data = {
250 |             "id": "123456789",
251 |             "name": "Test Creative",
252 |             "status": "ACTIVE"
253 |         }
254 |         
255 |         with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api:
256 |             mock_api.return_value = sample_creative_data
257 |             
258 |             result = await create_ad_creative(
259 |                 access_token="test_token",
260 |                 account_id="act_123456789",
261 |                 name="Test Creative",
262 |                 image_hash="abc123",
263 |                 page_id="987654321",
264 |                 link_url="https://example.com",
265 |                 message="Test message",
266 |                 headlines=["Headline 1", "Headline 2"],
267 |                 descriptions=["Description 1", "Description 2"],
268 |                 dynamic_creative_spec={
269 |                     "headline_optimization": True,
270 |                     "description_optimization": True
271 |                 },
272 |                 call_to_action_type="LEARN_MORE"
273 |             )
274 |             
275 |             result_data = json.loads(result)
276 |             assert result_data["success"] is True
277 |             
278 |             # Verify the API call was made with dynamic_creative_spec
279 |             # We need to check the first call (creative creation), not the second call (details fetch)
280 |             call_args_list = mock_api.call_args_list
281 |             assert len(call_args_list) >= 1
282 |             
283 |             # First call should be the creative creation
284 |             first_call = call_args_list[0]
285 |             creative_data = first_call[0][2]  # params is the third argument
286 |             
287 |             # Should include dynamic_creative_spec
288 |             assert "dynamic_creative_spec" in creative_data
289 |             assert creative_data["dynamic_creative_spec"]["headline_optimization"] is True
290 |             assert creative_data["dynamic_creative_spec"]["description_optimization"] is True
291 |     
292 |     async def test_create_ad_creative_multiple_headlines_and_descriptions(self):
293 |         """Test creating ad creative with both multiple headlines and descriptions."""
294 |         
295 |         sample_creative_data = {
296 |             "id": "123456789",
297 |             "name": "Test Creative",
298 |             "status": "ACTIVE"
299 |         }
300 |         
301 |         with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api:
302 |             mock_api.return_value = sample_creative_data
303 |             
304 |             result = await create_ad_creative(
305 |                 access_token="test_token",
306 |                 account_id="act_123456789",
307 |                 name="Test Creative",
308 |                 image_hash="abc123",
309 |                 page_id="987654321",
310 |                 link_url="https://example.com",
311 |                 message="Test message",
312 |                 headlines=["Headline 1", "Headline 2", "Headline 3"],
313 |                 descriptions=["Description 1", "Description 2"],
314 |                 dynamic_creative_spec={
315 |                     "headline_optimization": True,
316 |                     "description_optimization": True
317 |                 },
318 |                 call_to_action_type="LEARN_MORE"
319 |             )
320 |             
321 |             result_data = json.loads(result)
322 |             assert result_data["success"] is True
323 |             
324 |             # Verify the API call was made with both multiple headlines and descriptions
325 |             # We need to check the first call (creative creation), not the second call (details fetch)
326 |             call_args_list = mock_api.call_args_list
327 |             assert len(call_args_list) >= 1
328 |             
329 |             # First call should be the creative creation
330 |             first_call = call_args_list[0]
331 |             creative_data = first_call[0][2]  # params is the third argument
332 |             
333 |             assert "asset_feed_spec" in creative_data
334 |             assert "headlines" in creative_data["asset_feed_spec"]
335 |             assert "descriptions" in creative_data["asset_feed_spec"]
336 |             assert "dynamic_creative_spec" in creative_data
337 |     
338 |     async def test_create_ad_creative_validation_max_headlines(self):
339 |         """Test validation for maximum number of headlines."""
340 |         
341 |         # Create list with more than 5 headlines (assuming 5 is the limit)
342 |         too_many_headlines = [f"Headline {i}" for i in range(6)]
343 |         
344 |         result = await create_ad_creative(
345 |             access_token="test_token",
346 |             account_id="act_123456789",
347 |             name="Test Creative",
348 |             image_hash="abc123",
349 |             page_id="987654321",
350 |             headlines=too_many_headlines
351 |         )
352 |         
353 |         result_data = json.loads(result)
354 |         # The error might be wrapped in a 'data' field
355 |         if "data" in result_data:
356 |             error_data = json.loads(result_data["data"])
357 |             assert "error" in error_data
358 |             assert "maximum" in error_data["error"].lower() or "limit" in error_data["error"].lower()
359 |         else:
360 |             assert "error" in result_data
361 |             assert "maximum" in result_data["error"].lower() or "limit" in result_data["error"].lower()
362 |     
363 |     async def test_create_ad_creative_validation_max_descriptions(self):
364 |         """Test validation for maximum number of descriptions."""
365 |         
366 |         # Create list with more than 5 descriptions (assuming 5 is the limit)
367 |         too_many_descriptions = [f"Description {i}" for i in range(6)]
368 |         
369 |         result = await create_ad_creative(
370 |             access_token="test_token",
371 |             account_id="act_123456789",
372 |             name="Test Creative",
373 |             image_hash="abc123",
374 |             page_id="987654321",
375 |             descriptions=too_many_descriptions
376 |         )
377 |         
378 |         result_data = json.loads(result)
379 |         # The error might be wrapped in a 'data' field
380 |         if "data" in result_data:
381 |             error_data = json.loads(result_data["data"])
382 |             assert "error" in error_data
383 |             assert "maximum" in error_data["error"].lower() or "limit" in error_data["error"].lower()
384 |         else:
385 |             assert "error" in result_data
386 |             assert "maximum" in result_data["error"].lower() or "limit" in result_data["error"].lower()
387 |     
388 | 
389 |     
390 |     async def test_update_ad_creative_add_headlines(self):
391 |         """Test updating an existing creative to add multiple headlines."""
392 |         
393 |         sample_creative_data = {
394 |             "id": "123456789",
395 |             "name": "Updated Creative",
396 |             "status": "ACTIVE"
397 |         }
398 |         
399 |         with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api:
400 |             mock_api.return_value = sample_creative_data
401 |             
402 |             result = await update_ad_creative(
403 |                 access_token="test_token",
404 |                 creative_id="123456789",
405 |                 headlines=["New Headline 1", "New Headline 2", "New Headline 3"]
406 |             )
407 |             
408 |             result_data = json.loads(result)
409 |             assert result_data["success"] is True
410 |             
411 |             # Verify the API call was made with updated headlines
412 |             # We need to check the first call (creative update), not the second call (details fetch)
413 |             call_args_list = mock_api.call_args_list
414 |             assert len(call_args_list) >= 1
415 |             
416 |             # First call should be the creative update
417 |             first_call = call_args_list[0]
418 |             creative_data = first_call[0][2]  # params is the third argument
419 |             
420 |             assert "asset_feed_spec" in creative_data
421 |             assert "headlines" in creative_data["asset_feed_spec"]
422 |             assert creative_data["asset_feed_spec"]["headlines"] == [
423 |                 {"text": "New Headline 1"}, 
424 |                 {"text": "New Headline 2"}, 
425 |                 {"text": "New Headline 3"}
426 |             ]
427 |     
428 |     async def test_update_ad_creative_add_descriptions(self):
429 |         """Test updating an existing creative to add multiple descriptions."""
430 |         
431 |         sample_creative_data = {
432 |             "id": "123456789",
433 |             "name": "Updated Creative",
434 |             "status": "ACTIVE"
435 |         }
436 |         
437 |         with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api:
438 |             mock_api.return_value = sample_creative_data
439 |             
440 |             result = await update_ad_creative(
441 |                 access_token="test_token",
442 |                 creative_id="123456789",
443 |                 descriptions=["New Description 1", "New Description 2"]
444 |             )
445 |             
446 |             result_data = json.loads(result)
447 |             assert result_data["success"] is True
448 |             
449 |             # Verify the API call was made with updated descriptions
450 |             # We need to check the first call (creative update), not the second call (details fetch)
451 |             call_args_list = mock_api.call_args_list
452 |             assert len(call_args_list) >= 1
453 |             
454 |             # First call should be the creative update
455 |             first_call = call_args_list[0]
456 |             creative_data = first_call[0][2]  # params is the third argument
457 |             
458 |             assert "asset_feed_spec" in creative_data
459 |             assert "descriptions" in creative_data["asset_feed_spec"]
460 |             assert creative_data["asset_feed_spec"]["descriptions"] == [
461 |                 {"text": "New Description 1"}, 
462 |                 {"text": "New Description 2"}
463 |             ]
464 |     
465 |     async def test_update_ad_creative_update_dynamic_spec(self):
466 |         """Test updating an existing creative's dynamic_creative_spec."""
467 |         
468 |         sample_creative_data = {
469 |             "id": "123456789",
470 |             "name": "Updated Creative",
471 |             "status": "ACTIVE"
472 |         }
473 |         
474 |         with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api:
475 |             mock_api.return_value = sample_creative_data
476 |             
477 |             result = await update_ad_creative(
478 |                 access_token="test_token",
479 |                 creative_id="123456789",
480 |                 dynamic_creative_spec={
481 |                     "headline_optimization": True,
482 |                     "description_optimization": False
483 |                 }
484 |             )
485 |             
486 |             result_data = json.loads(result)
487 |             assert result_data["success"] is True
488 |             
489 |             # Verify the API call was made with updated dynamic_creative_spec
490 |             # We need to check the first call (creative update), not the second call (details fetch)
491 |             call_args_list = mock_api.call_args_list
492 |             assert len(call_args_list) >= 1
493 |             
494 |             # First call should be the creative update
495 |             first_call = call_args_list[0]
496 |             creative_data = first_call[0][2]  # params is the third argument
497 |             
498 |             assert "dynamic_creative_spec" in creative_data
499 |             assert creative_data["dynamic_creative_spec"]["headline_optimization"] is True
500 |             assert creative_data["dynamic_creative_spec"]["description_optimization"] is False
501 |     
502 |     async def test_update_ad_creative_no_creative_id(self):
503 |         """Test update_ad_creative with empty creative_id provided."""
504 |         
505 |         result = await update_ad_creative(
506 |             creative_id="",  # Now provide the required parameter but with empty value
507 |             access_token="test_token",
508 |             headlines=["New Headline"]
509 |         )
510 |         
511 |         result_data = json.loads(result)
512 |         # The error might be wrapped in a 'data' field
513 |         if "data" in result_data:
514 |             error_data = json.loads(result_data["data"])
515 |             assert "error" in error_data
516 |             assert "creative id" in error_data["error"].lower()
517 |         else:
518 |             assert "error" in result_data
519 |             assert "creative id" in result_data["error"].lower()
520 |     
521 |     async def test_update_ad_creative_cannot_mix_headline_and_headlines(self):
522 |         """Test that mixing headline and headlines parameters raises error in update."""
523 |         
524 |         result = await update_ad_creative(
525 |             access_token="test_token",
526 |             creative_id="123456789",
527 |             headline="Single Headline",
528 |             headlines=["Headline 1", "Headline 2"]
529 |         )
530 |         
531 |         result_data = json.loads(result)
532 |         
533 |         # Check if error is wrapped in "data" field (MCP error response format)
534 |         if "data" in result_data:
535 |             error_data = json.loads(result_data["data"])
536 |             assert "error" in error_data
537 |             assert "Cannot specify both 'headline' and 'headlines'" in error_data["error"]
538 |         else:
539 |             assert "error" in result_data
540 |             assert "Cannot specify both 'headline' and 'headlines'" in result_data["error"]
541 |     
542 |     async def test_update_ad_creative_cannot_mix_description_and_descriptions(self):
543 |         """Test that mixing description and descriptions parameters raises error in update."""
544 |         
545 |         result = await update_ad_creative(
546 |             access_token="test_token",
547 |             creative_id="123456789",
548 |             description="Single Description",
549 |             descriptions=["Description 1", "Description 2"]
550 |         )
551 |         
552 |         result_data = json.loads(result)
553 |         
554 |         # Check if error is wrapped in "data" field (MCP error response format)
555 |         if "data" in result_data:
556 |             error_data = json.loads(result_data["data"])
557 |             assert "error" in error_data
558 |             assert "Cannot specify both 'description' and 'descriptions'" in error_data["error"]
559 |         else:
560 |             assert "error" in result_data
561 |             assert "Cannot specify both 'description' and 'descriptions'" in result_data["error"]
562 | 
563 |     async def test_update_ad_creative_validation_max_headlines(self):
564 |         """Test validation for maximum number of headlines in update."""
565 |         
566 |         # Create list with more than 5 headlines (limit)
567 |         too_many_headlines = [f"Headline {i}" for i in range(6)]
568 |         
569 |         result = await update_ad_creative(
570 |             access_token="test_token",
571 |             creative_id="123456789",
572 |             headlines=too_many_headlines
573 |         )
574 |         
575 |         result_data = json.loads(result)
576 |         # The error might be wrapped in a 'data' field
577 |         if "data" in result_data:
578 |             error_data = json.loads(result_data["data"])
579 |             assert "error" in error_data
580 |             assert "maximum 5 headlines" in error_data["error"].lower()
581 |         else:
582 |             assert "error" in result_data
583 |             assert "maximum 5 headlines" in result_data["error"].lower()
584 |     
585 |     async def test_update_ad_creative_validation_max_descriptions(self):
586 |         """Test validation for maximum number of descriptions in update."""
587 |         
588 |         # Create list with more than 5 descriptions (limit)
589 |         too_many_descriptions = [f"Description {i}" for i in range(6)]
590 |         
591 |         result = await update_ad_creative(
592 |             access_token="test_token",
593 |             creative_id="123456789",
594 |             descriptions=too_many_descriptions
595 |         )
596 |         
597 |         result_data = json.loads(result)
598 |         # The error might be wrapped in a 'data' field
599 |         if "data" in result_data:
600 |             error_data = json.loads(result_data["data"])
601 |             assert "error" in error_data
602 |             assert "maximum 5 descriptions" in error_data["error"].lower()
603 |         else:
604 |             assert "error" in result_data
605 |             assert "maximum 5 descriptions" in result_data["error"].lower()
606 |     
607 |     async def test_update_ad_creative_validation_headline_length(self):
608 |         """Test validation for headline character limit."""
609 |         
610 |         # Create headline longer than 40 characters
611 |         long_headline = "A" * 41
612 |         
613 |         result = await update_ad_creative(
614 |             access_token="test_token",
615 |             creative_id="123456789",
616 |             headlines=[long_headline]
617 |         )
618 |         
619 |         result_data = json.loads(result)
620 |         # The error might be wrapped in a 'data' field
621 |         if "data" in result_data:
622 |             error_data = json.loads(result_data["data"])
623 |             assert "error" in error_data
624 |             assert "40 character limit" in error_data["error"]
625 |         else:
626 |             assert "error" in result_data
627 |             assert "40 character limit" in result_data["error"]
628 |     
629 |     async def test_update_ad_creative_validation_description_length(self):
630 |         """Test validation for description character limit."""
631 |         
632 |         # Create description longer than 125 characters
633 |         long_description = "A" * 126
634 |         
635 |         result = await update_ad_creative(
636 |             access_token="test_token",
637 |             creative_id="123456789",
638 |             descriptions=[long_description]
639 |         )
640 |         
641 |         result_data = json.loads(result)
642 |         # The error might be wrapped in a 'data' field
643 |         if "data" in result_data:
644 |             error_data = json.loads(result_data["data"])
645 |             assert "error" in error_data
646 |             assert "125 character limit" in error_data["error"]
647 |         else:
648 |             assert "error" in result_data
649 |             assert "125 character limit" in result_data["error"]
650 |     
651 |     async def test_update_ad_creative_name_only(self):
652 |         """Test updating just the creative name."""
653 |         
654 |         sample_creative_data = {
655 |             "id": "123456789",
656 |             "name": "Updated Creative Name",
657 |             "status": "ACTIVE"
658 |         }
659 |         
660 |         with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api:
661 |             mock_api.return_value = sample_creative_data
662 |             
663 |             result = await update_ad_creative(
664 |                 access_token="test_token",
665 |                 creative_id="123456789",
666 |                 name="Updated Creative Name"
667 |             )
668 |             
669 |             result_data = json.loads(result)
670 |             assert result_data["success"] is True
671 |             
672 |             # Verify the API call was made with just the name
673 |             call_args_list = mock_api.call_args_list
674 |             assert len(call_args_list) >= 1
675 |             
676 |             first_call = call_args_list[0]
677 |             creative_data = first_call[0][2]  # params is the third argument
678 |             
679 |             assert "name" in creative_data
680 |             assert creative_data["name"] == "Updated Creative Name"
681 |             # Should not have asset_feed_spec since no dynamic content
682 |             assert "asset_feed_spec" not in creative_data
683 |     
684 |     async def test_update_ad_creative_message_only(self):
685 |         """Test updating just the creative message."""
686 |         
687 |         sample_creative_data = {
688 |             "id": "123456789",
689 |             "name": "Test Creative",
690 |             "status": "ACTIVE"
691 |         }
692 |         
693 |         with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api:
694 |             mock_api.return_value = sample_creative_data
695 |             
696 |             result = await update_ad_creative(
697 |                 access_token="test_token",
698 |                 creative_id="123456789",
699 |                 message="Updated message text"
700 |             )
701 |             
702 |             result_data = json.loads(result)
703 |             assert result_data["success"] is True
704 |             
705 |             # Verify the API call was made with object_story_spec
706 |             call_args_list = mock_api.call_args_list
707 |             assert len(call_args_list) >= 1
708 |             
709 |             first_call = call_args_list[0]
710 |             creative_data = first_call[0][2]  # params is the third argument
711 |             
712 |             assert "object_story_spec" in creative_data
713 |             assert creative_data["object_story_spec"]["link_data"]["message"] == "Updated message text"
714 |     
715 |     async def test_update_ad_creative_cta_with_dynamic_content(self):
716 |         """Test updating call_to_action_type with dynamic creative content."""
717 |         
718 |         sample_creative_data = {
719 |             "id": "123456789",
720 |             "name": "Test Creative",
721 |             "status": "ACTIVE"
722 |         }
723 |         
724 |         with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api:
725 |             mock_api.return_value = sample_creative_data
726 |             
727 |             result = await update_ad_creative(
728 |                 access_token="test_token",
729 |                 creative_id="123456789",
730 |                 headlines=["Test Headline"],
731 |                 call_to_action_type="SHOP_NOW"
732 |             )
733 |             
734 |             result_data = json.loads(result)
735 |             assert result_data["success"] is True
736 |             
737 |             # Verify CTA is added to asset_feed_spec when dynamic content exists
738 |             call_args_list = mock_api.call_args_list
739 |             assert len(call_args_list) >= 1
740 |             
741 |             first_call = call_args_list[0]
742 |             creative_data = first_call[0][2]  # params is the third argument
743 |             
744 |             assert "asset_feed_spec" in creative_data
745 |             assert "call_to_action_types" in creative_data["asset_feed_spec"]
746 |             assert creative_data["asset_feed_spec"]["call_to_action_types"] == ["SHOP_NOW"]
747 |     
748 |     async def test_update_ad_creative_cta_without_dynamic_content(self):
749 |         """Test updating call_to_action_type without dynamic creative content."""
750 |         
751 |         sample_creative_data = {
752 |             "id": "123456789",
753 |             "name": "Test Creative",
754 |             "status": "ACTIVE"
755 |         }
756 |         
757 |         with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api:
758 |             mock_api.return_value = sample_creative_data
759 |             
760 |             result = await update_ad_creative(
761 |                 access_token="test_token",
762 |                 creative_id="123456789",
763 |                 call_to_action_type="LEARN_MORE"
764 |             )
765 |             
766 |             result_data = json.loads(result)
767 |             assert result_data["success"] is True
768 |             
769 |             # Verify CTA is added to object_story_spec when no dynamic content
770 |             call_args_list = mock_api.call_args_list
771 |             assert len(call_args_list) >= 1
772 |             
773 |             first_call = call_args_list[0]
774 |             creative_data = first_call[0][2]  # params is the third argument
775 |             
776 |             assert "object_story_spec" in creative_data
777 |             assert "call_to_action" in creative_data["object_story_spec"]["link_data"]
778 |             assert creative_data["object_story_spec"]["link_data"]["call_to_action"]["type"] == "LEARN_MORE"
779 |     
780 |     async def test_update_ad_creative_combined_updates(self):
781 |         """Test updating multiple parameters at once."""
782 |         
783 |         sample_creative_data = {
784 |             "id": "123456789",
785 |             "name": "Multi-Updated Creative",
786 |             "status": "ACTIVE"
787 |         }
788 |         
789 |         with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api:
790 |             mock_api.return_value = sample_creative_data
791 |             
792 |             result = await update_ad_creative(
793 |                 access_token="test_token",
794 |                 creative_id="123456789",
795 |                 name="Multi-Updated Creative",
796 |                 message="Updated message",
797 |                 headlines=["New Headline 1", "New Headline 2"],
798 |                 descriptions=["New Description 1"],
799 |                 call_to_action_type="SIGN_UP"
800 |             )
801 |             
802 |             result_data = json.loads(result)
803 |             assert result_data["success"] is True
804 |             
805 |             # Verify all updates are included
806 |             call_args_list = mock_api.call_args_list
807 |             assert len(call_args_list) >= 1
808 |             
809 |             first_call = call_args_list[0]
810 |             creative_data = first_call[0][2]  # params is the third argument
811 |             
812 |             # Should have name
813 |             assert creative_data["name"] == "Multi-Updated Creative"
814 |             
815 |             # Should have asset_feed_spec with all dynamic content
816 |             assert "asset_feed_spec" in creative_data
817 |             assert "headlines" in creative_data["asset_feed_spec"]
818 |             assert "descriptions" in creative_data["asset_feed_spec"]
819 |             assert "primary_texts" in creative_data["asset_feed_spec"]
820 |             assert "call_to_action_types" in creative_data["asset_feed_spec"]
821 |             
822 |             # Verify content
823 |             assert creative_data["asset_feed_spec"]["headlines"] == [
824 |                 {"text": "New Headline 1"}, {"text": "New Headline 2"}
825 |             ]
826 |             assert creative_data["asset_feed_spec"]["descriptions"] == [
827 |                 {"text": "New Description 1"}
828 |             ]
829 |             assert creative_data["asset_feed_spec"]["primary_texts"] == [
830 |                 {"text": "Updated message"}
831 |             ]
832 |             assert creative_data["asset_feed_spec"]["call_to_action_types"] == ["SIGN_UP"]
833 |     
834 |     async def test_update_ad_creative_api_error_handling(self):
835 |         """Test error handling when API request fails."""
836 |         
837 |         with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api:
838 |             mock_api.side_effect = Exception("API request failed")
839 |             
840 |             result = await update_ad_creative(
841 |                 access_token="test_token",
842 |                 creative_id="123456789",
843 |                 name="Test Creative"
844 |             )
845 |             
846 |             result_data = json.loads(result)
847 |             # The error might be wrapped in a 'data' field
848 |             if "data" in result_data:
849 |                 error_data = json.loads(result_data["data"])
850 |                 assert "error" in error_data
851 |                 assert error_data["error"] == "Failed to update ad creative"
852 |                 assert "details" in error_data
853 |                 assert "update_data_sent" in error_data
854 |             else:
855 |                 assert "error" in result_data
856 |                 assert result_data["error"] == "Failed to update ad creative"
857 |                 assert "details" in result_data
858 |                 assert "update_data_sent" in result_data
859 |  
```

--------------------------------------------------------------------------------
/tests/test_duplication_regression.py:
--------------------------------------------------------------------------------

```python
  1 | """Comprehensive regression tests for duplication module."""
  2 | 
  3 | import os
  4 | import json
  5 | import pytest
  6 | import httpx
  7 | from unittest.mock import patch, AsyncMock, MagicMock
  8 | import importlib
  9 | 
 10 | 
 11 | class TestDuplicationFeatureToggle:
 12 |     """Test feature toggle functionality to prevent regression."""
 13 |     
 14 |     def test_feature_disabled_by_default(self):
 15 |         """Ensure duplication is disabled by default."""
 16 |         with patch.dict(os.environ, {}, clear=True):
 17 |             # Force reload to pick up environment changes
 18 |             import importlib
 19 |             from meta_ads_mcp.core import duplication
 20 |             importlib.reload(duplication)
 21 |             
 22 |             assert not duplication.ENABLE_DUPLICATION
 23 |             # Note: Functions may persist in module namespace due to previous test runs
 24 |             # The important thing is that ENABLE_DUPLICATION flag is False
 25 |     
 26 |     def test_feature_enabled_with_env_var(self):
 27 |         """Ensure duplication is enabled when environment variable is set."""
 28 |         with patch.dict(os.environ, {"META_ADS_ENABLE_DUPLICATION": "1"}):
 29 |             # Force reload to pick up environment changes
 30 |             import importlib
 31 |             from meta_ads_mcp.core import duplication
 32 |             importlib.reload(duplication)
 33 |             
 34 |             assert duplication.ENABLE_DUPLICATION
 35 |             assert hasattr(duplication, 'duplicate_campaign')
 36 |             assert hasattr(duplication, 'duplicate_adset')
 37 |             assert hasattr(duplication, 'duplicate_ad')
 38 |             assert hasattr(duplication, 'duplicate_creative')
 39 |     
 40 |     def test_feature_enabled_with_various_truthy_values(self):
 41 |         """Test that various truthy values enable the feature."""
 42 |         truthy_values = ["1", "true", "TRUE", "yes", "YES", "on", "ON", "enabled"]
 43 |         
 44 |         for value in truthy_values:
 45 |             with patch.dict(os.environ, {"META_ADS_ENABLE_DUPLICATION": value}):
 46 |                 import importlib
 47 |                 from meta_ads_mcp.core import duplication
 48 |                 importlib.reload(duplication)
 49 |                 
 50 |                 assert duplication.ENABLE_DUPLICATION, f"Value '{value}' should enable the feature"
 51 |     
 52 |     def test_feature_disabled_with_empty_string(self):
 53 |         """Test that empty string disables the feature."""
 54 |         with patch.dict(os.environ, {"META_ADS_ENABLE_DUPLICATION": ""}):
 55 |             import importlib
 56 |             from meta_ads_mcp.core import duplication
 57 |             importlib.reload(duplication)
 58 |             
 59 |             assert not duplication.ENABLE_DUPLICATION
 60 | 
 61 | 
 62 | class TestDuplicationDecorators:
 63 |     """Test that decorators are applied correctly to prevent regression."""
 64 |     
 65 |     @pytest.fixture(autouse=True)
 66 |     def enable_feature(self):
 67 |         """Enable the duplication feature for these tests."""
 68 |         with patch.dict(os.environ, {"META_ADS_ENABLE_DUPLICATION": "1"}):
 69 |             import importlib
 70 |             from meta_ads_mcp.core import duplication
 71 |             importlib.reload(duplication)
 72 |             yield duplication
 73 |     
 74 |     def test_functions_have_meta_api_tool_decorator(self, enable_feature):
 75 |         """Ensure all duplication functions have @meta_api_tool decorator."""
 76 |         duplication = enable_feature
 77 |         
 78 |         functions = ['duplicate_campaign', 'duplicate_adset', 'duplicate_ad', 'duplicate_creative']
 79 |         
 80 |         for func_name in functions:
 81 |             func = getattr(duplication, func_name)
 82 |             
 83 |             # Check that function has been wrapped by meta_api_tool
 84 |             # The meta_api_tool decorator should add access token handling
 85 |             assert callable(func), f"{func_name} should be callable"
 86 |             
 87 |             # Check function signature includes access_token parameter
 88 |             import inspect
 89 |             sig = inspect.signature(func)
 90 |             assert 'access_token' in sig.parameters, f"{func_name} should have access_token parameter"
 91 |             assert sig.parameters['access_token'].default is None, f"{func_name} access_token should default to None"
 92 |     
 93 |     @pytest.mark.asyncio
 94 |     async def test_functions_are_mcp_tools(self, enable_feature):
 95 |         """Ensure all duplication functions are registered as MCP tools."""
 96 |         # This test ensures the @mcp_server.tool() decorator is working
 97 |         from meta_ads_mcp.core.server import mcp_server
 98 |         
 99 |         # Get all registered tool names (list_tools is async)
100 |         tools = await mcp_server.list_tools()
101 |         tool_names = [tool.name for tool in tools]
102 |         
103 |         expected_tools = ['duplicate_campaign', 'duplicate_adset', 'duplicate_ad', 'duplicate_creative']
104 |         
105 |         for tool_name in expected_tools:
106 |             assert tool_name in tool_names, f"{tool_name} should be registered as an MCP tool"
107 | 
108 | 
109 | class TestDuplicationAPIContract:
110 |     """Test API contract to prevent regression in external API calls."""
111 |     
112 |     @pytest.fixture(autouse=True)
113 |     def enable_feature(self):
114 |         """Enable the duplication feature for these tests."""
115 |         with patch.dict(os.environ, {"META_ADS_ENABLE_DUPLICATION": "1"}):
116 |             import importlib
117 |             from meta_ads_mcp.core import duplication
118 |             importlib.reload(duplication)
119 |             yield duplication
120 |     
121 |     @pytest.mark.asyncio
122 |     async def test_api_endpoint_construction(self, enable_feature):
123 |         """Test that API endpoints are constructed correctly."""
124 |         duplication = enable_feature
125 |         
126 |         test_cases = [
127 |             ("campaign", "123456789", "https://mcp.pipeboard.co/api/meta/duplicate/campaign/123456789"),
128 |             ("adset", "987654321", "https://mcp.pipeboard.co/api/meta/duplicate/adset/987654321"),
129 |             ("ad", "555666777", "https://mcp.pipeboard.co/api/meta/duplicate/ad/555666777"),
130 |             ("creative", "111222333", "https://mcp.pipeboard.co/api/meta/duplicate/creative/111222333"),
131 |         ]
132 |         
133 |         for resource_type, resource_id, expected_url in test_cases:
134 |             # Mock dual-header authentication
135 |             with patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration") as mock_auth:
136 |                 mock_auth.get_pipeboard_token.return_value = "pipeboard_token"
137 |                 mock_auth.get_auth_token.return_value = "facebook_token"
138 |                 
139 |                 with patch("meta_ads_mcp.core.duplication.httpx.AsyncClient") as mock_client:
140 |                     mock_response = AsyncMock()
141 |                     mock_response.status_code = 200
142 |                     mock_response.json.return_value = {"success": True}
143 |                     mock_client.return_value.__aenter__.return_value.post.return_value = mock_response
144 |                     
145 |                     await duplication._forward_duplication_request(
146 |                         resource_type, resource_id, "test_token", {}
147 |                     )
148 |                     
149 |                     # Verify the correct URL was called
150 |                     call_args = mock_client.return_value.__aenter__.return_value.post.call_args
151 |                     actual_url = call_args[0][0]
152 |                     assert actual_url == expected_url, f"Expected {expected_url}, got {actual_url}"
153 | 
154 |     @pytest.mark.asyncio
155 |     async def test_request_headers_format(self, enable_feature):
156 |         """Test that request headers are formatted correctly."""
157 |         duplication = enable_feature
158 |         
159 |         # Mock dual-header authentication
160 |         with patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration") as mock_auth:
161 |             mock_auth.get_pipeboard_token.return_value = "pipeboard_token_12345"
162 |             mock_auth.get_auth_token.return_value = "facebook_token_67890"
163 |             
164 |             with patch("meta_ads_mcp.core.duplication.httpx.AsyncClient") as mock_client:
165 |                 mock_response = AsyncMock()
166 |                 mock_response.status_code = 200
167 |                 mock_response.json.return_value = {"success": True}
168 |                 mock_client.return_value.__aenter__.return_value.post.return_value = mock_response
169 |                 
170 |                 await duplication._forward_duplication_request(
171 |                     "campaign", "123456789", "test_token_12345", {"name_suffix": " - Test"}
172 |                 )
173 |                 
174 |                 # Verify dual headers are sent correctly
175 |                 call_args = mock_client.return_value.__aenter__.return_value.post.call_args
176 |                 headers = call_args[1]["headers"]
177 |                 
178 |                 # Check the dual-header authentication pattern
179 |                 assert headers["Authorization"] == "Bearer facebook_token_67890"  # Facebook token for Meta API
180 |                 assert headers["X-Pipeboard-Token"] == "pipeboard_token_12345"   # Pipeboard token for auth
181 |                 assert headers["Content-Type"] == "application/json"
182 |                 assert headers["User-Agent"] == "meta-ads-mcp/1.0"
183 | 
184 |     @pytest.mark.asyncio
185 |     async def test_request_timeout_configuration(self, enable_feature):
186 |         """Test that request timeout is configured correctly."""
187 |         duplication = enable_feature
188 |         
189 |         # Mock dual-header authentication
190 |         with patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration") as mock_auth:
191 |             mock_auth.get_pipeboard_token.return_value = "pipeboard_token"
192 |             mock_auth.get_auth_token.return_value = "facebook_token"
193 |             
194 |             with patch("meta_ads_mcp.core.duplication.httpx.AsyncClient") as mock_client:
195 |                 mock_response = AsyncMock()
196 |                 mock_response.status_code = 200
197 |                 mock_response.json.return_value = {"success": True}
198 |                 mock_client.return_value.__aenter__.return_value.post.return_value = mock_response
199 |                 
200 |                 await duplication._forward_duplication_request(
201 |                     "campaign", "123456789", "test_token", {}
202 |                 )
203 |                 
204 |                 # Verify timeout is set to 30 seconds
205 |                 mock_client.assert_called_once_with(timeout=30.0)
206 | 
207 | 
class TestDuplicationErrorHandling:
    """Test error handling to prevent regression in error scenarios."""

    @pytest.fixture(autouse=True)
    def enable_feature(self):
        """Reload the duplication module with the feature flag set and yield it."""
        with patch.dict(os.environ, {"META_ADS_ENABLE_DUPLICATION": "1"}):
            import importlib
            from meta_ads_mcp.core import duplication
            importlib.reload(duplication)
            yield duplication

    @pytest.mark.asyncio
    async def test_missing_access_token_error(self, enable_feature):
        """Test error handling when authentication tokens are missing.

        Covers two cases: no Pipeboard token, and a Pipeboard token without
        any Facebook token (neither from the integration nor the fallback).
        Both must yield an ``authentication_required`` error.
        """
        duplication = enable_feature

        # Missing Pipeboard token (primary authentication failure).
        with patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration") as mock_auth:
            mock_auth.get_pipeboard_token.return_value = None
            mock_auth.get_auth_token.return_value = "facebook_token"

            result = await duplication._forward_duplication_request("campaign", "123", None, {})
            result_json = json.loads(result)

            assert result_json["error"] == "authentication_required"
            assert "Pipeboard API token not found" in result_json["message"]

        # Missing Facebook token (secondary authentication failure).
        with patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration") as mock_auth, \
             patch("meta_ads_mcp.core.auth.get_current_access_token") as mock_get_token:
            mock_auth.get_pipeboard_token.return_value = "pipeboard_token"
            mock_auth.get_auth_token.return_value = None
            mock_get_token.return_value = None  # No fallback token either

            result = await duplication._forward_duplication_request("campaign", "123", None, {})
            result_json = json.loads(result)

            assert result_json["error"] == "authentication_required"
            assert "Meta Ads access token not found" in result_json["message"]

    @pytest.mark.asyncio
    async def test_http_status_code_handling(self, enable_feature):
        """Test handling of various HTTP status codes.

        Each case feeds one mocked HTTP response into
        ``_forward_duplication_request`` and checks the ``error`` value of the
        returned JSON (or, for 200, that a success payload came back).
        """
        duplication = enable_feature

        # Each case: (status code, expected "error" value or None for success,
        # JSON body or None when json() must raise, response text or None).
        cases = [
            (200, None, {"success": True, "id": "new_123"}, None),
            (400, "validation_failed", {"errors": ["Invalid parameter"], "warnings": []}, None),
            (401, "authentication_error", None, "Unauthorized"),
            (402, "subscription_required", {
                "message": "This feature is not available in your current plan",
                "upgrade_url": "https://pipeboard.co/upgrade",
                "suggestion": "Please upgrade your account to access this feature"
            }, None),
            (403, "facebook_connection_required", {
                "message": "You need to connect your Facebook account first",
                "details": {
                    "login_flow_url": "/connections",
                    "auth_flow_url": "/api/meta/auth"
                }
            }, None),
            (404, "resource_not_found", None, "Not Found"),
            (429, "rate_limit_exceeded", None, "Rate limited"),
            (502, "meta_api_error", {"message": "Facebook API error"}, None),
            (500, "duplication_failed", None, "Error 500"),
        ]

        for status_code, expected_error, json_body, text in cases:
            with patch("meta_ads_mcp.core.duplication.httpx.AsyncClient") as mock_client, \
                 patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration.get_pipeboard_token", return_value="test_pipeboard_token"), \
                 patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration.get_auth_token", return_value="test_facebook_token"):
                # Use MagicMock instead of AsyncMock for more predictable behavior
                mock_response = MagicMock()
                mock_response.status_code = status_code

                if json_body is None:
                    # Simulate a non-JSON body: json() raises, text is set.
                    mock_response.json.side_effect = Exception("No JSON")
                    mock_response.text = text
                else:
                    mock_response.json.return_value = json_body

                if status_code == 429:
                    # Value returned for any header lookup on the 429 response.
                    mock_response.headers.get.return_value = "60"

                mock_client.return_value.__aenter__.return_value.post.return_value = mock_response

                result = await duplication._forward_duplication_request(
                    "campaign", "123", "token", {}
                )
                result_json = json.loads(result)

                if expected_error is None:
                    assert "success" in result_json or "id" in result_json, \
                        f"HTTP {status_code}: expected a success payload, got {result_json}"
                else:
                    assert result_json["error"] == expected_error, \
                        f"HTTP {status_code}: expected {expected_error}, got {result_json.get('error')}"

    @pytest.mark.asyncio
    async def test_network_error_handling(self, enable_feature):
        """Test handling of network errors.

        The mocked ``post`` raises each exception in turn; the returned JSON
        must carry the matching ``error`` value.
        """
        duplication = enable_feature

        network_errors = [
            (httpx.TimeoutException("Timeout"), "request_timeout"),
            (httpx.RequestError("Connection failed"), "network_error"),
            (Exception("Unexpected error"), "unexpected_error"),
        ]

        for exception, expected_error in network_errors:
            with patch("meta_ads_mcp.core.duplication.httpx.AsyncClient") as mock_client, \
                 patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration.get_pipeboard_token", return_value="test_pipeboard_token"), \
                 patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration.get_auth_token", return_value="test_facebook_token"):
                mock_client.return_value.__aenter__.return_value.post.side_effect = exception

                result = await duplication._forward_duplication_request(
                    "campaign", "123", "token", {}
                )
                result_json = json.loads(result)

                assert result_json["error"] == expected_error, \
                    f"{type(exception).__name__}: expected {expected_error}, got {result_json.get('error')}"
355 | 
356 | 
class TestDuplicationParameterHandling:
    """Test parameter handling to prevent regression in data processing."""

    @pytest.fixture(autouse=True)
    def enable_feature(self):
        """Reload the duplication module with the feature flag set and yield it."""
        with patch.dict(os.environ, {"META_ADS_ENABLE_DUPLICATION": "1"}):
            import importlib
            from meta_ads_mcp.core import duplication
            importlib.reload(duplication)
            yield duplication

    @pytest.mark.asyncio
    async def test_none_values_filtered_from_options(self, enable_feature):
        """Test that None-valued options are dropped from the JSON payload."""
        duplication = enable_feature

        with patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration") as mock_auth, \
             patch("meta_ads_mcp.core.duplication.httpx.AsyncClient") as mock_client:
            # Dual-header authentication: both tokens must be present.
            mock_auth.get_pipeboard_token.return_value = "pipeboard_token"
            mock_auth.get_auth_token.return_value = "facebook_token"

            # Response.json() is synchronous in httpx, so use MagicMock here;
            # an AsyncMock would return an un-awaited coroutine from json().
            mock_response = MagicMock()
            mock_response.status_code = 200
            mock_response.json.return_value = {"success": True}
            post_mock = mock_client.return_value.__aenter__.return_value.post
            post_mock.return_value = mock_response

            options_with_none = {
                "name_suffix": " - Test",
                "new_daily_budget": None,
                "new_status": "PAUSED",
                "new_headline": None,
            }

            await duplication._forward_duplication_request(
                "campaign", "123", "token", options_with_none
            )

            # The payload is passed to post() as the ``json`` keyword argument.
            json_payload = post_mock.call_args[1]["json"]

            assert "name_suffix" in json_payload
            assert "new_status" in json_payload
            assert "new_daily_budget" not in json_payload
            assert "new_headline" not in json_payload

    @pytest.mark.asyncio
    async def test_campaign_duplication_parameter_forwarding(self, enable_feature):
        """Test that campaign duplication forwards all parameters correctly."""
        duplication = enable_feature

        with patch("meta_ads_mcp.core.duplication._forward_duplication_request") as mock_forward:
            mock_forward.return_value = '{"success": true}'

            # Only the forwarded call is inspected; the return value is unused.
            await duplication.duplicate_campaign(
                campaign_id="123456789",
                access_token="test_token",
                name_suffix=" - New Copy",
                include_ad_sets=False,
                include_ads=True,
                include_creatives=False,
                copy_schedule=True,
                new_daily_budget=100.50,
                new_status="ACTIVE"
            )

            mock_forward.assert_called_once_with(
                "campaign",
                "123456789",
                "test_token",
                {
                    "name_suffix": " - New Copy",
                    "include_ad_sets": False,
                    "include_ads": True,
                    "include_creatives": False,
                    "copy_schedule": True,
                    "new_daily_budget": 100.50,
                    "new_status": "ACTIVE"
                }
            )

    @pytest.mark.asyncio
    async def test_adset_duplication_parameter_forwarding(self, enable_feature):
        """Test that ad set duplication forwards all parameters correctly including new_targeting."""
        duplication = enable_feature

        with patch("meta_ads_mcp.core.duplication._forward_duplication_request") as mock_forward:
            mock_forward.return_value = '{"success": true}'

            # Only the forwarded call is inspected; the return value is unused.
            await duplication.duplicate_adset(
                adset_id="987654321",
                access_token="test_token",
                target_campaign_id="campaign_123",
                name_suffix=" - Targeted Copy",
                include_ads=False,
                include_creatives=True,
                new_daily_budget=200.00,
                new_targeting={
                    "age_min": 25,
                    "age_max": 45,
                    "geo_locations": {
                        "countries": ["US", "CA"]
                    }
                },
                new_status="ACTIVE"
            )

            # The expected targeting is a separate literal on purpose, so an
            # in-place mutation of the argument would still be detected.
            mock_forward.assert_called_once_with(
                "adset",
                "987654321",
                "test_token",
                {
                    "target_campaign_id": "campaign_123",
                    "name_suffix": " - Targeted Copy",
                    "include_ads": False,
                    "include_creatives": True,
                    "new_daily_budget": 200.00,
                    "new_targeting": {
                        "age_min": 25,
                        "age_max": 45,
                        "geo_locations": {
                            "countries": ["US", "CA"]
                        }
                    },
                    "new_status": "ACTIVE"
                }
            )

    def test_estimated_components_calculation(self, enable_feature):
        """Test that estimated components are calculated correctly."""
        duplication = enable_feature

        # Each case: (resource type, options, expected estimate).
        test_cases = [
            # Campaign with all components
            ("campaign", {"include_ad_sets": True, "include_ads": True, "include_creatives": True},
             {"campaigns": 1, "ad_sets": "3-5 (estimated)", "ads": "5-15 (estimated)", "creatives": "5-15 (estimated)"}),

            # Campaign with no sub-components
            ("campaign", {"include_ad_sets": False, "include_ads": False, "include_creatives": False},
             {"campaigns": 1}),

            # Ad set with ads
            ("adset", {"include_ads": True, "include_creatives": True},
             {"ad_sets": 1, "ads": "2-5 (estimated)", "creatives": "2-5 (estimated)"}),

            # Ad set without ads
            ("adset", {"include_ads": False, "include_creatives": False},
             {"ad_sets": 1}),

            # Single ad with creative
            ("ad", {"duplicate_creative": True},
             {"ads": 1, "creatives": 1}),

            # Single ad without creative
            ("ad", {"duplicate_creative": False},
             {"ads": 1}),

            # Single creative
            ("creative", {},
             {"creatives": 1}),
        ]

        for resource_type, options, expected in test_cases:
            result = duplication._get_estimated_components(resource_type, options)
            assert result == expected, f"Failed for {resource_type} with {options}"
529 | 
530 | 
class TestDuplicationIntegration:
    """Integration tests to prevent regression in end-to-end functionality."""

    @pytest.fixture(autouse=True)
    def enable_feature(self):
        """Reload the duplication module with the feature flag set and yield it."""
        import importlib

        with patch.dict(os.environ, {"META_ADS_ENABLE_DUPLICATION": "1"}):
            from meta_ads_mcp.core import duplication

            importlib.reload(duplication)
            yield duplication

    @staticmethod
    def _unwrap_tool_result(raw_result):
        """Decode a tool result, unwrapping the ``data`` envelope when present."""
        decoded = json.loads(raw_result)
        if "data" in decoded:
            return json.loads(decoded["data"])
        return decoded

    @staticmethod
    def _stub_post_response(mock_client, status_code, payload):
        """Make the mocked client's ``post`` return a response with this status and JSON body."""
        response = MagicMock()
        response.status_code = status_code
        response.json.return_value = payload
        mock_client.return_value.__aenter__.return_value.post.return_value = response

    @pytest.mark.asyncio
    async def test_end_to_end_successful_duplication(self, enable_feature):
        """Test complete successful duplication flow."""
        duplication = enable_feature

        success_payload = {
            "success": True,
            "original_campaign_id": "123456789",
            "new_campaign_id": "987654321",
            "duplicated_components": {
                "campaign": {"id": "987654321", "name": "Test Campaign - Copy"},
                "ad_sets": [{"id": "111", "name": "Ad Set 1 - Copy"}],
                "ads": [{"id": "222", "name": "Ad 1 - Copy"}],
                "creatives": [{"id": "333", "name": "Creative 1 - Copy"}]
            },
            "warnings": [],
            "subscription": {
                "status": "active"
            }
        }

        # Mock the auth integration and the HTTP client used by the module.
        with patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration") as mock_auth_integration, \
             patch("meta_ads_mcp.core.duplication.httpx.AsyncClient") as mock_client:
            mock_auth_integration.get_pipeboard_token.return_value = "pipeboard_token"
            mock_auth_integration.get_auth_token.return_value = "facebook_token"
            self._stub_post_response(mock_client, 200, success_payload)

            # Call the tool with an explicit token.
            result = await duplication.duplicate_campaign(
                campaign_id="123456789",
                access_token="facebook_token",
                name_suffix=" - Test Copy"
            )

            actual_result = self._unwrap_tool_result(result)

            assert actual_result["success"] is True
            assert actual_result["new_campaign_id"] == "987654321"
            assert "duplicated_components" in actual_result

    @pytest.mark.asyncio
    async def test_facebook_connection_error_flow(self, enable_feature):
        """Test Facebook connection required error flow."""
        duplication = enable_feature

        # 403 body: Facebook connection required.
        forbidden_payload = {
            "message": "You need to connect your Facebook account first",
            "details": {
                "login_flow_url": "/connections",
                "auth_flow_url": "/api/meta/auth"
            }
        }

        with patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration") as mock_auth_integration, \
             patch("meta_ads_mcp.core.duplication.httpx.AsyncClient") as mock_client:
            mock_auth_integration.get_pipeboard_token.return_value = "pipeboard_token"
            mock_auth_integration.get_auth_token.return_value = "facebook_token"
            self._stub_post_response(mock_client, 403, forbidden_payload)

            result = await duplication.duplicate_campaign(
                campaign_id="123456789",
                access_token="facebook_token"
            )

            actual_result = self._unwrap_tool_result(result)

            assert actual_result["success"] is False
            assert actual_result["error"] == "facebook_connection_required"
            assert actual_result["message"] == "You need to connect your Facebook account first"
            assert "details" in actual_result
            assert actual_result["details"]["login_flow_url"] == "/connections"

    @pytest.mark.asyncio
    async def test_subscription_required_error_flow(self, enable_feature):
        """Test subscription required error flow."""
        duplication = enable_feature

        # 402 body: subscription required.
        payment_required_payload = {
            "message": "This feature is not available in your current plan",
            "upgrade_url": "https://pipeboard.co/upgrade",
            "suggestion": "Please upgrade your account to access this feature"
        }

        with patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration") as mock_auth_integration, \
             patch("meta_ads_mcp.core.duplication.httpx.AsyncClient") as mock_client:
            mock_auth_integration.get_pipeboard_token.return_value = "pipeboard_token"
            mock_auth_integration.get_auth_token.return_value = "facebook_token"
            self._stub_post_response(mock_client, 402, payment_required_payload)

            result = await duplication.duplicate_campaign(
                campaign_id="123456789",
                access_token="facebook_token"
            )

            actual_result = self._unwrap_tool_result(result)

            assert actual_result["success"] is False
            assert actual_result["error"] == "subscription_required"
            assert actual_result["message"] == "This feature is not available in your current plan"
            assert actual_result["upgrade_url"] == "https://pipeboard.co/upgrade"
            assert actual_result["suggestion"] == "Please upgrade your account to access this feature"
674 | 
675 | 
class TestDuplicationTokenHandling:
    """Test access token handling to prevent auth regression."""

    @pytest.fixture(autouse=True)
    def enable_feature(self):
        """Reload the duplication module with the feature flag set and yield it."""
        import importlib

        with patch.dict(os.environ, {"META_ADS_ENABLE_DUPLICATION": "1"}):
            from meta_ads_mcp.core import duplication

            importlib.reload(duplication)
            yield duplication

    @pytest.mark.asyncio
    async def test_meta_api_tool_decorator_token_handling(self, enable_feature):
        """Test that an explicit access token reaches the forwarding helper unchanged."""
        duplication = enable_feature

        with patch("meta_ads_mcp.core.duplication._forward_duplication_request") as forward_mock:
            forward_mock.return_value = '{"success": true}'

            await duplication.duplicate_campaign(
                campaign_id="123456789",
                access_token="explicit_token_12345"
            )

            forward_mock.assert_called_once()
            # access_token is the third positional argument of the helper.
            forwarded_token = forward_mock.call_args[0][2]
            assert forwarded_token == "explicit_token_12345"

    @pytest.mark.asyncio
    async def test_explicit_token_overrides_injection(self, enable_feature):
        """Test that explicit token overrides auto-injection."""
        duplication = enable_feature

        with patch("meta_ads_mcp.core.auth.get_current_access_token") as token_source, \
             patch("meta_ads_mcp.core.duplication._forward_duplication_request") as forward_mock:
            # A different token is available for injection; it must not be used.
            token_source.return_value = "injected_token"
            forward_mock.return_value = '{"success": true}'

            await duplication.duplicate_campaign(
                campaign_id="123456789",
                access_token="explicit_token_12345"
            )

            forward_mock.assert_called_once()
            # access_token is the third positional argument of the helper.
            forwarded_token = forward_mock.call_args[0][2]
            assert forwarded_token == "explicit_token_12345"
729 | 
730 | 
731 | class TestDuplicationRegressionEdgeCases:
732 |     """Test edge cases that could cause regressions."""
733 |     
734 |     @pytest.fixture(autouse=True)
735 |     def enable_feature(self):
736 |         """Enable the duplication feature for these tests."""
737 |         with patch.dict(os.environ, {"META_ADS_ENABLE_DUPLICATION": "1"}):
738 |             import importlib
739 |             from meta_ads_mcp.core import duplication
740 |             importlib.reload(duplication)
741 |             yield duplication
742 |     
743 |     @pytest.mark.asyncio
744 |     async def test_empty_string_parameters(self, enable_feature):
745 |         """Test handling of empty string parameters."""
746 |         duplication = enable_feature
747 |         
748 |         with patch("meta_ads_mcp.core.duplication._forward_duplication_request") as mock_forward:
749 |             mock_forward.return_value = '{"success": true}'
750 |             
751 |             # Test with empty strings
752 |             await duplication.duplicate_campaign(
753 |                 campaign_id="123456789",
754 |                 access_token="token",
755 |                 name_suffix="",  # Empty string
756 |                 new_status=""    # Empty string
757 |             )
758 |             
759 |             # Verify empty strings are preserved (not filtered like None)
760 |             call_args = mock_forward.call_args[0]
761 |             options = call_args[3]
762 |             assert options["name_suffix"] == ""
763 |             assert options["new_status"] == ""
764 |     
765 |     @pytest.mark.asyncio
766 |     async def test_unicode_parameters(self, enable_feature):
767 |         """Test handling of unicode parameters."""
768 |         duplication = enable_feature
769 |         
770 |         # Mock dual-header authentication
771 |         with patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration") as mock_auth:
772 |             mock_auth.get_pipeboard_token.return_value = "pipeboard_token"
773 |             mock_auth.get_auth_token.return_value = "facebook_token"
774 |             
775 |             with patch("meta_ads_mcp.core.duplication.httpx.AsyncClient") as mock_client:
776 |                 mock_response = AsyncMock()
777 |                 mock_response.status_code = 200
778 |                 mock_response.json.return_value = {"success": True}
779 |                 mock_client.return_value.__aenter__.return_value.post.return_value = mock_response
780 |                 
781 |                 # Test with unicode characters
782 |                 unicode_suffix = " - 复制版本 🚀"
783 |                 await duplication._forward_duplication_request(
784 |                     "campaign", "123", "token", {"name_suffix": unicode_suffix}
785 |                 )
786 |                 
787 |                 # Verify unicode is preserved in the request
788 |                 call_args = mock_client.return_value.__aenter__.return_value.post.call_args
789 |                 json_payload = call_args[1]["json"]
790 |                 assert json_payload["name_suffix"] == unicode_suffix
791 | 
792 |     @pytest.mark.asyncio
793 |     async def test_large_parameter_values(self, enable_feature):
794 |         """Test handling of large parameter values."""
795 |         duplication = enable_feature
796 |         
797 |         # Mock dual-header authentication
798 |         with patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration") as mock_auth:
799 |             mock_auth.get_pipeboard_token.return_value = "pipeboard_token"
800 |             mock_auth.get_auth_token.return_value = "facebook_token"
801 |             
802 |             with patch("meta_ads_mcp.core.duplication.httpx.AsyncClient") as mock_client:
803 |                 mock_response = AsyncMock()
804 |                 mock_response.status_code = 200
805 |                 mock_response.json.return_value = {"success": True}
806 |                 mock_client.return_value.__aenter__.return_value.post.return_value = mock_response
807 |                 
808 |                 # Test with very large budget value
809 |                 large_budget = 999999999.99
810 |                 await duplication._forward_duplication_request(
811 |                     "campaign", "123", "token", {"new_daily_budget": large_budget}
812 |                 )
813 |                 
814 |                 # Verify large values are preserved
815 |                 call_args = mock_client.return_value.__aenter__.return_value.post.call_args
816 |                 json_payload = call_args[1]["json"]
817 |                 assert json_payload["new_daily_budget"] == large_budget
818 |     
819 |     def test_module_reload_safety(self):
820 |         """Test that module can be safely reloaded without side effects."""
821 |         # This tests for common issues like global state pollution
822 |         
823 |         # Enable feature
824 |         with patch.dict(os.environ, {"META_ADS_ENABLE_DUPLICATION": "1"}):
825 |             import importlib
826 |             from meta_ads_mcp.core import duplication
827 |             importlib.reload(duplication)
828 |             
829 |             assert duplication.ENABLE_DUPLICATION
830 |             assert hasattr(duplication, 'duplicate_campaign')
831 |         
832 |         # Disable feature and reload
833 |         with patch.dict(os.environ, {"META_ADS_ENABLE_DUPLICATION": ""}):
834 |             importlib.reload(duplication)
835 |             
836 |             assert not duplication.ENABLE_DUPLICATION
837 |             # Note: Functions may still exist in the module namespace due to Python's 
838 |             # module loading behavior, but they won't be registered as MCP tools
839 |             # This is expected behavior and not a problem for the feature toggle
840 |         
841 |         # Re-enable feature and reload
842 |         with patch.dict(os.environ, {"META_ADS_ENABLE_DUPLICATION": "1"}):
843 |             importlib.reload(duplication)
844 |             
845 |             assert duplication.ENABLE_DUPLICATION
846 |             assert hasattr(duplication, 'duplicate_campaign') 
```
Page 5/6 · First · Prev · Next · Last