This is page 5 of 6. Use http://codebase.md/nictuku/meta-ads-mcp?lines=true&page={x} to view the full context.
# Directory Structure
```
├── .github
│ └── workflows
│ ├── publish-mcp.yml
│ ├── publish.yml
│ └── test.yml
├── .gitignore
├── .python-version
├── .uv.toml
├── CUSTOM_META_APP.md
├── Dockerfile
├── examples
│ ├── example_http_client.py
│ └── README.md
├── future_improvements.md
├── images
│ └── meta-ads-example.png
├── LICENSE
├── LOCAL_INSTALLATION.md
├── meta_ads_auth.sh
├── meta_ads_mcp
│ ├── __init__.py
│ ├── __main__.py
│ └── core
│ ├── __init__.py
│ ├── accounts.py
│ ├── ads_library.py
│ ├── ads.py
│ ├── adsets.py
│ ├── api.py
│ ├── auth.py
│ ├── authentication.py
│ ├── budget_schedules.py
│ ├── callback_server.py
│ ├── campaigns.py
│ ├── duplication.py
│ ├── http_auth_integration.py
│ ├── insights.py
│ ├── openai_deep_research.py
│ ├── pipeboard_auth.py
│ ├── reports.py
│ ├── resources.py
│ ├── server.py
│ ├── targeting.py
│ └── utils.py
├── META_API_NOTES.md
├── poetry.lock
├── pyproject.toml
├── README.md
├── RELEASE.md
├── requirements.txt
├── server.json
├── setup.py
├── smithery.yaml
├── STREAMABLE_HTTP_SETUP.md
└── tests
├── __init__.py
├── conftest.py
├── e2e_account_info_search_issue.py
├── README_REGRESSION_TESTS.md
├── README.md
├── test_account_info_access_fix.py
├── test_account_search.py
├── test_budget_update_e2e.py
├── test_budget_update.py
├── test_create_ad_creative_simple.py
├── test_create_simple_creative_e2e.py
├── test_dsa_beneficiary.py
├── test_dsa_integration.py
├── test_duplication_regression.py
├── test_duplication.py
├── test_dynamic_creatives.py
├── test_estimate_audience_size_e2e.py
├── test_estimate_audience_size.py
├── test_get_account_pages.py
├── test_get_ad_creatives_fix.py
├── test_get_ad_image_quality_improvements.py
├── test_get_ad_image_regression.py
├── test_http_transport.py
├── test_insights_actions_and_values_e2e.py
├── test_insights_pagination.py
├── test_integration_openai_mcp.py
├── test_is_dynamic_creative_adset.py
├── test_mobile_app_adset_creation.py
├── test_mobile_app_adset_issue.py
├── test_openai_mcp_deep_research.py
├── test_openai.py
├── test_page_discovery_integration.py
├── test_page_discovery.py
├── test_targeting_search_e2e.py
├── test_targeting.py
├── test_update_ad_creative_id.py
└── test_upload_ad_image.py
```
# Files
--------------------------------------------------------------------------------
/meta_ads_mcp/core/targeting.py:
--------------------------------------------------------------------------------
```python
1 | """Targeting search functionality for Meta Ads API."""
2 |
3 | import json
4 | from typing import Optional, List, Dict, Any
5 | import os
6 | from .api import meta_api_tool, make_api_request
7 | from .server import mcp_server
8 |
9 |
@mcp_server.tool()
@meta_api_tool
async def search_interests(query: str, access_token: Optional[str] = None, limit: int = 25) -> str:
    """
    Look up interest targeting options that match a keyword.

    Args:
        query: Search term for interests (e.g., "baseball", "cooking", "travel")
        access_token: Meta API access token (optional - will use cached token if not provided)
        limit: Maximum number of results to return (default: 25)

    Returns:
        JSON string containing interest data with id, name, audience_size, and path fields
    """
    # Bail out early instead of sending an empty query to the Graph API.
    if not query:
        return json.dumps({"error": "No search query provided"}, indent=2)

    search_params = {
        "type": "adinterest",
        "q": query,
        "limit": limit
    }
    response = await make_api_request("search", access_token, search_params)
    return json.dumps(response, indent=2)
37 |
38 |
@mcp_server.tool()
@meta_api_tool
async def get_interest_suggestions(interest_list: List[str], access_token: Optional[str] = None, limit: int = 25) -> str:
    """
    Suggest related interests for a list of seed interest names.

    Args:
        interest_list: List of interest names to get suggestions for (e.g., ["Basketball", "Soccer"])
        access_token: Meta API access token (optional - will use cached token if not provided)
        limit: Maximum number of suggestions to return (default: 25)

    Returns:
        JSON string containing suggested interests with id, name, audience_size, and description fields
    """
    # An empty seed list cannot yield suggestions; report it without calling the API.
    if not interest_list:
        return json.dumps({"error": "No interest list provided"}, indent=2)

    search_params = {
        "type": "adinterestsuggestion",
        # The Graph API expects the seed list as a JSON-encoded string parameter.
        "interest_list": json.dumps(interest_list),
        "limit": limit
    }
    response = await make_api_request("search", access_token, search_params)
    return json.dumps(response, indent=2)
66 |
67 |
def _has_location_or_custom_audience(t: Dict[str, Any]) -> bool:
    """Return True if targeting includes at least one geo location or a custom audience."""
    if not isinstance(t, dict):
        return False
    geo = t.get("geo_locations") or {}
    if isinstance(geo, dict):
        for key in [
            "countries",
            "regions",
            "cities",
            "zips",
            "geo_markets",
            "country_groups"
        ]:
            val = geo.get(key)
            if isinstance(val, list) and len(val) > 0:
                return True
    # Top-level custom audiences
    ca = t.get("custom_audiences")
    if isinstance(ca, list) and len(ca) > 0:
        return True
    # Custom audiences within flexible_spec
    flex = t.get("flexible_spec")
    if isinstance(flex, list):
        for spec in flex:
            if isinstance(spec, dict):
                ca_spec = spec.get("custom_audiences")
                if isinstance(ca_spec, list) and len(ca_spec) > 0:
                    return True
    return False


def _delivery_fallback_disabled() -> bool:
    """True unless META_MCP_DISABLE_DELIVERY_FALLBACK is explicitly set to "0".

    Default is disabled ("1"); the fallback only runs when the operator
    opts in by setting the variable to "0".
    """
    return os.environ.get("META_MCP_DISABLE_DELIVERY_FALLBACK", "1") == "1"


def _format_mau_estimate(
    account_id: str,
    targeting: Dict[str, Any],
    optimization_goal: str,
    estimate_data: Dict[str, Any],
    raw_response: Dict[str, Any],
    fallback_endpoint: Optional[str] = None
) -> str:
    """Build the standard success payload from a delivery_estimate-style record.

    Args:
        estimate_data: First element of the API's "data" list (estimate_mau etc.)
        raw_response: Full API response, echoed back for debugging
        fallback_endpoint: Name of the fallback endpoint used, if any
    """
    formatted_response: Dict[str, Any] = {
        "success": True,
        "account_id": account_id,
        "targeting": targeting,
        "optimization_goal": optimization_goal,
        "estimated_audience_size": estimate_data.get("estimate_mau", 0),
        "estimate_details": {
            "monthly_active_users": estimate_data.get("estimate_mau", 0),
            "daily_outcomes_curve": estimate_data.get("estimate_dau", []),
            "bid_estimate": estimate_data.get("bid_estimates", {}),
            "unsupported_targeting": estimate_data.get("unsupported_targeting", [])
        },
        "raw_response": raw_response
    }
    if fallback_endpoint is not None:
        formatted_response["fallback_endpoint_used"] = fallback_endpoint
    return json.dumps(formatted_response, indent=2)


def _no_estimation_data_response(data: Any, account_id: str) -> str:
    """Error payload for responses that carry no usable estimation data."""
    return json.dumps({
        "error": "No estimation data returned from Meta API",
        "raw_response": data,
        "debug_info": {
            "response_keys": list(data.keys()) if isinstance(data, dict) else "not_a_dict",
            "response_type": str(type(data)),
            "endpoint_used": f"{account_id}/reachestimate"
        }
    }, indent=2)


@mcp_server.tool()
@meta_api_tool
async def estimate_audience_size(
    access_token: Optional[str] = None,
    account_id: Optional[str] = None,
    targeting: Optional[Dict[str, Any]] = None,
    optimization_goal: str = "REACH",
    # Backwards compatibility for simple interest validation
    interest_list: Optional[List[str]] = None,
    interest_fbid_list: Optional[List[str]] = None
) -> str:
    """
    Estimate audience size for targeting specifications using Meta's delivery_estimate API.

    This function provides comprehensive audience estimation for complex targeting combinations
    including demographics, geography, interests, and behaviors. It also maintains backwards
    compatibility for simple interest validation.

    Args:
        access_token: Meta API access token (optional - will use cached token if not provided)
        account_id: Meta Ads account ID (format: act_XXXXXXXXX) - required for comprehensive estimation
        targeting: Complete targeting specification including demographics, geography, interests, etc.
                  Example: {
                      "age_min": 25,
                      "age_max": 65,
                      "geo_locations": {"countries": ["PL"]},
                      "flexible_spec": [
                          {"interests": [{"id": "6003371567474"}]},
                          {"interests": [{"id": "6003462346642"}]}
                      ]
                  }
        optimization_goal: Optimization goal for estimation (default: "REACH").
                          Options: "REACH", "LINK_CLICKS", "IMPRESSIONS", "CONVERSIONS", etc.
        interest_list: [DEPRECATED - for backwards compatibility] List of interest names to validate
        interest_fbid_list: [DEPRECATED - for backwards compatibility] List of interest IDs to validate

    Returns:
        JSON string with audience estimation results including estimated_audience_size,
        reach_estimate, and targeting validation
    """
    # Handle backwards compatibility - simple interest validation.
    # Backwards-compatible mode: interest params provided OR no comprehensive params.
    is_backwards_compatible_call = (interest_list or interest_fbid_list) or (not account_id and not targeting)

    if is_backwards_compatible_call and not targeting:
        if not interest_list and not interest_fbid_list:
            return json.dumps({"error": "No interest list or FBID list provided"}, indent=2)

        params = {
            "type": "adinterestvalid"
        }
        if interest_list:
            params["interest_list"] = json.dumps(interest_list)
        if interest_fbid_list:
            params["interest_fbid_list"] = json.dumps(interest_fbid_list)

        data = await make_api_request("search", access_token, params)
        return json.dumps(data, indent=2)

    # Comprehensive audience estimation using the reachestimate API.
    if not account_id:
        return json.dumps({
            "error": "account_id is required for comprehensive audience estimation",
            "details": "For simple interest validation, use interest_list or interest_fbid_list parameters"
        }, indent=2)

    if not targeting:
        return json.dumps({
            "error": "targeting specification is required for comprehensive audience estimation",
            "example": {
                "age_min": 25,
                "age_max": 65,
                "geo_locations": {"countries": ["US"]},
                "flexible_spec": [
                    {"interests": [{"id": "6003371567474"}]}
                ]
            }
        }, indent=2)

    # Preflight validation: require at least one location OR a custom audience,
    # mirroring the Graph API's "Missing Target Audience Location" rule.
    if not _has_location_or_custom_audience(targeting):
        return json.dumps({
            "error": "Missing target audience location",
            "details": "Select at least one location in targeting.geo_locations or include a custom audience.",
            "action_required": "Add geo_locations with countries/regions/cities/zips or include custom_audiences.",
            "example": {
                "geo_locations": {"countries": ["US"]},
                "age_min": 25,
                "age_max": 65
            }
        }, indent=2)

    # Build reach estimate request (using correct Meta API endpoint).
    # NOTE(review): the raw targeting dict is passed here while the
    # delivery_estimate fallback JSON-encodes it — presumably
    # make_api_request serializes dict params itself; confirm.
    endpoint = f"{account_id}/reachestimate"
    params = {
        "targeting_spec": targeting
    }

    # Note: reachestimate endpoint doesn't support optimization_goal or objective parameters

    try:
        data = await make_api_request(endpoint, access_token, params, method="GET")

        # Surface Graph API errors directly for better diagnostics.
        # If reachestimate fails, optionally attempt a fallback using delivery_estimate.
        if isinstance(data, dict) and "error" in data:
            # Special handling for Missing Target Audience Location error (subcode 1885364)
            try:
                err_wrapper = data.get("error", {})
                details_obj = err_wrapper.get("details", {})
                raw_err = details_obj.get("error", {}) if isinstance(details_obj, dict) else {}
                if (
                    isinstance(raw_err, dict) and (
                        raw_err.get("error_subcode") == 1885364 or
                        raw_err.get("error_user_title") == "Missing Target Audience Location"
                    )
                ):
                    return json.dumps({
                        "error": "Missing target audience location",
                        "details": raw_err.get("error_user_msg") or "Select at least one location, or choose a custom audience.",
                        "endpoint_used": f"{account_id}/reachestimate",
                        "action_required": "Add geo_locations with at least one of countries/regions/cities/zips or include custom_audiences.",
                        "blame_field_specs": raw_err.get("error_data", {}).get("blame_field_specs") if isinstance(raw_err.get("error_data"), dict) else None
                    }, indent=2)
            except Exception:
                # Best-effort parse of the nested error shape; fall through on surprises.
                pass

            if _delivery_fallback_disabled():
                return json.dumps({
                    "error": "Graph API returned an error for reachestimate",
                    "details": data.get("error"),
                    "endpoint_used": f"{account_id}/reachestimate",
                    "request_params": {
                        "has_targeting_spec": bool(targeting),
                    },
                    "note": "delivery_estimate fallback disabled via META_MCP_DISABLE_DELIVERY_FALLBACK"
                }, indent=2)

            # Try fallback to delivery_estimate endpoint
            try:
                fallback_endpoint = f"{account_id}/delivery_estimate"
                fallback_params = {
                    "targeting_spec": json.dumps(targeting),
                    # Some API versions accept optimization_goal here
                    "optimization_goal": optimization_goal
                }
                fallback_data = await make_api_request(fallback_endpoint, access_token, fallback_params, method="GET")

                # If fallback returns usable data, format similarly
                if isinstance(fallback_data, dict) and "data" in fallback_data and len(fallback_data["data"]) > 0:
                    return _format_mau_estimate(
                        account_id, targeting, optimization_goal,
                        fallback_data["data"][0], fallback_data,
                        fallback_endpoint="delivery_estimate"
                    )

                # Fallback returned but not in expected format
                return json.dumps({
                    "error": "Graph API returned an error for reachestimate; delivery_estimate fallback did not return usable data",
                    "reachestimate_error": data.get("error"),
                    "fallback_endpoint_used": "delivery_estimate",
                    "fallback_raw_response": fallback_data,
                    "endpoint_used": f"{account_id}/reachestimate",
                    "request_params": {
                        "has_targeting_spec": bool(targeting)
                    }
                }, indent=2)
            except Exception as _fallback_exc:
                return json.dumps({
                    "error": "Graph API returned an error for reachestimate; delivery_estimate fallback also failed",
                    "reachestimate_error": data.get("error"),
                    "fallback_endpoint_used": "delivery_estimate",
                    "fallback_exception": str(_fallback_exc),
                    "endpoint_used": f"{account_id}/reachestimate",
                    "request_params": {
                        "has_targeting_spec": bool(targeting)
                    }
                }, indent=2)

        # Format the response for easier consumption
        if "data" in data:
            response_data = data["data"]
            # Case 1: delivery_estimate-like list structure
            if isinstance(response_data, list) and len(response_data) > 0:
                return _format_mau_estimate(
                    account_id, targeting, optimization_goal,
                    response_data[0], data
                )
            # Case 1b: explicit handling for empty list responses
            if isinstance(response_data, list):
                return _no_estimation_data_response(data, account_id)
            # Case 2: reachestimate dict structure with bounds
            if isinstance(response_data, dict):
                lower = response_data.get("users_lower_bound", response_data.get("estimate_mau_lower_bound"))
                upper = response_data.get("users_upper_bound", response_data.get("estimate_mau_upper_bound"))
                estimate_ready = response_data.get("estimate_ready")
                midpoint = None
                try:
                    if isinstance(lower, (int, float)) and isinstance(upper, (int, float)):
                        midpoint = int((lower + upper) / 2)
                except Exception:
                    midpoint = None
                formatted_response = {
                    "success": True,
                    "account_id": account_id,
                    "targeting": targeting,
                    "optimization_goal": optimization_goal,
                    "estimated_audience_size": midpoint if midpoint is not None else 0,
                    "estimate_details": {
                        "users_lower_bound": lower,
                        "users_upper_bound": upper,
                        "estimate_ready": estimate_ready
                    },
                    "raw_response": data
                }
                return json.dumps(formatted_response, indent=2)
            # "data" present but neither list nor dict
            return _no_estimation_data_response(data, account_id)

        # Bug fix: a response without a "data" key previously fell through the
        # if-chain and the tool implicitly returned None instead of JSON.
        return _no_estimation_data_response(data, account_id)

    except Exception as e:
        # Try fallback to delivery_estimate first when an exception occurs (unless disabled)
        if not _delivery_fallback_disabled():
            try:
                fallback_endpoint = f"{account_id}/delivery_estimate"
                fallback_params = {
                    "targeting_spec": json.dumps(targeting) if isinstance(targeting, dict) else targeting,
                    "optimization_goal": optimization_goal
                }
                fallback_data = await make_api_request(fallback_endpoint, access_token, fallback_params, method="GET")

                if isinstance(fallback_data, dict) and "data" in fallback_data and len(fallback_data["data"]) > 0:
                    return _format_mau_estimate(
                        account_id, targeting, optimization_goal,
                        fallback_data["data"][0], fallback_data,
                        fallback_endpoint="delivery_estimate"
                    )
            except Exception:
                # If fallback also fails, proceed to detailed error handling below
                pass

        # Check if this is the specific Business Manager system user permission error
        error_str = str(e)
        if "100" in error_str and "33" in error_str:
            # Try to provide fallback estimation using individual interests if available
            interests_found = []
            if targeting and "interests" in targeting:
                interests_found.extend([interest.get("id") for interest in targeting["interests"] if interest.get("id")])
            elif targeting and "flexible_spec" in targeting:
                for spec in targeting["flexible_spec"]:
                    if "interests" in spec:
                        interests_found.extend([interest.get("id") for interest in spec["interests"] if interest.get("id")])

            if interests_found:
                # Attempt to get individual interest data as fallback
                try:
                    fallback_result = await estimate_audience_size(
                        access_token=access_token,
                        interest_fbid_list=interests_found
                    )
                    fallback_data = json.loads(fallback_result)

                    return json.dumps({
                        "comprehensive_targeting_failed": True,
                        "error_code": "100-33",
                        "fallback_used": True,
                        "details": {
                            "issue": "reachestimate endpoint returned error - possibly due to targeting parameters or account limitations",
                            "solution": "Individual interest validation used as fallback - comprehensive targeting may have specific requirements",
                            "endpoint_used": f"{account_id}/reachestimate"
                        },
                        "individual_interest_data": fallback_data,
                        "note": "Individual interest audience sizes provided as fallback. Comprehensive targeting via reachestimate endpoint failed."
                    }, indent=2)
                except Exception:
                    # Bug fix: was a bare `except:` that also swallowed
                    # SystemExit/KeyboardInterrupt; best-effort fallback kept.
                    pass

            return json.dumps({
                "error": "reachestimate endpoint returned error (previously was incorrectly using delivery_estimate)",
                "error_code": "100-33",
                "details": {
                    "issue": "The endpoint returned an error, possibly due to targeting parameters or account limitations",
                    "endpoint_used": f"{account_id}/reachestimate",
                    "previous_issue": "Code was previously using non-existent delivery_estimate endpoint - now fixed",
                    "available_alternative": "Use interest_list or interest_fbid_list parameters for individual interest validation"
                },
                "raw_error": error_str
            }, indent=2)
        else:
            return json.dumps({
                "error": f"Failed to get audience estimation from reachestimate endpoint: {str(e)}",
                "details": "Check targeting parameters and account permissions",
                "error_type": "general_api_error",
                "endpoint_used": f"{account_id}/reachestimate"
            }, indent=2)
452 |
453 |
@mcp_server.tool()
@meta_api_tool
async def search_behaviors(access_token: Optional[str] = None, limit: int = 50) -> str:
    """
    List the available behavior targeting options.

    Args:
        access_token: Meta API access token (optional - will use cached token if not provided)
        limit: Maximum number of results to return (default: 50)

    Returns:
        JSON string containing behavior targeting options with id, name, audience_size bounds, path, and description
    """
    # Behaviors are a fixed taxonomy, so no query term is needed - just the class filter.
    search_params = {
        "type": "adTargetingCategory",
        "class": "behaviors",
        "limit": limit
    }
    response = await make_api_request("search", access_token, search_params)
    return json.dumps(response, indent=2)
477 |
478 |
@mcp_server.tool()
@meta_api_tool
async def search_demographics(access_token: Optional[str] = None, demographic_class: str = "demographics", limit: int = 50) -> str:
    """
    List demographic targeting options for a given demographic class.

    Args:
        access_token: Meta API access token (optional - will use cached token if not provided)
        demographic_class: Type of demographics to retrieve. Options: 'demographics', 'life_events',
                          'industries', 'income', 'family_statuses', 'user_device', 'user_os' (default: 'demographics')
        limit: Maximum number of results to return (default: 50)

    Returns:
        JSON string containing demographic targeting options with id, name, audience_size bounds, path, and description
    """
    # The Graph API uses the same "search" endpoint; the class parameter selects the taxonomy.
    search_params = {
        "type": "adTargetingCategory",
        "class": demographic_class,
        "limit": limit
    }
    response = await make_api_request("search", access_token, search_params)
    return json.dumps(response, indent=2)
504 |
505 |
@mcp_server.tool()
@meta_api_tool
async def search_geo_locations(query: str, access_token: Optional[str] = None,
                              location_types: Optional[List[str]] = None, limit: int = 25) -> str:
    """
    Search geographic targeting locations by name.

    Args:
        query: Search term for locations (e.g., "New York", "California", "Japan")
        access_token: Meta API access token (optional - will use cached token if not provided)
        location_types: Types of locations to search. Options: ['country', 'region', 'city', 'zip',
                       'geo_market', 'electoral_district']. If not specified, searches all types.
        limit: Maximum number of results to return (default: 25)

    Returns:
        JSON string containing location data with key, name, type, and geographic hierarchy information
    """
    # Reject empty queries up front rather than hitting the API.
    if not query:
        return json.dumps({"error": "No search query provided"}, indent=2)

    search_params = {
        "type": "adgeolocation",
        "q": query,
        "limit": limit
    }
    # Only narrow by location type when the caller asked for it; the API
    # expects the list as a JSON-encoded string.
    if location_types:
        search_params["location_types"] = json.dumps(location_types)

    response = await make_api_request("search", access_token, search_params)
    return json.dumps(response, indent=2)
```
--------------------------------------------------------------------------------
/tests/test_estimate_audience_size_e2e.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env python3
2 | """
3 | End-to-End Audience Estimation Test for Meta Ads MCP
4 |
5 | This test validates that the new estimate_audience_size function correctly provides
6 | comprehensive audience estimation and backwards compatibility for interest validation
7 | through a pre-authenticated MCP server.
8 |
9 | Usage:
10 | 1. Start the server: uv run python -m meta_ads_mcp --transport streamable-http --port 8080
11 | 2. Run test: uv run python tests/test_estimate_audience_size_e2e.py
12 |
13 | Or with pytest (manual only):
14 | uv run python -m pytest tests/test_estimate_audience_size_e2e.py -v -m e2e
15 |
16 | Test scenarios:
17 | 1. Comprehensive audience estimation with complex targeting
18 | 2. Backwards compatibility with simple interest validation
19 | 3. Error handling for invalid parameters
20 | 4. Different optimization goals
21 | """
22 |
23 | import pytest
24 | import requests
25 | import json
26 | import os
27 | import sys
28 | from typing import Dict, Any, List
29 |
30 | # Load environment variables from .env file
31 | try:
32 | from dotenv import load_dotenv
33 | load_dotenv()
34 | print("✅ Loaded environment variables from .env file")
35 | except ImportError:
36 | print("⚠️ python-dotenv not installed, using system environment variables only")
37 |
38 | @pytest.mark.e2e
39 | @pytest.mark.skip(reason="E2E test - run manually only")
40 | class AudienceEstimationTester:
41 | """Test suite focused on audience estimation functionality"""
42 |
43 | def __init__(self, base_url: str = "http://localhost:8080"):
44 | self.base_url = base_url.rstrip('/')
45 | self.endpoint = f"{self.base_url}/mcp/"
46 | self.request_id = 1
47 |
48 | # Default account ID from workspace rules
49 | self.account_id = "act_701351919139047"
50 |
51 | # Test targeting specifications
52 | self.test_targeting_specs = {
53 | "simple_demographics": {
54 | "age_min": 25,
55 | "age_max": 65,
56 | "geo_locations": {"countries": ["US"]}
57 | },
58 | "demographics_with_interests": {
59 | "age_min": 18,
60 | "age_max": 35,
61 | "geo_locations": {"countries": ["PL"]},
62 | "flexible_spec": [
63 | {"interests": [{"id": "6003371567474"}]} # Business interest
64 | ]
65 | },
66 | "complex_targeting": {
67 | "age_min": 25,
68 | "age_max": 55,
69 | "geo_locations": {"countries": ["US"], "regions": [{"key": "3847"}]}, # California
70 | "flexible_spec": [
71 | {"interests": [{"id": "6003371567474"}, {"id": "6003462346642"}]}, # Business + Technology
72 | {"behaviors": [{"id": "6007101597783"}]} # Business travelers
73 | ]
74 | },
75 | "mobile_app_targeting": {
76 | "age_min": 18,
77 | "age_max": 45,
78 | "geo_locations": {"countries": ["US"]},
79 | "user_device": ["mobile"],
80 | "user_os": ["iOS", "Android"],
81 | "flexible_spec": [
82 | {"interests": [{"id": "6003139266461"}]} # Mobile games
83 | ]
84 | }
85 | }
86 |
87 | # Test interest lists for backwards compatibility
88 | self.test_interests = {
89 | "valid_names": ["Japan", "Basketball", "Technology"],
90 | "mixed_validity": ["Japan", "invalidinterestname12345", "Basketball"],
91 | "valid_fbids": ["6003700426513", "6003397425735"], # Japan, Tennis
92 | "invalid_fbids": ["999999999999", "000000000000"]
93 | }
94 |
95 | def _make_request(self, method: str, params: Dict[str, Any] = None,
96 | headers: Dict[str, str] = None) -> Dict[str, Any]:
97 | """Make a JSON-RPC request to the MCP server"""
98 |
99 | default_headers = {
100 | "Content-Type": "application/json",
101 | "Accept": "application/json, text/event-stream",
102 | "User-Agent": "Audience-Estimation-Test-Client/1.0"
103 | }
104 |
105 | if headers:
106 | default_headers.update(headers)
107 |
108 | payload = {
109 | "jsonrpc": "2.0",
110 | "method": method,
111 | "id": self.request_id
112 | }
113 |
114 | if params:
115 | payload["params"] = params
116 |
117 | try:
118 | response = requests.post(
119 | self.endpoint,
120 | headers=default_headers,
121 | json=payload,
122 | timeout=20 # Increased timeout for delivery estimates
123 | )
124 |
125 | self.request_id += 1
126 |
127 | return {
128 | "status_code": response.status_code,
129 | "headers": dict(response.headers),
130 | "json": response.json() if response.status_code == 200 else None,
131 | "text": response.text,
132 | "success": response.status_code == 200
133 | }
134 |
135 | except requests.exceptions.RequestException as e:
136 | return {
137 | "status_code": 0,
138 | "headers": {},
139 | "json": None,
140 | "text": str(e),
141 | "success": False,
142 | "error": str(e)
143 | }
144 |
145 | def _check_for_errors(self, parsed_content: Dict[str, Any]) -> Dict[str, Any]:
146 | """Properly handle both wrapped and direct error formats"""
147 |
148 | # Check for data wrapper format first
149 | if "data" in parsed_content:
150 | data = parsed_content["data"]
151 |
152 | # Handle case where data is already parsed (dict/list)
153 | if isinstance(data, dict) and 'error' in data:
154 | return {
155 | "has_error": True,
156 | "error_message": data['error'],
157 | "error_details": data.get('details', ''),
158 | "format": "wrapped_dict"
159 | }
160 |
161 | # Handle case where data is a JSON string that needs parsing
162 | if isinstance(data, str):
163 | try:
164 | error_data = json.loads(data)
165 | if 'error' in error_data:
166 | return {
167 | "has_error": True,
168 | "error_message": error_data['error'],
169 | "error_details": error_data.get('details', ''),
170 | "format": "wrapped_json"
171 | }
172 | except json.JSONDecodeError:
173 | # Data field exists but isn't valid JSON
174 | pass
175 |
176 | # Check for direct error format
177 | if 'error' in parsed_content:
178 | return {
179 | "has_error": True,
180 | "error_message": parsed_content['error'],
181 | "error_details": parsed_content.get('details', ''),
182 | "format": "direct"
183 | }
184 |
185 | return {"has_error": False}
186 |
187 | def _extract_data(self, parsed_content: Dict[str, Any]) -> Any:
188 | """Extract successful response data from various wrapper formats"""
189 |
190 | if "data" in parsed_content:
191 | data = parsed_content["data"]
192 |
193 | # Handle case where data is already parsed
194 | if isinstance(data, (list, dict)):
195 | return data
196 |
197 | # Handle case where data is a JSON string
198 | if isinstance(data, str):
199 | try:
200 | return json.loads(data)
201 | except json.JSONDecodeError:
202 | return None
203 |
204 | # Handle direct format (data at top level)
205 | if isinstance(parsed_content, (list, dict)):
206 | return parsed_content
207 |
208 | return None
209 |
    def test_pl_only_reachestimate_bounds(self) -> Dict[str, Any]:
        """Verify PL-only reachestimate returns expected bounds and midpoint.

        Prerequisite: Start server with fallback disabled so reachestimate is used directly.
        Example:
            export META_MCP_DISABLE_DELIVERY_FALLBACK=1
            uv run python -m meta_ads_mcp --transport streamable-http --port 8080

        Returns:
            Dict with "success" plus either the observed bounds/midpoint or an
            "error" string describing what went wrong.
        """
        print(f"\n🇵🇱 Testing PL-only reachestimate bounds (fallback disabled)")
        # Dedicated test account. NOTE(review): the expected values below are a
        # snapshot of Meta's live audience data for Poland and will drift over
        # time — confirm they are still current when this test starts failing.
        local_account_id = "act_3182643988557192"
        targeting_spec = {"geo_locations": {"countries": ["PL"]}}
        expected_lower = 18600000
        expected_upper = 21900000
        expected_midpoint = 20250000  # midpoint of the lower/upper bounds

        result = self._make_request("tools/call", {
            "name": "estimate_audience_size",
            "arguments": {
                "account_id": local_account_id,
                "targeting": targeting_spec,
                "optimization_goal": "REACH"
            }
        })

        # Transport-level failure (HTTP / MCP error before the tool ran).
        if not result["success"]:
            print(f"   ❌ Request failed: {result.get('text', 'Unknown error')}")
            return {"success": False, "error": result.get("text", "Unknown error")}

        response_data = result["json"]["result"]
        content = response_data.get("content", [{}])[0].get("text", "")
        try:
            parsed_content = json.loads(content)
        except json.JSONDecodeError:
            print(f"   ❌ Invalid JSON response")
            return {"success": False, "error": "Invalid JSON"}

        # Tool ran but reported an application-level error.
        error_info = self._check_for_errors(parsed_content)
        if error_info["has_error"]:
            print(f"   ❌ API Error: {error_info['error_message']}")
            return {"success": False, "error": error_info["error_message"], "error_format": error_info["format"]}

        if not parsed_content.get("success", False):
            print(f"   ❌ Response indicates failure but no error message found")
            return {"success": False, "error": "Unexpected failure"}

        # `or {}` guards against estimate_details being present but null.
        details = parsed_content.get("estimate_details", {}) or {}
        lower = details.get("users_lower_bound")
        upper = details.get("users_upper_bound")
        midpoint = parsed_content.get("estimated_audience_size")
        fallback_used = parsed_content.get("fallback_endpoint_used")

        # fallback_endpoint_used must be absent/None: this test only makes
        # sense when reachestimate was hit directly (fallback disabled).
        ok = (
            lower == expected_lower and
            upper == expected_upper and
            midpoint == expected_midpoint and
            (fallback_used is None)
        )

        if ok:
            print(f"   ✅ Bounds: {lower:,}–{upper:,}; midpoint: {midpoint:,}")
            return {
                "success": True,
                "users_lower_bound": lower,
                "users_upper_bound": upper,
                "midpoint": midpoint
            }
        else:
            print(f"   ❌ Unexpected values: lower={lower}, upper={upper}, midpoint={midpoint}, fallback={fallback_used}")
            return {
                "success": False,
                "users_lower_bound": lower,
                "users_upper_bound": upper,
                "midpoint": midpoint,
                "fallback_endpoint_used": fallback_used
            }
285 |
    def test_comprehensive_audience_estimation(self) -> Dict[str, Any]:
        """Test comprehensive audience estimation with complex targeting.

        Iterates every targeting spec in self.test_targeting_specs (populated
        during setup, outside this method) and calls estimate_audience_size
        with a REACH optimization goal for each.

        Returns:
            Dict keyed by spec name; each value records success, estimate
            fields, and the raw parsed response (or the error encountered).
        """

        print(f"\n🎯 Testing Comprehensive Audience Estimation")
        results = {}

        for spec_name, targeting_spec in self.test_targeting_specs.items():
            print(f"   📊 Testing targeting: '{spec_name}'")

            result = self._make_request("tools/call", {
                "name": "estimate_audience_size",
                "arguments": {
                    "account_id": self.account_id,
                    "targeting": targeting_spec,
                    "optimization_goal": "REACH"
                }
            })

            # Transport-level failure: record it and continue with other specs.
            if not result["success"]:
                results[spec_name] = {
                    "success": False,
                    "error": result.get("text", "Unknown error")
                }
                print(f"      ❌ Failed: {result.get('text', 'Unknown error')}")
                continue

            # Parse response
            response_data = result["json"]["result"]
            content = response_data.get("content", [{}])[0].get("text", "")

            try:
                parsed_content = json.loads(content)

                # Check for errors using robust helper method
                error_info = self._check_for_errors(parsed_content)
                if error_info["has_error"]:
                    results[spec_name] = {
                        "success": False,
                        "error": error_info["error_message"],
                        "error_format": error_info["format"]
                    }
                    print(f"      ❌ API Error: {error_info['error_message']}")
                    continue

                # Check for expected fields in comprehensive estimation
                has_success = parsed_content.get("success", False)
                has_estimate = "estimated_audience_size" in parsed_content
                has_details = "estimate_details" in parsed_content

                # Per-spec success requires both the success flag AND an estimate;
                # estimate_details is recorded but not required.
                results[spec_name] = {
                    "success": has_success and has_estimate,
                    "has_estimate": has_estimate,
                    "has_details": has_details,
                    "estimated_size": parsed_content.get("estimated_audience_size", 0),
                    "optimization_goal": parsed_content.get("optimization_goal"),
                    "raw_response": parsed_content
                }

                if has_success and has_estimate:
                    estimate_size = parsed_content.get("estimated_audience_size", 0)
                    print(f"      ✅ Estimated audience: {estimate_size:,} people")
                else:
                    print(f"      ⚠️ Incomplete response: success={has_success}, estimate={has_estimate}")

            except json.JSONDecodeError:
                results[spec_name] = {
                    "success": False,
                    "error": "Invalid JSON response",
                    "raw_content": content
                }
                print(f"      ❌ Invalid JSON: {content[:100]}...")

        return results
359 |
    def test_backwards_compatibility_interest_validation(self) -> Dict[str, Any]:
        """Test backwards compatibility with simple interest validation.

        Exercises the legacy call shapes of estimate_audience_size: a bare
        interest_list (names) and a bare interest_fbid_list (numeric FBIDs),
        both without account_id/targeting. Interest fixtures come from
        self.test_interests (populated during setup, outside this method).

        Returns:
            Dict with "interest_names" and "interest_fbids" result entries.
        """

        print(f"\n🔄 Testing Backwards Compatibility (Interest Validation)")
        results = {}

        # Test with interest names
        # "mixed_validity" intentionally includes both valid and invalid names,
        # so the response is expected to contain a mix of valid flags.
        print(f"   📝 Testing interest name validation")

        result = self._make_request("tools/call", {
            "name": "estimate_audience_size",
            "arguments": {
                "interest_list": self.test_interests["mixed_validity"]
            }
        })

        if result["success"]:
            response_data = result["json"]["result"]
            content = response_data.get("content", [{}])[0].get("text", "")

            try:
                parsed_content = json.loads(content)

                # Check for errors first
                error_info = self._check_for_errors(parsed_content)
                if error_info["has_error"]:
                    results["interest_names"] = {
                        "success": False,
                        "error": error_info["error_message"],
                        "error_format": error_info["format"]
                    }
                    print(f"      ❌ API Error: {error_info['error_message']}")
                else:
                    # Extract data using robust helper method
                    validations = self._extract_data(parsed_content)
                    if validations and isinstance(validations, list):
                        results["interest_names"] = {
                            "success": True,
                            "count": len(validations),
                            "has_valid": any(v.get("valid", False) for v in validations),
                            "has_invalid": any(not v.get("valid", True) for v in validations),
                            "validations": validations
                        }
                        print(f"      ✅ Validated {len(validations)} interests")
                        for validation in validations:
                            status = "✅" if validation.get("valid") else "❌"
                            print(f"         {status} {validation.get('name', 'N/A')}")
                    else:
                        results["interest_names"] = {"success": False, "error": "No validation data"}
                        print(f"      ❌ No validation data returned")

            except json.JSONDecodeError:
                results["interest_names"] = {"success": False, "error": "Invalid JSON"}
                print(f"      ❌ Invalid JSON response")
        else:
            results["interest_names"] = {"success": False, "error": result.get("text", "Request failed")}
            print(f"      ❌ Request failed: {result.get('text', 'Unknown error')}")

        # Test with interest FBIDs
        # Unlike the names case above, all FBIDs are expected to be valid,
        # hence the stricter all_valid aggregate below.
        print(f"   🔢 Testing interest FBID validation")

        result = self._make_request("tools/call", {
            "name": "estimate_audience_size",
            "arguments": {
                "interest_fbid_list": self.test_interests["valid_fbids"]
            }
        })

        if result["success"]:
            response_data = result["json"]["result"]
            content = response_data.get("content", [{}])[0].get("text", "")

            try:
                parsed_content = json.loads(content)

                # Check for errors first
                error_info = self._check_for_errors(parsed_content)
                if error_info["has_error"]:
                    results["interest_fbids"] = {
                        "success": False,
                        "error": error_info["error_message"],
                        "error_format": error_info["format"]
                    }
                    print(f"      ❌ API Error: {error_info['error_message']}")
                else:
                    # Extract data using robust helper method
                    validations = self._extract_data(parsed_content)
                    if validations and isinstance(validations, list):
                        results["interest_fbids"] = {
                            "success": True,
                            "count": len(validations),
                            "all_valid": all(v.get("valid", False) for v in validations),
                            "validations": validations
                        }
                        print(f"      ✅ Validated {len(validations)} FBID interests")
                        for validation in validations:
                            status = "✅" if validation.get("valid") else "❌"
                            print(f"         {status} FBID: {validation.get('id', 'N/A')}")
                    else:
                        results["interest_fbids"] = {"success": False, "error": "No validation data"}
                        print(f"      ❌ No validation data returned")

            except json.JSONDecodeError:
                results["interest_fbids"] = {"success": False, "error": "Invalid JSON"}
                print(f"      ❌ Invalid JSON response")
        else:
            results["interest_fbids"] = {"success": False, "error": result.get("text", "Request failed")}
            print(f"      ❌ Request failed: {result.get('text', 'Unknown error')}")

        return results
470 |
    def test_different_optimization_goals(self) -> Dict[str, Any]:
        """Test audience estimation with different optimization goals.

        Runs the same base targeting spec ("simple_demographics") through a
        fixed list of optimization goals and records the estimate per goal.

        Returns:
            Dict keyed by goal name; each value records success plus either
            the estimated size and echoed goal, or the error encountered.
        """

        print(f"\n🎯 Testing Different Optimization Goals")
        results = {}

        optimization_goals = ["REACH", "LINK_CLICKS", "CONVERSIONS", "APP_INSTALLS"]
        base_targeting = self.test_targeting_specs["simple_demographics"]

        for goal in optimization_goals:
            print(f"   🎯 Testing optimization goal: '{goal}'")

            result = self._make_request("tools/call", {
                "name": "estimate_audience_size",
                "arguments": {
                    "account_id": self.account_id,
                    "targeting": base_targeting,
                    "optimization_goal": goal
                }
            })

            if result["success"]:
                response_data = result["json"]["result"]
                content = response_data.get("content", [{}])[0].get("text", "")

                try:
                    parsed_content = json.loads(content)

                    # Check for errors first
                    error_info = self._check_for_errors(parsed_content)
                    if error_info["has_error"]:
                        results[goal] = {
                            "success": False,
                            "error": error_info["error_message"],
                            "error_format": error_info["format"]
                        }
                        print(f"      ❌ {goal}: {error_info['error_message']}")
                    elif parsed_content.get("success", False):
                        # goal_used echoes the goal the server actually applied.
                        results[goal] = {
                            "success": True,
                            "estimated_size": parsed_content.get("estimated_audience_size", 0),
                            "goal_used": parsed_content.get("optimization_goal")
                        }
                        estimate_size = parsed_content.get("estimated_audience_size", 0)
                        print(f"      ✅ {goal}: {estimate_size:,} people")
                    else:
                        # Neither an error payload nor success=True: ambiguous response.
                        results[goal] = {
                            "success": False,
                            "error": "Response indicates failure but no error message found"
                        }
                        print(f"      ❌ {goal}: Response indicates failure but no error message found")

                except json.JSONDecodeError:
                    results[goal] = {"success": False, "error": "Invalid JSON"}
                    print(f"      ❌ {goal}: Invalid JSON response")
            else:
                results[goal] = {"success": False, "error": result.get("text", "Request failed")}
                print(f"      ❌ {goal}: Request failed")

        return results
531 |
532 | def test_error_handling(self) -> Dict[str, Any]:
533 | """Test error handling for invalid parameters"""
534 |
535 | print(f"\n⚠️ Testing Error Handling")
536 | results = {}
537 |
538 | # Test 1: No parameters
539 | print(f" 🚫 Testing with no parameters")
540 | result = self._make_request("tools/call", {
541 | "name": "estimate_audience_size",
542 | "arguments": {}
543 | })
544 |
545 | results["no_params"] = self._parse_error_response(result, "Should require targeting or interest validation")
546 |
547 | # Test 2: Account ID without targeting
548 | print(f" 🚫 Testing account ID without targeting")
549 | result = self._make_request("tools/call", {
550 | "name": "estimate_audience_size",
551 | "arguments": {
552 | "account_id": self.account_id
553 | }
554 | })
555 |
556 | results["no_targeting"] = self._parse_error_response(result, "Should require targeting specification")
557 |
558 | # Test 3: Invalid targeting structure
559 | print(f" 🚫 Testing invalid targeting structure")
560 | result = self._make_request("tools/call", {
561 | "name": "estimate_audience_size",
562 | "arguments": {
563 | "account_id": self.account_id,
564 | "targeting": {"invalid": "structure"}
565 | }
566 | })
567 |
568 | results["invalid_targeting"] = self._parse_error_response(result, "Should handle invalid targeting")
569 |
570 | # Test 4: Missing location in targeting (no geo_locations or custom audiences)
571 | print(f" 🚫 Testing missing location in targeting")
572 | result = self._make_request("tools/call", {
573 | "name": "estimate_audience_size",
574 | "arguments": {
575 | "account_id": self.account_id,
576 | # Interests present but no geo_locations and no custom_audiences
577 | "targeting": {
578 | "age_min": 18,
579 | "age_max": 35,
580 | "flexible_spec": [
581 | {"interests": [{"id": "6003371567474"}]}
582 | ]
583 | }
584 | }
585 | })
586 | results["missing_location"] = self._parse_error_response(result, "Should require a location or custom audience")
587 |
588 | return results
589 |
590 | def _parse_error_response(self, result: Dict[str, Any], description: str) -> Dict[str, Any]:
591 | """Helper to parse and validate error responses"""
592 |
593 | if not result["success"]:
594 | print(f" ✅ {description}: Request failed as expected")
595 | return {"success": True, "error_type": "request_failure"}
596 |
597 | response_data = result["json"]["result"]
598 | content = response_data.get("content", [{}])[0].get("text", "")
599 |
600 | try:
601 | parsed_content = json.loads(content)
602 |
603 | # Use robust error checking helper method
604 | error_info = self._check_for_errors(parsed_content)
605 | if error_info["has_error"]:
606 | print(f" ✅ {description}: {error_info['error_message']}")
607 | return {
608 | "success": True,
609 | "error_message": error_info["error_message"],
610 | "error_format": error_info["format"]
611 | }
612 | else:
613 | print(f" ❌ {description}: No error returned when expected")
614 | return {"success": False, "unexpected_success": True}
615 |
616 | except json.JSONDecodeError:
617 | print(f" ❌ {description}: Invalid JSON response")
618 | return {"success": False, "error": "Invalid JSON"}
619 |
620 | def run_audience_estimation_tests(self) -> bool:
621 | """Run comprehensive audience estimation tests"""
622 |
623 | print("🚀 Meta Ads Audience Estimation End-to-End Test Suite")
624 | print("="*70)
625 |
626 | # Check server availability
627 | try:
628 | response = requests.get(f"{self.base_url}/", timeout=5)
629 | server_running = response.status_code in [200, 404]
630 | except:
631 | server_running = False
632 |
633 | if not server_running:
634 | print("❌ Server is not running at", self.base_url)
635 | print(" Please start the server with:")
636 | print(" python3 -m meta_ads_mcp --transport streamable-http --port 8080")
637 | return False
638 |
639 | print("✅ Server is running")
640 | print("🔐 Using implicit authentication from server")
641 | print(f"🏢 Using account ID: {self.account_id}")
642 |
643 | # Test 0: PL-only reachestimate bounds verification
644 | print("\n" + "="*70)
645 | print("📋 PHASE 0: PL-only reachestimate bounds verification (fallback disabled)")
646 | print("="*70)
647 | pl_only_results = self.test_pl_only_reachestimate_bounds()
648 | pl_only_success = pl_only_results.get("success", False)
649 |
650 | # Test 1: Comprehensive Audience Estimation
651 | print("\n" + "="*70)
652 | print("📋 PHASE 1: Testing Comprehensive Audience Estimation")
653 | print("="*70)
654 |
655 | comprehensive_results = self.test_comprehensive_audience_estimation()
656 | comprehensive_success = any(
657 | result.get("success") and result.get("estimated_size", 0) > 0
658 | for result in comprehensive_results.values()
659 | )
660 |
661 | # Test 2: Backwards Compatibility
662 | print("\n" + "="*70)
663 | print("📋 PHASE 2: Testing Backwards Compatibility")
664 | print("="*70)
665 |
666 | compat_results = self.test_backwards_compatibility_interest_validation()
667 | compat_success = (
668 | compat_results.get("interest_names", {}).get("success", False) and
669 | compat_results.get("interest_fbids", {}).get("success", False)
670 | )
671 |
672 | # Test 3: Different Optimization Goals
673 | print("\n" + "="*70)
674 | print("📋 PHASE 3: Testing Different Optimization Goals")
675 | print("="*70)
676 |
677 | goals_results = self.test_different_optimization_goals()
678 | goals_success = any(
679 | result.get("success") and result.get("estimated_size", 0) > 0
680 | for result in goals_results.values()
681 | )
682 |
683 | # Test 4: Error Handling
684 | print("\n" + "="*70)
685 | print("📋 PHASE 4: Testing Error Handling")
686 | print("="*70)
687 |
688 | error_results = self.test_error_handling()
689 | error_success = all(
690 | result.get("success", False) for result in error_results.values()
691 | )
692 |
693 | # Final assessment
694 | print("\n" + "="*70)
695 | print("📊 FINAL RESULTS")
696 | print("="*70)
697 |
698 | all_tests = [
699 | ("PL-only Reachestimate Bounds", pl_only_success),
700 | ("Comprehensive Estimation", comprehensive_success),
701 | ("Backwards Compatibility", compat_success),
702 | ("Optimization Goals", goals_success),
703 | ("Error Handling", error_success)
704 | ]
705 |
706 | passed_tests = sum(1 for _, success in all_tests if success)
707 | total_tests = len(all_tests)
708 |
709 | for test_name, success in all_tests:
710 | status = "✅ PASSED" if success else "❌ FAILED"
711 | print(f" • {test_name}: {status}")
712 |
713 | overall_success = passed_tests >= 3 # At least 3 out of 4 tests should pass
714 |
715 | if overall_success:
716 | print(f"\n✅ Audience estimation tests: SUCCESS ({passed_tests}/{total_tests} passed)")
717 | print(" • Comprehensive audience estimation is working")
718 | print(" • Backwards compatibility is maintained")
719 | print(" • Meta reachestimate API integration is functional")
720 | return True
721 | else:
722 | print(f"\n❌ Audience estimation tests: FAILED ({passed_tests}/{total_tests} passed)")
723 | print(" • Some audience estimation features are not working properly")
724 | return False
725 |
726 |
def main():
    """CLI entry point: run the audience-estimation suite and exit non-zero on failure."""
    passed = AudienceEstimationTester().run_audience_estimation_tests()

    summary = (
        "\n🎉 All audience estimation tests passed!"
        if passed
        else "\n⚠️ Some audience estimation tests failed - see details above"
    )
    print(summary)

    sys.exit(0 if passed else 1)


if __name__ == "__main__":
    main()
```
--------------------------------------------------------------------------------
/tests/test_dsa_beneficiary.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env python3
2 | """
3 | Unit tests for DSA (Digital Services Act) beneficiary functionality in Meta Ads MCP.
4 |
5 | This module tests the implementation of DSA beneficiary field support for ad set creation,
6 | including detection of DSA requirements, parameter validation, and error handling.
7 | """
8 |
9 | import pytest
10 | import json
11 | from unittest.mock import AsyncMock, patch, MagicMock
12 |
13 | from meta_ads_mcp.core.adsets import create_adset, get_adset_details
14 | from meta_ads_mcp.core.accounts import get_account_info
15 |
16 |
class TestDSABeneficiaryDetection:
    """Test cases for detecting DSA beneficiary requirements.

    Covers get_account_info behavior for DSA (EU) vs non-DSA (US) accounts,
    API failures, missing account_id validation, and the helpful-error path
    for accounts the token cannot access. make_api_request is patched at the
    accounts module so no network calls are made.
    """

    @pytest.mark.asyncio
    async def test_dsa_requirement_detection_business_account(self):
        """Test DSA requirement detection for European business accounts"""
        mock_account_response = {
            "id": "act_701351919139047",
            "name": "Test European Business Account",
            "account_status": 1,
            "business_country_code": "DE", # Germany - DSA compliant
            "business_city": "Berlin",
            "currency": "EUR"
        }

        # Patch where the names are looked up: make_api_request in accounts,
        # get_current_access_token in auth.
        with patch('meta_ads_mcp.core.accounts.make_api_request', new_callable=AsyncMock) as mock_api:
            with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
                mock_auth.return_value = "test_access_token"
                mock_api.return_value = mock_account_response

                result = await get_account_info(account_id="act_701351919139047")

                # Handle new return format (dictionary instead of JSON string)
                if isinstance(result, dict):
                    result_data = result
                else:
                    result_data = json.loads(result)

                # Verify account info is retrieved
                assert result_data["id"] == "act_701351919139047"
                assert result_data["business_country_code"] == "DE"

                # Verify DSA requirement detection
                # NOTE(review): this disjunct always holds — the assert above
                # already guarantees "business_country_code" is present, so
                # the "dsa_required" half is never exercised; consider pinning
                # "dsa_required" explicitly if the field is expected.
                assert "dsa_required" in result_data or "business_country_code" in result_data

    @pytest.mark.asyncio
    async def test_dsa_requirement_detection_non_dsa_region(self):
        """Test detection for non-DSA compliant regions"""
        mock_account_response = {
            "id": "act_123456789",
            "name": "Test US Account",
            "account_status": 1,
            "business_country_code": "US", # US - not DSA compliant
            "business_city": "New York",
            "currency": "USD"
        }

        with patch('meta_ads_mcp.core.accounts.make_api_request', new_callable=AsyncMock) as mock_api:
            with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
                mock_auth.return_value = "test_access_token"
                mock_api.return_value = mock_account_response

                result = await get_account_info(account_id="act_123456789")

                # Handle new return format (dictionary instead of JSON string)
                if isinstance(result, dict):
                    result_data = result
                else:
                    result_data = json.loads(result)

                # Verify no DSA requirement for US accounts
                assert result_data["business_country_code"] == "US"

    @pytest.mark.asyncio
    async def test_dsa_requirement_detection_error_handling(self):
        """Test error handling when account info cannot be retrieved"""
        with patch('meta_ads_mcp.core.accounts.make_api_request', new_callable=AsyncMock) as mock_api:
            with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
                mock_auth.return_value = "test_access_token"
                # Raising (rather than returning an error dict) exercises the
                # exception branch of get_account_info.
                mock_api.side_effect = Exception("API Error")

                result = await get_account_info(account_id="act_invalid")

                # Handle new return format (dictionary instead of JSON string)
                if isinstance(result, dict):
                    result_data = result
                else:
                    result_data = json.loads(result)

                # Verify error is properly handled
                assert "error" in result_data

    @pytest.mark.asyncio
    async def test_account_info_requires_account_id(self):
        """Test that get_account_info requires an account_id parameter"""

        with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
            mock_auth.return_value = "test_access_token"

            # Test without account_id parameter
            result = await get_account_info(account_id=None)

            # Handle new return format (dictionary instead of JSON string)
            if isinstance(result, dict):
                result_data = result
            else:
                result_data = json.loads(result)

            # Verify error message for missing account_id
            assert "error" in result_data
            assert "Account ID is required" in result_data["error"]["message"]
            assert "Please specify an account_id parameter" in result_data["error"]["details"]
            assert "example" in result_data["error"]

    @pytest.mark.asyncio
    async def test_account_info_inaccessible_account_error(self):
        """Test that get_account_info provides helpful error for inaccessible accounts"""

        # Mock permission error for direct account access (first API call)
        mock_permission_error = {
            "error": {
                "message": "Insufficient access privileges",
                "type": "OAuthException",
                "code": 200
            }
        }

        # Mock accessible accounts response (second API call)
        mock_accessible_accounts = {
            "data": [
                {"id": "act_123", "name": "Test Account 1"},
                {"id": "act_456", "name": "Test Account 2"}
            ]
        }

        with patch('meta_ads_mcp.core.accounts.make_api_request', new_callable=AsyncMock) as mock_api:
            with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
                mock_auth.return_value = "test_access_token"
                # First call returns permission error, second call returns accessible accounts
                # (side_effect list order must match get_account_info's call order).
                mock_api.side_effect = [mock_permission_error, mock_accessible_accounts]

                result = await get_account_info(account_id="act_inaccessible")

                # Handle new return format (dictionary instead of JSON string)
                if isinstance(result, dict):
                    result_data = result
                else:
                    result_data = json.loads(result)

                # Verify helpful error message for inaccessible account
                assert "error" in result_data
                assert "not accessible to your user account" in result_data["error"]["message"]
                assert "accessible_accounts" in result_data["error"]
                assert "suggestion" in result_data["error"]
                assert len(result_data["error"]["accessible_accounts"]) == 2
162 |
163 |
class TestDSABeneficiaryParameter:
    """Test cases for DSA beneficiary parameter support.

    Verifies that create_adset forwards dsa_beneficiary to the Graph API as a
    top-level parameter (not inside targeting), and that API-side validation
    errors surface in the returned payload. All API calls are mocked.
    """

    @pytest.mark.asyncio
    async def test_create_adset_with_dsa_beneficiary_success(self):
        """Test successful ad set creation with DSA beneficiary parameter"""
        mock_response = {
            "id": "23842588888640185",
            "name": "Test Ad Set with DSA",
            "status": "PAUSED",
            "dsa_beneficiary": "Test Organization GmbH"
        }

        with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api:
            with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
                mock_auth.return_value = "test_access_token"
                mock_api.return_value = mock_response

                result = await create_adset(
                    account_id="act_701351919139047",
                    campaign_id="23842588888640184",
                    name="Test Ad Set with DSA",
                    optimization_goal="LINK_CLICKS",
                    billing_event="IMPRESSIONS",
                    dsa_beneficiary="Test Organization GmbH"
                )

                # Verify the API was called with DSA beneficiary parameter
                # (string inspection of call_args is a loose check — it matches
                # the parameter anywhere in args or kwargs).
                mock_api.assert_called_once()
                call_args = mock_api.call_args
                assert "dsa_beneficiary" in str(call_args)

                # Verify response contains ad set ID
                result_data = json.loads(result)
                assert "id" in result_data

    @pytest.mark.asyncio
    async def test_create_adset_with_dsa_beneficiary_validation_error(self):
        """Test error handling when DSA beneficiary parameter is invalid"""
        # NOTE(review): mock_error_response is defined but unused — the error
        # path is driven by side_effect below.
        mock_error_response = {
            "error": {
                "message": "DSA beneficiary required for European compliance",
                "type": "OAuthException",
                "code": 100
            }
        }

        with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api:
            with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
                mock_auth.return_value = "test_access_token"
                mock_api.side_effect = Exception("DSA beneficiary required for European compliance")

                result = await create_adset(
                    account_id="act_701351919139047",
                    campaign_id="23842588888640184",
                    name="Test Ad Set",
                    optimization_goal="LINK_CLICKS",
                    billing_event="IMPRESSIONS"
                    # No DSA beneficiary provided
                )

                # Verify error message is clear and actionable
                result_data = json.loads(result)

                # Handle response wrapped in 'data' field by meta_api_tool decorator
                if "data" in result_data:
                    actual_data = json.loads(result_data["data"])
                else:
                    actual_data = result_data

                assert "DSA beneficiary required" in actual_data.get("error", "")

    @pytest.mark.asyncio
    async def test_create_adset_without_dsa_beneficiary_dsa_required(self):
        """Test error when DSA beneficiary is required but not provided"""
        with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api:
            with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
                mock_auth.return_value = "test_access_token"
                # Mirrors the user-facing message Meta returns for DSA accounts.
                mock_api.side_effect = Exception("Enter the person or organization that benefits from ads in this ad set")

                result = await create_adset(
                    account_id="act_701351919139047",
                    campaign_id="23842588888640184",
                    name="Test Ad Set",
                    optimization_goal="LINK_CLICKS",
                    billing_event="IMPRESSIONS"
                    # No DSA beneficiary provided
                )

                # Verify error message is clear and actionable
                result_data = json.loads(result)

                # Handle response wrapped in 'data' field by meta_api_tool decorator
                if "data" in result_data:
                    actual_data = json.loads(result_data["data"])
                else:
                    actual_data = result_data

                assert "benefits from ads" in actual_data.get("error", "")

    @pytest.mark.asyncio
    async def test_create_adset_dsa_beneficiary_in_targeting(self):
        """Test that DSA beneficiary is not added to targeting spec"""
        with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api:
            with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
                mock_auth.return_value = "test_access_token"
                mock_api.return_value = {"id": "23842588888640185"}

                result = await create_adset(
                    account_id="act_701351919139047",
                    campaign_id="23842588888640184",
                    name="Test Ad Set",
                    optimization_goal="LINK_CLICKS",
                    billing_event="IMPRESSIONS",
                    targeting={"geo_locations": {"countries": ["DE"]}},
                    dsa_beneficiary="Test Organization GmbH"
                )

                # Verify the API was called
                mock_api.assert_called_once()
                call_args = mock_api.call_args

                # Verify DSA beneficiary is sent as separate parameter, not in targeting
                call_str = str(call_args)
                assert "dsa_beneficiary" in call_str
                assert "Test Organization GmbH" in call_str

    @pytest.mark.asyncio
    async def test_create_adset_dsa_beneficiary_parameter_formats(self):
        """Test different formats for DSA beneficiary parameter"""
        test_cases = [
            "Simple Organization",
            "Organization with Special Chars: GmbH & Co. KG",
            "Organization with Numbers: Test123 Inc.",
            "Very Long Organization Name That Exceeds Normal Limits But Should Still Work"
        ]

        # Fresh mocks per case so assert_called_once holds on each iteration.
        for beneficiary_name in test_cases:
            with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api:
                with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
                    mock_auth.return_value = "test_access_token"
                    mock_api.return_value = {"id": "23842588888640185"}

                    result = await create_adset(
                        account_id="act_701351919139047",
                        campaign_id="23842588888640184",
                        name="Test Ad Set",
                        optimization_goal="LINK_CLICKS",
                        billing_event="IMPRESSIONS",
                        dsa_beneficiary=beneficiary_name
                    )

                    # Verify the API was called with the beneficiary name
                    mock_api.assert_called_once()
                    call_args = mock_api.call_args
                    assert beneficiary_name in str(call_args)
320 |
321 |
class TestDSAPermissionHandling:
    """Test cases for permission-related DSA beneficiary issues.

    Simulates Graph API failures (missing business_management permission,
    unsupported parameter) via side_effect exceptions and asserts that
    create_adset surfaces them as error payloads rather than raising.
    """

    @pytest.mark.asyncio
    async def test_dsa_beneficiary_missing_business_management_permission(self):
        """Test error handling when business_management permissions are missing"""
        with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api:
            with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
                mock_auth.return_value = "test_access_token"
                mock_api.side_effect = Exception("Permission denied: business_management permission required")

                result = await create_adset(
                    account_id="act_701351919139047",
                    campaign_id="23842588888640184",
                    name="Test Ad Set",
                    optimization_goal="LINK_CLICKS",
                    billing_event="IMPRESSIONS",
                    dsa_beneficiary="Test Organization GmbH"
                )

                # Verify permission error is handled
                result_data = json.loads(result)

                # Handle response wrapped in 'data' field by meta_api_tool decorator
                if "data" in result_data:
                    actual_data = json.loads(result_data["data"])
                else:
                    actual_data = result_data

                # Case-insensitive match keeps the assertion robust to message wording.
                assert "permission" in actual_data.get("error", "").lower()

    @pytest.mark.asyncio
    async def test_dsa_beneficiary_api_limitation_handling(self):
        """Test handling when API doesn't support dsa_beneficiary parameter"""
        with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api:
            with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
                mock_auth.return_value = "test_access_token"
                mock_api.side_effect = Exception("Parameter dsa_beneficiary is not supported")

                result = await create_adset(
                    account_id="act_701351919139047",
                    campaign_id="23842588888640184",
                    name="Test Ad Set",
                    optimization_goal="LINK_CLICKS",
                    billing_event="IMPRESSIONS",
                    dsa_beneficiary="Test Organization GmbH"
                )

                # Verify API limitation error is handled
                result_data = json.loads(result)

                # Handle response wrapped in 'data' field by meta_api_tool decorator
                if "data" in result_data:
                    actual_data = json.loads(result_data["data"])
                else:
                    actual_data = result_data

                assert "not supported" in actual_data.get("error", "").lower()
380 |
381 |
class TestDSARegionalCompliance:
    """Test cases for regional DSA compliance.

    NOTE: get_account_info only echoes account fields back; these tests
    verify that business_country_code round-trips intact, which is the
    field downstream DSA logic keys off. No compliance decision itself is
    asserted here.
    """

    @staticmethod
    async def _assert_country_roundtrip(account_id: str, mock_account_response: dict) -> None:
        """Call get_account_info with mocked API/auth and assert the
        business_country_code comes back unchanged."""
        with patch('meta_ads_mcp.core.accounts.make_api_request', new_callable=AsyncMock) as mock_api, \
             patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
            mock_auth.return_value = "test_access_token"
            mock_api.return_value = mock_account_response

            result = await get_account_info(account_id=account_id)
            result_data = json.loads(result)

            assert result_data["business_country_code"] == mock_account_response["business_country_code"]

    @pytest.mark.asyncio
    async def test_dsa_compliance_european_regions(self):
        """European (DSA-relevant) accounts report their country code unchanged."""
        european_countries = ["DE", "FR", "IT", "ES", "NL", "BE", "AT", "IE", "DK", "SE", "FI", "NO"]

        for country_code in european_countries:
            await self._assert_country_roundtrip(
                f"act_{country_code.lower()}",
                {
                    "id": f"act_{country_code.lower()}",
                    "name": f"Test {country_code} Account",
                    "account_status": 1,
                    "business_country_code": country_code,
                    "currency": "EUR"
                },
            )

    @pytest.mark.asyncio
    async def test_dsa_compliance_non_european_regions(self):
        """Non-European accounts report their country code unchanged."""
        non_european_countries = ["US", "CA", "AU", "JP", "BR", "IN"]

        for country_code in non_european_countries:
            await self._assert_country_roundtrip(
                f"act_{country_code.lower()}",
                {
                    "id": f"act_{country_code.lower()}",
                    "name": f"Test {country_code} Account",
                    "account_status": 1,
                    "business_country_code": country_code,
                    "currency": "USD"
                },
            )

    @pytest.mark.asyncio
    async def test_dsa_beneficiary_validation_by_region(self):
        """European and US accounts both round-trip their region field."""
        # European account (DSA beneficiary requirements apply).
        await self._assert_country_roundtrip(
            "act_de",
            {
                "id": "act_de",
                "name": "German Account",
                "business_country_code": "DE"
            },
        )

        # US account (no DSA beneficiary requirement).
        await self._assert_country_roundtrip(
            "act_us",
            {
                "id": "act_us",
                "name": "US Account",
                "business_country_code": "US"
            },
        )
473 |
474 |
class TestDSAErrorHandling:
    """Test cases for comprehensive DSA error handling"""

    @staticmethod
    def _unwrap(raw: str) -> dict:
        """Decode a tool response, unwrapping the 'data' envelope that the
        meta_api_tool decorator may add."""
        outer = json.loads(raw)
        return json.loads(outer["data"]) if "data" in outer else outer

    @pytest.mark.asyncio
    async def test_dsa_beneficiary_clear_error_message(self):
        """Test that DSA-related errors provide clear, actionable messages"""
        error_scenarios = (
            ("DSA beneficiary required for European compliance", "DSA beneficiary"),
            ("Enter the person or organization that benefits from ads", "benefits from ads"),
            ("Permission denied: business_management required", "permission"),
            ("Parameter dsa_beneficiary is not supported", "not supported"),
        )

        for api_error, expected_keyword in error_scenarios:
            with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api, \
                 patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
                mock_auth.return_value = "test_access_token"
                mock_api.side_effect = Exception(api_error)

                result = await create_adset(
                    account_id="act_701351919139047",
                    campaign_id="23842588888640184",
                    name="Test Ad Set",
                    optimization_goal="LINK_CLICKS",
                    billing_event="IMPRESSIONS"
                )

                # The surfaced error text must carry the actionable keyword.
                payload = self._unwrap(result)
                assert expected_keyword.lower() in payload.get("error", "").lower()

    @pytest.mark.asyncio
    async def test_dsa_beneficiary_fallback_behavior(self):
        """Test fallback behavior for unexpected DSA-related errors"""
        with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api, \
             patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
            mock_auth.return_value = "test_access_token"
            mock_api.side_effect = Exception("Unexpected DSA-related error")

            result = await create_adset(
                account_id="act_701351919139047",
                campaign_id="23842588888640184",
                name="Test Ad Set",
                optimization_goal="LINK_CLICKS",
                billing_event="IMPRESSIONS"
            )

            # Even unknown failures must surface as an error payload.
            assert "error" in self._unwrap(result)
539 |
540 |
class TestDSABeneficiaryRetrieval:
    """Test cases for retrieving DSA beneficiary information from ad sets"""

    @pytest.mark.asyncio
    async def test_get_adset_details_with_dsa_beneficiary(self):
        """Test retrieving ad set details that include DSA beneficiary field"""
        api_payload = {
            "id": "120229746629010183",
            "name": "Test Ad Set with DSA",
            "campaign_id": "120229656904980183",
            "status": "PAUSED",
            "daily_budget": "1000",
            "targeting": {
                "geo_locations": {"countries": ["US"]},
                "age_min": 25,
                "age_max": 65
            },
            "bid_amount": 200,
            "optimization_goal": "LINK_CLICKS",
            "billing_event": "IMPRESSIONS",
            "dsa_beneficiary": "Test Organization Inc"
        }

        with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api, \
             patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
            mock_auth.return_value = "test_access_token"
            mock_api.return_value = api_payload

            details = json.loads(await get_adset_details(adset_id="120229746629010183"))

        # The DSA beneficiary field must come back intact.
        assert "dsa_beneficiary" in details
        assert details["dsa_beneficiary"] == "Test Organization Inc"
        assert details["id"] == "120229746629010183"

    @pytest.mark.asyncio
    async def test_get_adset_details_without_dsa_beneficiary(self):
        """Test retrieving ad set details that don't have DSA beneficiary field"""
        api_payload = {
            "id": "120229746624860183",
            "name": "Test Ad Set without DSA",
            "campaign_id": "120229656904980183",
            "status": "PAUSED",
            "daily_budget": "1000",
            "targeting": {
                "geo_locations": {"countries": ["US"]},
                "age_min": 25,
                "age_max": 65
            },
            "bid_amount": 200,
            "optimization_goal": "LINK_CLICKS",
            "billing_event": "IMPRESSIONS"
            # Deliberately no dsa_beneficiary key.
        }

        with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api, \
             patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
            mock_auth.return_value = "test_access_token"
            mock_api.return_value = api_payload

            details = json.loads(await get_adset_details(adset_id="120229746624860183"))

        # Absent on the API side means absent in the result — no synthesis.
        assert details["id"] == "120229746624860183"
        assert "dsa_beneficiary" not in details

    @pytest.mark.asyncio
    async def test_get_adset_details_empty_dsa_beneficiary(self):
        """Test retrieving ad set details with empty DSA beneficiary field"""
        api_payload = {
            "id": "120229746629010183",
            "name": "Test Ad Set with Empty DSA",
            "campaign_id": "120229656904980183",
            "status": "PAUSED",
            "daily_budget": "1000",
            "targeting": {
                "geo_locations": {"countries": ["US"]}
            },
            "bid_amount": 200,
            "optimization_goal": "LINK_CLICKS",
            "billing_event": "IMPRESSIONS",
            "dsa_beneficiary": ""  # Empty string
        }

        with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api, \
             patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
            mock_auth.return_value = "test_access_token"
            mock_api.return_value = api_payload

            details = json.loads(await get_adset_details(adset_id="120229746629010183"))

        # An empty string is preserved, not dropped or replaced.
        assert "dsa_beneficiary" in details
        assert details["dsa_beneficiary"] == ""

    @pytest.mark.asyncio
    async def test_get_adset_details_dsa_beneficiary_field_requested(self):
        """Test that the API request includes dsa_beneficiary in the fields parameter"""
        with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api, \
             patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
            mock_auth.return_value = "test_access_token"
            mock_api.return_value = {"id": "120229746629010183"}

            await get_adset_details(adset_id="120229746629010183")

            # The fields list sent to the Graph API must request the field.
            mock_api.assert_called_once()
            assert "dsa_beneficiary" in str(mock_api.call_args)

    @pytest.mark.asyncio
    async def test_get_adset_details_error_handling(self):
        """Test error handling when retrieving ad set details fails"""
        with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api, \
             patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
            mock_auth.return_value = "test_access_token"
            mock_api.side_effect = Exception("Ad set not found")

            result = await get_adset_details(adset_id="invalid_adset_id")

        # The tool may return either a dict or a JSON string.
        payload = result if isinstance(result, dict) else json.loads(result)

        assert "error" in payload
```
--------------------------------------------------------------------------------
/tests/test_budget_update_e2e.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
End-to-End Budget Update Test for Meta Ads MCP

This test validates that the budget update functionality correctly updates
ad set budgets through the Meta Ads API through a pre-authenticated MCP server.

Test functions:
- update_adset (with daily_budget parameter)
- update_adset (with lifetime_budget parameter)
- update_adset (with both budget types)
- Error handling for invalid budgets
- Budget update with other parameters
"""

import requests
import json
import os
import sys
import time
from typing import Dict, Any, List

# Load environment variables from .env file
# python-dotenv is optional: when installed, server settings are pulled from a
# local .env; otherwise only the process environment is used.
try:
    from dotenv import load_dotenv
    load_dotenv()
    print("✅ Loaded environment variables from .env file")
except ImportError:
    print("⚠️ python-dotenv not installed, using system environment variables only")
31 | class BudgetUpdateTester:
32 | """Test suite focused on budget update functionality"""
33 |
34 | def __init__(self, base_url: str = "http://localhost:8080"):
35 | self.base_url = base_url.rstrip('/')
36 | self.endpoint = f"{self.base_url}/mcp/"
37 | self.request_id = 1
38 |
39 | # Test data for validation
40 | self.test_budgets = {
41 | "daily_budgets": ["5000", "10000", "25000"], # $50, $100, $250
42 | "lifetime_budgets": ["50000", "100000", "250000"], # $500, $1000, $2500
43 | "invalid_budgets": ["-1000", "0", "invalid_budget", "999999999999"]
44 | }
45 |
46 | # Test ad set IDs specifically created for budget testing
47 | self.test_adset_ids = [
48 | "120229734413930183",
49 | "120229734413930183",
50 | "120229734413930183",
51 | "120229734413930183",
52 | "120229734413930183",
53 | "120229734413930183"
54 | ]
55 |
56 | # Rate limiting tracking
57 | self.rate_limit_hit = False
58 | self.last_rate_limit_time = 0
59 |
60 | def _wait_for_rate_limit(self, error_msg: str) -> bool:
61 | """Wait if we hit rate limiting, return True if we should retry"""
62 | if "rate limit" in error_msg.lower() or "too many changes" in error_msg.lower():
63 | if not self.rate_limit_hit:
64 | print(f" ⏳ Rate limit hit! Waiting 1 hour before continuing...")
65 | print(f" • Meta Ads API allows only 4 budget changes per hour")
66 | print(f" • You can manually continue by pressing Enter when ready")
67 | self.rate_limit_hit = True
68 | self.last_rate_limit_time = time.time()
69 |
70 | # Wait for user input or 1 hour
71 | try:
72 | input(" Press Enter when ready to continue (or wait 1 hour)...")
73 | print(" ✅ Continuing with tests...")
74 | return True
75 | except KeyboardInterrupt:
76 | print(" ❌ Test interrupted by user")
77 | return False
78 | else:
79 | print(f" ⏳ Still rate limited, waiting...")
80 | return False
81 | return False
82 |
83 | def _make_request(self, method: str, params: Dict[str, Any] = None,
84 | headers: Dict[str, str] = None) -> Dict[str, Any]:
85 | """Make a JSON-RPC request to the MCP server"""
86 |
87 | default_headers = {
88 | "Content-Type": "application/json",
89 | "Accept": "application/json, text/event-stream",
90 | "User-Agent": "Budget-Update-Test-Client/1.0"
91 | }
92 |
93 | if headers:
94 | default_headers.update(headers)
95 |
96 | payload = {
97 | "jsonrpc": "2.0",
98 | "method": method,
99 | "id": self.request_id
100 | }
101 |
102 | if params:
103 | payload["params"] = params
104 |
105 | try:
106 | response = requests.post(
107 | self.endpoint,
108 | headers=default_headers,
109 | json=payload,
110 | timeout=15
111 | )
112 |
113 | self.request_id += 1
114 |
115 | return {
116 | "status_code": response.status_code,
117 | "headers": dict(response.headers),
118 | "json": response.json() if response.status_code == 200 else None,
119 | "text": response.text,
120 | "success": response.status_code == 200
121 | }
122 |
123 | except requests.exceptions.RequestException as e:
124 | return {
125 | "status_code": 0,
126 | "headers": {},
127 | "json": None,
128 | "text": str(e),
129 | "success": False,
130 | "error": str(e)
131 | }
132 |
    def test_daily_budget_update(self) -> Dict[str, Any]:
        """Test daily budget update functionality.

        Sends an update_adset call for each configured daily budget (values
        are cents), retrying up to 3 times when the Meta API rate-limits
        budget changes. Returns a per-budget dict of outcome details.
        """

        print(f"\n💰 Testing daily budget update function")
        results = {}

        for budget in self.test_budgets["daily_budgets"]:
            print(f" 💰 Updating daily budget to: ${int(budget)/100:.2f}")

            # Retry logic for rate limiting
            max_retries = 3
            for attempt in range(max_retries):
                result = self._make_request("tools/call", {
                    "name": "update_adset",
                    "arguments": {
                        "adset_id": self.test_adset_ids[0],
                        "daily_budget": budget
                    }
                })

                # Transport-level failure: record and move to the next budget.
                if not result["success"]:
                    results[budget] = {
                        "success": False,
                        "error": result.get("text", "Unknown error")
                    }
                    print(f" ❌ Failed: {result.get('text', 'Unknown error')}")
                    break

                # Parse response (tool output is JSON text inside MCP content)
                response_data = result["json"]["result"]
                content = response_data.get("content", [{}])[0].get("text", "")

                try:
                    parsed_content = json.loads(content)

                    # Check for successful update indicators
                    has_id = "id" in parsed_content
                    has_daily_budget = "daily_budget" in parsed_content
                    has_success = "success" in parsed_content
                    has_error = "error" in parsed_content

                    # Handle rate limiting and API errors
                    if has_error:
                        error_msg = parsed_content.get("error", "")
                        if "rate limit" in error_msg.lower() or "too many changes" in error_msg.lower():
                            if attempt < max_retries - 1:  # Don't retry on last attempt
                                if self._wait_for_rate_limit(error_msg):
                                    print(f" 🔄 Retrying after rate limit...")
                                    continue
                                else:
                                    break
                            else:
                                # Out of retries: count the throttle as a pass,
                                # since rate limiting is documented API behavior.
                                results[budget] = {
                                    "success": True,  # Rate limiting is expected behavior
                                    "has_success": False,
                                    "has_error": True,
                                    "rate_limited": True,
                                    "error_message": error_msg
                                }
                                print(f" ⚠️ Rate limited (expected): {error_msg}")
                                break
                        else:
                            results[budget] = {
                                "success": False,
                                "has_error": True,
                                "error_message": error_msg
                            }
                            print(f" ❌ API Error: {error_msg}")
                            break

                    results[budget] = {
                        "success": True,
                        "has_id": has_id,
                        "has_daily_budget": has_daily_budget,
                        "has_success": has_success,
                        "updated_budget": parsed_content.get("daily_budget", "N/A"),
                        "adset_id": parsed_content.get("id", "N/A")
                    }

                    print(f" ✅ Updated daily budget to ${int(budget)/100:.2f}")
                    print(f" • Ad Set ID: {parsed_content.get('id', 'N/A')}")
                    print(f" • Success: {parsed_content.get('success', 'N/A')}")
                    print(f" • Raw Response: {parsed_content}")

                    # Note: Meta Ads API returns {"success": true} for updates
                    # The actual updated values can be verified by fetching ad set details
                    break  # Success, exit retry loop

                except json.JSONDecodeError:
                    results[budget] = {
                        "success": False,
                        "error": "Invalid JSON response",
                        "raw_content": content
                    }
                    print(f" ❌ Invalid JSON: {content}")
                    break

        return results
231 |
    def test_lifetime_budget_update(self) -> Dict[str, Any]:
        """Test lifetime budget update functionality.

        Mirrors test_daily_budget_update but sends lifetime_budget values and
        additionally recognizes the Meta API's daily→lifetime switch
        restriction as a known limitation rather than a failure.
        """

        print(f"\n💰 Testing lifetime budget update function")
        print(f" ⚠️ Note: Meta Ads API may reject lifetime budget updates if ad set has daily budget")
        results = {}

        for budget in self.test_budgets["lifetime_budgets"]:
            print(f" 💰 Updating lifetime budget to: ${int(budget)/100:.2f}")

            # Retry logic for rate limiting
            max_retries = 3
            for attempt in range(max_retries):
                result = self._make_request("tools/call", {
                    "name": "update_adset",
                    "arguments": {
                        "adset_id": self.test_adset_ids[1],
                        "lifetime_budget": budget
                    }
                })

                # Transport-level failure: record and move on.
                if not result["success"]:
                    results[budget] = {
                        "success": False,
                        "error": result.get("text", "Unknown error")
                    }
                    print(f" ❌ Failed: {result.get('text', 'Unknown error')}")
                    break

                # Parse response
                response_data = result["json"]["result"]
                content = response_data.get("content", [{}])[0].get("text", "")

                try:
                    parsed_content = json.loads(content)

                    # Check for successful update indicators
                    has_id = "id" in parsed_content
                    has_lifetime_budget = "lifetime_budget" in parsed_content
                    has_success = "success" in parsed_content
                    has_error = "error" in parsed_content

                    # Handle rate limiting and API errors
                    if has_error:
                        error_msg = parsed_content.get("error", "")
                        if "rate limit" in error_msg.lower() or "too many changes" in error_msg.lower():
                            if attempt < max_retries - 1:  # Don't retry on last attempt
                                if self._wait_for_rate_limit(error_msg):
                                    print(f" 🔄 Retrying after rate limit...")
                                    continue
                                else:
                                    break
                            else:
                                results[budget] = {
                                    "success": True,  # Rate limiting is expected behavior
                                    "has_success": False,
                                    "has_error": True,
                                    "rate_limited": True,
                                    "error_message": error_msg
                                }
                                print(f" ⚠️ Rate limited (expected): {error_msg}")
                                break
                        elif "should be recurring budget" in error_msg.lower() or "cannot switch" in error_msg.lower():
                            # Known Meta restriction: an ad set with a daily
                            # budget cannot be switched to a lifetime budget.
                            results[budget] = {
                                "success": False,
                                "has_error": True,
                                "api_limitation": "Cannot switch from daily to lifetime budget",
                                "error_message": error_msg
                            }
                            print(f" ⚠️ API Limitation: {error_msg}")
                            break
                        else:
                            results[budget] = {
                                "success": False,
                                "has_error": True,
                                "error_message": error_msg
                            }
                            print(f" ❌ API Error: {error_msg}")
                            break

                    results[budget] = {
                        "success": True,
                        "has_id": has_id,
                        "has_lifetime_budget": has_lifetime_budget,
                        "has_success": has_success,
                        "updated_budget": parsed_content.get("lifetime_budget", "N/A"),
                        "adset_id": parsed_content.get("id", "N/A")
                    }

                    print(f" ✅ Updated lifetime budget to ${int(budget)/100:.2f}")
                    print(f" • Ad Set ID: {parsed_content.get('id', 'N/A')}")
                    print(f" • Success: {parsed_content.get('success', 'N/A')}")

                    # Note: Meta Ads API returns {"success": true} for updates
                    # The actual updated values can be verified by fetching ad set details
                    break  # Success, exit retry loop

                except json.JSONDecodeError:
                    results[budget] = {
                        "success": False,
                        "error": "Invalid JSON response",
                        "raw_content": content
                    }
                    print(f" ❌ Invalid JSON: {content}")
                    break

        return results
339 |
    def test_both_budget_types_update(self) -> Dict[str, Any]:
        """Test updating both daily and lifetime budget simultaneously.

        Meta typically rejects having both budget types on one ad set; the
        rejection is recorded as an api_limitation, while a rate-limit error
        still counts as expected behavior.
        """

        print(f"\n💰 Testing both budget types update function")
        print(f" ⚠️ Note: Meta Ads API may reject this if ad set has existing daily budget")

        daily_budget = "15000"  # $150
        lifetime_budget = "150000"  # $1500

        print(f" 💰 Updating both budgets - Daily: ${int(daily_budget)/100:.2f}, Lifetime: ${int(lifetime_budget)/100:.2f}")

        result = self._make_request("tools/call", {
            "name": "update_adset",
            "arguments": {
                "adset_id": self.test_adset_ids[2],
                "daily_budget": daily_budget,
                "lifetime_budget": lifetime_budget
            }
        })

        # Transport-level failure.
        if not result["success"]:
            return {
                "success": False,
                "error": result.get("text", "Unknown error")
            }

        # Parse response
        response_data = result["json"]["result"]
        content = response_data.get("content", [{}])[0].get("text", "")

        try:
            parsed_content = json.loads(content)

            if "error" in parsed_content:
                error_msg = parsed_content.get("error", "")
                if "rate limit" in error_msg.lower() or "too many changes" in error_msg.lower():
                    return {
                        "success": True,  # Rate limiting is expected behavior
                        "rate_limited": True,
                        "error_message": error_msg
                    }
                else:
                    return {
                        "success": False,
                        "error": error_msg,
                        "api_limitation": "Cannot have both daily and lifetime budgets"
                    }

            # Check for successful update indicators
            has_id = "id" in parsed_content
            has_daily_budget = "daily_budget" in parsed_content
            has_lifetime_budget = "lifetime_budget" in parsed_content
            has_success = "success" in parsed_content

            result_data = {
                "success": True,
                "has_id": has_id,
                "has_daily_budget": has_daily_budget,
                "has_lifetime_budget": has_lifetime_budget,
                "has_success": has_success,
                "daily_budget": parsed_content.get("daily_budget", "N/A"),
                "lifetime_budget": parsed_content.get("lifetime_budget", "N/A"),
                "adset_id": parsed_content.get("id", "N/A")
            }

            print(f" ✅ Updated both budgets successfully")
            print(f" • Ad Set ID: {parsed_content.get('id', 'N/A')}")
            print(f" • Success: {parsed_content.get('success', 'N/A')}")

            # Note: Meta Ads API returns {"success": true} for updates
            # The actual updated values can be verified by fetching ad set details

            return result_data

        except json.JSONDecodeError:
            return {
                "success": False,
                "error": "Invalid JSON response",
                "raw_content": content
            }
420 |
    def test_budget_update_with_other_parameters(self) -> Dict[str, Any]:
        """Test budget update combined with other parameters.

        Updates daily_budget together with status, bid_amount, and
        bid_strategy in a single update_adset call and reports which fields
        the response echoes back.
        """

        print(f"\n💰 Testing budget update with other parameters")

        result = self._make_request("tools/call", {
            "name": "update_adset",
            "arguments": {
                "adset_id": self.test_adset_ids[3],
                "daily_budget": "7500",  # $75
                "status": "PAUSED",
                "bid_amount": 1000,
                "bid_strategy": "LOWEST_COST_WITH_BID_CAP"
            }
        })

        # Transport-level failure.
        if not result["success"]:
            return {
                "success": False,
                "error": result.get("text", "Unknown error")
            }

        # Parse response
        response_data = result["json"]["result"]
        content = response_data.get("content", [{}])[0].get("text", "")

        try:
            parsed_content = json.loads(content)

            if "error" in parsed_content:
                error_msg = parsed_content.get("error", "")
                if "rate limit" in error_msg.lower() or "too many changes" in error_msg.lower():
                    return {
                        "success": True,  # Rate limiting is expected behavior
                        "rate_limited": True,
                        "error_message": error_msg
                    }
                else:
                    return {
                        "success": False,
                        "error": error_msg
                    }

            # Check for successful update indicators
            has_id = "id" in parsed_content
            has_daily_budget = "daily_budget" in parsed_content
            has_status = "status" in parsed_content
            has_success = "success" in parsed_content

            result_data = {
                "success": True,
                "has_id": has_id,
                "has_daily_budget": has_daily_budget,
                "has_status": has_status,
                "has_success": has_success,
                "daily_budget": parsed_content.get("daily_budget", "N/A"),
                "status": parsed_content.get("status", "N/A"),
                "adset_id": parsed_content.get("id", "N/A")
            }

            print(f" ✅ Updated budget with other parameters successfully")
            print(f" • Ad Set ID: {parsed_content.get('id', 'N/A')}")
            print(f" • Success: {parsed_content.get('success', 'N/A')}")

            # Note: Meta Ads API returns {"success": true} for updates
            # The actual updated values can be verified by fetching ad set details

            return result_data

        except json.JSONDecodeError:
            return {
                "success": False,
                "error": "Invalid JSON response",
                "raw_content": content
            }
496 |
497 | def test_invalid_budget_handling(self) -> Dict[str, Any]:
498 | """Test error handling for invalid budget values"""
499 |
500 | print(f"\n💰 Testing invalid budget handling")
501 | results = {}
502 |
503 | for invalid_budget in self.test_budgets["invalid_budgets"]:
504 | print(f" 💰 Testing invalid budget: '{invalid_budget}'")
505 |
506 | result = self._make_request("tools/call", {
507 | "name": "update_adset",
508 | "arguments": {
509 | "adset_id": self.test_adset_ids[4],
510 | "daily_budget": invalid_budget
511 | }
512 | })
513 |
514 | if not result["success"]:
515 | results[invalid_budget] = {
516 | "success": False,
517 | "error": result.get("text", "Unknown error")
518 | }
519 | print(f" ❌ Request failed: {result.get('text', 'Unknown error')}")
520 | continue
521 |
522 | # Parse response
523 | response_data = result["json"]["result"]
524 | content = response_data.get("content", [{}])[0].get("text", "")
525 |
526 | try:
527 | parsed_content = json.loads(content)
528 |
529 | # For invalid budgets, we expect an error response
530 | has_error = "error" in parsed_content or "data" in parsed_content
531 | has_details = "details" in parsed_content
532 |
533 | # Check if the error is a proper validation error (not a rate limit or other issue)
534 | error_msg = parsed_content.get("error", "")
535 | if not error_msg and "data" in parsed_content:
536 | try:
537 | data_content = json.loads(parsed_content.get("data", ""))
538 | if "error" in data_content:
539 | error_msg = data_content["error"].get("message", "")
540 | except:
541 | pass
542 |
543 | is_validation_error = any(keyword in error_msg.lower() for keyword in [
544 | "must be a number", "greater than or equal to 0", "too high", "too low", "invalid parameter",
545 | "budget is too low", "budget is too high", "decrease your ad set budget"
546 | ])
547 |
548 | results[invalid_budget] = {
549 | "success": has_error and is_validation_error, # Success if we got proper validation error
550 | "has_error": has_error,
551 | "has_details": has_details,
552 | "is_validation_error": is_validation_error,
553 | "error_message": error_msg or parsed_content.get("error", "No error field"),
554 | "details": parsed_content.get("details", "No details field")
555 | }
556 |
557 | if has_error and is_validation_error:
558 | print(f" ✅ Properly handled invalid budget '{invalid_budget}'")
559 | print(f" • Error: {parsed_content.get('error', 'N/A')}")
560 | elif has_error:
561 | print(f" ⚠️ Got error but not validation error for '{invalid_budget}'")
562 | print(f" • Error: {parsed_content.get('error', 'N/A')}")
563 | else:
564 | print(f" ❌ Unexpected success for invalid budget '{invalid_budget}'")
565 | print(f" • Response: {parsed_content}")
566 |
567 | except json.JSONDecodeError:
568 | results[invalid_budget] = {
569 | "success": False,
570 | "error": "Invalid JSON response",
571 | "raw_content": content
572 | }
573 | print(f" ❌ Invalid JSON: {content}")
574 |
575 | return results
576 |
577 | def test_budget_update_with_targeting(self) -> Dict[str, Any]:
578 | """Test budget update combined with targeting update"""
579 |
580 | print(f"\n💰 Testing budget update with targeting")
581 |
582 | targeting = {
583 | "age_min": 25,
584 | "age_max": 45,
585 | "geo_locations": {"countries": ["US", "CA"]}
586 | }
587 |
588 | result = self._make_request("tools/call", {
589 | "name": "update_adset",
590 | "arguments": {
591 | "adset_id": self.test_adset_ids[5],
592 | "daily_budget": "8500", # $85
593 | "targeting": targeting
594 | }
595 | })
596 |
597 | if not result["success"]:
598 | return {
599 | "success": False,
600 | "error": result.get("text", "Unknown error")
601 | }
602 |
603 | # Parse response
604 | response_data = result["json"]["result"]
605 | content = response_data.get("content", [{}])[0].get("text", "")
606 |
607 | try:
608 | parsed_content = json.loads(content)
609 |
610 | if "error" in parsed_content:
611 | error_msg = parsed_content.get("error", "")
612 | if "rate limit" in error_msg.lower() or "too many changes" in error_msg.lower():
613 | return {
614 | "success": True, # Rate limiting is expected behavior
615 | "rate_limited": True,
616 | "error_message": error_msg
617 | }
618 | else:
619 | return {
620 | "success": False,
621 | "error": error_msg
622 | }
623 |
624 | # Check for successful update indicators
625 | has_id = "id" in parsed_content
626 | has_daily_budget = "daily_budget" in parsed_content
627 | has_success = "success" in parsed_content
628 |
629 | result_data = {
630 | "success": True,
631 | "has_id": has_id,
632 | "has_daily_budget": has_daily_budget,
633 | "has_success": has_success,
634 | "daily_budget": parsed_content.get("daily_budget", "N/A"),
635 | "adset_id": parsed_content.get("id", "N/A")
636 | }
637 |
638 | print(f" ✅ Updated budget with targeting successfully")
639 | print(f" • Ad Set ID: {parsed_content.get('id', 'N/A')}")
640 | print(f" • Success: {parsed_content.get('success', 'N/A')}")
641 |
642 | # Note: Meta Ads API returns {"success": true} for updates
643 | # The actual updated values can be verified by fetching ad set details
644 |
645 | return result_data
646 |
647 | except json.JSONDecodeError:
648 | return {
649 | "success": False,
650 | "error": "Invalid JSON response",
651 | "raw_content": content
652 | }
653 |
654 | def run_budget_update_tests(self) -> bool:
655 | """Run comprehensive budget update tests"""
656 |
657 | print("🚀 Meta Ads Budget Update End-to-End Test Suite")
658 | print("="*60)
659 |
660 | # Check server availability
661 | try:
662 | response = requests.get(f"{self.base_url}/", timeout=5)
663 | server_running = response.status_code in [200, 404]
664 | except:
665 | server_running = False
666 |
667 | if not server_running:
668 | print("❌ Server is not running at", self.base_url)
669 | print(" Please start the server with:")
670 | print(" python3 -m meta_ads_mcp --transport streamable-http --port 8080")
671 | return False
672 |
673 | print("✅ Server is running")
674 | print("🔐 Using implicit authentication from server")
675 | print("⚠️ Note: This test uses ad sets specifically created for budget testing")
676 | print("⚠️ Note: Campaign uses ad set level budgets - testing budget updates at ad set level")
677 | print("⚠️ Note: Meta Ads API allows only 4 budget changes per hour - test will wait if rate limited")
678 |
679 | # Test 1: Daily Budget Updates
680 | print("\n" + "="*60)
681 | print("📋 PHASE 1: Testing Daily Budget Updates")
682 | print("="*60)
683 |
684 | daily_results = self.test_daily_budget_update()
685 | daily_success = any(
686 | result.get("success") or
687 | (result.get("success") and result.get("rate_limited"))
688 | for result in daily_results.values()
689 | )
690 |
691 | # Test 2: Lifetime Budget Updates
692 | print("\n" + "="*60)
693 | print("📋 PHASE 2: Testing Lifetime Budget Updates")
694 | print("="*60)
695 |
696 | lifetime_results = self.test_lifetime_budget_update()
697 | lifetime_success = any(
698 | result.get("success") or
699 | (result.get("success") and result.get("rate_limited")) or
700 | (not result.get("success") and result.get("api_limitation"))
701 | for result in lifetime_results.values()
702 | )
703 |
704 | # Test 3: Both Budget Types
705 | print("\n" + "="*60)
706 | print("📋 PHASE 3: Testing Both Budget Types")
707 | print("="*60)
708 |
709 | both_budgets_result = self.test_both_budget_types_update()
710 | both_budgets_success = (both_budgets_result.get("success") or
711 | (not both_budgets_result.get("success") and
712 | both_budgets_result.get("rate_limited")) or
713 | (not both_budgets_result.get("success") and
714 | both_budgets_result.get("api_limitation")))
715 |
716 | # Test 4: Budget with Other Parameters
717 | print("\n" + "="*60)
718 | print("📋 PHASE 4: Testing Budget with Other Parameters")
719 | print("="*60)
720 |
721 | other_params_result = self.test_budget_update_with_other_parameters()
722 | other_params_success = (other_params_result.get("success") or
723 | (other_params_result.get("success") and
724 | other_params_result.get("rate_limited")))
725 |
726 | # Test 5: Invalid Budget Handling
727 | print("\n" + "="*60)
728 | print("📋 PHASE 5: Testing Invalid Budget Handling")
729 | print("="*60)
730 |
731 | invalid_results = self.test_invalid_budget_handling()
732 | invalid_success = any(
733 | result.get("success") and result.get("is_validation_error")
734 | for result in invalid_results.values()
735 | )
736 |
737 | # Test 6: Budget with Targeting
738 | print("\n" + "="*60)
739 | print("📋 PHASE 6: Testing Budget with Targeting")
740 | print("="*60)
741 |
742 | targeting_result = self.test_budget_update_with_targeting()
743 | targeting_success = (targeting_result.get("success") or
744 | (targeting_result.get("success") and
745 | targeting_result.get("rate_limited")))
746 |
747 | # Final assessment
748 | print("\n" + "="*60)
749 | print("📊 FINAL RESULTS")
750 | print("="*60)
751 |
752 | all_tests = [
753 | ("Daily Budget Updates", daily_success),
754 | ("Lifetime Budget Updates", lifetime_success),
755 | ("Both Budget Types", both_budgets_success),
756 | ("Budget with Other Parameters", other_params_success),
757 | ("Invalid Budget Handling", invalid_success),
758 | ("Budget with Targeting", targeting_success)
759 | ]
760 |
761 | passed_tests = sum(1 for _, success in all_tests if success)
762 | total_tests = len(all_tests)
763 |
764 | for test_name, success in all_tests:
765 | status = "✅ PASSED" if success else "❌ FAILED"
766 | print(f" • {test_name}: {status}")
767 |
768 | overall_success = passed_tests >= 4 # At least 4 out of 6 tests should pass
769 |
770 | if overall_success:
771 | print(f"\n✅ Budget update tests: SUCCESS ({passed_tests}/{total_tests} passed)")
772 | print(" • Core budget update functionality is working")
773 | print(" • Meta Ads API integration is functional")
774 | print(" • Error handling is working properly")
775 | return True
776 | else:
777 | print(f"\n❌ Budget update tests: FAILED ({passed_tests}/{total_tests} passed)")
778 | print(" • Some budget update functions are not working properly")
779 | print(" • Check API permissions and ad set IDs")
780 | return False
781 |
782 |
def main():
    """Entry point: run the budget-update suite and exit with its status."""
    passed = BudgetUpdateTester().run_budget_update_tests()

    if passed:
        print("\n🎉 All budget update tests passed!")
    else:
        print("\n⚠️ Some budget update tests failed - see details above")

    sys.exit(0 if passed else 1)


if __name__ == "__main__":
    main()
```
--------------------------------------------------------------------------------
/tests/test_dynamic_creatives.py:
--------------------------------------------------------------------------------
```python
1 | """Tests for dynamic creative features including multiple headlines and descriptions.
2 |
3 | Tests for the enhanced create_ad_creative function that supports:
4 | - Multiple headlines
5 | - Multiple descriptions
6 | - Dynamic creative optimization settings
7 | - Creative update functionality
8 | """
9 |
10 | import pytest
11 | import json
12 | from unittest.mock import AsyncMock, patch
13 | from meta_ads_mcp.core.ads import create_ad_creative, update_ad_creative
14 |
15 |
16 | @pytest.mark.asyncio
17 | class TestDynamicCreatives:
18 | """Test cases for dynamic creative features."""
19 |
20 | async def test_create_ad_creative_single_headline(self):
21 | """Test creating ad creative with single headline (simple case)."""
22 |
23 | sample_creative_data = {
24 | "id": "123456789",
25 | "name": "Test Creative",
26 | "status": "ACTIVE"
27 | }
28 |
29 | with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api:
30 | mock_api.return_value = sample_creative_data
31 |
32 | result = await create_ad_creative(
33 | access_token="test_token",
34 | account_id="act_123456789",
35 | name="Test Creative",
36 | image_hash="abc123",
37 | page_id="987654321",
38 | link_url="https://example.com",
39 | message="Test message",
40 | headline="Single Headline",
41 | call_to_action_type="LEARN_MORE"
42 | )
43 |
44 | result_data = json.loads(result)
45 | assert result_data["success"] is True
46 |
47 | # Verify the API call was made with single headline in object_story_spec
48 | call_args_list = mock_api.call_args_list
49 | assert len(call_args_list) >= 1
50 |
51 | # First call should be the creative creation
52 | first_call = call_args_list[0]
53 | creative_data = first_call[0][2] # params is the third argument
54 |
55 | # Should use object_story_spec with link_data for simple creatives (not asset_feed_spec)
56 | assert "object_story_spec" in creative_data
57 | assert "link_data" in creative_data["object_story_spec"]
58 | assert creative_data["object_story_spec"]["link_data"]["name"] == "Single Headline"
59 | assert "asset_feed_spec" not in creative_data
60 |
61 | async def test_create_ad_creative_single_description(self):
62 | """Test creating ad creative with single description (simple case)."""
63 |
64 | sample_creative_data = {
65 | "id": "123456789",
66 | "name": "Test Creative",
67 | "status": "ACTIVE"
68 | }
69 |
70 | with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api:
71 | mock_api.return_value = sample_creative_data
72 |
73 | result = await create_ad_creative(
74 | access_token="test_token",
75 | account_id="act_123456789",
76 | name="Test Creative",
77 | image_hash="abc123",
78 | page_id="987654321",
79 | link_url="https://example.com",
80 | message="Test message",
81 | description="Single Description",
82 | call_to_action_type="LEARN_MORE"
83 | )
84 |
85 | result_data = json.loads(result)
86 | assert result_data["success"] is True
87 |
88 | # Verify the API call was made with single description in object_story_spec
89 | call_args_list = mock_api.call_args_list
90 | assert len(call_args_list) >= 1
91 |
92 | # First call should be the creative creation
93 | first_call = call_args_list[0]
94 | creative_data = first_call[0][2] # params is the third argument
95 |
96 | # Should use object_story_spec with link_data for simple creatives (not asset_feed_spec)
97 | assert "object_story_spec" in creative_data
98 | assert "link_data" in creative_data["object_story_spec"]
99 | assert creative_data["object_story_spec"]["link_data"]["description"] == "Single Description"
100 | assert "asset_feed_spec" not in creative_data
101 |
102 | async def test_create_ad_creative_cannot_mix_headline_and_headlines(self):
103 | """Test that mixing headline and headlines parameters raises error."""
104 |
105 | result = await create_ad_creative(
106 | access_token="test_token",
107 | account_id="act_123456789",
108 | name="Test Creative",
109 | image_hash="abc123",
110 | page_id="987654321",
111 | link_url="https://example.com",
112 | message="Test message",
113 | headline="Single Headline",
114 | headlines=["Headline 1", "Headline 2"],
115 | call_to_action_type="LEARN_MORE"
116 | )
117 |
118 | result_data = json.loads(result)
119 |
120 | # Check if error is wrapped in "data" field (MCP error response format)
121 | if "data" in result_data:
122 | error_data = json.loads(result_data["data"])
123 | assert "error" in error_data
124 | assert "Cannot specify both 'headline' and 'headlines'" in error_data["error"]
125 | else:
126 | assert "error" in result_data
127 | assert "Cannot specify both 'headline' and 'headlines'" in result_data["error"]
128 |
129 | async def test_create_ad_creative_cannot_mix_description_and_descriptions(self):
130 | """Test that mixing description and descriptions parameters raises error."""
131 |
132 | result = await create_ad_creative(
133 | access_token="test_token",
134 | account_id="act_123456789",
135 | name="Test Creative",
136 | image_hash="abc123",
137 | page_id="987654321",
138 | link_url="https://example.com",
139 | message="Test message",
140 | description="Single Description",
141 | descriptions=["Description 1", "Description 2"],
142 | call_to_action_type="LEARN_MORE"
143 | )
144 |
145 | result_data = json.loads(result)
146 |
147 | # Check if error is wrapped in "data" field (MCP error response format)
148 | if "data" in result_data:
149 | error_data = json.loads(result_data["data"])
150 | assert "error" in error_data
151 | assert "Cannot specify both 'description' and 'descriptions'" in error_data["error"]
152 | else:
153 | assert "error" in result_data
154 | assert "Cannot specify both 'description' and 'descriptions'" in result_data["error"]
155 |
156 |
157 | async def test_create_ad_creative_multiple_headlines(self):
158 | """Test creating ad creative with multiple headlines."""
159 |
160 | sample_creative_data = {
161 | "id": "123456789",
162 | "name": "Test Creative",
163 | "status": "ACTIVE"
164 | }
165 |
166 | with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api:
167 | mock_api.return_value = sample_creative_data
168 |
169 | result = await create_ad_creative(
170 | access_token="test_token",
171 | account_id="act_123456789",
172 | name="Test Creative",
173 | image_hash="abc123",
174 | page_id="987654321",
175 | link_url="https://example.com",
176 | message="Test message",
177 | headlines=["Headline 1", "Headline 2", "Headline 3"],
178 | call_to_action_type="LEARN_MORE"
179 | )
180 |
181 | result_data = json.loads(result)
182 | assert result_data["success"] is True
183 |
184 | # Verify the API call was made with multiple headlines
185 | # We need to check the first call (creative creation), not the second call (details fetch)
186 | call_args_list = mock_api.call_args_list
187 | assert len(call_args_list) >= 1
188 |
189 | # First call should be the creative creation
190 | first_call = call_args_list[0]
191 | creative_data = first_call[0][2] # params is the third argument
192 |
193 | # Should use asset_feed_spec with headlines array format
194 | assert "asset_feed_spec" in creative_data
195 | assert "headlines" in creative_data["asset_feed_spec"]
196 | assert creative_data["asset_feed_spec"]["headlines"] == [
197 | {"text": "Headline 1"},
198 | {"text": "Headline 2"},
199 | {"text": "Headline 3"}
200 | ]
201 |
202 | async def test_create_ad_creative_multiple_descriptions(self):
203 | """Test creating ad creative with multiple descriptions."""
204 |
205 | sample_creative_data = {
206 | "id": "123456789",
207 | "name": "Test Creative",
208 | "status": "ACTIVE"
209 | }
210 |
211 | with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api:
212 | mock_api.return_value = sample_creative_data
213 |
214 | result = await create_ad_creative(
215 | access_token="test_token",
216 | account_id="act_123456789",
217 | name="Test Creative",
218 | image_hash="abc123",
219 | page_id="987654321",
220 | link_url="https://example.com",
221 | message="Test message",
222 | descriptions=["Description 1", "Description 2"],
223 | call_to_action_type="LEARN_MORE"
224 | )
225 |
226 | result_data = json.loads(result)
227 | assert result_data["success"] is True
228 |
229 | # Verify the API call was made with multiple descriptions
230 | # We need to check the first call (creative creation), not the second call (details fetch)
231 | call_args_list = mock_api.call_args_list
232 | assert len(call_args_list) >= 1
233 |
234 | # First call should be the creative creation
235 | first_call = call_args_list[0]
236 | creative_data = first_call[0][2] # params is the third argument
237 |
238 | # Should use asset_feed_spec with descriptions array format
239 | assert "asset_feed_spec" in creative_data
240 | assert "descriptions" in creative_data["asset_feed_spec"]
241 | assert creative_data["asset_feed_spec"]["descriptions"] == [
242 | {"text": "Description 1"},
243 | {"text": "Description 2"}
244 | ]
245 |
246 | async def test_create_ad_creative_dynamic_creative_spec(self):
247 | """Test creating ad creative with dynamic_creative_spec optimization settings."""
248 |
249 | sample_creative_data = {
250 | "id": "123456789",
251 | "name": "Test Creative",
252 | "status": "ACTIVE"
253 | }
254 |
255 | with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api:
256 | mock_api.return_value = sample_creative_data
257 |
258 | result = await create_ad_creative(
259 | access_token="test_token",
260 | account_id="act_123456789",
261 | name="Test Creative",
262 | image_hash="abc123",
263 | page_id="987654321",
264 | link_url="https://example.com",
265 | message="Test message",
266 | headlines=["Headline 1", "Headline 2"],
267 | descriptions=["Description 1", "Description 2"],
268 | dynamic_creative_spec={
269 | "headline_optimization": True,
270 | "description_optimization": True
271 | },
272 | call_to_action_type="LEARN_MORE"
273 | )
274 |
275 | result_data = json.loads(result)
276 | assert result_data["success"] is True
277 |
278 | # Verify the API call was made with dynamic_creative_spec
279 | # We need to check the first call (creative creation), not the second call (details fetch)
280 | call_args_list = mock_api.call_args_list
281 | assert len(call_args_list) >= 1
282 |
283 | # First call should be the creative creation
284 | first_call = call_args_list[0]
285 | creative_data = first_call[0][2] # params is the third argument
286 |
287 | # Should include dynamic_creative_spec
288 | assert "dynamic_creative_spec" in creative_data
289 | assert creative_data["dynamic_creative_spec"]["headline_optimization"] is True
290 | assert creative_data["dynamic_creative_spec"]["description_optimization"] is True
291 |
292 | async def test_create_ad_creative_multiple_headlines_and_descriptions(self):
293 | """Test creating ad creative with both multiple headlines and descriptions."""
294 |
295 | sample_creative_data = {
296 | "id": "123456789",
297 | "name": "Test Creative",
298 | "status": "ACTIVE"
299 | }
300 |
301 | with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api:
302 | mock_api.return_value = sample_creative_data
303 |
304 | result = await create_ad_creative(
305 | access_token="test_token",
306 | account_id="act_123456789",
307 | name="Test Creative",
308 | image_hash="abc123",
309 | page_id="987654321",
310 | link_url="https://example.com",
311 | message="Test message",
312 | headlines=["Headline 1", "Headline 2", "Headline 3"],
313 | descriptions=["Description 1", "Description 2"],
314 | dynamic_creative_spec={
315 | "headline_optimization": True,
316 | "description_optimization": True
317 | },
318 | call_to_action_type="LEARN_MORE"
319 | )
320 |
321 | result_data = json.loads(result)
322 | assert result_data["success"] is True
323 |
324 | # Verify the API call was made with both multiple headlines and descriptions
325 | # We need to check the first call (creative creation), not the second call (details fetch)
326 | call_args_list = mock_api.call_args_list
327 | assert len(call_args_list) >= 1
328 |
329 | # First call should be the creative creation
330 | first_call = call_args_list[0]
331 | creative_data = first_call[0][2] # params is the third argument
332 |
333 | assert "asset_feed_spec" in creative_data
334 | assert "headlines" in creative_data["asset_feed_spec"]
335 | assert "descriptions" in creative_data["asset_feed_spec"]
336 | assert "dynamic_creative_spec" in creative_data
337 |
338 | async def test_create_ad_creative_validation_max_headlines(self):
339 | """Test validation for maximum number of headlines."""
340 |
341 | # Create list with more than 5 headlines (assuming 5 is the limit)
342 | too_many_headlines = [f"Headline {i}" for i in range(6)]
343 |
344 | result = await create_ad_creative(
345 | access_token="test_token",
346 | account_id="act_123456789",
347 | name="Test Creative",
348 | image_hash="abc123",
349 | page_id="987654321",
350 | headlines=too_many_headlines
351 | )
352 |
353 | result_data = json.loads(result)
354 | # The error might be wrapped in a 'data' field
355 | if "data" in result_data:
356 | error_data = json.loads(result_data["data"])
357 | assert "error" in error_data
358 | assert "maximum" in error_data["error"].lower() or "limit" in error_data["error"].lower()
359 | else:
360 | assert "error" in result_data
361 | assert "maximum" in result_data["error"].lower() or "limit" in result_data["error"].lower()
362 |
363 | async def test_create_ad_creative_validation_max_descriptions(self):
364 | """Test validation for maximum number of descriptions."""
365 |
366 | # Create list with more than 5 descriptions (assuming 5 is the limit)
367 | too_many_descriptions = [f"Description {i}" for i in range(6)]
368 |
369 | result = await create_ad_creative(
370 | access_token="test_token",
371 | account_id="act_123456789",
372 | name="Test Creative",
373 | image_hash="abc123",
374 | page_id="987654321",
375 | descriptions=too_many_descriptions
376 | )
377 |
378 | result_data = json.loads(result)
379 | # The error might be wrapped in a 'data' field
380 | if "data" in result_data:
381 | error_data = json.loads(result_data["data"])
382 | assert "error" in error_data
383 | assert "maximum" in error_data["error"].lower() or "limit" in error_data["error"].lower()
384 | else:
385 | assert "error" in result_data
386 | assert "maximum" in result_data["error"].lower() or "limit" in result_data["error"].lower()
387 |
388 |
389 |
390 | async def test_update_ad_creative_add_headlines(self):
391 | """Test updating an existing creative to add multiple headlines."""
392 |
393 | sample_creative_data = {
394 | "id": "123456789",
395 | "name": "Updated Creative",
396 | "status": "ACTIVE"
397 | }
398 |
399 | with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api:
400 | mock_api.return_value = sample_creative_data
401 |
402 | result = await update_ad_creative(
403 | access_token="test_token",
404 | creative_id="123456789",
405 | headlines=["New Headline 1", "New Headline 2", "New Headline 3"]
406 | )
407 |
408 | result_data = json.loads(result)
409 | assert result_data["success"] is True
410 |
411 | # Verify the API call was made with updated headlines
412 | # We need to check the first call (creative update), not the second call (details fetch)
413 | call_args_list = mock_api.call_args_list
414 | assert len(call_args_list) >= 1
415 |
416 | # First call should be the creative update
417 | first_call = call_args_list[0]
418 | creative_data = first_call[0][2] # params is the third argument
419 |
420 | assert "asset_feed_spec" in creative_data
421 | assert "headlines" in creative_data["asset_feed_spec"]
422 | assert creative_data["asset_feed_spec"]["headlines"] == [
423 | {"text": "New Headline 1"},
424 | {"text": "New Headline 2"},
425 | {"text": "New Headline 3"}
426 | ]
427 |
428 | async def test_update_ad_creative_add_descriptions(self):
429 | """Test updating an existing creative to add multiple descriptions."""
430 |
431 | sample_creative_data = {
432 | "id": "123456789",
433 | "name": "Updated Creative",
434 | "status": "ACTIVE"
435 | }
436 |
437 | with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api:
438 | mock_api.return_value = sample_creative_data
439 |
440 | result = await update_ad_creative(
441 | access_token="test_token",
442 | creative_id="123456789",
443 | descriptions=["New Description 1", "New Description 2"]
444 | )
445 |
446 | result_data = json.loads(result)
447 | assert result_data["success"] is True
448 |
449 | # Verify the API call was made with updated descriptions
450 | # We need to check the first call (creative update), not the second call (details fetch)
451 | call_args_list = mock_api.call_args_list
452 | assert len(call_args_list) >= 1
453 |
454 | # First call should be the creative update
455 | first_call = call_args_list[0]
456 | creative_data = first_call[0][2] # params is the third argument
457 |
458 | assert "asset_feed_spec" in creative_data
459 | assert "descriptions" in creative_data["asset_feed_spec"]
460 | assert creative_data["asset_feed_spec"]["descriptions"] == [
461 | {"text": "New Description 1"},
462 | {"text": "New Description 2"}
463 | ]
464 |
465 | async def test_update_ad_creative_update_dynamic_spec(self):
466 | """Test updating an existing creative's dynamic_creative_spec."""
467 |
468 | sample_creative_data = {
469 | "id": "123456789",
470 | "name": "Updated Creative",
471 | "status": "ACTIVE"
472 | }
473 |
474 | with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api:
475 | mock_api.return_value = sample_creative_data
476 |
477 | result = await update_ad_creative(
478 | access_token="test_token",
479 | creative_id="123456789",
480 | dynamic_creative_spec={
481 | "headline_optimization": True,
482 | "description_optimization": False
483 | }
484 | )
485 |
486 | result_data = json.loads(result)
487 | assert result_data["success"] is True
488 |
489 | # Verify the API call was made with updated dynamic_creative_spec
490 | # We need to check the first call (creative update), not the second call (details fetch)
491 | call_args_list = mock_api.call_args_list
492 | assert len(call_args_list) >= 1
493 |
494 | # First call should be the creative update
495 | first_call = call_args_list[0]
496 | creative_data = first_call[0][2] # params is the third argument
497 |
498 | assert "dynamic_creative_spec" in creative_data
499 | assert creative_data["dynamic_creative_spec"]["headline_optimization"] is True
500 | assert creative_data["dynamic_creative_spec"]["description_optimization"] is False
501 |
502 | async def test_update_ad_creative_no_creative_id(self):
503 | """Test update_ad_creative with empty creative_id provided."""
504 |
505 | result = await update_ad_creative(
506 | creative_id="", # Now provide the required parameter but with empty value
507 | access_token="test_token",
508 | headlines=["New Headline"]
509 | )
510 |
511 | result_data = json.loads(result)
512 | # The error might be wrapped in a 'data' field
513 | if "data" in result_data:
514 | error_data = json.loads(result_data["data"])
515 | assert "error" in error_data
516 | assert "creative id" in error_data["error"].lower()
517 | else:
518 | assert "error" in result_data
519 | assert "creative id" in result_data["error"].lower()
520 |
521 | async def test_update_ad_creative_cannot_mix_headline_and_headlines(self):
522 | """Test that mixing headline and headlines parameters raises error in update."""
523 |
524 | result = await update_ad_creative(
525 | access_token="test_token",
526 | creative_id="123456789",
527 | headline="Single Headline",
528 | headlines=["Headline 1", "Headline 2"]
529 | )
530 |
531 | result_data = json.loads(result)
532 |
533 | # Check if error is wrapped in "data" field (MCP error response format)
534 | if "data" in result_data:
535 | error_data = json.loads(result_data["data"])
536 | assert "error" in error_data
537 | assert "Cannot specify both 'headline' and 'headlines'" in error_data["error"]
538 | else:
539 | assert "error" in result_data
540 | assert "Cannot specify both 'headline' and 'headlines'" in result_data["error"]
541 |
542 | async def test_update_ad_creative_cannot_mix_description_and_descriptions(self):
543 | """Test that mixing description and descriptions parameters raises error in update."""
544 |
545 | result = await update_ad_creative(
546 | access_token="test_token",
547 | creative_id="123456789",
548 | description="Single Description",
549 | descriptions=["Description 1", "Description 2"]
550 | )
551 |
552 | result_data = json.loads(result)
553 |
554 | # Check if error is wrapped in "data" field (MCP error response format)
555 | if "data" in result_data:
556 | error_data = json.loads(result_data["data"])
557 | assert "error" in error_data
558 | assert "Cannot specify both 'description' and 'descriptions'" in error_data["error"]
559 | else:
560 | assert "error" in result_data
561 | assert "Cannot specify both 'description' and 'descriptions'" in result_data["error"]
562 |
563 | async def test_update_ad_creative_validation_max_headlines(self):
564 | """Test validation for maximum number of headlines in update."""
565 |
566 | # Create list with more than 5 headlines (limit)
567 | too_many_headlines = [f"Headline {i}" for i in range(6)]
568 |
569 | result = await update_ad_creative(
570 | access_token="test_token",
571 | creative_id="123456789",
572 | headlines=too_many_headlines
573 | )
574 |
575 | result_data = json.loads(result)
576 | # The error might be wrapped in a 'data' field
577 | if "data" in result_data:
578 | error_data = json.loads(result_data["data"])
579 | assert "error" in error_data
580 | assert "maximum 5 headlines" in error_data["error"].lower()
581 | else:
582 | assert "error" in result_data
583 | assert "maximum 5 headlines" in result_data["error"].lower()
584 |
585 | async def test_update_ad_creative_validation_max_descriptions(self):
586 | """Test validation for maximum number of descriptions in update."""
587 |
588 | # Create list with more than 5 descriptions (limit)
589 | too_many_descriptions = [f"Description {i}" for i in range(6)]
590 |
591 | result = await update_ad_creative(
592 | access_token="test_token",
593 | creative_id="123456789",
594 | descriptions=too_many_descriptions
595 | )
596 |
597 | result_data = json.loads(result)
598 | # The error might be wrapped in a 'data' field
599 | if "data" in result_data:
600 | error_data = json.loads(result_data["data"])
601 | assert "error" in error_data
602 | assert "maximum 5 descriptions" in error_data["error"].lower()
603 | else:
604 | assert "error" in result_data
605 | assert "maximum 5 descriptions" in result_data["error"].lower()
606 |
607 | async def test_update_ad_creative_validation_headline_length(self):
608 | """Test validation for headline character limit."""
609 |
610 | # Create headline longer than 40 characters
611 | long_headline = "A" * 41
612 |
613 | result = await update_ad_creative(
614 | access_token="test_token",
615 | creative_id="123456789",
616 | headlines=[long_headline]
617 | )
618 |
619 | result_data = json.loads(result)
620 | # The error might be wrapped in a 'data' field
621 | if "data" in result_data:
622 | error_data = json.loads(result_data["data"])
623 | assert "error" in error_data
624 | assert "40 character limit" in error_data["error"]
625 | else:
626 | assert "error" in result_data
627 | assert "40 character limit" in result_data["error"]
628 |
629 | async def test_update_ad_creative_validation_description_length(self):
630 | """Test validation for description character limit."""
631 |
632 | # Create description longer than 125 characters
633 | long_description = "A" * 126
634 |
635 | result = await update_ad_creative(
636 | access_token="test_token",
637 | creative_id="123456789",
638 | descriptions=[long_description]
639 | )
640 |
641 | result_data = json.loads(result)
642 | # The error might be wrapped in a 'data' field
643 | if "data" in result_data:
644 | error_data = json.loads(result_data["data"])
645 | assert "error" in error_data
646 | assert "125 character limit" in error_data["error"]
647 | else:
648 | assert "error" in result_data
649 | assert "125 character limit" in result_data["error"]
650 |
651 | async def test_update_ad_creative_name_only(self):
652 | """Test updating just the creative name."""
653 |
654 | sample_creative_data = {
655 | "id": "123456789",
656 | "name": "Updated Creative Name",
657 | "status": "ACTIVE"
658 | }
659 |
660 | with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api:
661 | mock_api.return_value = sample_creative_data
662 |
663 | result = await update_ad_creative(
664 | access_token="test_token",
665 | creative_id="123456789",
666 | name="Updated Creative Name"
667 | )
668 |
669 | result_data = json.loads(result)
670 | assert result_data["success"] is True
671 |
672 | # Verify the API call was made with just the name
673 | call_args_list = mock_api.call_args_list
674 | assert len(call_args_list) >= 1
675 |
676 | first_call = call_args_list[0]
677 | creative_data = first_call[0][2] # params is the third argument
678 |
679 | assert "name" in creative_data
680 | assert creative_data["name"] == "Updated Creative Name"
681 | # Should not have asset_feed_spec since no dynamic content
682 | assert "asset_feed_spec" not in creative_data
683 |
684 | async def test_update_ad_creative_message_only(self):
685 | """Test updating just the creative message."""
686 |
687 | sample_creative_data = {
688 | "id": "123456789",
689 | "name": "Test Creative",
690 | "status": "ACTIVE"
691 | }
692 |
693 | with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api:
694 | mock_api.return_value = sample_creative_data
695 |
696 | result = await update_ad_creative(
697 | access_token="test_token",
698 | creative_id="123456789",
699 | message="Updated message text"
700 | )
701 |
702 | result_data = json.loads(result)
703 | assert result_data["success"] is True
704 |
705 | # Verify the API call was made with object_story_spec
706 | call_args_list = mock_api.call_args_list
707 | assert len(call_args_list) >= 1
708 |
709 | first_call = call_args_list[0]
710 | creative_data = first_call[0][2] # params is the third argument
711 |
712 | assert "object_story_spec" in creative_data
713 | assert creative_data["object_story_spec"]["link_data"]["message"] == "Updated message text"
714 |
715 | async def test_update_ad_creative_cta_with_dynamic_content(self):
716 | """Test updating call_to_action_type with dynamic creative content."""
717 |
718 | sample_creative_data = {
719 | "id": "123456789",
720 | "name": "Test Creative",
721 | "status": "ACTIVE"
722 | }
723 |
724 | with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api:
725 | mock_api.return_value = sample_creative_data
726 |
727 | result = await update_ad_creative(
728 | access_token="test_token",
729 | creative_id="123456789",
730 | headlines=["Test Headline"],
731 | call_to_action_type="SHOP_NOW"
732 | )
733 |
734 | result_data = json.loads(result)
735 | assert result_data["success"] is True
736 |
737 | # Verify CTA is added to asset_feed_spec when dynamic content exists
738 | call_args_list = mock_api.call_args_list
739 | assert len(call_args_list) >= 1
740 |
741 | first_call = call_args_list[0]
742 | creative_data = first_call[0][2] # params is the third argument
743 |
744 | assert "asset_feed_spec" in creative_data
745 | assert "call_to_action_types" in creative_data["asset_feed_spec"]
746 | assert creative_data["asset_feed_spec"]["call_to_action_types"] == ["SHOP_NOW"]
747 |
748 | async def test_update_ad_creative_cta_without_dynamic_content(self):
749 | """Test updating call_to_action_type without dynamic creative content."""
750 |
751 | sample_creative_data = {
752 | "id": "123456789",
753 | "name": "Test Creative",
754 | "status": "ACTIVE"
755 | }
756 |
757 | with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api:
758 | mock_api.return_value = sample_creative_data
759 |
760 | result = await update_ad_creative(
761 | access_token="test_token",
762 | creative_id="123456789",
763 | call_to_action_type="LEARN_MORE"
764 | )
765 |
766 | result_data = json.loads(result)
767 | assert result_data["success"] is True
768 |
769 | # Verify CTA is added to object_story_spec when no dynamic content
770 | call_args_list = mock_api.call_args_list
771 | assert len(call_args_list) >= 1
772 |
773 | first_call = call_args_list[0]
774 | creative_data = first_call[0][2] # params is the third argument
775 |
776 | assert "object_story_spec" in creative_data
777 | assert "call_to_action" in creative_data["object_story_spec"]["link_data"]
778 | assert creative_data["object_story_spec"]["link_data"]["call_to_action"]["type"] == "LEARN_MORE"
779 |
780 | async def test_update_ad_creative_combined_updates(self):
781 | """Test updating multiple parameters at once."""
782 |
783 | sample_creative_data = {
784 | "id": "123456789",
785 | "name": "Multi-Updated Creative",
786 | "status": "ACTIVE"
787 | }
788 |
789 | with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api:
790 | mock_api.return_value = sample_creative_data
791 |
792 | result = await update_ad_creative(
793 | access_token="test_token",
794 | creative_id="123456789",
795 | name="Multi-Updated Creative",
796 | message="Updated message",
797 | headlines=["New Headline 1", "New Headline 2"],
798 | descriptions=["New Description 1"],
799 | call_to_action_type="SIGN_UP"
800 | )
801 |
802 | result_data = json.loads(result)
803 | assert result_data["success"] is True
804 |
805 | # Verify all updates are included
806 | call_args_list = mock_api.call_args_list
807 | assert len(call_args_list) >= 1
808 |
809 | first_call = call_args_list[0]
810 | creative_data = first_call[0][2] # params is the third argument
811 |
812 | # Should have name
813 | assert creative_data["name"] == "Multi-Updated Creative"
814 |
815 | # Should have asset_feed_spec with all dynamic content
816 | assert "asset_feed_spec" in creative_data
817 | assert "headlines" in creative_data["asset_feed_spec"]
818 | assert "descriptions" in creative_data["asset_feed_spec"]
819 | assert "primary_texts" in creative_data["asset_feed_spec"]
820 | assert "call_to_action_types" in creative_data["asset_feed_spec"]
821 |
822 | # Verify content
823 | assert creative_data["asset_feed_spec"]["headlines"] == [
824 | {"text": "New Headline 1"}, {"text": "New Headline 2"}
825 | ]
826 | assert creative_data["asset_feed_spec"]["descriptions"] == [
827 | {"text": "New Description 1"}
828 | ]
829 | assert creative_data["asset_feed_spec"]["primary_texts"] == [
830 | {"text": "Updated message"}
831 | ]
832 | assert creative_data["asset_feed_spec"]["call_to_action_types"] == ["SIGN_UP"]
833 |
834 | async def test_update_ad_creative_api_error_handling(self):
835 | """Test error handling when API request fails."""
836 |
837 | with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api:
838 | mock_api.side_effect = Exception("API request failed")
839 |
840 | result = await update_ad_creative(
841 | access_token="test_token",
842 | creative_id="123456789",
843 | name="Test Creative"
844 | )
845 |
846 | result_data = json.loads(result)
847 | # The error might be wrapped in a 'data' field
848 | if "data" in result_data:
849 | error_data = json.loads(result_data["data"])
850 | assert "error" in error_data
851 | assert error_data["error"] == "Failed to update ad creative"
852 | assert "details" in error_data
853 | assert "update_data_sent" in error_data
854 | else:
855 | assert "error" in result_data
856 | assert result_data["error"] == "Failed to update ad creative"
857 | assert "details" in result_data
858 | assert "update_data_sent" in result_data
859 |
```
--------------------------------------------------------------------------------
/tests/test_duplication_regression.py:
--------------------------------------------------------------------------------
```python
1 | """Comprehensive regression tests for duplication module."""
2 |
3 | import os
4 | import json
5 | import pytest
6 | import httpx
7 | from unittest.mock import patch, AsyncMock, MagicMock
8 | import importlib
9 |
10 |
class TestDuplicationFeatureToggle:
    """Test feature toggle functionality to prevent regression."""

    @staticmethod
    def _reload_duplication():
        """Reload the duplication module so it re-reads the environment."""
        from meta_ads_mcp.core import duplication
        importlib.reload(duplication)
        return duplication

    def test_feature_disabled_by_default(self):
        """Ensure duplication is disabled by default."""
        with patch.dict(os.environ, {}, clear=True):
            duplication = self._reload_duplication()

            assert not duplication.ENABLE_DUPLICATION
            # Note: Functions may persist in module namespace due to previous test runs
            # The important thing is that ENABLE_DUPLICATION flag is False

    def test_feature_enabled_with_env_var(self):
        """Ensure duplication is enabled when environment variable is set."""
        with patch.dict(os.environ, {"META_ADS_ENABLE_DUPLICATION": "1"}):
            duplication = self._reload_duplication()

            assert duplication.ENABLE_DUPLICATION
            for tool in ('duplicate_campaign', 'duplicate_adset', 'duplicate_ad', 'duplicate_creative'):
                assert hasattr(duplication, tool)

    def test_feature_enabled_with_various_truthy_values(self):
        """Test that various truthy values enable the feature."""
        for value in ("1", "true", "TRUE", "yes", "YES", "on", "ON", "enabled"):
            with patch.dict(os.environ, {"META_ADS_ENABLE_DUPLICATION": value}):
                duplication = self._reload_duplication()

                assert duplication.ENABLE_DUPLICATION, f"Value '{value}' should enable the feature"

    def test_feature_disabled_with_empty_string(self):
        """Test that empty string disables the feature."""
        with patch.dict(os.environ, {"META_ADS_ENABLE_DUPLICATION": ""}):
            assert not self._reload_duplication().ENABLE_DUPLICATION
60 |
61 |
class TestDuplicationDecorators:
    """Test that decorators are applied correctly to prevent regression."""

    @pytest.fixture(autouse=True)
    def enable_feature(self):
        """Enable the duplication feature for these tests."""
        with patch.dict(os.environ, {"META_ADS_ENABLE_DUPLICATION": "1"}):
            from meta_ads_mcp.core import duplication
            importlib.reload(duplication)
            yield duplication

    def test_functions_have_meta_api_tool_decorator(self, enable_feature):
        """Ensure all duplication functions have @meta_api_tool decorator."""
        import inspect

        duplication = enable_feature

        for func_name in ('duplicate_campaign', 'duplicate_adset', 'duplicate_ad', 'duplicate_creative'):
            func = getattr(duplication, func_name)

            # meta_api_tool wraps the function (adding access-token handling)
            # but must leave it callable.
            assert callable(func), f"{func_name} should be callable"

            # The wrapper must expose an optional access_token parameter.
            params = inspect.signature(func).parameters
            assert 'access_token' in params, f"{func_name} should have access_token parameter"
            assert params['access_token'].default is None, f"{func_name} access_token should default to None"

    @pytest.mark.asyncio
    async def test_functions_are_mcp_tools(self, enable_feature):
        """Ensure all duplication functions are registered as MCP tools."""
        # This test ensures the @mcp_server.tool() decorator is working
        from meta_ads_mcp.core.server import mcp_server

        # list_tools is async and returns the registered tool objects.
        registered = {tool.name for tool in await mcp_server.list_tools()}

        for tool_name in ('duplicate_campaign', 'duplicate_adset', 'duplicate_ad', 'duplicate_creative'):
            assert tool_name in registered, f"{tool_name} should be registered as an MCP tool"
107 |
108 |
class TestDuplicationAPIContract:
    """Test API contract to prevent regression in external API calls."""

    @pytest.fixture(autouse=True)
    def enable_feature(self):
        """Enable the duplication feature for these tests."""
        with patch.dict(os.environ, {"META_ADS_ENABLE_DUPLICATION": "1"}):
            from meta_ads_mcp.core import duplication
            importlib.reload(duplication)
            yield duplication

    @staticmethod
    def _success_response():
        """Build a mocked 200 response carrying a success payload."""
        response = AsyncMock()
        response.status_code = 200
        response.json.return_value = {"success": True}
        return response

    @pytest.mark.asyncio
    async def test_api_endpoint_construction(self, enable_feature):
        """Test that API endpoints are constructed correctly."""
        duplication = enable_feature

        cases = [
            ("campaign", "123456789"),
            ("adset", "987654321"),
            ("ad", "555666777"),
            ("creative", "111222333"),
        ]

        for resource_type, resource_id in cases:
            expected_url = f"https://mcp.pipeboard.co/api/meta/duplicate/{resource_type}/{resource_id}"

            # Mock dual-header authentication
            with patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration") as mock_auth:
                mock_auth.get_pipeboard_token.return_value = "pipeboard_token"
                mock_auth.get_auth_token.return_value = "facebook_token"

                with patch("meta_ads_mcp.core.duplication.httpx.AsyncClient") as mock_client:
                    post_mock = mock_client.return_value.__aenter__.return_value.post
                    post_mock.return_value = self._success_response()

                    await duplication._forward_duplication_request(
                        resource_type, resource_id, "test_token", {}
                    )

                    # Verify the correct URL was called
                    actual_url = post_mock.call_args[0][0]
                    assert actual_url == expected_url, f"Expected {expected_url}, got {actual_url}"

    @pytest.mark.asyncio
    async def test_request_headers_format(self, enable_feature):
        """Test that request headers are formatted correctly."""
        duplication = enable_feature

        # Mock dual-header authentication
        with patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration") as mock_auth:
            mock_auth.get_pipeboard_token.return_value = "pipeboard_token_12345"
            mock_auth.get_auth_token.return_value = "facebook_token_67890"

            with patch("meta_ads_mcp.core.duplication.httpx.AsyncClient") as mock_client:
                post_mock = mock_client.return_value.__aenter__.return_value.post
                post_mock.return_value = self._success_response()

                await duplication._forward_duplication_request(
                    "campaign", "123456789", "test_token_12345", {"name_suffix": " - Test"}
                )

                # Verify the dual-header authentication pattern.
                headers = post_mock.call_args[1]["headers"]
                assert headers["Authorization"] == "Bearer facebook_token_67890"  # Facebook token for Meta API
                assert headers["X-Pipeboard-Token"] == "pipeboard_token_12345"  # Pipeboard token for auth
                assert headers["Content-Type"] == "application/json"
                assert headers["User-Agent"] == "meta-ads-mcp/1.0"

    @pytest.mark.asyncio
    async def test_request_timeout_configuration(self, enable_feature):
        """Test that request timeout is configured correctly."""
        duplication = enable_feature

        # Mock dual-header authentication
        with patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration") as mock_auth:
            mock_auth.get_pipeboard_token.return_value = "pipeboard_token"
            mock_auth.get_auth_token.return_value = "facebook_token"

            with patch("meta_ads_mcp.core.duplication.httpx.AsyncClient") as mock_client:
                post_mock = mock_client.return_value.__aenter__.return_value.post
                post_mock.return_value = self._success_response()

                await duplication._forward_duplication_request(
                    "campaign", "123456789", "test_token", {}
                )

                # Verify timeout is set to 30 seconds
                mock_client.assert_called_once_with(timeout=30.0)
206 |
207 |
class TestDuplicationErrorHandling:
    """Test error handling to prevent regression in error scenarios."""

    @pytest.fixture(autouse=True)
    def enable_feature(self):
        """Enable the duplication feature for these tests."""
        with patch.dict(os.environ, {"META_ADS_ENABLE_DUPLICATION": "1"}):
            from meta_ads_mcp.core import duplication
            importlib.reload(duplication)
            yield duplication

    @pytest.mark.asyncio
    async def test_missing_access_token_error(self, enable_feature):
        """Test error handling when authentication tokens are missing."""
        duplication = enable_feature

        # Missing Pipeboard token (primary authentication failure).
        with patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration") as mock_auth:
            mock_auth.get_pipeboard_token.return_value = None  # No Pipeboard token
            mock_auth.get_auth_token.return_value = "facebook_token"  # Has Facebook token

            payload = json.loads(
                await duplication._forward_duplication_request("campaign", "123", None, {})
            )
            assert payload["error"] == "authentication_required"
            assert "Pipeboard API token not found" in payload["message"]

        # Missing Facebook token (secondary authentication failure).
        with patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration") as mock_auth:
            mock_auth.get_pipeboard_token.return_value = "pipeboard_token"  # Has Pipeboard token
            mock_auth.get_auth_token.return_value = None  # No Facebook token

            with patch("meta_ads_mcp.core.auth.get_current_access_token") as mock_get_token:
                mock_get_token.return_value = None  # No fallback token either

                payload = json.loads(
                    await duplication._forward_duplication_request("campaign", "123", None, {})
                )
                assert payload["error"] == "authentication_required"
                assert "Meta Ads access token not found" in payload["message"]

    @staticmethod
    def _build_response(status_code):
        """Create a MagicMock response mimicking the backend for a status code."""
        # MagicMock (not AsyncMock) gives predictable .json()/.text behavior.
        response = MagicMock()
        response.status_code = status_code

        if status_code == 200:
            response.json.return_value = {"success": True, "id": "new_123"}
        elif status_code == 400:
            response.json.return_value = {"errors": ["Invalid parameter"], "warnings": []}
        elif status_code == 401:
            response.json.side_effect = Exception("No JSON")
            response.text = "Unauthorized"
        elif status_code == 402:
            response.json.return_value = {
                "message": "This feature is not available in your current plan",
                "upgrade_url": "https://pipeboard.co/upgrade",
                "suggestion": "Please upgrade your account to access this feature"
            }
        elif status_code == 403:
            response.json.return_value = {
                "message": "You need to connect your Facebook account first",
                "details": {
                    "login_flow_url": "/connections",
                    "auth_flow_url": "/api/meta/auth"
                }
            }
        elif status_code == 404:
            response.json.side_effect = Exception("No JSON")
            response.text = "Not Found"
        elif status_code == 429:
            response.headers.get.return_value = "60"
            response.json.side_effect = Exception("No JSON")
            response.text = "Rate limited"
        elif status_code == 502:
            response.json.return_value = {"message": "Facebook API error"}
        else:
            response.json.side_effect = Exception("No JSON")
            response.text = f"Error {status_code}"
        return response

    @pytest.mark.asyncio
    async def test_http_status_code_handling(self, enable_feature):
        """Test handling of various HTTP status codes."""
        duplication = enable_feature

        expectations = [
            (200, "success_response", "json"),
            (400, "validation_failed", "error"),
            (401, "authentication_error", "error"),
            (402, "subscription_required", "error"),
            (403, "facebook_connection_required", "error"),
            (404, "resource_not_found", "error"),
            (429, "rate_limit_exceeded", "error"),
            (502, "meta_api_error", "error"),
            (500, "duplication_failed", "error"),
        ]

        for status_code, expected_error_type, response_type in expectations:
            with patch("meta_ads_mcp.core.duplication.httpx.AsyncClient") as mock_client, \
                 patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration.get_pipeboard_token", return_value="test_pipeboard_token"), \
                 patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration.get_auth_token", return_value="test_facebook_token"):
                mock_client.return_value.__aenter__.return_value.post.return_value = \
                    self._build_response(status_code)

                result_json = json.loads(
                    await duplication._forward_duplication_request("campaign", "123", "token", {})
                )

                if response_type == "error":
                    # Every non-200 status maps onto a specific error identifier.
                    assert result_json["error"] == expected_error_type
                else:
                    assert "success" in result_json or "id" in result_json

    @pytest.mark.asyncio
    async def test_network_error_handling(self, enable_feature):
        """Test handling of network errors."""
        duplication = enable_feature

        failure_modes = [
            (httpx.TimeoutException("Timeout"), "request_timeout"),
            (httpx.RequestError("Connection failed"), "network_error"),
            (Exception("Unexpected error"), "unexpected_error"),
        ]

        for exception, expected_error in failure_modes:
            with patch("meta_ads_mcp.core.duplication.httpx.AsyncClient") as mock_client, \
                 patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration.get_pipeboard_token", return_value="test_pipeboard_token"), \
                 patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration.get_auth_token", return_value="test_facebook_token"):
                mock_client.return_value.__aenter__.return_value.post.side_effect = exception

                result_json = json.loads(
                    await duplication._forward_duplication_request("campaign", "123", "token", {})
                )
                assert result_json["error"] == expected_error
355 |
356 |
class TestDuplicationParameterHandling:
    """Test parameter handling to prevent regression in data processing."""

    @pytest.fixture(autouse=True)
    def enable_feature(self):
        """Enable the duplication feature for these tests."""
        with patch.dict(os.environ, {"META_ADS_ENABLE_DUPLICATION": "1"}):
            from meta_ads_mcp.core import duplication
            importlib.reload(duplication)
            yield duplication

    @pytest.mark.asyncio
    async def test_none_values_filtered_from_options(self, enable_feature):
        """Test that None values are filtered from options."""
        duplication = enable_feature

        # Mock dual-header authentication
        with patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration") as mock_auth:
            mock_auth.get_pipeboard_token.return_value = "pipeboard_token"
            mock_auth.get_auth_token.return_value = "facebook_token"

            with patch("meta_ads_mcp.core.duplication.httpx.AsyncClient") as mock_client:
                mock_response = AsyncMock()
                mock_response.status_code = 200
                mock_response.json.return_value = {"success": True}
                post_mock = mock_client.return_value.__aenter__.return_value.post
                post_mock.return_value = mock_response

                # Mix real values with Nones that must be dropped from the payload.
                await duplication._forward_duplication_request(
                    "campaign", "123", "token",
                    {
                        "name_suffix": " - Test",
                        "new_daily_budget": None,
                        "new_status": "PAUSED",
                        "new_headline": None,
                    },
                )

                # Only the non-None options may reach the wire.
                sent = post_mock.call_args[1]["json"]
                assert "name_suffix" in sent
                assert "new_status" in sent
                assert "new_daily_budget" not in sent
                assert "new_headline" not in sent

    @pytest.mark.asyncio
    async def test_campaign_duplication_parameter_forwarding(self, enable_feature):
        """Test that campaign duplication forwards all parameters correctly."""
        duplication = enable_feature

        with patch("meta_ads_mcp.core.duplication._forward_duplication_request") as forward_mock:
            forward_mock.return_value = '{"success": true}'

            # Exercise the full parameter surface of duplicate_campaign.
            result = await duplication.duplicate_campaign(
                campaign_id="123456789",
                access_token="test_token",
                name_suffix=" - New Copy",
                include_ad_sets=False,
                include_ads=True,
                include_creatives=False,
                copy_schedule=True,
                new_daily_budget=100.50,
                new_status="ACTIVE"
            )

            # Every option must be forwarded verbatim.
            forward_mock.assert_called_once_with(
                "campaign",
                "123456789",
                "test_token",
                {
                    "name_suffix": " - New Copy",
                    "include_ad_sets": False,
                    "include_ads": True,
                    "include_creatives": False,
                    "copy_schedule": True,
                    "new_daily_budget": 100.50,
                    "new_status": "ACTIVE"
                }
            )

    @pytest.mark.asyncio
    async def test_adset_duplication_parameter_forwarding(self, enable_feature):
        """Test that ad set duplication forwards all parameters correctly including new_targeting."""
        duplication = enable_feature

        with patch("meta_ads_mcp.core.duplication._forward_duplication_request") as forward_mock:
            forward_mock.return_value = '{"success": true}'

            # Exercise the full parameter surface, including nested new_targeting.
            result = await duplication.duplicate_adset(
                adset_id="987654321",
                access_token="test_token",
                target_campaign_id="campaign_123",
                name_suffix=" - Targeted Copy",
                include_ads=False,
                include_creatives=True,
                new_daily_budget=200.00,
                new_targeting={
                    "age_min": 25,
                    "age_max": 45,
                    "geo_locations": {
                        "countries": ["US", "CA"]
                    }
                },
                new_status="ACTIVE"
            )

            # Every option, nested dicts included, must be forwarded verbatim.
            forward_mock.assert_called_once_with(
                "adset",
                "987654321",
                "test_token",
                {
                    "target_campaign_id": "campaign_123",
                    "name_suffix": " - Targeted Copy",
                    "include_ads": False,
                    "include_creatives": True,
                    "new_daily_budget": 200.00,
                    "new_targeting": {
                        "age_min": 25,
                        "age_max": 45,
                        "geo_locations": {
                            "countries": ["US", "CA"]
                        }
                    },
                    "new_status": "ACTIVE"
                }
            )

    def test_estimated_components_calculation(self, enable_feature):
        """Test that estimated components are calculated correctly."""
        duplication = enable_feature

        cases = [
            # Campaign with all components
            ("campaign", {"include_ad_sets": True, "include_ads": True, "include_creatives": True},
             {"campaigns": 1, "ad_sets": "3-5 (estimated)", "ads": "5-15 (estimated)", "creatives": "5-15 (estimated)"}),
            # Campaign with no sub-components
            ("campaign", {"include_ad_sets": False, "include_ads": False, "include_creatives": False},
             {"campaigns": 1}),
            # Ad set with ads
            ("adset", {"include_ads": True, "include_creatives": True},
             {"ad_sets": 1, "ads": "2-5 (estimated)", "creatives": "2-5 (estimated)"}),
            # Ad set without ads
            ("adset", {"include_ads": False, "include_creatives": False},
             {"ad_sets": 1}),
            # Single ad with creative
            ("ad", {"duplicate_creative": True},
             {"ads": 1, "creatives": 1}),
            # Single ad without creative
            ("ad", {"duplicate_creative": False},
             {"ads": 1}),
            # Single creative
            ("creative", {},
             {"creatives": 1}),
        ]

        for resource_type, options, expected in cases:
            actual = duplication._get_estimated_components(resource_type, options)
            assert actual == expected, f"Failed for {resource_type} with {options}"
529 |
530 |
class TestDuplicationIntegration:
    """Integration tests to prevent regression in end-to-end functionality."""

    @pytest.fixture(autouse=True)
    def enable_feature(self):
        """Enable the duplication feature for these tests.

        Reloads the module under the patched env var so the feature toggle
        (META_ADS_ENABLE_DUPLICATION) is re-evaluated at import time.
        """
        with patch.dict(os.environ, {"META_ADS_ENABLE_DUPLICATION": "1"}):
            import importlib
            from meta_ads_mcp.core import duplication
            importlib.reload(duplication)
            yield duplication

    @staticmethod
    def _unwrap_result(result):
        """Decode a duplicate_campaign result, handling the @meta_api_tool wrapper.

        The decorator may wrap the JSON payload in a {"data": "<json string>"}
        envelope; return the inner payload as a dict either way.
        """
        result_json = json.loads(result)
        if "data" in result_json:
            return json.loads(result_json["data"])
        return result_json

    @pytest.mark.asyncio
    async def test_end_to_end_successful_duplication(self, enable_feature):
        """Test complete successful duplication flow."""
        duplication = enable_feature

        # Mock the auth system completely to bypass the @meta_api_tool decorator checks
        with patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration") as mock_auth_integration:
            # Mock dual authentication tokens
            mock_auth_integration.get_pipeboard_token.return_value = "pipeboard_token"
            mock_auth_integration.get_auth_token.return_value = "facebook_token"

            with patch("meta_ads_mcp.core.duplication.httpx.AsyncClient") as mock_client:
                # Mock successful response
                mock_response = MagicMock()
                mock_response.status_code = 200
                mock_response.json.return_value = {
                    "success": True,
                    "original_campaign_id": "123456789",
                    "new_campaign_id": "987654321",
                    "duplicated_components": {
                        "campaign": {"id": "987654321", "name": "Test Campaign - Copy"},
                        "ad_sets": [{"id": "111", "name": "Ad Set 1 - Copy"}],
                        "ads": [{"id": "222", "name": "Ad 1 - Copy"}],
                        "creatives": [{"id": "333", "name": "Creative 1 - Copy"}]
                    },
                    "warnings": [],
                    "subscription": {
                        "status": "active"
                    }
                }
                mock_client.return_value.__aenter__.return_value.post.return_value = mock_response

                # Call the function with explicit token
                result = await duplication.duplicate_campaign(
                    campaign_id="123456789",
                    access_token="facebook_token",  # Use the mocked token
                    name_suffix=" - Test Copy"
                )

                actual_result = self._unwrap_result(result)

                assert actual_result["success"] is True
                assert actual_result["new_campaign_id"] == "987654321"
                assert "duplicated_components" in actual_result

    @pytest.mark.asyncio
    async def test_facebook_connection_error_flow(self, enable_feature):
        """Test Facebook connection required error flow."""
        duplication = enable_feature

        # Mock the auth system completely to bypass the @meta_api_tool decorator checks
        with patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration") as mock_auth_integration:
            # Mock dual authentication tokens
            mock_auth_integration.get_pipeboard_token.return_value = "pipeboard_token"
            mock_auth_integration.get_auth_token.return_value = "facebook_token"

            with patch("meta_ads_mcp.core.duplication.httpx.AsyncClient") as mock_client:
                # Mock 403 response (Facebook connection required)
                mock_response = MagicMock()
                mock_response.status_code = 403
                mock_response.json.return_value = {
                    "message": "You need to connect your Facebook account first",
                    "details": {
                        "login_flow_url": "/connections",
                        "auth_flow_url": "/api/meta/auth"
                    }
                }
                mock_client.return_value.__aenter__.return_value.post.return_value = mock_response

                result = await duplication.duplicate_campaign(
                    campaign_id="123456789",
                    access_token="facebook_token"  # Use the mocked token
                )

                actual_result = self._unwrap_result(result)

                assert actual_result["success"] is False
                assert actual_result["error"] == "facebook_connection_required"
                assert actual_result["message"] == "You need to connect your Facebook account first"
                assert "details" in actual_result
                assert actual_result["details"]["login_flow_url"] == "/connections"

    @pytest.mark.asyncio
    async def test_subscription_required_error_flow(self, enable_feature):
        """Test subscription required error flow."""
        duplication = enable_feature

        # Mock the auth system completely to bypass the @meta_api_tool decorator checks
        with patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration") as mock_auth_integration:
            # Mock dual authentication tokens
            mock_auth_integration.get_pipeboard_token.return_value = "pipeboard_token"
            mock_auth_integration.get_auth_token.return_value = "facebook_token"

            with patch("meta_ads_mcp.core.duplication.httpx.AsyncClient") as mock_client:
                # Mock 402 response (subscription required)
                mock_response = MagicMock()
                mock_response.status_code = 402
                mock_response.json.return_value = {
                    "message": "This feature is not available in your current plan",
                    "upgrade_url": "https://pipeboard.co/upgrade",
                    "suggestion": "Please upgrade your account to access this feature"
                }
                mock_client.return_value.__aenter__.return_value.post.return_value = mock_response

                result = await duplication.duplicate_campaign(
                    campaign_id="123456789",
                    access_token="facebook_token"  # Use the mocked token
                )

                actual_result = self._unwrap_result(result)

                assert actual_result["success"] is False
                assert actual_result["error"] == "subscription_required"
                assert actual_result["message"] == "This feature is not available in your current plan"
                assert actual_result["upgrade_url"] == "https://pipeboard.co/upgrade"
                assert actual_result["suggestion"] == "Please upgrade your account to access this feature"
674 |
675 |
class TestDuplicationTokenHandling:
    """Test access token handling to prevent auth regression."""

    @pytest.fixture(autouse=True)
    def enable_feature(self):
        """Enable the duplication feature for these tests."""
        with patch.dict(os.environ, {"META_ADS_ENABLE_DUPLICATION": "1"}):
            import importlib
            from meta_ads_mcp.core import duplication
            importlib.reload(duplication)
            yield duplication

    @pytest.mark.asyncio
    async def test_meta_api_tool_decorator_token_handling(self, enable_feature):
        """Test that @meta_api_tool decorator properly handles explicit tokens."""
        dup = enable_feature

        # Supplying an explicit token should bypass the auth system entirely.
        with patch("meta_ads_mcp.core.duplication._forward_duplication_request") as forward_mock:
            forward_mock.return_value = '{"success": true}'

            # Invoke with an explicitly provided access_token
            await dup.duplicate_campaign(
                campaign_id="123456789",
                access_token="explicit_token_12345"
            )

            # The explicit token must reach the forwarder unchanged.
            forward_mock.assert_called_once()
            positional = forward_mock.call_args[0]
            # access_token travels as the 3rd positional argument
            assert positional[2] == "explicit_token_12345"

    @pytest.mark.asyncio
    async def test_explicit_token_overrides_injection(self, enable_feature):
        """Test that explicit token overrides auto-injection."""
        dup = enable_feature

        with patch("meta_ads_mcp.core.auth.get_current_access_token") as token_lookup_mock:
            token_lookup_mock.return_value = "injected_token"

            with patch("meta_ads_mcp.core.duplication._forward_duplication_request") as forward_mock:
                forward_mock.return_value = '{"success": true}'

                # Invoke with an explicitly provided access_token
                await dup.duplicate_campaign(
                    campaign_id="123456789",
                    access_token="explicit_token_12345"
                )

                # The explicit token wins over the auto-injected one.
                forward_mock.assert_called_once()
                positional = forward_mock.call_args[0]
                # access_token travels as the 3rd positional argument
                assert positional[2] == "explicit_token_12345"
729 |
730 |
class TestDuplicationRegressionEdgeCases:
    """Test edge cases that could cause regressions."""

    @pytest.fixture(autouse=True)
    def enable_feature(self):
        """Enable the duplication feature for these tests."""
        # Reload under the patched env var so the import-time feature toggle
        # (META_ADS_ENABLE_DUPLICATION) is re-evaluated.
        with patch.dict(os.environ, {"META_ADS_ENABLE_DUPLICATION": "1"}):
            import importlib
            from meta_ads_mcp.core import duplication
            importlib.reload(duplication)
            yield duplication

    @pytest.mark.asyncio
    async def test_empty_string_parameters(self, enable_feature):
        """Test handling of empty string parameters."""
        duplication = enable_feature

        with patch("meta_ads_mcp.core.duplication._forward_duplication_request") as mock_forward:
            mock_forward.return_value = '{"success": true}'

            # Test with empty strings
            await duplication.duplicate_campaign(
                campaign_id="123456789",
                access_token="token",
                name_suffix="",  # Empty string
                new_status=""  # Empty string
            )

            # Verify empty strings are preserved (not filtered like None)
            call_args = mock_forward.call_args[0]
            # options dict is the 4th positional argument to the forwarder
            options = call_args[3]
            assert options["name_suffix"] == ""
            assert options["new_status"] == ""

    @pytest.mark.asyncio
    async def test_unicode_parameters(self, enable_feature):
        """Test handling of unicode parameters."""
        duplication = enable_feature

        # Mock dual-header authentication
        with patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration") as mock_auth:
            mock_auth.get_pipeboard_token.return_value = "pipeboard_token"
            mock_auth.get_auth_token.return_value = "facebook_token"

            with patch("meta_ads_mcp.core.duplication.httpx.AsyncClient") as mock_client:
                # NOTE(review): sibling test classes use MagicMock for the response;
                # httpx Response.json() is synchronous, so AsyncMock makes .json()
                # return a coroutine — confirm this difference is intentional.
                mock_response = AsyncMock()
                mock_response.status_code = 200
                mock_response.json.return_value = {"success": True}
                mock_client.return_value.__aenter__.return_value.post.return_value = mock_response

                # Test with unicode characters
                unicode_suffix = " - 复制版本 🚀"
                # Exercise the forwarder directly (bypasses the decorator layer)
                await duplication._forward_duplication_request(
                    "campaign", "123", "token", {"name_suffix": unicode_suffix}
                )

                # Verify unicode is preserved in the request
                call_args = mock_client.return_value.__aenter__.return_value.post.call_args
                json_payload = call_args[1]["json"]
                assert json_payload["name_suffix"] == unicode_suffix

    @pytest.mark.asyncio
    async def test_large_parameter_values(self, enable_feature):
        """Test handling of large parameter values."""
        duplication = enable_feature

        # Mock dual-header authentication
        with patch("meta_ads_mcp.core.duplication.FastMCPAuthIntegration") as mock_auth:
            mock_auth.get_pipeboard_token.return_value = "pipeboard_token"
            mock_auth.get_auth_token.return_value = "facebook_token"

            with patch("meta_ads_mcp.core.duplication.httpx.AsyncClient") as mock_client:
                # NOTE(review): AsyncMock here vs MagicMock in sibling classes — see
                # test_unicode_parameters; confirm intentional.
                mock_response = AsyncMock()
                mock_response.status_code = 200
                mock_response.json.return_value = {"success": True}
                mock_client.return_value.__aenter__.return_value.post.return_value = mock_response

                # Test with very large budget value
                large_budget = 999999999.99
                # Exercise the forwarder directly (bypasses the decorator layer)
                await duplication._forward_duplication_request(
                    "campaign", "123", "token", {"new_daily_budget": large_budget}
                )

                # Verify large values are preserved
                call_args = mock_client.return_value.__aenter__.return_value.post.call_args
                json_payload = call_args[1]["json"]
                assert json_payload["new_daily_budget"] == large_budget

    def test_module_reload_safety(self):
        """Test that module can be safely reloaded without side effects."""
        # This tests for common issues like global state pollution

        # Enable feature
        with patch.dict(os.environ, {"META_ADS_ENABLE_DUPLICATION": "1"}):
            import importlib
            from meta_ads_mcp.core import duplication
            importlib.reload(duplication)

            assert duplication.ENABLE_DUPLICATION
            assert hasattr(duplication, 'duplicate_campaign')

        # Disable feature and reload
        with patch.dict(os.environ, {"META_ADS_ENABLE_DUPLICATION": ""}):
            importlib.reload(duplication)

            assert not duplication.ENABLE_DUPLICATION
            # Note: Functions may still exist in the module namespace due to Python's
            # module loading behavior, but they won't be registered as MCP tools
            # This is expected behavior and not a problem for the feature toggle

        # Re-enable feature and reload
        with patch.dict(os.environ, {"META_ADS_ENABLE_DUPLICATION": "1"}):
            importlib.reload(duplication)

            assert duplication.ENABLE_DUPLICATION
            assert hasattr(duplication, 'duplicate_campaign')
```