This is page 3 of 6. Use http://codebase.md/nictuku/meta-ads-mcp?lines=true&page={x} to view the full context.
# Directory Structure
```
├── .github
│ └── workflows
│ ├── publish-mcp.yml
│ ├── publish.yml
│ └── test.yml
├── .gitignore
├── .python-version
├── .uv.toml
├── CUSTOM_META_APP.md
├── Dockerfile
├── examples
│ ├── example_http_client.py
│ └── README.md
├── future_improvements.md
├── images
│ └── meta-ads-example.png
├── LICENSE
├── LOCAL_INSTALLATION.md
├── meta_ads_auth.sh
├── meta_ads_mcp
│ ├── __init__.py
│ ├── __main__.py
│ └── core
│ ├── __init__.py
│ ├── accounts.py
│ ├── ads_library.py
│ ├── ads.py
│ ├── adsets.py
│ ├── api.py
│ ├── auth.py
│ ├── authentication.py
│ ├── budget_schedules.py
│ ├── callback_server.py
│ ├── campaigns.py
│ ├── duplication.py
│ ├── http_auth_integration.py
│ ├── insights.py
│ ├── openai_deep_research.py
│ ├── pipeboard_auth.py
│ ├── reports.py
│ ├── resources.py
│ ├── server.py
│ ├── targeting.py
│ └── utils.py
├── META_API_NOTES.md
├── poetry.lock
├── pyproject.toml
├── README.md
├── RELEASE.md
├── requirements.txt
├── server.json
├── setup.py
├── smithery.yaml
├── STREAMABLE_HTTP_SETUP.md
└── tests
├── __init__.py
├── conftest.py
├── e2e_account_info_search_issue.py
├── README_REGRESSION_TESTS.md
├── README.md
├── test_account_info_access_fix.py
├── test_account_search.py
├── test_budget_update_e2e.py
├── test_budget_update.py
├── test_create_ad_creative_simple.py
├── test_create_simple_creative_e2e.py
├── test_dsa_beneficiary.py
├── test_dsa_integration.py
├── test_duplication_regression.py
├── test_duplication.py
├── test_dynamic_creatives.py
├── test_estimate_audience_size_e2e.py
├── test_estimate_audience_size.py
├── test_get_account_pages.py
├── test_get_ad_creatives_fix.py
├── test_get_ad_image_quality_improvements.py
├── test_get_ad_image_regression.py
├── test_http_transport.py
├── test_insights_actions_and_values_e2e.py
├── test_insights_pagination.py
├── test_integration_openai_mcp.py
├── test_is_dynamic_creative_adset.py
├── test_mobile_app_adset_creation.py
├── test_mobile_app_adset_issue.py
├── test_openai_mcp_deep_research.py
├── test_openai.py
├── test_page_discovery_integration.py
├── test_page_discovery.py
├── test_targeting_search_e2e.py
├── test_targeting.py
├── test_update_ad_creative_id.py
└── test_upload_ad_image.py
```
# Files
--------------------------------------------------------------------------------
/tests/test_targeting.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env python3
2 | """
3 | Unit tests for targeting search functionality in Meta Ads MCP.
4 | """
5 |
6 | import pytest
7 | import json
8 | from unittest.mock import AsyncMock, patch
9 |
10 | from meta_ads_mcp.core.targeting import (
11 | search_interests,
12 | get_interest_suggestions,
13 | estimate_audience_size,
14 | search_behaviors,
15 | search_demographics,
16 | search_geo_locations
17 | )
18 |
19 |
20 | class TestSearchInterests:
21 | """Test cases for search_interests function"""
22 |
23 | @pytest.mark.asyncio
24 | async def test_search_interests_success(self):
25 | """Test successful interest search"""
26 | mock_response = {
27 | "data": [
28 | {
29 | "id": "6003139266461",
30 | "name": "Movies",
31 | "audience_size": 1234567890,
32 | "path": ["Entertainment", "Movies"]
33 | },
34 | {
35 | "id": "6003397425735",
36 | "name": "Tennis",
37 | "audience_size": 987654321,
38 | "path": ["Sports", "Tennis"]
39 | }
40 | ]
41 | }
42 |
43 | with patch('meta_ads_mcp.core.targeting.make_api_request', new_callable=AsyncMock) as mock_api:
44 | mock_api.return_value = mock_response
45 |
46 | result = await search_interests(access_token="test_token", query="movies", limit=10)
47 |
48 | # Verify API call
49 | mock_api.assert_called_once_with(
50 | "search",
51 | "test_token",
52 | {
53 | "type": "adinterest",
54 | "q": "movies",
55 | "limit": 10
56 | }
57 | )
58 |
59 | # Verify response
60 | result_data = json.loads(result)
61 | assert result_data == mock_response
62 | assert len(result_data["data"]) == 2
63 | assert result_data["data"][0]["name"] == "Movies"
64 |
65 | @pytest.mark.asyncio
66 | async def test_search_interests_no_query(self):
67 | """Test search_interests with empty query parameter"""
68 | result = await search_interests(
69 | query="", # Now provide the required parameter but with empty value
70 | access_token="test_token"
71 | )
72 |
73 | result_data = json.loads(result)
74 | # The @meta_api_tool decorator wraps errors in a 'data' field
75 | assert "data" in result_data
76 | nested_data = json.loads(result_data["data"])
77 | assert "error" in nested_data
78 | assert nested_data["error"] == "No search query provided"
79 |
80 | @pytest.mark.asyncio
81 | async def test_search_interests_default_limit(self):
82 | """Test search_interests with default limit"""
83 | mock_response = {"data": []}
84 |
85 | # Mock both the API request and the auth system to bypass decorator issues
86 | with patch('meta_ads_mcp.core.targeting.make_api_request', new_callable=AsyncMock) as mock_api:
87 | with patch('meta_ads_mcp.core.auth.get_current_access_token') as mock_auth:
88 | mock_auth.return_value = "test_token"
89 | mock_api.return_value = mock_response
90 |
91 | result = await search_interests(query="test")
92 |
93 | # Verify default limit is used
94 | mock_api.assert_called_once_with(
95 | "search",
96 | "test_token",
97 | {
98 | "type": "adinterest",
99 | "q": "test",
100 | "limit": 25
101 | }
102 | )
103 |
104 | # Verify the result is properly formatted
105 | result_data = json.loads(result)
106 | assert "data" in result_data
107 |
108 |
109 | class TestGetInterestSuggestions:
110 | """Test cases for get_interest_suggestions function"""
111 |
112 | @pytest.mark.asyncio
113 | async def test_get_interest_suggestions_success(self):
114 | """Test successful interest suggestions"""
115 | mock_response = {
116 | "data": [
117 | {
118 | "id": "6003022269556",
119 | "name": "Rugby football",
120 | "audience_size": 13214830,
121 | "path": [],
122 | "description": None
123 | },
124 | {
125 | "id": "6003146664949",
126 | "name": "Netball",
127 | "audience_size": 4333770,
128 | "path": [],
129 | "description": None
130 | }
131 | ]
132 | }
133 |
134 | with patch('meta_ads_mcp.core.targeting.make_api_request', new_callable=AsyncMock) as mock_api:
135 | mock_api.return_value = mock_response
136 |
137 | result = await get_interest_suggestions(
138 | access_token="test_token",
139 | interest_list=["Basketball", "Soccer"],
140 | limit=15
141 | )
142 |
143 | # Verify API call
144 | mock_api.assert_called_once_with(
145 | "search",
146 | "test_token",
147 | {
148 | "type": "adinterestsuggestion",
149 | "interest_list": '["Basketball", "Soccer"]',
150 | "limit": 15
151 | }
152 | )
153 |
154 | # Verify response
155 | result_data = json.loads(result)
156 | assert result_data == mock_response
157 | assert len(result_data["data"]) == 2
158 |
159 | @pytest.mark.asyncio
160 | async def test_get_interest_suggestions_no_list(self):
161 | """Test get_interest_suggestions with empty interest list"""
162 | result = await get_interest_suggestions(
163 | interest_list=[], # Now provide the required parameter but with empty value
164 | access_token="test_token"
165 | )
166 |
167 | result_data = json.loads(result)
168 | # The @meta_api_tool decorator wraps errors in a 'data' field
169 | assert "data" in result_data
170 | nested_data = json.loads(result_data["data"])
171 | assert "error" in nested_data
172 | assert nested_data["error"] == "No interest list provided"
173 |
174 |
175 | class TestEstimateAudienceSizeBackwardsCompatibility:
176 | """Test cases for estimate_audience_size function backwards compatibility"""
177 |
178 | @pytest.mark.asyncio
179 | async def test_validate_interests_by_name_success(self):
180 | """Test successful interest validation by name"""
181 | mock_response = {
182 | "data": [
183 | {
184 | "name": "Japan",
185 | "valid": True,
186 | "id": 6003700426513,
187 | "audience_size": 68310258
188 | },
189 | {
190 | "name": "nonexistantkeyword",
191 | "valid": False
192 | }
193 | ]
194 | }
195 |
196 | with patch('meta_ads_mcp.core.targeting.make_api_request', new_callable=AsyncMock) as mock_api:
197 | mock_api.return_value = mock_response
198 |
199 | result = await estimate_audience_size(
200 | access_token="test_token",
201 | interest_list=["Japan", "nonexistantkeyword"]
202 | )
203 |
204 | # Verify API call
205 | mock_api.assert_called_once_with(
206 | "search",
207 | "test_token",
208 | {
209 | "type": "adinterestvalid",
210 | "interest_list": '["Japan", "nonexistantkeyword"]'
211 | }
212 | )
213 |
214 | # Verify response
215 | result_data = json.loads(result)
216 | assert result_data == mock_response
217 | assert result_data["data"][0]["valid"] is True
218 | assert result_data["data"][1]["valid"] is False
219 |
220 | @pytest.mark.asyncio
221 | async def test_validate_interests_by_fbid_success(self):
222 | """Test successful interest validation by FBID"""
223 | mock_response = {
224 | "data": [
225 | {
226 | "id": "6003700426513",
227 | "valid": True,
228 | "audience_size": 68310258
229 | }
230 | ]
231 | }
232 |
233 | with patch('meta_ads_mcp.core.targeting.make_api_request', new_callable=AsyncMock) as mock_api:
234 | mock_api.return_value = mock_response
235 |
236 | result = await estimate_audience_size(
237 | access_token="test_token",
238 | interest_fbid_list=["6003700426513"]
239 | )
240 |
241 | # Verify API call
242 | mock_api.assert_called_once_with(
243 | "search",
244 | "test_token",
245 | {
246 | "type": "adinterestvalid",
247 | "interest_fbid_list": '["6003700426513"]'
248 | }
249 | )
250 |
251 | # Verify response
252 | result_data = json.loads(result)
253 | assert result_data == mock_response
254 |
255 | @pytest.mark.asyncio
256 | async def test_validate_interests_no_input(self):
257 | """Test estimate_audience_size with no input lists (backwards compatibility)"""
258 | result = await estimate_audience_size(access_token="test_token")
259 |
260 | result_data = json.loads(result)
261 | # The @meta_api_tool decorator wraps errors in a 'data' field
262 | assert "data" in result_data
263 | nested_data = json.loads(result_data["data"])
264 | assert "error" in nested_data
265 | assert nested_data["error"] == "No interest list or FBID list provided"
266 |
267 |
268 | class TestSearchBehaviors:
269 | """Test cases for search_behaviors function"""
270 |
271 | @pytest.mark.asyncio
272 | async def test_search_behaviors_success(self):
273 | """Test successful behavior search"""
274 | mock_response = {
275 | "data": [
276 | {
277 | "id": 6007101597783,
278 | "name": "Business Travelers",
279 | "audience_size_lower_bound": 1000000,
280 | "audience_size_upper_bound": 2000000,
281 | "path": ["Travel", "Business Travel"],
282 | "description": "People who travel for business",
283 | "type": "behaviors"
284 | }
285 | ]
286 | }
287 |
288 | with patch('meta_ads_mcp.core.targeting.make_api_request', new_callable=AsyncMock) as mock_api:
289 | mock_api.return_value = mock_response
290 |
291 | result = await search_behaviors(access_token="test_token", limit=25)
292 |
293 | # Verify API call
294 | mock_api.assert_called_once_with(
295 | "search",
296 | "test_token",
297 | {
298 | "type": "adTargetingCategory",
299 | "class": "behaviors",
300 | "limit": 25
301 | }
302 | )
303 |
304 | # Verify response
305 | result_data = json.loads(result)
306 | assert result_data == mock_response
307 |
308 |
309 | class TestSearchDemographics:
310 | """Test cases for search_demographics function"""
311 |
312 | @pytest.mark.asyncio
313 | async def test_search_demographics_success(self):
314 | """Test successful demographics search"""
315 | mock_response = {
316 | "data": [
317 | {
318 | "id": 6015559470583,
319 | "name": "Parents (All)",
320 | "audience_size_lower_bound": 500000000,
321 | "audience_size_upper_bound": 750000000,
322 | "path": ["Family", "Parents"],
323 | "description": "Parents of children of any age",
324 | "type": "demographics"
325 | }
326 | ]
327 | }
328 |
329 | with patch('meta_ads_mcp.core.targeting.make_api_request', new_callable=AsyncMock) as mock_api:
330 | mock_api.return_value = mock_response
331 |
332 | result = await search_demographics(
333 | access_token="test_token",
334 | demographic_class="life_events",
335 | limit=30
336 | )
337 |
338 | # Verify API call
339 | mock_api.assert_called_once_with(
340 | "search",
341 | "test_token",
342 | {
343 | "type": "adTargetingCategory",
344 | "class": "life_events",
345 | "limit": 30
346 | }
347 | )
348 |
349 | # Verify response
350 | result_data = json.loads(result)
351 | assert result_data == mock_response
352 |
353 |
354 | class TestSearchGeoLocations:
355 | """Test cases for search_geo_locations function"""
356 |
357 | @pytest.mark.asyncio
358 | async def test_search_geo_locations_success(self):
359 | """Test successful geo location search"""
360 | mock_response = {
361 | "data": [
362 | {
363 | "key": "US",
364 | "name": "United States",
365 | "type": "country",
366 | "supports_city": True,
367 | "supports_region": True
368 | },
369 | {
370 | "key": "3847",
371 | "name": "California",
372 | "type": "region",
373 | "country_code": "US",
374 | "country_name": "United States",
375 | "supports_city": True,
376 | "supports_region": True
377 | }
378 | ]
379 | }
380 |
381 | with patch('meta_ads_mcp.core.targeting.make_api_request', new_callable=AsyncMock) as mock_api:
382 | mock_api.return_value = mock_response
383 |
384 | result = await search_geo_locations(
385 | access_token="test_token",
386 | query="United States",
387 | location_types=["country", "region"],
388 | limit=10
389 | )
390 |
391 | # Verify API call
392 | mock_api.assert_called_once_with(
393 | "search",
394 | "test_token",
395 | {
396 | "type": "adgeolocation",
397 | "q": "United States",
398 | "location_types": '["country", "region"]',
399 | "limit": 10
400 | }
401 | )
402 |
403 | # Verify response
404 | result_data = json.loads(result)
405 | assert result_data == mock_response
406 |
407 | @pytest.mark.asyncio
408 | async def test_search_geo_locations_no_query(self):
409 | """Test search_geo_locations with empty query"""
410 | result = await search_geo_locations(
411 | query="", # Now provide the required parameter but with empty value
412 | access_token="test_token"
413 | )
414 |
415 | result_data = json.loads(result)
416 | # The @meta_api_tool decorator wraps errors in a 'data' field
417 | assert "data" in result_data
418 | nested_data = json.loads(result_data["data"])
419 | assert "error" in nested_data
420 | assert nested_data["error"] == "No search query provided"
421 |
422 | @pytest.mark.asyncio
423 | async def test_search_geo_locations_no_location_types(self):
424 | """Test search_geo_locations without location_types filter"""
425 | mock_response = {"data": []}
426 |
427 | # Mock both the API request and the auth system to bypass decorator issues
428 | with patch('meta_ads_mcp.core.targeting.make_api_request', new_callable=AsyncMock) as mock_api:
429 | with patch('meta_ads_mcp.core.auth.get_current_access_token') as mock_auth:
430 | mock_auth.return_value = "test_token"
431 | mock_api.return_value = mock_response
432 |
433 | result = await search_geo_locations(query="test")
434 |
435 | # Verify API call doesn't include location_types
436 | mock_api.assert_called_once_with(
437 | "search",
438 | "test_token",
439 | {
440 | "type": "adgeolocation",
441 | "q": "test",
442 | "limit": 25
443 | }
444 | )
445 |
446 | # Verify the result is properly formatted
447 | result_data = json.loads(result)
448 | assert "data" in result_data
```
--------------------------------------------------------------------------------
/meta_ads_mcp/core/adsets.py:
--------------------------------------------------------------------------------
```python
1 | """Ad Set-related functionality for Meta Ads API."""
2 |
3 | import json
4 | from typing import Optional, Dict, Any, List
5 | from .api import meta_api_tool, make_api_request
6 | from .accounts import get_ad_accounts
7 | from .server import mcp_server
8 |
9 |
10 | @mcp_server.tool()
11 | @meta_api_tool
12 | async def get_adsets(account_id: str, access_token: Optional[str] = None, limit: int = 10, campaign_id: str = "") -> str:
13 | """
14 | Get ad sets for a Meta Ads account with optional filtering by campaign.
15 |
16 | Args:
17 | account_id: Meta Ads account ID (format: act_XXXXXXXXX)
18 | access_token: Meta API access token (optional - will use cached token if not provided)
19 | limit: Maximum number of ad sets to return (default: 10)
20 | campaign_id: Optional campaign ID to filter by
21 | """
22 | # Require explicit account_id
23 | if not account_id:
24 | return json.dumps({"error": "No account ID specified"}, indent=2)
25 |
26 | # Change endpoint based on whether campaign_id is provided
27 | if campaign_id:
28 | endpoint = f"{campaign_id}/adsets"
29 | params = {
30 | "fields": "id,name,campaign_id,status,daily_budget,lifetime_budget,targeting,bid_amount,bid_strategy,optimization_goal,billing_event,start_time,end_time,created_time,updated_time,is_dynamic_creative,frequency_control_specs{event,interval_days,max_frequency}",
31 | "limit": limit
32 | }
33 | else:
34 | # Use account endpoint if no campaign_id is given
35 | endpoint = f"{account_id}/adsets"
36 | params = {
37 | "fields": "id,name,campaign_id,status,daily_budget,lifetime_budget,targeting,bid_amount,bid_strategy,optimization_goal,billing_event,start_time,end_time,created_time,updated_time,is_dynamic_creative,frequency_control_specs{event,interval_days,max_frequency}",
38 | "limit": limit
39 | }
40 | # Note: Removed the attempt to add campaign_id to params for the account endpoint case,
41 | # as it was ineffective and the logic now uses the correct endpoint for campaign filtering.
42 |
43 | data = await make_api_request(endpoint, access_token, params)
44 |
45 | return json.dumps(data, indent=2)
46 |
47 |
48 | @mcp_server.tool()
49 | @meta_api_tool
50 | async def get_adset_details(adset_id: str, access_token: Optional[str] = None) -> str:
51 | """
52 | Get detailed information about a specific ad set.
53 |
54 | Args:
55 | adset_id: Meta Ads ad set ID
56 | access_token: Meta API access token (optional - will use cached token if not provided)
57 |
58 | Example:
59 | To call this function through MCP, pass the adset_id as the first argument:
60 | {
61 | "args": "YOUR_ADSET_ID"
62 | }
63 | """
64 | if not adset_id:
65 | return json.dumps({"error": "No ad set ID provided"}, indent=2)
66 |
67 | endpoint = f"{adset_id}"
68 | # Explicitly prioritize frequency_control_specs in the fields request
69 | params = {
70 | "fields": "id,name,campaign_id,status,frequency_control_specs{event,interval_days,max_frequency},daily_budget,lifetime_budget,targeting,bid_amount,bid_strategy,optimization_goal,billing_event,start_time,end_time,created_time,updated_time,attribution_spec,destination_type,promoted_object,pacing_type,budget_remaining,dsa_beneficiary,is_dynamic_creative"
71 | }
72 |
73 | data = await make_api_request(endpoint, access_token, params)
74 |
75 | # For debugging - check if frequency_control_specs was returned
76 | if 'frequency_control_specs' not in data:
77 | data['_meta'] = {
78 | 'note': 'No frequency_control_specs field was returned by the API. This means either no frequency caps are set or the API did not include this field in the response.'
79 | }
80 |
81 | return json.dumps(data, indent=2)
82 |
83 |
84 | @mcp_server.tool()
85 | @meta_api_tool
86 | async def create_adset(
87 | account_id: str,
88 | campaign_id: str,
89 | name: str,
90 | optimization_goal: str,
91 | billing_event: str,
92 | status: str = "PAUSED",
93 | daily_budget: Optional[int] = None,
94 | lifetime_budget: Optional[int] = None,
95 | targeting: Optional[Dict[str, Any]] = None,
96 | bid_amount: Optional[int] = None,
97 | bid_strategy: Optional[str] = None,
98 | start_time: Optional[str] = None,
99 | end_time: Optional[str] = None,
100 | dsa_beneficiary: Optional[str] = None,
101 | promoted_object: Optional[Dict[str, Any]] = None,
102 | destination_type: Optional[str] = None,
103 | is_dynamic_creative: Optional[bool] = None,
104 | access_token: Optional[str] = None
105 | ) -> str:
106 | """
107 | Create a new ad set in a Meta Ads account.
108 |
109 | Args:
110 | account_id: Meta Ads account ID (format: act_XXXXXXXXX)
111 | campaign_id: Meta Ads campaign ID this ad set belongs to
112 | name: Ad set name
113 | optimization_goal: Conversion optimization goal (e.g., 'LINK_CLICKS', 'REACH', 'CONVERSIONS', 'APP_INSTALLS')
114 | billing_event: How you're charged (e.g., 'IMPRESSIONS', 'LINK_CLICKS')
115 | status: Initial ad set status (default: PAUSED)
116 | daily_budget: Daily budget in account currency (in cents) as a string
117 | lifetime_budget: Lifetime budget in account currency (in cents) as a string
118 | targeting: Targeting specifications including age, location, interests, etc.
119 | Use targeting_automation.advantage_audience=1 for automatic audience finding
120 | bid_amount: Bid amount in account currency (in cents)
121 | bid_strategy: Bid strategy (e.g., 'LOWEST_COST', 'LOWEST_COST_WITH_BID_CAP')
122 | start_time: Start time in ISO 8601 format (e.g., '2023-12-01T12:00:00-0800')
123 | end_time: End time in ISO 8601 format
124 | dsa_beneficiary: DSA beneficiary (person/organization benefiting from ads) for European compliance
125 | promoted_object: Mobile app configuration for APP_INSTALLS campaigns. Required fields: application_id, object_store_url.
126 | Optional fields: custom_event_type, pixel_id, page_id.
127 | Example: {"application_id": "123456789012345", "object_store_url": "https://apps.apple.com/app/id123456789"}
128 | destination_type: Where users are directed after clicking the ad (e.g., 'APP_STORE', 'DEEPLINK', 'APP_INSTALL', 'ON_AD').
129 | Required for mobile app campaigns and lead generation campaigns.
130 | Use 'ON_AD' for lead generation campaigns where user interaction happens within the ad.
131 | is_dynamic_creative: Enable Dynamic Creative for this ad set (required when using dynamic creatives with asset_feed_spec/dynamic_creative_spec).
132 | access_token: Meta API access token (optional - will use cached token if not provided)
133 | """
134 | # Check required parameters
135 | if not account_id:
136 | return json.dumps({"error": "No account ID provided"}, indent=2)
137 |
138 | if not campaign_id:
139 | return json.dumps({"error": "No campaign ID provided"}, indent=2)
140 |
141 | if not name:
142 | return json.dumps({"error": "No ad set name provided"}, indent=2)
143 |
144 | if not optimization_goal:
145 | return json.dumps({"error": "No optimization goal provided"}, indent=2)
146 |
147 | if not billing_event:
148 | return json.dumps({"error": "No billing event provided"}, indent=2)
149 |
150 | # Validate mobile app parameters for APP_INSTALLS campaigns
151 | if optimization_goal == "APP_INSTALLS":
152 | if not promoted_object:
153 | return json.dumps({
154 | "error": "promoted_object is required for APP_INSTALLS optimization goal",
155 | "details": "Mobile app campaigns must specify which app is being promoted",
156 | "required_fields": ["application_id", "object_store_url"]
157 | }, indent=2)
158 |
159 | # Validate promoted_object structure
160 | if not isinstance(promoted_object, dict):
161 | return json.dumps({
162 | "error": "promoted_object must be a dictionary",
163 | "example": {"application_id": "123456789012345", "object_store_url": "https://apps.apple.com/app/id123456789"}
164 | }, indent=2)
165 |
166 | # Validate required promoted_object fields
167 | if "application_id" not in promoted_object:
168 | return json.dumps({
169 | "error": "promoted_object missing required field: application_id",
170 | "details": "application_id is the Facebook app ID for your mobile app"
171 | }, indent=2)
172 |
173 | if "object_store_url" not in promoted_object:
174 | return json.dumps({
175 | "error": "promoted_object missing required field: object_store_url",
176 | "details": "object_store_url should be the App Store or Google Play URL for your app"
177 | }, indent=2)
178 |
179 | # Validate store URL format
180 | store_url = promoted_object["object_store_url"]
181 | valid_store_patterns = [
182 | "apps.apple.com", # iOS App Store
183 | "play.google.com", # Google Play Store
184 | "itunes.apple.com" # Alternative iOS format
185 | ]
186 |
187 | if not any(pattern in store_url for pattern in valid_store_patterns):
188 | return json.dumps({
189 | "error": "Invalid object_store_url format",
190 | "details": "URL must be from App Store (apps.apple.com) or Google Play (play.google.com)",
191 | "provided_url": store_url
192 | }, indent=2)
193 |
194 | # Validate destination_type if provided
195 | if destination_type:
196 | valid_destination_types = ["APP_STORE", "DEEPLINK", "APP_INSTALL", "ON_AD"]
197 | if destination_type not in valid_destination_types:
198 | return json.dumps({
199 | "error": f"Invalid destination_type: {destination_type}",
200 | "valid_values": valid_destination_types
201 | }, indent=2)
202 |
203 | # Basic targeting is required if not provided
204 | if not targeting:
205 | targeting = {
206 | "age_min": 18,
207 | "age_max": 65,
208 | "geo_locations": {"countries": ["US"]},
209 | "targeting_automation": {"advantage_audience": 1}
210 | }
211 |
212 | endpoint = f"{account_id}/adsets"
213 |
214 | params = {
215 | "name": name,
216 | "campaign_id": campaign_id,
217 | "status": status,
218 | "optimization_goal": optimization_goal,
219 | "billing_event": billing_event,
220 | "targeting": json.dumps(targeting) # Properly format as JSON string
221 | }
222 |
223 | # Convert budget values to strings if they aren't already
224 | if daily_budget is not None:
225 | params["daily_budget"] = str(daily_budget)
226 |
227 | if lifetime_budget is not None:
228 | params["lifetime_budget"] = str(lifetime_budget)
229 |
230 | # Add other parameters if provided
231 | if bid_amount is not None:
232 | params["bid_amount"] = str(bid_amount)
233 |
234 | if bid_strategy:
235 | params["bid_strategy"] = bid_strategy
236 |
237 | if start_time:
238 | params["start_time"] = start_time
239 |
240 | if end_time:
241 | params["end_time"] = end_time
242 |
243 | # Add DSA beneficiary if provided
244 | if dsa_beneficiary:
245 | params["dsa_beneficiary"] = dsa_beneficiary
246 |
247 | # Add mobile app parameters if provided
248 | if promoted_object:
249 | params["promoted_object"] = json.dumps(promoted_object)
250 |
251 | if destination_type:
252 | params["destination_type"] = destination_type
253 |
254 | # Enable Dynamic Creative if requested
255 | if is_dynamic_creative is not None:
256 | params["is_dynamic_creative"] = "true" if bool(is_dynamic_creative) else "false"
257 |
258 | try:
259 | data = await make_api_request(endpoint, access_token, params, method="POST")
260 | return json.dumps(data, indent=2)
261 | except Exception as e:
262 | error_msg = str(e)
263 |
264 | # Enhanced error handling for DSA beneficiary issues
265 | if "permission" in error_msg.lower() or "insufficient" in error_msg.lower():
266 | return json.dumps({
267 | "error": "Insufficient permissions to set DSA beneficiary. Please ensure you have business_management permissions.",
268 | "details": error_msg,
269 | "params_sent": params,
270 | "permission_required": True
271 | }, indent=2)
272 | elif "dsa_beneficiary" in error_msg.lower() and ("not supported" in error_msg.lower() or "parameter" in error_msg.lower()):
273 | return json.dumps({
274 | "error": "DSA beneficiary parameter not supported in this API version. Please set DSA beneficiary manually in Facebook Ads Manager.",
275 | "details": error_msg,
276 | "params_sent": params,
277 | "manual_setup_required": True
278 | }, indent=2)
279 | elif "benefits from ads" in error_msg or "DSA beneficiary" in error_msg:
280 | return json.dumps({
281 | "error": "DSA beneficiary required for European compliance. Please provide the person or organization that benefits from ads in this ad set.",
282 | "details": error_msg,
283 | "params_sent": params,
284 | "dsa_required": True
285 | }, indent=2)
286 | else:
287 | return json.dumps({
288 | "error": "Failed to create ad set",
289 | "details": error_msg,
290 | "params_sent": params
291 | }, indent=2)
292 |
293 |
294 | @mcp_server.tool()
295 | @meta_api_tool
296 | async def update_adset(adset_id: str, frequency_control_specs: Optional[List[Dict[str, Any]]] = None, bid_strategy: Optional[str] = None,
297 | bid_amount: Optional[int] = None, status: Optional[str] = None, targeting: Optional[Dict[str, Any]] = None,
298 | optimization_goal: Optional[str] = None, daily_budget: Optional[int] = None, lifetime_budget: Optional[int] = None,
299 | is_dynamic_creative: Optional[bool] = None,
300 | access_token: Optional[str] = None) -> str:
301 | """
302 | Update an ad set with new settings including frequency caps and budgets.
303 |
304 | Args:
305 | adset_id: Meta Ads ad set ID
306 | frequency_control_specs: List of frequency control specifications
307 | (e.g. [{"event": "IMPRESSIONS", "interval_days": 7, "max_frequency": 3}])
308 | bid_strategy: Bid strategy (e.g., 'LOWEST_COST_WITH_BID_CAP')
309 | bid_amount: Bid amount in account currency (in cents for USD)
310 | status: Update ad set status (ACTIVE, PAUSED, etc.)
311 | targeting: Complete targeting specifications (will replace existing targeting)
312 | (e.g. {"targeting_automation":{"advantage_audience":1}, "geo_locations": {"countries": ["US"]}})
313 | optimization_goal: Conversion optimization goal (e.g., 'LINK_CLICKS', 'CONVERSIONS', 'APP_INSTALLS', etc.)
314 | daily_budget: Daily budget in account currency (in cents) as a string
315 | lifetime_budget: Lifetime budget in account currency (in cents) as a string
316 | is_dynamic_creative: Enable/disable Dynamic Creative for this ad set.
317 | access_token: Meta API access token (optional - will use cached token if not provided)
318 | """
319 | if not adset_id:
320 | return json.dumps({"error": "No ad set ID provided"}, indent=2)
321 |
322 | params = {}
323 |
324 | if frequency_control_specs is not None:
325 | params['frequency_control_specs'] = frequency_control_specs
326 |
327 | if bid_strategy is not None:
328 | params['bid_strategy'] = bid_strategy
329 |
330 | if bid_amount is not None:
331 | params['bid_amount'] = str(bid_amount)
332 |
333 | if status is not None:
334 | params['status'] = status
335 |
336 | if optimization_goal is not None:
337 | params['optimization_goal'] = optimization_goal
338 |
339 | if targeting is not None:
340 | # Ensure proper JSON encoding for targeting
341 | if isinstance(targeting, dict):
342 | params['targeting'] = json.dumps(targeting)
343 | else:
344 | params['targeting'] = targeting # Already a string
345 |
346 | # Add budget parameters if provided
347 | if daily_budget is not None:
348 | params['daily_budget'] = str(daily_budget)
349 |
350 | if lifetime_budget is not None:
351 | params['lifetime_budget'] = str(lifetime_budget)
352 |
353 | if is_dynamic_creative is not None:
354 | params['is_dynamic_creative'] = "true" if bool(is_dynamic_creative) else "false"
355 |
356 | if not params:
357 | return json.dumps({"error": "No update parameters provided"}, indent=2)
358 |
359 | endpoint = f"{adset_id}"
360 |
361 | try:
362 | # Use POST method for updates as per Meta API documentation
363 | data = await make_api_request(endpoint, access_token, params, method="POST")
364 | return json.dumps(data, indent=2)
365 | except Exception as e:
366 | error_msg = str(e)
367 | # Include adset_id in error for better context
368 | return json.dumps({
369 | "error": f"Failed to update ad set {adset_id}",
370 | "details": error_msg,
371 | "params_sent": params
372 | }, indent=2)
```
--------------------------------------------------------------------------------
/tests/test_openai_mcp_deep_research.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env python3
2 | """
3 | OpenAI MCP Deep Research Integration Tests
4 |
5 | This test suite validates the OpenAI MCP specification compliance:
6 | - search tool: Returns list of IDs based on query
7 | - fetch tool: Returns complete record data by ID
8 | - ChatGPT Deep Research compatibility
9 | - Integration with existing authentication
10 |
11 | Usage:
12 | 1. Start the server: python -m meta_ads_mcp --transport streamable-http --port 8080
13 | 2. Run tests: python -m pytest tests/test_openai_mcp_deep_research.py -v
14 |
15 | Or run directly:
16 | python tests/test_openai_mcp_deep_research.py
17 | """
18 |
19 | import requests
20 | import json
21 | import time
22 | import sys
23 | import os
24 | from typing import Dict, Any, Optional, List
25 |
26 | # Load environment variables from .env file
27 | try:
28 | from dotenv import load_dotenv
29 | load_dotenv()
30 | print("✅ Loaded environment variables from .env file")
31 | except ImportError:
32 | print("⚠️ python-dotenv not installed, using system environment variables only")
33 | print(" Install with: pip install python-dotenv")
34 |
35 | # Add project root to path for imports
36 | sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
37 |
38 | class OpenAIMCPTester:
39 | """Test suite for OpenAI MCP Deep Research compatibility"""
40 |
41 | def __init__(self, base_url: str = "http://localhost:8080"):
42 | self.base_url = base_url.rstrip('/')
43 | self.endpoint = f"{self.base_url}/mcp/"
44 | self.request_id = 1
45 |
46 | def _make_request(self, method: str, params: Dict[str, Any] = None,
47 | headers: Dict[str, str] = None) -> Dict[str, Any]:
48 | """Make a JSON-RPC request to the MCP server"""
49 |
50 | # Default headers for MCP protocol with streamable HTTP transport
51 | default_headers = {
52 | "Content-Type": "application/json",
53 | "Accept": "application/json, text/event-stream",
54 | "User-Agent": "OpenAI-MCP-Test-Client/1.0"
55 | }
56 |
57 | if headers:
58 | default_headers.update(headers)
59 |
60 | payload = {
61 | "jsonrpc": "2.0",
62 | "method": method,
63 | "id": self.request_id
64 | }
65 |
66 | if params:
67 | payload["params"] = params
68 |
69 | try:
70 | response = requests.post(
71 | self.endpoint,
72 | headers=default_headers,
73 | json=payload,
74 | timeout=10
75 | )
76 |
77 | self.request_id += 1
78 |
79 | return {
80 | "status_code": response.status_code,
81 | "headers": dict(response.headers),
82 | "json": response.json() if response.status_code == 200 else None,
83 | "text": response.text,
84 | "success": response.status_code == 200
85 | }
86 |
87 | except requests.exceptions.RequestException as e:
88 | return {
89 | "status_code": 0,
90 | "headers": {},
91 | "json": None,
92 | "text": str(e),
93 | "success": False,
94 | "error": str(e)
95 | }
96 |
97 | def test_search_tool_exists(self, auth_headers: Dict[str, str] = None) -> Dict[str, Any]:
98 | """Test that the search tool is available in tools list"""
99 | result = self._make_request("tools/list", {}, auth_headers)
100 |
101 | if not result["success"]:
102 | return {"success": False, "error": "Failed to get tools list"}
103 |
104 | tools = result["json"]["result"].get("tools", [])
105 | search_tool = next((tool for tool in tools if tool["name"] == "search"), None)
106 |
107 | return {
108 | "success": search_tool is not None,
109 | "tool": search_tool,
110 | "all_tools": [tool["name"] for tool in tools]
111 | }
112 |
113 | def test_fetch_tool_exists(self, auth_headers: Dict[str, str] = None) -> Dict[str, Any]:
114 | """Test that the fetch tool is available in tools list"""
115 | result = self._make_request("tools/list", {}, auth_headers)
116 |
117 | if not result["success"]:
118 | return {"success": False, "error": "Failed to get tools list"}
119 |
120 | tools = result["json"]["result"].get("tools", [])
121 | fetch_tool = next((tool for tool in tools if tool["name"] == "fetch"), None)
122 |
123 | return {
124 | "success": fetch_tool is not None,
125 | "tool": fetch_tool,
126 | "all_tools": [tool["name"] for tool in tools]
127 | }
128 |
129 | def test_search_tool_call(self, query: str, auth_headers: Dict[str, str] = None) -> Dict[str, Any]:
130 | """Test calling the search tool with a query"""
131 | result = self._make_request("tools/call", {
132 | "name": "search",
133 | "arguments": {"query": query}
134 | }, auth_headers)
135 |
136 | if not result["success"]:
137 | return {"success": False, "error": result.get("text", "Unknown error")}
138 |
139 | # Parse the tool response
140 | response_data = result["json"]["result"]
141 | content = response_data.get("content", [{}])[0].get("text", "")
142 |
143 | try:
144 | parsed_content = json.loads(content)
145 | ids = parsed_content.get("ids", [])
146 |
147 | return {
148 | "success": True,
149 | "ids": ids,
150 | "raw_content": content,
151 | "id_count": len(ids)
152 | }
153 | except json.JSONDecodeError:
154 | return {
155 | "success": False,
156 | "error": "Search tool did not return valid JSON",
157 | "raw_content": content
158 | }
159 |
160 | def test_fetch_tool_call(self, record_id: str, auth_headers: Dict[str, str] = None) -> Dict[str, Any]:
161 | """Test calling the fetch tool with an ID"""
162 | result = self._make_request("tools/call", {
163 | "name": "fetch",
164 | "arguments": {"id": record_id}
165 | }, auth_headers)
166 |
167 | if not result["success"]:
168 | return {"success": False, "error": result.get("text", "Unknown error")}
169 |
170 | # Parse the tool response
171 | response_data = result["json"]["result"]
172 | content = response_data.get("content", [{}])[0].get("text", "")
173 |
174 | try:
175 | parsed_content = json.loads(content)
176 |
177 | return {
178 | "success": True,
179 | "record": parsed_content,
180 | "raw_content": content,
181 | "has_required_fields": all(field in parsed_content for field in ["id", "title", "text"])
182 | }
183 | except json.JSONDecodeError:
184 | return {
185 | "success": False,
186 | "error": "Fetch tool did not return valid JSON",
187 | "raw_content": content
188 | }
189 |
190 | def test_search_fetch_workflow(self, auth_headers: Dict[str, str] = None) -> Dict[str, Any]:
191 | """Test the complete search->fetch workflow that ChatGPT Deep Research expects"""
192 |
193 | # Step 1: Search for something that will return account IDs
194 | search_result = self.test_search_tool_call("Yves", auth_headers)
195 |
196 | if not search_result["success"]:
197 | return {
198 | "success": False,
199 | "step": "search",
200 | "error": search_result.get("error", "Search failed")
201 | }
202 |
203 | if not search_result["ids"]:
204 | return {
205 | "success": False,
206 | "step": "search",
207 | "error": "Search returned no IDs"
208 | }
209 |
210 | # Step 2: Fetch the first ID
211 | first_id = search_result["ids"][0]
212 | fetch_result = self.test_fetch_tool_call(first_id, auth_headers)
213 |
214 | if not fetch_result["success"]:
215 | return {
216 | "success": False,
217 | "step": "fetch",
218 | "error": fetch_result.get("error", "Fetch failed"),
219 | "searched_id": first_id
220 | }
221 |
222 | return {
223 | "success": True,
224 | "search_ids": search_result["ids"],
225 | "fetched_record": fetch_result["record"],
226 | "workflow_complete": True
227 | }
228 |
229 | def test_openai_specification_compliance(self, auth_headers: Dict[str, str] = None) -> Dict[str, bool]:
230 | """Test compliance with OpenAI's MCP specification for Deep Research"""
231 | results = {}
232 |
233 | print("\n🧪 Testing OpenAI MCP Specification Compliance")
234 | print("="*55)
235 |
236 | # Test 1: Both required tools exist
237 | print("🔍 Checking required tools exist")
238 | search_exists = self.test_search_tool_exists(auth_headers)
239 | fetch_exists = self.test_fetch_tool_exists(auth_headers)
240 |
241 | results["search_tool_exists"] = search_exists["success"]
242 | results["fetch_tool_exists"] = fetch_exists["success"]
243 |
244 | if not search_exists["success"]:
245 | print("❌ Search tool not found")
246 | print(f" Available tools: {search_exists.get('all_tools', [])}")
247 | return results
248 |
249 | if not fetch_exists["success"]:
250 | print("❌ Fetch tool not found")
251 | print(f" Available tools: {fetch_exists.get('all_tools', [])}")
252 | return results
253 |
254 | print("✅ Both search and fetch tools found")
255 |
256 | # Test 2: Search tool returns proper format
257 | print("\n🔍 Testing search tool format")
258 | search_result = self.test_search_tool_call("Yves", auth_headers)
259 | results["search_format_valid"] = search_result["success"]
260 |
261 | if not search_result["success"]:
262 | print(f"❌ Search tool failed: {search_result.get('error', 'Unknown error')}")
263 | return results
264 |
265 | print(f"✅ Search tool returns valid format with {search_result['id_count']} IDs")
266 |
267 | # Test 3: Fetch tool returns proper format
268 | if search_result["ids"]:
269 | print("\n🔍 Testing fetch tool format")
270 | first_id = search_result["ids"][0]
271 | fetch_result = self.test_fetch_tool_call(first_id, auth_headers)
272 | results["fetch_format_valid"] = fetch_result["success"]
273 | results["fetch_has_required_fields"] = fetch_result.get("has_required_fields", False)
274 |
275 | if not fetch_result["success"]:
276 | print(f"❌ Fetch tool failed: {fetch_result.get('error', 'Unknown error')}")
277 | return results
278 |
279 | print("✅ Fetch tool returns valid format")
280 |
281 | if fetch_result["has_required_fields"]:
282 | print("✅ Fetch response includes required fields (id, title, text)")
283 | else:
284 | print("⚠️ Fetch response missing some required fields")
285 | else:
286 | print("⚠️ Cannot test fetch tool - no IDs returned by search")
287 | results["fetch_format_valid"] = False
288 | results["fetch_has_required_fields"] = False
289 |
290 | # Test 4: Complete workflow
291 | print("\n🔍 Testing complete search->fetch workflow")
292 | workflow_result = self.test_search_fetch_workflow(auth_headers)
293 | results["workflow_complete"] = workflow_result["success"]
294 |
295 | if workflow_result["success"]:
296 | print("✅ Complete workflow successful")
297 | else:
298 | print(f"❌ Workflow failed at {workflow_result.get('step', 'unknown')} step")
299 | print(f" Error: {workflow_result.get('error', 'Unknown error')}")
300 |
301 | return results
302 |
303 | def test_page_search_functionality(self, auth_headers: Dict[str, str] = None) -> Dict[str, Any]:
304 | """Test that the search function includes page searching when query mentions pages"""
305 | print("\n🔍 Testing page search functionality")
306 |
307 | # Test 1: Search with page-related query that matches an account name
308 | page_search_result = self.test_search_tool_call("Injury Payouts pages", auth_headers)
309 |
310 | if not page_search_result["success"]:
311 | return {
312 | "success": False,
313 | "error": f"Page search failed: {page_search_result.get('error', 'Unknown error')}"
314 | }
315 |
316 | # Check if page records are included in results
317 | page_ids = [id for id in page_search_result["ids"] if id.startswith("page:")]
318 |
319 | result = {
320 | "success": True,
321 | "page_search_works": len(page_ids) > 0,
322 | "page_ids_found": len(page_ids),
323 | "total_ids": len(page_search_result["ids"]),
324 | "page_ids": page_ids
325 | }
326 |
327 | if len(page_ids) > 0:
328 | print(f"✅ Page search working - found {len(page_ids)} page records")
329 |
330 | # Test 2: Fetch a page record
331 | first_page_id = page_ids[0]
332 | fetch_result = self.test_fetch_tool_call(first_page_id, auth_headers)
333 |
334 | if fetch_result["success"]:
335 | print(f"✅ Page fetch working - retrieved page record: {first_page_id}")
336 | result["page_fetch_works"] = True
337 | result["fetched_page_data"] = fetch_result.get("record", {})
338 | else:
339 | print(f"❌ Page fetch failed: {fetch_result.get('error', 'Unknown error')}")
340 | result["page_fetch_works"] = False
341 | else:
342 | print("⚠️ No page records found in search results")
343 | result["page_fetch_works"] = False
344 |
345 | return result
346 |
347 | def run_openai_compliance_test_suite(self) -> bool:
348 | """Run complete OpenAI MCP compliance test suite"""
349 | print("🚀 OpenAI MCP Deep Research Compliance Test Suite")
350 | print("="*60)
351 |
352 | # Check server availability first
353 | try:
354 | response = requests.get(f"{self.base_url}/", timeout=5)
355 | server_running = response.status_code in [200, 404]
356 | except:
357 | server_running = False
358 |
359 | if not server_running:
360 | print("❌ Server is not running at", self.base_url)
361 | print(" Please start the server with:")
362 | print(" python -m meta_ads_mcp --transport streamable-http --port 8080")
363 | return False
364 |
365 | print("✅ Server is running")
366 |
367 | # Test with no authentication (server handles auth implicitly)
368 | auth_scenarios = [
369 | {
370 | "name": "No Authentication",
371 | "headers": None
372 | }
373 | ]
374 |
375 | all_results = {}
376 |
377 | for scenario in auth_scenarios:
378 | print(f"\n📋 Testing with: {scenario['name']}")
379 | print("-" * 40)
380 |
381 | results = self.test_openai_specification_compliance(scenario["headers"])
382 |
383 | # Add page search test
384 | page_results = self.test_page_search_functionality(scenario["headers"])
385 | results["page_search_functionality"] = page_results.get("page_search_works", False)
386 | results["page_fetch_functionality"] = page_results.get("page_fetch_works", False)
387 |
388 | all_results[scenario["name"]] = results
389 |
390 | # Summary
391 | print("\n🏁 OPENAI MCP COMPLIANCE TEST RESULTS")
392 | print("="*40)
393 |
394 | overall_success = True
395 | for scenario_name, results in all_results.items():
396 | scenario_success = all(results.values()) if results else False
397 | status = "✅ COMPLIANT" if scenario_success else "❌ NON-COMPLIANT"
398 | print(f"{scenario_name}: {status}")
399 |
400 | if not scenario_success and results:
401 | for test_name, test_result in results.items():
402 | if not test_result:
403 | print(f" ❌ {test_name}")
404 |
405 | if not scenario_success:
406 | overall_success = False
407 |
408 | print(f"\n📊 Overall OpenAI MCP Compliance: {'✅ COMPLIANT' if overall_success else '❌ NON-COMPLIANT'}")
409 |
410 | if overall_success:
411 | print("\n🎉 Server is fully compatible with OpenAI's MCP specification!")
412 | print(" • ChatGPT Deep Research: Ready")
413 | print(" • Search tool: Compliant (includes page search)")
414 | print(" • Fetch tool: Compliant")
415 | print(" • Workflow: Complete")
416 | print(" • Page Search: Enhanced")
417 | else:
418 | print("\n⚠️ Server needs updates for OpenAI MCP compliance")
419 | print(" See failed tests above for required changes")
420 |
421 | return overall_success
422 |
423 |
424 | def main():
425 | """Main test execution"""
426 | tester = OpenAIMCPTester()
427 | success = tester.run_openai_compliance_test_suite()
428 | sys.exit(0 if success else 1)
429 |
430 |
431 | if __name__ == "__main__":
432 | main()
```
--------------------------------------------------------------------------------
/meta_ads_mcp/core/api.py:
--------------------------------------------------------------------------------
```python
1 | """Core API functionality for Meta Ads API."""
2 |
3 | from typing import Any, Dict, Optional, Callable
4 | import json
5 | import httpx
6 | import asyncio
7 | import functools
8 | import os
9 | from . import auth
10 | from .auth import needs_authentication, auth_manager, start_callback_server, shutdown_callback_server
11 | from .utils import logger
12 |
13 | # Constants
14 | META_GRAPH_API_VERSION = "v22.0"
15 | META_GRAPH_API_BASE = f"https://graph.facebook.com/{META_GRAPH_API_VERSION}"
16 | USER_AGENT = "meta-ads-mcp/1.0"
17 |
18 | # Log key environment and configuration at startup
19 | logger.info("Core API module initialized")
20 | logger.info(f"Graph API Version: {META_GRAPH_API_VERSION}")
21 | logger.info(f"META_APP_ID env var present: {'Yes' if os.environ.get('META_APP_ID') else 'No'}")
22 |
23 | class GraphAPIError(Exception):
24 | """Exception raised for errors from the Graph API."""
25 | def __init__(self, error_data: Dict[str, Any]):
26 | self.error_data = error_data
27 | self.message = error_data.get('message', 'Unknown Graph API error')
28 | super().__init__(self.message)
29 |
30 | # Log error details
31 | logger.error(f"Graph API Error: {self.message}")
32 | logger.debug(f"Error details: {error_data}")
33 |
34 | # Check if this is an auth error
35 | if "code" in error_data and error_data["code"] in [190, 102, 4]:
36 | # Common auth error codes
37 | logger.warning(f"Auth error detected (code: {error_data['code']}). Invalidating token.")
38 | auth_manager.invalidate_token()
39 |
40 |
41 | async def make_api_request(
42 | endpoint: str,
43 | access_token: str,
44 | params: Optional[Dict[str, Any]] = None,
45 | method: str = "GET"
46 | ) -> Dict[str, Any]:
47 | """
48 | Make a request to the Meta Graph API.
49 |
50 | Args:
51 | endpoint: API endpoint path (without base URL)
52 | access_token: Meta API access token
53 | params: Additional query parameters
54 | method: HTTP method (GET, POST, DELETE)
55 |
56 | Returns:
57 | API response as a dictionary
58 | """
59 | # Validate access token before proceeding
60 | if not access_token:
61 | logger.error("API request attempted with blank access token")
62 | return {
63 | "error": {
64 | "message": "Authentication Required",
65 | "details": "A valid access token is required to access the Meta API",
66 | "action_required": "Please authenticate first"
67 | }
68 | }
69 |
70 | url = f"{META_GRAPH_API_BASE}/{endpoint}"
71 |
72 | headers = {
73 | "User-Agent": USER_AGENT,
74 | }
75 |
76 | request_params = params or {}
77 | request_params["access_token"] = access_token
78 |
79 | # Logging the request (masking token for security)
80 | masked_params = {k: "***TOKEN***" if k == "access_token" else v for k, v in request_params.items()}
81 | logger.debug(f"API Request: {method} {url}")
82 | logger.debug(f"Request params: {masked_params}")
83 |
84 | # Check for app_id in params
85 | app_id = auth_manager.app_id
86 | logger.debug(f"Current app_id from auth_manager: {app_id}")
87 |
88 | async with httpx.AsyncClient() as client:
89 | try:
90 | if method == "GET":
91 | # For GET, JSON-encode dict/list params (e.g., targeting_spec) to proper strings
92 | encoded_params = {}
93 | for key, value in request_params.items():
94 | if isinstance(value, (dict, list)):
95 | encoded_params[key] = json.dumps(value)
96 | else:
97 | encoded_params[key] = value
98 | response = await client.get(url, params=encoded_params, headers=headers, timeout=30.0)
99 | elif method == "POST":
100 | # For Meta API, POST requests need data, not JSON
101 | if 'targeting' in request_params and isinstance(request_params['targeting'], dict):
102 | # Convert targeting dict to string for the API
103 | request_params['targeting'] = json.dumps(request_params['targeting'])
104 |
105 | # Convert lists and dicts to JSON strings
106 | for key, value in request_params.items():
107 | if isinstance(value, (list, dict)):
108 | request_params[key] = json.dumps(value)
109 |
110 | logger.debug(f"POST params (prepared): {masked_params}")
111 | response = await client.post(url, data=request_params, headers=headers, timeout=30.0)
112 | elif method == "DELETE":
113 | response = await client.delete(url, params=request_params, headers=headers, timeout=30.0)
114 | else:
115 | raise ValueError(f"Unsupported HTTP method: {method}")
116 |
117 | response.raise_for_status()
118 | logger.debug(f"API Response status: {response.status_code}")
119 |
120 | # Ensure the response is JSON and return it as a dictionary
121 | try:
122 | return response.json()
123 | except json.JSONDecodeError:
124 | # If not JSON, return text content in a structured format
125 | return {
126 | "text_response": response.text,
127 | "status_code": response.status_code
128 | }
129 |
130 | except httpx.HTTPStatusError as e:
131 | error_info = {}
132 | try:
133 | error_info = e.response.json()
134 | except:
135 | error_info = {"status_code": e.response.status_code, "text": e.response.text}
136 |
137 | logger.error(f"HTTP Error: {e.response.status_code} - {error_info}")
138 |
139 | # Check for authentication errors
140 | if e.response.status_code == 401 or e.response.status_code == 403:
141 | logger.warning("Detected authentication error (401/403)")
142 | auth_manager.invalidate_token()
143 | elif "error" in error_info:
144 | error_obj = error_info.get("error", {})
145 | # Check for specific FB API errors related to auth
146 | if isinstance(error_obj, dict) and error_obj.get("code") in [190, 102, 4, 200, 10]:
147 | logger.warning(f"Detected Facebook API auth error: {error_obj.get('code')}")
148 | # Log more details about app ID related errors
149 | if error_obj.get("code") == 200 and "Provide valid app ID" in error_obj.get("message", ""):
150 | logger.error("Meta API authentication configuration issue")
151 | logger.error(f"Current app_id: {app_id}")
152 | # Provide a clearer error message without the confusing "Provide valid app ID" message
153 | return {
154 | "error": {
155 | "message": "Meta API authentication configuration issue. Please check your app credentials.",
156 | "original_error": error_obj.get("message"),
157 | "code": error_obj.get("code")
158 | }
159 | }
160 | auth_manager.invalidate_token()
161 |
162 | # Include full details for technical users
163 | full_response = {
164 | "headers": dict(e.response.headers),
165 | "status_code": e.response.status_code,
166 | "url": str(e.response.url),
167 | "reason": getattr(e.response, "reason_phrase", "Unknown reason"),
168 | "request_method": e.request.method,
169 | "request_url": str(e.request.url)
170 | }
171 |
172 | # Return a properly structured error object
173 | return {
174 | "error": {
175 | "message": f"HTTP Error: {e.response.status_code}",
176 | "details": error_info,
177 | "full_response": full_response
178 | }
179 | }
180 |
181 | except Exception as e:
182 | logger.error(f"Request Error: {str(e)}")
183 | return {"error": {"message": str(e)}}
184 |
185 |
186 | # Generic wrapper for all Meta API tools
187 | def meta_api_tool(func):
188 | """Decorator for Meta API tools that handles authentication and error handling."""
189 | @functools.wraps(func)
190 | async def wrapper(*args, **kwargs):
191 | try:
192 | # Log function call
193 | logger.debug(f"Function call: {func.__name__}")
194 | logger.debug(f"Args: {args}")
195 | # Log kwargs without sensitive info
196 | safe_kwargs = {k: ('***TOKEN***' if k == 'access_token' else v) for k, v in kwargs.items()}
197 | logger.debug(f"Kwargs: {safe_kwargs}")
198 |
199 | # Log app ID information
200 | app_id = auth_manager.app_id
201 | logger.debug(f"Current app_id: {app_id}")
202 | logger.debug(f"META_APP_ID env var: {os.environ.get('META_APP_ID')}")
203 |
204 | # If access_token is not in kwargs or not kwargs['access_token'], try to get it from auth_manager
205 | if 'access_token' not in kwargs or not kwargs['access_token']:
206 | try:
207 | access_token = await auth.get_current_access_token()
208 | if access_token:
209 | kwargs['access_token'] = access_token
210 | logger.debug("Using access token from auth_manager")
211 | else:
212 | logger.warning("No access token available from auth_manager")
213 | # Add more details about why token might be missing
214 | if (auth_manager.app_id == "YOUR_META_APP_ID" or not auth_manager.app_id) and not auth_manager.use_pipeboard:
215 | logger.error("TOKEN VALIDATION FAILED: No valid app_id configured")
216 | logger.error("Please set META_APP_ID environment variable or configure in your code")
217 | elif auth_manager.use_pipeboard:
218 | logger.error("TOKEN VALIDATION FAILED: Pipeboard authentication enabled but no valid token available")
219 | logger.error("Complete authentication via Pipeboard service or check PIPEBOARD_API_TOKEN")
220 | else:
221 | logger.error("Check logs above for detailed token validation failures")
222 | except Exception as e:
223 | logger.error(f"Error getting access token: {str(e)}")
224 | # Add stack trace for better debugging
225 | import traceback
226 | logger.error(f"Stack trace: {traceback.format_exc()}")
227 |
228 | # Final validation - if we still don't have a valid token, return authentication required
229 | if 'access_token' not in kwargs or not kwargs['access_token']:
230 | logger.warning("No access token available, authentication needed")
231 |
232 | # Add more specific troubleshooting information
233 | auth_url = auth_manager.get_auth_url()
234 | app_id = auth_manager.app_id
235 | using_pipeboard = auth_manager.use_pipeboard
236 |
237 | logger.error("TOKEN VALIDATION SUMMARY:")
238 | logger.error(f"- Current app_id: '{app_id}'")
239 | logger.error(f"- Environment META_APP_ID: '{os.environ.get('META_APP_ID', 'Not set')}'")
240 | logger.error(f"- Pipeboard API token configured: {'Yes' if os.environ.get('PIPEBOARD_API_TOKEN') else 'No'}")
241 | logger.error(f"- Using Pipeboard authentication: {'Yes' if using_pipeboard else 'No'}")
242 |
243 | # Check for common configuration issues - but only if not using Pipeboard
244 | if not using_pipeboard and (app_id == "YOUR_META_APP_ID" or not app_id):
245 | logger.error("ISSUE DETECTED: No valid Meta App ID configured")
246 | logger.error("ACTION REQUIRED: Set META_APP_ID environment variable with a valid App ID")
247 | elif using_pipeboard:
248 | logger.error("ISSUE DETECTED: Pipeboard authentication configured but no valid token available")
249 | logger.error("ACTION REQUIRED: Complete authentication via Pipeboard service")
250 |
251 | # Provide different guidance based on authentication method
252 | if using_pipeboard:
253 | return json.dumps({
254 | "error": {
255 | "message": "Pipeboard Authentication Required",
256 | "details": {
257 | "description": "Your Pipeboard API token is invalid or has expired",
258 | "action_required": "Update your Pipeboard token",
259 | "setup_url": "https://pipeboard.co/setup",
260 | "token_url": "https://pipeboard.co/api-tokens",
261 | "configuration_status": {
262 | "app_id_configured": bool(app_id) and app_id != "YOUR_META_APP_ID",
263 | "pipeboard_enabled": True,
264 | },
265 | "troubleshooting": "Go to https://pipeboard.co/setup to verify your account setup, then visit https://pipeboard.co/api-tokens to obtain a new API token",
266 | "setup_link": "[Verify your Pipeboard account setup](https://pipeboard.co/setup)",
267 | "token_link": "[Get a new Pipeboard API token](https://pipeboard.co/api-tokens)"
268 | }
269 | }
270 | }, indent=2)
271 | else:
272 | return json.dumps({
273 | "error": {
274 | "message": "Authentication Required",
275 | "details": {
276 | "description": "You need to authenticate with the Meta API before using this tool",
277 | "action_required": "Please authenticate first",
278 | "auth_url": auth_url,
279 | "configuration_status": {
280 | "app_id_configured": bool(app_id) and app_id != "YOUR_META_APP_ID",
281 | "pipeboard_enabled": False,
282 | },
283 | "troubleshooting": "Check logs for TOKEN VALIDATION FAILED messages",
284 | "markdown_link": f"[Click here to authenticate with Meta Ads API]({auth_url})"
285 | }
286 | }
287 | }, indent=2)
288 |
289 | # Call the original function
290 | result = await func(*args, **kwargs)
291 |
292 | # If the result is a string (JSON), try to parse it to check for errors
293 | if isinstance(result, str):
294 | try:
295 | result_dict = json.loads(result)
296 | if "error" in result_dict:
297 | logger.error(f"Error in API response: {result_dict['error']}")
298 | # If this is an app ID error, log more details
299 | if isinstance(result_dict.get("details", {}).get("error", {}), dict):
300 | error_obj = result_dict["details"]["error"]
301 | if error_obj.get("code") == 200 and "Provide valid app ID" in error_obj.get("message", ""):
302 | logger.error("Meta API authentication configuration issue")
303 | logger.error(f"Current app_id: {app_id}")
304 | # Replace the confusing error with a more user-friendly one
305 | return json.dumps({
306 | "error": {
307 | "message": "Meta API Configuration Issue",
308 | "details": {
309 | "description": "Your Meta API app is not properly configured",
310 | "action_required": "Check your META_APP_ID environment variable",
311 | "current_app_id": app_id,
312 | "original_error": error_obj.get("message")
313 | }
314 | }
315 | }, indent=2)
316 | except Exception:
317 | # Not JSON or other parsing error, wrap it in a dictionary
318 | return json.dumps({"data": result}, indent=2)
319 |
320 | # If result is already a dictionary, ensure it's properly serialized
321 | if isinstance(result, dict):
322 | return json.dumps(result, indent=2)
323 |
324 | return result
325 | except Exception as e:
326 | logger.error(f"Error in {func.__name__}: {str(e)}")
327 | return json.dumps({"error": str(e)}, indent=2)
328 |
329 | return wrapper
```
--------------------------------------------------------------------------------
/tests/test_get_account_pages.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Tests for the updated get_account_pages function with multi-approach strategy.
3 | """
4 |
5 | import pytest
6 | import json
7 | from unittest.mock import AsyncMock, patch
8 | from meta_ads_mcp.core.ads import get_account_pages
9 |
10 |
11 | class TestGetAccountPages:
12 | """Test the updated get_account_pages function with comprehensive multi-approach testing."""
13 |
14 | @pytest.mark.asyncio
15 | async def test_get_account_pages_multi_approach_success(self):
16 | """Test successful page discovery using multiple approaches."""
17 | # Mock data for different endpoints
18 | mock_user_pages = {
19 | "data": [
20 | {
21 | "id": "111111111",
22 | "name": "Personal Page",
23 | "category": "Personal Blog",
24 | "fan_count": 100
25 | }
26 | ]
27 | }
28 |
29 | mock_client_pages = {
30 | "data": [
31 | {
32 | "id": "222222222",
33 | "name": "Client Page",
34 | "category": "Business",
35 | "fan_count": 500
36 | }
37 | ]
38 | }
39 |
40 | mock_adcreatives = {
41 | "data": [
42 | {
43 | "id": "creative_123",
44 | "object_story_spec": {"page_id": "333333333"}
45 | }
46 | ]
47 | }
48 |
49 | # Mock page details for discovered IDs
50 | mock_page_details = {
51 | "111111111": {
52 | "id": "111111111",
53 | "name": "Personal Page",
54 | "username": "personalpage",
55 | "category": "Personal Blog",
56 | "fan_count": 100,
57 | "link": "https://facebook.com/personalpage",
58 | "verification_status": "not_verified"
59 | },
60 | "222222222": {
61 | "id": "222222222",
62 | "name": "Client Page",
63 | "username": "clientpage",
64 | "category": "Business",
65 | "fan_count": 500,
66 | "link": "https://facebook.com/clientpage",
67 | "verification_status": "verified"
68 | },
69 | "333333333": {
70 | "id": "333333333",
71 | "name": "Creative Page",
72 | "username": "creativepage",
73 | "category": "Creative",
74 | "fan_count": 1000,
75 | "link": "https://facebook.com/creativepage",
76 | "verification_status": "not_verified"
77 | }
78 | }
79 |
80 | with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth, \
81 | patch('meta_ads_mcp.core.ads.make_api_request') as mock_api:
82 |
83 | mock_auth.return_value = "test_access_token"
84 |
85 | # Mock API calls in sequence for different approaches
86 | def mock_api_side_effect(endpoint, access_token, params):
87 | if endpoint == "me/accounts":
88 | return mock_user_pages
89 | elif endpoint == "3182643988557192/owned_pages":
90 | return {"data": []} # No business pages
91 | elif endpoint == "act_3182643988557192/client_pages":
92 | return mock_client_pages
93 | elif endpoint == "act_3182643988557192/adcreatives":
94 | return mock_adcreatives
95 | elif endpoint == "act_3182643988557192/ads":
96 | return {"data": []} # No ads
97 | elif endpoint == "act_3182643988557192/promoted_objects":
98 | return {"data": []} # No promoted objects
99 | elif endpoint == "act_3182643988557192/campaigns":
100 | return {"data": []} # No campaigns
101 | elif endpoint in mock_page_details:
102 | return mock_page_details[endpoint]
103 | else:
104 | return {"data": []}
105 |
106 | mock_api.side_effect = mock_api_side_effect
107 |
108 | # Call the function
109 | result = await get_account_pages(account_id="act_3182643988557192")
110 | result_data = json.loads(result)
111 |
112 | # Verify the structure and content
113 | assert "data" in result_data
114 | assert "total_pages_found" in result_data
115 |
116 | # Should find 3 unique pages
117 | assert result_data["total_pages_found"] == 3
118 | assert len(result_data["data"]) == 3
119 |
120 | # Verify that we found valid page data
121 | assert all("id" in page for page in result_data["data"])
122 |
123 | # Verify the page names are correct
124 | page_names = [page.get("name") for page in result_data["data"]]
125 | assert "Personal Page" in page_names
126 | assert "Client Page" in page_names
127 | assert "Creative Page" in page_names
128 |
129 | # Verify each page has basic required fields
130 | for page in result_data["data"]:
131 | assert "id" in page
132 | assert "name" in page
133 |
134 | @pytest.mark.asyncio
135 | async def test_get_account_pages_me_special_case(self):
136 | """Test the special case when account_id is 'me'."""
137 | mock_user_pages = {
138 | "data": [
139 | {
140 | "id": "444444444",
141 | "name": "My Personal Page",
142 | "category": "Personal",
143 | "fan_count": 50
144 | }
145 | ]
146 | }
147 |
148 | with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth, \
149 | patch('meta_ads_mcp.core.ads.make_api_request') as mock_api:
150 |
151 | mock_auth.return_value = "test_access_token"
152 | mock_api.return_value = mock_user_pages
153 |
154 | result = await get_account_pages(account_id="me")
155 | result_data = json.loads(result)
156 |
157 | # Verify the call was made to me/accounts
158 | mock_api.assert_called_once_with(
159 | "me/accounts",
160 | "test_access_token",
161 | {"fields": "id,name,username,category,fan_count,link,verification_status,picture"}
162 | )
163 |
164 | # Verify response structure
165 | assert "data" in result_data
166 | assert len(result_data["data"]) == 1
167 | assert result_data["data"][0]["id"] == "444444444"
168 |
169 | @pytest.mark.asyncio
170 | async def test_get_account_pages_tracking_specs_discovery(self):
171 | """Test page discovery from tracking specs (most reliable method)."""
172 | mock_ads_data = {
173 | "data": [
174 | {
175 | "id": "ad_123",
176 | "tracking_specs": [
177 | {
178 | "page": ["555555555", "666666666"]
179 | }
180 | ]
181 | }
182 | ]
183 | }
184 |
185 | mock_page_details = {
186 | "id": "555555555",
187 | "name": "Tracking Page",
188 | "username": "trackingpage",
189 | "category": "Business"
190 | }
191 |
192 | with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth, \
193 | patch('meta_ads_mcp.core.ads.make_api_request') as mock_api:
194 |
195 | mock_auth.return_value = "test_access_token"
196 |
197 | def mock_api_side_effect(endpoint, access_token, params):
198 | if "tracking_specs" in params.get("fields", ""):
199 | return mock_ads_data
200 | elif endpoint == "555555555":
201 | return mock_page_details
202 | elif endpoint == "666666666":
203 | return {"error": "Page not accessible"}
204 | else:
205 | return {"data": []}
206 |
207 | mock_api.side_effect = mock_api_side_effect
208 |
209 | result = await get_account_pages(account_id="act_123456789")
210 | result_data = json.loads(result)
211 |
212 | # Should find pages from tracking specs
213 | assert result_data["total_pages_found"] == 2
214 |
215 | # Check that one page has details and one has error
216 | pages = result_data["data"]
217 | assert len(pages) == 2
218 |
219 | # One should have full details, one should have error
220 | page_with_details = next((p for p in pages if "error" not in p), None)
221 | page_with_error = next((p for p in pages if "error" in p), None)
222 |
223 | assert page_with_details is not None
224 | assert page_with_error is not None
225 | assert page_with_details["name"] == "Tracking Page"
226 | assert "not accessible" in page_with_error["error"]
227 |
228 | @pytest.mark.asyncio
229 | async def test_get_account_pages_no_pages_found(self):
230 | """Test when no pages are found through any approach."""
231 | with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth, \
232 | patch('meta_ads_mcp.core.ads.make_api_request') as mock_api:
233 |
234 | mock_auth.return_value = "test_access_token"
235 | mock_api.return_value = {"data": []} # All endpoints return empty
236 |
237 | result = await get_account_pages(account_id="act_123456789")
238 | result_data = json.loads(result)
239 |
240 | # Should return the fallback message
241 | assert "data" in result_data
242 | assert len(result_data["data"]) == 0
243 | assert "message" in result_data
244 | assert "No pages found" in result_data["message"]
245 | assert "suggestion" in result_data
246 |
247 | @pytest.mark.asyncio
248 | async def test_get_account_pages_error_handling(self):
249 | """Test error handling when API calls fail."""
250 | with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth, \
251 | patch('meta_ads_mcp.core.ads.make_api_request') as mock_api:
252 |
253 | mock_auth.return_value = "test_access_token"
254 | mock_api.side_effect = Exception("API Error")
255 |
256 | result = await get_account_pages(account_id="act_123456789")
257 | result_data = json.loads(result)
258 |
259 | # Individual API errors are caught and logged, function returns "no pages found"
260 | assert "data" in result_data
261 | assert len(result_data["data"]) == 0
262 | assert "message" in result_data
263 | assert "No pages found" in result_data["message"]
264 |
265 | # But debug info should show the errors
266 | if "debug" in result_data:
267 | debug = result_data["debug"]
268 | assert "errors" in debug
269 | assert len(debug["errors"]) > 0
270 | # Should have multiple API errors logged
271 | assert any("API Error" in error for error in debug["errors"])
272 |
273 | @pytest.mark.asyncio
274 | async def test_get_account_pages_no_account_id(self):
275 | """Test error when no account ID is provided."""
276 | result = await get_account_pages(account_id=None)
277 | result_data = json.loads(result)
278 |
279 | # The @meta_api_tool decorator handles authentication before function logic
280 | # So it returns an authentication error instead of the simple account ID error
281 | # Error responses may be wrapped in a "data" field (MCP format)
282 |
283 | # Check for direct error format
284 | if "error" in result_data or "message" in result_data:
285 | if "message" in result_data and "Authentication Required" in result_data["message"]:
286 | # MCP decorator returns authentication error - this is expected
287 | assert True # This is the expected behavior
288 | return
289 | elif "error" in result_data and "No account ID provided" in result_data["error"]:
290 | # Direct function call might return the account ID error
291 | assert True # This is also valid
292 | return
293 |
294 | # Check for wrapped error format (MCP response format)
295 | if "data" in result_data:
296 | try:
297 | error_data = json.loads(result_data["data"])
298 | if "error" in error_data and "No account ID provided" in error_data["error"]:
299 | assert True # Wrapped error format
300 | return
301 | except (json.JSONDecodeError, TypeError):
302 | pass
303 |
304 | # Fallback: Check if the response contains any indication of missing account ID
305 | result_str = str(result_data)
306 | assert "No account ID provided" in result_str or "Authentication Required" in result_str
307 |
308 | @pytest.mark.asyncio
309 | async def test_get_account_pages_account_id_validation_direct(self):
310 | """Test account ID validation by directly testing the function logic."""
311 | # Import the function implementation directly to bypass decorators
312 | from meta_ads_mcp.core.ads import get_account_pages
313 |
314 | # Mock the function to bypass decorator authentication
315 | with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
316 | mock_auth.return_value = "test_token"
317 |
318 | # Call with empty string (should be treated as None)
319 | result = await get_account_pages(account_id="")
320 | result_data = json.loads(result)
321 |
322 | # Response might be wrapped in MCP format
323 | if "data" in result_data and isinstance(result_data["data"], str):
324 | # Parse the nested JSON response
325 | inner_data = json.loads(result_data["data"])
326 | assert "error" in inner_data
327 | assert "No account ID provided" in inner_data["error"]
328 | else:
329 | # Direct response format
330 | assert "error" in result_data or "message" in result_data
331 | result_str = str(result_data)
332 | assert "No account ID provided" in result_str or "Authentication Required" in result_str
333 |
334 | @pytest.mark.asyncio
335 | async def test_get_account_pages_multiple_sources(self):
336 | """Test that pages from multiple sources are properly collected."""
337 | # Mock different results for different approaches
338 | mock_responses = {
339 | "me/accounts": {"data": [{"id": "111111111"}]},
340 | "123456789/owned_pages": {"data": []},
341 | "act_123456789/client_pages": {"data": [{"id": "222222222"}]},
342 | "act_123456789/adcreatives": {"data": []},
343 | "act_123456789/ads": {"data": []},
344 | "act_123456789/promoted_objects": {"data": []},
345 | "act_123456789/campaigns": {"data": []},
346 | "111111111": {"id": "111111111", "name": "Page 1"},
347 | "222222222": {"id": "222222222", "name": "Page 2"}
348 | }
349 |
350 | with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth, \
351 | patch('meta_ads_mcp.core.ads.make_api_request') as mock_api:
352 |
353 | mock_auth.return_value = "test_access_token"
354 |
355 | def mock_api_side_effect(endpoint, access_token, params):
356 | return mock_responses.get(endpoint, {"data": []})
357 |
358 | mock_api.side_effect = mock_api_side_effect
359 |
360 | result = await get_account_pages(account_id="act_123456789")
361 | result_data = json.loads(result)
362 |
363 | # Verify basic response structure
364 | assert "data" in result_data
365 | assert "total_pages_found" in result_data
366 | assert result_data["total_pages_found"] == 2
367 | assert len(result_data["data"]) == 2
368 |
369 | # Verify pages have correct names
370 | page_names = [page.get("name") for page in result_data["data"]]
371 | assert "Page 1" in page_names
372 | assert "Page 2" in page_names
373 |
374 | @pytest.mark.asyncio
375 | async def test_get_account_pages_act_prefix_handling(self):
376 | """Test that account IDs without 'act_' prefix are handled correctly."""
377 | with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth, \
378 | patch('meta_ads_mcp.core.ads.make_api_request') as mock_api:
379 |
380 | mock_auth.return_value = "test_access_token"
381 | mock_api.return_value = {"data": []}
382 |
383 | # Test with account ID without 'act_' prefix
384 | result = await get_account_pages(account_id="123456789")
385 |
386 | # Check that API calls were made with 'act_' prefix added
387 | calls = mock_api.call_args_list
388 |
389 | # Should find calls with 'act_123456789' in the endpoint
390 | act_calls = [call for call in calls if 'act_123456789' in str(call)]
391 | assert len(act_calls) > 0, "Should have made calls with 'act_' prefix"
392 |
393 | # Should also have made calls with raw account ID for business endpoints
394 | business_calls = [call for call in calls if '123456789/owned_pages' in str(call)]
395 | assert len(business_calls) > 0, "Should have made calls to business endpoints"
396 |
397 |
398 | if __name__ == "__main__":
399 | pytest.main([__file__])
```
--------------------------------------------------------------------------------
/tests/test_budget_update.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env python3
2 | """
3 | Unit Tests for Budget Update Functionality
4 |
5 | This test suite validates the budget update parameter implementation for the
6 | update_adset function in meta_ads_mcp/core/adsets.py.
7 |
8 | Test cases cover:
9 | - Budget update success scenarios
10 | - Budget validation (negative, zero, too high values)
11 | - Budget update with other parameters
12 | - Error handling and permissions
13 | """
14 |
15 | import pytest
16 | import json
17 | import asyncio
18 | from unittest.mock import AsyncMock, patch, MagicMock
19 | from typing import Dict, Any, List
20 |
21 | # Import the function to test
22 | from meta_ads_mcp.core.adsets import update_adset
23 |
24 |
25 | class TestBudgetUpdateFunctionality:
26 | """Test suite for budget update functionality"""
27 |
28 | @pytest.fixture
29 | def mock_api_request(self):
30 | """Mock for the make_api_request function"""
31 | with patch('meta_ads_mcp.core.adsets.make_api_request') as mock:
32 | mock.return_value = {
33 | "id": "test_adset_id",
34 | "daily_budget": "5000",
35 | "status": "ACTIVE"
36 | }
37 | yield mock
38 |
39 | @pytest.fixture
40 | def mock_auth_manager(self):
41 | """Mock for the authentication manager"""
42 | with patch('meta_ads_mcp.core.api.auth_manager') as mock, \
43 | patch('meta_ads_mcp.core.auth.get_current_access_token') as mock_get_token:
44 | # Mock a valid access token
45 | mock.get_current_access_token.return_value = "test_access_token"
46 | mock.is_token_valid.return_value = True
47 | mock.app_id = "test_app_id"
48 | mock_get_token.return_value = "test_access_token"
49 | yield mock
50 |
51 | @pytest.fixture
52 | def valid_adset_id(self):
53 | """Valid ad set ID for testing"""
54 | return "123456789"
55 |
56 | @pytest.fixture
57 | def valid_daily_budget(self):
58 | """Valid daily budget amount in cents"""
59 | return "5000" # $50.00
60 |
61 | @pytest.fixture
62 | def valid_lifetime_budget(self):
63 | """Valid lifetime budget amount in cents"""
64 | return "50000" # $500.00
65 |
66 | @pytest.mark.asyncio
67 | async def test_budget_update_success(self, mock_api_request, mock_auth_manager, valid_adset_id, valid_daily_budget):
68 | """Test successful budget update"""
69 |
70 | result = await update_adset(
71 | adset_id=valid_adset_id,
72 | daily_budget=valid_daily_budget
73 | )
74 |
75 | # Parse the result
76 | result_data = json.loads(result)
77 |
78 | # Verify the API was called with correct parameters
79 | mock_api_request.assert_called_once()
80 | call_args = mock_api_request.call_args
81 |
82 | # Check that the endpoint is correct (first argument)
83 | assert call_args[0][0] == valid_adset_id
84 |
85 | # Check that daily_budget was included in parameters (third argument)
86 | params = call_args[0][2] # Third positional argument is params
87 | assert 'daily_budget' in params
88 | assert params['daily_budget'] == valid_daily_budget
89 |
90 | # Verify the response structure
91 | assert 'id' in result_data
92 | assert result_data['id'] == "test_adset_id"
93 |
94 | @pytest.mark.asyncio
95 | async def test_lifetime_budget_update_success(self, mock_api_request, mock_auth_manager, valid_adset_id, valid_lifetime_budget):
96 | """Test successful lifetime budget update"""
97 |
98 | result = await update_adset(
99 | adset_id=valid_adset_id,
100 | lifetime_budget=valid_lifetime_budget
101 | )
102 |
103 | # Parse the result
104 | result_data = json.loads(result)
105 |
106 | # Verify the API was called with correct parameters
107 | mock_api_request.assert_called_once()
108 | call_args = mock_api_request.call_args
109 |
110 | # Check that lifetime_budget was included in parameters
111 | params = call_args[0][2] # Third positional argument is params
112 | assert 'lifetime_budget' in params
113 | assert params['lifetime_budget'] == valid_lifetime_budget
114 |
115 | # Verify the response structure
116 | assert 'id' in result_data
117 |
118 | @pytest.mark.asyncio
119 | async def test_both_budget_types_update(self, mock_api_request, mock_auth_manager, valid_adset_id, valid_daily_budget, valid_lifetime_budget):
120 | """Test updating both daily and lifetime budget simultaneously"""
121 |
122 | result = await update_adset(
123 | adset_id=valid_adset_id,
124 | daily_budget=valid_daily_budget,
125 | lifetime_budget=valid_lifetime_budget
126 | )
127 |
128 | # Parse the result
129 | result_data = json.loads(result)
130 |
131 | # Verify the API was called with correct parameters
132 | mock_api_request.assert_called_once()
133 | call_args = mock_api_request.call_args
134 |
135 | # Check that both budget parameters were included
136 | params = call_args[0][2] # Third positional argument is params
137 | assert 'daily_budget' in params
138 | assert 'lifetime_budget' in params
139 | assert params['daily_budget'] == valid_daily_budget
140 | assert params['lifetime_budget'] == valid_lifetime_budget
141 |
142 | # Verify the response structure
143 | assert 'id' in result_data
144 |
145 | @pytest.mark.asyncio
146 | async def test_budget_update_with_other_parameters(self, mock_api_request, mock_auth_manager, valid_adset_id, valid_daily_budget):
147 | """Test budget update combined with other parameters"""
148 |
149 | result = await update_adset(
150 | adset_id=valid_adset_id,
151 | daily_budget=valid_daily_budget,
152 | status="PAUSED",
153 | bid_amount=1000,
154 | bid_strategy="LOWEST_COST_WITH_BID_CAP"
155 | )
156 |
157 | # Parse the result
158 | result_data = json.loads(result)
159 |
160 | # Verify the API was called with correct parameters
161 | mock_api_request.assert_called_once()
162 | call_args = mock_api_request.call_args
163 |
164 | # Check that all parameters were included
165 | params = call_args[0][2] # Third positional argument is params
166 | assert 'daily_budget' in params
167 | assert 'status' in params
168 | assert 'bid_amount' in params
169 | assert 'bid_strategy' in params
170 | assert params['daily_budget'] == valid_daily_budget
171 | assert params['status'] == "PAUSED"
172 | assert params['bid_amount'] == "1000"
173 | assert params['bid_strategy'] == "LOWEST_COST_WITH_BID_CAP"
174 |
175 | # Verify the response structure
176 | assert 'id' in result_data
177 |
178 | @pytest.mark.asyncio
179 | async def test_budget_update_with_numeric_values(self, mock_api_request, mock_auth_manager, valid_adset_id):
180 | """Test budget update with numeric values (should be converted to strings)"""
181 |
182 | result = await update_adset(
183 | adset_id=valid_adset_id,
184 | daily_budget=5000, # Integer
185 | lifetime_budget=50000 # Integer
186 | )
187 |
188 | # Parse the result
189 | result_data = json.loads(result)
190 |
191 | # Verify the API was called with correct parameters
192 | mock_api_request.assert_called_once()
193 | call_args = mock_api_request.call_args
194 |
195 | # Check that numeric values were converted to strings
196 | params = call_args[0][2] # Third positional argument is params
197 | assert 'daily_budget' in params
198 | assert 'lifetime_budget' in params
199 | assert params['daily_budget'] == "5000"
200 | assert params['lifetime_budget'] == "50000"
201 |
202 | # Verify the response structure
203 | assert 'id' in result_data
204 |
205 | @pytest.mark.asyncio
206 | async def test_budget_update_with_zero_budget(self, mock_api_request, mock_auth_manager, valid_adset_id):
207 | """Test budget update with zero budget (should be allowed)"""
208 |
209 | result = await update_adset(
210 | adset_id=valid_adset_id,
211 | daily_budget="0"
212 | )
213 |
214 | # Parse the result
215 | result_data = json.loads(result)
216 |
217 | # Verify the API was called with zero budget
218 | mock_api_request.assert_called_once()
219 | call_args = mock_api_request.call_args
220 |
221 | params = call_args[0][2] # Third positional argument is params
222 | assert 'daily_budget' in params
223 | assert params['daily_budget'] == "0"
224 |
225 | # Verify the response structure
226 | assert 'id' in result_data
227 |
228 | @pytest.mark.asyncio
229 | async def test_budget_update_with_high_budget(self, mock_api_request, mock_auth_manager, valid_adset_id):
230 | """Test budget update with high budget values"""
231 |
232 | high_budget = "1000000" # $10,000
233 |
234 | result = await update_adset(
235 | adset_id=valid_adset_id,
236 | daily_budget=high_budget
237 | )
238 |
239 | # Parse the result
240 | result_data = json.loads(result)
241 |
242 | # Verify the API was called with high budget
243 | mock_api_request.assert_called_once()
244 | call_args = mock_api_request.call_args
245 |
246 | params = call_args[0][2] # Third positional argument is params
247 | assert 'daily_budget' in params
248 | assert params['daily_budget'] == high_budget
249 |
250 | # Verify the response structure
251 | assert 'id' in result_data
252 |
253 | @pytest.mark.asyncio
254 | async def test_budget_update_api_error_handling(self, mock_api_request, mock_auth_manager, valid_adset_id, valid_daily_budget):
255 | """Test error handling when API call fails"""
256 |
257 | # Mock API to raise an exception
258 | mock_api_request.side_effect = Exception("API Error: Invalid budget amount")
259 |
260 | result = await update_adset(
261 | adset_id=valid_adset_id,
262 | daily_budget=valid_daily_budget
263 | )
264 |
265 | # Parse the result
266 | result_data = json.loads(result)
267 |
268 | # Verify error response structure
269 | # The error is wrapped in a 'data' field as JSON string
270 | error_data = json.loads(result_data['data'])
271 | assert 'error' in error_data
272 | assert 'details' in error_data
273 | assert 'params_sent' in error_data
274 | assert "Failed to update ad set" in error_data['error']
275 | assert "API Error: Invalid budget amount" in error_data['details']
276 | assert valid_adset_id in error_data['error']
277 |
278 | # Verify the parameters that were sent
279 | assert error_data['params_sent']['daily_budget'] == valid_daily_budget
280 |
281 | @pytest.mark.asyncio
282 | async def test_budget_update_with_invalid_adset_id(self, mock_api_request, mock_auth_manager):
283 | """Test budget update with invalid ad set ID"""
284 |
285 | result = await update_adset(
286 | adset_id="", # Empty ad set ID
287 | daily_budget="5000"
288 | )
289 |
290 | # Parse the result
291 | result_data = json.loads(result)
292 |
293 | # Verify error response
294 | error_data = json.loads(result_data['data'])
295 | assert 'error' in error_data
296 | assert "No ad set ID provided" in error_data['error']
297 |
298 | # Verify API was not called
299 | mock_api_request.assert_not_called()
300 |
301 | @pytest.mark.asyncio
302 | async def test_budget_update_with_no_parameters(self, mock_api_request, mock_auth_manager, valid_adset_id):
303 | """Test budget update with no parameters provided"""
304 |
305 | result = await update_adset(
306 | adset_id=valid_adset_id
307 | # No parameters provided
308 | )
309 |
310 | # Parse the result
311 | result_data = json.loads(result)
312 |
313 | # Verify error response
314 | error_data = json.loads(result_data['data'])
315 | assert 'error' in error_data
316 | assert "No update parameters provided" in error_data['error']
317 |
318 | # Verify API was not called
319 | mock_api_request.assert_not_called()
320 |
321 | @pytest.mark.asyncio
322 | async def test_budget_update_with_negative_budget(self, mock_api_request, mock_auth_manager, valid_adset_id):
323 | """Test budget update with negative budget (should be handled by API)"""
324 |
325 | # Mock API to handle negative budget error
326 | mock_api_request.side_effect = Exception("API Error: Budget amount must be positive")
327 |
328 | result = await update_adset(
329 | adset_id=valid_adset_id,
330 | daily_budget="-1000"
331 | )
332 |
333 | # Parse the result
334 | result_data = json.loads(result)
335 |
336 | # Verify error response structure
337 | error_data = json.loads(result_data['data'])
338 | assert 'error' in error_data
339 | assert 'details' in error_data
340 | assert "API Error: Budget amount must be positive" in error_data['details']
341 |
342 | @pytest.mark.asyncio
343 | async def test_budget_update_with_non_numeric_string(self, mock_api_request, mock_auth_manager, valid_adset_id):
344 | """Test budget update with non-numeric string (should be handled by API)"""
345 |
346 | # Mock API to handle non-numeric budget error
347 | mock_api_request.side_effect = Exception("API Error: Invalid budget format")
348 |
349 | result = await update_adset(
350 | adset_id=valid_adset_id,
351 | daily_budget="invalid_budget"
352 | )
353 |
354 | # Parse the result
355 | result_data = json.loads(result)
356 |
357 | # Verify error response structure
358 | error_data = json.loads(result_data['data'])
359 | assert 'error' in error_data
360 | assert 'details' in error_data
361 | assert "API Error: Invalid budget format" in error_data['details']
362 |
363 | @pytest.mark.asyncio
364 | async def test_budget_update_with_permission_error(self, mock_api_request, mock_auth_manager, valid_adset_id, valid_daily_budget):
365 | """Test budget update with permission error"""
366 |
367 | # Mock API to raise permission error
368 | mock_api_request.side_effect = Exception("API Error: (#100) Insufficient permissions")
369 |
370 | result = await update_adset(
371 | adset_id=valid_adset_id,
372 | daily_budget=valid_daily_budget
373 | )
374 |
375 | # Parse the result
376 | result_data = json.loads(result)
377 |
378 | # Verify error response structure
379 | error_data = json.loads(result_data['data'])
380 | assert 'error' in error_data
381 | assert 'details' in error_data
382 | assert "Insufficient permissions" in error_data['details']
383 |
384 | @pytest.mark.asyncio
385 | async def test_budget_update_with_targeting(self, mock_api_request, mock_auth_manager, valid_adset_id, valid_daily_budget):
386 | """Test budget update combined with targeting update"""
387 |
388 | targeting = {
389 | "age_min": 25,
390 | "age_max": 45,
391 | "geo_locations": {"countries": ["US", "CA"]}
392 | }
393 |
394 | result = await update_adset(
395 | adset_id=valid_adset_id,
396 | daily_budget=valid_daily_budget,
397 | targeting=targeting
398 | )
399 |
400 | # Parse the result
401 | result_data = json.loads(result)
402 |
403 | # Verify the API was called with correct parameters
404 | mock_api_request.assert_called_once()
405 | call_args = mock_api_request.call_args
406 |
407 | # Check that both budget and targeting were included
408 | params = call_args[0][2] # Third positional argument is params
409 | assert 'daily_budget' in params
410 | assert 'targeting' in params
411 | assert params['daily_budget'] == valid_daily_budget
412 |
413 | # Verify targeting was properly JSON encoded
414 | targeting_json = json.loads(params['targeting'])
415 | assert targeting_json['age_min'] == 25
416 | assert targeting_json['age_max'] == 45
417 | assert targeting_json['geo_locations']['countries'] == ["US", "CA"]
418 |
419 | # Verify the response structure
420 | assert 'id' in result_data
421 |
422 |
423 | class TestBudgetUpdateIntegration:
424 | """Integration tests for budget update functionality"""
425 |
426 | @pytest.mark.asyncio
427 | async def test_budget_update_workflow(self):
428 | """Test complete budget update workflow"""
429 |
430 | # This test would require a real ad set ID and valid API credentials
431 | # For now, we'll test the function signature and parameter handling
432 |
433 | # Test that the function accepts the new parameters
434 | with patch('meta_ads_mcp.core.adsets.make_api_request') as mock_api, \
435 | patch('meta_ads_mcp.core.api.auth_manager') as mock_auth, \
436 | patch('meta_ads_mcp.core.auth.get_current_access_token') as mock_get_token:
437 |
438 | mock_api.return_value = {"id": "test_id", "daily_budget": "5000"}
439 | mock_auth.get_current_access_token.return_value = "test_access_token"
440 | mock_auth.is_token_valid.return_value = True
441 | mock_auth.app_id = "test_app_id"
442 | mock_get_token.return_value = "test_access_token"
443 |
444 | result = await update_adset(
445 | adset_id="test_adset_id",
446 | daily_budget="5000",
447 | lifetime_budget="50000"
448 | )
449 |
450 | # Verify the function executed without errors
451 | assert result is not None
452 | # The result might already be a dict or a JSON string
453 | if isinstance(result, str):
454 | result_data = json.loads(result)
455 | else:
456 | result_data = result
457 | assert 'id' in result_data
458 |
459 |
460 | if __name__ == "__main__":
461 | pytest.main([__file__, "-v"])
```
--------------------------------------------------------------------------------
/meta_ads_mcp/core/server.py:
--------------------------------------------------------------------------------
```python
1 | """MCP server configuration for Meta Ads API."""
2 |
3 | from mcp.server.fastmcp import FastMCP
4 | import argparse
5 | import os
6 | import sys
7 | import webbrowser
8 | import json
9 | from typing import Dict, Any, Optional
10 | from .auth import login as login_auth
11 | from .resources import list_resources, get_resource
12 | from .utils import logger
13 | from .pipeboard_auth import pipeboard_auth_manager
14 | import time
15 |
16 | # Initialize FastMCP server
17 | mcp_server = FastMCP("meta-ads")
18 |
19 | # Register resource URIs
20 | mcp_server.resource(uri="meta-ads://resources")(list_resources)
21 | mcp_server.resource(uri="meta-ads://images/{resource_id}")(get_resource)
22 |
23 |
24 | class StreamableHTTPHandler:
25 | """Handles stateless Streamable HTTP requests for Meta Ads MCP"""
26 |
27 | def __init__(self):
28 | """Initialize handler with no session storage - all auth per request"""
29 | logger.debug("StreamableHTTPHandler initialized for stateless operation")
30 |
31 | def handle_request(self, request_headers: Dict[str, str], request_body: Dict[str, Any]) -> Dict[str, Any]:
32 | """Handle individual request with authentication
33 |
34 | Args:
35 | request_headers: HTTP request headers
36 | request_body: JSON-RPC request body
37 |
38 | Returns:
39 | JSON response with auth status and any tool results
40 | """
41 | try:
42 | # Extract authentication configuration from headers
43 | auth_config = self.get_auth_config_from_headers(request_headers)
44 | logger.debug(f"Auth method detected: {auth_config['auth_method']}")
45 |
46 | # Handle based on auth method
47 | if auth_config['auth_method'] == 'bearer':
48 | return self.handle_bearer_request(auth_config, request_body)
49 | elif auth_config['auth_method'] == 'custom_meta_app':
50 | return self.handle_custom_app_request(auth_config, request_body)
51 | else:
52 | return self.handle_unauthenticated_request(request_body)
53 |
54 | except Exception as e:
55 | logger.error(f"Error handling request: {e}")
56 | return {
57 | 'jsonrpc': '2.0',
58 | 'error': {
59 | 'code': -32603,
60 | 'message': 'Internal error',
61 | 'data': str(e)
62 | },
63 | 'id': request_body.get('id')
64 | }
65 |
66 | def get_auth_config_from_headers(self, request_headers: Dict[str, str]) -> Dict[str, Any]:
67 | """Extract authentication configuration from HTTP headers
68 |
69 | Args:
70 | request_headers: HTTP request headers
71 |
72 | Returns:
73 | Dictionary with auth method and relevant credentials
74 | """
75 | # Security validation - only allow safe headers
76 | ALLOWED_VIA_HEADERS = {
77 | 'pipeboard_api_token': True, # ✅ Primary method - simple and secure
78 | 'meta_app_id': True, # ✅ Fallback only - triggers OAuth complexity
79 | 'meta_app_secret': False, # ❌ Server environment only
80 | 'meta_access_token': False, # ❌ Use proper auth flows instead
81 | }
82 |
83 | # PRIMARY: Check for Bearer token in Authorization header (handles 90%+ of cases)
84 | auth_header = request_headers.get('Authorization') or request_headers.get('authorization')
85 | if auth_header and auth_header.lower().startswith('bearer '):
86 | token = auth_header[7:].strip()
87 | logger.info("Bearer authentication detected (primary path)")
88 | return {
89 | 'auth_method': 'bearer',
90 | 'bearer_token': token,
91 | 'requires_oauth': False # Simple token-based auth
92 | }
93 |
94 | # FALLBACK: Custom Meta app (minority of users)
95 | meta_app_id = request_headers.get('X-META-APP-ID') or request_headers.get('x-meta-app-id')
96 | if meta_app_id:
97 | logger.debug("Custom Meta app authentication detected (fallback path)")
98 | return {
99 | 'auth_method': 'custom_meta_app',
100 | 'meta_app_id': meta_app_id,
101 | 'requires_oauth': True # Complex OAuth flow required
102 | }
103 |
104 | # No authentication provided
105 | logger.warning("No authentication method detected in headers")
106 | return {
107 | 'auth_method': 'none',
108 | 'requires_oauth': False
109 | }
110 |
111 | def handle_bearer_request(self, auth_config: Dict[str, Any], request_body: Dict[str, Any]) -> Dict[str, Any]:
112 | """Handle request with Bearer token (primary path)
113 |
114 | Args:
115 | auth_config: Authentication configuration from headers
116 | request_body: JSON-RPC request body
117 |
118 | Returns:
119 | JSON response ready for tool execution
120 | """
121 | logger.debug("Processing Bearer authenticated request")
122 | token = auth_config['bearer_token']
123 |
124 | # Token is ready to use immediately for API calls
125 | # TODO: In next phases, this will execute the actual tool call
126 | return {
127 | 'jsonrpc': '2.0',
128 | 'result': {
129 | 'status': 'ready',
130 | 'auth_method': 'bearer',
131 | 'message': 'Authentication successful with Bearer token',
132 | 'token_source': 'bearer_header'
133 | },
134 | 'id': request_body.get('id')
135 | }
136 |
137 | def handle_custom_app_request(self, auth_config: Dict[str, Any], request_body: Dict[str, Any]) -> Dict[str, Any]:
138 | """Handle request with custom Meta app (fallback path)
139 |
140 | Args:
141 | auth_config: Authentication configuration from headers
142 | request_body: JSON-RPC request body
143 |
144 | Returns:
145 | JSON response indicating OAuth flow is required
146 | """
147 | logger.debug("Processing custom Meta app request (OAuth required)")
148 |
149 | # This may require OAuth flow initiation
150 | # Each request is independent - no session state
151 | return {
152 | 'jsonrpc': '2.0',
153 | 'result': {
154 | 'status': 'oauth_required',
155 | 'auth_method': 'custom_meta_app',
156 | 'meta_app_id': auth_config['meta_app_id'],
157 | 'message': 'OAuth flow required for custom Meta app authentication',
158 | 'next_steps': 'Use get_login_link tool to initiate OAuth flow'
159 | },
160 | 'id': request_body.get('id')
161 | }
162 |
163 | def handle_unauthenticated_request(self, request_body: Dict[str, Any]) -> Dict[str, Any]:
164 | """Handle request with no authentication
165 |
166 | Args:
167 | request_body: JSON-RPC request body
168 |
169 | Returns:
170 | JSON error response requesting authentication
171 | """
172 | logger.warning("Unauthenticated request received")
173 |
174 | return {
175 | 'jsonrpc': '2.0',
176 | 'error': {
177 | 'code': -32600,
178 | 'message': 'Authentication required',
179 | 'data': {
180 | 'supported_methods': [
181 | 'Authorization: Bearer <token> (recommended)',
182 | 'X-META-APP-ID: Custom Meta app OAuth (advanced users)'
183 | ],
184 | 'documentation': 'https://github.com/pipeboard-co/meta-ads-mcp'
185 | }
186 | },
187 | 'id': request_body.get('id')
188 | }
189 |
190 |
191 | def login_cli():
192 | """
193 | Command-line function to authenticate with Meta
194 | """
195 | logger.info("Starting Meta Ads CLI authentication flow")
196 | print("Starting Meta Ads CLI authentication flow...")
197 |
198 | # Call the common login function
199 | login_auth()
200 |
201 |
202 | def main():
203 | """Main entry point for the package"""
204 | # Log startup information
205 | logger.info("Meta Ads MCP server starting")
206 | logger.debug(f"Python version: {sys.version}")
207 | logger.debug(f"Args: {sys.argv}")
208 |
209 | # Initialize argument parser
210 | parser = argparse.ArgumentParser(
211 | description="Meta Ads MCP Server - Model Context Protocol server for Meta Ads API",
212 | epilog="For more information, see https://github.com/pipeboard-co/meta-ads-mcp"
213 | )
214 | parser.add_argument("--login", action="store_true", help="Authenticate with Meta and store the token")
215 | parser.add_argument("--app-id", type=str, help="Meta App ID (Client ID) for authentication")
216 | parser.add_argument("--version", action="store_true", help="Show the version of the package")
217 |
218 | # Transport configuration arguments
219 | parser.add_argument("--transport", type=str, choices=["stdio", "streamable-http"],
220 | default="stdio",
221 | help="Transport method: 'stdio' for MCP clients (default), 'streamable-http' for HTTP API access")
222 | parser.add_argument("--port", type=int, default=8080,
223 | help="Port for Streamable HTTP transport (default: 8080, only used with --transport streamable-http)")
224 | parser.add_argument("--host", type=str, default="localhost",
225 | help="Host for Streamable HTTP transport (default: localhost, only used with --transport streamable-http)")
226 | parser.add_argument("--sse-response", action="store_true",
227 | help="Use SSE response format instead of JSON (default: JSON, only used with --transport streamable-http)")
228 |
229 | args = parser.parse_args()
230 | logger.debug(f"Parsed args: login={args.login}, app_id={args.app_id}, version={args.version}")
231 | logger.debug(f"Transport args: transport={args.transport}, port={args.port}, host={args.host}, sse_response={args.sse_response}")
232 |
233 | # Validate CLI argument combinations
234 | if args.transport == "stdio" and (args.port != 8080 or args.host != "localhost" or args.sse_response):
235 | logger.warning("HTTP transport arguments (--port, --host, --sse-response) are ignored when using stdio transport")
236 | print("Warning: HTTP transport arguments are ignored when using stdio transport")
237 |
238 | # Update app ID if provided as environment variable or command line arg
239 | from .auth import auth_manager, meta_config
240 |
241 | # Check environment variable first (early init)
242 | env_app_id = os.environ.get("META_APP_ID")
243 | if env_app_id:
244 | logger.debug(f"Found META_APP_ID in environment: {env_app_id}")
245 | else:
246 | logger.warning("META_APP_ID not found in environment variables")
247 |
248 | # Command line takes precedence
249 | if args.app_id:
250 | logger.info(f"Setting app_id from command line: {args.app_id}")
251 | auth_manager.app_id = args.app_id
252 | meta_config.set_app_id(args.app_id)
253 | elif env_app_id:
254 | logger.info(f"Setting app_id from environment: {env_app_id}")
255 | auth_manager.app_id = env_app_id
256 | meta_config.set_app_id(env_app_id)
257 |
258 | # Log the final app ID that will be used
259 | logger.info(f"Final app_id from meta_config: {meta_config.get_app_id()}")
260 | logger.info(f"Final app_id from auth_manager: {auth_manager.app_id}")
261 | logger.info(f"ENV META_APP_ID: {os.environ.get('META_APP_ID')}")
262 |
263 | # Show version if requested
264 | if args.version:
265 | from meta_ads_mcp import __version__
266 | logger.info(f"Displaying version: {__version__}")
267 | print(f"Meta Ads MCP v{__version__}")
268 | return 0
269 |
270 | # Handle login command
271 | if args.login:
272 | login_cli()
273 | return 0
274 |
275 | # Check for Pipeboard authentication and token
276 | pipeboard_api_token = os.environ.get("PIPEBOARD_API_TOKEN")
277 | if pipeboard_api_token:
278 | logger.info("Using Pipeboard authentication")
279 | print("✅ Pipeboard authentication enabled")
280 | print(f" API token: {pipeboard_api_token[:8]}...{pipeboard_api_token[-4:]}")
281 | # Check for existing token
282 | token = pipeboard_auth_manager.get_access_token()
283 | if not token:
284 | logger.info("No valid Pipeboard token found. Initiating browser-based authentication flow.")
285 | print("No valid Meta token found. Opening browser for authentication...")
286 | try:
287 | # Initialize the auth flow and get the login URL
288 | auth_data = pipeboard_auth_manager.initiate_auth_flow()
289 | login_url = auth_data.get('loginUrl')
290 | if login_url:
291 | logger.info(f"Opening browser with login URL: {login_url}")
292 | webbrowser.open(login_url)
293 | print("Please authorize the application in your browser.")
294 | print("After authorization, the token will be automatically retrieved.")
295 | print("Waiting for authentication to complete...")
296 |
297 | # Poll for token completion
298 | max_attempts = 30 # Try for 30 * 2 = 60 seconds
299 | for attempt in range(max_attempts):
300 | print(f"Waiting for authentication... ({attempt+1}/{max_attempts})")
301 | # Try to get the token again
302 | token = pipeboard_auth_manager.get_access_token(force_refresh=True)
303 | if token:
304 | print("Authentication successful!")
305 | break
306 | time.sleep(2) # Wait 2 seconds between attempts
307 |
308 | if not token:
309 | print("Authentication timed out. Starting server anyway.")
310 | print("You may need to restart the server after completing authentication.")
311 | else:
312 | logger.error("No login URL received from Pipeboard API")
313 | print("Error: Could not get authentication URL. Check your API token.")
314 | except Exception as e:
315 | logger.error(f"Error initiating browser-based authentication: {e}")
316 | print(f"Error: Could not start authentication: {e}")
317 | else:
318 | print(f"✅ Valid Pipeboard access token found")
319 | print(f" Token preview: {token[:10]}...{token[-5:]}")
320 |
321 | # Transport-specific server initialization and startup
322 | if args.transport == "streamable-http":
323 | logger.info(f"Starting MCP server with Streamable HTTP transport on {args.host}:{args.port}")
324 | logger.info("Mode: Stateless (no session persistence)")
325 | logger.info(f"Response format: {'SSE' if args.sse_response else 'JSON'}")
326 | logger.info("Primary auth method: Bearer Token (recommended)")
327 | logger.info("Fallback auth method: Custom Meta App OAuth (complex setup)")
328 |
329 | print(f"Starting Meta Ads MCP server with Streamable HTTP transport")
330 | print(f"Server will listen on {args.host}:{args.port}")
331 | print(f"Response format: {'SSE' if args.sse_response else 'JSON'}")
332 | print("Primary authentication: Bearer Token (via Authorization: Bearer <token> header)")
333 | print("Fallback authentication: Custom Meta App OAuth (via X-META-APP-ID header)")
334 |
335 | # Configure the existing server with streamable HTTP settings
336 | mcp_server.settings.host = args.host
337 | mcp_server.settings.port = args.port
338 | mcp_server.settings.stateless_http = True
339 | mcp_server.settings.json_response = not args.sse_response
340 |
341 | # Import all tool modules to ensure they are registered
342 | logger.info("Ensuring all tools are registered for HTTP transport")
343 | from . import accounts, campaigns, adsets, ads, insights, authentication
344 | from . import ads_library, budget_schedules, reports, openai_deep_research
345 |
346 | # ✅ NEW: Setup HTTP authentication middleware
347 | logger.info("Setting up HTTP authentication middleware")
348 | try:
349 | from .http_auth_integration import setup_fastmcp_http_auth
350 |
351 | # Setup the FastMCP HTTP auth integration
352 | setup_fastmcp_http_auth(mcp_server)
353 | logger.info("FastMCP HTTP authentication integration setup successful")
354 | print("✅ FastMCP HTTP authentication integration enabled")
355 | print(" - Bearer tokens via Authorization: Bearer <token> header")
356 | print(" - Direct Meta tokens via X-META-ACCESS-TOKEN header")
357 |
358 | except Exception as e:
359 | logger.error(f"Failed to setup FastMCP HTTP authentication integration: {e}")
360 | print(f"⚠️ FastMCP HTTP authentication integration setup failed: {e}")
361 | print(" Server will still start but may not support header-based auth")
362 |
363 | # Log final server configuration
364 | logger.info(f"FastMCP server configured with:")
365 | logger.info(f" - Host: {mcp_server.settings.host}")
366 | logger.info(f" - Port: {mcp_server.settings.port}")
367 | logger.info(f" - Stateless HTTP: {mcp_server.settings.stateless_http}")
368 | logger.info(f" - JSON Response: {mcp_server.settings.json_response}")
369 | logger.info(f" - Streamable HTTP Path: {mcp_server.settings.streamable_http_path}")
370 |
371 | # Start the FastMCP server with Streamable HTTP transport
372 | try:
373 | logger.info("Starting FastMCP server with Streamable HTTP transport")
374 | print(f"✅ Server configured successfully")
375 | print(f" URL: http://{args.host}:{args.port}{mcp_server.settings.streamable_http_path}/")
376 | print(f" Mode: {'Stateless' if mcp_server.settings.stateless_http else 'Stateful'}")
377 | print(f" Format: {'JSON' if mcp_server.settings.json_response else 'SSE'}")
378 | mcp_server.run(transport="streamable-http")
379 | except Exception as e:
380 | logger.error(f"Error starting Streamable HTTP server: {e}")
381 | print(f"Error: Failed to start Streamable HTTP server: {e}")
382 | return 1
383 | else:
384 | # Default stdio transport
385 | logger.info("Starting MCP server with stdio transport")
386 | mcp_server.run(transport='stdio')
```
--------------------------------------------------------------------------------
/tests/test_estimate_audience_size.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env python3
2 | """
3 | Unit tests for estimate_audience_size functionality in Meta Ads MCP.
4 |
5 | This module tests the new estimate_audience_size function that replaces validate_interests
6 | and provides comprehensive audience estimation using Meta's reachestimate API.
7 | """
8 |
9 | import pytest
10 | import json
11 | from unittest.mock import AsyncMock, patch
12 |
13 | from meta_ads_mcp.core.targeting import estimate_audience_size
14 |
15 |
16 | class TestEstimateAudienceSize:
17 | """Test cases for estimate_audience_size function"""
18 |
19 | @pytest.mark.asyncio
20 | async def test_comprehensive_audience_estimation_success(self):
21 | """Test successful comprehensive audience estimation with complex targeting"""
22 | mock_response = {
23 | "data": [
24 | {
25 | "estimate_mau": 1500000,
26 | "estimate_dau": [
27 | {"min_reach": 100000, "max_reach": 150000, "bid": 100},
28 | {"min_reach": 200000, "max_reach": 250000, "bid": 200}
29 | ],
30 | "bid_estimates": {
31 | "median": 150,
32 | "min": 50,
33 | "max": 300
34 | },
35 | "unsupported_targeting": []
36 | }
37 | ]
38 | }
39 |
40 | targeting_spec = {
41 | "age_min": 25,
42 | "age_max": 65,
43 | "geo_locations": {"countries": ["US"]},
44 | "flexible_spec": [
45 | {"interests": [{"id": "6003371567474"}]}
46 | ]
47 | }
48 |
49 | with patch('meta_ads_mcp.core.targeting.make_api_request', new_callable=AsyncMock) as mock_api:
50 | mock_api.return_value = mock_response
51 |
52 | result = await estimate_audience_size(
53 | access_token="test_token",
54 | account_id="act_123456789",
55 | targeting=targeting_spec,
56 | optimization_goal="REACH"
57 | )
58 |
59 | # Verify API call
60 | mock_api.assert_called_once_with(
61 | "act_123456789/reachestimate",
62 | "test_token",
63 | {
64 | "targeting_spec": targeting_spec
65 | },
66 | method="GET"
67 | )
68 |
69 | # Verify response format
70 | result_data = json.loads(result)
71 | assert result_data["success"] is True
72 | assert result_data["account_id"] == "act_123456789"
73 | assert result_data["targeting"] == targeting_spec
74 | assert result_data["optimization_goal"] == "REACH"
75 | assert result_data["estimated_audience_size"] == 1500000
76 | assert "estimate_details" in result_data
77 | assert result_data["estimate_details"]["monthly_active_users"] == 1500000
78 |
79 | @pytest.mark.asyncio
80 | async def test_different_optimization_goals(self):
81 | """Test audience estimation with different optimization goals (parameter is preserved in response)"""
82 | mock_response = {
83 | "data": [
84 | {
85 | "estimate_mau": 800000,
86 | "estimate_dau": [],
87 | "bid_estimates": {},
88 | "unsupported_targeting": []
89 | }
90 | ]
91 | }
92 |
93 | targeting_spec = {
94 | "age_min": 18,
95 | "age_max": 45,
96 | "geo_locations": {"countries": ["US"]}
97 | }
98 |
99 | # Test different optimization goals - they should all use the same reachestimate endpoint
100 | test_goals = ["REACH", "LINK_CLICKS", "LANDING_PAGE_VIEWS", "CONVERSIONS", "APP_INSTALLS"]
101 |
102 | with patch('meta_ads_mcp.core.targeting.make_api_request', new_callable=AsyncMock) as mock_api:
103 | mock_api.return_value = mock_response
104 |
105 | for optimization_goal in test_goals:
106 | mock_api.reset_mock()
107 |
108 | result = await estimate_audience_size(
109 | access_token="test_token",
110 | account_id="act_123456789",
111 | targeting=targeting_spec,
112 | optimization_goal=optimization_goal
113 | )
114 |
115 | # Verify API call uses reachestimate endpoint with simplified parameters
116 | mock_api.assert_called_once_with(
117 | "act_123456789/reachestimate",
118 | "test_token",
119 | {
120 | "targeting_spec": targeting_spec
121 | },
122 | method="GET"
123 | )
124 |
125 | result_data = json.loads(result)
126 | assert result_data["success"] is True
127 | assert result_data["optimization_goal"] == optimization_goal
128 |
129 | @pytest.mark.asyncio
130 | async def test_backwards_compatibility_interest_names(self):
131 | """Test backwards compatibility with interest name validation"""
132 | mock_response = {
133 | "data": [
134 | {
135 | "name": "Japan",
136 | "valid": True,
137 | "id": 6003700426513,
138 | "audience_size": 68310258
139 | },
140 | {
141 | "name": "invalidinterest",
142 | "valid": False
143 | }
144 | ]
145 | }
146 |
147 | with patch('meta_ads_mcp.core.targeting.make_api_request', new_callable=AsyncMock) as mock_api:
148 | mock_api.return_value = mock_response
149 |
150 | result = await estimate_audience_size(
151 | access_token="test_token",
152 | interest_list=["Japan", "invalidinterest"]
153 | )
154 |
155 | # Verify it uses the old validation API
156 | mock_api.assert_called_once_with(
157 | "search",
158 | "test_token",
159 | {
160 | "type": "adinterestvalid",
161 | "interest_list": '["Japan", "invalidinterest"]'
162 | }
163 | )
164 |
165 | # Verify response matches old format
166 | result_data = json.loads(result)
167 | assert result_data == mock_response
168 | assert result_data["data"][0]["valid"] is True
169 | assert result_data["data"][1]["valid"] is False
170 |
171 | @pytest.mark.asyncio
172 | async def test_backwards_compatibility_interest_fbids(self):
173 | """Test backwards compatibility with interest FBID validation"""
174 | mock_response = {
175 | "data": [
176 | {
177 | "id": "6003700426513",
178 | "valid": True,
179 | "audience_size": 68310258
180 | }
181 | ]
182 | }
183 |
184 | with patch('meta_ads_mcp.core.targeting.make_api_request', new_callable=AsyncMock) as mock_api:
185 | mock_api.return_value = mock_response
186 |
187 | result = await estimate_audience_size(
188 | access_token="test_token",
189 | interest_fbid_list=["6003700426513"]
190 | )
191 |
192 | # Verify it uses the old validation API
193 | mock_api.assert_called_once_with(
194 | "search",
195 | "test_token",
196 | {
197 | "type": "adinterestvalid",
198 | "interest_fbid_list": '["6003700426513"]'
199 | }
200 | )
201 |
202 | # Verify response matches old format
203 | result_data = json.loads(result)
204 | assert result_data == mock_response
205 | assert result_data["data"][0]["valid"] is True
206 |
207 | @pytest.mark.asyncio
208 | async def test_backwards_compatibility_both_interest_params(self):
209 | """Test backwards compatibility with both interest names and FBIDs"""
210 | mock_response = {
211 | "data": [
212 | {"name": "Japan", "valid": True, "id": 6003700426513, "audience_size": 68310258},
213 | {"id": "6003397425735", "valid": True, "audience_size": 12345678}
214 | ]
215 | }
216 |
217 | with patch('meta_ads_mcp.core.targeting.make_api_request', new_callable=AsyncMock) as mock_api:
218 | mock_api.return_value = mock_response
219 |
220 | result = await estimate_audience_size(
221 | access_token="test_token",
222 | interest_list=["Japan"],
223 | interest_fbid_list=["6003397425735"]
224 | )
225 |
226 | # Verify both parameters are passed
227 | mock_api.assert_called_once_with(
228 | "search",
229 | "test_token",
230 | {
231 | "type": "adinterestvalid",
232 | "interest_list": '["Japan"]',
233 | "interest_fbid_list": '["6003397425735"]'
234 | }
235 | )
236 |
237 | result_data = json.loads(result)
238 | assert result_data == mock_response
239 |
240 | @pytest.mark.asyncio
241 | async def test_error_no_account_id_for_comprehensive(self):
242 | """Test error when account_id missing for comprehensive estimation"""
243 | targeting_spec = {
244 | "age_min": 25,
245 | "age_max": 65,
246 | "geo_locations": {"countries": ["US"]}
247 | }
248 |
249 | result = await estimate_audience_size(
250 | access_token="test_token",
251 | targeting=targeting_spec
252 | )
253 |
254 | result_data = json.loads(result)
255 | # The @meta_api_tool decorator wraps errors in a 'data' field
256 | assert "data" in result_data
257 | nested_data = json.loads(result_data["data"])
258 | assert "error" in nested_data
259 | assert "account_id is required" in nested_data["error"]
260 | assert "details" in nested_data
261 |
262 | @pytest.mark.asyncio
263 | async def test_error_no_targeting_for_comprehensive(self):
264 | """Test error when targeting missing for comprehensive estimation"""
265 | result = await estimate_audience_size(
266 | access_token="test_token",
267 | account_id="act_123456789"
268 | )
269 |
270 | result_data = json.loads(result)
271 | # The @meta_api_tool decorator wraps errors in a 'data' field
272 | assert "data" in result_data
273 | nested_data = json.loads(result_data["data"])
274 | assert "error" in nested_data
275 | assert "targeting specification is required" in nested_data["error"]
276 | assert "example" in nested_data
277 |
278 | @pytest.mark.asyncio
279 | async def test_error_no_parameters(self):
280 | """Test error when no parameters provided"""
281 | # Since we're using the @meta_api_tool decorator, we need to simulate
282 | # its behavior for error handling
283 | with patch('meta_ads_mcp.core.auth.get_current_access_token') as mock_auth:
284 | mock_auth.return_value = "test_token"
285 |
286 | result = await estimate_audience_size()
287 |
288 | result_data = json.loads(result)
289 | # The @meta_api_tool decorator wraps errors in a 'data' field
290 | assert "data" in result_data
291 | nested_data = json.loads(result_data["data"])
292 | assert "error" in nested_data
293 |
294 | @pytest.mark.asyncio
295 | async def test_error_backwards_compatibility_no_interests(self):
296 | """Test error in backwards compatibility mode with no interest parameters"""
297 | result = await estimate_audience_size(
298 | access_token="test_token"
299 | )
300 |
301 | result_data = json.loads(result)
302 | # The @meta_api_tool decorator wraps errors in a 'data' field
303 | assert "data" in result_data
304 | nested_data = json.loads(result_data["data"])
305 | assert "error" in nested_data
306 |
307 | @pytest.mark.asyncio
308 | async def test_api_error_handling(self):
309 | """Test handling of API errors from reachestimate"""
310 | targeting_spec = {
311 | "age_min": 25,
312 | "age_max": 65,
313 | "geo_locations": {"countries": ["US"]}
314 | }
315 |
316 | # Simulate API exception
317 | with patch('meta_ads_mcp.core.targeting.make_api_request', new_callable=AsyncMock) as mock_api:
318 | mock_api.side_effect = Exception("API connection failed")
319 |
320 | result = await estimate_audience_size(
321 | access_token="test_token",
322 | account_id="act_123456789",
323 | targeting=targeting_spec
324 | )
325 |
326 | result_data = json.loads(result)
327 | # The @meta_api_tool decorator wraps errors in a 'data' field
328 | assert "data" in result_data
329 | nested_data = json.loads(result_data["data"])
330 | assert "error" in nested_data
331 | assert "Failed to get audience estimation from reachestimate endpoint" in nested_data["error"]
332 | assert "details" in nested_data
333 |
334 | @pytest.mark.asyncio
335 | async def test_empty_api_response(self):
336 | """Test handling of empty response from reachestimate API"""
337 | targeting_spec = {
338 | "age_min": 25,
339 | "age_max": 65,
340 | "geo_locations": {"countries": ["US"]}
341 | }
342 |
343 | # Simulate empty API response
344 | mock_response = {"data": []}
345 |
346 | with patch('meta_ads_mcp.core.targeting.make_api_request', new_callable=AsyncMock) as mock_api:
347 | mock_api.return_value = mock_response
348 |
349 | result = await estimate_audience_size(
350 | access_token="test_token",
351 | account_id="act_123456789",
352 | targeting=targeting_spec
353 | )
354 |
355 | result_data = json.loads(result)
356 | # The @meta_api_tool decorator wraps errors in a 'data' field
357 | assert "data" in result_data
358 | nested_data = json.loads(result_data["data"])
359 | assert "error" in nested_data
360 | assert "No estimation data returned" in nested_data["error"]
361 | assert "raw_response" in nested_data
362 |
363 | @pytest.mark.asyncio
364 | async def test_comprehensive_estimation_with_minimal_targeting(self):
365 | """Test comprehensive estimation with minimal targeting requirements"""
366 | mock_response = {
367 | "data": [
368 | {
369 | "estimate_mau": 500000,
370 | "estimate_dau": [],
371 | "bid_estimates": {},
372 | "unsupported_targeting": []
373 | }
374 | ]
375 | }
376 |
377 | minimal_targeting = {
378 | "age_min": 18,
379 | "age_max": 65,
380 | "geo_locations": {"countries": ["US"]}
381 | }
382 |
383 | with patch('meta_ads_mcp.core.targeting.make_api_request', new_callable=AsyncMock) as mock_api:
384 | mock_api.return_value = mock_response
385 |
386 | result = await estimate_audience_size(
387 | access_token="test_token",
388 | account_id="act_123456789",
389 | targeting=minimal_targeting
390 | )
391 |
392 | result_data = json.loads(result)
393 | assert result_data["success"] is True
394 | assert result_data["estimated_audience_size"] == 500000
395 | assert result_data["targeting"] == minimal_targeting
396 |
397 | @pytest.mark.asyncio
398 | async def test_estimate_details_structure(self):
399 | """Test that estimate_details contains expected structure"""
400 | mock_response = {
401 | "data": [
402 | {
403 | "estimate_mau": 2000000,
404 | "estimate_dau": [
405 | {"min_reach": 150000, "max_reach": 200000, "bid": 120}
406 | ],
407 | "bid_estimates": {
408 | "median": 180,
409 | "min": 80,
410 | "max": 350
411 | },
412 | "unsupported_targeting": ["custom_audiences"]
413 | }
414 | ]
415 | }
416 |
417 | targeting_spec = {
418 | "age_min": 25,
419 | "age_max": 45,
420 | "geo_locations": {"countries": ["US"]},
421 | "flexible_spec": [
422 | {"interests": [{"id": "6003371567474"}, {"id": "6003462346642"}]}
423 | ]
424 | }
425 |
426 | with patch('meta_ads_mcp.core.targeting.make_api_request', new_callable=AsyncMock) as mock_api:
427 | mock_api.return_value = mock_response
428 |
429 | result = await estimate_audience_size(
430 | access_token="test_token",
431 | account_id="act_123456789",
432 | targeting=targeting_spec,
433 | optimization_goal="CONVERSIONS"
434 | )
435 |
436 | result_data = json.loads(result)
437 | assert result_data["success"] is True
438 |
439 | # Check estimate_details structure
440 | details = result_data["estimate_details"]
441 | assert "monthly_active_users" in details
442 | assert "daily_outcomes_curve" in details
443 | assert "bid_estimate" in details
444 | assert "unsupported_targeting" in details
445 |
446 | assert details["monthly_active_users"] == 2000000
447 | assert len(details["daily_outcomes_curve"]) == 1
448 | assert details["bid_estimate"]["median"] == 180
449 | assert "custom_audiences" in details["unsupported_targeting"]
450 |
451 | @pytest.mark.asyncio
452 | async def test_function_registration(self):
453 | """Test that the function is properly registered as an MCP tool"""
454 | # This test verifies the function has the correct decorators
455 | assert hasattr(estimate_audience_size, '__wrapped__') # From @meta_api_tool
456 |
457 | # Verify function signature
458 | import inspect
459 | sig = inspect.signature(estimate_audience_size)
460 |
461 | expected_params = [
462 | 'access_token', 'account_id', 'targeting', 'optimization_goal',
463 | 'interest_list', 'interest_fbid_list'
464 | ]
465 |
466 | for param in expected_params:
467 | assert param in sig.parameters
468 |
469 | # Verify default values
470 | assert sig.parameters['optimization_goal'].default == "REACH"
471 | assert sig.parameters['access_token'].default is None
472 | assert sig.parameters['targeting'].default is None
```
--------------------------------------------------------------------------------
/meta_ads_mcp/core/openai_deep_research.py:
--------------------------------------------------------------------------------
```python
1 | """OpenAI MCP Deep Research tools for Meta Ads API.
2 |
3 | This module implements the required 'search' and 'fetch' tools for OpenAI's
4 | ChatGPT Deep Research feature, providing access to Meta Ads data in the format
5 | expected by ChatGPT.
6 |
7 | The tools expose Meta Ads data (accounts, campaigns, ads, etc.) as searchable
8 | and fetchable records for ChatGPT Deep Research analysis.
9 | """
10 |
11 | import json
12 | import re
13 | from typing import List, Dict, Any, Optional
14 | from .api import meta_api_tool, make_api_request
15 | from .server import mcp_server
16 | from .utils import logger
17 |
18 |
19 | class MetaAdsDataManager:
20 | """Manages Meta Ads data for OpenAI MCP search and fetch operations"""
21 |
22 | def __init__(self):
23 | self._cache = {}
24 | logger.debug("MetaAdsDataManager initialized")
25 |
26 | async def _get_ad_accounts(self, access_token: str, limit: int = 200) -> List[Dict[str, Any]]:
27 | """Get ad accounts data"""
28 | try:
29 | endpoint = "me/adaccounts"
30 | params = {
31 | "fields": "id,name,account_id,account_status,amount_spent,balance,currency,business_city,business_country_code",
32 | "limit": limit
33 | }
34 |
35 | data = await make_api_request(endpoint, access_token, params)
36 |
37 | if "data" in data:
38 | return data["data"]
39 | return []
40 | except Exception as e:
41 | logger.error(f"Error fetching ad accounts: {e}")
42 | return []
43 |
44 | async def _get_campaigns(self, access_token: str, account_id: str, limit: int = 25) -> List[Dict[str, Any]]:
45 | """Get campaigns data for an account"""
46 | try:
47 | endpoint = f"{account_id}/campaigns"
48 | params = {
49 | "fields": "id,name,status,objective,daily_budget,lifetime_budget,start_time,stop_time,created_time,updated_time",
50 | "limit": limit
51 | }
52 |
53 | data = await make_api_request(endpoint, access_token, params)
54 |
55 | if "data" in data:
56 | return data["data"]
57 | return []
58 | except Exception as e:
59 | logger.error(f"Error fetching campaigns for {account_id}: {e}")
60 | return []
61 |
62 | async def _get_ads(self, access_token: str, account_id: str, limit: int = 25) -> List[Dict[str, Any]]:
63 | """Get ads data for an account"""
64 | try:
65 | endpoint = f"{account_id}/ads"
66 | params = {
67 | "fields": "id,name,status,creative,targeting,bid_amount,created_time,updated_time",
68 | "limit": limit
69 | }
70 |
71 | data = await make_api_request(endpoint, access_token, params)
72 |
73 | if "data" in data:
74 | return data["data"]
75 | return []
76 | except Exception as e:
77 | logger.error(f"Error fetching ads for {account_id}: {e}")
78 | return []
79 |
80 | async def _get_pages_for_account(self, access_token: str, account_id: str) -> List[Dict[str, Any]]:
81 | """Get pages associated with an account"""
82 | try:
83 | # Import the page discovery function from ads module
84 | from .ads import _discover_pages_for_account
85 |
86 | # Ensure account_id has the 'act_' prefix
87 | if not account_id.startswith("act_"):
88 | account_id = f"act_{account_id}"
89 |
90 | page_discovery_result = await _discover_pages_for_account(account_id, access_token)
91 |
92 | if not page_discovery_result.get("success"):
93 | return []
94 |
95 | # Return page data in a consistent format
96 | return [{
97 | "id": page_discovery_result["page_id"],
98 | "name": page_discovery_result.get("page_name", "Unknown"),
99 | "source": page_discovery_result.get("source", "unknown"),
100 | "account_id": account_id
101 | }]
102 | except Exception as e:
103 | logger.error(f"Error fetching pages for {account_id}: {e}")
104 | return []
105 |
106 | async def _get_businesses(self, access_token: str, user_id: str = "me", limit: int = 25) -> List[Dict[str, Any]]:
107 | """Get businesses accessible by the current user"""
108 | try:
109 | endpoint = f"{user_id}/businesses"
110 | params = {
111 | "fields": "id,name,created_time,verification_status",
112 | "limit": limit
113 | }
114 |
115 | data = await make_api_request(endpoint, access_token, params)
116 |
117 | if "data" in data:
118 | return data["data"]
119 | return []
120 | except Exception as e:
121 | logger.error(f"Error fetching businesses: {e}")
122 | return []
123 |
124 | async def search_records(self, query: str, access_token: str) -> List[str]:
125 | """Search Meta Ads data and return matching record IDs
126 |
127 | Args:
128 | query: Search query string
129 | access_token: Meta API access token
130 |
131 | Returns:
132 | List of record IDs that match the query
133 | """
134 | logger.info(f"Searching Meta Ads data with query: {query}")
135 |
136 | # Normalize query for matching
137 | query_lower = query.lower()
138 | query_terms = re.findall(r'\w+', query_lower)
139 |
140 | matching_ids = []
141 |
142 | try:
143 | # Search ad accounts
144 | accounts = await self._get_ad_accounts(access_token, limit=200)
145 | for account in accounts:
146 | account_text = f"{account.get('name', '')} {account.get('id', '')} {account.get('account_status', '')} {account.get('business_city', '')} {account.get('business_country_code', '')}".lower()
147 |
148 | if any(term in account_text for term in query_terms):
149 | record_id = f"account:{account['id']}"
150 | matching_ids.append(record_id)
151 |
152 | # Cache the account data
153 | self._cache[record_id] = {
154 | "id": record_id,
155 | "type": "account",
156 | "title": f"Ad Account: {account.get('name', 'Unnamed Account')}",
157 | "text": f"Meta Ads Account {account.get('name', 'Unnamed')} (ID: {account.get('id', 'N/A')}) - Status: {account.get('account_status', 'Unknown')}, Currency: {account.get('currency', 'Unknown')}, Spent: ${account.get('amount_spent', 0)}, Balance: ${account.get('balance', 0)}",
158 | "metadata": {
159 | "account_id": account.get('id'),
160 | "account_name": account.get('name'),
161 | "status": account.get('account_status'),
162 | "currency": account.get('currency'),
163 | "business_location": f"{account.get('business_city', '')}, {account.get('business_country_code', '')}".strip(', '),
164 | "data_type": "meta_ads_account"
165 | },
166 | "raw_data": account
167 | }
168 |
169 | # Also search campaigns for this account if it matches
170 | campaigns = await self._get_campaigns(access_token, account['id'], limit=10)
171 | for campaign in campaigns:
172 | campaign_text = f"{campaign.get('name', '')} {campaign.get('objective', '')} {campaign.get('status', '')}".lower()
173 |
174 | if any(term in campaign_text for term in query_terms):
175 | campaign_record_id = f"campaign:{campaign['id']}"
176 | matching_ids.append(campaign_record_id)
177 |
178 | # Cache the campaign data
179 | self._cache[campaign_record_id] = {
180 | "id": campaign_record_id,
181 | "type": "campaign",
182 | "title": f"Campaign: {campaign.get('name', 'Unnamed Campaign')}",
183 | "text": f"Meta Ads Campaign {campaign.get('name', 'Unnamed')} (ID: {campaign.get('id', 'N/A')}) - Objective: {campaign.get('objective', 'Unknown')}, Status: {campaign.get('status', 'Unknown')}, Daily Budget: ${campaign.get('daily_budget', 'Not set')}, Account: {account.get('name', 'Unknown')}",
184 | "metadata": {
185 | "campaign_id": campaign.get('id'),
186 | "campaign_name": campaign.get('name'),
187 | "objective": campaign.get('objective'),
188 | "status": campaign.get('status'),
189 | "account_id": account.get('id'),
190 | "account_name": account.get('name'),
191 | "data_type": "meta_ads_campaign"
192 | },
193 | "raw_data": campaign
194 | }
195 |
196 | # If query specifically mentions "ads" or "ad", also search individual ads
197 | if any(term in ['ad', 'ads', 'advertisement', 'creative'] for term in query_terms):
198 | for account in accounts[:3]: # Limit to first 3 accounts for performance
199 | ads = await self._get_ads(access_token, account['id'], limit=10)
200 | for ad in ads:
201 | ad_text = f"{ad.get('name', '')} {ad.get('status', '')}".lower()
202 |
203 | if any(term in ad_text for term in query_terms):
204 | ad_record_id = f"ad:{ad['id']}"
205 | matching_ids.append(ad_record_id)
206 |
207 | # Cache the ad data
208 | self._cache[ad_record_id] = {
209 | "id": ad_record_id,
210 | "type": "ad",
211 | "title": f"Ad: {ad.get('name', 'Unnamed Ad')}",
212 | "text": f"Meta Ad {ad.get('name', 'Unnamed')} (ID: {ad.get('id', 'N/A')}) - Status: {ad.get('status', 'Unknown')}, Bid Amount: ${ad.get('bid_amount', 'Not set')}, Account: {account.get('name', 'Unknown')}",
213 | "metadata": {
214 | "ad_id": ad.get('id'),
215 | "ad_name": ad.get('name'),
216 | "status": ad.get('status'),
217 | "account_id": account.get('id'),
218 | "account_name": account.get('name'),
219 | "data_type": "meta_ads_ad"
220 | },
221 | "raw_data": ad
222 | }
223 |
224 | # If query specifically mentions "page" or "pages", also search pages
225 | if any(term in ['page', 'pages', 'facebook page'] for term in query_terms):
226 | for account in accounts[:5]: # Limit to first 5 accounts for performance
227 | pages = await self._get_pages_for_account(access_token, account['id'])
228 | for page in pages:
229 | page_text = f"{page.get('name', '')} {page.get('source', '')}".lower()
230 |
231 | if any(term in page_text for term in query_terms):
232 | page_record_id = f"page:{page['id']}"
233 | matching_ids.append(page_record_id)
234 |
235 | # Cache the page data
236 | self._cache[page_record_id] = {
237 | "id": page_record_id,
238 | "type": "page",
239 | "title": f"Facebook Page: {page.get('name', 'Unnamed Page')}",
240 | "text": f"Facebook Page {page.get('name', 'Unnamed')} (ID: {page.get('id', 'N/A')}) - Source: {page.get('source', 'Unknown')}, Account: {account.get('name', 'Unknown')}",
241 | "metadata": {
242 | "page_id": page.get('id'),
243 | "page_name": page.get('name'),
244 | "source": page.get('source'),
245 | "account_id": account.get('id'),
246 | "account_name": account.get('name'),
247 | "data_type": "meta_ads_page"
248 | },
249 | "raw_data": page
250 | }
251 |
252 | # If query specifically mentions "business" or "businesses", also search businesses
253 | if any(term in ['business', 'businesses', 'company', 'companies'] for term in query_terms):
254 | businesses = await self._get_businesses(access_token, limit=25)
255 | for business in businesses:
256 | business_text = f"{business.get('name', '')} {business.get('verification_status', '')}".lower()
257 |
258 | if any(term in business_text for term in query_terms):
259 | business_record_id = f"business:{business['id']}"
260 | matching_ids.append(business_record_id)
261 |
262 | # Cache the business data
263 | self._cache[business_record_id] = {
264 | "id": business_record_id,
265 | "type": "business",
266 | "title": f"Business: {business.get('name', 'Unnamed Business')}",
267 | "text": f"Meta Business {business.get('name', 'Unnamed')} (ID: {business.get('id', 'N/A')}) - Created: {business.get('created_time', 'Unknown')}, Verification: {business.get('verification_status', 'Unknown')}",
268 | "metadata": {
269 | "business_id": business.get('id'),
270 | "business_name": business.get('name'),
271 | "created_time": business.get('created_time'),
272 | "verification_status": business.get('verification_status'),
273 | "data_type": "meta_ads_business"
274 | },
275 | "raw_data": business
276 | }
277 |
278 | except Exception as e:
279 | logger.error(f"Error during search operation: {e}")
280 | # Return empty list on error, but don't raise exception
281 | return []
282 |
283 | logger.info(f"Search completed. Found {len(matching_ids)} matching records")
284 | return matching_ids[:50] # Limit to 50 results for performance
285 |
286 | def fetch_record(self, record_id: str) -> Optional[Dict[str, Any]]:
287 | """Fetch a cached record by ID
288 |
289 | Args:
290 | record_id: The record ID to fetch
291 |
292 | Returns:
293 | Record data or None if not found
294 | """
295 | logger.info(f"Fetching record: {record_id}")
296 |
297 | record = self._cache.get(record_id)
298 | if record:
299 | logger.debug(f"Record found in cache: {record['type']}")
300 | return record
301 | else:
302 | logger.warning(f"Record not found in cache: {record_id}")
303 | return None
304 |
305 |
306 | # Global data manager instance
307 | _data_manager = MetaAdsDataManager()
308 |
309 |
310 | @mcp_server.tool()
311 | @meta_api_tool
312 | async def search(
313 | query: str,
314 | access_token: Optional[str] = None
315 | ) -> str:
316 | """
317 | Search through Meta Ads data and return matching record IDs.
318 | It searches across ad accounts, campaigns, ads, pages, and businesses to find relevant records
319 | based on the provided query.
320 |
321 | Args:
322 | query: Search query string to find relevant Meta Ads records
323 | access_token: Meta API access token (optional - will use cached token if not provided)
324 |
325 | Returns:
326 | JSON response with list of matching record IDs
327 |
328 | Example Usage:
329 | search(query="active campaigns")
330 | search(query="account spending")
331 | search(query="facebook ads performance")
332 | search(query="facebook pages")
333 | search(query="user businesses")
334 | """
335 | if not query:
336 | return json.dumps({
337 | "error": "query parameter is required",
338 | "ids": []
339 | }, indent=2)
340 |
341 | try:
342 | # Use the data manager to search records
343 | matching_ids = await _data_manager.search_records(query, access_token)
344 |
345 | response = {
346 | "ids": matching_ids,
347 | "query": query,
348 | "total_results": len(matching_ids)
349 | }
350 |
351 | logger.info(f"Search successful. Query: '{query}', Results: {len(matching_ids)}")
352 | return json.dumps(response, indent=2)
353 |
354 | except Exception as e:
355 | error_msg = str(e)
356 | logger.error(f"Error in search tool: {error_msg}")
357 |
358 | return json.dumps({
359 | "error": "Failed to search Meta Ads data",
360 | "details": error_msg,
361 | "ids": [],
362 | "query": query
363 | }, indent=2)
364 |
365 |
366 | @mcp_server.tool()
367 | async def fetch(
368 | id: str
369 | ) -> str:
370 | """
371 | Fetch complete record data by ID.
372 | It retrieves the full data for a specific record identified by its ID.
373 |
374 | Args:
375 | id: The record ID to fetch (format: "type:id", e.g., "account:act_123456")
376 |
377 | Returns:
378 | JSON response with complete record data including id, title, text, and metadata
379 |
380 | Example Usage:
381 | fetch(id="account:act_123456789")
382 | fetch(id="campaign:23842588888640185")
383 | fetch(id="ad:23842614006130185")
384 | fetch(id="page:123456789")
385 | """
386 | if not id:
387 | return json.dumps({
388 | "error": "id parameter is required"
389 | }, indent=2)
390 |
391 | try:
392 | # Use the data manager to fetch the record
393 | record = _data_manager.fetch_record(id)
394 |
395 | if record:
396 | logger.info(f"Record fetched successfully: {id}")
397 | return json.dumps(record, indent=2)
398 | else:
399 | logger.warning(f"Record not found: {id}")
400 | return json.dumps({
401 | "error": f"Record not found: {id}",
402 | "id": id
403 | }, indent=2)
404 |
405 | except Exception as e:
406 | error_msg = str(e)
407 | logger.error(f"Error in fetch tool: {error_msg}")
408 |
409 | return json.dumps({
410 | "error": "Failed to fetch record",
411 | "details": error_msg,
412 | "id": id
413 | }, indent=2)
```
--------------------------------------------------------------------------------
/meta_ads_mcp/core/duplication.py:
--------------------------------------------------------------------------------
```python
1 | """Duplication functionality for Meta Ads API."""
2 |
3 | import json
4 | import os
5 | import httpx
6 | from typing import Optional, Dict, Any, List, Union
7 | from .server import mcp_server
8 | from .api import meta_api_tool
9 | from . import auth
10 | from .http_auth_integration import FastMCPAuthIntegration
11 |
12 |
13 | # Only register the duplication functions if the environment variable is set
14 | ENABLE_DUPLICATION = bool(os.environ.get("META_ADS_ENABLE_DUPLICATION", ""))
15 |
16 | if ENABLE_DUPLICATION:
17 | @mcp_server.tool()
18 | @meta_api_tool
19 | async def duplicate_campaign(
20 | campaign_id: str,
21 | access_token: Optional[str] = None,
22 | name_suffix: Optional[str] = " - Copy",
23 | include_ad_sets: bool = True,
24 | include_ads: bool = True,
25 | include_creatives: bool = True,
26 | copy_schedule: bool = False,
27 | new_daily_budget: Optional[float] = None,
28 | new_status: Optional[str] = "PAUSED"
29 | ) -> str:
30 | """
31 | Duplicate a Meta Ads campaign with all its ad sets and ads.
32 |
33 | Recommended: Use this to run robust experiments.
34 |
35 | Args:
36 | campaign_id: Meta Ads campaign ID to duplicate
37 | name_suffix: Suffix to add to the duplicated campaign name
38 | include_ad_sets: Whether to duplicate ad sets within the campaign
39 | include_ads: Whether to duplicate ads within ad sets
40 | include_creatives: Whether to duplicate ad creatives
41 | copy_schedule: Whether to copy the campaign schedule
42 | new_daily_budget: Override the daily budget for the new campaign
43 | new_status: Status for the new campaign (ACTIVE or PAUSED)
44 | """
45 | return await _forward_duplication_request(
46 | "campaign",
47 | campaign_id,
48 | access_token,
49 | {
50 | "name_suffix": name_suffix,
51 | "include_ad_sets": include_ad_sets,
52 | "include_ads": include_ads,
53 | "include_creatives": include_creatives,
54 | "copy_schedule": copy_schedule,
55 | "new_daily_budget": new_daily_budget,
56 | "new_status": new_status
57 | }
58 | )
59 |
60 | @mcp_server.tool()
61 | @meta_api_tool
62 | async def duplicate_adset(
63 | adset_id: str,
64 | access_token: Optional[str] = None,
65 | target_campaign_id: Optional[str] = None,
66 | name_suffix: Optional[str] = " - Copy",
67 | include_ads: bool = True,
68 | include_creatives: bool = True,
69 | new_daily_budget: Optional[float] = None,
70 | new_targeting: Optional[Dict[str, Any]] = None,
71 | new_status: Optional[str] = "PAUSED"
72 | ) -> str:
73 | """
74 | Duplicate a Meta Ads ad set with its ads.
75 |
76 | Recommended: Use this to run robust experiments.
77 |
78 | Args:
79 | adset_id: Meta Ads ad set ID to duplicate
80 | target_campaign_id: Campaign ID to move the duplicated ad set to (optional)
81 | name_suffix: Suffix to add to the duplicated ad set name
82 | include_ads: Whether to duplicate ads within the ad set
83 | include_creatives: Whether to duplicate ad creatives
84 | new_daily_budget: Override the daily budget for the new ad set
85 | new_targeting: Override targeting settings for the new ad set
86 | new_status: Status for the new ad set (ACTIVE or PAUSED)
87 | """
88 | return await _forward_duplication_request(
89 | "adset",
90 | adset_id,
91 | access_token,
92 | {
93 | "target_campaign_id": target_campaign_id,
94 | "name_suffix": name_suffix,
95 | "include_ads": include_ads,
96 | "include_creatives": include_creatives,
97 | "new_daily_budget": new_daily_budget,
98 | "new_targeting": new_targeting,
99 | "new_status": new_status
100 | }
101 | )
102 |
103 | @mcp_server.tool()
104 | @meta_api_tool
105 | async def duplicate_ad(
106 | ad_id: str,
107 | access_token: Optional[str] = None,
108 | target_adset_id: Optional[str] = None,
109 | name_suffix: Optional[str] = " - Copy",
110 | duplicate_creative: bool = True,
111 | new_creative_name: Optional[str] = None,
112 | new_status: Optional[str] = "PAUSED"
113 | ) -> str:
114 | """
115 | Duplicate a Meta Ads ad.
116 |
117 | Recommended: Use this to run robust experiments.
118 |
119 | Args:
120 | ad_id: Meta Ads ad ID to duplicate
121 | target_adset_id: Ad set ID to move the duplicated ad to (optional)
122 | name_suffix: Suffix to add to the duplicated ad name
123 | duplicate_creative: Whether to duplicate the ad creative
124 | new_creative_name: Override name for the duplicated creative
125 | new_status: Status for the new ad (ACTIVE or PAUSED)
126 | """
127 | return await _forward_duplication_request(
128 | "ad",
129 | ad_id,
130 | access_token,
131 | {
132 | "target_adset_id": target_adset_id,
133 | "name_suffix": name_suffix,
134 | "duplicate_creative": duplicate_creative,
135 | "new_creative_name": new_creative_name,
136 | "new_status": new_status
137 | }
138 | )
139 |
140 | @mcp_server.tool()
141 | @meta_api_tool
142 | async def duplicate_creative(
143 | creative_id: str,
144 | access_token: Optional[str] = None,
145 | name_suffix: Optional[str] = " - Copy",
146 | new_primary_text: Optional[str] = None,
147 | new_headline: Optional[str] = None,
148 | new_description: Optional[str] = None,
149 | new_cta_type: Optional[str] = None,
150 | new_destination_url: Optional[str] = None
151 | ) -> str:
152 | """
153 | Duplicate a Meta Ads creative.
154 |
155 | Recommended: Use this to run robust experiments.
156 |
157 | Args:
158 | creative_id: Meta Ads creative ID to duplicate
159 | name_suffix: Suffix to add to the duplicated creative name
160 | new_primary_text: Override the primary text for the new creative
161 | new_headline: Override the headline for the new creative
162 | new_description: Override the description for the new creative
163 | new_cta_type: Override the call-to-action type for the new creative
164 | new_destination_url: Override the destination URL for the new creative
165 | """
166 | return await _forward_duplication_request(
167 | "creative",
168 | creative_id,
169 | access_token,
170 | {
171 | "name_suffix": name_suffix,
172 | "new_primary_text": new_primary_text,
173 | "new_headline": new_headline,
174 | "new_description": new_description,
175 | "new_cta_type": new_cta_type,
176 | "new_destination_url": new_destination_url
177 | }
178 | )
179 |
180 |
181 | async def _forward_duplication_request(resource_type: str, resource_id: str, access_token: str, options: Dict[str, Any]) -> str:
182 | """
183 | Forward duplication request to the cloud-hosted MCP API using dual-header authentication.
184 |
185 | This implements the dual-header authentication pattern for MCP server callbacks:
186 | - Authorization: Bearer <facebook_token> - Facebook access token for Meta API calls
187 | - X-Pipeboard-Token: <pipeboard_token> - Pipeboard API token for authentication
188 |
189 | Args:
190 | resource_type: Type of resource to duplicate (campaign, adset, ad, creative)
191 | resource_id: ID of the resource to duplicate
192 | access_token: Meta API access token (optional, will use context if not provided)
193 | options: Duplication options
194 | """
195 | try:
196 | # Get tokens from the request context that were set by the HTTP auth middleware
197 | # In the dual-header authentication pattern:
198 | # - Pipeboard token comes from X-Pipeboard-Token header (for authentication)
199 | # - Facebook token comes from Authorization header (for Meta API calls)
200 |
201 | # Get tokens from context set by AuthInjectionMiddleware
202 | pipeboard_token = FastMCPAuthIntegration.get_pipeboard_token()
203 | facebook_token = FastMCPAuthIntegration.get_auth_token()
204 |
205 | # Use provided access_token parameter if no Facebook token found in context
206 | if not facebook_token:
207 | facebook_token = access_token if access_token else await auth.get_current_access_token()
208 |
209 | # Validate we have both required tokens
210 | if not pipeboard_token:
211 | return json.dumps({
212 | "error": "authentication_required",
213 | "message": "Pipeboard API token not found",
214 | "details": {
215 | "required": "Valid Pipeboard token via X-Pipeboard-Token header",
216 | "received_headers": "Check that the MCP server is forwarding the X-Pipeboard-Token header"
217 | }
218 | }, indent=2)
219 |
220 | if not facebook_token:
221 | return json.dumps({
222 | "error": "authentication_required",
223 | "message": "Meta Ads access token not found",
224 | "details": {
225 | "required": "Valid Meta access token from authenticated session",
226 | "check": "Ensure Facebook account is connected and token is valid"
227 | }
228 | }, indent=2)
229 |
230 | # Construct the API endpoint
231 | base_url = "https://mcp.pipeboard.co"
232 | endpoint = f"{base_url}/api/meta/duplicate/{resource_type}/{resource_id}"
233 |
234 | # Prepare the dual-header authentication as per API documentation
235 | headers = {
236 | "Authorization": f"Bearer {facebook_token}", # Facebook token for Meta API
237 | "X-Pipeboard-Token": pipeboard_token, # Pipeboard token for auth
238 | "Content-Type": "application/json",
239 | "User-Agent": "meta-ads-mcp/1.0"
240 | }
241 |
242 | # Remove None values from options
243 | clean_options = {k: v for k, v in options.items() if v is not None}
244 |
245 | # Make the request to the cloud service
246 | async with httpx.AsyncClient(timeout=30.0) as client:
247 | response = await client.post(
248 | endpoint,
249 | headers=headers,
250 | json=clean_options
251 | )
252 |
253 | if response.status_code == 200:
254 | result = response.json()
255 | return json.dumps(result, indent=2)
256 | elif response.status_code == 400:
257 | # Validation failed
258 | try:
259 | error_data = response.json()
260 | return json.dumps({
261 | "success": False,
262 | "error": "validation_failed",
263 | "errors": error_data.get("errors", [response.text]),
264 | "warnings": error_data.get("warnings", [])
265 | }, indent=2)
266 | except:
267 | return json.dumps({
268 | "success": False,
269 | "error": "validation_failed",
270 | "errors": [response.text],
271 | "warnings": []
272 | }, indent=2)
273 | elif response.status_code == 401:
274 | return json.dumps({
275 | "success": False,
276 | "error": "authentication_error",
277 | "message": "Invalid or expired API token"
278 | }, indent=2)
279 | elif response.status_code == 402:
280 | try:
281 | error_data = response.json()
282 | return json.dumps({
283 | "success": False,
284 | "error": "subscription_required",
285 | "message": error_data.get("message", "This feature is not available in your current plan"),
286 | "upgrade_url": error_data.get("upgrade_url", "https://pipeboard.co/upgrade"),
287 | "suggestion": error_data.get("suggestion", "Please upgrade your account to access this feature")
288 | }, indent=2)
289 | except:
290 | return json.dumps({
291 | "success": False,
292 | "error": "subscription_required",
293 | "message": "This feature is not available in your current plan",
294 | "upgrade_url": "https://pipeboard.co/upgrade",
295 | "suggestion": "Please upgrade your account to access this feature"
296 | }, indent=2)
297 | elif response.status_code == 403:
298 | try:
299 | error_data = response.json()
300 | # Check if this is a premium feature error
301 | if error_data.get("error") == "premium_feature":
302 | return json.dumps({
303 | "success": False,
304 | "error": "premium_feature_required",
305 | "message": error_data.get("message", "This is a premium feature that requires subscription"),
306 | "details": error_data.get("details", {
307 | "upgrade_url": "https://pipeboard.co/upgrade",
308 | "suggestion": "Please upgrade your account to access this feature"
309 | })
310 | }, indent=2)
311 | else:
312 | # Default to facebook connection required
313 | return json.dumps({
314 | "success": False,
315 | "error": "facebook_connection_required",
316 | "message": error_data.get("message", "You need to connect your Facebook account first"),
317 | "details": error_data.get("details", {
318 | "login_flow_url": "/connections",
319 | "auth_flow_url": "/api/meta/auth"
320 | })
321 | }, indent=2)
322 | except:
323 | return json.dumps({
324 | "success": False,
325 | "error": "facebook_connection_required",
326 | "message": "You need to connect your Facebook account first",
327 | "details": {
328 | "login_flow_url": "/connections",
329 | "auth_flow_url": "/api/meta/auth"
330 | }
331 | }, indent=2)
332 | elif response.status_code == 404:
333 | return json.dumps({
334 | "success": False,
335 | "error": "resource_not_found",
336 | "message": f"{resource_type.title()} not found or access denied",
337 | "suggestion": f"Verify the {resource_type} ID and your Facebook account permissions"
338 | }, indent=2)
339 | elif response.status_code == 429:
340 | return json.dumps({
341 | "error": "rate_limit_exceeded",
342 | "message": "Meta API rate limit exceeded",
343 | "details": {
344 | "suggestion": "Please wait before retrying",
345 | "retry_after": response.headers.get("Retry-After", "60")
346 | }
347 | }, indent=2)
348 | elif response.status_code == 502:
349 | try:
350 | error_data = response.json()
351 | return json.dumps({
352 | "success": False,
353 | "error": "meta_api_error",
354 | "message": error_data.get("message", "Facebook API error"),
355 | "recoverable": True,
356 | "suggestion": "Please wait 5 minutes before retrying"
357 | }, indent=2)
358 | except:
359 | return json.dumps({
360 | "success": False,
361 | "error": "meta_api_error",
362 | "message": "Facebook API error",
363 | "recoverable": True,
364 | "suggestion": "Please wait 5 minutes before retrying"
365 | }, indent=2)
366 | else:
367 | error_detail = response.text
368 | try:
369 | error_json = response.json()
370 | error_detail = error_json.get("message", error_detail)
371 | except:
372 | pass
373 |
374 | return json.dumps({
375 | "error": "duplication_failed",
376 | "message": f"Failed to duplicate {resource_type}",
377 | "details": {
378 | "status_code": response.status_code,
379 | "error_detail": error_detail,
380 | "resource_type": resource_type,
381 | "resource_id": resource_id
382 | }
383 | }, indent=2)
384 |
385 | except httpx.TimeoutException:
386 | return json.dumps({
387 | "error": "request_timeout",
388 | "message": "Request to duplication service timed out",
389 | "details": {
390 | "suggestion": "Please try again later",
391 | "timeout": "30 seconds"
392 | }
393 | }, indent=2)
394 |
395 | except httpx.RequestError as e:
396 | return json.dumps({
397 | "error": "network_error",
398 | "message": "Failed to connect to duplication service",
399 | "details": {
400 | "error": str(e),
401 | "suggestion": "Check your internet connection and try again"
402 | }
403 | }, indent=2)
404 |
405 | except Exception as e:
406 | return json.dumps({
407 | "error": "unexpected_error",
408 | "message": f"Unexpected error during {resource_type} duplication",
409 | "details": {
410 | "error": str(e),
411 | "resource_type": resource_type,
412 | "resource_id": resource_id
413 | }
414 | }, indent=2)
415 |
416 |
417 | def _get_estimated_components(resource_type: str, options: Dict[str, Any]) -> Dict[str, Any]:
418 | """Get estimated components that would be duplicated."""
419 | if resource_type == "campaign":
420 | components = {"campaigns": 1}
421 | if options.get("include_ad_sets", True):
422 | components["ad_sets"] = "3-5 (estimated)"
423 | if options.get("include_ads", True):
424 | components["ads"] = "5-15 (estimated)"
425 | if options.get("include_creatives", True):
426 | components["creatives"] = "5-15 (estimated)"
427 | return components
428 | elif resource_type == "adset":
429 | components = {"ad_sets": 1}
430 | if options.get("include_ads", True):
431 | components["ads"] = "2-5 (estimated)"
432 | if options.get("include_creatives", True):
433 | components["creatives"] = "2-5 (estimated)"
434 | return components
435 | elif resource_type == "ad":
436 | components = {"ads": 1}
437 | if options.get("duplicate_creative", True):
438 | components["creatives"] = 1
439 | return components
440 | elif resource_type == "creative":
441 | return {"creatives": 1}
442 |
443 | return {}
```