#
tokens: 45240/50000 10/82 files (page 3/6)
lines: on (toggle) GitHub
raw markdown copy reset
This is page 3 of 6. Use http://codebase.md/nictuku/meta-ads-mcp?lines=true&page={x} to view the full context.

# Directory Structure

```
├── .github
│   └── workflows
│       ├── publish-mcp.yml
│       ├── publish.yml
│       └── test.yml
├── .gitignore
├── .python-version
├── .uv.toml
├── CUSTOM_META_APP.md
├── Dockerfile
├── examples
│   ├── example_http_client.py
│   └── README.md
├── future_improvements.md
├── images
│   └── meta-ads-example.png
├── LICENSE
├── LOCAL_INSTALLATION.md
├── meta_ads_auth.sh
├── meta_ads_mcp
│   ├── __init__.py
│   ├── __main__.py
│   └── core
│       ├── __init__.py
│       ├── accounts.py
│       ├── ads_library.py
│       ├── ads.py
│       ├── adsets.py
│       ├── api.py
│       ├── auth.py
│       ├── authentication.py
│       ├── budget_schedules.py
│       ├── callback_server.py
│       ├── campaigns.py
│       ├── duplication.py
│       ├── http_auth_integration.py
│       ├── insights.py
│       ├── openai_deep_research.py
│       ├── pipeboard_auth.py
│       ├── reports.py
│       ├── resources.py
│       ├── server.py
│       ├── targeting.py
│       └── utils.py
├── META_API_NOTES.md
├── poetry.lock
├── pyproject.toml
├── README.md
├── RELEASE.md
├── requirements.txt
├── server.json
├── setup.py
├── smithery.yaml
├── STREAMABLE_HTTP_SETUP.md
└── tests
    ├── __init__.py
    ├── conftest.py
    ├── e2e_account_info_search_issue.py
    ├── README_REGRESSION_TESTS.md
    ├── README.md
    ├── test_account_info_access_fix.py
    ├── test_account_search.py
    ├── test_budget_update_e2e.py
    ├── test_budget_update.py
    ├── test_create_ad_creative_simple.py
    ├── test_create_simple_creative_e2e.py
    ├── test_dsa_beneficiary.py
    ├── test_dsa_integration.py
    ├── test_duplication_regression.py
    ├── test_duplication.py
    ├── test_dynamic_creatives.py
    ├── test_estimate_audience_size_e2e.py
    ├── test_estimate_audience_size.py
    ├── test_get_account_pages.py
    ├── test_get_ad_creatives_fix.py
    ├── test_get_ad_image_quality_improvements.py
    ├── test_get_ad_image_regression.py
    ├── test_http_transport.py
    ├── test_insights_actions_and_values_e2e.py
    ├── test_insights_pagination.py
    ├── test_integration_openai_mcp.py
    ├── test_is_dynamic_creative_adset.py
    ├── test_mobile_app_adset_creation.py
    ├── test_mobile_app_adset_issue.py
    ├── test_openai_mcp_deep_research.py
    ├── test_openai.py
    ├── test_page_discovery_integration.py
    ├── test_page_discovery.py
    ├── test_targeting_search_e2e.py
    ├── test_targeting.py
    ├── test_update_ad_creative_id.py
    └── test_upload_ad_image.py
```

# Files

--------------------------------------------------------------------------------
/tests/test_targeting.py:
--------------------------------------------------------------------------------

```python
  1 | #!/usr/bin/env python3
  2 | """
  3 | Unit tests for targeting search functionality in Meta Ads MCP.
  4 | """
  5 | 
  6 | import pytest
  7 | import json
  8 | from unittest.mock import AsyncMock, patch
  9 | 
 10 | from meta_ads_mcp.core.targeting import (
 11 |     search_interests,
 12 |     get_interest_suggestions,
 13 |     estimate_audience_size,
 14 |     search_behaviors,
 15 |     search_demographics,
 16 |     search_geo_locations
 17 | )
 18 | 
 19 | 
 20 | class TestSearchInterests:
 21 |     """Test cases for search_interests function"""
 22 |     
 23 |     @pytest.mark.asyncio
 24 |     async def test_search_interests_success(self):
 25 |         """Test successful interest search"""
 26 |         mock_response = {
 27 |             "data": [
 28 |                 {
 29 |                     "id": "6003139266461",
 30 |                     "name": "Movies",
 31 |                     "audience_size": 1234567890,
 32 |                     "path": ["Entertainment", "Movies"]
 33 |                 },
 34 |                 {
 35 |                     "id": "6003397425735", 
 36 |                     "name": "Tennis",
 37 |                     "audience_size": 987654321,
 38 |                     "path": ["Sports", "Tennis"]
 39 |                 }
 40 |             ]
 41 |         }
 42 |         
 43 |         with patch('meta_ads_mcp.core.targeting.make_api_request', new_callable=AsyncMock) as mock_api:
 44 |             mock_api.return_value = mock_response
 45 |             
 46 |             result = await search_interests(access_token="test_token", query="movies", limit=10)
 47 |             
 48 |             # Verify API call
 49 |             mock_api.assert_called_once_with(
 50 |                 "search",
 51 |                 "test_token",
 52 |                 {
 53 |                     "type": "adinterest",
 54 |                     "q": "movies",
 55 |                     "limit": 10
 56 |                 }
 57 |             )
 58 |             
 59 |             # Verify response
 60 |             result_data = json.loads(result)
 61 |             assert result_data == mock_response
 62 |             assert len(result_data["data"]) == 2
 63 |             assert result_data["data"][0]["name"] == "Movies"
 64 |     
 65 |     @pytest.mark.asyncio
 66 |     async def test_search_interests_no_query(self):
 67 |         """Test search_interests with empty query parameter"""
 68 |         result = await search_interests(
 69 |             query="",  # Now provide the required parameter but with empty value
 70 |             access_token="test_token"
 71 |         )
 72 |         
 73 |         result_data = json.loads(result)
 74 |         # The @meta_api_tool decorator wraps errors in a 'data' field
 75 |         assert "data" in result_data
 76 |         nested_data = json.loads(result_data["data"])
 77 |         assert "error" in nested_data
 78 |         assert nested_data["error"] == "No search query provided"
 79 |     
 80 |     @pytest.mark.asyncio
 81 |     async def test_search_interests_default_limit(self):
 82 |         """Test search_interests with default limit"""
 83 |         mock_response = {"data": []}
 84 |         
 85 |         # Mock both the API request and the auth system to bypass decorator issues
 86 |         with patch('meta_ads_mcp.core.targeting.make_api_request', new_callable=AsyncMock) as mock_api:
 87 |             with patch('meta_ads_mcp.core.auth.get_current_access_token') as mock_auth:
 88 |                 mock_auth.return_value = "test_token"
 89 |                 mock_api.return_value = mock_response
 90 |                 
 91 |                 result = await search_interests(query="test")
 92 |                 
 93 |                 # Verify default limit is used
 94 |                 mock_api.assert_called_once_with(
 95 |                     "search",
 96 |                     "test_token",
 97 |                     {
 98 |                         "type": "adinterest",
 99 |                         "q": "test",
100 |                         "limit": 25
101 |                     }
102 |                 )
103 |                 
104 |                 # Verify the result is properly formatted
105 |                 result_data = json.loads(result)
106 |                 assert "data" in result_data
107 | 
108 | 
class TestGetInterestSuggestions:
    """Unit tests covering the get_interest_suggestions tool."""

    @pytest.mark.asyncio
    async def test_get_interest_suggestions_success(self):
        """Seed interests are sent as a JSON-encoded list and the payload is returned unchanged."""
        rugby = {
            "id": "6003022269556",
            "name": "Rugby football",
            "audience_size": 13214830,
            "path": [],
            "description": None,
        }
        netball = {
            "id": "6003146664949",
            "name": "Netball",
            "audience_size": 4333770,
            "path": [],
            "description": None,
        }
        api_payload = {"data": [rugby, netball]}
        expected_params = {
            "type": "adinterestsuggestion",
            "interest_list": '["Basketball", "Soccer"]',
            "limit": 15,
        }

        with patch('meta_ads_mcp.core.targeting.make_api_request', new_callable=AsyncMock) as api_stub:
            api_stub.return_value = api_payload
            raw = await get_interest_suggestions(
                interest_list=["Basketball", "Soccer"],
                limit=15,
                access_token="test_token",
            )

        # Exactly one search request, with the interest list serialized to a string.
        api_stub.assert_called_once_with("search", "test_token", expected_params)

        parsed = json.loads(raw)
        assert parsed == api_payload
        assert len(parsed["data"]) == 2

    @pytest.mark.asyncio
    async def test_get_interest_suggestions_no_list(self):
        """An empty interest list yields the 'No interest list provided' error."""
        raw = await get_interest_suggestions(access_token="test_token", interest_list=[])

        # The @meta_api_tool decorator wraps errors in a 'data' field holding a JSON string.
        envelope = json.loads(raw)
        assert "data" in envelope
        inner = json.loads(envelope["data"])
        assert "error" in inner
        assert inner["error"] == "No interest list provided"
173 | 
174 | 
class TestEstimateAudienceSizeBackwardsCompatibility:
    """Tests for the backwards-compatible interest-validation mode of estimate_audience_size."""

    @pytest.mark.asyncio
    async def test_validate_interests_by_name_success(self):
        """Interest names are validated through an 'adinterestvalid' search request."""
        valid_entry = {
            "name": "Japan",
            "valid": True,
            "id": 6003700426513,
            "audience_size": 68310258,
        }
        invalid_entry = {"name": "nonexistantkeyword", "valid": False}
        api_payload = {"data": [valid_entry, invalid_entry]}
        expected_params = {
            "type": "adinterestvalid",
            "interest_list": '["Japan", "nonexistantkeyword"]',
        }

        with patch('meta_ads_mcp.core.targeting.make_api_request', new_callable=AsyncMock) as api_stub:
            api_stub.return_value = api_payload
            raw = await estimate_audience_size(
                interest_list=["Japan", "nonexistantkeyword"],
                access_token="test_token",
            )

        # Exactly one search request, with the names serialized to a JSON string.
        api_stub.assert_called_once_with("search", "test_token", expected_params)

        parsed = json.loads(raw)
        assert parsed == api_payload
        assert parsed["data"][0]["valid"] is True
        assert parsed["data"][1]["valid"] is False

    @pytest.mark.asyncio
    async def test_validate_interests_by_fbid_success(self):
        """Interest FBIDs are validated through an 'adinterestvalid' search request."""
        api_payload = {
            "data": [
                {"id": "6003700426513", "valid": True, "audience_size": 68310258},
            ]
        }
        expected_params = {
            "type": "adinterestvalid",
            "interest_fbid_list": '["6003700426513"]',
        }

        with patch('meta_ads_mcp.core.targeting.make_api_request', new_callable=AsyncMock) as api_stub:
            api_stub.return_value = api_payload
            raw = await estimate_audience_size(
                interest_fbid_list=["6003700426513"],
                access_token="test_token",
            )

        api_stub.assert_called_once_with("search", "test_token", expected_params)

        parsed = json.loads(raw)
        assert parsed == api_payload

    @pytest.mark.asyncio
    async def test_validate_interests_no_input(self):
        """With neither a name list nor an FBID list, the tool reports an error (backwards compatibility)."""
        raw = await estimate_audience_size(access_token="test_token")

        # The @meta_api_tool decorator wraps errors in a 'data' field holding a JSON string.
        envelope = json.loads(raw)
        assert "data" in envelope
        inner = json.loads(envelope["data"])
        assert "error" in inner
        assert inner["error"] == "No interest list or FBID list provided"
266 | 
267 | 
class TestSearchBehaviors:
    """Unit tests covering the search_behaviors tool."""

    @pytest.mark.asyncio
    async def test_search_behaviors_success(self):
        """Behaviors are requested as the 'behaviors' class of adTargetingCategory."""
        business_travelers = {
            "id": 6007101597783,
            "name": "Business Travelers",
            "audience_size_lower_bound": 1000000,
            "audience_size_upper_bound": 2000000,
            "path": ["Travel", "Business Travel"],
            "description": "People who travel for business",
            "type": "behaviors",
        }
        api_payload = {"data": [business_travelers]}
        expected_params = {
            "type": "adTargetingCategory",
            "class": "behaviors",
            "limit": 25,
        }

        with patch('meta_ads_mcp.core.targeting.make_api_request', new_callable=AsyncMock) as api_stub:
            api_stub.return_value = api_payload
            raw = await search_behaviors(limit=25, access_token="test_token")

        # Exactly one search request with the expected arguments.
        api_stub.assert_called_once_with("search", "test_token", expected_params)

        # The tool output must be the API payload serialized as JSON.
        assert json.loads(raw) == api_payload
307 | 
308 | 
class TestSearchDemographics:
    """Unit tests covering the search_demographics tool."""

    @pytest.mark.asyncio
    async def test_search_demographics_success(self):
        """The requested demographic_class is passed through as the 'class' search parameter."""
        parents = {
            "id": 6015559470583,
            "name": "Parents (All)",
            "audience_size_lower_bound": 500000000,
            "audience_size_upper_bound": 750000000,
            "path": ["Family", "Parents"],
            "description": "Parents of children of any age",
            "type": "demographics",
        }
        api_payload = {"data": [parents]}
        expected_params = {
            "type": "adTargetingCategory",
            "class": "life_events",
            "limit": 30,
        }

        with patch('meta_ads_mcp.core.targeting.make_api_request', new_callable=AsyncMock) as api_stub:
            api_stub.return_value = api_payload
            raw = await search_demographics(
                demographic_class="life_events",
                limit=30,
                access_token="test_token",
            )

        # Exactly one search request with the expected arguments.
        api_stub.assert_called_once_with("search", "test_token", expected_params)

        # The tool output must be the API payload serialized as JSON.
        assert json.loads(raw) == api_payload
352 | 
353 | 
class TestSearchGeoLocations:
    """Unit tests covering the search_geo_locations tool."""

    @pytest.mark.asyncio
    async def test_search_geo_locations_success(self):
        """Query, location_types and limit are all forwarded; location_types as a JSON string."""
        united_states = {
            "key": "US",
            "name": "United States",
            "type": "country",
            "supports_city": True,
            "supports_region": True,
        }
        california = {
            "key": "3847",
            "name": "California",
            "type": "region",
            "country_code": "US",
            "country_name": "United States",
            "supports_city": True,
            "supports_region": True,
        }
        api_payload = {"data": [united_states, california]}
        expected_params = {
            "type": "adgeolocation",
            "q": "United States",
            "location_types": '["country", "region"]',
            "limit": 10,
        }

        with patch('meta_ads_mcp.core.targeting.make_api_request', new_callable=AsyncMock) as api_stub:
            api_stub.return_value = api_payload
            raw = await search_geo_locations(
                query="United States",
                location_types=["country", "region"],
                limit=10,
                access_token="test_token",
            )

        # Exactly one search request with the expected arguments.
        api_stub.assert_called_once_with("search", "test_token", expected_params)

        # The tool output must be the API payload serialized as JSON.
        assert json.loads(raw) == api_payload

    @pytest.mark.asyncio
    async def test_search_geo_locations_no_query(self):
        """An empty query string yields the 'No search query provided' error."""
        raw = await search_geo_locations(access_token="test_token", query="")

        # The @meta_api_tool decorator wraps errors in a 'data' field holding a JSON string.
        envelope = json.loads(raw)
        assert "data" in envelope
        inner = json.loads(envelope["data"])
        assert "error" in inner
        assert inner["error"] == "No search query provided"

    @pytest.mark.asyncio
    async def test_search_geo_locations_no_location_types(self):
        """Without a location_types filter, the key is left out of the request and limit defaults to 25."""
        api_payload = {"data": []}
        expected_params = {"type": "adgeolocation", "q": "test", "limit": 25}
        api_target = 'meta_ads_mcp.core.targeting.make_api_request'
        auth_target = 'meta_ads_mcp.core.auth.get_current_access_token'

        # Both the API request and the token lookup are patched so no access_token has to be passed in.
        with patch(api_target, new_callable=AsyncMock) as api_stub, patch(auth_target) as auth_stub:
            auth_stub.return_value = "test_token"
            api_stub.return_value = api_payload

            raw = await search_geo_locations(query="test")

            # The request parameters must not contain a location_types entry.
            api_stub.assert_called_once_with("search", "test_token", expected_params)

            # The output must still be well-formed JSON carrying a 'data' key.
            parsed = json.loads(raw)
            assert "data" in parsed
```

--------------------------------------------------------------------------------
/meta_ads_mcp/core/adsets.py:
--------------------------------------------------------------------------------

```python
  1 | """Ad Set-related functionality for Meta Ads API."""
  2 | 
  3 | import json
  4 | from typing import Optional, Dict, Any, List
  5 | from .api import meta_api_tool, make_api_request
  6 | from .accounts import get_ad_accounts
  7 | from .server import mcp_server
  8 | 
  9 | 
 10 | @mcp_server.tool()
 11 | @meta_api_tool
 12 | async def get_adsets(account_id: str, access_token: Optional[str] = None, limit: int = 10, campaign_id: str = "") -> str:
 13 |     """
 14 |     Get ad sets for a Meta Ads account with optional filtering by campaign.
 15 |     
 16 |     Args:
 17 |         account_id: Meta Ads account ID (format: act_XXXXXXXXX)
 18 |         access_token: Meta API access token (optional - will use cached token if not provided)
 19 |         limit: Maximum number of ad sets to return (default: 10)
 20 |         campaign_id: Optional campaign ID to filter by
 21 |     """
 22 |     # Require explicit account_id
 23 |     if not account_id:
 24 |         return json.dumps({"error": "No account ID specified"}, indent=2)
 25 |     
 26 |     # Change endpoint based on whether campaign_id is provided
 27 |     if campaign_id:
 28 |         endpoint = f"{campaign_id}/adsets"
 29 |         params = {
 30 |             "fields": "id,name,campaign_id,status,daily_budget,lifetime_budget,targeting,bid_amount,bid_strategy,optimization_goal,billing_event,start_time,end_time,created_time,updated_time,is_dynamic_creative,frequency_control_specs{event,interval_days,max_frequency}",
 31 |             "limit": limit
 32 |         }
 33 |     else:
 34 |         # Use account endpoint if no campaign_id is given
 35 |         endpoint = f"{account_id}/adsets"
 36 |         params = {
 37 |             "fields": "id,name,campaign_id,status,daily_budget,lifetime_budget,targeting,bid_amount,bid_strategy,optimization_goal,billing_event,start_time,end_time,created_time,updated_time,is_dynamic_creative,frequency_control_specs{event,interval_days,max_frequency}",
 38 |             "limit": limit
 39 |         }
 40 |         # Note: Removed the attempt to add campaign_id to params for the account endpoint case, 
 41 |         # as it was ineffective and the logic now uses the correct endpoint for campaign filtering.
 42 | 
 43 |     data = await make_api_request(endpoint, access_token, params)
 44 |     
 45 |     return json.dumps(data, indent=2)
 46 | 
 47 | 
 48 | @mcp_server.tool()
 49 | @meta_api_tool
 50 | async def get_adset_details(adset_id: str, access_token: Optional[str] = None) -> str:
 51 |     """
 52 |     Get detailed information about a specific ad set.
 53 |     
 54 |     Args:
 55 |         adset_id: Meta Ads ad set ID
 56 |         access_token: Meta API access token (optional - will use cached token if not provided)
 57 |     
 58 |     Example:
 59 |         To call this function through MCP, pass the adset_id as the first argument:
 60 |         {
 61 |             "args": "YOUR_ADSET_ID"
 62 |         }
 63 |     """
 64 |     if not adset_id:
 65 |         return json.dumps({"error": "No ad set ID provided"}, indent=2)
 66 |     
 67 |     endpoint = f"{adset_id}"
 68 |     # Explicitly prioritize frequency_control_specs in the fields request
 69 |     params = {
 70 |         "fields": "id,name,campaign_id,status,frequency_control_specs{event,interval_days,max_frequency},daily_budget,lifetime_budget,targeting,bid_amount,bid_strategy,optimization_goal,billing_event,start_time,end_time,created_time,updated_time,attribution_spec,destination_type,promoted_object,pacing_type,budget_remaining,dsa_beneficiary,is_dynamic_creative"
 71 |     }
 72 |     
 73 |     data = await make_api_request(endpoint, access_token, params)
 74 |     
 75 |     # For debugging - check if frequency_control_specs was returned
 76 |     if 'frequency_control_specs' not in data:
 77 |         data['_meta'] = {
 78 |             'note': 'No frequency_control_specs field was returned by the API. This means either no frequency caps are set or the API did not include this field in the response.'
 79 |         }
 80 |     
 81 |     return json.dumps(data, indent=2)
 82 | 
 83 | 
 84 | @mcp_server.tool()
 85 | @meta_api_tool
 86 | async def create_adset(
 87 |     account_id: str, 
 88 |     campaign_id: str, 
 89 |     name: str,
 90 |     optimization_goal: str,
 91 |     billing_event: str,
 92 |     status: str = "PAUSED",
 93 |     daily_budget: Optional[int] = None,
 94 |     lifetime_budget: Optional[int] = None,
 95 |     targeting: Optional[Dict[str, Any]] = None,
 96 |     bid_amount: Optional[int] = None,
 97 |     bid_strategy: Optional[str] = None,
 98 |     start_time: Optional[str] = None,
 99 |     end_time: Optional[str] = None,
100 |     dsa_beneficiary: Optional[str] = None,
101 |     promoted_object: Optional[Dict[str, Any]] = None,
102 |     destination_type: Optional[str] = None,
103 |     is_dynamic_creative: Optional[bool] = None,
104 |     access_token: Optional[str] = None
105 | ) -> str:
106 |     """
107 |     Create a new ad set in a Meta Ads account.
108 |     
109 |     Args:
110 |         account_id: Meta Ads account ID (format: act_XXXXXXXXX)
111 |         campaign_id: Meta Ads campaign ID this ad set belongs to
112 |         name: Ad set name
113 |         optimization_goal: Conversion optimization goal (e.g., 'LINK_CLICKS', 'REACH', 'CONVERSIONS', 'APP_INSTALLS')
114 |         billing_event: How you're charged (e.g., 'IMPRESSIONS', 'LINK_CLICKS')
115 |         status: Initial ad set status (default: PAUSED)
116 |         daily_budget: Daily budget in account currency (in cents) as a string
117 |         lifetime_budget: Lifetime budget in account currency (in cents) as a string
118 |         targeting: Targeting specifications including age, location, interests, etc.
119 |                   Use targeting_automation.advantage_audience=1 for automatic audience finding
120 |         bid_amount: Bid amount in account currency (in cents)
121 |         bid_strategy: Bid strategy (e.g., 'LOWEST_COST', 'LOWEST_COST_WITH_BID_CAP')
122 |         start_time: Start time in ISO 8601 format (e.g., '2023-12-01T12:00:00-0800')
123 |         end_time: End time in ISO 8601 format
124 |         dsa_beneficiary: DSA beneficiary (person/organization benefiting from ads) for European compliance
125 |         promoted_object: Mobile app configuration for APP_INSTALLS campaigns. Required fields: application_id, object_store_url.
126 |                         Optional fields: custom_event_type, pixel_id, page_id.
127 |                         Example: {"application_id": "123456789012345", "object_store_url": "https://apps.apple.com/app/id123456789"}
128 |         destination_type: Where users are directed after clicking the ad (e.g., 'APP_STORE', 'DEEPLINK', 'APP_INSTALL', 'ON_AD').
129 |                           Required for mobile app campaigns and lead generation campaigns.
130 |                           Use 'ON_AD' for lead generation campaigns where user interaction happens within the ad.
131 |         is_dynamic_creative: Enable Dynamic Creative for this ad set (required when using dynamic creatives with asset_feed_spec/dynamic_creative_spec).
132 |         access_token: Meta API access token (optional - will use cached token if not provided)
133 |     """
134 |     # Check required parameters
135 |     if not account_id:
136 |         return json.dumps({"error": "No account ID provided"}, indent=2)
137 |     
138 |     if not campaign_id:
139 |         return json.dumps({"error": "No campaign ID provided"}, indent=2)
140 |     
141 |     if not name:
142 |         return json.dumps({"error": "No ad set name provided"}, indent=2)
143 |     
144 |     if not optimization_goal:
145 |         return json.dumps({"error": "No optimization goal provided"}, indent=2)
146 |     
147 |     if not billing_event:
148 |         return json.dumps({"error": "No billing event provided"}, indent=2)
149 |     
150 |     # Validate mobile app parameters for APP_INSTALLS campaigns
151 |     if optimization_goal == "APP_INSTALLS":
152 |         if not promoted_object:
153 |             return json.dumps({
154 |                 "error": "promoted_object is required for APP_INSTALLS optimization goal",
155 |                 "details": "Mobile app campaigns must specify which app is being promoted",
156 |                 "required_fields": ["application_id", "object_store_url"]
157 |             }, indent=2)
158 |         
159 |         # Validate promoted_object structure
160 |         if not isinstance(promoted_object, dict):
161 |             return json.dumps({
162 |                 "error": "promoted_object must be a dictionary",
163 |                 "example": {"application_id": "123456789012345", "object_store_url": "https://apps.apple.com/app/id123456789"}
164 |             }, indent=2)
165 |         
166 |         # Validate required promoted_object fields
167 |         if "application_id" not in promoted_object:
168 |             return json.dumps({
169 |                 "error": "promoted_object missing required field: application_id",
170 |                 "details": "application_id is the Facebook app ID for your mobile app"
171 |             }, indent=2)
172 |         
173 |         if "object_store_url" not in promoted_object:
174 |             return json.dumps({
175 |                 "error": "promoted_object missing required field: object_store_url", 
176 |                 "details": "object_store_url should be the App Store or Google Play URL for your app"
177 |             }, indent=2)
178 |         
179 |         # Validate store URL format
180 |         store_url = promoted_object["object_store_url"]
181 |         valid_store_patterns = [
182 |             "apps.apple.com",  # iOS App Store
183 |             "play.google.com",  # Google Play Store
184 |             "itunes.apple.com"  # Alternative iOS format
185 |         ]
186 |         
187 |         if not any(pattern in store_url for pattern in valid_store_patterns):
188 |             return json.dumps({
189 |                 "error": "Invalid object_store_url format",
190 |                 "details": "URL must be from App Store (apps.apple.com) or Google Play (play.google.com)",
191 |                 "provided_url": store_url
192 |             }, indent=2)
193 |     
194 |     # Validate destination_type if provided
195 |     if destination_type:
196 |         valid_destination_types = ["APP_STORE", "DEEPLINK", "APP_INSTALL", "ON_AD"]
197 |         if destination_type not in valid_destination_types:
198 |             return json.dumps({
199 |                 "error": f"Invalid destination_type: {destination_type}",
200 |                 "valid_values": valid_destination_types
201 |             }, indent=2)
202 |     
203 |     # Basic targeting is required if not provided
204 |     if not targeting:
205 |         targeting = {
206 |             "age_min": 18,
207 |             "age_max": 65,
208 |             "geo_locations": {"countries": ["US"]},
209 |             "targeting_automation": {"advantage_audience": 1}
210 |         }
211 |     
212 |     endpoint = f"{account_id}/adsets"
213 |     
214 |     params = {
215 |         "name": name,
216 |         "campaign_id": campaign_id,
217 |         "status": status,
218 |         "optimization_goal": optimization_goal,
219 |         "billing_event": billing_event,
220 |         "targeting": json.dumps(targeting)  # Properly format as JSON string
221 |     }
222 |     
223 |     # Convert budget values to strings if they aren't already
224 |     if daily_budget is not None:
225 |         params["daily_budget"] = str(daily_budget)
226 |     
227 |     if lifetime_budget is not None:
228 |         params["lifetime_budget"] = str(lifetime_budget)
229 |     
230 |     # Add other parameters if provided
231 |     if bid_amount is not None:
232 |         params["bid_amount"] = str(bid_amount)
233 |     
234 |     if bid_strategy:
235 |         params["bid_strategy"] = bid_strategy
236 |     
237 |     if start_time:
238 |         params["start_time"] = start_time
239 |     
240 |     if end_time:
241 |         params["end_time"] = end_time
242 |     
243 |     # Add DSA beneficiary if provided
244 |     if dsa_beneficiary:
245 |         params["dsa_beneficiary"] = dsa_beneficiary
246 |     
247 |     # Add mobile app parameters if provided
248 |     if promoted_object:
249 |         params["promoted_object"] = json.dumps(promoted_object)
250 |     
251 |     if destination_type:
252 |         params["destination_type"] = destination_type
253 |     
254 |     # Enable Dynamic Creative if requested
255 |     if is_dynamic_creative is not None:
256 |         params["is_dynamic_creative"] = "true" if bool(is_dynamic_creative) else "false"
257 |     
258 |     try:
259 |         data = await make_api_request(endpoint, access_token, params, method="POST")
260 |         return json.dumps(data, indent=2)
261 |     except Exception as e:
262 |         error_msg = str(e)
263 |         
264 |         # Enhanced error handling for DSA beneficiary issues
265 |         if "permission" in error_msg.lower() or "insufficient" in error_msg.lower():
266 |             return json.dumps({
267 |                 "error": "Insufficient permissions to set DSA beneficiary. Please ensure you have business_management permissions.",
268 |                 "details": error_msg,
269 |                 "params_sent": params,
270 |                 "permission_required": True
271 |             }, indent=2)
272 |         elif "dsa_beneficiary" in error_msg.lower() and ("not supported" in error_msg.lower() or "parameter" in error_msg.lower()):
273 |             return json.dumps({
274 |                 "error": "DSA beneficiary parameter not supported in this API version. Please set DSA beneficiary manually in Facebook Ads Manager.",
275 |                 "details": error_msg,
276 |                 "params_sent": params,
277 |                 "manual_setup_required": True
278 |             }, indent=2)
279 |         elif "benefits from ads" in error_msg or "DSA beneficiary" in error_msg:
280 |             return json.dumps({
281 |                 "error": "DSA beneficiary required for European compliance. Please provide the person or organization that benefits from ads in this ad set.",
282 |                 "details": error_msg,
283 |                 "params_sent": params,
284 |                 "dsa_required": True
285 |             }, indent=2)
286 |         else:
287 |             return json.dumps({
288 |                 "error": "Failed to create ad set",
289 |                 "details": error_msg,
290 |                 "params_sent": params
291 |             }, indent=2)
292 | 
293 | 
294 | @mcp_server.tool()
295 | @meta_api_tool
296 | async def update_adset(adset_id: str, frequency_control_specs: Optional[List[Dict[str, Any]]] = None, bid_strategy: Optional[str] = None, 
297 |                         bid_amount: Optional[int] = None, status: Optional[str] = None, targeting: Optional[Dict[str, Any]] = None, 
298 |                         optimization_goal: Optional[str] = None, daily_budget: Optional[int] = None, lifetime_budget: Optional[int] = None, 
299 |                         is_dynamic_creative: Optional[bool] = None,
300 |                         access_token: Optional[str] = None) -> str:
301 |     """
302 |     Update an ad set with new settings including frequency caps and budgets.
303 |     
304 |     Args:
305 |         adset_id: Meta Ads ad set ID
306 |         frequency_control_specs: List of frequency control specifications 
307 |                                  (e.g. [{"event": "IMPRESSIONS", "interval_days": 7, "max_frequency": 3}])
308 |         bid_strategy: Bid strategy (e.g., 'LOWEST_COST_WITH_BID_CAP')
309 |         bid_amount: Bid amount in account currency (in cents for USD)
310 |         status: Update ad set status (ACTIVE, PAUSED, etc.)
311 |         targeting: Complete targeting specifications (will replace existing targeting)
312 |                   (e.g. {"targeting_automation":{"advantage_audience":1}, "geo_locations": {"countries": ["US"]}})
313 |         optimization_goal: Conversion optimization goal (e.g., 'LINK_CLICKS', 'CONVERSIONS', 'APP_INSTALLS', etc.)
314 |         daily_budget: Daily budget in account currency (in cents) as a string
315 |         lifetime_budget: Lifetime budget in account currency (in cents) as a string
316 |         is_dynamic_creative: Enable/disable Dynamic Creative for this ad set.
317 |         access_token: Meta API access token (optional - will use cached token if not provided)
318 |     """
319 |     if not adset_id:
320 |         return json.dumps({"error": "No ad set ID provided"}, indent=2)
321 |     
322 |     params = {}
323 |     
324 |     if frequency_control_specs is not None:
325 |         params['frequency_control_specs'] = frequency_control_specs
326 |     
327 |     if bid_strategy is not None:
328 |         params['bid_strategy'] = bid_strategy
329 |         
330 |     if bid_amount is not None:
331 |         params['bid_amount'] = str(bid_amount)
332 |         
333 |     if status is not None:
334 |         params['status'] = status
335 |         
336 |     if optimization_goal is not None:
337 |         params['optimization_goal'] = optimization_goal
338 |         
339 |     if targeting is not None:
340 |         # Ensure proper JSON encoding for targeting
341 |         if isinstance(targeting, dict):
342 |             params['targeting'] = json.dumps(targeting)
343 |         else:
344 |             params['targeting'] = targeting  # Already a string
345 |     
346 |     # Add budget parameters if provided
347 |     if daily_budget is not None:
348 |         params['daily_budget'] = str(daily_budget)
349 |     
350 |     if lifetime_budget is not None:
351 |         params['lifetime_budget'] = str(lifetime_budget)
352 |     
353 |     if is_dynamic_creative is not None:
354 |         params['is_dynamic_creative'] = "true" if bool(is_dynamic_creative) else "false"
355 |     
356 |     if not params:
357 |         return json.dumps({"error": "No update parameters provided"}, indent=2)
358 | 
359 |     endpoint = f"{adset_id}"
360 |     
361 |     try:
362 |         # Use POST method for updates as per Meta API documentation
363 |         data = await make_api_request(endpoint, access_token, params, method="POST")
364 |         return json.dumps(data, indent=2)
365 |     except Exception as e:
366 |         error_msg = str(e)
367 |         # Include adset_id in error for better context
368 |         return json.dumps({
369 |             "error": f"Failed to update ad set {adset_id}",
370 |             "details": error_msg,
371 |             "params_sent": params
372 |         }, indent=2) 
```

--------------------------------------------------------------------------------
/tests/test_openai_mcp_deep_research.py:
--------------------------------------------------------------------------------

```python
  1 | #!/usr/bin/env python3
  2 | """
  3 | OpenAI MCP Deep Research Integration Tests
  4 | 
  5 | This test suite validates the OpenAI MCP specification compliance:
  6 | - search tool: Returns list of IDs based on query
  7 | - fetch tool: Returns complete record data by ID
  8 | - ChatGPT Deep Research compatibility
  9 | - Integration with existing authentication
 10 | 
 11 | Usage:
 12 |     1. Start the server: python -m meta_ads_mcp --transport streamable-http --port 8080
 13 |     2. Run tests: python -m pytest tests/test_openai_mcp_deep_research.py -v
 14 | 
 15 | Or run directly:
 16 |     python tests/test_openai_mcp_deep_research.py
 17 | """
 18 | 
 19 | import requests
 20 | import json
 21 | import time
 22 | import sys
 23 | import os
 24 | from typing import Dict, Any, Optional, List
 25 | 
 26 | # Optional: load environment variables from a .env file when python-dotenv is installed
 27 | try:
 28 |     from dotenv import load_dotenv
 29 |     load_dotenv()
 30 |     print("✅ Loaded environment variables from .env file")
 31 | except ImportError:
 32 |     print("⚠️  python-dotenv not installed, using system environment variables only")
 33 |     print("   Install with: pip install python-dotenv")
 34 | 
 35 | # Put the project root (the parent of this file's directory) first on sys.path for imports
 36 | sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
 37 | 
 38 | class OpenAIMCPTester:
 39 |     """Test suite for OpenAI MCP Deep Research compatibility"""
 40 |     
 41 |     def __init__(self, base_url: str = "http://localhost:8080"):
 42 |         self.base_url = base_url.rstrip('/')
 43 |         self.endpoint = f"{self.base_url}/mcp/"
 44 |         self.request_id = 1
 45 |         
 46 |     def _make_request(self, method: str, params: Dict[str, Any] = None, 
 47 |                      headers: Dict[str, str] = None) -> Dict[str, Any]:
 48 |         """Make a JSON-RPC request to the MCP server"""
 49 |         
 50 |         # Default headers for MCP protocol with streamable HTTP transport
 51 |         default_headers = {
 52 |             "Content-Type": "application/json",
 53 |             "Accept": "application/json, text/event-stream",
 54 |             "User-Agent": "OpenAI-MCP-Test-Client/1.0"
 55 |         }
 56 |         
 57 |         if headers:
 58 |             default_headers.update(headers)
 59 |         
 60 |         payload = {
 61 |             "jsonrpc": "2.0",
 62 |             "method": method,
 63 |             "id": self.request_id
 64 |         }
 65 |         
 66 |         if params:
 67 |             payload["params"] = params
 68 |         
 69 |         try:
 70 |             response = requests.post(
 71 |                 self.endpoint,
 72 |                 headers=default_headers,
 73 |                 json=payload,
 74 |                 timeout=10
 75 |             )
 76 |             
 77 |             self.request_id += 1
 78 |             
 79 |             return {
 80 |                 "status_code": response.status_code,
 81 |                 "headers": dict(response.headers),
 82 |                 "json": response.json() if response.status_code == 200 else None,
 83 |                 "text": response.text,
 84 |                 "success": response.status_code == 200
 85 |             }
 86 |             
 87 |         except requests.exceptions.RequestException as e:
 88 |             return {
 89 |                 "status_code": 0,
 90 |                 "headers": {},
 91 |                 "json": None,
 92 |                 "text": str(e),
 93 |                 "success": False,
 94 |                 "error": str(e)
 95 |             }
 96 | 
 97 |     def test_search_tool_exists(self, auth_headers: Dict[str, str] = None) -> Dict[str, Any]:
 98 |         """Test that the search tool is available in tools list"""
 99 |         result = self._make_request("tools/list", {}, auth_headers)
100 |         
101 |         if not result["success"]:
102 |             return {"success": False, "error": "Failed to get tools list"}
103 |         
104 |         tools = result["json"]["result"].get("tools", [])
105 |         search_tool = next((tool for tool in tools if tool["name"] == "search"), None)
106 |         
107 |         return {
108 |             "success": search_tool is not None,
109 |             "tool": search_tool,
110 |             "all_tools": [tool["name"] for tool in tools]
111 |         }
112 | 
113 |     def test_fetch_tool_exists(self, auth_headers: Dict[str, str] = None) -> Dict[str, Any]:
114 |         """Test that the fetch tool is available in tools list"""
115 |         result = self._make_request("tools/list", {}, auth_headers)
116 |         
117 |         if not result["success"]:
118 |             return {"success": False, "error": "Failed to get tools list"}
119 |         
120 |         tools = result["json"]["result"].get("tools", [])
121 |         fetch_tool = next((tool for tool in tools if tool["name"] == "fetch"), None)
122 |         
123 |         return {
124 |             "success": fetch_tool is not None,
125 |             "tool": fetch_tool,
126 |             "all_tools": [tool["name"] for tool in tools]
127 |         }
128 | 
129 |     def test_search_tool_call(self, query: str, auth_headers: Dict[str, str] = None) -> Dict[str, Any]:
130 |         """Test calling the search tool with a query"""
131 |         result = self._make_request("tools/call", {
132 |             "name": "search",
133 |             "arguments": {"query": query}
134 |         }, auth_headers)
135 |         
136 |         if not result["success"]:
137 |             return {"success": False, "error": result.get("text", "Unknown error")}
138 |         
139 |         # Parse the tool response
140 |         response_data = result["json"]["result"]
141 |         content = response_data.get("content", [{}])[0].get("text", "")
142 |         
143 |         try:
144 |             parsed_content = json.loads(content)
145 |             ids = parsed_content.get("ids", [])
146 |             
147 |             return {
148 |                 "success": True,
149 |                 "ids": ids,
150 |                 "raw_content": content,
151 |                 "id_count": len(ids)
152 |             }
153 |         except json.JSONDecodeError:
154 |             return {
155 |                 "success": False,
156 |                 "error": "Search tool did not return valid JSON",
157 |                 "raw_content": content
158 |             }
159 | 
160 |     def test_fetch_tool_call(self, record_id: str, auth_headers: Dict[str, str] = None) -> Dict[str, Any]:
161 |         """Test calling the fetch tool with an ID"""
162 |         result = self._make_request("tools/call", {
163 |             "name": "fetch",
164 |             "arguments": {"id": record_id}
165 |         }, auth_headers)
166 |         
167 |         if not result["success"]:
168 |             return {"success": False, "error": result.get("text", "Unknown error")}
169 |         
170 |         # Parse the tool response
171 |         response_data = result["json"]["result"]
172 |         content = response_data.get("content", [{}])[0].get("text", "")
173 |         
174 |         try:
175 |             parsed_content = json.loads(content)
176 |             
177 |             return {
178 |                 "success": True,
179 |                 "record": parsed_content,
180 |                 "raw_content": content,
181 |                 "has_required_fields": all(field in parsed_content for field in ["id", "title", "text"])
182 |             }
183 |         except json.JSONDecodeError:
184 |             return {
185 |                 "success": False,
186 |                 "error": "Fetch tool did not return valid JSON",
187 |                 "raw_content": content
188 |             }
189 | 
190 |     def test_search_fetch_workflow(self, auth_headers: Dict[str, str] = None) -> Dict[str, Any]:
191 |         """Test the complete search->fetch workflow that ChatGPT Deep Research expects"""
192 |         
193 |         # Step 1: Search for something that will return account IDs
194 |         search_result = self.test_search_tool_call("Yves", auth_headers)
195 |         
196 |         if not search_result["success"]:
197 |             return {
198 |                 "success": False,
199 |                 "step": "search",
200 |                 "error": search_result.get("error", "Search failed")
201 |             }
202 |         
203 |         if not search_result["ids"]:
204 |             return {
205 |                 "success": False,
206 |                 "step": "search",
207 |                 "error": "Search returned no IDs"
208 |             }
209 |         
210 |         # Step 2: Fetch the first ID
211 |         first_id = search_result["ids"][0]
212 |         fetch_result = self.test_fetch_tool_call(first_id, auth_headers)
213 |         
214 |         if not fetch_result["success"]:
215 |             return {
216 |                 "success": False,
217 |                 "step": "fetch",
218 |                 "error": fetch_result.get("error", "Fetch failed"),
219 |                 "searched_id": first_id
220 |             }
221 |         
222 |         return {
223 |             "success": True,
224 |             "search_ids": search_result["ids"],
225 |             "fetched_record": fetch_result["record"],
226 |             "workflow_complete": True
227 |         }
228 | 
229 |     def test_openai_specification_compliance(self, auth_headers: Dict[str, str] = None) -> Dict[str, bool]:
230 |         """Test compliance with OpenAI's MCP specification for Deep Research"""
231 |         results = {}
232 |         
233 |         print("\n🧪 Testing OpenAI MCP Specification Compliance")
234 |         print("="*55)
235 |         
236 |         # Test 1: Both required tools exist
237 |         print("🔍 Checking required tools exist")
238 |         search_exists = self.test_search_tool_exists(auth_headers)
239 |         fetch_exists = self.test_fetch_tool_exists(auth_headers)
240 |         
241 |         results["search_tool_exists"] = search_exists["success"]
242 |         results["fetch_tool_exists"] = fetch_exists["success"]
243 |         
244 |         if not search_exists["success"]:
245 |             print("❌ Search tool not found")
246 |             print(f"   Available tools: {search_exists.get('all_tools', [])}")
247 |             return results
248 |         
249 |         if not fetch_exists["success"]:
250 |             print("❌ Fetch tool not found")
251 |             print(f"   Available tools: {fetch_exists.get('all_tools', [])}")
252 |             return results
253 |         
254 |         print("✅ Both search and fetch tools found")
255 |         
256 |         # Test 2: Search tool returns proper format
257 |         print("\n🔍 Testing search tool format")
258 |         search_result = self.test_search_tool_call("Yves", auth_headers)
259 |         results["search_format_valid"] = search_result["success"]
260 |         
261 |         if not search_result["success"]:
262 |             print(f"❌ Search tool failed: {search_result.get('error', 'Unknown error')}")
263 |             return results
264 |         
265 |         print(f"✅ Search tool returns valid format with {search_result['id_count']} IDs")
266 |         
267 |         # Test 3: Fetch tool returns proper format
268 |         if search_result["ids"]:
269 |             print("\n🔍 Testing fetch tool format")
270 |             first_id = search_result["ids"][0]
271 |             fetch_result = self.test_fetch_tool_call(first_id, auth_headers)
272 |             results["fetch_format_valid"] = fetch_result["success"]
273 |             results["fetch_has_required_fields"] = fetch_result.get("has_required_fields", False)
274 |             
275 |             if not fetch_result["success"]:
276 |                 print(f"❌ Fetch tool failed: {fetch_result.get('error', 'Unknown error')}")
277 |                 return results
278 |             
279 |             print("✅ Fetch tool returns valid format")
280 |             
281 |             if fetch_result["has_required_fields"]:
282 |                 print("✅ Fetch response includes required fields (id, title, text)")
283 |             else:
284 |                 print("⚠️  Fetch response missing some required fields")
285 |         else:
286 |             print("⚠️  Cannot test fetch tool - no IDs returned by search")
287 |             results["fetch_format_valid"] = False
288 |             results["fetch_has_required_fields"] = False
289 |         
290 |         # Test 4: Complete workflow
291 |         print("\n🔍 Testing complete search->fetch workflow")
292 |         workflow_result = self.test_search_fetch_workflow(auth_headers)
293 |         results["workflow_complete"] = workflow_result["success"]
294 |         
295 |         if workflow_result["success"]:
296 |             print("✅ Complete workflow successful")
297 |         else:
298 |             print(f"❌ Workflow failed at {workflow_result.get('step', 'unknown')} step")
299 |             print(f"   Error: {workflow_result.get('error', 'Unknown error')}")
300 |         
301 |         return results
302 | 
303 |     def test_page_search_functionality(self, auth_headers: Dict[str, str] = None) -> Dict[str, Any]:
304 |         """Test that the search function includes page searching when query mentions pages"""
305 |         print("\n🔍 Testing page search functionality")
306 |         
307 |         # Test 1: Search with page-related query that matches an account name
308 |         page_search_result = self.test_search_tool_call("Injury Payouts pages", auth_headers)
309 |         
310 |         if not page_search_result["success"]:
311 |             return {
312 |                 "success": False,
313 |                 "error": f"Page search failed: {page_search_result.get('error', 'Unknown error')}"
314 |             }
315 |         
316 |         # Check if page records are included in results
317 |         page_ids = [id for id in page_search_result["ids"] if id.startswith("page:")]
318 |         
319 |         result = {
320 |             "success": True,
321 |             "page_search_works": len(page_ids) > 0,
322 |             "page_ids_found": len(page_ids),
323 |             "total_ids": len(page_search_result["ids"]),
324 |             "page_ids": page_ids
325 |         }
326 |         
327 |         if len(page_ids) > 0:
328 |             print(f"✅ Page search working - found {len(page_ids)} page records")
329 |             
330 |             # Test 2: Fetch a page record
331 |             first_page_id = page_ids[0]
332 |             fetch_result = self.test_fetch_tool_call(first_page_id, auth_headers)
333 |             
334 |             if fetch_result["success"]:
335 |                 print(f"✅ Page fetch working - retrieved page record: {first_page_id}")
336 |                 result["page_fetch_works"] = True
337 |                 result["fetched_page_data"] = fetch_result.get("record", {})
338 |             else:
339 |                 print(f"❌ Page fetch failed: {fetch_result.get('error', 'Unknown error')}")
340 |                 result["page_fetch_works"] = False
341 |         else:
342 |             print("⚠️  No page records found in search results")
343 |             result["page_fetch_works"] = False
344 |         
345 |         return result
346 | 
347 |     def run_openai_compliance_test_suite(self) -> bool:
348 |         """Run complete OpenAI MCP compliance test suite"""
349 |         print("🚀 OpenAI MCP Deep Research Compliance Test Suite")
350 |         print("="*60)
351 |         
352 |         # Check server availability first
353 |         try:
354 |             response = requests.get(f"{self.base_url}/", timeout=5)
355 |             server_running = response.status_code in [200, 404]
356 |         except:
357 |             server_running = False
358 |         
359 |         if not server_running:
360 |             print("❌ Server is not running at", self.base_url)
361 |             print("   Please start the server with:")
362 |             print("   python -m meta_ads_mcp --transport streamable-http --port 8080")
363 |             return False
364 |         
365 |         print("✅ Server is running")
366 |         
367 |         # Test with no authentication (server handles auth implicitly)
368 |         auth_scenarios = [
369 |             {
370 |                 "name": "No Authentication",
371 |                 "headers": None
372 |             }
373 |         ]
374 |         
375 |         all_results = {}
376 |         
377 |         for scenario in auth_scenarios:
378 |             print(f"\n📋 Testing with: {scenario['name']}")
379 |             print("-" * 40)
380 |             
381 |             results = self.test_openai_specification_compliance(scenario["headers"])
382 |             
383 |             # Add page search test
384 |             page_results = self.test_page_search_functionality(scenario["headers"])
385 |             results["page_search_functionality"] = page_results.get("page_search_works", False)
386 |             results["page_fetch_functionality"] = page_results.get("page_fetch_works", False)
387 |             
388 |             all_results[scenario["name"]] = results
389 |         
390 |         # Summary
391 |         print("\n🏁 OPENAI MCP COMPLIANCE TEST RESULTS")
392 |         print("="*40)
393 |         
394 |         overall_success = True
395 |         for scenario_name, results in all_results.items():
396 |             scenario_success = all(results.values()) if results else False
397 |             status = "✅ COMPLIANT" if scenario_success else "❌ NON-COMPLIANT"
398 |             print(f"{scenario_name}: {status}")
399 |             
400 |             if not scenario_success and results:
401 |                 for test_name, test_result in results.items():
402 |                     if not test_result:
403 |                         print(f"   ❌ {test_name}")
404 |             
405 |             if not scenario_success:
406 |                 overall_success = False
407 |         
408 |         print(f"\n📊 Overall OpenAI MCP Compliance: {'✅ COMPLIANT' if overall_success else '❌ NON-COMPLIANT'}")
409 |         
410 |         if overall_success:
411 |             print("\n🎉 Server is fully compatible with OpenAI's MCP specification!")
412 |             print("   • ChatGPT Deep Research: Ready")
413 |             print("   • Search tool: Compliant (includes page search)")
414 |             print("   • Fetch tool: Compliant")
415 |             print("   • Workflow: Complete")
416 |             print("   • Page Search: Enhanced")
417 |         else:
418 |             print("\n⚠️  Server needs updates for OpenAI MCP compliance")
419 |             print("   See failed tests above for required changes")
420 |         
421 |         return overall_success
422 | 
423 | 
424 | def main():
425 |     """Main test execution"""
426 |     tester = OpenAIMCPTester()
427 |     success = tester.run_openai_compliance_test_suite()
428 |     sys.exit(0 if success else 1)
429 | 
430 | 
431 | if __name__ == "__main__":
432 |     main() 
```

--------------------------------------------------------------------------------
/meta_ads_mcp/core/api.py:
--------------------------------------------------------------------------------

```python
  1 | """Core API functionality for Meta Ads API."""
  2 | 
  3 | from typing import Any, Dict, Optional, Callable
  4 | import json
  5 | import httpx
  6 | import asyncio
  7 | import functools
  8 | import os
  9 | from . import auth
 10 | from .auth import needs_authentication, auth_manager, start_callback_server, shutdown_callback_server
 11 | from .utils import logger
 12 | 
 13 | # Graph API version pin, the base URL built from it, and the User-Agent sent with requests
 14 | META_GRAPH_API_VERSION = "v22.0"
 15 | META_GRAPH_API_BASE = f"https://graph.facebook.com/{META_GRAPH_API_VERSION}"
 16 | USER_AGENT = "meta-ads-mcp/1.0"
 17 | 
 18 | # Log key configuration at import time (only whether META_APP_ID is set, never its value)
 19 | logger.info("Core API module initialized")
 20 | logger.info(f"Graph API Version: {META_GRAPH_API_VERSION}")
 21 | logger.info(f"META_APP_ID env var present: {'Yes' if os.environ.get('META_APP_ID') else 'No'}")
 22 | 
 23 | class GraphAPIError(Exception):
 24 |     """Exception raised for errors from the Graph API."""
 25 |     def __init__(self, error_data: Dict[str, Any]):
 26 |         self.error_data = error_data
 27 |         self.message = error_data.get('message', 'Unknown Graph API error')
 28 |         super().__init__(self.message)
 29 |         
 30 |         # Log error details
 31 |         logger.error(f"Graph API Error: {self.message}")
 32 |         logger.debug(f"Error details: {error_data}")
 33 |         
 34 |         # Check if this is an auth error
 35 |         if "code" in error_data and error_data["code"] in [190, 102, 4]:
 36 |             # Common auth error codes
 37 |             logger.warning(f"Auth error detected (code: {error_data['code']}). Invalidating token.")
 38 |             auth_manager.invalidate_token()
 39 | 
 40 | 
 41 | async def make_api_request(
 42 |     endpoint: str,
 43 |     access_token: str,
 44 |     params: Optional[Dict[str, Any]] = None,
 45 |     method: str = "GET"
 46 | ) -> Dict[str, Any]:
 47 |     """
 48 |     Make a request to the Meta Graph API.
 49 |     
 50 |     Args:
 51 |         endpoint: API endpoint path (without base URL)
 52 |         access_token: Meta API access token
 53 |         params: Additional query parameters
 54 |         method: HTTP method (GET, POST, DELETE)
 55 |     
 56 |     Returns:
 57 |         API response as a dictionary
 58 |     """
 59 |     # Validate access token before proceeding
 60 |     if not access_token:
 61 |         logger.error("API request attempted with blank access token")
 62 |         return {
 63 |             "error": {
 64 |                 "message": "Authentication Required",
 65 |                 "details": "A valid access token is required to access the Meta API",
 66 |                 "action_required": "Please authenticate first"
 67 |             }
 68 |         }
 69 |         
 70 |     url = f"{META_GRAPH_API_BASE}/{endpoint}"
 71 |     
 72 |     headers = {
 73 |         "User-Agent": USER_AGENT,
 74 |     }
 75 |     
 76 |     request_params = params or {}
 77 |     request_params["access_token"] = access_token
 78 |     
 79 |     # Logging the request (masking token for security)
 80 |     masked_params = {k: "***TOKEN***" if k == "access_token" else v for k, v in request_params.items()}
 81 |     logger.debug(f"API Request: {method} {url}")
 82 |     logger.debug(f"Request params: {masked_params}")
 83 |     
 84 |     # Check for app_id in params
 85 |     app_id = auth_manager.app_id
 86 |     logger.debug(f"Current app_id from auth_manager: {app_id}")
 87 |     
 88 |     async with httpx.AsyncClient() as client:
 89 |         try:
 90 |             if method == "GET":
 91 |                 # For GET, JSON-encode dict/list params (e.g., targeting_spec) to proper strings
 92 |                 encoded_params = {}
 93 |                 for key, value in request_params.items():
 94 |                     if isinstance(value, (dict, list)):
 95 |                         encoded_params[key] = json.dumps(value)
 96 |                     else:
 97 |                         encoded_params[key] = value
 98 |                 response = await client.get(url, params=encoded_params, headers=headers, timeout=30.0)
 99 |             elif method == "POST":
100 |                 # For Meta API, POST requests need data, not JSON
101 |                 if 'targeting' in request_params and isinstance(request_params['targeting'], dict):
102 |                     # Convert targeting dict to string for the API
103 |                     request_params['targeting'] = json.dumps(request_params['targeting'])
104 |                 
105 |                 # Convert lists and dicts to JSON strings    
106 |                 for key, value in request_params.items():
107 |                     if isinstance(value, (list, dict)):
108 |                         request_params[key] = json.dumps(value)
109 |                 
110 |                 logger.debug(f"POST params (prepared): {masked_params}")
111 |                 response = await client.post(url, data=request_params, headers=headers, timeout=30.0)
112 |             elif method == "DELETE":
113 |                 response = await client.delete(url, params=request_params, headers=headers, timeout=30.0)
114 |             else:
115 |                 raise ValueError(f"Unsupported HTTP method: {method}")
116 |             
117 |             response.raise_for_status()
118 |             logger.debug(f"API Response status: {response.status_code}")
119 |             
120 |             # Ensure the response is JSON and return it as a dictionary
121 |             try:
122 |                 return response.json()
123 |             except json.JSONDecodeError:
124 |                 # If not JSON, return text content in a structured format
125 |                 return {
126 |                     "text_response": response.text,
127 |                     "status_code": response.status_code
128 |                 }
129 |         
130 |         except httpx.HTTPStatusError as e:
131 |             error_info = {}
132 |             try:
133 |                 error_info = e.response.json()
134 |             except:
135 |                 error_info = {"status_code": e.response.status_code, "text": e.response.text}
136 |             
137 |             logger.error(f"HTTP Error: {e.response.status_code} - {error_info}")
138 |             
139 |             # Check for authentication errors
140 |             if e.response.status_code == 401 or e.response.status_code == 403:
141 |                 logger.warning("Detected authentication error (401/403)")
142 |                 auth_manager.invalidate_token()
143 |             elif "error" in error_info:
144 |                 error_obj = error_info.get("error", {})
145 |                 # Check for specific FB API errors related to auth
146 |                 if isinstance(error_obj, dict) and error_obj.get("code") in [190, 102, 4, 200, 10]:
147 |                     logger.warning(f"Detected Facebook API auth error: {error_obj.get('code')}")
148 |                     # Log more details about app ID related errors
149 |                     if error_obj.get("code") == 200 and "Provide valid app ID" in error_obj.get("message", ""):
150 |                         logger.error("Meta API authentication configuration issue")
151 |                         logger.error(f"Current app_id: {app_id}")
152 |                         # Provide a clearer error message without the confusing "Provide valid app ID" message
153 |                         return {
154 |                             "error": {
155 |                                 "message": "Meta API authentication configuration issue. Please check your app credentials.",
156 |                                 "original_error": error_obj.get("message"),
157 |                                 "code": error_obj.get("code")
158 |                             }
159 |                         }
160 |                     auth_manager.invalidate_token()
161 |             
162 |             # Include full details for technical users
163 |             full_response = {
164 |                 "headers": dict(e.response.headers),
165 |                 "status_code": e.response.status_code,
166 |                 "url": str(e.response.url),
167 |                 "reason": getattr(e.response, "reason_phrase", "Unknown reason"),
168 |                 "request_method": e.request.method,
169 |                 "request_url": str(e.request.url)
170 |             }
171 |             
172 |             # Return a properly structured error object
173 |             return {
174 |                 "error": {
175 |                     "message": f"HTTP Error: {e.response.status_code}",
176 |                     "details": error_info,
177 |                     "full_response": full_response
178 |                 }
179 |             }
180 |         
181 |         except Exception as e:
182 |             logger.error(f"Request Error: {str(e)}")
183 |             return {"error": {"message": str(e)}}
184 | 
185 | 
186 | # Generic wrapper for all Meta API tools
def meta_api_tool(func):
    """Wrap an async Meta API tool with token lookup, diagnostics and JSON output.

    The returned coroutine function:

    1. Logs the call at debug level. An ``access_token`` keyword argument is
       masked; positional arguments are logged as-is.
    2. If no truthy ``access_token`` keyword was supplied, asks
       ``auth.get_current_access_token()`` for one and injects it into the
       keyword arguments.
    3. If there is still no token, returns a JSON "authentication required"
       error (a Pipeboard-specific one when ``auth_manager.use_pipeboard`` is
       truthy) without calling ``func``.
    4. Otherwise awaits ``func`` and normalises the result: dicts are
       serialised with ``json.dumps(..., indent=2)``, strings are inspected
       for an ``"error"`` key, and anything else is returned unchanged.

    Any exception escaping these steps is logged and returned as
    ``{"error": "<message>"}`` JSON instead of being raised.

    Args:
        func: The async tool function to wrap.

    Returns:
        The wrapping coroutine function, with metadata copied from ``func``
        by ``functools.wraps``.
    """

    def _lacks_token(call_kwargs):
        # True when 'access_token' is absent or falsy (None, empty string, ...).
        return not call_kwargs.get('access_token')

    async def _inject_token_from_auth(call_kwargs):
        # Exceptions raised by the lookup are logged here rather than
        # propagated, so the caller can fall through to the
        # "authentication required" response.
        try:
            token = await auth.get_current_access_token()
            if token:
                call_kwargs['access_token'] = token
                logger.debug("Using access token from auth_manager")
                return

            logger.warning("No access token available from auth_manager")
            # Explain the most likely reason the token is missing.
            app_id_unset = auth_manager.app_id == "YOUR_META_APP_ID" or not auth_manager.app_id
            if app_id_unset and not auth_manager.use_pipeboard:
                logger.error("TOKEN VALIDATION FAILED: No valid app_id configured")
                logger.error("Please set META_APP_ID environment variable or configure in your code")
            elif auth_manager.use_pipeboard:
                logger.error("TOKEN VALIDATION FAILED: Pipeboard authentication enabled but no valid token available")
                logger.error("Complete authentication via Pipeboard service or check PIPEBOARD_API_TOKEN")
            else:
                logger.error("Check logs above for detailed token validation failures")
        except Exception as exc:
            logger.error(f"Error getting access token: {str(exc)}")
            # Include the stack trace; format_exc() must run inside the handler.
            import traceback
            logger.error(f"Stack trace: {traceback.format_exc()}")

    def _auth_required_response():
        # Log a configuration summary and build the JSON error returned when
        # no access token could be obtained.
        logger.warning("No access token available, authentication needed")

        auth_url = auth_manager.get_auth_url()
        current_app_id = auth_manager.app_id
        pipeboard_on = auth_manager.use_pipeboard

        logger.error("TOKEN VALIDATION SUMMARY:")
        logger.error(f"- Current app_id: '{current_app_id}'")
        logger.error(f"- Environment META_APP_ID: '{os.environ.get('META_APP_ID', 'Not set')}'")
        logger.error(f"- Pipeboard API token configured: {'Yes' if os.environ.get('PIPEBOARD_API_TOKEN') else 'No'}")
        logger.error(f"- Using Pipeboard authentication: {'Yes' if pipeboard_on else 'No'}")

        if pipeboard_on:
            logger.error("ISSUE DETECTED: Pipeboard authentication configured but no valid token available")
            logger.error("ACTION REQUIRED: Complete authentication via Pipeboard service")
            return json.dumps({
                "error": {
                    "message": "Pipeboard Authentication Required",
                    "details": {
                        "description": "Your Pipeboard API token is invalid or has expired",
                        "action_required": "Update your Pipeboard token",
                        "setup_url": "https://pipeboard.co/setup",
                        "token_url": "https://pipeboard.co/api-tokens",
                        "configuration_status": {
                            "app_id_configured": bool(current_app_id) and current_app_id != "YOUR_META_APP_ID",
                            "pipeboard_enabled": True,
                        },
                        "troubleshooting": "Go to https://pipeboard.co/setup to verify your account setup, then visit https://pipeboard.co/api-tokens to obtain a new API token",
                        "setup_link": "[Verify your Pipeboard account setup](https://pipeboard.co/setup)",
                        "token_link": "[Get a new Pipeboard API token](https://pipeboard.co/api-tokens)"
                    }
                }
            }, indent=2)

        # Not using Pipeboard: the app-ID hint only applies when the ID is
        # missing or still the placeholder value.
        if current_app_id == "YOUR_META_APP_ID" or not current_app_id:
            logger.error("ISSUE DETECTED: No valid Meta App ID configured")
            logger.error("ACTION REQUIRED: Set META_APP_ID environment variable with a valid App ID")

        return json.dumps({
            "error": {
                "message": "Authentication Required",
                "details": {
                    "description": "You need to authenticate with the Meta API before using this tool",
                    "action_required": "Please authenticate first",
                    "auth_url": auth_url,
                    "configuration_status": {
                        "app_id_configured": bool(current_app_id) and current_app_id != "YOUR_META_APP_ID",
                        "pipeboard_enabled": False,
                    },
                    "troubleshooting": "Check logs for TOKEN VALIDATION FAILED messages",
                    "markdown_link": f"[Click here to authenticate with Meta Ads API]({auth_url})"
                }
            }
        }, indent=2)

    def _rewrite_string_result(raw, app_id):
        # Returns replacement JSON text for a string result, or None when the
        # string should be passed through untouched.
        #
        # Everything below deliberately stays inside one try block: ANY
        # exception (text that is not JSON, a missing "details" key, a
        # non-dict payload, ...) makes the result come back wrapped as
        # {"data": <original string>}.
        # NOTE(review): a parsed dict with an "error" key but no top-level
        # "details" key raises KeyError on the second lookup and is therefore
        # wrapped too; that looks unintended, but the output format currently
        # depends on it -- confirm before changing.
        try:
            parsed = json.loads(raw)
            if "error" not in parsed:
                return None

            logger.error(f"Error in API response: {parsed['error']}")
            nested_error = parsed.get("details", {}).get("error", {})
            if not isinstance(nested_error, dict):
                return None

            error_obj = parsed["details"]["error"]
            if error_obj.get("code") == 200 and "Provide valid app ID" in error_obj.get("message", ""):
                logger.error("Meta API authentication configuration issue")
                logger.error(f"Current app_id: {app_id}")
                # Swap the confusing upstream message for a friendlier one.
                return json.dumps({
                    "error": {
                        "message": "Meta API Configuration Issue",
                        "details": {
                            "description": "Your Meta API app is not properly configured",
                            "action_required": "Check your META_APP_ID environment variable",
                            "current_app_id": app_id,
                            "original_error": error_obj.get("message")
                        }
                    }
                }, indent=2)
        except Exception:
            return json.dumps({"data": raw}, indent=2)
        return None

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        try:
            # Trace the call; only the access_token keyword is masked.
            logger.debug(f"Function call: {func.__name__}")
            logger.debug(f"Args: {args}")
            redacted = {
                name: ('***TOKEN***' if name == 'access_token' else value)
                for name, value in kwargs.items()
            }
            logger.debug(f"Kwargs: {redacted}")

            app_id = auth_manager.app_id
            logger.debug(f"Current app_id: {app_id}")
            logger.debug(f"META_APP_ID env var: {os.environ.get('META_APP_ID')}")

            # An explicit, truthy token from the caller always wins.
            if _lacks_token(kwargs):
                await _inject_token_from_auth(kwargs)

            # Still nothing usable: report how to authenticate, skip the tool.
            if _lacks_token(kwargs):
                return _auth_required_response()

            result = await func(*args, **kwargs)

            if isinstance(result, str):
                replacement = _rewrite_string_result(result, app_id)
                if replacement is not None:
                    return replacement

            # Dict results are serialised; other values pass through as-is.
            if isinstance(result, dict):
                return json.dumps(result, indent=2)
            return result
        except Exception as exc:
            logger.error(f"Error in {func.__name__}: {str(exc)}")
            return json.dumps({"error": str(exc)}, indent=2)

    return wrapper
```

--------------------------------------------------------------------------------
/tests/test_get_account_pages.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Tests for the updated get_account_pages function with multi-approach strategy.
  3 | """
  4 | 
  5 | import pytest
  6 | import json
  7 | from unittest.mock import AsyncMock, patch
  8 | from meta_ads_mcp.core.ads import get_account_pages
  9 | 
 10 | 
 11 | class TestGetAccountPages:
 12 |     """Test the updated get_account_pages function with comprehensive multi-approach testing."""
 13 |     
 14 |     @pytest.mark.asyncio
 15 |     async def test_get_account_pages_multi_approach_success(self):
 16 |         """Test successful page discovery using multiple approaches."""
 17 |         # Mock data for different endpoints
 18 |         mock_user_pages = {
 19 |             "data": [
 20 |                 {
 21 |                     "id": "111111111",
 22 |                     "name": "Personal Page",
 23 |                     "category": "Personal Blog",
 24 |                     "fan_count": 100
 25 |                 }
 26 |             ]
 27 |         }
 28 |         
 29 |         mock_client_pages = {
 30 |             "data": [
 31 |                 {
 32 |                     "id": "222222222", 
 33 |                     "name": "Client Page",
 34 |                     "category": "Business",
 35 |                     "fan_count": 500
 36 |                 }
 37 |             ]
 38 |         }
 39 |         
 40 |         mock_adcreatives = {
 41 |             "data": [
 42 |                 {
 43 |                     "id": "creative_123",
 44 |                     "object_story_spec": {"page_id": "333333333"}
 45 |                 }
 46 |             ]
 47 |         }
 48 |         
 49 |         # Mock page details for discovered IDs
 50 |         mock_page_details = {
 51 |             "111111111": {
 52 |                 "id": "111111111",
 53 |                 "name": "Personal Page",
 54 |                 "username": "personalpage",
 55 |                 "category": "Personal Blog",
 56 |                 "fan_count": 100,
 57 |                 "link": "https://facebook.com/personalpage",
 58 |                 "verification_status": "not_verified"
 59 |             },
 60 |             "222222222": {
 61 |                 "id": "222222222", 
 62 |                 "name": "Client Page",
 63 |                 "username": "clientpage",
 64 |                 "category": "Business",
 65 |                 "fan_count": 500,
 66 |                 "link": "https://facebook.com/clientpage",
 67 |                 "verification_status": "verified"
 68 |             },
 69 |             "333333333": {
 70 |                 "id": "333333333",
 71 |                 "name": "Creative Page", 
 72 |                 "username": "creativepage",
 73 |                 "category": "Creative",
 74 |                 "fan_count": 1000,
 75 |                 "link": "https://facebook.com/creativepage",
 76 |                 "verification_status": "not_verified"
 77 |             }
 78 |         }
 79 |         
 80 |         with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth, \
 81 |              patch('meta_ads_mcp.core.ads.make_api_request') as mock_api:
 82 |             
 83 |             mock_auth.return_value = "test_access_token"
 84 |             
 85 |             # Mock API calls in sequence for different approaches
 86 |             def mock_api_side_effect(endpoint, access_token, params):
 87 |                 if endpoint == "me/accounts":
 88 |                     return mock_user_pages
 89 |                 elif endpoint == "3182643988557192/owned_pages":
 90 |                     return {"data": []}  # No business pages
 91 |                 elif endpoint == "act_3182643988557192/client_pages":
 92 |                     return mock_client_pages
 93 |                 elif endpoint == "act_3182643988557192/adcreatives":
 94 |                     return mock_adcreatives
 95 |                 elif endpoint == "act_3182643988557192/ads":
 96 |                     return {"data": []}  # No ads
 97 |                 elif endpoint == "act_3182643988557192/promoted_objects":
 98 |                     return {"data": []}  # No promoted objects
 99 |                 elif endpoint == "act_3182643988557192/campaigns":
100 |                     return {"data": []}  # No campaigns
101 |                 elif endpoint in mock_page_details:
102 |                     return mock_page_details[endpoint]
103 |                 else:
104 |                     return {"data": []}
105 |             
106 |             mock_api.side_effect = mock_api_side_effect
107 |             
108 |             # Call the function
109 |             result = await get_account_pages(account_id="act_3182643988557192")
110 |             result_data = json.loads(result)
111 |             
112 |             # Verify the structure and content
113 |             assert "data" in result_data
114 |             assert "total_pages_found" in result_data
115 |             
116 |             # Should find 3 unique pages
117 |             assert result_data["total_pages_found"] == 3
118 |             assert len(result_data["data"]) == 3
119 |             
120 |             # Verify that we found valid page data
121 |             assert all("id" in page for page in result_data["data"])
122 |             
123 |             # Verify the page names are correct
124 |             page_names = [page.get("name") for page in result_data["data"]]
125 |             assert "Personal Page" in page_names
126 |             assert "Client Page" in page_names
127 |             assert "Creative Page" in page_names
128 |             
129 |             # Verify each page has basic required fields
130 |             for page in result_data["data"]:
131 |                 assert "id" in page
132 |                 assert "name" in page
133 |     
134 |     @pytest.mark.asyncio
135 |     async def test_get_account_pages_me_special_case(self):
136 |         """Test the special case when account_id is 'me'."""
137 |         mock_user_pages = {
138 |             "data": [
139 |                 {
140 |                     "id": "444444444",
141 |                     "name": "My Personal Page",
142 |                     "category": "Personal",
143 |                     "fan_count": 50
144 |                 }
145 |             ]
146 |         }
147 |         
148 |         with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth, \
149 |              patch('meta_ads_mcp.core.ads.make_api_request') as mock_api:
150 |             
151 |             mock_auth.return_value = "test_access_token"
152 |             mock_api.return_value = mock_user_pages
153 |             
154 |             result = await get_account_pages(account_id="me")
155 |             result_data = json.loads(result)
156 |             
157 |             # Verify the call was made to me/accounts
158 |             mock_api.assert_called_once_with(
159 |                 "me/accounts",
160 |                 "test_access_token",
161 |                 {"fields": "id,name,username,category,fan_count,link,verification_status,picture"}
162 |             )
163 |             
164 |             # Verify response structure
165 |             assert "data" in result_data
166 |             assert len(result_data["data"]) == 1
167 |             assert result_data["data"][0]["id"] == "444444444"
168 |     
169 |     @pytest.mark.asyncio
170 |     async def test_get_account_pages_tracking_specs_discovery(self):
171 |         """Test page discovery from tracking specs (most reliable method)."""
172 |         mock_ads_data = {
173 |             "data": [
174 |                 {
175 |                     "id": "ad_123",
176 |                     "tracking_specs": [
177 |                         {
178 |                             "page": ["555555555", "666666666"]
179 |                         }
180 |                     ]
181 |                 }
182 |             ]
183 |         }
184 |         
185 |         mock_page_details = {
186 |             "id": "555555555",
187 |             "name": "Tracking Page",
188 |             "username": "trackingpage",
189 |             "category": "Business"
190 |         }
191 |         
192 |         with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth, \
193 |              patch('meta_ads_mcp.core.ads.make_api_request') as mock_api:
194 |             
195 |             mock_auth.return_value = "test_access_token"
196 |             
197 |             def mock_api_side_effect(endpoint, access_token, params):
198 |                 if "tracking_specs" in params.get("fields", ""):
199 |                     return mock_ads_data
200 |                 elif endpoint == "555555555":
201 |                     return mock_page_details
202 |                 elif endpoint == "666666666":
203 |                     return {"error": "Page not accessible"}
204 |                 else:
205 |                     return {"data": []}
206 |             
207 |             mock_api.side_effect = mock_api_side_effect
208 |             
209 |             result = await get_account_pages(account_id="act_123456789")
210 |             result_data = json.loads(result)
211 |             
212 |             # Should find pages from tracking specs
213 |             assert result_data["total_pages_found"] == 2
214 |             
215 |             # Check that one page has details and one has error
216 |             pages = result_data["data"]
217 |             assert len(pages) == 2
218 |             
219 |             # One should have full details, one should have error
220 |             page_with_details = next((p for p in pages if "error" not in p), None)
221 |             page_with_error = next((p for p in pages if "error" in p), None)
222 |             
223 |             assert page_with_details is not None
224 |             assert page_with_error is not None
225 |             assert page_with_details["name"] == "Tracking Page"
226 |             assert "not accessible" in page_with_error["error"]
227 |     
228 |     @pytest.mark.asyncio
229 |     async def test_get_account_pages_no_pages_found(self):
230 |         """Test when no pages are found through any approach."""
231 |         with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth, \
232 |              patch('meta_ads_mcp.core.ads.make_api_request') as mock_api:
233 |             
234 |             mock_auth.return_value = "test_access_token"
235 |             mock_api.return_value = {"data": []}  # All endpoints return empty
236 |             
237 |             result = await get_account_pages(account_id="act_123456789")
238 |             result_data = json.loads(result)
239 |             
240 |             # Should return the fallback message
241 |             assert "data" in result_data
242 |             assert len(result_data["data"]) == 0
243 |             assert "message" in result_data
244 |             assert "No pages found" in result_data["message"]
245 |             assert "suggestion" in result_data
246 |     
247 |     @pytest.mark.asyncio
248 |     async def test_get_account_pages_error_handling(self):
249 |         """Test error handling when API calls fail."""
250 |         with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth, \
251 |              patch('meta_ads_mcp.core.ads.make_api_request') as mock_api:
252 |             
253 |             mock_auth.return_value = "test_access_token"
254 |             mock_api.side_effect = Exception("API Error")
255 |             
256 |             result = await get_account_pages(account_id="act_123456789")
257 |             result_data = json.loads(result)
258 |             
259 |             # Individual API errors are caught and logged, function returns "no pages found"
260 |             assert "data" in result_data
261 |             assert len(result_data["data"]) == 0
262 |             assert "message" in result_data
263 |             assert "No pages found" in result_data["message"]
264 |             
265 |             # But debug info should show the errors
266 |             if "debug" in result_data:
267 |                 debug = result_data["debug"]
268 |                 assert "errors" in debug
269 |                 assert len(debug["errors"]) > 0
270 |                 # Should have multiple API errors logged
271 |                 assert any("API Error" in error for error in debug["errors"])
272 |     
273 |     @pytest.mark.asyncio
274 |     async def test_get_account_pages_no_account_id(self):
275 |         """Test error when no account ID is provided."""
276 |         result = await get_account_pages(account_id=None)
277 |         result_data = json.loads(result)
278 |         
279 |         # The @meta_api_tool decorator handles authentication before function logic
280 |         # So it returns an authentication error instead of the simple account ID error
281 |         # Error responses may be wrapped in a "data" field (MCP format)
282 |         
283 |         # Check for direct error format
284 |         if "error" in result_data or "message" in result_data:
285 |             if "message" in result_data and "Authentication Required" in result_data["message"]:
286 |                 # MCP decorator returns authentication error - this is expected
287 |                 assert True  # This is the expected behavior
288 |                 return
289 |             elif "error" in result_data and "No account ID provided" in result_data["error"]:
290 |                 # Direct function call might return the account ID error
291 |                 assert True  # This is also valid
292 |                 return
293 |         
294 |         # Check for wrapped error format (MCP response format)
295 |         if "data" in result_data:
296 |             try:
297 |                 error_data = json.loads(result_data["data"])
298 |                 if "error" in error_data and "No account ID provided" in error_data["error"]:
299 |                     assert True  # Wrapped error format
300 |                     return
301 |             except (json.JSONDecodeError, TypeError):
302 |                 pass
303 |         
304 |         # Fallback: Check if the response contains any indication of missing account ID
305 |         result_str = str(result_data)
306 |         assert "No account ID provided" in result_str or "Authentication Required" in result_str
307 |     
308 |     @pytest.mark.asyncio
309 |     async def test_get_account_pages_account_id_validation_direct(self):
310 |         """Test account ID validation by directly testing the function logic."""
311 |         # Import the function implementation directly to bypass decorators
312 |         from meta_ads_mcp.core.ads import get_account_pages
313 |         
314 |         # Mock the function to bypass decorator authentication  
315 |         with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
316 |             mock_auth.return_value = "test_token"
317 |             
318 |             # Call with empty string (should be treated as None)
319 |             result = await get_account_pages(account_id="")
320 |             result_data = json.loads(result)
321 |             
322 |             # Response might be wrapped in MCP format
323 |             if "data" in result_data and isinstance(result_data["data"], str):
324 |                 # Parse the nested JSON response
325 |                 inner_data = json.loads(result_data["data"])
326 |                 assert "error" in inner_data
327 |                 assert "No account ID provided" in inner_data["error"]
328 |             else:
329 |                 # Direct response format
330 |                 assert "error" in result_data or "message" in result_data
331 |                 result_str = str(result_data)
332 |                 assert "No account ID provided" in result_str or "Authentication Required" in result_str
333 |     
334 |     @pytest.mark.asyncio
335 |     async def test_get_account_pages_multiple_sources(self):
336 |         """Test that pages from multiple sources are properly collected."""
337 |         # Mock different results for different approaches
338 |         mock_responses = {
339 |             "me/accounts": {"data": [{"id": "111111111"}]},
340 |             "123456789/owned_pages": {"data": []},
341 |             "act_123456789/client_pages": {"data": [{"id": "222222222"}]},
342 |             "act_123456789/adcreatives": {"data": []},
343 |             "act_123456789/ads": {"data": []},
344 |             "act_123456789/promoted_objects": {"data": []},
345 |             "act_123456789/campaigns": {"data": []},
346 |             "111111111": {"id": "111111111", "name": "Page 1"},
347 |             "222222222": {"id": "222222222", "name": "Page 2"}
348 |         }
349 |         
350 |         with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth, \
351 |              patch('meta_ads_mcp.core.ads.make_api_request') as mock_api:
352 |             
353 |             mock_auth.return_value = "test_access_token"
354 |             
355 |             def mock_api_side_effect(endpoint, access_token, params):
356 |                 return mock_responses.get(endpoint, {"data": []})
357 |             
358 |             mock_api.side_effect = mock_api_side_effect
359 |             
360 |             result = await get_account_pages(account_id="act_123456789")
361 |             result_data = json.loads(result)
362 |             
363 |             # Verify basic response structure
364 |             assert "data" in result_data
365 |             assert "total_pages_found" in result_data
366 |             assert result_data["total_pages_found"] == 2
367 |             assert len(result_data["data"]) == 2
368 |             
369 |             # Verify pages have correct names
370 |             page_names = [page.get("name") for page in result_data["data"]]
371 |             assert "Page 1" in page_names
372 |             assert "Page 2" in page_names
373 |     
374 |     @pytest.mark.asyncio
375 |     async def test_get_account_pages_act_prefix_handling(self):
376 |         """Test that account IDs without 'act_' prefix are handled correctly."""
377 |         with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth, \
378 |              patch('meta_ads_mcp.core.ads.make_api_request') as mock_api:
379 |             
380 |             mock_auth.return_value = "test_access_token"
381 |             mock_api.return_value = {"data": []}
382 |             
383 |             # Test with account ID without 'act_' prefix
384 |             result = await get_account_pages(account_id="123456789")
385 |             
386 |             # Check that API calls were made with 'act_' prefix added
387 |             calls = mock_api.call_args_list
388 |             
389 |             # Should find calls with 'act_123456789' in the endpoint
390 |             act_calls = [call for call in calls if 'act_123456789' in str(call)]
391 |             assert len(act_calls) > 0, "Should have made calls with 'act_' prefix"
392 |             
393 |             # Should also have made calls with raw account ID for business endpoints
394 |             business_calls = [call for call in calls if '123456789/owned_pages' in str(call)]
395 |             assert len(business_calls) > 0, "Should have made calls to business endpoints"
396 | 
397 | 
# Allow running this test module directly as a script: hand this file to pytest.
if __name__ == "__main__":
    pytest.main([__file__])
```

--------------------------------------------------------------------------------
/tests/test_budget_update.py:
--------------------------------------------------------------------------------

```python
  1 | #!/usr/bin/env python3
  2 | """
  3 | Unit Tests for Budget Update Functionality
  4 | 
  5 | This test suite validates the budget update parameter implementation for the
  6 | update_adset function in meta_ads_mcp/core/adsets.py.
  7 | 
  8 | Test cases cover:
  9 | - Budget update success scenarios
 10 | - Budget validation (negative, zero, too high values)
 11 | - Budget update with other parameters
 12 | - Error handling and permissions
 13 | """
 14 | 
 15 | import pytest
 16 | import json
 17 | import asyncio
 18 | from unittest.mock import AsyncMock, patch, MagicMock
 19 | from typing import Dict, Any, List
 20 | 
 21 | # Import the function to test
 22 | from meta_ads_mcp.core.adsets import update_adset
 23 | 
 24 | 
 25 | class TestBudgetUpdateFunctionality:
 26 |     """Test suite for budget update functionality"""
 27 |     
 28 |     @pytest.fixture
 29 |     def mock_api_request(self):
 30 |         """Mock for the make_api_request function"""
 31 |         with patch('meta_ads_mcp.core.adsets.make_api_request') as mock:
 32 |             mock.return_value = {
 33 |                 "id": "test_adset_id",
 34 |                 "daily_budget": "5000",
 35 |                 "status": "ACTIVE"
 36 |             }
 37 |             yield mock
 38 |     
 39 |     @pytest.fixture
 40 |     def mock_auth_manager(self):
 41 |         """Mock for the authentication manager"""
 42 |         with patch('meta_ads_mcp.core.api.auth_manager') as mock, \
 43 |              patch('meta_ads_mcp.core.auth.get_current_access_token') as mock_get_token:
 44 |             # Mock a valid access token
 45 |             mock.get_current_access_token.return_value = "test_access_token"
 46 |             mock.is_token_valid.return_value = True
 47 |             mock.app_id = "test_app_id"
 48 |             mock_get_token.return_value = "test_access_token"
 49 |             yield mock
 50 |     
 51 |     @pytest.fixture
 52 |     def valid_adset_id(self):
 53 |         """Valid ad set ID for testing"""
 54 |         return "123456789"
 55 |     
 56 |     @pytest.fixture
 57 |     def valid_daily_budget(self):
 58 |         """Valid daily budget amount in cents"""
 59 |         return "5000"  # $50.00
 60 |     
 61 |     @pytest.fixture
 62 |     def valid_lifetime_budget(self):
 63 |         """Valid lifetime budget amount in cents"""
 64 |         return "50000"  # $500.00
 65 |     
 66 |     @pytest.mark.asyncio
 67 |     async def test_budget_update_success(self, mock_api_request, mock_auth_manager, valid_adset_id, valid_daily_budget):
 68 |         """Test successful budget update"""
 69 |         
 70 |         result = await update_adset(
 71 |             adset_id=valid_adset_id,
 72 |             daily_budget=valid_daily_budget
 73 |         )
 74 |         
 75 |         # Parse the result
 76 |         result_data = json.loads(result)
 77 |         
 78 |         # Verify the API was called with correct parameters
 79 |         mock_api_request.assert_called_once()
 80 |         call_args = mock_api_request.call_args
 81 |         
 82 |         # Check that the endpoint is correct (first argument)
 83 |         assert call_args[0][0] == valid_adset_id
 84 |         
 85 |         # Check that daily_budget was included in parameters (third argument)
 86 |         params = call_args[0][2]  # Third positional argument is params
 87 |         assert 'daily_budget' in params
 88 |         assert params['daily_budget'] == valid_daily_budget
 89 |         
 90 |         # Verify the response structure
 91 |         assert 'id' in result_data
 92 |         assert result_data['id'] == "test_adset_id"
 93 |     
 94 |     @pytest.mark.asyncio
 95 |     async def test_lifetime_budget_update_success(self, mock_api_request, mock_auth_manager, valid_adset_id, valid_lifetime_budget):
 96 |         """Test successful lifetime budget update"""
 97 |         
 98 |         result = await update_adset(
 99 |             adset_id=valid_adset_id,
100 |             lifetime_budget=valid_lifetime_budget
101 |         )
102 |         
103 |         # Parse the result
104 |         result_data = json.loads(result)
105 |         
106 |         # Verify the API was called with correct parameters
107 |         mock_api_request.assert_called_once()
108 |         call_args = mock_api_request.call_args
109 |         
110 |         # Check that lifetime_budget was included in parameters
111 |         params = call_args[0][2]  # Third positional argument is params
112 |         assert 'lifetime_budget' in params
113 |         assert params['lifetime_budget'] == valid_lifetime_budget
114 |         
115 |         # Verify the response structure
116 |         assert 'id' in result_data
117 |     
118 |     @pytest.mark.asyncio
119 |     async def test_both_budget_types_update(self, mock_api_request, mock_auth_manager, valid_adset_id, valid_daily_budget, valid_lifetime_budget):
120 |         """Test updating both daily and lifetime budget simultaneously"""
121 |         
122 |         result = await update_adset(
123 |             adset_id=valid_adset_id,
124 |             daily_budget=valid_daily_budget,
125 |             lifetime_budget=valid_lifetime_budget
126 |         )
127 |         
128 |         # Parse the result
129 |         result_data = json.loads(result)
130 |         
131 |         # Verify the API was called with correct parameters
132 |         mock_api_request.assert_called_once()
133 |         call_args = mock_api_request.call_args
134 |         
135 |         # Check that both budget parameters were included
136 |         params = call_args[0][2]  # Third positional argument is params
137 |         assert 'daily_budget' in params
138 |         assert 'lifetime_budget' in params
139 |         assert params['daily_budget'] == valid_daily_budget
140 |         assert params['lifetime_budget'] == valid_lifetime_budget
141 |         
142 |         # Verify the response structure
143 |         assert 'id' in result_data
144 |     
145 |     @pytest.mark.asyncio
146 |     async def test_budget_update_with_other_parameters(self, mock_api_request, mock_auth_manager, valid_adset_id, valid_daily_budget):
147 |         """Test budget update combined with other parameters"""
148 |         
149 |         result = await update_adset(
150 |             adset_id=valid_adset_id,
151 |             daily_budget=valid_daily_budget,
152 |             status="PAUSED",
153 |             bid_amount=1000,
154 |             bid_strategy="LOWEST_COST_WITH_BID_CAP"
155 |         )
156 |         
157 |         # Parse the result
158 |         result_data = json.loads(result)
159 |         
160 |         # Verify the API was called with correct parameters
161 |         mock_api_request.assert_called_once()
162 |         call_args = mock_api_request.call_args
163 |         
164 |         # Check that all parameters were included
165 |         params = call_args[0][2]  # Third positional argument is params
166 |         assert 'daily_budget' in params
167 |         assert 'status' in params
168 |         assert 'bid_amount' in params
169 |         assert 'bid_strategy' in params
170 |         assert params['daily_budget'] == valid_daily_budget
171 |         assert params['status'] == "PAUSED"
172 |         assert params['bid_amount'] == "1000"
173 |         assert params['bid_strategy'] == "LOWEST_COST_WITH_BID_CAP"
174 |         
175 |         # Verify the response structure
176 |         assert 'id' in result_data
177 |     
178 |     @pytest.mark.asyncio
179 |     async def test_budget_update_with_numeric_values(self, mock_api_request, mock_auth_manager, valid_adset_id):
180 |         """Test budget update with numeric values (should be converted to strings)"""
181 |         
182 |         result = await update_adset(
183 |             adset_id=valid_adset_id,
184 |             daily_budget=5000,  # Integer
185 |             lifetime_budget=50000  # Integer
186 |         )
187 |         
188 |         # Parse the result
189 |         result_data = json.loads(result)
190 |         
191 |         # Verify the API was called with correct parameters
192 |         mock_api_request.assert_called_once()
193 |         call_args = mock_api_request.call_args
194 |         
195 |         # Check that numeric values were converted to strings
196 |         params = call_args[0][2]  # Third positional argument is params
197 |         assert 'daily_budget' in params
198 |         assert 'lifetime_budget' in params
199 |         assert params['daily_budget'] == "5000"
200 |         assert params['lifetime_budget'] == "50000"
201 |         
202 |         # Verify the response structure
203 |         assert 'id' in result_data
204 |     
205 |     @pytest.mark.asyncio
206 |     async def test_budget_update_with_zero_budget(self, mock_api_request, mock_auth_manager, valid_adset_id):
207 |         """Test budget update with zero budget (should be allowed)"""
208 |         
209 |         result = await update_adset(
210 |             adset_id=valid_adset_id,
211 |             daily_budget="0"
212 |         )
213 |         
214 |         # Parse the result
215 |         result_data = json.loads(result)
216 |         
217 |         # Verify the API was called with zero budget
218 |         mock_api_request.assert_called_once()
219 |         call_args = mock_api_request.call_args
220 |         
221 |         params = call_args[0][2]  # Third positional argument is params
222 |         assert 'daily_budget' in params
223 |         assert params['daily_budget'] == "0"
224 |         
225 |         # Verify the response structure
226 |         assert 'id' in result_data
227 |     
228 |     @pytest.mark.asyncio
229 |     async def test_budget_update_with_high_budget(self, mock_api_request, mock_auth_manager, valid_adset_id):
230 |         """Test budget update with high budget values"""
231 |         
232 |         high_budget = "1000000"  # $10,000
233 |         
234 |         result = await update_adset(
235 |             adset_id=valid_adset_id,
236 |             daily_budget=high_budget
237 |         )
238 |         
239 |         # Parse the result
240 |         result_data = json.loads(result)
241 |         
242 |         # Verify the API was called with high budget
243 |         mock_api_request.assert_called_once()
244 |         call_args = mock_api_request.call_args
245 |         
246 |         params = call_args[0][2]  # Third positional argument is params
247 |         assert 'daily_budget' in params
248 |         assert params['daily_budget'] == high_budget
249 |         
250 |         # Verify the response structure
251 |         assert 'id' in result_data
252 |     
253 |     @pytest.mark.asyncio
254 |     async def test_budget_update_api_error_handling(self, mock_api_request, mock_auth_manager, valid_adset_id, valid_daily_budget):
255 |         """Test error handling when API call fails"""
256 |         
257 |         # Mock API to raise an exception
258 |         mock_api_request.side_effect = Exception("API Error: Invalid budget amount")
259 |         
260 |         result = await update_adset(
261 |             adset_id=valid_adset_id,
262 |             daily_budget=valid_daily_budget
263 |         )
264 |         
265 |         # Parse the result
266 |         result_data = json.loads(result)
267 |         
268 |         # Verify error response structure
269 |         # The error is wrapped in a 'data' field as JSON string
270 |         error_data = json.loads(result_data['data'])
271 |         assert 'error' in error_data
272 |         assert 'details' in error_data
273 |         assert 'params_sent' in error_data
274 |         assert "Failed to update ad set" in error_data['error']
275 |         assert "API Error: Invalid budget amount" in error_data['details']
276 |         assert valid_adset_id in error_data['error']
277 |         
278 |         # Verify the parameters that were sent
279 |         assert error_data['params_sent']['daily_budget'] == valid_daily_budget
280 |     
281 |     @pytest.mark.asyncio
282 |     async def test_budget_update_with_invalid_adset_id(self, mock_api_request, mock_auth_manager):
283 |         """Test budget update with invalid ad set ID"""
284 |         
285 |         result = await update_adset(
286 |             adset_id="",  # Empty ad set ID
287 |             daily_budget="5000"
288 |         )
289 |         
290 |         # Parse the result
291 |         result_data = json.loads(result)
292 |         
293 |         # Verify error response
294 |         error_data = json.loads(result_data['data'])
295 |         assert 'error' in error_data
296 |         assert "No ad set ID provided" in error_data['error']
297 |         
298 |         # Verify API was not called
299 |         mock_api_request.assert_not_called()
300 |     
301 |     @pytest.mark.asyncio
302 |     async def test_budget_update_with_no_parameters(self, mock_api_request, mock_auth_manager, valid_adset_id):
303 |         """Test budget update with no parameters provided"""
304 |         
305 |         result = await update_adset(
306 |             adset_id=valid_adset_id
307 |             # No parameters provided
308 |         )
309 |         
310 |         # Parse the result
311 |         result_data = json.loads(result)
312 |         
313 |         # Verify error response
314 |         error_data = json.loads(result_data['data'])
315 |         assert 'error' in error_data
316 |         assert "No update parameters provided" in error_data['error']
317 |         
318 |         # Verify API was not called
319 |         mock_api_request.assert_not_called()
320 |     
321 |     @pytest.mark.asyncio
322 |     async def test_budget_update_with_negative_budget(self, mock_api_request, mock_auth_manager, valid_adset_id):
323 |         """Test budget update with negative budget (should be handled by API)"""
324 |         
325 |         # Mock API to handle negative budget error
326 |         mock_api_request.side_effect = Exception("API Error: Budget amount must be positive")
327 |         
328 |         result = await update_adset(
329 |             adset_id=valid_adset_id,
330 |             daily_budget="-1000"
331 |         )
332 |         
333 |         # Parse the result
334 |         result_data = json.loads(result)
335 |         
336 |         # Verify error response structure
337 |         error_data = json.loads(result_data['data'])
338 |         assert 'error' in error_data
339 |         assert 'details' in error_data
340 |         assert "API Error: Budget amount must be positive" in error_data['details']
341 |     
342 |     @pytest.mark.asyncio
343 |     async def test_budget_update_with_non_numeric_string(self, mock_api_request, mock_auth_manager, valid_adset_id):
344 |         """Test budget update with non-numeric string (should be handled by API)"""
345 |         
346 |         # Mock API to handle non-numeric budget error
347 |         mock_api_request.side_effect = Exception("API Error: Invalid budget format")
348 |         
349 |         result = await update_adset(
350 |             adset_id=valid_adset_id,
351 |             daily_budget="invalid_budget"
352 |         )
353 |         
354 |         # Parse the result
355 |         result_data = json.loads(result)
356 |         
357 |         # Verify error response structure
358 |         error_data = json.loads(result_data['data'])
359 |         assert 'error' in error_data
360 |         assert 'details' in error_data
361 |         assert "API Error: Invalid budget format" in error_data['details']
362 |     
363 |     @pytest.mark.asyncio
364 |     async def test_budget_update_with_permission_error(self, mock_api_request, mock_auth_manager, valid_adset_id, valid_daily_budget):
365 |         """Test budget update with permission error"""
366 |         
367 |         # Mock API to raise permission error
368 |         mock_api_request.side_effect = Exception("API Error: (#100) Insufficient permissions")
369 |         
370 |         result = await update_adset(
371 |             adset_id=valid_adset_id,
372 |             daily_budget=valid_daily_budget
373 |         )
374 |         
375 |         # Parse the result
376 |         result_data = json.loads(result)
377 |         
378 |         # Verify error response structure
379 |         error_data = json.loads(result_data['data'])
380 |         assert 'error' in error_data
381 |         assert 'details' in error_data
382 |         assert "Insufficient permissions" in error_data['details']
383 |     
384 |     @pytest.mark.asyncio
385 |     async def test_budget_update_with_targeting(self, mock_api_request, mock_auth_manager, valid_adset_id, valid_daily_budget):
386 |         """Test budget update combined with targeting update"""
387 |         
388 |         targeting = {
389 |             "age_min": 25,
390 |             "age_max": 45,
391 |             "geo_locations": {"countries": ["US", "CA"]}
392 |         }
393 |         
394 |         result = await update_adset(
395 |             adset_id=valid_adset_id,
396 |             daily_budget=valid_daily_budget,
397 |             targeting=targeting
398 |         )
399 |         
400 |         # Parse the result
401 |         result_data = json.loads(result)
402 |         
403 |         # Verify the API was called with correct parameters
404 |         mock_api_request.assert_called_once()
405 |         call_args = mock_api_request.call_args
406 |         
407 |         # Check that both budget and targeting were included
408 |         params = call_args[0][2]  # Third positional argument is params
409 |         assert 'daily_budget' in params
410 |         assert 'targeting' in params
411 |         assert params['daily_budget'] == valid_daily_budget
412 |         
413 |         # Verify targeting was properly JSON encoded
414 |         targeting_json = json.loads(params['targeting'])
415 |         assert targeting_json['age_min'] == 25
416 |         assert targeting_json['age_max'] == 45
417 |         assert targeting_json['geo_locations']['countries'] == ["US", "CA"]
418 |         
419 |         # Verify the response structure
420 |         assert 'id' in result_data
421 | 
422 | 
class TestBudgetUpdateIntegration:
    """Workflow-level checks for budget updates, run against patched dependencies."""

    @pytest.mark.asyncio
    async def test_budget_update_workflow(self):
        """``update_adset`` accepts both budget parameters and returns an 'id'.

        A true end-to-end run would need a real ad set and credentials, so the
        API call and the auth layer are patched here; only the function
        signature and parameter handling are exercised.
        """
        with patch('meta_ads_mcp.core.adsets.make_api_request') as api_mock:
            with patch('meta_ads_mcp.core.api.auth_manager') as manager:
                with patch('meta_ads_mcp.core.auth.get_current_access_token') as token_lookup:
                    api_mock.return_value = {"id": "test_id", "daily_budget": "5000"}
                    manager.get_current_access_token.return_value = "test_access_token"
                    manager.is_token_valid.return_value = True
                    manager.app_id = "test_app_id"
                    token_lookup.return_value = "test_access_token"

                    outcome = await update_adset(
                        adset_id="test_adset_id",
                        daily_budget="5000",
                        lifetime_budget="50000",
                    )

                    # The call must complete and produce something.
                    assert outcome is not None

                    # The outcome may be a JSON string or an already-decoded mapping.
                    decoded = json.loads(outcome) if isinstance(outcome, str) else outcome
                    assert 'id' in decoded
458 | 
459 | 
# Allow running this test module directly as a script, with verbose pytest output.
if __name__ == "__main__":
    pytest.main([__file__, "-v"])
```

--------------------------------------------------------------------------------
/meta_ads_mcp/core/server.py:
--------------------------------------------------------------------------------

```python
  1 | """MCP server configuration for Meta Ads API."""
  2 | 
  3 | from mcp.server.fastmcp import FastMCP
  4 | import argparse
  5 | import os
  6 | import sys
  7 | import webbrowser
  8 | import json
  9 | from typing import Dict, Any, Optional
 10 | from .auth import login as login_auth
 11 | from .resources import list_resources, get_resource
 12 | from .utils import logger
 13 | from .pipeboard_auth import pipeboard_auth_manager
 14 | import time
 15 | 
# Initialize FastMCP server
# Created at import time under the server name "meta-ads".
mcp_server = FastMCP("meta-ads")

# Register resource URIs
# `resource(uri=...)` returns a callable that is applied directly to the
# handlers imported from .resources, rather than decorating them where they
# are defined.
mcp_server.resource(uri="meta-ads://resources")(list_resources)
mcp_server.resource(uri="meta-ads://images/{resource_id}")(get_resource)
 22 | 
 23 | 
 24 | class StreamableHTTPHandler:
 25 |     """Handles stateless Streamable HTTP requests for Meta Ads MCP"""
 26 |     
 27 |     def __init__(self):
 28 |         """Initialize handler with no session storage - all auth per request"""
 29 |         logger.debug("StreamableHTTPHandler initialized for stateless operation")
 30 |         
 31 |     def handle_request(self, request_headers: Dict[str, str], request_body: Dict[str, Any]) -> Dict[str, Any]:
 32 |         """Handle individual request with authentication
 33 |         
 34 |         Args:
 35 |             request_headers: HTTP request headers
 36 |             request_body: JSON-RPC request body
 37 |             
 38 |         Returns:
 39 |             JSON response with auth status and any tool results
 40 |         """
 41 |         try:
 42 |             # Extract authentication configuration from headers
 43 |             auth_config = self.get_auth_config_from_headers(request_headers)
 44 |             logger.debug(f"Auth method detected: {auth_config['auth_method']}")
 45 |             
 46 |             # Handle based on auth method
 47 |             if auth_config['auth_method'] == 'bearer':
 48 |                 return self.handle_bearer_request(auth_config, request_body)
 49 |             elif auth_config['auth_method'] == 'custom_meta_app':
 50 |                 return self.handle_custom_app_request(auth_config, request_body)
 51 |             else:
 52 |                 return self.handle_unauthenticated_request(request_body)
 53 |                 
 54 |         except Exception as e:
 55 |             logger.error(f"Error handling request: {e}")
 56 |             return {
 57 |                 'jsonrpc': '2.0',
 58 |                 'error': {
 59 |                     'code': -32603,
 60 |                     'message': 'Internal error',
 61 |                     'data': str(e)
 62 |                 },
 63 |                 'id': request_body.get('id')
 64 |             }
 65 |     
 66 |     def get_auth_config_from_headers(self, request_headers: Dict[str, str]) -> Dict[str, Any]:
 67 |         """Extract authentication configuration from HTTP headers
 68 |         
 69 |         Args:
 70 |             request_headers: HTTP request headers
 71 |             
 72 |         Returns:
 73 |             Dictionary with auth method and relevant credentials
 74 |         """
 75 |         # Security validation - only allow safe headers
 76 |         ALLOWED_VIA_HEADERS = {
 77 |             'pipeboard_api_token': True,   # ✅ Primary method - simple and secure
 78 |             'meta_app_id': True,           # ✅ Fallback only - triggers OAuth complexity
 79 |             'meta_app_secret': False,      # ❌ Server environment only
 80 |             'meta_access_token': False,    # ❌ Use proper auth flows instead
 81 |         }
 82 |         
 83 |         # PRIMARY: Check for Bearer token in Authorization header (handles 90%+ of cases)
 84 |         auth_header = request_headers.get('Authorization') or request_headers.get('authorization')
 85 |         if auth_header and auth_header.lower().startswith('bearer '):
 86 |             token = auth_header[7:].strip()
 87 |             logger.info("Bearer authentication detected (primary path)")
 88 |             return {
 89 |                 'auth_method': 'bearer',
 90 |                 'bearer_token': token,
 91 |                 'requires_oauth': False  # Simple token-based auth
 92 |             }
 93 |         
 94 |         # FALLBACK: Custom Meta app (minority of users)
 95 |         meta_app_id = request_headers.get('X-META-APP-ID') or request_headers.get('x-meta-app-id')
 96 |         if meta_app_id:
 97 |             logger.debug("Custom Meta app authentication detected (fallback path)")
 98 |             return {
 99 |                 'auth_method': 'custom_meta_app',
100 |                 'meta_app_id': meta_app_id,
101 |                 'requires_oauth': True  # Complex OAuth flow required
102 |             }
103 |         
104 |         # No authentication provided
105 |         logger.warning("No authentication method detected in headers")
106 |         return {
107 |             'auth_method': 'none',
108 |             'requires_oauth': False
109 |         }
110 |     
111 |     def handle_bearer_request(self, auth_config: Dict[str, Any], request_body: Dict[str, Any]) -> Dict[str, Any]:
112 |         """Handle request with Bearer token (primary path)
113 |         
114 |         Args:
115 |             auth_config: Authentication configuration from headers
116 |             request_body: JSON-RPC request body
117 |             
118 |         Returns:
119 |             JSON response ready for tool execution
120 |         """
121 |         logger.debug("Processing Bearer authenticated request")
122 |         token = auth_config['bearer_token']
123 |         
124 |         # Token is ready to use immediately for API calls
125 |         # TODO: In next phases, this will execute the actual tool call
126 |         return {
127 |             'jsonrpc': '2.0',
128 |             'result': {
129 |                 'status': 'ready',
130 |                 'auth_method': 'bearer',
131 |                 'message': 'Authentication successful with Bearer token',
132 |                 'token_source': 'bearer_header'
133 |             },
134 |             'id': request_body.get('id')
135 |         }
136 |     
137 |     def handle_custom_app_request(self, auth_config: Dict[str, Any], request_body: Dict[str, Any]) -> Dict[str, Any]:
138 |         """Handle request with custom Meta app (fallback path)
139 |         
140 |         Args:
141 |             auth_config: Authentication configuration from headers
142 |             request_body: JSON-RPC request body
143 |             
144 |         Returns:
145 |             JSON response indicating OAuth flow is required
146 |         """
147 |         logger.debug("Processing custom Meta app request (OAuth required)")
148 |         
149 |         # This may require OAuth flow initiation
150 |         # Each request is independent - no session state
151 |         return {
152 |             'jsonrpc': '2.0',
153 |             'result': {
154 |                 'status': 'oauth_required',
155 |                 'auth_method': 'custom_meta_app',
156 |                 'meta_app_id': auth_config['meta_app_id'],
157 |                 'message': 'OAuth flow required for custom Meta app authentication',
158 |                 'next_steps': 'Use get_login_link tool to initiate OAuth flow'
159 |             },
160 |             'id': request_body.get('id')
161 |         }
162 |     
163 |     def handle_unauthenticated_request(self, request_body: Dict[str, Any]) -> Dict[str, Any]:
164 |         """Handle request with no authentication
165 |         
166 |         Args:
167 |             request_body: JSON-RPC request body
168 |             
169 |         Returns:
170 |             JSON error response requesting authentication
171 |         """
172 |         logger.warning("Unauthenticated request received")
173 |         
174 |         return {
175 |             'jsonrpc': '2.0',
176 |             'error': {
177 |                 'code': -32600,
178 |                 'message': 'Authentication required',
179 |                 'data': {
180 |                     'supported_methods': [
181 |                         'Authorization: Bearer <token> (recommended)',
182 |                         'X-META-APP-ID: Custom Meta app OAuth (advanced users)'
183 |                     ],
184 |                     'documentation': 'https://github.com/pipeboard-co/meta-ads-mcp'
185 |                 }
186 |             },
187 |             'id': request_body.get('id')
188 |         }
189 | 
190 | 
def login_cli():
    """Authenticate with Meta from the command line.

    Announces the flow in the log and on stdout, then hands off to the
    common login function.
    """
    announcement = "Starting Meta Ads CLI authentication flow"
    logger.info(announcement)
    print(f"{announcement}...")

    # The actual login work lives in the common login function.
    login_auth()
200 | 
201 | 
202 | def main():
203 |     """Main entry point for the package"""
204 |     # Log startup information
205 |     logger.info("Meta Ads MCP server starting")
206 |     logger.debug(f"Python version: {sys.version}")
207 |     logger.debug(f"Args: {sys.argv}")
208 |     
209 |     # Initialize argument parser
210 |     parser = argparse.ArgumentParser(
211 |         description="Meta Ads MCP Server - Model Context Protocol server for Meta Ads API",
212 |         epilog="For more information, see https://github.com/pipeboard-co/meta-ads-mcp"
213 |     )
214 |     parser.add_argument("--login", action="store_true", help="Authenticate with Meta and store the token")
215 |     parser.add_argument("--app-id", type=str, help="Meta App ID (Client ID) for authentication")
216 |     parser.add_argument("--version", action="store_true", help="Show the version of the package")
217 |     
218 |     # Transport configuration arguments
219 |     parser.add_argument("--transport", type=str, choices=["stdio", "streamable-http"], 
220 |                        default="stdio", 
221 |                        help="Transport method: 'stdio' for MCP clients (default), 'streamable-http' for HTTP API access")
222 |     parser.add_argument("--port", type=int, default=8080, 
223 |                        help="Port for Streamable HTTP transport (default: 8080, only used with --transport streamable-http)")
224 |     parser.add_argument("--host", type=str, default="localhost", 
225 |                        help="Host for Streamable HTTP transport (default: localhost, only used with --transport streamable-http)")
226 |     parser.add_argument("--sse-response", action="store_true", 
227 |                        help="Use SSE response format instead of JSON (default: JSON, only used with --transport streamable-http)")
228 |     
229 |     args = parser.parse_args()
230 |     logger.debug(f"Parsed args: login={args.login}, app_id={args.app_id}, version={args.version}")
231 |     logger.debug(f"Transport args: transport={args.transport}, port={args.port}, host={args.host}, sse_response={args.sse_response}")
232 |     
233 |     # Validate CLI argument combinations
234 |     if args.transport == "stdio" and (args.port != 8080 or args.host != "localhost" or args.sse_response):
235 |         logger.warning("HTTP transport arguments (--port, --host, --sse-response) are ignored when using stdio transport")
236 |         print("Warning: HTTP transport arguments are ignored when using stdio transport")
237 |     
238 |     # Update app ID if provided as environment variable or command line arg
239 |     from .auth import auth_manager, meta_config
240 |     
241 |     # Check environment variable first (early init)
242 |     env_app_id = os.environ.get("META_APP_ID")
243 |     if env_app_id:
244 |         logger.debug(f"Found META_APP_ID in environment: {env_app_id}")
245 |     else:
246 |         logger.warning("META_APP_ID not found in environment variables")
247 |     
248 |     # Command line takes precedence
249 |     if args.app_id:
250 |         logger.info(f"Setting app_id from command line: {args.app_id}")
251 |         auth_manager.app_id = args.app_id
252 |         meta_config.set_app_id(args.app_id)
253 |     elif env_app_id:
254 |         logger.info(f"Setting app_id from environment: {env_app_id}")
255 |         auth_manager.app_id = env_app_id
256 |         meta_config.set_app_id(env_app_id)
257 |     
258 |     # Log the final app ID that will be used
259 |     logger.info(f"Final app_id from meta_config: {meta_config.get_app_id()}")
260 |     logger.info(f"Final app_id from auth_manager: {auth_manager.app_id}")
261 |     logger.info(f"ENV META_APP_ID: {os.environ.get('META_APP_ID')}")
262 |     
263 |     # Show version if requested
264 |     if args.version:
265 |         from meta_ads_mcp import __version__
266 |         logger.info(f"Displaying version: {__version__}")
267 |         print(f"Meta Ads MCP v{__version__}")
268 |         return 0
269 |     
270 |     # Handle login command
271 |     if args.login:
272 |         login_cli()
273 |         return 0
274 |     
275 |     # Check for Pipeboard authentication and token
276 |     pipeboard_api_token = os.environ.get("PIPEBOARD_API_TOKEN")
277 |     if pipeboard_api_token:
278 |         logger.info("Using Pipeboard authentication")
279 |         print("✅ Pipeboard authentication enabled")
280 |         print(f"   API token: {pipeboard_api_token[:8]}...{pipeboard_api_token[-4:]}")
281 |         # Check for existing token
282 |         token = pipeboard_auth_manager.get_access_token()
283 |         if not token:
284 |             logger.info("No valid Pipeboard token found. Initiating browser-based authentication flow.")
285 |             print("No valid Meta token found. Opening browser for authentication...")
286 |             try:
287 |                 # Initialize the auth flow and get the login URL
288 |                 auth_data = pipeboard_auth_manager.initiate_auth_flow()
289 |                 login_url = auth_data.get('loginUrl')
290 |                 if login_url:
291 |                     logger.info(f"Opening browser with login URL: {login_url}")
292 |                     webbrowser.open(login_url)
293 |                     print("Please authorize the application in your browser.")
294 |                     print("After authorization, the token will be automatically retrieved.")
295 |                     print("Waiting for authentication to complete...")
296 |                     
297 |                     # Poll for token completion
298 |                     max_attempts = 30  # Try for 30 * 2 = 60 seconds
299 |                     for attempt in range(max_attempts):
300 |                         print(f"Waiting for authentication... ({attempt+1}/{max_attempts})")
301 |                         # Try to get the token again
302 |                         token = pipeboard_auth_manager.get_access_token(force_refresh=True)
303 |                         if token:
304 |                             print("Authentication successful!")
305 |                             break
306 |                         time.sleep(2)  # Wait 2 seconds between attempts
307 |                     
308 |                     if not token:
309 |                         print("Authentication timed out. Starting server anyway.")
310 |                         print("You may need to restart the server after completing authentication.")
311 |                 else:
312 |                     logger.error("No login URL received from Pipeboard API")
313 |                     print("Error: Could not get authentication URL. Check your API token.")
314 |             except Exception as e:
315 |                 logger.error(f"Error initiating browser-based authentication: {e}")
316 |                 print(f"Error: Could not start authentication: {e}")
317 |         else:
318 |             print(f"✅ Valid Pipeboard access token found")
319 |             print(f"   Token preview: {token[:10]}...{token[-5:]}")
320 |     
321 |     # Transport-specific server initialization and startup
322 |     if args.transport == "streamable-http":
323 |         logger.info(f"Starting MCP server with Streamable HTTP transport on {args.host}:{args.port}")
324 |         logger.info("Mode: Stateless (no session persistence)")
325 |         logger.info(f"Response format: {'SSE' if args.sse_response else 'JSON'}")
326 |         logger.info("Primary auth method: Bearer Token (recommended)")
327 |         logger.info("Fallback auth method: Custom Meta App OAuth (complex setup)")
328 |         
329 |         print(f"Starting Meta Ads MCP server with Streamable HTTP transport")
330 |         print(f"Server will listen on {args.host}:{args.port}")
331 |         print(f"Response format: {'SSE' if args.sse_response else 'JSON'}")
332 |         print("Primary authentication: Bearer Token (via Authorization: Bearer <token> header)")
333 |         print("Fallback authentication: Custom Meta App OAuth (via X-META-APP-ID header)")
334 |         
335 |         # Configure the existing server with streamable HTTP settings
336 |         mcp_server.settings.host = args.host
337 |         mcp_server.settings.port = args.port
338 |         mcp_server.settings.stateless_http = True
339 |         mcp_server.settings.json_response = not args.sse_response
340 |         
341 |         # Import all tool modules to ensure they are registered
342 |         logger.info("Ensuring all tools are registered for HTTP transport")
343 |         from . import accounts, campaigns, adsets, ads, insights, authentication
344 |         from . import ads_library, budget_schedules, reports, openai_deep_research
345 |         
346 |         # ✅ NEW: Setup HTTP authentication middleware
347 |         logger.info("Setting up HTTP authentication middleware")
348 |         try:
349 |             from .http_auth_integration import setup_fastmcp_http_auth
350 |             
351 |             # Setup the FastMCP HTTP auth integration
352 |             setup_fastmcp_http_auth(mcp_server)
353 |             logger.info("FastMCP HTTP authentication integration setup successful")
354 |             print("✅ FastMCP HTTP authentication integration enabled")
355 |             print("   - Bearer tokens via Authorization: Bearer <token> header")
356 |             print("   - Direct Meta tokens via X-META-ACCESS-TOKEN header")
357 |             
358 |         except Exception as e:
359 |             logger.error(f"Failed to setup FastMCP HTTP authentication integration: {e}")
360 |             print(f"⚠️  FastMCP HTTP authentication integration setup failed: {e}")
361 |             print("   Server will still start but may not support header-based auth")
362 |         
363 |         # Log final server configuration
364 |         logger.info(f"FastMCP server configured with:")
365 |         logger.info(f"  - Host: {mcp_server.settings.host}")
366 |         logger.info(f"  - Port: {mcp_server.settings.port}")
367 |         logger.info(f"  - Stateless HTTP: {mcp_server.settings.stateless_http}")
368 |         logger.info(f"  - JSON Response: {mcp_server.settings.json_response}")
369 |         logger.info(f"  - Streamable HTTP Path: {mcp_server.settings.streamable_http_path}")
370 |         
371 |         # Start the FastMCP server with Streamable HTTP transport
372 |         try:
373 |             logger.info("Starting FastMCP server with Streamable HTTP transport")
374 |             print(f"✅ Server configured successfully")
375 |             print(f"   URL: http://{args.host}:{args.port}{mcp_server.settings.streamable_http_path}/")
376 |             print(f"   Mode: {'Stateless' if mcp_server.settings.stateless_http else 'Stateful'}")
377 |             print(f"   Format: {'JSON' if mcp_server.settings.json_response else 'SSE'}")
378 |             mcp_server.run(transport="streamable-http")
379 |         except Exception as e:
380 |             logger.error(f"Error starting Streamable HTTP server: {e}")
381 |             print(f"Error: Failed to start Streamable HTTP server: {e}")
382 |             return 1
383 |     else:
384 |         # Default stdio transport
385 |         logger.info("Starting MCP server with stdio transport")
386 |         mcp_server.run(transport='stdio') 
```

--------------------------------------------------------------------------------
/tests/test_estimate_audience_size.py:
--------------------------------------------------------------------------------

```python
  1 | #!/usr/bin/env python3
  2 | """
  3 | Unit tests for estimate_audience_size functionality in Meta Ads MCP.
  4 | 
  5 | This module tests the new estimate_audience_size function that replaces validate_interests
  6 | and provides comprehensive audience estimation using Meta's reachestimate API.
  7 | """
  8 | 
  9 | import pytest
 10 | import json
 11 | from unittest.mock import AsyncMock, patch
 12 | 
 13 | from meta_ads_mcp.core.targeting import estimate_audience_size
 14 | 
 15 | 
 16 | class TestEstimateAudienceSize:
 17 |     """Test cases for estimate_audience_size function"""
 18 |     
 19 |     @pytest.mark.asyncio
 20 |     async def test_comprehensive_audience_estimation_success(self):
 21 |         """Test successful comprehensive audience estimation with complex targeting"""
 22 |         mock_response = {
 23 |             "data": [
 24 |                 {
 25 |                     "estimate_mau": 1500000,
 26 |                     "estimate_dau": [
 27 |                         {"min_reach": 100000, "max_reach": 150000, "bid": 100},
 28 |                         {"min_reach": 200000, "max_reach": 250000, "bid": 200}
 29 |                     ],
 30 |                     "bid_estimates": {
 31 |                         "median": 150,
 32 |                         "min": 50,
 33 |                         "max": 300
 34 |                     },
 35 |                     "unsupported_targeting": []
 36 |                 }
 37 |             ]
 38 |         }
 39 |         
 40 |         targeting_spec = {
 41 |             "age_min": 25,
 42 |             "age_max": 65,
 43 |             "geo_locations": {"countries": ["US"]},
 44 |             "flexible_spec": [
 45 |                 {"interests": [{"id": "6003371567474"}]}
 46 |             ]
 47 |         }
 48 |         
 49 |         with patch('meta_ads_mcp.core.targeting.make_api_request', new_callable=AsyncMock) as mock_api:
 50 |             mock_api.return_value = mock_response
 51 |             
 52 |             result = await estimate_audience_size(
 53 |                 access_token="test_token",
 54 |                 account_id="act_123456789",
 55 |                 targeting=targeting_spec,
 56 |                 optimization_goal="REACH"
 57 |             )
 58 |             
 59 |             # Verify API call
 60 |             mock_api.assert_called_once_with(
 61 |                 "act_123456789/reachestimate",
 62 |                 "test_token",
 63 |                 {
 64 |                     "targeting_spec": targeting_spec
 65 |                 },
 66 |                 method="GET"
 67 |             )
 68 |             
 69 |             # Verify response format
 70 |             result_data = json.loads(result)
 71 |             assert result_data["success"] is True
 72 |             assert result_data["account_id"] == "act_123456789"
 73 |             assert result_data["targeting"] == targeting_spec
 74 |             assert result_data["optimization_goal"] == "REACH"
 75 |             assert result_data["estimated_audience_size"] == 1500000
 76 |             assert "estimate_details" in result_data
 77 |             assert result_data["estimate_details"]["monthly_active_users"] == 1500000
 78 |     
 79 |     @pytest.mark.asyncio
 80 |     async def test_different_optimization_goals(self):
 81 |         """Test audience estimation with different optimization goals (parameter is preserved in response)"""
 82 |         mock_response = {
 83 |             "data": [
 84 |                 {
 85 |                     "estimate_mau": 800000,
 86 |                     "estimate_dau": [],
 87 |                     "bid_estimates": {},
 88 |                     "unsupported_targeting": []
 89 |                 }
 90 |             ]
 91 |         }
 92 |         
 93 |         targeting_spec = {
 94 |             "age_min": 18,
 95 |             "age_max": 45,
 96 |             "geo_locations": {"countries": ["US"]}
 97 |         }
 98 |         
 99 |         # Test different optimization goals - they should all use the same reachestimate endpoint
100 |         test_goals = ["REACH", "LINK_CLICKS", "LANDING_PAGE_VIEWS", "CONVERSIONS", "APP_INSTALLS"]
101 |         
102 |         with patch('meta_ads_mcp.core.targeting.make_api_request', new_callable=AsyncMock) as mock_api:
103 |             mock_api.return_value = mock_response
104 |             
105 |             for optimization_goal in test_goals:
106 |                 mock_api.reset_mock()
107 |                 
108 |                 result = await estimate_audience_size(
109 |                     access_token="test_token",
110 |                     account_id="act_123456789",
111 |                     targeting=targeting_spec,
112 |                     optimization_goal=optimization_goal
113 |                 )
114 |                 
115 |                 # Verify API call uses reachestimate endpoint with simplified parameters
116 |                 mock_api.assert_called_once_with(
117 |                     "act_123456789/reachestimate",
118 |                     "test_token",
119 |                     {
120 |                         "targeting_spec": targeting_spec
121 |                     },
122 |                     method="GET"
123 |                 )
124 |                 
125 |                 result_data = json.loads(result)
126 |                 assert result_data["success"] is True
127 |                 assert result_data["optimization_goal"] == optimization_goal
128 |     
129 |     @pytest.mark.asyncio
130 |     async def test_backwards_compatibility_interest_names(self):
131 |         """Test backwards compatibility with interest name validation"""
132 |         mock_response = {
133 |             "data": [
134 |                 {
135 |                     "name": "Japan",
136 |                     "valid": True,
137 |                     "id": 6003700426513,
138 |                     "audience_size": 68310258
139 |                 },
140 |                 {
141 |                     "name": "invalidinterest",
142 |                     "valid": False
143 |                 }
144 |             ]
145 |         }
146 |         
147 |         with patch('meta_ads_mcp.core.targeting.make_api_request', new_callable=AsyncMock) as mock_api:
148 |             mock_api.return_value = mock_response
149 |             
150 |             result = await estimate_audience_size(
151 |                 access_token="test_token",
152 |                 interest_list=["Japan", "invalidinterest"]
153 |             )
154 |             
155 |             # Verify it uses the old validation API
156 |             mock_api.assert_called_once_with(
157 |                 "search",
158 |                 "test_token",
159 |                 {
160 |                     "type": "adinterestvalid",
161 |                     "interest_list": '["Japan", "invalidinterest"]'
162 |                 }
163 |             )
164 |             
165 |             # Verify response matches old format
166 |             result_data = json.loads(result)
167 |             assert result_data == mock_response
168 |             assert result_data["data"][0]["valid"] is True
169 |             assert result_data["data"][1]["valid"] is False
170 |     
171 |     @pytest.mark.asyncio
172 |     async def test_backwards_compatibility_interest_fbids(self):
173 |         """Test backwards compatibility with interest FBID validation"""
174 |         mock_response = {
175 |             "data": [
176 |                 {
177 |                     "id": "6003700426513",
178 |                     "valid": True,
179 |                     "audience_size": 68310258
180 |                 }
181 |             ]
182 |         }
183 |         
184 |         with patch('meta_ads_mcp.core.targeting.make_api_request', new_callable=AsyncMock) as mock_api:
185 |             mock_api.return_value = mock_response
186 |             
187 |             result = await estimate_audience_size(
188 |                 access_token="test_token",
189 |                 interest_fbid_list=["6003700426513"]
190 |             )
191 |             
192 |             # Verify it uses the old validation API
193 |             mock_api.assert_called_once_with(
194 |                 "search",
195 |                 "test_token",
196 |                 {
197 |                     "type": "adinterestvalid",
198 |                     "interest_fbid_list": '["6003700426513"]'
199 |                 }
200 |             )
201 |             
202 |             # Verify response matches old format
203 |             result_data = json.loads(result)
204 |             assert result_data == mock_response
205 |             assert result_data["data"][0]["valid"] is True
206 |     
207 |     @pytest.mark.asyncio
208 |     async def test_backwards_compatibility_both_interest_params(self):
209 |         """Test backwards compatibility with both interest names and FBIDs"""
210 |         mock_response = {
211 |             "data": [
212 |                 {"name": "Japan", "valid": True, "id": 6003700426513, "audience_size": 68310258},
213 |                 {"id": "6003397425735", "valid": True, "audience_size": 12345678}
214 |             ]
215 |         }
216 |         
217 |         with patch('meta_ads_mcp.core.targeting.make_api_request', new_callable=AsyncMock) as mock_api:
218 |             mock_api.return_value = mock_response
219 |             
220 |             result = await estimate_audience_size(
221 |                 access_token="test_token",
222 |                 interest_list=["Japan"],
223 |                 interest_fbid_list=["6003397425735"]
224 |             )
225 |             
226 |             # Verify both parameters are passed
227 |             mock_api.assert_called_once_with(
228 |                 "search",
229 |                 "test_token",
230 |                 {
231 |                     "type": "adinterestvalid",
232 |                     "interest_list": '["Japan"]',
233 |                     "interest_fbid_list": '["6003397425735"]'
234 |                 }
235 |             )
236 |             
237 |             result_data = json.loads(result)
238 |             assert result_data == mock_response
239 |     
240 |     @pytest.mark.asyncio
241 |     async def test_error_no_account_id_for_comprehensive(self):
242 |         """Test error when account_id missing for comprehensive estimation"""
243 |         targeting_spec = {
244 |             "age_min": 25,
245 |             "age_max": 65,
246 |             "geo_locations": {"countries": ["US"]}
247 |         }
248 |         
249 |         result = await estimate_audience_size(
250 |             access_token="test_token",
251 |             targeting=targeting_spec
252 |         )
253 |         
254 |         result_data = json.loads(result)
255 |         # The @meta_api_tool decorator wraps errors in a 'data' field
256 |         assert "data" in result_data
257 |         nested_data = json.loads(result_data["data"])
258 |         assert "error" in nested_data
259 |         assert "account_id is required" in nested_data["error"]
260 |         assert "details" in nested_data
261 |     
262 |     @pytest.mark.asyncio
263 |     async def test_error_no_targeting_for_comprehensive(self):
264 |         """Test error when targeting missing for comprehensive estimation"""
265 |         result = await estimate_audience_size(
266 |             access_token="test_token",
267 |             account_id="act_123456789"
268 |         )
269 |         
270 |         result_data = json.loads(result)
271 |         # The @meta_api_tool decorator wraps errors in a 'data' field
272 |         assert "data" in result_data
273 |         nested_data = json.loads(result_data["data"])
274 |         assert "error" in nested_data
275 |         assert "targeting specification is required" in nested_data["error"]
276 |         assert "example" in nested_data
277 |     
278 |     @pytest.mark.asyncio
279 |     async def test_error_no_parameters(self):
280 |         """Test error when no parameters provided"""
281 |         # Since we're using the @meta_api_tool decorator, we need to simulate
282 |         # its behavior for error handling
283 |         with patch('meta_ads_mcp.core.auth.get_current_access_token') as mock_auth:
284 |             mock_auth.return_value = "test_token"
285 |             
286 |             result = await estimate_audience_size()
287 |             
288 |             result_data = json.loads(result)
289 |             # The @meta_api_tool decorator wraps errors in a 'data' field
290 |             assert "data" in result_data
291 |             nested_data = json.loads(result_data["data"])
292 |             assert "error" in nested_data
293 |     
294 |     @pytest.mark.asyncio
295 |     async def test_error_backwards_compatibility_no_interests(self):
296 |         """Test error in backwards compatibility mode with no interest parameters"""
297 |         result = await estimate_audience_size(
298 |             access_token="test_token"
299 |         )
300 |         
301 |         result_data = json.loads(result)
302 |         # The @meta_api_tool decorator wraps errors in a 'data' field
303 |         assert "data" in result_data
304 |         nested_data = json.loads(result_data["data"])
305 |         assert "error" in nested_data
306 |     
307 |     @pytest.mark.asyncio
308 |     async def test_api_error_handling(self):
309 |         """Test handling of API errors from reachestimate"""
310 |         targeting_spec = {
311 |             "age_min": 25,
312 |             "age_max": 65,
313 |             "geo_locations": {"countries": ["US"]}
314 |         }
315 |         
316 |         # Simulate API exception
317 |         with patch('meta_ads_mcp.core.targeting.make_api_request', new_callable=AsyncMock) as mock_api:
318 |             mock_api.side_effect = Exception("API connection failed")
319 |             
320 |             result = await estimate_audience_size(
321 |                 access_token="test_token",
322 |                 account_id="act_123456789",
323 |                 targeting=targeting_spec
324 |             )
325 |             
326 |             result_data = json.loads(result)
327 |             # The @meta_api_tool decorator wraps errors in a 'data' field
328 |             assert "data" in result_data
329 |             nested_data = json.loads(result_data["data"])
330 |             assert "error" in nested_data
331 |             assert "Failed to get audience estimation from reachestimate endpoint" in nested_data["error"]
332 |             assert "details" in nested_data
333 |     
334 |     @pytest.mark.asyncio
335 |     async def test_empty_api_response(self):
336 |         """Test handling of empty response from reachestimate API"""
337 |         targeting_spec = {
338 |             "age_min": 25,
339 |             "age_max": 65,
340 |             "geo_locations": {"countries": ["US"]}
341 |         }
342 |         
343 |         # Simulate empty API response
344 |         mock_response = {"data": []}
345 |         
346 |         with patch('meta_ads_mcp.core.targeting.make_api_request', new_callable=AsyncMock) as mock_api:
347 |             mock_api.return_value = mock_response
348 |             
349 |             result = await estimate_audience_size(
350 |                 access_token="test_token",
351 |                 account_id="act_123456789",
352 |                 targeting=targeting_spec
353 |             )
354 |             
355 |             result_data = json.loads(result)
356 |             # The @meta_api_tool decorator wraps errors in a 'data' field
357 |             assert "data" in result_data
358 |             nested_data = json.loads(result_data["data"])
359 |             assert "error" in nested_data
360 |             assert "No estimation data returned" in nested_data["error"]
361 |             assert "raw_response" in nested_data
362 |     
363 |     @pytest.mark.asyncio
364 |     async def test_comprehensive_estimation_with_minimal_targeting(self):
365 |         """Test comprehensive estimation with minimal targeting requirements"""
366 |         mock_response = {
367 |             "data": [
368 |                 {
369 |                     "estimate_mau": 500000,
370 |                     "estimate_dau": [],
371 |                     "bid_estimates": {},
372 |                     "unsupported_targeting": []
373 |                 }
374 |             ]
375 |         }
376 |         
377 |         minimal_targeting = {
378 |             "age_min": 18,
379 |             "age_max": 65,
380 |             "geo_locations": {"countries": ["US"]}
381 |         }
382 |         
383 |         with patch('meta_ads_mcp.core.targeting.make_api_request', new_callable=AsyncMock) as mock_api:
384 |             mock_api.return_value = mock_response
385 |             
386 |             result = await estimate_audience_size(
387 |                 access_token="test_token",
388 |                 account_id="act_123456789",
389 |                 targeting=minimal_targeting
390 |             )
391 |             
392 |             result_data = json.loads(result)
393 |             assert result_data["success"] is True
394 |             assert result_data["estimated_audience_size"] == 500000
395 |             assert result_data["targeting"] == minimal_targeting
396 |     
397 |     @pytest.mark.asyncio
398 |     async def test_estimate_details_structure(self):
399 |         """Test that estimate_details contains expected structure"""
400 |         mock_response = {
401 |             "data": [
402 |                 {
403 |                     "estimate_mau": 2000000,
404 |                     "estimate_dau": [
405 |                         {"min_reach": 150000, "max_reach": 200000, "bid": 120}
406 |                     ],
407 |                     "bid_estimates": {
408 |                         "median": 180,
409 |                         "min": 80,
410 |                         "max": 350
411 |                     },
412 |                     "unsupported_targeting": ["custom_audiences"]
413 |                 }
414 |             ]
415 |         }
416 |         
417 |         targeting_spec = {
418 |             "age_min": 25,
419 |             "age_max": 45,
420 |             "geo_locations": {"countries": ["US"]},
421 |             "flexible_spec": [
422 |                 {"interests": [{"id": "6003371567474"}, {"id": "6003462346642"}]}
423 |             ]
424 |         }
425 |         
426 |         with patch('meta_ads_mcp.core.targeting.make_api_request', new_callable=AsyncMock) as mock_api:
427 |             mock_api.return_value = mock_response
428 |             
429 |             result = await estimate_audience_size(
430 |                 access_token="test_token",
431 |                 account_id="act_123456789",
432 |                 targeting=targeting_spec,
433 |                 optimization_goal="CONVERSIONS"
434 |             )
435 |             
436 |             result_data = json.loads(result)
437 |             assert result_data["success"] is True
438 |             
439 |             # Check estimate_details structure
440 |             details = result_data["estimate_details"]
441 |             assert "monthly_active_users" in details
442 |             assert "daily_outcomes_curve" in details
443 |             assert "bid_estimate" in details
444 |             assert "unsupported_targeting" in details
445 |             
446 |             assert details["monthly_active_users"] == 2000000
447 |             assert len(details["daily_outcomes_curve"]) == 1
448 |             assert details["bid_estimate"]["median"] == 180
449 |             assert "custom_audiences" in details["unsupported_targeting"]
450 |     
451 |     @pytest.mark.asyncio
452 |     async def test_function_registration(self):
453 |         """Test that the function is properly registered as an MCP tool"""
454 |         # This test verifies the function has the correct decorators
455 |         assert hasattr(estimate_audience_size, '__wrapped__')  # From @meta_api_tool
456 |         
457 |         # Verify function signature
458 |         import inspect
459 |         sig = inspect.signature(estimate_audience_size)
460 |         
461 |         expected_params = [
462 |             'access_token', 'account_id', 'targeting', 'optimization_goal',
463 |             'interest_list', 'interest_fbid_list'
464 |         ]
465 |         
466 |         for param in expected_params:
467 |             assert param in sig.parameters
468 |         
469 |         # Verify default values
470 |         assert sig.parameters['optimization_goal'].default == "REACH"
471 |         assert sig.parameters['access_token'].default is None
472 |         assert sig.parameters['targeting'].default is None
```

--------------------------------------------------------------------------------
/meta_ads_mcp/core/openai_deep_research.py:
--------------------------------------------------------------------------------

```python
  1 | """OpenAI MCP Deep Research tools for Meta Ads API.
  2 | 
  3 | This module implements the required 'search' and 'fetch' tools for OpenAI's 
  4 | ChatGPT Deep Research feature, providing access to Meta Ads data in the format 
  5 | expected by ChatGPT.
  6 | 
  7 | The tools expose Meta Ads data (accounts, campaigns, ads, etc.) as searchable 
  8 | and fetchable records for ChatGPT Deep Research analysis.
  9 | """
 10 | 
 11 | import json
 12 | import re
 13 | from typing import List, Dict, Any, Optional
 14 | from .api import meta_api_tool, make_api_request
 15 | from .server import mcp_server
 16 | from .utils import logger
 17 | 
 18 | 
 19 | class MetaAdsDataManager:
 20 |     """Manages Meta Ads data for OpenAI MCP search and fetch operations"""
 21 |     
 22 |     def __init__(self):
 23 |         self._cache = {}
 24 |         logger.debug("MetaAdsDataManager initialized")
 25 |     
 26 |     async def _get_ad_accounts(self, access_token: str, limit: int = 200) -> List[Dict[str, Any]]:
 27 |         """Get ad accounts data"""
 28 |         try:
 29 |             endpoint = "me/adaccounts"
 30 |             params = {
 31 |                 "fields": "id,name,account_id,account_status,amount_spent,balance,currency,business_city,business_country_code",
 32 |                 "limit": limit
 33 |             }
 34 |             
 35 |             data = await make_api_request(endpoint, access_token, params)
 36 |             
 37 |             if "data" in data:
 38 |                 return data["data"]
 39 |             return []
 40 |         except Exception as e:
 41 |             logger.error(f"Error fetching ad accounts: {e}")
 42 |             return []
 43 |     
 44 |     async def _get_campaigns(self, access_token: str, account_id: str, limit: int = 25) -> List[Dict[str, Any]]:
 45 |         """Get campaigns data for an account"""
 46 |         try:
 47 |             endpoint = f"{account_id}/campaigns"
 48 |             params = {
 49 |                 "fields": "id,name,status,objective,daily_budget,lifetime_budget,start_time,stop_time,created_time,updated_time",
 50 |                 "limit": limit
 51 |             }
 52 |             
 53 |             data = await make_api_request(endpoint, access_token, params)
 54 |             
 55 |             if "data" in data:
 56 |                 return data["data"]
 57 |             return []
 58 |         except Exception as e:
 59 |             logger.error(f"Error fetching campaigns for {account_id}: {e}")
 60 |             return []
 61 |     
 62 |     async def _get_ads(self, access_token: str, account_id: str, limit: int = 25) -> List[Dict[str, Any]]:
 63 |         """Get ads data for an account"""
 64 |         try:
 65 |             endpoint = f"{account_id}/ads"
 66 |             params = {
 67 |                 "fields": "id,name,status,creative,targeting,bid_amount,created_time,updated_time",
 68 |                 "limit": limit
 69 |             }
 70 |             
 71 |             data = await make_api_request(endpoint, access_token, params)
 72 |             
 73 |             if "data" in data:
 74 |                 return data["data"]
 75 |             return []
 76 |         except Exception as e:
 77 |             logger.error(f"Error fetching ads for {account_id}: {e}")
 78 |             return []
 79 |     
 80 |     async def _get_pages_for_account(self, access_token: str, account_id: str) -> List[Dict[str, Any]]:
 81 |         """Get pages associated with an account"""
 82 |         try:
 83 |             # Import the page discovery function from ads module
 84 |             from .ads import _discover_pages_for_account
 85 |             
 86 |             # Ensure account_id has the 'act_' prefix
 87 |             if not account_id.startswith("act_"):
 88 |                 account_id = f"act_{account_id}"
 89 |             
 90 |             page_discovery_result = await _discover_pages_for_account(account_id, access_token)
 91 |             
 92 |             if not page_discovery_result.get("success"):
 93 |                 return []
 94 |             
 95 |             # Return page data in a consistent format
 96 |             return [{
 97 |                 "id": page_discovery_result["page_id"],
 98 |                 "name": page_discovery_result.get("page_name", "Unknown"),
 99 |                 "source": page_discovery_result.get("source", "unknown"),
100 |                 "account_id": account_id
101 |             }]
102 |         except Exception as e:
103 |             logger.error(f"Error fetching pages for {account_id}: {e}")
104 |             return []
105 |     
106 |     async def _get_businesses(self, access_token: str, user_id: str = "me", limit: int = 25) -> List[Dict[str, Any]]:
107 |         """Get businesses accessible by the current user"""
108 |         try:
109 |             endpoint = f"{user_id}/businesses"
110 |             params = {
111 |                 "fields": "id,name,created_time,verification_status",
112 |                 "limit": limit
113 |             }
114 |             
115 |             data = await make_api_request(endpoint, access_token, params)
116 |             
117 |             if "data" in data:
118 |                 return data["data"]
119 |             return []
120 |         except Exception as e:
121 |             logger.error(f"Error fetching businesses: {e}")
122 |             return []
123 |     
124 |     async def search_records(self, query: str, access_token: str) -> List[str]:
125 |         """Search Meta Ads data and return matching record IDs
126 |         
127 |         Args:
128 |             query: Search query string
129 |             access_token: Meta API access token
130 |             
131 |         Returns:
132 |             List of record IDs that match the query
133 |         """
134 |         logger.info(f"Searching Meta Ads data with query: {query}")
135 |         
136 |         # Normalize query for matching
137 |         query_lower = query.lower()
138 |         query_terms = re.findall(r'\w+', query_lower)
139 |         
140 |         matching_ids = []
141 |         
142 |         try:
143 |             # Search ad accounts
144 |             accounts = await self._get_ad_accounts(access_token, limit=200)
145 |             for account in accounts:
146 |                 account_text = f"{account.get('name', '')} {account.get('id', '')} {account.get('account_status', '')} {account.get('business_city', '')} {account.get('business_country_code', '')}".lower()
147 |                 
148 |                 if any(term in account_text for term in query_terms):
149 |                     record_id = f"account:{account['id']}"
150 |                     matching_ids.append(record_id)
151 |                     
152 |                     # Cache the account data
153 |                     self._cache[record_id] = {
154 |                         "id": record_id,
155 |                         "type": "account",
156 |                         "title": f"Ad Account: {account.get('name', 'Unnamed Account')}",
157 |                         "text": f"Meta Ads Account {account.get('name', 'Unnamed')} (ID: {account.get('id', 'N/A')}) - Status: {account.get('account_status', 'Unknown')}, Currency: {account.get('currency', 'Unknown')}, Spent: ${account.get('amount_spent', 0)}, Balance: ${account.get('balance', 0)}",
158 |                         "metadata": {
159 |                             "account_id": account.get('id'),
160 |                             "account_name": account.get('name'),
161 |                             "status": account.get('account_status'),
162 |                             "currency": account.get('currency'),
163 |                             "business_location": f"{account.get('business_city', '')}, {account.get('business_country_code', '')}".strip(', '),
164 |                             "data_type": "meta_ads_account"
165 |                         },
166 |                         "raw_data": account
167 |                     }
168 |                     
169 |                     # Also search campaigns for this account if it matches
170 |                     campaigns = await self._get_campaigns(access_token, account['id'], limit=10)
171 |                     for campaign in campaigns:
172 |                         campaign_text = f"{campaign.get('name', '')} {campaign.get('objective', '')} {campaign.get('status', '')}".lower()
173 |                         
174 |                         if any(term in campaign_text for term in query_terms):
175 |                             campaign_record_id = f"campaign:{campaign['id']}"
176 |                             matching_ids.append(campaign_record_id)
177 |                             
178 |                             # Cache the campaign data
179 |                             self._cache[campaign_record_id] = {
180 |                                 "id": campaign_record_id,
181 |                                 "type": "campaign",
182 |                                 "title": f"Campaign: {campaign.get('name', 'Unnamed Campaign')}",
183 |                                 "text": f"Meta Ads Campaign {campaign.get('name', 'Unnamed')} (ID: {campaign.get('id', 'N/A')}) - Objective: {campaign.get('objective', 'Unknown')}, Status: {campaign.get('status', 'Unknown')}, Daily Budget: ${campaign.get('daily_budget', 'Not set')}, Account: {account.get('name', 'Unknown')}",
184 |                                 "metadata": {
185 |                                     "campaign_id": campaign.get('id'),
186 |                                     "campaign_name": campaign.get('name'),
187 |                                     "objective": campaign.get('objective'),
188 |                                     "status": campaign.get('status'),
189 |                                     "account_id": account.get('id'),
190 |                                     "account_name": account.get('name'),
191 |                                     "data_type": "meta_ads_campaign"
192 |                                 },
193 |                                 "raw_data": campaign
194 |                             }
195 |             
196 |             # If query specifically mentions "ads" or "ad", also search individual ads
197 |             if any(term in ['ad', 'ads', 'advertisement', 'creative'] for term in query_terms):
198 |                 for account in accounts[:3]:  # Limit to first 3 accounts for performance
199 |                     ads = await self._get_ads(access_token, account['id'], limit=10)
200 |                     for ad in ads:
201 |                         ad_text = f"{ad.get('name', '')} {ad.get('status', '')}".lower()
202 |                         
203 |                         if any(term in ad_text for term in query_terms):
204 |                             ad_record_id = f"ad:{ad['id']}"
205 |                             matching_ids.append(ad_record_id)
206 |                             
207 |                             # Cache the ad data
208 |                             self._cache[ad_record_id] = {
209 |                                 "id": ad_record_id,
210 |                                 "type": "ad",
211 |                                 "title": f"Ad: {ad.get('name', 'Unnamed Ad')}",
212 |                                 "text": f"Meta Ad {ad.get('name', 'Unnamed')} (ID: {ad.get('id', 'N/A')}) - Status: {ad.get('status', 'Unknown')}, Bid Amount: ${ad.get('bid_amount', 'Not set')}, Account: {account.get('name', 'Unknown')}",
213 |                                 "metadata": {
214 |                                     "ad_id": ad.get('id'),
215 |                                     "ad_name": ad.get('name'),
216 |                                     "status": ad.get('status'),
217 |                                     "account_id": account.get('id'),
218 |                                     "account_name": account.get('name'),
219 |                                     "data_type": "meta_ads_ad"
220 |                                 },
221 |                                 "raw_data": ad
222 |                             }
223 |             
224 |             # If query specifically mentions "page" or "pages", also search pages
225 |             if any(term in ['page', 'pages', 'facebook page'] for term in query_terms):
226 |                 for account in accounts[:5]:  # Limit to first 5 accounts for performance
227 |                     pages = await self._get_pages_for_account(access_token, account['id'])
228 |                     for page in pages:
229 |                         page_text = f"{page.get('name', '')} {page.get('source', '')}".lower()
230 |                         
231 |                         if any(term in page_text for term in query_terms):
232 |                             page_record_id = f"page:{page['id']}"
233 |                             matching_ids.append(page_record_id)
234 |                             
235 |                             # Cache the page data
236 |                             self._cache[page_record_id] = {
237 |                                 "id": page_record_id,
238 |                                 "type": "page",
239 |                                 "title": f"Facebook Page: {page.get('name', 'Unnamed Page')}",
240 |                                 "text": f"Facebook Page {page.get('name', 'Unnamed')} (ID: {page.get('id', 'N/A')}) - Source: {page.get('source', 'Unknown')}, Account: {account.get('name', 'Unknown')}",
241 |                                 "metadata": {
242 |                                     "page_id": page.get('id'),
243 |                                     "page_name": page.get('name'),
244 |                                     "source": page.get('source'),
245 |                                     "account_id": account.get('id'),
246 |                                     "account_name": account.get('name'),
247 |                                     "data_type": "meta_ads_page"
248 |                                 },
249 |                                 "raw_data": page
250 |                             }
251 |             
252 |             # If query specifically mentions "business" or "businesses", also search businesses
253 |             if any(term in ['business', 'businesses', 'company', 'companies'] for term in query_terms):
254 |                 businesses = await self._get_businesses(access_token, limit=25)
255 |                 for business in businesses:
256 |                     business_text = f"{business.get('name', '')} {business.get('verification_status', '')}".lower()
257 |                     
258 |                     if any(term in business_text for term in query_terms):
259 |                         business_record_id = f"business:{business['id']}"
260 |                         matching_ids.append(business_record_id)
261 |                         
262 |                         # Cache the business data
263 |                         self._cache[business_record_id] = {
264 |                             "id": business_record_id,
265 |                             "type": "business",
266 |                             "title": f"Business: {business.get('name', 'Unnamed Business')}",
267 |                             "text": f"Meta Business {business.get('name', 'Unnamed')} (ID: {business.get('id', 'N/A')}) - Created: {business.get('created_time', 'Unknown')}, Verification: {business.get('verification_status', 'Unknown')}",
268 |                             "metadata": {
269 |                                 "business_id": business.get('id'),
270 |                                 "business_name": business.get('name'),
271 |                                 "created_time": business.get('created_time'),
272 |                                 "verification_status": business.get('verification_status'),
273 |                                 "data_type": "meta_ads_business"
274 |                             },
275 |                             "raw_data": business
276 |                         }
277 |         
278 |         except Exception as e:
279 |             logger.error(f"Error during search operation: {e}")
280 |             # Return empty list on error, but don't raise exception
281 |             return []
282 |         
283 |         logger.info(f"Search completed. Found {len(matching_ids)} matching records")
284 |         return matching_ids[:50]  # Limit to 50 results for performance
285 |     
286 |     def fetch_record(self, record_id: str) -> Optional[Dict[str, Any]]:
287 |         """Fetch a cached record by ID
288 |         
289 |         Args:
290 |             record_id: The record ID to fetch
291 |             
292 |         Returns:
293 |             Record data or None if not found
294 |         """
295 |         logger.info(f"Fetching record: {record_id}")
296 |         
297 |         record = self._cache.get(record_id)
298 |         if record:
299 |             logger.debug(f"Record found in cache: {record['type']}")
300 |             return record
301 |         else:
302 |             logger.warning(f"Record not found in cache: {record_id}")
303 |             return None
304 | 
305 | 
# Global data manager instance.
# The `search` and `fetch` tools in this module both go through this single
# object: `search` fills its cache and `fetch` reads from it, so a record is
# only fetchable if both tools share the same instance.
_data_manager = MetaAdsDataManager()
308 | 
309 | 
310 | @mcp_server.tool()
311 | @meta_api_tool
async def search(
    query: str,
    access_token: Optional[str] = None
) -> str:
    """
    Search Meta Ads data and return the IDs of the matching records.
    The search covers ad accounts, campaigns, ads, pages, and businesses and
    selects the records that are relevant to the provided query.

    Args:
        query: Search query string to find relevant Meta Ads records
        access_token: Meta API access token (optional - will use cached token if not provided)

    Returns:
        JSON response with the list of matching record IDs ("ids"), the
        original query and the number of results; on failure a JSON object
        with an "error" message and an empty "ids" list

    Example Usage:
        search(query="active campaigns")
        search(query="account spending")
        search(query="facebook ads performance")
        search(query="facebook pages")
        search(query="user businesses")
    """
    # Guard clause: nothing to search for
    if not query:
        missing_query = {
            "error": "query parameter is required",
            "ids": []
        }
        return json.dumps(missing_query, indent=2)

    try:
        # Delegate the actual lookup (and caching of hits) to the data manager
        found_ids = await _data_manager.search_records(query, access_token)
        result_count = len(found_ids)

        payload = {
            "ids": found_ids,
            "query": query,
            "total_results": result_count
        }

        logger.info(f"Search successful. Query: '{query}', Results: {result_count}")
        return json.dumps(payload, indent=2)

    except Exception as exc:
        reason = str(exc)
        logger.error(f"Error in search tool: {reason}")

        failure = {
            "error": "Failed to search Meta Ads data",
            "details": reason,
            "ids": [],
            "query": query
        }
        return json.dumps(failure, indent=2)
364 | 
365 | 
366 | @mcp_server.tool()
async def fetch(
    id: str
) -> str:
    """
    Fetch the complete data of one record by its ID.
    The record is looked up in the data manager's cache, so it must have been
    returned by an earlier search.

    Args:
        id: The record ID to fetch (format: "type:id", e.g., "account:act_123456")

    Returns:
        JSON response with complete record data including id, title, text, and metadata;
        a JSON object with an "error" message when the ID is missing or unknown

    Example Usage:
        fetch(id="account:act_123456789")
        fetch(id="campaign:23842588888640185")
        fetch(id="ad:23842614006130185")
        fetch(id="page:123456789")
    """
    # Guard clause: an ID is mandatory
    if not id:
        return json.dumps({"error": "id parameter is required"}, indent=2)

    try:
        cached = _data_manager.fetch_record(id)

        if not cached:
            logger.warning(f"Record not found: {id}")
            not_found = {
                "error": f"Record not found: {id}",
                "id": id
            }
            return json.dumps(not_found, indent=2)

        logger.info(f"Record fetched successfully: {id}")
        return json.dumps(cached, indent=2)

    except Exception as exc:
        reason = str(exc)
        logger.error(f"Error in fetch tool: {reason}")

        failure = {
            "error": "Failed to fetch record",
            "details": reason,
            "id": id
        }
        return json.dumps(failure, indent=2)
```

--------------------------------------------------------------------------------
/meta_ads_mcp/core/duplication.py:
--------------------------------------------------------------------------------

```python
  1 | """Duplication functionality for Meta Ads API."""
  2 | 
  3 | import json
  4 | import os
  5 | import httpx
  6 | from typing import Optional, Dict, Any, List, Union
  7 | from .server import mcp_server
  8 | from .api import meta_api_tool
  9 | from . import auth
 10 | from .http_auth_integration import FastMCPAuthIntegration
 11 | 
 12 | 
 13 | # Only register the duplication functions if the environment variable is set
 14 | ENABLE_DUPLICATION = bool(os.environ.get("META_ADS_ENABLE_DUPLICATION", ""))
 15 | 
 16 | if ENABLE_DUPLICATION:
 17 |     @mcp_server.tool()
 18 |     @meta_api_tool
 19 |     async def duplicate_campaign(
 20 |         campaign_id: str,
 21 |         access_token: Optional[str] = None,
 22 |         name_suffix: Optional[str] = " - Copy",
 23 |         include_ad_sets: bool = True,
 24 |         include_ads: bool = True,
 25 |         include_creatives: bool = True,
 26 |         copy_schedule: bool = False,
 27 |         new_daily_budget: Optional[float] = None,
 28 |         new_status: Optional[str] = "PAUSED"
 29 |     ) -> str:
 30 |         """
 31 |         Duplicate a Meta Ads campaign with all its ad sets and ads.
 32 | 
 33 |         Recommended: Use this to run robust experiments.
 34 |         
 35 |         Args:
 36 |             campaign_id: Meta Ads campaign ID to duplicate
 37 |             name_suffix: Suffix to add to the duplicated campaign name
 38 |             include_ad_sets: Whether to duplicate ad sets within the campaign
 39 |             include_ads: Whether to duplicate ads within ad sets
 40 |             include_creatives: Whether to duplicate ad creatives
 41 |             copy_schedule: Whether to copy the campaign schedule
 42 |             new_daily_budget: Override the daily budget for the new campaign
 43 |             new_status: Status for the new campaign (ACTIVE or PAUSED)
 44 |         """
 45 |         return await _forward_duplication_request(
 46 |             "campaign",
 47 |             campaign_id,
 48 |             access_token,
 49 |             {
 50 |                 "name_suffix": name_suffix,
 51 |                 "include_ad_sets": include_ad_sets,
 52 |                 "include_ads": include_ads,
 53 |                 "include_creatives": include_creatives,
 54 |                 "copy_schedule": copy_schedule,
 55 |                 "new_daily_budget": new_daily_budget,
 56 |                 "new_status": new_status
 57 |             }
 58 |         )
 59 | 
 60 |     @mcp_server.tool()
 61 |     @meta_api_tool
 62 |     async def duplicate_adset(
 63 |         adset_id: str,
 64 |         access_token: Optional[str] = None,
 65 |         target_campaign_id: Optional[str] = None,
 66 |         name_suffix: Optional[str] = " - Copy",
 67 |         include_ads: bool = True,
 68 |         include_creatives: bool = True,
 69 |         new_daily_budget: Optional[float] = None,
 70 |         new_targeting: Optional[Dict[str, Any]] = None,
 71 |         new_status: Optional[str] = "PAUSED"
 72 |     ) -> str:
 73 |         """
 74 |         Duplicate a Meta Ads ad set with its ads.
 75 | 
 76 |         Recommended: Use this to run robust experiments.
 77 |         
 78 |         Args:
 79 |             adset_id: Meta Ads ad set ID to duplicate
 80 |             target_campaign_id: Campaign ID to move the duplicated ad set to (optional)
 81 |             name_suffix: Suffix to add to the duplicated ad set name
 82 |             include_ads: Whether to duplicate ads within the ad set
 83 |             include_creatives: Whether to duplicate ad creatives
 84 |             new_daily_budget: Override the daily budget for the new ad set
 85 |             new_targeting: Override targeting settings for the new ad set
 86 |             new_status: Status for the new ad set (ACTIVE or PAUSED)
 87 |         """
 88 |         return await _forward_duplication_request(
 89 |             "adset",
 90 |             adset_id,
 91 |             access_token,
 92 |             {
 93 |                 "target_campaign_id": target_campaign_id,
 94 |                 "name_suffix": name_suffix,
 95 |                 "include_ads": include_ads,
 96 |                 "include_creatives": include_creatives,
 97 |                 "new_daily_budget": new_daily_budget,
 98 |                 "new_targeting": new_targeting,
 99 |                 "new_status": new_status
100 |             }
101 |         )
102 | 
103 |     @mcp_server.tool()
104 |     @meta_api_tool
105 |     async def duplicate_ad(
106 |         ad_id: str,
107 |         access_token: Optional[str] = None,
108 |         target_adset_id: Optional[str] = None,
109 |         name_suffix: Optional[str] = " - Copy",
110 |         duplicate_creative: bool = True,
111 |         new_creative_name: Optional[str] = None,
112 |         new_status: Optional[str] = "PAUSED"
113 |     ) -> str:
114 |         """
115 |         Duplicate a Meta Ads ad.
116 | 
117 |         Recommended: Use this to run robust experiments.
118 |         
119 |         Args:
120 |             ad_id: Meta Ads ad ID to duplicate
121 |             target_adset_id: Ad set ID to move the duplicated ad to (optional)
122 |             name_suffix: Suffix to add to the duplicated ad name
123 |             duplicate_creative: Whether to duplicate the ad creative
124 |             new_creative_name: Override name for the duplicated creative
125 |             new_status: Status for the new ad (ACTIVE or PAUSED)
126 |         """
127 |         return await _forward_duplication_request(
128 |             "ad",
129 |             ad_id,
130 |             access_token,
131 |             {
132 |                 "target_adset_id": target_adset_id,
133 |                 "name_suffix": name_suffix,
134 |                 "duplicate_creative": duplicate_creative,
135 |                 "new_creative_name": new_creative_name,
136 |                 "new_status": new_status
137 |             }
138 |         )
139 | 
140 |     @mcp_server.tool()
141 |     @meta_api_tool
142 |     async def duplicate_creative(
143 |         creative_id: str,
144 |         access_token: Optional[str] = None,
145 |         name_suffix: Optional[str] = " - Copy",
146 |         new_primary_text: Optional[str] = None,
147 |         new_headline: Optional[str] = None,
148 |         new_description: Optional[str] = None,
149 |         new_cta_type: Optional[str] = None,
150 |         new_destination_url: Optional[str] = None
151 |     ) -> str:
152 |         """
153 |         Duplicate a Meta Ads creative.
154 | 
155 |         Recommended: Use this to run robust experiments.
156 |         
157 |         Args:
158 |             creative_id: Meta Ads creative ID to duplicate
159 |             name_suffix: Suffix to add to the duplicated creative name
160 |             new_primary_text: Override the primary text for the new creative
161 |             new_headline: Override the headline for the new creative
162 |             new_description: Override the description for the new creative
163 |             new_cta_type: Override the call-to-action type for the new creative
164 |             new_destination_url: Override the destination URL for the new creative
165 |         """
166 |         return await _forward_duplication_request(
167 |             "creative",
168 |             creative_id,
169 |             access_token,
170 |             {
171 |                 "name_suffix": name_suffix,
172 |                 "new_primary_text": new_primary_text,
173 |                 "new_headline": new_headline,
174 |                 "new_description": new_description,
175 |                 "new_cta_type": new_cta_type,
176 |                 "new_destination_url": new_destination_url
177 |             }
178 |         )
179 | 
180 | 
181 | async def _forward_duplication_request(resource_type: str, resource_id: str, access_token: str, options: Dict[str, Any]) -> str:
182 |     """
183 |     Forward duplication request to the cloud-hosted MCP API using dual-header authentication.
184 |     
185 |     This implements the dual-header authentication pattern for MCP server callbacks:
186 |     - Authorization: Bearer <facebook_token> - Facebook access token for Meta API calls
187 |     - X-Pipeboard-Token: <pipeboard_token> - Pipeboard API token for authentication
188 |     
189 |     Args:
190 |         resource_type: Type of resource to duplicate (campaign, adset, ad, creative)
191 |         resource_id: ID of the resource to duplicate
192 |         access_token: Meta API access token (optional, will use context if not provided)
193 |         options: Duplication options
194 |     """
195 |     try:
196 |         # Get tokens from the request context that were set by the HTTP auth middleware
197 |         # In the dual-header authentication pattern:
198 |         # - Pipeboard token comes from X-Pipeboard-Token header (for authentication)
199 |         # - Facebook token comes from Authorization header (for Meta API calls)
200 |         
201 |         # Get tokens from context set by AuthInjectionMiddleware
202 |         pipeboard_token = FastMCPAuthIntegration.get_pipeboard_token()
203 |         facebook_token = FastMCPAuthIntegration.get_auth_token()
204 |         
205 |         # Use provided access_token parameter if no Facebook token found in context
206 |         if not facebook_token:
207 |             facebook_token = access_token if access_token else await auth.get_current_access_token()
208 |         
209 |         # Validate we have both required tokens
210 |         if not pipeboard_token:
211 |             return json.dumps({
212 |                 "error": "authentication_required",
213 |                 "message": "Pipeboard API token not found",
214 |                 "details": {
215 |                     "required": "Valid Pipeboard token via X-Pipeboard-Token header",
216 |                     "received_headers": "Check that the MCP server is forwarding the X-Pipeboard-Token header"
217 |                 }
218 |             }, indent=2)
219 |             
220 |         if not facebook_token:
221 |             return json.dumps({
222 |                 "error": "authentication_required",
223 |                 "message": "Meta Ads access token not found",
224 |                 "details": {
225 |                     "required": "Valid Meta access token from authenticated session",
226 |                     "check": "Ensure Facebook account is connected and token is valid"
227 |                 }
228 |             }, indent=2)
229 | 
230 |         # Construct the API endpoint
231 |         base_url = "https://mcp.pipeboard.co"
232 |         endpoint = f"{base_url}/api/meta/duplicate/{resource_type}/{resource_id}"
233 |         
234 |         # Prepare the dual-header authentication as per API documentation
235 |         headers = {
236 |             "Authorization": f"Bearer {facebook_token}",  # Facebook token for Meta API
237 |             "X-Pipeboard-Token": pipeboard_token,         # Pipeboard token for auth
238 |             "Content-Type": "application/json",
239 |             "User-Agent": "meta-ads-mcp/1.0"
240 |         }
241 |         
242 |         # Remove None values from options
243 |         clean_options = {k: v for k, v in options.items() if v is not None}
244 |         
245 |         # Make the request to the cloud service
246 |         async with httpx.AsyncClient(timeout=30.0) as client:
247 |             response = await client.post(
248 |                 endpoint,
249 |                 headers=headers,
250 |                 json=clean_options
251 |             )
252 |             
253 |             if response.status_code == 200:
254 |                 result = response.json()
255 |                 return json.dumps(result, indent=2)
256 |             elif response.status_code == 400:
257 |                 # Validation failed
258 |                 try:
259 |                     error_data = response.json()
260 |                     return json.dumps({
261 |                         "success": False,
262 |                         "error": "validation_failed",
263 |                         "errors": error_data.get("errors", [response.text]),
264 |                         "warnings": error_data.get("warnings", [])
265 |                     }, indent=2)
266 |                 except:
267 |                     return json.dumps({
268 |                         "success": False,
269 |                         "error": "validation_failed",
270 |                         "errors": [response.text],
271 |                         "warnings": []
272 |                     }, indent=2)
273 |             elif response.status_code == 401:
274 |                 return json.dumps({
275 |                     "success": False,
276 |                     "error": "authentication_error",
277 |                     "message": "Invalid or expired API token"
278 |                 }, indent=2)
279 |             elif response.status_code == 402:
280 |                 try:
281 |                     error_data = response.json()
282 |                     return json.dumps({
283 |                         "success": False,
284 |                         "error": "subscription_required",
285 |                         "message": error_data.get("message", "This feature is not available in your current plan"),
286 |                         "upgrade_url": error_data.get("upgrade_url", "https://pipeboard.co/upgrade"),
287 |                         "suggestion": error_data.get("suggestion", "Please upgrade your account to access this feature")
288 |                     }, indent=2)
289 |                 except:
290 |                     return json.dumps({
291 |                         "success": False,
292 |                         "error": "subscription_required",
293 |                         "message": "This feature is not available in your current plan",
294 |                         "upgrade_url": "https://pipeboard.co/upgrade",
295 |                         "suggestion": "Please upgrade your account to access this feature"
296 |                     }, indent=2)
297 |             elif response.status_code == 403:
298 |                 try:
299 |                     error_data = response.json()
300 |                     # Check if this is a premium feature error
301 |                     if error_data.get("error") == "premium_feature":
302 |                         return json.dumps({
303 |                             "success": False,
304 |                             "error": "premium_feature_required",
305 |                             "message": error_data.get("message", "This is a premium feature that requires subscription"),
306 |                             "details": error_data.get("details", {
307 |                                 "upgrade_url": "https://pipeboard.co/upgrade",
308 |                                 "suggestion": "Please upgrade your account to access this feature"
309 |                             })
310 |                         }, indent=2)
311 |                     else:
312 |                         # Default to facebook connection required
313 |                         return json.dumps({
314 |                             "success": False,
315 |                             "error": "facebook_connection_required",
316 |                             "message": error_data.get("message", "You need to connect your Facebook account first"),
317 |                             "details": error_data.get("details", {
318 |                                 "login_flow_url": "/connections",
319 |                                 "auth_flow_url": "/api/meta/auth"
320 |                             })
321 |                         }, indent=2)
322 |                 except:
323 |                     return json.dumps({
324 |                         "success": False,
325 |                         "error": "facebook_connection_required",
326 |                         "message": "You need to connect your Facebook account first",
327 |                         "details": {
328 |                             "login_flow_url": "/connections",
329 |                             "auth_flow_url": "/api/meta/auth"
330 |                         }
331 |                     }, indent=2)
332 |             elif response.status_code == 404:
333 |                 return json.dumps({
334 |                     "success": False,
335 |                     "error": "resource_not_found",
336 |                     "message": f"{resource_type.title()} not found or access denied",
337 |                     "suggestion": f"Verify the {resource_type} ID and your Facebook account permissions"
338 |                 }, indent=2)
339 |             elif response.status_code == 429:
340 |                 return json.dumps({
341 |                     "error": "rate_limit_exceeded", 
342 |                     "message": "Meta API rate limit exceeded",
343 |                     "details": {
344 |                         "suggestion": "Please wait before retrying",
345 |                         "retry_after": response.headers.get("Retry-After", "60")
346 |                     }
347 |                 }, indent=2)
348 |             elif response.status_code == 502:
349 |                 try:
350 |                     error_data = response.json()
351 |                     return json.dumps({
352 |                         "success": False,
353 |                         "error": "meta_api_error",
354 |                         "message": error_data.get("message", "Facebook API error"),
355 |                         "recoverable": True,
356 |                         "suggestion": "Please wait 5 minutes before retrying"
357 |                     }, indent=2)
358 |                 except:
359 |                     return json.dumps({
360 |                         "success": False,
361 |                         "error": "meta_api_error",
362 |                         "message": "Facebook API error",
363 |                         "recoverable": True,
364 |                         "suggestion": "Please wait 5 minutes before retrying"
365 |                     }, indent=2)
366 |             else:
367 |                 error_detail = response.text
368 |                 try:
369 |                     error_json = response.json()
370 |                     error_detail = error_json.get("message", error_detail)
371 |                 except:
372 |                     pass
373 |                 
374 |                 return json.dumps({
375 |                     "error": "duplication_failed",
376 |                     "message": f"Failed to duplicate {resource_type}",
377 |                     "details": {
378 |                         "status_code": response.status_code,
379 |                         "error_detail": error_detail,
380 |                         "resource_type": resource_type,
381 |                         "resource_id": resource_id
382 |                     }
383 |                 }, indent=2)
384 |     
385 |     except httpx.TimeoutException:
386 |         return json.dumps({
387 |             "error": "request_timeout",
388 |             "message": "Request to duplication service timed out",
389 |             "details": {
390 |                 "suggestion": "Please try again later",
391 |                 "timeout": "30 seconds"
392 |             }
393 |         }, indent=2)
394 |     
395 |     except httpx.RequestError as e:
396 |         return json.dumps({
397 |             "error": "network_error", 
398 |             "message": "Failed to connect to duplication service",
399 |             "details": {
400 |                 "error": str(e),
401 |                 "suggestion": "Check your internet connection and try again"
402 |             }
403 |         }, indent=2)
404 |     
405 |     except Exception as e:
406 |         return json.dumps({
407 |             "error": "unexpected_error",
408 |             "message": f"Unexpected error during {resource_type} duplication",
409 |             "details": {
410 |                 "error": str(e),
411 |                 "resource_type": resource_type,
412 |                 "resource_id": resource_id
413 |             }
414 |         }, indent=2)
415 | 
416 | 
def _get_estimated_components(resource_type: str, options: Dict[str, Any]) -> Dict[str, Any]:
    """Estimate which components a duplication would create.

    Args:
        resource_type: One of "campaign", "adset", "ad" or "creative".
            Any other value produces an empty estimate.
        options: Duplication options. Each optional component is controlled
            by a flag that counts as enabled when the key is absent.

    Returns:
        A dict mapping component names to either the integer 1 or a
        hard-coded range string such as "3-5 (estimated)". The resource
        being duplicated is always listed first with a count of 1.
    """
    # One rule per supported resource type:
    #   (resource type, key of the resource itself,
    #    ((option flag, component key, estimate), ...))
    # The order of the optional parts fixes the key order of the result.
    rules = (
        ("campaign", "campaigns", (
            ("include_ad_sets", "ad_sets", "3-5 (estimated)"),
            ("include_ads", "ads", "5-15 (estimated)"),
            ("include_creatives", "creatives", "5-15 (estimated)"),
        )),
        ("adset", "ad_sets", (
            ("include_ads", "ads", "2-5 (estimated)"),
            ("include_creatives", "creatives", "2-5 (estimated)"),
        )),
        ("ad", "ads", (
            ("duplicate_creative", "creatives", 1),
        )),
        # A creative has no dependent components, so options are never read.
        ("creative", "creatives", ()),
    )

    for kind, root_key, optional_parts in rules:
        if resource_type == kind:
            estimate = {root_key: 1}
            for flag, part_key, amount in optional_parts:
                # A missing flag means the component is included.
                if options.get(flag, True):
                    estimate[part_key] = amount
            return estimate

    # Unknown resource types yield no estimate at all.
    return {}
```
Page 3/6FirstPrevNextLast