#
tokens: 46692/50000 8/82 files (page 4/6)
lines: on (toggle) GitHub
raw markdown copy reset
This is page 4 of 6. Use http://codebase.md/nictuku/meta-ads-mcp?lines=true&page={x} to view the full context.

# Directory Structure

```
├── .github
│   └── workflows
│       ├── publish-mcp.yml
│       ├── publish.yml
│       └── test.yml
├── .gitignore
├── .python-version
├── .uv.toml
├── CUSTOM_META_APP.md
├── Dockerfile
├── examples
│   ├── example_http_client.py
│   └── README.md
├── future_improvements.md
├── images
│   └── meta-ads-example.png
├── LICENSE
├── LOCAL_INSTALLATION.md
├── meta_ads_auth.sh
├── meta_ads_mcp
│   ├── __init__.py
│   ├── __main__.py
│   └── core
│       ├── __init__.py
│       ├── accounts.py
│       ├── ads_library.py
│       ├── ads.py
│       ├── adsets.py
│       ├── api.py
│       ├── auth.py
│       ├── authentication.py
│       ├── budget_schedules.py
│       ├── callback_server.py
│       ├── campaigns.py
│       ├── duplication.py
│       ├── http_auth_integration.py
│       ├── insights.py
│       ├── openai_deep_research.py
│       ├── pipeboard_auth.py
│       ├── reports.py
│       ├── resources.py
│       ├── server.py
│       ├── targeting.py
│       └── utils.py
├── META_API_NOTES.md
├── poetry.lock
├── pyproject.toml
├── README.md
├── RELEASE.md
├── requirements.txt
├── server.json
├── setup.py
├── smithery.yaml
├── STREAMABLE_HTTP_SETUP.md
└── tests
    ├── __init__.py
    ├── conftest.py
    ├── e2e_account_info_search_issue.py
    ├── README_REGRESSION_TESTS.md
    ├── README.md
    ├── test_account_info_access_fix.py
    ├── test_account_search.py
    ├── test_budget_update_e2e.py
    ├── test_budget_update.py
    ├── test_create_ad_creative_simple.py
    ├── test_create_simple_creative_e2e.py
    ├── test_dsa_beneficiary.py
    ├── test_dsa_integration.py
    ├── test_duplication_regression.py
    ├── test_duplication.py
    ├── test_dynamic_creatives.py
    ├── test_estimate_audience_size_e2e.py
    ├── test_estimate_audience_size.py
    ├── test_get_account_pages.py
    ├── test_get_ad_creatives_fix.py
    ├── test_get_ad_image_quality_improvements.py
    ├── test_get_ad_image_regression.py
    ├── test_http_transport.py
    ├── test_insights_actions_and_values_e2e.py
    ├── test_insights_pagination.py
    ├── test_integration_openai_mcp.py
    ├── test_is_dynamic_creative_adset.py
    ├── test_mobile_app_adset_creation.py
    ├── test_mobile_app_adset_issue.py
    ├── test_openai_mcp_deep_research.py
    ├── test_openai.py
    ├── test_page_discovery_integration.py
    ├── test_page_discovery.py
    ├── test_targeting_search_e2e.py
    ├── test_targeting.py
    ├── test_update_ad_creative_id.py
    └── test_upload_ad_image.py
```

# Files

--------------------------------------------------------------------------------
/tests/test_get_ad_image_quality_improvements.py:
--------------------------------------------------------------------------------

```python
  1 | """Tests for get_ad_image quality improvements.
  2 | 
  3 | These tests verify that the get_ad_image function now correctly prioritizes
  4 | high-quality ad creative images over profile thumbnails.
  5 | 
  6 | Key improvements tested:
  7 | 1. Prioritizes image_urls_for_viewing over thumbnail_url
  8 | 2. Uses image_url as second priority
  9 | 3. Uses object_story_spec.link_data.picture as third priority
 10 | 4. Only uses thumbnail_url as last resort
 11 | 5. Better logging to show which image source is being used
 12 | """
 13 | 
 14 | import pytest
 15 | import json
 16 | from unittest.mock import AsyncMock, patch, MagicMock
 17 | from meta_ads_mcp.core.ads import get_ad_image
 18 | from meta_ads_mcp.core.utils import extract_creative_image_urls
 19 | 
 20 | 
 21 | class TestGetAdImageQualityImprovements:
 22 |     """Test cases for image quality improvements in get_ad_image function."""
 23 |     
 24 |     @pytest.mark.asyncio
 25 |     async def test_prioritizes_image_urls_for_viewing_over_thumbnail(self):
 26 |         """Test that image_urls_for_viewing is prioritized over thumbnail_url."""
 27 |         
 28 |         # Mock responses for creative with both high-quality and thumbnail URLs
 29 |         mock_ad_data = {
 30 |             "account_id": "act_123456789",
 31 |             "creative": {"id": "creative_123456789"}
 32 |         }
 33 |         
 34 |         mock_creative_details = {
 35 |             "id": "creative_123456789",
 36 |             "name": "Test Creative"
 37 |             # No image_hash - triggers fallback
 38 |         }
 39 |         
 40 |         # Mock get_ad_creatives response with both URLs
 41 |         mock_get_ad_creatives_response = json.dumps({
 42 |             "data": [
 43 |                 {
 44 |                     "id": "creative_123456789",
 45 |                     "name": "Test Creative",
 46 |                     "status": "ACTIVE",
 47 |                     "thumbnail_url": "https://example.com/thumbnail_64x64.jpg",  # Low quality
 48 |                     "image_url": "https://example.com/full_image.jpg",  # Medium quality
 49 |                     "image_urls_for_viewing": [
 50 |                         "https://example.com/high_quality_image.jpg",  # Highest quality
 51 |                         "https://example.com/alt_high_quality.jpg"
 52 |                     ],
 53 |                     "object_story_spec": {
 54 |                         "link_data": {
 55 |                             "picture": "https://example.com/object_story_picture.jpg"
 56 |                         }
 57 |                     }
 58 |                 }
 59 |             ]
 60 |         })
 61 |         
 62 |         # Mock PIL Image processing
 63 |         mock_pil_image = MagicMock()
 64 |         mock_pil_image.mode = "RGB"
 65 |         mock_pil_image.convert.return_value = mock_pil_image
 66 |         
 67 |         mock_byte_stream = MagicMock()
 68 |         mock_byte_stream.getvalue.return_value = b"fake_jpeg_data"
 69 |         
 70 |         with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api, \
 71 |              patch('meta_ads_mcp.core.ads.get_ad_creatives', new_callable=AsyncMock) as mock_get_creatives, \
 72 |              patch('meta_ads_mcp.core.ads.download_image', new_callable=AsyncMock) as mock_download, \
 73 |              patch('meta_ads_mcp.core.ads.PILImage.open') as mock_pil_open, \
 74 |              patch('meta_ads_mcp.core.ads.io.BytesIO') as mock_bytesio:
 75 |             
 76 |             mock_api.side_effect = [mock_ad_data, mock_creative_details]
 77 |             mock_get_creatives.return_value = mock_get_ad_creatives_response
 78 |             mock_download.return_value = b"fake_image_bytes"
 79 |             mock_pil_open.return_value = mock_pil_image
 80 |             mock_bytesio.return_value = mock_byte_stream
 81 |             
 82 |             # This should prioritize image_urls_for_viewing[0]
 83 |             result = await get_ad_image(access_token="test_token", ad_id="test_ad_id")
 84 |             
 85 |             # Verify it used the highest quality URL
 86 |             assert result is not None
 87 |             mock_download.assert_called_once_with("https://example.com/high_quality_image.jpg")
 88 |     
 89 |     @pytest.mark.asyncio
 90 |     async def test_falls_back_to_image_url_when_image_urls_for_viewing_unavailable(self):
 91 |         """Test fallback to image_url when image_urls_for_viewing is not available."""
 92 |         
 93 |         # Mock responses for creative without image_urls_for_viewing
 94 |         mock_ad_data = {
 95 |             "account_id": "act_123456789",
 96 |             "creative": {"id": "creative_123456789"}
 97 |         }
 98 |         
 99 |         mock_creative_details = {
100 |             "id": "creative_123456789",
101 |             "name": "Test Creative"
102 |         }
103 |         
104 |         # Mock get_ad_creatives response without image_urls_for_viewing
105 |         mock_get_ad_creatives_response = json.dumps({
106 |             "data": [
107 |                 {
108 |                     "id": "creative_123456789",
109 |                     "name": "Test Creative",
110 |                     "status": "ACTIVE",
111 |                     "thumbnail_url": "https://example.com/thumbnail.jpg",
112 |                     "image_url": "https://example.com/full_image.jpg",  # Should be used
113 |                     "object_story_spec": {
114 |                         "link_data": {
115 |                             "picture": "https://example.com/object_story_picture.jpg"
116 |                         }
117 |                     }
118 |                 }
119 |             ]
120 |         })
121 |         
122 |         # Mock PIL Image processing
123 |         mock_pil_image = MagicMock()
124 |         mock_pil_image.mode = "RGB"
125 |         mock_pil_image.convert.return_value = mock_pil_image
126 |         
127 |         mock_byte_stream = MagicMock()
128 |         mock_byte_stream.getvalue.return_value = b"fake_jpeg_data"
129 |         
130 |         with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api, \
131 |              patch('meta_ads_mcp.core.ads.get_ad_creatives', new_callable=AsyncMock) as mock_get_creatives, \
132 |              patch('meta_ads_mcp.core.ads.download_image', new_callable=AsyncMock) as mock_download, \
133 |              patch('meta_ads_mcp.core.ads.PILImage.open') as mock_pil_open, \
134 |              patch('meta_ads_mcp.core.ads.io.BytesIO') as mock_bytesio:
135 |             
136 |             mock_api.side_effect = [mock_ad_data, mock_creative_details]
137 |             mock_get_creatives.return_value = mock_get_ad_creatives_response
138 |             mock_download.return_value = b"fake_image_bytes"
139 |             mock_pil_open.return_value = mock_pil_image
140 |             mock_bytesio.return_value = mock_byte_stream
141 |             
142 |             # This should fall back to image_url
143 |             result = await get_ad_image(access_token="test_token", ad_id="test_ad_id")
144 |             
145 |             # Verify it used image_url
146 |             assert result is not None
147 |             mock_download.assert_called_once_with("https://example.com/full_image.jpg")
148 |     
149 |     @pytest.mark.asyncio
150 |     async def test_falls_back_to_object_story_spec_picture_when_image_url_unavailable(self):
151 |         """Test fallback to object_story_spec.link_data.picture when image_url is not available."""
152 |         
153 |         # Mock responses for creative without image_url
154 |         mock_ad_data = {
155 |             "account_id": "act_123456789",
156 |             "creative": {"id": "creative_123456789"}
157 |         }
158 |         
159 |         mock_creative_details = {
160 |             "id": "creative_123456789",
161 |             "name": "Test Creative"
162 |         }
163 |         
164 |         # Mock get_ad_creatives response without image_url
165 |         mock_get_ad_creatives_response = json.dumps({
166 |             "data": [
167 |                 {
168 |                     "id": "creative_123456789",
169 |                     "name": "Test Creative",
170 |                     "status": "ACTIVE",
171 |                     "thumbnail_url": "https://example.com/thumbnail.jpg",
172 |                     "object_story_spec": {
173 |                         "link_data": {
174 |                             "picture": "https://example.com/object_story_picture.jpg"  # Should be used
175 |                         }
176 |                     }
177 |                 }
178 |             ]
179 |         })
180 |         
181 |         # Mock PIL Image processing
182 |         mock_pil_image = MagicMock()
183 |         mock_pil_image.mode = "RGB"
184 |         mock_pil_image.convert.return_value = mock_pil_image
185 |         
186 |         mock_byte_stream = MagicMock()
187 |         mock_byte_stream.getvalue.return_value = b"fake_jpeg_data"
188 |         
189 |         with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api, \
190 |              patch('meta_ads_mcp.core.ads.get_ad_creatives', new_callable=AsyncMock) as mock_get_creatives, \
191 |              patch('meta_ads_mcp.core.ads.download_image', new_callable=AsyncMock) as mock_download, \
192 |              patch('meta_ads_mcp.core.ads.PILImage.open') as mock_pil_open, \
193 |              patch('meta_ads_mcp.core.ads.io.BytesIO') as mock_bytesio:
194 |             
195 |             mock_api.side_effect = [mock_ad_data, mock_creative_details]
196 |             mock_get_creatives.return_value = mock_get_ad_creatives_response
197 |             mock_download.return_value = b"fake_image_bytes"
198 |             mock_pil_open.return_value = mock_pil_image
199 |             mock_bytesio.return_value = mock_byte_stream
200 |             
201 |             # This should fall back to object_story_spec.link_data.picture
202 |             result = await get_ad_image(access_token="test_token", ad_id="test_ad_id")
203 |             
204 |             # Verify it used object_story_spec.link_data.picture
205 |             assert result is not None
206 |             mock_download.assert_called_once_with("https://example.com/object_story_picture.jpg")
207 |     
208 |     @pytest.mark.asyncio
209 |     async def test_uses_thumbnail_url_only_as_last_resort(self):
210 |         """Test that thumbnail_url is only used when no other options are available."""
211 |         
212 |         # Mock responses for creative with only thumbnail_url
213 |         mock_ad_data = {
214 |             "account_id": "act_123456789",
215 |             "creative": {"id": "creative_123456789"}
216 |         }
217 |         
218 |         mock_creative_details = {
219 |             "id": "creative_123456789",
220 |             "name": "Test Creative"
221 |         }
222 |         
223 |         # Mock get_ad_creatives response with only thumbnail_url
224 |         mock_get_ad_creatives_response = json.dumps({
225 |             "data": [
226 |                 {
227 |                     "id": "creative_123456789",
228 |                     "name": "Test Creative",
229 |                     "status": "ACTIVE",
230 |                     "thumbnail_url": "https://example.com/thumbnail_only.jpg"  # Only option
231 |                 }
232 |             ]
233 |         })
234 |         
235 |         # Mock PIL Image processing
236 |         mock_pil_image = MagicMock()
237 |         mock_pil_image.mode = "RGB"
238 |         mock_pil_image.convert.return_value = mock_pil_image
239 |         
240 |         mock_byte_stream = MagicMock()
241 |         mock_byte_stream.getvalue.return_value = b"fake_jpeg_data"
242 |         
243 |         with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api, \
244 |              patch('meta_ads_mcp.core.ads.get_ad_creatives', new_callable=AsyncMock) as mock_get_creatives, \
245 |              patch('meta_ads_mcp.core.ads.download_image', new_callable=AsyncMock) as mock_download, \
246 |              patch('meta_ads_mcp.core.ads.PILImage.open') as mock_pil_open, \
247 |              patch('meta_ads_mcp.core.ads.io.BytesIO') as mock_bytesio:
248 |             
249 |             mock_api.side_effect = [mock_ad_data, mock_creative_details]
250 |             mock_get_creatives.return_value = mock_get_ad_creatives_response
251 |             mock_download.return_value = b"fake_image_bytes"
252 |             mock_pil_open.return_value = mock_pil_image
253 |             mock_bytesio.return_value = mock_byte_stream
254 |             
255 |             # This should use thumbnail_url as last resort
256 |             result = await get_ad_image(access_token="test_token", ad_id="test_ad_id")
257 |             
258 |             # Verify it used thumbnail_url
259 |             assert result is not None
260 |             mock_download.assert_called_once_with("https://example.com/thumbnail_only.jpg")
261 |     
262 |     def test_extract_creative_image_urls_prioritizes_quality(self):
263 |         """Test that extract_creative_image_urls correctly prioritizes image quality."""
264 |         
265 |         # Test creative with multiple image URLs
266 |         test_creative = {
267 |             "id": "creative_123456789",
268 |             "name": "Test Creative",
269 |             "thumbnail_url": "https://example.com/thumbnail.jpg",  # Lowest priority
270 |             "image_url": "https://example.com/image.jpg",  # Medium priority
271 |             "image_urls_for_viewing": [
272 |                 "https://example.com/high_quality_1.jpg",  # Highest priority
273 |                 "https://example.com/high_quality_2.jpg"
274 |             ],
275 |             "object_story_spec": {
276 |                 "link_data": {
277 |                     "picture": "https://example.com/object_story_picture.jpg"  # High priority
278 |                 }
279 |             }
280 |         }
281 |         
282 |         # Extract URLs
283 |         urls = extract_creative_image_urls(test_creative)
284 |         
285 |         # Verify correct priority order
286 |         assert len(urls) >= 4
287 |         assert urls[0] == "https://example.com/high_quality_1.jpg"  # First priority
288 |         assert urls[1] == "https://example.com/high_quality_2.jpg"  # Second priority
289 |         assert "https://example.com/image.jpg" in urls  # Medium priority
290 |         assert "https://example.com/object_story_picture.jpg" in urls  # High priority
291 |         assert urls[-1] == "https://example.com/thumbnail.jpg"  # Last priority
292 |     
293 |     def test_extract_creative_image_urls_handles_missing_fields(self):
294 |         """Test that extract_creative_image_urls handles missing fields gracefully."""
295 |         
296 |         # Test creative with minimal fields
297 |         test_creative = {
298 |             "id": "creative_123456789",
299 |             "name": "Minimal Creative",
300 |             "thumbnail_url": "https://example.com/thumbnail.jpg"
301 |         }
302 |         
303 |         # Extract URLs
304 |         urls = extract_creative_image_urls(test_creative)
305 |         
306 |         # Should still work with only thumbnail_url
307 |         assert len(urls) == 1
308 |         assert urls[0] == "https://example.com/thumbnail.jpg"
309 |     
310 |     def test_extract_creative_image_urls_removes_duplicates(self):
311 |         """Test that extract_creative_image_urls removes duplicate URLs."""
312 |         
313 |         # Test creative with duplicate URLs
314 |         test_creative = {
315 |             "id": "creative_123456789",
316 |             "name": "Duplicate URLs Creative",
317 |             "thumbnail_url": "https://example.com/same_url.jpg",
318 |             "image_url": "https://example.com/same_url.jpg",  # Duplicate
319 |             "image_urls_for_viewing": [
320 |                 "https://example.com/same_url.jpg",  # Duplicate
321 |                 "https://example.com/unique_url.jpg"
322 |             ]
323 |         }
324 |         
325 |         # Extract URLs
326 |         urls = extract_creative_image_urls(test_creative)
327 |         
328 |         # Should remove duplicates while preserving order
329 |         assert len(urls) == 2
330 |         assert urls[0] == "https://example.com/same_url.jpg"  # First occurrence
331 |         assert urls[1] == "https://example.com/unique_url.jpg"
332 |     
333 |     @pytest.mark.asyncio
334 |     async def test_get_ad_image_with_real_world_example(self):
335 |         """Test with a real-world example that mimics the actual API response structure."""
336 |         
337 |         # Mock responses based on real API data
338 |         mock_ad_data = {
339 |             "account_id": "act_15975950",
340 |             "creative": {"id": "606995022142818"}
341 |         }
342 |         
343 |         mock_creative_details = {
344 |             "id": "606995022142818",
345 |             "name": "Test Creative"
346 |         }
347 |         
348 |         # Mock get_ad_creatives response based on real data
349 |         mock_get_ad_creatives_response = json.dumps({
350 |             "data": [
351 |                 {
352 |                     "id": "606995022142818",
353 |                     "name": "Test Creative",
354 |                     "status": "ACTIVE",
355 |                     "thumbnail_url": "https://external.fbsb6-1.fna.fbcdn.net/emg1/v/t13/13476424677788553381?url=https%3A%2F%2Fwww.facebook.com%2Fads%2Fimage%2F%3Fd%3DAQLuJ5l4AROBvIUchp4g4JXxIT5uAZiAsgHQkD8Iw7BeVtkXNUUfs3leWpqQplJCJdixVIg3mq9KichJ64eRfM-r8aY4GtVQp8TvS_HBByJ8fGg_Cs7Kb8YkN4IDwJ4iQIIkMx30LycCKzuYtp9M-vOk&fb_obo=1&utld=facebook.com&stp=c0.5000x0.5000f_dst-emg0_p64x64_q75_tt6&edm=AEuWsiQEAAAA&_nc_gid=_QBCRbZxDq-i1ZiGEXxW2w&_nc_eui2=AeEbQXzmAdoqWLIXjuTDJ0xAoThZu47BlQqhOFm7jsGVCloP48Ep6Y_qIA5tcqrcSDff5f_k8xGzFIpD7PnUws8c&_nc_oc=Adn3GeYlXxbfEeY0wCBSgNdlwO80wXt5R5bgY2NozdroZ6CRSaXIaOSjVSK9S1LsqsY4GL_0dVzU80RY8QMucEkZ&ccb=13-1&oh=06_Q3-1AcBKUD0rfLGATAveIM5hMSWG9c7DsJzq2arvOl8W4Bpn&oe=688C87B2&_nc_sid=58080a",
356 |                     "image_url": "https://scontent.fbsb6-1.fna.fbcdn.net/v/t45.1600-4/518574136_1116014047008737_2492837958169838537_n.png?stp=dst-jpg_tt6&_nc_cat=109&ccb=1-7&_nc_sid=890911&_nc_eui2=AeHbHqoiAUgF0QeX-tvUoDjYeTyJad_QEPF5PIlp39AQ8dP8cvOlHwiJjny8AUv7xxAlYyy5BGCqFU_oVM9CI7ln&_nc_ohc=VTTYlMOAWZoQ7kNvwGjLMW5&_nc_oc=AdnYDrpNrLovWZC_RG4tvoICGPjBNfzNJimhx-4SKW4BU2i_yzL00dX0-OiYEYokq394g8xR-1a-OuVDAm4HsSJy&_nc_zt=1&_nc_ht=scontent.fbsb6-1.fna&edm=AEuWsiQEAAAA&_nc_gid=_QBCRbZxDq-i1ZiGEXxW2w&oh=00_AfTujKmF365FnGgcokkkdWnK-vmnzQK8Icvlk0kB8SKM3g&oe=68906FC4",
357 |                     "image_urls_for_viewing": [
358 |                         "https://scontent.fbsb6-1.fna.fbcdn.net/v/t45.1600-4/518574136_1116014047008737_2492837958169838537_n.png?stp=dst-jpg_tt6&_nc_cat=109&ccb=1-7&_nc_sid=890911&_nc_eui2=AeHbHqoiAUgF0QeX-tvUoDjYeTyJad_QEPF5PIlp39AQ8dP8cvOlHwiJjny8AUv7xxAlYyy5BGCqFU_oVM9CI7ln&_nc_ohc=VTTYlMOAWZoQ7kNvwGjLMW5&_nc_oc=AdnYDrpNrLovWZC_RG4tvoICGPjBNfzNJimhx-4SKW4BU2i_yzL00dX0-OiYEYokq394g8xR-1a-OuVDAm4HsSJy&_nc_zt=1&_nc_ht=scontent.fbsb6-1.fna&edm=AEuWsiQEAAAA&_nc_gid=_QBCRbZxDq-i1ZiGEXxW2w&oh=00_AfTujKmF365FnGgcokkkdWnK-vmnzQK8Icvlk0kB8SKM3g&oe=68906FC4",
359 |                         "https://external.fbsb6-1.fna.fbcdn.net/emg1/v/t13/13476424677788553381?url=https%3A%2F%2Fwww.facebook.com%2Fads%2Fimage%2F%3Fd%3DAQLuJ5l4AROBvIUchp4g4JXxIT5uAZiAsgHQkD8Iw7BeVtkXNUUfs3leWpqQplJCJdixVIg3mq9KichJ64eRfM-r8aY4GtVQp8TvS_HBByJ8fGg_Cs7Kb8YkN4IDwJ4iQIIkMx30LycCKzuYtp9M-vOk&fb_obo=1&utld=facebook.com&stp=c0.5000x0.5000f_dst-emg0_p64x64_q75_tt6&edm=AEuWsiQEAAAA&_nc_gid=_QBCRbZxDq-i1ZiGEXxW2w&_nc_eui2=AeEbQXzmAdoqWLIXjuTDJ0xAoThZu47BlQqhOFm7jsGVCloP48Ep6Y_qIA5tcqrcSDff5f_k8xGzFIpD7PnUws8c&_nc_oc=Adn3GeYlXxbfEeY0wCBSgNdlwO80wXt5R5bgY2NozdroZ6CRSaXIaOSjVSK9S1LsqsY4GL_0dVzU80RY8QMucEkZ&ccb=13-1&oh=06_Q3-1AcBKUD0rfLGATAveIM5hMSWG9c7DsJzq2arvOl8W4Bpn&oe=688C87B2&_nc_sid=58080a"
360 |                     ]
361 |                 }
362 |             ]
363 |         })
364 |         
365 |         # Mock PIL Image processing
366 |         mock_pil_image = MagicMock()
367 |         mock_pil_image.mode = "RGB"
368 |         mock_pil_image.convert.return_value = mock_pil_image
369 |         
370 |         mock_byte_stream = MagicMock()
371 |         mock_byte_stream.getvalue.return_value = b"fake_jpeg_data"
372 |         
373 |         with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api, \
374 |              patch('meta_ads_mcp.core.ads.get_ad_creatives', new_callable=AsyncMock) as mock_get_creatives, \
375 |              patch('meta_ads_mcp.core.ads.download_image', new_callable=AsyncMock) as mock_download, \
376 |              patch('meta_ads_mcp.core.ads.PILImage.open') as mock_pil_open, \
377 |              patch('meta_ads_mcp.core.ads.io.BytesIO') as mock_bytesio:
378 |             
379 |             mock_api.side_effect = [mock_ad_data, mock_creative_details]
380 |             mock_get_creatives.return_value = mock_get_ad_creatives_response
381 |             mock_download.return_value = b"fake_image_bytes"
382 |             mock_pil_open.return_value = mock_pil_image
383 |             mock_bytesio.return_value = mock_byte_stream
384 |             
385 |             # This should use the first image_urls_for_viewing URL (high quality)
386 |             result = await get_ad_image(access_token="test_token", ad_id="test_ad_id")
387 |             
388 |             # Verify it used the high-quality URL (not the thumbnail)
389 |             assert result is not None
390 |             expected_url = "https://scontent.fbsb6-1.fna.fbcdn.net/v/t45.1600-4/518574136_1116014047008737_2492837958169838537_n.png?stp=dst-jpg_tt6&_nc_cat=109&ccb=1-7&_nc_sid=890911&_nc_eui2=AeHbHqoiAUgF0QeX-tvUoDjYeTyJad_QEPF5PIlp39AQ8dP8cvOlHwiJjny8AUv7xxAlYyy5BGCqFU_oVM9CI7ln&_nc_ohc=VTTYlMOAWZoQ7kNvwGjLMW5&_nc_oc=AdnYDrpNrLovWZC_RG4tvoICGPjBNfzNJimhx-4SKW4BU2i_yzL00dX0-OiYEYokq394g8xR-1a-OuVDAm4HsSJy&_nc_zt=1&_nc_ht=scontent.fbsb6-1.fna&edm=AEuWsiQEAAAA&_nc_gid=_QBCRbZxDq-i1ZiGEXxW2w&oh=00_AfTujKmF365FnGgcokkkdWnK-vmnzQK8Icvlk0kB8SKM3g&oe=68906FC4"
391 |             mock_download.assert_called_once_with(expected_url) 
```

--------------------------------------------------------------------------------
/tests/test_get_ad_image_regression.py:
--------------------------------------------------------------------------------

```python
  1 | """Regression tests for get_ad_image function fixes.
  2 | 
  3 | Tests for multiple issues that were fixed:
  4 | 
  5 | 1. JSON Parsing Error: 'TypeError: the JSON object must be str, bytes or bytearray, not dict'
  6 |    - Caused by wrong parameter order and incorrect JSON parsing
  7 |    - Fixed by correcting parameter order and JSON parsing logic
  8 | 
  9 | 2. Missing Image Hash Support: "Error: No image hashes found in creative"
 10 |    - Many modern creatives don't have image_hash but have direct URLs
 11 |    - Fixed by adding direct URL fallback using image_urls_for_viewing and thumbnail_url
 12 | 
 13 | 3. Image Quality Issue: Function was returning profile thumbnails instead of ad creative images
 14 |    - Fixed by prioritizing image_urls_for_viewing over thumbnail_url
 15 |    - Added proper fallback hierarchy: image_urls_for_viewing > image_url > object_story_spec.picture > thumbnail_url
 16 | 
 17 | The fixes enable get_ad_image to work with both traditional hash-based and modern URL-based creatives,
 18 | and ensure high-quality images are returned instead of thumbnails.
 19 | """
 20 | 
 21 | import pytest
 22 | import json
 23 | from unittest.mock import AsyncMock, patch, MagicMock
 24 | from meta_ads_mcp.core.ads import get_ad_image
 25 | 
 26 | 
 27 | @pytest.mark.asyncio
 28 | class TestGetAdImageRegressionFix:
 29 |     """Regression test cases for the get_ad_image JSON parsing bug fix."""
 30 |     
 31 |     async def test_get_ad_image_json_parsing_regression_fix(self):
 32 |         """Regression test: ensure get_ad_image doesn't throw JSON parsing error."""
 33 |         
 34 |         # Mock responses for the main API flow
 35 |         mock_ad_data = {
 36 |             "account_id": "act_123456789",
 37 |             "creative": {"id": "creative_123456789"}
 38 |         }
 39 |         
 40 |         mock_creative_details = {
 41 |             "id": "creative_123456789",
 42 |             "name": "Test Creative", 
 43 |             "image_hash": "test_hash_123"
 44 |         }
 45 |         
 46 |         mock_image_data = {
 47 |             "data": [{
 48 |                 "hash": "test_hash_123",
 49 |                 "url": "https://example.com/image.jpg",
 50 |                 "width": 1200,
 51 |                 "height": 628,
 52 |                 "name": "test_image.jpg",
 53 |                 "status": "ACTIVE"
 54 |             }]
 55 |         }
 56 |         
 57 |         # Mock PIL Image processing to return a valid Image object
 58 |         mock_pil_image = MagicMock()
 59 |         mock_pil_image.mode = "RGB"
 60 |         mock_pil_image.convert.return_value = mock_pil_image
 61 |         
 62 |         mock_byte_stream = MagicMock()
 63 |         mock_byte_stream.getvalue.return_value = b"fake_jpeg_data"
 64 |         
 65 |         with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api, \
 66 |              patch('meta_ads_mcp.core.ads.download_image', new_callable=AsyncMock) as mock_download, \
 67 |              patch('meta_ads_mcp.core.ads.PILImage.open') as mock_pil_open, \
 68 |              patch('meta_ads_mcp.core.ads.io.BytesIO') as mock_bytesio:
 69 |             
 70 |             mock_api.side_effect = [mock_ad_data, mock_creative_details, mock_image_data]
 71 |             mock_download.return_value = b"fake_image_bytes"
 72 |             mock_pil_open.return_value = mock_pil_image
 73 |             mock_bytesio.return_value = mock_byte_stream
 74 |             
 75 |             # This should NOT raise "the JSON object must be str, bytes or bytearray, not dict"
 76 |             # Previously this would fail with: TypeError: the JSON object must be str, bytes or bytearray, not dict
 77 |             result = await get_ad_image(access_token="test_token", ad_id="120228922871870272")
 78 |             
 79 |             # Verify we get an Image object (success) - the exact test depends on the mocking
 80 |             # The key is that we don't get the JSON parsing error
 81 |             assert result is not None
 82 |             
 83 |             # The main regression check: if we got here without an exception, the JSON parsing is fixed
 84 |             # We might get different results based on mocking, but the critical JSON parsing should work
 85 |             
 86 |     async def test_get_ad_image_fallback_path_json_parsing(self):
 87 |         """Test the fallback path that calls get_ad_creatives handles JSON parsing correctly."""
 88 |         
 89 |         # Mock responses that trigger the fallback path (no direct image hash)
 90 |         mock_ad_data = {
 91 |             "account_id": "act_123456789",
 92 |             "creative": {"id": "creative_123456789"}
 93 |         }
 94 |         
 95 |         mock_creative_details = {
 96 |             "id": "creative_123456789",
 97 |             "name": "Test Creative"
 98 |             # No image_hash - this will trigger the fallback
 99 |         }
100 |         
101 |         # Mock get_ad_creatives response (wrapped format that caused the original bug)
102 |         mock_get_ad_creatives_response = json.dumps({
103 |             "data": json.dumps({
104 |                 "data": [
105 |                     {
106 |                         "id": "creative_123456789",
107 |                         "name": "Test Creative",
108 |                         "object_story_spec": {
109 |                             "link_data": {
110 |                                 "image_hash": "fallback_hash_123"
111 |                             }
112 |                         }
113 |                     }
114 |                 ]
115 |             })
116 |         })
117 |         
118 |         mock_image_data = {
119 |             "data": [{
120 |                 "hash": "fallback_hash_123",
121 |                 "url": "https://example.com/fallback_image.jpg",
122 |                 "width": 1200,
123 |                 "height": 628
124 |             }]
125 |         }
126 |         
127 |         # Mock PIL Image processing
128 |         mock_pil_image = MagicMock()
129 |         mock_pil_image.mode = "RGB"
130 |         mock_pil_image.convert.return_value = mock_pil_image
131 |         
132 |         mock_byte_stream = MagicMock()
133 |         mock_byte_stream.getvalue.return_value = b"fake_jpeg_data"
134 |         
135 |         with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api, \
136 |              patch('meta_ads_mcp.core.ads.get_ad_creatives', new_callable=AsyncMock) as mock_get_creatives, \
137 |              patch('meta_ads_mcp.core.ads.download_image', new_callable=AsyncMock) as mock_download, \
138 |              patch('meta_ads_mcp.core.ads.PILImage.open') as mock_pil_open, \
139 |              patch('meta_ads_mcp.core.ads.io.BytesIO') as mock_bytesio:
140 |             
141 |             mock_api.side_effect = [mock_ad_data, mock_creative_details, mock_image_data]
142 |             mock_get_creatives.return_value = mock_get_ad_creatives_response
143 |             mock_download.return_value = b"fake_image_bytes"
144 |             mock_pil_open.return_value = mock_pil_image
145 |             mock_bytesio.return_value = mock_byte_stream
146 |             
147 |             # This should handle the wrapped JSON response correctly
148 |             # Previously would fail: TypeError: the JSON object must be str, bytes or bytearray, not dict
149 |             result = await get_ad_image(access_token="test_token", ad_id="120228922871870272")
150 |             
151 |             # Verify the fallback path worked - key is no JSON parsing exception
152 |             assert result is not None
153 |             # Verify get_ad_creatives was called (fallback path was triggered)
154 |             mock_get_creatives.assert_called_once()
155 |     
156 |     async def test_get_ad_image_no_ad_id(self):
157 |         """Test get_ad_image with no ad_id provided."""
158 |         
159 |         result = await get_ad_image(access_token="test_token", ad_id=None)
160 |         
161 |         # Should return error string, not throw JSON parsing error
162 |         assert isinstance(result, str)
163 |         assert "Error: No ad ID provided" in result
164 |     
165 |     async def test_get_ad_image_parameter_order_regression(self):
166 |         """Regression test: ensure get_ad_creatives is called with correct parameter order."""
167 |         
168 |         # This test ensures we don't regress to calling get_ad_creatives(ad_id, "", access_token)
169 |         # which was the original bug
170 |         
171 |         mock_ad_data = {
172 |             "account_id": "act_123456789", 
173 |             "creative": {"id": "creative_123456789"}
174 |         }
175 |         
176 |         mock_creative_details = {
177 |             "id": "creative_123456789",
178 |             "name": "Test Creative"
179 |             # No image_hash to trigger fallback
180 |         }
181 |         
182 |         with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api, \
183 |              patch('meta_ads_mcp.core.ads.get_ad_creatives', new_callable=AsyncMock) as mock_get_creatives:
184 |             
185 |             mock_api.side_effect = [mock_ad_data, mock_creative_details]
186 |             mock_get_creatives.return_value = json.dumps({"data": json.dumps({"data": []})})
187 |             
188 |             # Call get_ad_image - it should reach the fallback path
189 |             result = await get_ad_image(access_token="test_token", ad_id="test_ad_id")
190 |             
191 |             # Verify get_ad_creatives was called with correct parameter names (not positional)
192 |             mock_get_creatives.assert_called_once_with(ad_id="test_ad_id", access_token="test_token")
193 |             
194 |                          # The key regression test: this should not have raised a JSON parsing error
195 |     
196 |     async def test_get_ad_image_direct_url_fallback_with_image_urls_for_viewing(self):
197 |         """Test direct URL fallback using image_urls_for_viewing when no image_hash found."""
198 |         
199 |         # Mock responses for modern creative without image_hash
200 |         mock_ad_data = {
201 |             "account_id": "act_123456789",
202 |             "creative": {"id": "creative_123456789"}
203 |         }
204 |         
205 |         mock_creative_details = {
206 |             "id": "creative_123456789",
207 |             "name": "Modern Creative"
208 |             # No image_hash - this will trigger fallback
209 |         }
210 |         
211 |         # Mock get_ad_creatives response with direct URLs
212 |         mock_get_ad_creatives_response = json.dumps({
213 |             "data": [
214 |                 {
215 |                     "id": "creative_123456789",
216 |                     "name": "Modern Creative",
217 |                     "status": "ACTIVE",
218 |                     "thumbnail_url": "https://example.com/thumb.jpg",
219 |                     "image_urls_for_viewing": [
220 |                         "https://example.com/full_image.jpg",
221 |                         "https://example.com/alt_image.jpg"
222 |                     ]
223 |                 }
224 |             ]
225 |         })
226 |         
227 |         # Mock PIL Image processing
228 |         mock_pil_image = MagicMock()
229 |         mock_pil_image.mode = "RGB"
230 |         mock_pil_image.convert.return_value = mock_pil_image
231 |         
232 |         mock_byte_stream = MagicMock()
233 |         mock_byte_stream.getvalue.return_value = b"fake_jpeg_data"
234 |         
235 |         with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api, \
236 |              patch('meta_ads_mcp.core.ads.get_ad_creatives', new_callable=AsyncMock) as mock_get_creatives, \
237 |              patch('meta_ads_mcp.core.ads.download_image', new_callable=AsyncMock) as mock_download, \
238 |              patch('meta_ads_mcp.core.ads.PILImage.open') as mock_pil_open, \
239 |              patch('meta_ads_mcp.core.ads.io.BytesIO') as mock_bytesio:
240 |             
241 |             mock_api.side_effect = [mock_ad_data, mock_creative_details]
242 |             mock_get_creatives.return_value = mock_get_ad_creatives_response
243 |             mock_download.return_value = b"fake_image_bytes"
244 |             mock_pil_open.return_value = mock_pil_image
245 |             mock_bytesio.return_value = mock_byte_stream
246 |             
247 |             # This should use direct URL fallback successfully
248 |             result = await get_ad_image(access_token="test_token", ad_id="test_ad_id")
249 |             
250 |             # Verify it used the direct URL approach
251 |             assert result is not None
252 |             mock_get_creatives.assert_called_once()
253 |             mock_download.assert_called_once_with("https://example.com/full_image.jpg")
254 |     
255 |     async def test_get_ad_image_direct_url_fallback_with_thumbnail_url_only(self):
256 |         """Test direct URL fallback using thumbnail_url when image_urls_for_viewing not available."""
257 |         
258 |         # Mock responses for creative with only thumbnail_url
259 |         mock_ad_data = {
260 |             "account_id": "act_123456789",
261 |             "creative": {"id": "creative_123456789"}
262 |         }
263 |         
264 |         mock_creative_details = {
265 |             "id": "creative_123456789",
266 |             "name": "Thumbnail Only Creative"
267 |             # No image_hash
268 |         }
269 |         
270 |         # Mock get_ad_creatives response with only thumbnail_url
271 |         mock_get_ad_creatives_response = json.dumps({
272 |             "data": [
273 |                 {
274 |                     "id": "creative_123456789",
275 |                     "name": "Thumbnail Only Creative",
276 |                     "status": "ACTIVE",
277 |                     "thumbnail_url": "https://example.com/thumb_only.jpg"
278 |                     # No image_urls_for_viewing
279 |                 }
280 |             ]
281 |         })
282 |         
283 |         # Mock PIL Image processing
284 |         mock_pil_image = MagicMock()
285 |         mock_pil_image.mode = "RGB"
286 |         mock_pil_image.convert.return_value = mock_pil_image
287 |         
288 |         mock_byte_stream = MagicMock()
289 |         mock_byte_stream.getvalue.return_value = b"fake_jpeg_data"
290 |         
291 |         with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api, \
292 |              patch('meta_ads_mcp.core.ads.get_ad_creatives', new_callable=AsyncMock) as mock_get_creatives, \
293 |              patch('meta_ads_mcp.core.ads.download_image', new_callable=AsyncMock) as mock_download, \
294 |              patch('meta_ads_mcp.core.ads.PILImage.open') as mock_pil_open, \
295 |              patch('meta_ads_mcp.core.ads.io.BytesIO') as mock_bytesio:
296 |             
297 |             mock_api.side_effect = [mock_ad_data, mock_creative_details]
298 |             mock_get_creatives.return_value = mock_get_ad_creatives_response
299 |             mock_download.return_value = b"fake_image_bytes"
300 |             mock_pil_open.return_value = mock_pil_image
301 |             mock_bytesio.return_value = mock_byte_stream
302 |             
303 |             # This should fall back to thumbnail_url
304 |             result = await get_ad_image(access_token="test_token", ad_id="test_ad_id")
305 |             
306 |             # Verify it used the thumbnail URL
307 |             assert result is not None
308 |             mock_download.assert_called_once_with("https://example.com/thumb_only.jpg")
309 |     
310 |     async def test_get_ad_image_no_direct_urls_available(self):
311 |         """Test error handling when no direct URLs are available."""
312 |         
313 |         # Mock responses for creative without any URLs
314 |         mock_ad_data = {
315 |             "account_id": "act_123456789",
316 |             "creative": {"id": "creative_123456789"}
317 |         }
318 |         
319 |         mock_creative_details = {
320 |             "id": "creative_123456789",
321 |             "name": "No URLs Creative"
322 |             # No image_hash
323 |         }
324 |         
325 |         # Mock get_ad_creatives response without URLs
326 |         mock_get_ad_creatives_response = json.dumps({
327 |             "data": [
328 |                 {
329 |                     "id": "creative_123456789", 
330 |                     "name": "No URLs Creative",
331 |                     "status": "ACTIVE"
332 |                     # No thumbnail_url or image_urls_for_viewing
333 |                 }
334 |             ]
335 |         })
336 |         
337 |         with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api, \
338 |              patch('meta_ads_mcp.core.ads.get_ad_creatives', new_callable=AsyncMock) as mock_get_creatives:
339 |             
340 |             mock_api.side_effect = [mock_ad_data, mock_creative_details]
341 |             mock_get_creatives.return_value = mock_get_ad_creatives_response
342 |             
343 |             # This should return appropriate error
344 |             result = await get_ad_image(access_token="test_token", ad_id="test_ad_id")
345 |             
346 |             # Should get error about no URLs
347 |             assert isinstance(result, str)
348 |             assert "No image URLs found" in result
349 |     
350 |     async def test_get_ad_image_direct_url_download_failure(self):
351 |         """Test error handling when direct URL download fails."""
352 |         
353 |         # Mock responses for creative with URLs but download failure
354 |         mock_ad_data = {
355 |             "account_id": "act_123456789",
356 |             "creative": {"id": "creative_123456789"}
357 |         }
358 |         
359 |         mock_creative_details = {
360 |             "id": "creative_123456789",
361 |             "name": "Download Fail Creative"
362 |         }
363 |         
364 |         mock_get_ad_creatives_response = json.dumps({
365 |             "data": [
366 |                 {
367 |                     "id": "creative_123456789",
368 |                     "name": "Download Fail Creative",
369 |                     "image_urls_for_viewing": ["https://example.com/broken_image.jpg"]
370 |                 }
371 |             ]
372 |         })
373 |         
374 |         with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api, \
375 |              patch('meta_ads_mcp.core.ads.get_ad_creatives', new_callable=AsyncMock) as mock_get_creatives, \
376 |              patch('meta_ads_mcp.core.ads.download_image', new_callable=AsyncMock) as mock_download:
377 |             
378 |             mock_api.side_effect = [mock_ad_data, mock_creative_details]
379 |             mock_get_creatives.return_value = mock_get_ad_creatives_response
380 |             mock_download.return_value = None  # Simulate download failure
381 |             
382 |             # This should return download error
383 |             result = await get_ad_image(access_token="test_token", ad_id="test_ad_id")
384 |             
385 |             # Should get error about download failure
386 |             assert isinstance(result, str)
387 |             assert "Failed to download image from direct URL" in result
388 |     
389 |     async def test_get_ad_image_quality_improvement_prioritizes_high_quality(self):
390 |         """Test that the image quality improvement correctly prioritizes high-quality images over thumbnails."""
391 |         
392 |         # Mock responses for creative with both high-quality and thumbnail URLs
393 |         mock_ad_data = {
394 |             "account_id": "act_123456789",
395 |             "creative": {"id": "creative_123456789"}
396 |         }
397 |         
398 |         mock_creative_details = {
399 |             "id": "creative_123456789",
400 |             "name": "Quality Test Creative"
401 |         }
402 |         
403 |         # Mock get_ad_creatives response with both URLs
404 |         mock_get_ad_creatives_response = json.dumps({
405 |             "data": [
406 |                 {
407 |                     "id": "creative_123456789",
408 |                     "name": "Quality Test Creative",
409 |                     "status": "ACTIVE",
410 |                     "thumbnail_url": "https://example.com/thumbnail_64x64.jpg",  # Low quality thumbnail
411 |                     "image_url": "https://example.com/full_image.jpg",  # Medium quality
412 |                     "image_urls_for_viewing": [
413 |                         "https://example.com/high_quality_image.jpg",  # Highest quality
414 |                         "https://example.com/alt_high_quality.jpg"
415 |                     ],
416 |                     "object_story_spec": {
417 |                         "link_data": {
418 |                             "picture": "https://example.com/object_story_picture.jpg"
419 |                         }
420 |                     }
421 |                 }
422 |             ]
423 |         })
424 |         
425 |         # Mock PIL Image processing
426 |         mock_pil_image = MagicMock()
427 |         mock_pil_image.mode = "RGB"
428 |         mock_pil_image.convert.return_value = mock_pil_image
429 |         
430 |         mock_byte_stream = MagicMock()
431 |         mock_byte_stream.getvalue.return_value = b"fake_jpeg_data"
432 |         
433 |         with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api, \
434 |              patch('meta_ads_mcp.core.ads.get_ad_creatives', new_callable=AsyncMock) as mock_get_creatives, \
435 |              patch('meta_ads_mcp.core.ads.download_image', new_callable=AsyncMock) as mock_download, \
436 |              patch('meta_ads_mcp.core.ads.PILImage.open') as mock_pil_open, \
437 |              patch('meta_ads_mcp.core.ads.io.BytesIO') as mock_bytesio:
438 |             
439 |             mock_api.side_effect = [mock_ad_data, mock_creative_details]
440 |             mock_get_creatives.return_value = mock_get_ad_creatives_response
441 |             mock_download.return_value = b"fake_image_bytes"
442 |             mock_pil_open.return_value = mock_pil_image
443 |             mock_bytesio.return_value = mock_byte_stream
444 |             
445 |             # This should prioritize image_urls_for_viewing[0] over thumbnail_url
446 |             result = await get_ad_image(access_token="test_token", ad_id="test_ad_id")
447 |             
448 |             # Verify it used the highest quality URL, not the thumbnail
449 |             assert result is not None
450 |             mock_download.assert_called_once_with("https://example.com/high_quality_image.jpg")
451 |             
452 |             # Verify it did NOT use the thumbnail URL
453 |             # Check that the call was made with the high-quality URL, not the thumbnail
454 |             mock_download.assert_called_once_with("https://example.com/high_quality_image.jpg") 
```

--------------------------------------------------------------------------------
/tests/test_dsa_integration.py:
--------------------------------------------------------------------------------

```python
  1 | #!/usr/bin/env python3
  2 | """
  3 | Integration test for DSA beneficiary functionality.
  4 | 
  5 | This test demonstrates the complete DSA beneficiary implementation working end-to-end,
  6 | including detection, parameter support, and error handling.
  7 | """
  8 | 
  9 | import pytest
 10 | import json
 11 | from unittest.mock import AsyncMock, patch
 12 | 
 13 | from meta_ads_mcp.core.adsets import create_adset, get_adset_details
 14 | from meta_ads_mcp.core.accounts import get_account_info
 15 | 
 16 | 
 17 | class TestDSAIntegration:
 18 |     """Integration tests for DSA beneficiary functionality"""
 19 |     
 20 |     @pytest.mark.asyncio
 21 |     async def test_dsa_beneficiary_complete_workflow(self):
 22 |         """Test complete DSA beneficiary workflow from account detection to ad set creation"""
 23 |         
 24 |         # Step 1: Get account info and detect DSA requirement
 25 |         mock_account_response = {
 26 |             "id": "act_701351919139047",
 27 |             "name": "Test European Account",
 28 |             "account_status": 1,
 29 |             "business_country_code": "DE",  # Germany - DSA compliant
 30 |             "business_city": "Berlin",
 31 |             "currency": "EUR"
 32 |         }
 33 |         
 34 |         with patch('meta_ads_mcp.core.accounts.make_api_request', new_callable=AsyncMock) as mock_account_api:
 35 |             with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
 36 |                 mock_auth.return_value = "test_access_token"
 37 |                 mock_account_api.return_value = mock_account_response
 38 |                 
 39 |                 # Get account info and verify DSA detection
 40 |                 result = await get_account_info(account_id="act_701351919139047")
 41 |                 
 42 |                 # Handle new return format (dictionary instead of JSON string)
 43 |                 if isinstance(result, dict):
 44 |                     result_data = result
 45 |                 else:
 46 |                     result_data = json.loads(result)
 47 |                 
 48 |                 # Verify DSA requirement is detected
 49 |                 assert result_data["business_country_code"] == "DE"
 50 |                 assert result_data["dsa_required"] == True
 51 |                 assert "DSA (Digital Services Act)" in result_data["dsa_compliance_note"]
 52 |         
 53 |         # Step 2: Create ad set with DSA beneficiary
 54 |         mock_adset_response = {
 55 |             "id": "23842588888640185",
 56 |             "name": "Test European Ad Set",
 57 |             "status": "PAUSED",
 58 |             "dsa_beneficiary": "Test Organization GmbH"
 59 |         }
 60 |         
 61 |         with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_adset_api:
 62 |             with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
 63 |                 mock_auth.return_value = "test_access_token"
 64 |                 mock_adset_api.return_value = mock_adset_response
 65 |                 
 66 |                 # Create ad set with DSA beneficiary
 67 |                 result = await create_adset(
 68 |                     account_id="act_701351919139047",
 69 |                     campaign_id="23842588888640184",
 70 |                     name="Test European Ad Set",
 71 |                     optimization_goal="LINK_CLICKS",
 72 |                     billing_event="IMPRESSIONS",
 73 |                     dsa_beneficiary="Test Organization GmbH"
 74 |                 )
 75 |                 
 76 |                 result_data = json.loads(result)
 77 |                 
 78 |                 # Verify successful creation with DSA beneficiary
 79 |                 assert result_data["id"] == "23842588888640185"
 80 |                 assert result_data["dsa_beneficiary"] == "Test Organization GmbH"
 81 |                 
 82 |                 # Verify API call included DSA beneficiary parameter
 83 |                 mock_adset_api.assert_called_once()
 84 |                 call_args = mock_adset_api.call_args
 85 |                 params = call_args[0][2]  # Third argument is params
 86 |                 assert "dsa_beneficiary" in params
 87 |                 assert params["dsa_beneficiary"] == "Test Organization GmbH"
 88 |     
 89 |     @pytest.mark.asyncio
 90 |     async def test_dsa_beneficiary_error_handling_integration(self):
 91 |         """Test DSA beneficiary error handling in real-world scenarios"""
 92 |         
 93 |         # Test 1: Missing DSA beneficiary for European account
 94 |         mock_error_response = {
 95 |             "error": {
 96 |                 "message": "Enter the person or organization that benefits from ads in this ad set",
 97 |                 "type": "OAuthException",
 98 |                 "code": 100
 99 |             }
100 |         }
101 |         
102 |         with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api:
103 |             with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
104 |                 mock_auth.return_value = "test_access_token"
105 |                 mock_api.side_effect = Exception(json.dumps(mock_error_response))
106 |                 
107 |                 result = await create_adset(
108 |                     account_id="act_701351919139047",
109 |                     campaign_id="23842588888640184",
110 |                     name="Test European Ad Set",
111 |                     optimization_goal="LINK_CLICKS",
112 |                     billing_event="IMPRESSIONS"
113 |                     # No DSA beneficiary provided
114 |                 )
115 |                 
116 |                 result_data = json.loads(result)
117 |                 
118 |                 # Handle response wrapped in 'data' field by meta_api_tool decorator
119 |                 if "data" in result_data:
120 |                     actual_data = json.loads(result_data["data"])
121 |                 else:
122 |                     actual_data = result_data
123 |                 
124 |                 # Verify error is properly handled
125 |                 assert "error" in actual_data
126 |                 assert "benefits from ads" in actual_data["error"]
127 |         
128 |         # Test 2: Permission error for DSA beneficiary
129 |         with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api:
130 |             with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
131 |                 mock_auth.return_value = "test_access_token"
132 |                 mock_api.side_effect = Exception("Permission denied: business_management permission required")
133 |                 
134 |                 result = await create_adset(
135 |                     account_id="act_701351919139047",
136 |                     campaign_id="23842588888640184",
137 |                     name="Test European Ad Set",
138 |                     optimization_goal="LINK_CLICKS",
139 |                     billing_event="IMPRESSIONS",
140 |                     dsa_beneficiary="Test Organization GmbH"
141 |                 )
142 |                 
143 |                 result_data = json.loads(result)
144 |                 
145 |                 # Handle response wrapped in 'data' field by meta_api_tool decorator
146 |                 if "data" in result_data:
147 |                     actual_data = json.loads(result_data["data"])
148 |                 else:
149 |                     actual_data = result_data
150 |                 
151 |                 # Verify permission error is handled
152 |                 assert "error" in actual_data
153 |                 assert "permission" in actual_data["error"].lower()
154 |     
155 |     @pytest.mark.asyncio
156 |     async def test_dsa_beneficiary_regional_compliance_integration(self):
157 |         """Test DSA beneficiary compliance across different regions"""
158 |         
159 |         # Test 1: European account (DSA required)
160 |         mock_de_account_response = {
161 |             "id": "act_de",
162 |             "name": "German Account",
163 |             "business_country_code": "DE",
164 |             "currency": "EUR"
165 |         }
166 |         
167 |         with patch('meta_ads_mcp.core.accounts.make_api_request', new_callable=AsyncMock) as mock_api:
168 |             with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
169 |                 mock_auth.return_value = "test_access_token"
170 |                 mock_api.return_value = mock_de_account_response
171 |                 
172 |                 result = await get_account_info(account_id="act_de")
173 |                 
174 |                 # Handle new return format (dictionary instead of JSON string)
175 |                 if isinstance(result, dict):
176 |                     result_data = result
177 |                 else:
178 |                     result_data = json.loads(result)
179 |                 
180 |                 # Verify DSA requirement for German account
181 |                 assert result_data["business_country_code"] == "DE"
182 |                 assert result_data["dsa_required"] == True
183 |         
184 |         # Test 2: US account (DSA not required)
185 |         mock_us_account_response = {
186 |             "id": "act_us",
187 |             "name": "US Account",
188 |             "business_country_code": "US",
189 |             "currency": "USD"
190 |         }
191 |         
192 |         with patch('meta_ads_mcp.core.accounts.make_api_request', new_callable=AsyncMock) as mock_api:
193 |             with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
194 |                 mock_auth.return_value = "test_access_token"
195 |                 mock_api.return_value = mock_us_account_response
196 |                 
197 |                 result = await get_account_info(account_id="act_us")
198 |                 
199 |                 # Handle new return format (dictionary instead of JSON string)
200 |                 if isinstance(result, dict):
201 |                     result_data = result
202 |                 else:
203 |                     result_data = json.loads(result)
204 |                 
205 |                 # Verify no DSA requirement for US account
206 |                 assert result_data["business_country_code"] == "US"
207 |                 assert result_data["dsa_required"] == False
208 |     
209 |     @pytest.mark.asyncio
210 |     async def test_dsa_beneficiary_parameter_formats_integration(self):
211 |         """Test different DSA beneficiary parameter formats in real scenarios"""
212 |         
213 |         test_cases = [
214 |             "Test Organization GmbH",
215 |             "Test Organization, Inc.",
216 |             "Test Organization Ltd.",
217 |             "Test Organization AG",
218 |             "Test Organization BV",
219 |             "Test Organization SARL"
220 |         ]
221 |         
222 |         for beneficiary_name in test_cases:
223 |             mock_response = {
224 |                 "id": "23842588888640185",
225 |                 "name": "Test Ad Set",
226 |                 "status": "PAUSED",
227 |                 "dsa_beneficiary": beneficiary_name
228 |             }
229 |             
230 |             with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api:
231 |                 with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
232 |                     mock_auth.return_value = "test_access_token"
233 |                     mock_api.return_value = mock_response
234 |                     
235 |                     result = await create_adset(
236 |                         account_id="act_701351919139047",
237 |                         campaign_id="23842588888640184",
238 |                         name="Test Ad Set",
239 |                         optimization_goal="LINK_CLICKS",
240 |                         billing_event="IMPRESSIONS",
241 |                         dsa_beneficiary=beneficiary_name
242 |                     )
243 |                     
244 |                     result_data = json.loads(result)
245 |                     
246 |                     # Verify successful creation with different formats
247 |                     assert result_data["id"] == "23842588888640185"
248 |                     assert result_data["dsa_beneficiary"] == beneficiary_name
249 |     
250 |     @pytest.mark.asyncio
251 |     async def test_dsa_beneficiary_retrieval_integration(self):
252 |         """Test complete workflow including retrieving DSA beneficiary information"""
253 |         
254 |         # Step 1: Create ad set with DSA beneficiary
255 |         mock_create_response = {
256 |             "id": "120229746629010183",
257 |             "name": "Test Ad Set with DSA",
258 |             "status": "PAUSED"
259 |         }
260 |         
261 |         with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api:
262 |             with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
263 |                 mock_auth.return_value = "test_access_token"
264 |                 mock_api.return_value = mock_create_response
265 |                 
266 |                 # Create ad set
267 |                 result = await create_adset(
268 |                     account_id="act_701351919139047",
269 |                     campaign_id="120229656904980183",
270 |                     name="Test Ad Set with DSA",
271 |                     optimization_goal="LINK_CLICKS",
272 |                     billing_event="IMPRESSIONS",
273 |                     dsa_beneficiary="Test Organization Inc"
274 |                 )
275 |                 
276 |                 result_data = json.loads(result)
277 |                 adset_id = result_data["id"]
278 |                 
279 |                 # Verify creation was successful
280 |                 assert adset_id == "120229746629010183"
281 |         
282 |         # Step 2: Retrieve ad set details including DSA beneficiary
283 |         mock_details_response = {
284 |             "id": "120229746629010183",
285 |             "name": "Test Ad Set with DSA",
286 |             "campaign_id": "120229656904980183",
287 |             "status": "PAUSED",
288 |             "daily_budget": "1000",
289 |             "targeting": {
290 |                 "geo_locations": {"countries": ["US"]},
291 |                 "age_min": 25,
292 |                 "age_max": 65
293 |             },
294 |             "bid_amount": 200,
295 |             "optimization_goal": "LINK_CLICKS",
296 |             "billing_event": "IMPRESSIONS",
297 |             "dsa_beneficiary": "Test Organization Inc"
298 |         }
299 |         
300 |         with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api:
301 |             with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
302 |                 mock_auth.return_value = "test_access_token"
303 |                 mock_api.return_value = mock_details_response
304 |                 
305 |                 # Retrieve ad set details
306 |                 result = await get_adset_details(adset_id=adset_id)
307 |                 result_data = json.loads(result)
308 |                 
309 |                 # Verify DSA beneficiary field is retrieved correctly
310 |                 assert result_data["id"] == "120229746629010183"
311 |                 assert "dsa_beneficiary" in result_data
312 |                 assert result_data["dsa_beneficiary"] == "Test Organization Inc"
313 |                 
314 |                 # Verify API call included dsa_beneficiary in fields
315 |                 mock_api.assert_called_once()
316 |                 call_args = mock_api.call_args
317 |                 assert "dsa_beneficiary" in str(call_args)
318 |     
319 |     @pytest.mark.asyncio
320 |     async def test_dsa_beneficiary_us_account_integration(self):
321 |         """Test DSA beneficiary behavior for US accounts (optional parameter)"""
322 |         
323 |         # Step 1: Verify US account doesn't require DSA
324 |         mock_us_account_response = {
325 |             "id": "act_701351919139047",
326 |             "name": "US Business Account",
327 |             "business_country_code": "US",
328 |             "account_status": 1
329 |         }
330 |         
331 |         with patch('meta_ads_mcp.core.accounts.make_api_request', new_callable=AsyncMock) as mock_api:
332 |             with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
333 |                 mock_auth.return_value = "test_access_token"
334 |                 mock_api.return_value = mock_us_account_response
335 |                 
336 |                 result = await get_account_info(account_id="act_701351919139047")
337 |                 result_data = json.loads(result)
338 |                 
339 |                 # Verify US account doesn't require DSA
340 |                 assert result_data["business_country_code"] == "US"
341 |                 assert result_data["dsa_required"] == False
342 |         
343 |         # Step 2: Create ad set without DSA beneficiary (should work for US)
344 |         mock_create_response = {
345 |             "id": "120229746624860183",
346 |             "name": "Test US Ad Set",
347 |             "status": "PAUSED"
348 |         }
349 |         
350 |         with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api:
351 |             with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
352 |                 mock_auth.return_value = "test_access_token"
353 |                 mock_api.return_value = mock_create_response
354 |                 
355 |                 result = await create_adset(
356 |                     account_id="act_701351919139047",
357 |                     campaign_id="120229656904980183",
358 |                     name="Test US Ad Set",
359 |                     optimization_goal="LINK_CLICKS",
360 |                     billing_event="IMPRESSIONS"
361 |                     # No DSA beneficiary provided
362 |                 )
363 |                 
364 |                 result_data = json.loads(result)
365 |                 
366 |                 # Verify creation was successful without DSA beneficiary
367 |                 assert result_data["id"] == "120229746624860183"
368 |         
369 |         # Step 3: Retrieve ad set details (should not have dsa_beneficiary field)
370 |         mock_details_response = {
371 |             "id": "120229746624860183",
372 |             "name": "Test US Ad Set",
373 |             "campaign_id": "120229656904980183",
374 |             "status": "PAUSED",
375 |             "daily_budget": "1000",
376 |             "targeting": {
377 |                 "geo_locations": {"countries": ["US"]}
378 |             },
379 |             "bid_amount": 200,
380 |             "optimization_goal": "LINK_CLICKS",
381 |             "billing_event": "IMPRESSIONS"
382 |             # No dsa_beneficiary field
383 |         }
384 |         
385 |         with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api:
386 |             with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
387 |                 mock_auth.return_value = "test_access_token"
388 |                 mock_api.return_value = mock_details_response
389 |                 
390 |                 result = await get_adset_details(adset_id="120229746624860183")
391 |                 result_data = json.loads(result)
392 |                 
393 |                 # Verify ad set details are retrieved correctly
394 |                 assert result_data["id"] == "120229746624860183"
395 |                 assert "dsa_beneficiary" not in result_data  # Should not be present for US accounts 
396 |     
397 |     @pytest.mark.asyncio
398 |     async def test_account_info_requires_account_id(self):
399 |         """Test that get_account_info requires an account_id parameter"""
400 |         
401 |         with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
402 |             mock_auth.return_value = "test_access_token"
403 |             
404 |             # Test without account_id parameter
405 |             result = await get_account_info(account_id=None)
406 |             
407 |             # Handle new return format (dictionary instead of JSON string)
408 |             if isinstance(result, dict):
409 |                 result_data = result
410 |             else:
411 |                 result_data = json.loads(result)
412 |             
413 |             # Verify error message for missing account_id
414 |             assert "error" in result_data
415 |             assert "Account ID is required" in result_data["error"]["message"]
416 |             assert "Please specify an account_id parameter" in result_data["error"]["details"]
417 |             assert "example" in result_data["error"]
418 |     
419 |     @pytest.mark.asyncio
420 |     async def test_account_info_inaccessible_account_error(self):
421 |         """Test that get_account_info provides helpful error for inaccessible accounts"""
422 |         
423 |         # Mock permission error for direct account access (first API call)
424 |         mock_permission_error = {
425 |             "error": {
426 |                 "message": "Insufficient access privileges",
427 |                 "type": "OAuthException",
428 |                 "code": 200
429 |             }
430 |         }
431 |         
432 |         # Mock accessible accounts response (second API call)
433 |         mock_accessible_accounts = {
434 |             "data": [
435 |                 {"id": "act_123", "name": "Test Account 1"},
436 |                 {"id": "act_456", "name": "Test Account 2"}
437 |             ]
438 |         }
439 |         
440 |         with patch('meta_ads_mcp.core.accounts.make_api_request', new_callable=AsyncMock) as mock_api:
441 |             with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
442 |                 mock_auth.return_value = "test_access_token"
443 |                 # First call returns permission error, second call returns accessible accounts
444 |                 mock_api.side_effect = [mock_permission_error, mock_accessible_accounts]
445 |                 
446 |                 result = await get_account_info(account_id="act_inaccessible")
447 |                 
448 |                 # Handle new return format (dictionary instead of JSON string)
449 |                 if isinstance(result, dict):
450 |                     result_data = result
451 |                 else:
452 |                     result_data = json.loads(result)
453 |                 
454 |                 # Verify helpful error message for inaccessible account
455 |                 assert "error" in result_data
456 |                 assert "not accessible to your user account" in result_data["error"]["message"]
457 |                 assert "accessible_accounts" in result_data["error"]
458 |                 assert "suggestion" in result_data["error"]
459 |                 assert len(result_data["error"]["accessible_accounts"]) == 2 
```

--------------------------------------------------------------------------------
/tests/test_mobile_app_adset_creation.py:
--------------------------------------------------------------------------------

```python
  1 | #!/usr/bin/env python3
  2 | """
  3 | Unit Tests for Mobile App Adset Creation Functionality
  4 | 
  5 | This test suite validates the mobile app parameters implementation for the
  6 | create_adset function in meta_ads_mcp/core/adsets.py.
  7 | 
  8 | Test cases cover:
  9 | - Mobile app adset creation success scenarios
 10 | - promoted_object parameter validation and formatting
 11 | - destination_type parameter validation
 12 | - Mobile app specific error handling
 13 | - Cross-platform mobile app support (iOS, Android)
 14 | - Integration with APP_INSTALLS optimization goal
 15 | 
 16 | Usage:
 17 |     uv run python -m pytest tests/test_mobile_app_adset_creation.py -v
 18 | 
 19 | Related to Issue #008: Missing Mobile App Parameters in create_adset Function
 20 | """
 21 | 
 22 | import pytest
 23 | import json
 24 | import asyncio
 25 | from unittest.mock import AsyncMock, patch, MagicMock
 26 | from typing import Dict, Any, List
 27 | 
 28 | # Import the function to test
 29 | from meta_ads_mcp.core.adsets import create_adset
 30 | 
 31 | 
 32 | class TestMobileAppAdsetCreation:
 33 |     """Test suite for mobile app adset creation functionality"""
 34 |     
 35 |     @pytest.fixture
 36 |     def mock_api_request(self):
 37 |         """Mock for the make_api_request function"""
 38 |         with patch('meta_ads_mcp.core.adsets.make_api_request') as mock:
 39 |             mock.return_value = {
 40 |                 "id": "test_mobile_adset_id",
 41 |                 "name": "Test Mobile App Adset",
 42 |                 "optimization_goal": "APP_INSTALLS",
 43 |                 "promoted_object": {
 44 |                     "application_id": "123456789012345",
 45 |                     "object_store_url": "https://apps.apple.com/app/id123456789"
 46 |                 },
 47 |                 "destination_type": "APP_STORE"
 48 |             }
 49 |             yield mock
 50 |     
 51 |     @pytest.fixture
 52 |     def mock_auth_manager(self):
 53 |         """Mock for the authentication manager"""
 54 |         with patch('meta_ads_mcp.core.api.auth_manager') as mock, \
 55 |              patch('meta_ads_mcp.core.auth.get_current_access_token') as mock_get_token:
 56 |             # Mock a valid access token
 57 |             mock.get_current_access_token.return_value = "test_access_token"
 58 |             mock.is_token_valid.return_value = True
 59 |             mock.app_id = "test_app_id"
 60 |             mock_get_token.return_value = "test_access_token"
 61 |             yield mock
 62 |     
 63 |     @pytest.fixture
 64 |     def valid_mobile_app_params(self):
 65 |         """Valid mobile app parameters for testing"""
 66 |         return {
 67 |             "account_id": "act_123456789",
 68 |             "campaign_id": "campaign_123456789",
 69 |             "name": "Test Mobile App Adset",
 70 |             "optimization_goal": "APP_INSTALLS",
 71 |             "billing_event": "IMPRESSIONS",
 72 |             "targeting": {
 73 |                 "age_min": 18,
 74 |                 "age_max": 65,
 75 |                 "app_install_state": "not_installed",
 76 |                 "geo_locations": {"countries": ["US"]},
 77 |                 "user_device": ["Android_Smartphone", "iPhone"],
 78 |                 "user_os": ["Android", "iOS"]
 79 |             }
 80 |         }
 81 |     
 82 |     @pytest.fixture
 83 |     def ios_promoted_object(self):
 84 |         """Valid iOS app promoted object"""
 85 |         return {
 86 |             "application_id": "123456789012345",
 87 |             "object_store_url": "https://apps.apple.com/app/id123456789",
 88 |             "custom_event_type": "APP_INSTALL"
 89 |         }
 90 |     
 91 |     @pytest.fixture
 92 |     def android_promoted_object(self):
 93 |         """Valid Android app promoted object"""
 94 |         return {
 95 |             "application_id": "987654321098765",
 96 |             "object_store_url": "https://play.google.com/store/apps/details?id=com.example.app",
 97 |             "custom_event_type": "APP_INSTALL"
 98 |         }
 99 |     
100 |     @pytest.fixture 
101 |     def promoted_object_with_pixel(self):
102 |         """Promoted object with Facebook pixel for tracking"""
103 |         return {
104 |             "application_id": "123456789012345",
105 |             "object_store_url": "https://apps.apple.com/app/id123456789",
106 |             "custom_event_type": "APP_INSTALL",
107 |             "pixel_id": "pixel_123456789"
108 |         }
109 | 
110 |     # Test: Mobile App Adset Creation Success
111 |     @pytest.mark.asyncio
112 |     async def test_mobile_app_adset_creation_success_ios(
113 |         self, mock_api_request, mock_auth_manager, valid_mobile_app_params, ios_promoted_object
114 |     ):
115 |         """Test successful iOS mobile app adset creation"""
116 |         
117 |         result = await create_adset(
118 |             **valid_mobile_app_params,
119 |             promoted_object=ios_promoted_object,
120 |             destination_type="APP_STORE"
121 |         )
122 |         
123 |         # Parse the result
124 |         result_data = json.loads(result)
125 |         
126 |         # Verify the API was called with correct parameters
127 |         mock_api_request.assert_called_once()
128 |         call_args = mock_api_request.call_args
129 |         
130 |         # Check endpoint (first argument)
131 |         assert call_args[0][0] == f"{valid_mobile_app_params['account_id']}/adsets"
132 |         
133 |         # Check parameters (third argument)
134 |         params = call_args[0][2]
135 |         assert 'promoted_object' in params
136 |         assert 'destination_type' in params
137 |         
138 |         # Verify promoted_object is properly JSON-encoded
139 |         promoted_obj_param = json.loads(params['promoted_object']) if isinstance(params['promoted_object'], str) else params['promoted_object']
140 |         assert promoted_obj_param['application_id'] == ios_promoted_object['application_id']
141 |         assert promoted_obj_param['object_store_url'] == ios_promoted_object['object_store_url']
142 |         
143 |         # Verify destination_type
144 |         assert params['destination_type'] == "APP_STORE"
145 |         
146 |         # Verify response structure
147 |         assert 'id' in result_data
148 |         assert result_data['optimization_goal'] == "APP_INSTALLS"
149 | 
150 |     @pytest.mark.asyncio
151 |     async def test_mobile_app_adset_creation_success_android(
152 |         self, mock_api_request, mock_auth_manager, valid_mobile_app_params, android_promoted_object
153 |     ):
154 |         """Test successful Android mobile app adset creation"""
155 |         
156 |         result = await create_adset(
157 |             **valid_mobile_app_params,
158 |             promoted_object=android_promoted_object,
159 |             destination_type="APP_STORE"
160 |         )
161 |         
162 |         # Parse the result  
163 |         result_data = json.loads(result)
164 |         
165 |         # Verify the API was called
166 |         mock_api_request.assert_called_once()
167 |         call_args = mock_api_request.call_args
168 |         params = call_args[0][2]
169 |         
170 |         # Verify Android-specific promoted_object
171 |         promoted_obj_param = json.loads(params['promoted_object']) if isinstance(params['promoted_object'], str) else params['promoted_object']
172 |         assert promoted_obj_param['application_id'] == android_promoted_object['application_id']
173 |         assert "play.google.com" in promoted_obj_param['object_store_url']
174 |         
175 |         # Verify response
176 |         assert 'id' in result_data
177 | 
178 |     @pytest.mark.asyncio
179 |     async def test_mobile_app_adset_with_pixel_tracking(
180 |         self, mock_api_request, mock_auth_manager, valid_mobile_app_params, promoted_object_with_pixel
181 |     ):
182 |         """Test mobile app adset creation with Facebook pixel tracking"""
183 |         
184 |         result = await create_adset(
185 |             **valid_mobile_app_params,
186 |             promoted_object=promoted_object_with_pixel,
187 |             destination_type="APP_STORE"
188 |         )
189 |         
190 |         # Verify pixel_id is included
191 |         call_args = mock_api_request.call_args
192 |         params = call_args[0][2]
193 |         promoted_obj_param = json.loads(params['promoted_object']) if isinstance(params['promoted_object'], str) else params['promoted_object']
194 |         
195 |         assert 'pixel_id' in promoted_obj_param
196 |         assert promoted_obj_param['pixel_id'] == promoted_object_with_pixel['pixel_id']
197 | 
198 |     # Test: Parameter Validation
199 |     @pytest.mark.asyncio
200 |     async def test_invalid_promoted_object_missing_application_id(
201 |         self, mock_api_request, mock_auth_manager, valid_mobile_app_params
202 |     ):
203 |         """Test validation error for promoted_object missing application_id"""
204 |         
205 |         invalid_promoted_object = {
206 |             "object_store_url": "https://apps.apple.com/app/id123456789",
207 |             "custom_event_type": "APP_INSTALL"
208 |         }
209 |         
210 |         result = await create_adset(
211 |             **valid_mobile_app_params,
212 |             promoted_object=invalid_promoted_object,
213 |             destination_type="APP_STORE"
214 |         )
215 |         
216 |         result_data = json.loads(result)
217 |         
218 |         # Should return validation error - check for data wrapper format
219 |         if "data" in result_data:
220 |             error_data = json.loads(result_data["data"]) 
221 |             assert 'error' in error_data
222 |             assert 'application_id' in error_data['error'].lower()
223 |         else:
224 |             assert 'error' in result_data
225 |             assert 'application_id' in result_data['error'].lower()
226 | 
227 |     @pytest.mark.asyncio 
228 |     async def test_invalid_promoted_object_missing_store_url(
229 |         self, mock_api_request, mock_auth_manager, valid_mobile_app_params
230 |     ):
231 |         """Test validation error for promoted_object missing object_store_url"""
232 |         
233 |         invalid_promoted_object = {
234 |             "application_id": "123456789012345",
235 |             "custom_event_type": "APP_INSTALL"
236 |         }
237 |         
238 |         result = await create_adset(
239 |             **valid_mobile_app_params,
240 |             promoted_object=invalid_promoted_object,
241 |             destination_type="APP_STORE"
242 |         )
243 |         
244 |         result_data = json.loads(result)
245 |         
246 |         # Should return validation error - check for data wrapper format
247 |         if "data" in result_data:
248 |             error_data = json.loads(result_data["data"]) 
249 |             assert 'error' in error_data
250 |             assert 'object_store_url' in error_data['error'].lower()
251 |         else:
252 |             assert 'error' in result_data
253 |             assert 'object_store_url' in result_data['error'].lower()
254 | 
255 |     @pytest.mark.asyncio
256 |     async def test_invalid_destination_type(
257 |         self, mock_api_request, mock_auth_manager, valid_mobile_app_params, ios_promoted_object
258 |     ):
259 |         """Test validation error for invalid destination_type value"""
260 |         
261 |         result = await create_adset(
262 |             **valid_mobile_app_params,
263 |             promoted_object=ios_promoted_object,
264 |             destination_type="INVALID_TYPE"
265 |         )
266 |         
267 |         result_data = json.loads(result)
268 |         
269 |         # Should return validation error - check for data wrapper format
270 |         if "data" in result_data:
271 |             error_data = json.loads(result_data["data"]) 
272 |             assert 'error' in error_data
273 |             assert 'destination_type' in error_data['error'].lower()
274 |         else:
275 |             assert 'error' in result_data
276 |             assert 'destination_type' in result_data['error'].lower()
277 | 
278 |     @pytest.mark.asyncio
279 |     async def test_app_installs_requires_promoted_object(
280 |         self, mock_api_request, mock_auth_manager, valid_mobile_app_params
281 |     ):
282 |         """Test that APP_INSTALLS optimization goal requires promoted_object"""
283 |         
284 |         result = await create_adset(
285 |             **valid_mobile_app_params,
286 |             # Missing promoted_object
287 |             destination_type="APP_STORE"
288 |         )
289 |         
290 |         result_data = json.loads(result)
291 |         
292 |         # Should return validation error - check for data wrapper format
293 |         if "data" in result_data:
294 |             error_data = json.loads(result_data["data"]) 
295 |             assert 'error' in error_data
296 |             assert 'promoted_object' in error_data['error'].lower()
297 |             assert 'app_installs' in error_data['error'].lower()
298 |         else:
299 |             assert 'error' in result_data
300 |             assert 'promoted_object' in result_data['error'].lower()
301 |             assert 'app_installs' in result_data['error'].lower()
302 | 
303 |     # Test: Cross-platform Support
304 |     @pytest.mark.asyncio
305 |     async def test_ios_app_store_url_validation(
306 |         self, mock_api_request, mock_auth_manager, valid_mobile_app_params
307 |     ):
308 |         """Test iOS App Store URL format validation"""
309 |         
310 |         ios_promoted_object = {
311 |             "application_id": "123456789012345",
312 |             "object_store_url": "https://apps.apple.com/app/id123456789",
313 |             "custom_event_type": "APP_INSTALL"
314 |         }
315 |         
316 |         result = await create_adset(
317 |             **valid_mobile_app_params,
318 |             promoted_object=ios_promoted_object,
319 |             destination_type="APP_STORE"
320 |         )
321 |         
322 |         result_data = json.loads(result)
323 |         
324 |         # Should succeed for valid iOS URL
325 |         assert 'error' not in result_data or result_data.get('error') is None
326 | 
327 |     @pytest.mark.asyncio
328 |     async def test_google_play_url_validation(
329 |         self, mock_api_request, mock_auth_manager, valid_mobile_app_params
330 |     ):
331 |         """Test Google Play Store URL format validation"""
332 |         
333 |         android_promoted_object = {
334 |             "application_id": "987654321098765",
335 |             "object_store_url": "https://play.google.com/store/apps/details?id=com.example.app",
336 |             "custom_event_type": "APP_INSTALL"
337 |         }
338 |         
339 |         result = await create_adset(
340 |             **valid_mobile_app_params,
341 |             promoted_object=android_promoted_object,
342 |             destination_type="APP_STORE"
343 |         )
344 |         
345 |         result_data = json.loads(result)
346 |         
347 |         # Should succeed for valid Google Play URL
348 |         assert 'error' not in result_data or result_data.get('error') is None
349 | 
350 |     @pytest.mark.asyncio
351 |     async def test_invalid_store_url_format(
352 |         self, mock_api_request, mock_auth_manager, valid_mobile_app_params
353 |     ):
354 |         """Test validation error for invalid app store URL format"""
355 |         
356 |         invalid_promoted_object = {
357 |             "application_id": "123456789012345",
358 |             "object_store_url": "https://example.com/invalid-url",
359 |             "custom_event_type": "APP_INSTALL"
360 |         }
361 |         
362 |         result = await create_adset(
363 |             **valid_mobile_app_params,
364 |             promoted_object=invalid_promoted_object,
365 |             destination_type="APP_STORE"
366 |         )
367 |         
368 |         result_data = json.loads(result)
369 |         
370 |         # Should return validation error for invalid URL - check for data wrapper format
371 |         if "data" in result_data:
372 |             error_data = json.loads(result_data["data"]) 
373 |             assert 'error' in error_data
374 |             assert 'store url' in error_data['error'].lower() or 'object_store_url' in error_data['error'].lower()
375 |         else:
376 |             assert 'error' in result_data
377 |             assert 'store url' in result_data['error'].lower() or 'object_store_url' in result_data['error'].lower()
378 | 
379 |     # Test: Destination Type Variations
380 |     @pytest.mark.asyncio
381 |     async def test_deeplink_destination_type(
382 |         self, mock_api_request, mock_auth_manager, valid_mobile_app_params, ios_promoted_object
383 |     ):
384 |         """Test DEEPLINK destination_type"""
385 |         
386 |         result = await create_adset(
387 |             **valid_mobile_app_params,
388 |             promoted_object=ios_promoted_object,
389 |             destination_type="DEEPLINK"
390 |         )
391 |         
392 |         call_args = mock_api_request.call_args
393 |         params = call_args[0][2]
394 |         assert params['destination_type'] == "DEEPLINK"
395 | 
396 |     @pytest.mark.asyncio
397 |     async def test_app_install_destination_type(
398 |         self, mock_api_request, mock_auth_manager, valid_mobile_app_params, ios_promoted_object
399 |     ):
400 |         """Test APP_INSTALL destination_type"""
401 |         
402 |         result = await create_adset(
403 |             **valid_mobile_app_params,
404 |             promoted_object=ios_promoted_object,
405 |             destination_type="APP_INSTALL"
406 |         )
407 |         
408 |         call_args = mock_api_request.call_args
409 |         params = call_args[0][2]
410 |         assert params['destination_type'] == "APP_INSTALL"
411 | 
412 |     @pytest.mark.asyncio
413 |     async def test_on_ad_destination_type_for_lead_generation(
414 |         self, mock_api_request, mock_auth_manager, valid_mobile_app_params
415 |     ):
416 |         """Test ON_AD destination_type for lead generation campaigns (Issue #009 fix)"""
417 |         
418 |         # Create a lead generation adset configuration (without promoted_object since it's for lead gen, not mobile apps)
419 |         lead_gen_params = valid_mobile_app_params.copy()
420 |         lead_gen_params.update({
421 |             "optimization_goal": "LEAD_GENERATION",
422 |             "billing_event": "IMPRESSIONS"
423 |         })
424 |         
425 |         result = await create_adset(
426 |             **lead_gen_params,
427 |             destination_type="ON_AD"
428 |         )
429 |         
430 |         # Should pass validation and include destination_type in API call
431 |         call_args = mock_api_request.call_args
432 |         params = call_args[0][2]
433 |         assert params['destination_type'] == "ON_AD"
434 | 
435 |     @pytest.mark.asyncio
436 |     async def test_on_ad_validation_passes(
437 |         self, mock_api_request, mock_auth_manager, valid_mobile_app_params
438 |     ):
439 |         """Test that ON_AD destination_type passes validation (Issue #009 regression test)"""
440 |         
441 |         # Use parameters that work with ON_AD (lead generation, not mobile app)
442 |         lead_gen_params = valid_mobile_app_params.copy()
443 |         lead_gen_params.update({
444 |             "optimization_goal": "LEAD_GENERATION",
445 |             "billing_event": "IMPRESSIONS"
446 |         })
447 |         
448 |         result = await create_adset(
449 |             **lead_gen_params,
450 |             destination_type="ON_AD"
451 |         )
452 |         
453 |         result_data = json.loads(result)
454 |         
455 |         # Should NOT return a validation error about destination_type
456 |         # Before the fix, this would return: "Invalid destination_type: ON_AD" 
457 |         if "data" in result_data:
458 |             error_data = json.loads(result_data["data"])
459 |             assert "error" not in error_data or "destination_type" not in error_data.get("error", "").lower()
460 |         else:
461 |             assert "error" not in result_data or "destination_type" not in result_data.get("error", "").lower()
462 | 
463 |     # Test: Error Handling
464 |     @pytest.mark.asyncio
465 |     async def test_meta_api_error_handling(
466 |         self, mock_auth_manager, valid_mobile_app_params, ios_promoted_object
467 |     ):
468 |         """Test handling of Meta API errors for mobile app adsets"""
469 |         
470 |         with patch('meta_ads_mcp.core.adsets.make_api_request') as mock_api:
471 |             # Mock Meta API error response
472 |             mock_api.side_effect = Exception("HTTP Error: 400 - Select a dataset and conversion event for your ad set")
473 |             
474 |             result = await create_adset(
475 |                 **valid_mobile_app_params,
476 |                 promoted_object=ios_promoted_object,
477 |                 destination_type="APP_STORE"
478 |             )
479 |             
480 |             result_data = json.loads(result)
481 |             
482 |             # Should handle the error gracefully - check for data wrapper format
483 |             if "data" in result_data:
484 |                 error_data = json.loads(result_data["data"]) 
485 |                 assert 'error' in error_data
486 |                 # Check for error text in either error message or details
487 |                 error_text = error_data.get('error', '').lower()
488 |                 details_text = error_data.get('details', '').lower()
489 |                 assert 'dataset' in error_text or 'conversion event' in error_text or \
490 |                        'dataset' in details_text or 'conversion event' in details_text
491 |             else:
492 |                 assert 'error' in result_data
493 |                 error_text = result_data.get('error', '').lower()
494 |                 details_text = result_data.get('details', '').lower()
495 |                 assert 'dataset' in error_text or 'conversion event' in error_text or \
496 |                        'dataset' in details_text or 'conversion event' in details_text
497 | 
498 |     # Test: Backward Compatibility
499 |     @pytest.mark.asyncio
500 |     async def test_backward_compatibility_non_mobile_campaigns(
501 |         self, mock_api_request, mock_auth_manager
502 |     ):
503 |         """Test that non-mobile campaigns still work without mobile app parameters"""
504 |         
505 |         non_mobile_params = {
506 |             "account_id": "act_123456789",
507 |             "campaign_id": "campaign_123456789", 
508 |             "name": "Test Web Adset",
509 |             "optimization_goal": "LINK_CLICKS",
510 |             "billing_event": "LINK_CLICKS",
511 |             "targeting": {
512 |                 "age_min": 18,
513 |                 "age_max": 65,
514 |                 "geo_locations": {"countries": ["US"]}
515 |             }
516 |         }
517 |         
518 |         result = await create_adset(**non_mobile_params)
519 |         
520 |         # Should work without mobile app parameters
521 |         mock_api_request.assert_called_once()
522 |         call_args = mock_api_request.call_args
523 |         params = call_args[0][2]
524 |         
525 |         # Should not include mobile app parameters
526 |         assert 'promoted_object' not in params
527 |         assert 'destination_type' not in params
528 | 
529 |     @pytest.mark.asyncio
530 |     async def test_optional_mobile_parameters(
531 |         self, mock_api_request, mock_auth_manager, valid_mobile_app_params, ios_promoted_object
532 |     ):
533 |         """Test that mobile app parameters are optional for non-APP_INSTALLS campaigns"""
534 |         
535 |         non_app_install_params = valid_mobile_app_params.copy()
536 |         non_app_install_params['optimization_goal'] = "REACH"
537 |         
538 |         result = await create_adset(
539 |             **non_app_install_params,
540 |             # Mobile app parameters should be optional for non-APP_INSTALLS
541 |             promoted_object=ios_promoted_object,
542 |             destination_type="APP_STORE"
543 |         )
544 |         
545 |         # Should work and include mobile parameters if provided
546 |         mock_api_request.assert_called_once()
547 |         call_args = mock_api_request.call_args
548 |         params = call_args[0][2]
549 |         
550 |         # Mobile parameters should be included if provided
551 |         assert 'promoted_object' in params
552 |         assert 'destination_type' in params
```

--------------------------------------------------------------------------------
/tests/test_targeting_search_e2e.py:
--------------------------------------------------------------------------------

```python
  1 | #!/usr/bin/env python3
  2 | """
  3 | End-to-End Targeting Search Test for Meta Ads MCP
  4 | 
  5 | This test validates that the targeting search tools correctly find and return
  6 | targeting options (interests, behaviors, demographics, geo locations) from the
  7 | Meta Ads API through a pre-authenticated MCP server.
  8 | 
  9 | Test functions:
 10 | - search_interests
 11 | - get_interest_suggestions  
 12 | - validate_interests
 13 | - search_behaviors
 14 | - search_demographics
 15 | - search_geo_locations
 16 | """
 17 | 
 18 | import requests
 19 | import json
 20 | import os
 21 | import sys
 22 | from typing import Dict, Any, List
 23 | 
 24 | # Load environment variables from .env file
 25 | try:
 26 |     from dotenv import load_dotenv
 27 |     load_dotenv()
 28 |     print("✅ Loaded environment variables from .env file")
 29 | except ImportError:
 30 |     print("⚠️  python-dotenv not installed, using system environment variables only")
 31 | 
 32 | class TargetingSearchTester:
 33 |     """Test suite focused on targeting search functionality"""
 34 |     
 35 |     def __init__(self, base_url: str = "http://localhost:8080"):
 36 |         self.base_url = base_url.rstrip('/')
 37 |         self.endpoint = f"{self.base_url}/mcp/"
 38 |         self.request_id = 1
 39 |         
 40 |         # Test data for validation
 41 |         self.test_queries = {
 42 |             "interests": ["baseball", "cooking", "travel"],
 43 |             "geo_locations": ["New York", "California", "Japan"],
 44 |             "interest_suggestions": ["Basketball", "Soccer"],
 45 |             "demographics": ["life_events", "industries", "family_statuses"]
 46 |         }
 47 |         
 48 |     def _make_request(self, method: str, params: Dict[str, Any] = None, 
 49 |                      headers: Dict[str, str] = None) -> Dict[str, Any]:
 50 |         """Make a JSON-RPC request to the MCP server"""
 51 |         
 52 |         default_headers = {
 53 |             "Content-Type": "application/json",
 54 |             "Accept": "application/json, text/event-stream",
 55 |             "User-Agent": "Targeting-Search-Test-Client/1.0"
 56 |         }
 57 |         
 58 |         if headers:
 59 |             default_headers.update(headers)
 60 |         
 61 |         payload = {
 62 |             "jsonrpc": "2.0",
 63 |             "method": method,
 64 |             "id": self.request_id
 65 |         }
 66 |         
 67 |         if params:
 68 |             payload["params"] = params
 69 |         
 70 |         try:
 71 |             response = requests.post(
 72 |                 self.endpoint,
 73 |                 headers=default_headers,
 74 |                 json=payload,
 75 |                 timeout=15
 76 |             )
 77 |             
 78 |             self.request_id += 1
 79 |             
 80 |             return {
 81 |                 "status_code": response.status_code,
 82 |                 "headers": dict(response.headers),
 83 |                 "json": response.json() if response.status_code == 200 else None,
 84 |                 "text": response.text,
 85 |                 "success": response.status_code == 200
 86 |             }
 87 |             
 88 |         except requests.exceptions.RequestException as e:
 89 |             return {
 90 |                 "status_code": 0,
 91 |                 "headers": {},
 92 |                 "json": None,
 93 |                 "text": str(e),
 94 |                 "success": False,
 95 |                 "error": str(e)
 96 |             }
 97 | 
 98 |     def test_search_interests(self) -> Dict[str, Any]:
 99 |         """Test search_interests functionality"""
100 |         
101 |         print(f"\n🔍 Testing search_interests function")
102 |         results = {}
103 |         
104 |         for query in self.test_queries["interests"]:
105 |             print(f"   🔎 Searching for interests: '{query}'")
106 |             
107 |             result = self._make_request("tools/call", {
108 |                 "name": "search_interests",
109 |                 "arguments": {
110 |                     "query": query,
111 |                     "limit": 5
112 |                 }
113 |             })
114 |             
115 |             if not result["success"]:
116 |                 results[query] = {
117 |                     "success": False,
118 |                     "error": result.get("text", "Unknown error")
119 |                 }
120 |                 print(f"   ❌ Failed: {result.get('text', 'Unknown error')}")
121 |                 continue
122 |             
123 |             # Parse response
124 |             response_data = result["json"]["result"]
125 |             content = response_data.get("content", [{}])[0].get("text", "")
126 |             
127 |             try:
128 |                 parsed_content = json.loads(content)
129 |                 
130 |                 if "error" in parsed_content:
131 |                     results[query] = {
132 |                         "success": False,
133 |                         "error": parsed_content["error"]
134 |                     }
135 |                     print(f"   ❌ API Error: {parsed_content['error']}")
136 |                     continue
137 |                 
138 |                 interests = parsed_content.get("data", [])
139 |                 
140 |                 results[query] = {
141 |                     "success": True,
142 |                     "count": len(interests),
143 |                     "interests": interests[:3],  # Keep first 3 for display
144 |                     "has_required_fields": all(
145 |                         "id" in interest and "name" in interest 
146 |                         for interest in interests
147 |                     )
148 |                 }
149 |                 
150 |                 print(f"   ✅ Found {len(interests)} interests")
151 |                 for interest in interests[:3]:
152 |                     print(f"      • {interest.get('name', 'N/A')} (ID: {interest.get('id', 'N/A')})")
153 |                 
154 |             except json.JSONDecodeError:
155 |                 results[query] = {
156 |                     "success": False,
157 |                     "error": "Invalid JSON response",
158 |                     "raw_content": content
159 |                 }
160 |                 print(f"   ❌ Invalid JSON: {content}")
161 |         
162 |         return results
163 | 
164 |     def test_get_interest_suggestions(self) -> Dict[str, Any]:
165 |         """Test get_interest_suggestions functionality"""
166 |         
167 |         print(f"\n🔍 Testing get_interest_suggestions function")
168 |         
169 |         interest_list = self.test_queries["interest_suggestions"]
170 |         print(f"   🔎 Getting suggestions for: {interest_list}")
171 |         
172 |         result = self._make_request("tools/call", {
173 |             "name": "get_interest_suggestions",
174 |             "arguments": {
175 |                 "interest_list": interest_list,
176 |                 "limit": 5
177 |             }
178 |         })
179 |         
180 |         if not result["success"]:
181 |             return {
182 |                 "success": False,
183 |                 "error": result.get("text", "Unknown error")
184 |             }
185 |         
186 |         # Parse response
187 |         response_data = result["json"]["result"]
188 |         content = response_data.get("content", [{}])[0].get("text", "")
189 |         
190 |         try:
191 |             parsed_content = json.loads(content)
192 |             
193 |             if "error" in parsed_content:
194 |                 return {
195 |                     "success": False,
196 |                     "error": parsed_content["error"]
197 |                 }
198 |             
199 |             suggestions = parsed_content.get("data", [])
200 |             
201 |             result_data = {
202 |                 "success": True,
203 |                 "count": len(suggestions),
204 |                 "suggestions": suggestions[:3],  # Keep first 3 for display
205 |                 "has_required_fields": all(
206 |                     "id" in suggestion and "name" in suggestion 
207 |                     for suggestion in suggestions
208 |                 )
209 |             }
210 |             
211 |             print(f"   ✅ Found {len(suggestions)} suggestions")
212 |             for suggestion in suggestions[:3]:
213 |                 print(f"      • {suggestion.get('name', 'N/A')} (ID: {suggestion.get('id', 'N/A')})")
214 |             
215 |             return result_data
216 |             
217 |         except json.JSONDecodeError:
218 |             return {
219 |                 "success": False,
220 |                 "error": "Invalid JSON response",
221 |                 "raw_content": content
222 |             }
223 | 
224 |     def test_validate_interests(self) -> Dict[str, Any]:
225 |         """Test validate_interests functionality"""
226 |         
227 |         print(f"\n🔍 Testing validate_interests function")
228 |         
229 |         # Test with known valid and invalid interest names
230 |         test_interests = ["Japan", "Basketball", "invalidinterestname12345"]
231 |         print(f"   🔎 Validating interests: {test_interests}")
232 |         
233 |         result = self._make_request("tools/call", {
234 |             "name": "validate_interests",
235 |             "arguments": {
236 |                 "interest_list": test_interests
237 |             }
238 |         })
239 |         
240 |         if not result["success"]:
241 |             return {
242 |                 "success": False,
243 |                 "error": result.get("text", "Unknown error")
244 |             }
245 |         
246 |         # Parse response
247 |         response_data = result["json"]["result"]
248 |         content = response_data.get("content", [{}])[0].get("text", "")
249 |         
250 |         try:
251 |             parsed_content = json.loads(content)
252 |             
253 |             if "error" in parsed_content:
254 |                 return {
255 |                     "success": False,
256 |                     "error": parsed_content["error"]
257 |                 }
258 |             
259 |             validations = parsed_content.get("data", [])
260 |             
261 |             result_data = {
262 |                 "success": True,
263 |                 "count": len(validations),
264 |                 "validations": validations,
265 |                 "has_valid_interests": any(
266 |                     validation.get("valid", False) for validation in validations
267 |                 ),
268 |                 "has_invalid_interests": any(
269 |                     not validation.get("valid", True) for validation in validations
270 |                 )
271 |             }
272 |             
273 |             print(f"   ✅ Validated {len(validations)} interests")
274 |             for validation in validations:
275 |                 status = "✅" if validation.get("valid") else "❌"
276 |                 print(f"      {status} {validation.get('name', 'N/A')}")
277 |             
278 |             return result_data
279 |             
280 |         except json.JSONDecodeError:
281 |             return {
282 |                 "success": False,
283 |                 "error": "Invalid JSON response",
284 |                 "raw_content": content
285 |             }
286 | 
287 |     def test_search_behaviors(self) -> Dict[str, Any]:
288 |         """Test search_behaviors functionality"""
289 |         
290 |         print(f"\n🔍 Testing search_behaviors function")
291 |         
292 |         result = self._make_request("tools/call", {
293 |             "name": "search_behaviors",
294 |             "arguments": {
295 |                 "limit": 5
296 |             }
297 |         })
298 |         
299 |         if not result["success"]:
300 |             return {
301 |                 "success": False,
302 |                 "error": result.get("text", "Unknown error")
303 |             }
304 |         
305 |         # Parse response
306 |         response_data = result["json"]["result"]
307 |         content = response_data.get("content", [{}])[0].get("text", "")
308 |         
309 |         try:
310 |             parsed_content = json.loads(content)
311 |             
312 |             if "error" in parsed_content:
313 |                 return {
314 |                     "success": False,
315 |                     "error": parsed_content["error"]
316 |                 }
317 |             
318 |             behaviors = parsed_content.get("data", [])
319 |             
320 |             result_data = {
321 |                 "success": True,
322 |                 "count": len(behaviors),
323 |                 "behaviors": behaviors[:3],  # Keep first 3 for display
324 |                 "has_required_fields": all(
325 |                     "id" in behavior and "name" in behavior 
326 |                     for behavior in behaviors
327 |                 )
328 |             }
329 |             
330 |             print(f"   ✅ Found {len(behaviors)} behaviors")
331 |             for behavior in behaviors[:3]:
332 |                 print(f"      • {behavior.get('name', 'N/A')} (ID: {behavior.get('id', 'N/A')})")
333 |             
334 |             return result_data
335 |             
336 |         except json.JSONDecodeError:
337 |             return {
338 |                 "success": False,
339 |                 "error": "Invalid JSON response",
340 |                 "raw_content": content
341 |             }
342 | 
343 |     def test_search_demographics(self) -> Dict[str, Any]:
344 |         """Test search_demographics functionality"""
345 |         
346 |         print(f"\n🔍 Testing search_demographics function")
347 |         results = {}
348 |         
349 |         for demo_class in self.test_queries["demographics"]:
350 |             print(f"   🔎 Searching demographics class: '{demo_class}'")
351 |             
352 |             result = self._make_request("tools/call", {
353 |                 "name": "search_demographics",
354 |                 "arguments": {
355 |                     "demographic_class": demo_class,
356 |                     "limit": 3
357 |                 }
358 |             })
359 |             
360 |             if not result["success"]:
361 |                 results[demo_class] = {
362 |                     "success": False,
363 |                     "error": result.get("text", "Unknown error")
364 |                 }
365 |                 print(f"   ❌ Failed: {result.get('text', 'Unknown error')}")
366 |                 continue
367 |             
368 |             # Parse response
369 |             response_data = result["json"]["result"]
370 |             content = response_data.get("content", [{}])[0].get("text", "")
371 |             
372 |             try:
373 |                 parsed_content = json.loads(content)
374 |                 
375 |                 if "error" in parsed_content:
376 |                     results[demo_class] = {
377 |                         "success": False,
378 |                         "error": parsed_content["error"]
379 |                     }
380 |                     print(f"   ❌ API Error: {parsed_content['error']}")
381 |                     continue
382 |                 
383 |                 demographics = parsed_content.get("data", [])
384 |                 
385 |                 results[demo_class] = {
386 |                     "success": True,
387 |                     "count": len(demographics),
388 |                     "demographics": demographics[:2],  # Keep first 2 for display
389 |                     "has_required_fields": all(
390 |                         "id" in demo and "name" in demo 
391 |                         for demo in demographics
392 |                     )
393 |                 }
394 |                 
395 |                 print(f"   ✅ Found {len(demographics)} {demo_class}")
396 |                 for demo in demographics[:2]:
397 |                     print(f"      • {demo.get('name', 'N/A')} (ID: {demo.get('id', 'N/A')})")
398 |                 
399 |             except json.JSONDecodeError:
400 |                 results[demo_class] = {
401 |                     "success": False,
402 |                     "error": "Invalid JSON response",
403 |                     "raw_content": content
404 |                 }
405 |                 print(f"   ❌ Invalid JSON: {content}")
406 |         
407 |         return results
408 | 
409 |     def test_search_geo_locations(self) -> Dict[str, Any]:
410 |         """Test search_geo_locations functionality"""
411 |         
412 |         print(f"\n🔍 Testing search_geo_locations function")
413 |         results = {}
414 |         
415 |         for query in self.test_queries["geo_locations"]:
416 |             print(f"   🔎 Searching for locations: '{query}'")
417 |             
418 |             result = self._make_request("tools/call", {
419 |                 "name": "search_geo_locations",
420 |                 "arguments": {
421 |                     "query": query,
422 |                     "location_types": ["country", "region", "city"],
423 |                     "limit": 3
424 |                 }
425 |             })
426 |             
427 |             if not result["success"]:
428 |                 results[query] = {
429 |                     "success": False,
430 |                     "error": result.get("text", "Unknown error")
431 |                 }
432 |                 print(f"   ❌ Failed: {result.get('text', 'Unknown error')}")
433 |                 continue
434 |             
435 |             # Parse response
436 |             response_data = result["json"]["result"]
437 |             content = response_data.get("content", [{}])[0].get("text", "")
438 |             
439 |             try:
440 |                 parsed_content = json.loads(content)
441 |                 
442 |                 if "error" in parsed_content:
443 |                     results[query] = {
444 |                         "success": False,
445 |                         "error": parsed_content["error"]
446 |                     }
447 |                     print(f"   ❌ API Error: {parsed_content['error']}")
448 |                     continue
449 |                 
450 |                 locations = parsed_content.get("data", [])
451 |                 
452 |                 results[query] = {
453 |                     "success": True,
454 |                     "count": len(locations),
455 |                     "locations": locations[:3],  # Keep first 3 for display
456 |                     "has_required_fields": all(
457 |                         "key" in location and "name" in location and "type" in location
458 |                         for location in locations
459 |                     )
460 |                 }
461 |                 
462 |                 print(f"   ✅ Found {len(locations)} locations")
463 |                 for location in locations[:3]:
464 |                     print(f"      • {location.get('name', 'N/A')} ({location.get('type', 'N/A')}, Key: {location.get('key', 'N/A')})")
465 |                 
466 |             except json.JSONDecodeError:
467 |                 results[query] = {
468 |                     "success": False,
469 |                     "error": "Invalid JSON response",
470 |                     "raw_content": content
471 |                 }
472 |                 print(f"   ❌ Invalid JSON: {content}")
473 |         
474 |         return results
475 | 
476 |     def run_targeting_search_tests(self) -> bool:
477 |         """Run comprehensive targeting search tests"""
478 |         
479 |         print("🚀 Meta Ads Targeting Search End-to-End Test Suite")
480 |         print("="*60)
481 |         
482 |         # Check server availability
483 |         try:
484 |             response = requests.get(f"{self.base_url}/", timeout=5)
485 |             server_running = response.status_code in [200, 404]
486 |         except:
487 |             server_running = False
488 |         
489 |         if not server_running:
490 |             print("❌ Server is not running at", self.base_url)
491 |             print("   Please start the server with:")
492 |             print("   python3 -m meta_ads_mcp --transport streamable-http --port 8080")
493 |             return False
494 |         
495 |         print("✅ Server is running")
496 |         print("🔐 Using implicit authentication from server")
497 |         
498 |         # Test 1: Search Interests
499 |         print("\n" + "="*60)
500 |         print("📋 PHASE 1: Testing Interest Search")
501 |         print("="*60)
502 |         
503 |         interests_results = self.test_search_interests()
504 |         interests_success = any(
505 |             result.get("success") and result.get("count", 0) > 0 
506 |             for result in interests_results.values()
507 |         )
508 |         
509 |         # Test 2: Interest Suggestions
510 |         print("\n" + "="*60)
511 |         print("📋 PHASE 2: Testing Interest Suggestions")
512 |         print("="*60)
513 |         
514 |         suggestions_result = self.test_get_interest_suggestions()
515 |         suggestions_success = suggestions_result.get("success") and suggestions_result.get("count", 0) > 0
516 |         
517 |         # Test 3: Interest Validation
518 |         print("\n" + "="*60)
519 |         print("📋 PHASE 3: Testing Interest Validation")
520 |         print("="*60)
521 |         
522 |         validation_result = self.test_validate_interests()
523 |         validation_success = (validation_result.get("success") and 
524 |                             validation_result.get("has_valid_interests") and 
525 |                             validation_result.get("has_invalid_interests"))
526 |         
527 |         # Test 4: Behavior Search
528 |         print("\n" + "="*60)
529 |         print("📋 PHASE 4: Testing Behavior Search")
530 |         print("="*60)
531 |         
532 |         behaviors_result = self.test_search_behaviors()
533 |         behaviors_success = behaviors_result.get("success") and behaviors_result.get("count", 0) > 0
534 |         
535 |         # Test 5: Demographics Search
536 |         print("\n" + "="*60)
537 |         print("📋 PHASE 5: Testing Demographics Search")
538 |         print("="*60)
539 |         
540 |         demographics_results = self.test_search_demographics()
541 |         demographics_success = any(
542 |             result.get("success") and result.get("count", 0) > 0 
543 |             for result in demographics_results.values()
544 |         )
545 |         
546 |         # Test 6: Geo Location Search
547 |         print("\n" + "="*60)
548 |         print("📋 PHASE 6: Testing Geo Location Search")
549 |         print("="*60)
550 |         
551 |         geo_results = self.test_search_geo_locations()
552 |         geo_success = any(
553 |             result.get("success") and result.get("count", 0) > 0 
554 |             for result in geo_results.values()
555 |         )
556 |         
557 |         # Final assessment
558 |         print("\n" + "="*60)
559 |         print("📊 FINAL RESULTS")
560 |         print("="*60)
561 |         
562 |         all_tests = [
563 |             ("Interest Search", interests_success),
564 |             ("Interest Suggestions", suggestions_success),
565 |             ("Interest Validation", validation_success),
566 |             ("Behavior Search", behaviors_success),
567 |             ("Demographics Search", demographics_success),
568 |             ("Geo Location Search", geo_success)
569 |         ]
570 |         
571 |         passed_tests = sum(1 for _, success in all_tests if success)
572 |         total_tests = len(all_tests)
573 |         
574 |         for test_name, success in all_tests:
575 |             status = "✅ PASSED" if success else "❌ FAILED"
576 |             print(f"   • {test_name}: {status}")
577 |         
578 |         overall_success = passed_tests >= 4  # At least 4 out of 6 tests should pass
579 |         
580 |         if overall_success:
581 |             print(f"\n✅ Targeting search tests: SUCCESS ({passed_tests}/{total_tests} passed)")
582 |             print("   • Core targeting search functionality is working")
583 |             print("   • Meta Ads API integration is functional")
584 |             return True
585 |         else:
586 |             print(f"\n❌ Targeting search tests: FAILED ({passed_tests}/{total_tests} passed)")
587 |             print("   • Some targeting search functions are not working properly")
588 |             return False
589 | 
590 | 
591 | def main():
592 |     """Main test execution"""
593 |     tester = TargetingSearchTester()
594 |     success = tester.run_targeting_search_tests()
595 |     
596 |     if success:
597 |         print("\n🎉 All targeting search tests passed!")
598 |     else:
599 |         print("\n⚠️  Some targeting search tests failed - see details above")
600 |     
601 |     sys.exit(0 if success else 1)
602 | 
603 | 
604 | if __name__ == "__main__":
605 |     main() 
```

--------------------------------------------------------------------------------
/tests/test_insights_actions_and_values_e2e.py:
--------------------------------------------------------------------------------

```python
  1 | #!/usr/bin/env python3
  2 | """
  3 | Unit Tests for Insights Actions and Action Values Functionality
  4 | 
  5 | This test suite validates the actions and action_values field implementation for the
  6 | get_insights function in meta_ads_mcp/core/insights.py.
  7 | 
  8 | Test cases cover:
  9 | - Actions and action_values field inclusion in API requests
 10 | - Different levels of aggregation (ad, adset, campaign, account)
 11 | - Time range handling with actions and action_values
 12 | - Error handling and validation
 13 | - Purchase data extraction from actions and action_values
 14 | """
 15 | 
 16 | import pytest
 17 | import json
 18 | import asyncio
 19 | import requests
 20 | from unittest.mock import AsyncMock, patch, MagicMock
 21 | from typing import Dict, Any, List
 22 | 
 23 | # Import the function to test
 24 | from meta_ads_mcp.core.insights import get_insights
 25 | 
 26 | 
 27 | class TestInsightsActionsAndValues:
 28 |     """Test suite for actions and action_values in insights"""
 29 |     
 30 |     @pytest.fixture
 31 |     def mock_api_request(self):
 32 |         """Mock for the make_api_request function"""
 33 |         with patch('meta_ads_mcp.core.insights.make_api_request') as mock:
 34 |             mock.return_value = {
 35 |                 "data": [
 36 |                     {
 37 |                         "campaign_id": "test_campaign_id",
 38 |                         "campaign_name": "Test Campaign",
 39 |                         "impressions": "1000",
 40 |                         "clicks": "50",
 41 |                         "spend": "100.00",
 42 |                         "actions": [
 43 |                             {"action_type": "purchase", "value": "5"},
 44 |                             {"action_type": "lead", "value": "3"},
 45 |                             {"action_type": "view_content", "value": "20"}
 46 |                         ],
 47 |                         "action_values": [
 48 |                             {"action_type": "purchase", "value": "500.00"},
 49 |                             {"action_type": "lead", "value": "150.00"},
 50 |                             {"action_type": "view_content", "value": "0.00"}
 51 |                         ],
 52 |                         "cost_per_action_type": [
 53 |                             {"action_type": "purchase", "value": "20.00"},
 54 |                             {"action_type": "lead", "value": "33.33"}
 55 |                         ]
 56 |                     }
 57 |                 ],
 58 |                 "paging": {}
 59 |             }
 60 |             yield mock
 61 |     
 62 |     @pytest.fixture
 63 |     def mock_auth_manager(self):
 64 |         """Mock for the authentication manager"""
 65 |         with patch('meta_ads_mcp.core.api.auth_manager') as mock, \
 66 |              patch('meta_ads_mcp.core.auth.get_current_access_token') as mock_get_token:
 67 |             # Mock a valid access token
 68 |             mock.get_current_access_token.return_value = "test_access_token"
 69 |             mock.is_token_valid.return_value = True
 70 |             mock.app_id = "test_app_id"
 71 |             mock_get_token.return_value = "test_access_token"
 72 |             yield mock
 73 |     
 74 |     @pytest.fixture
 75 |     def valid_campaign_id(self):
 76 |         """Valid campaign ID for testing"""
 77 |         return "123456789"
 78 |     
 79 |     @pytest.fixture
 80 |     def valid_account_id(self):
 81 |         """Valid account ID for testing"""
 82 |         return "act_701351919139047"
 83 |     
 84 |     @pytest.mark.asyncio
 85 |     async def test_actions_and_action_values_included_in_fields(self, mock_api_request, mock_auth_manager, valid_campaign_id):
 86 |         """Test that actions and action_values are included in the fields parameter"""
 87 |         
 88 |         result = await get_insights(
 89 |             object_id=valid_campaign_id,
 90 |             time_range="last_30d",
 91 |             level="campaign"
 92 |         )
 93 |         
 94 |         # Parse the result
 95 |         result_data = json.loads(result)
 96 |         
 97 |         # Verify the API was called with correct parameters
 98 |         mock_api_request.assert_called_once()
 99 |         call_args = mock_api_request.call_args
100 |         
101 |         # Check that the endpoint is correct (first argument)
102 |         assert call_args[0][0] == f"{valid_campaign_id}/insights"
103 |         
104 |         # Check that actions and action_values are included in fields parameter
105 |         params = call_args[0][2]  # Third positional argument is params
106 |         assert 'fields' in params
107 |         
108 |         fields = params['fields']
109 |         assert 'actions' in fields
110 |         assert 'action_values' in fields
111 |         assert 'cost_per_action_type' in fields
112 |         
113 |         # Verify the response structure
114 |         assert 'data' in result_data
115 |         assert len(result_data['data']) > 0
116 |         assert 'actions' in result_data['data'][0]
117 |         assert 'action_values' in result_data['data'][0]
118 |     
119 |     @pytest.mark.asyncio
120 |     async def test_purchase_data_extraction(self, mock_api_request, mock_auth_manager, valid_campaign_id):
121 |         """Test that purchase data can be extracted from actions and action_values"""
122 |         
123 |         result = await get_insights(
124 |             object_id=valid_campaign_id,
125 |             time_range="last_30d",
126 |             level="campaign"
127 |         )
128 |         
129 |         # Parse the result
130 |         result_data = json.loads(result)
131 |         
132 |         # Get the first data point
133 |         data_point = result_data['data'][0]
134 |         
135 |         # Extract purchase data from actions
136 |         actions = data_point.get('actions', [])
137 |         purchase_actions = [action for action in actions if action.get('action_type') == 'purchase']
138 |         
139 |         # Extract purchase data from action_values
140 |         action_values = data_point.get('action_values', [])
141 |         purchase_values = [action_value for action_value in action_values if action_value.get('action_type') == 'purchase']
142 |         
143 |         # Verify purchase data exists
144 |         assert len(purchase_actions) > 0, "No purchase actions found"
145 |         assert len(purchase_values) > 0, "No purchase action_values found"
146 |         
147 |         # Verify purchase data values
148 |         purchase_count = purchase_actions[0].get('value')
149 |         purchase_value = purchase_values[0].get('value')
150 |         
151 |         assert purchase_count == "5", f"Expected purchase count 5, got {purchase_count}"
152 |         assert purchase_value == "500.00", f"Expected purchase value 500.00, got {purchase_value}"
153 |     
154 |     @pytest.mark.asyncio
155 |     async def test_actions_at_adset_level(self, mock_api_request, mock_auth_manager, valid_campaign_id):
156 |         """Test actions and action_values at adset level"""
157 |         
158 |         result = await get_insights(
159 |             object_id=valid_campaign_id,
160 |             time_range="last_30d",
161 |             level="adset"
162 |         )
163 |         
164 |         # Parse the result
165 |         result_data = json.loads(result)
166 |         
167 |         # Verify the API was called with correct parameters
168 |         mock_api_request.assert_called_once()
169 |         call_args = mock_api_request.call_args
170 |         
171 |         # Check that the level parameter is correct
172 |         params = call_args[0][2]
173 |         assert params['level'] == 'adset'
174 |         
175 |         # Verify the response structure
176 |         assert 'data' in result_data
177 |     
178 |     @pytest.mark.asyncio
179 |     async def test_actions_at_ad_level(self, mock_api_request, mock_auth_manager, valid_campaign_id):
180 |         """Test actions and action_values at ad level"""
181 |         
182 |         result = await get_insights(
183 |             object_id=valid_campaign_id,
184 |             time_range="last_30d",
185 |             level="ad"
186 |         )
187 |         
188 |         # Parse the result
189 |         result_data = json.loads(result)
190 |         
191 |         # Verify the API was called with correct parameters
192 |         mock_api_request.assert_called_once()
193 |         call_args = mock_api_request.call_args
194 |         
195 |         # Check that the level parameter is correct
196 |         params = call_args[0][2]
197 |         assert params['level'] == 'ad'
198 |         
199 |         # Verify the response structure
200 |         assert 'data' in result_data
201 |     
202 |     @pytest.mark.asyncio
203 |     async def test_actions_with_custom_time_range(self, mock_api_request, mock_auth_manager, valid_campaign_id):
204 |         """Test actions and action_values with custom time range"""
205 |         
206 |         custom_time_range = {"since": "2024-01-01", "until": "2024-01-31"}
207 |         
208 |         result = await get_insights(
209 |             object_id=valid_campaign_id,
210 |             time_range=custom_time_range,
211 |             level="campaign"
212 |         )
213 |         
214 |         # Parse the result
215 |         result_data = json.loads(result)
216 |         
217 |         # Verify the API was called with correct parameters
218 |         mock_api_request.assert_called_once()
219 |         call_args = mock_api_request.call_args
220 |         
221 |         # Check that time_range is properly formatted
222 |         params = call_args[0][2]
223 |         assert 'time_range' in params
224 |         assert params['time_range'] == json.dumps(custom_time_range)
225 |         
226 |         # Verify the response structure
227 |         assert 'data' in result_data
228 |     
229 |     @pytest.mark.asyncio
230 |     async def test_actions_with_breakdown(self, mock_api_request, mock_auth_manager, valid_campaign_id):
231 |         """Test actions and action_values with breakdown dimension"""
232 |         
233 |         result = await get_insights(
234 |             object_id=valid_campaign_id,
235 |             time_range="last_30d",
236 |             level="campaign",
237 |             breakdown="age"
238 |         )
239 |         
240 |         # Parse the result
241 |         result_data = json.loads(result)
242 |         
243 |         # Verify the API was called with correct parameters
244 |         mock_api_request.assert_called_once()
245 |         call_args = mock_api_request.call_args
246 |         
247 |         # Check that breakdown is included
248 |         params = call_args[0][2]
249 |         assert 'breakdowns' in params
250 |         assert params['breakdowns'] == 'age'
251 |         
252 |         # Verify the response structure
253 |         assert 'data' in result_data
254 |     
255 |     @pytest.mark.asyncio
256 |     async def test_actions_without_object_id(self, mock_api_request, mock_auth_manager):
257 |         """Test error handling when no object_id is provided"""
258 |         
259 |         result = await get_insights(
260 |             time_range="last_30d",
261 |             level="campaign"
262 |         )
263 |         
264 |         # Parse the result. The decorator returns a dict error for missing required args
265 |         if isinstance(result, dict):
266 |             result_data = result
267 |         else:
268 |             result_data = json.loads(result)
269 | 
270 |         assert 'error' in result_data
271 |         assert "missing 1 required positional argument: 'object_id'" in result_data['error']
272 |         
273 |         # Verify API was not called
274 |         mock_api_request.assert_not_called()
275 |     
276 |     @pytest.mark.asyncio
277 |     async def test_actions_with_invalid_time_range(self, mock_api_request, mock_auth_manager, valid_campaign_id):
278 |         """Test error handling with invalid time range"""
279 |         
280 |         invalid_time_range = {"since": "2024-01-01"}  # Missing "until"
281 |         
282 |         result = await get_insights(
283 |             object_id=valid_campaign_id,
284 |             time_range=invalid_time_range,
285 |             level="campaign"
286 |         )
287 |         
288 |         # Parse the result
289 |         result_data = json.loads(result)
290 |         
291 |         # The error response is wrapped in a 'data' field
292 |         if 'data' in result_data:
293 |             error_data = json.loads(result_data['data'])
294 |             assert 'error' in error_data
295 |             assert 'since' in error_data['error'] and 'until' in error_data['error']
296 |         else:
297 |             assert 'error' in result_data
298 |             assert 'since' in result_data['error'] and 'until' in result_data['error']
299 |         
300 |         # Verify API was not called
301 |         mock_api_request.assert_not_called()
302 |     
303 |     @pytest.mark.asyncio
304 |     async def test_actions_api_error_handling(self, mock_api_request, mock_auth_manager, valid_campaign_id):
305 |         """Test error handling when API call fails"""
306 |         
307 |         # Mock API to raise an exception
308 |         mock_api_request.side_effect = Exception("API Error")
309 |         
310 |         result = await get_insights(
311 |             object_id=valid_campaign_id,
312 |             time_range="last_30d",
313 |             level="campaign"
314 |         )
315 |         
316 |         # Parse the result
317 |         # The API error handling returns a dict directly, not a JSON string
318 |         if isinstance(result, dict):
319 |             result_data = result
320 |         else:
321 |             result_data = json.loads(result)
322 |         
323 |         # Verify error response
324 |         assert 'error' in result_data
325 |         assert 'API Error' in result_data['error']
326 |     
327 |     @pytest.mark.asyncio
328 |     async def test_actions_fields_completeness(self, mock_api_request, mock_auth_manager, valid_campaign_id):
329 |         """Test that all required fields are included in the request"""
330 |         
331 |         result = await get_insights(
332 |             object_id=valid_campaign_id,
333 |             time_range="last_30d",
334 |             level="campaign"
335 |         )
336 |         
337 |         # Verify the API was called with correct parameters
338 |         mock_api_request.assert_called_once()
339 |         call_args = mock_api_request.call_args
340 |         
341 |         # Check that all required fields are included
342 |         params = call_args[0][2]
343 |         fields = params['fields']
344 |         
345 |         # Required fields for actions and action_values
346 |         required_fields = [
347 |             'account_id', 'account_name', 'campaign_id', 'campaign_name',
348 |             'adset_id', 'adset_name', 'ad_id', 'ad_name',
349 |             'impressions', 'clicks', 'spend', 'cpc', 'cpm', 'ctr',
350 |             'reach', 'frequency', 'actions', 'action_values', 'conversions',
351 |             'unique_clicks', 'cost_per_action_type'
352 |         ]
353 |         
354 |         for field in required_fields:
355 |             assert field in fields, f"Field '{field}' not found in fields parameter"
356 |     
357 |     @pytest.mark.asyncio
358 |     async def test_multiple_action_types(self, mock_api_request, mock_auth_manager, valid_campaign_id):
359 |         """Test handling of multiple action types in the response"""
360 |         
361 |         result = await get_insights(
362 |             object_id=valid_campaign_id,
363 |             time_range="last_30d",
364 |             level="campaign"
365 |         )
366 |         
367 |         # Parse the result
368 |         result_data = json.loads(result)
369 |         
370 |         # Get the first data point
371 |         data_point = result_data['data'][0]
372 |         
373 |         # Check that multiple action types are present
374 |         actions = data_point.get('actions', [])
375 |         action_types = [action.get('action_type') for action in actions]
376 |         
377 |         assert 'purchase' in action_types, "Purchase action type not found"
378 |         assert 'lead' in action_types, "Lead action type not found"
379 |         assert 'view_content' in action_types, "View content action type not found"
380 |         
381 |         # Check action_values has corresponding entries
382 |         action_values = data_point.get('action_values', [])
383 |         action_value_types = [action_value.get('action_type') for action_value in action_values]
384 |         
385 |         assert 'purchase' in action_value_types, "Purchase action_value type not found"
386 |         assert 'lead' in action_value_types, "Lead action_value type not found"
387 | 
388 | 
389 | @pytest.mark.e2e
390 | @pytest.mark.skip(reason="E2E test - run manually only")
391 | class TestInsightsActionsAndValuesE2E:
392 |     """E2E tests for actions and action_values via MCP HTTP server"""
393 |     
394 |     def __init__(self, base_url: str = "http://localhost:8080"):
395 |         self.base_url = base_url.rstrip('/')
396 |         self.endpoint = f"{self.base_url}/mcp/"
397 |         self.request_id = 1
398 |         # Default account from workspace rules
399 |         self.account_id = "act_701351919139047"
400 | 
401 |     def _make_request(self, method: str, params: Dict[str, Any] = None) -> Dict[str, Any]:
402 |         headers = {
403 |             "Content-Type": "application/json",
404 |             "Accept": "application/json, text/event-stream",
405 |             "User-Agent": "Insights-E2E-Test-Client/1.0"
406 |         }
407 |         payload = {
408 |             "jsonrpc": "2.0",
409 |             "method": method,
410 |             "id": self.request_id
411 |         }
412 |         if params:
413 |             payload["params"] = params
414 |         try:
415 |             resp = requests.post(self.endpoint, headers=headers, json=payload, timeout=30)
416 |             self.request_id += 1
417 |             return {
418 |                 "status_code": resp.status_code,
419 |                 "json": resp.json() if resp.status_code == 200 else None,
420 |                 "text": resp.text,
421 |                 "success": resp.status_code == 200
422 |             }
423 |         except requests.exceptions.RequestException as e:
424 |             return {"status_code": 0, "json": None, "text": str(e), "success": False, "error": str(e)}
425 | 
426 |     def _check_for_errors(self, parsed_content: Dict[str, Any]) -> Dict[str, Any]:
427 |         if "data" in parsed_content:
428 |             data = parsed_content["data"]
429 |             if isinstance(data, dict) and 'error' in data:
430 |                 return {"has_error": True, "error_message": data['error'], "format": "wrapped_dict"}
431 |             if isinstance(data, str):
432 |                 try:
433 |                     error_data = json.loads(data)
434 |                     if 'error' in error_data:
435 |                         return {"has_error": True, "error_message": error_data['error'], "format": "wrapped_json"}
436 |                 except json.JSONDecodeError:
437 |                     pass
438 |         if 'error' in parsed_content:
439 |             return {"has_error": True, "error_message": parsed_content['error'], "format": "direct"}
440 |         return {"has_error": False}
441 | 
442 |     def test_tool_exists_and_has_required_fields(self):
443 |         result = self._make_request("tools/list", {})
444 |         assert result["success"], f"tools/list failed: {result.get('text', '')}"
445 |         tools = result["json"]["result"].get("tools", [])
446 |         tool = next((t for t in tools if t["name"] == "get_insights"), None)
447 |         assert tool is not None, "get_insights tool not found"
448 |         props = tool.get("inputSchema", {}).get("properties", {})
449 |         for req in ["object_id", "time_range", "breakdown", "level"]:
450 |             assert req in props, f"Missing parameter in schema: {req}"
451 | 
452 |     def test_get_insights_account_level(self):
453 |         params = {
454 |             "name": "get_insights",
455 |             "arguments": {
456 |                 "object_id": self.account_id,
457 |                 "time_range": "last_30d",
458 |                 "level": "account"
459 |             }
460 |         }
461 |         result = self._make_request("tools/call", params)
462 |         assert result["success"], f"tools/call failed: {result.get('text', '')}"
463 |         response_data = result["json"]["result"]
464 |         content = response_data.get("content", [{}])[0].get("text", "")
465 |         parsed = json.loads(content)
466 |         err = self._check_for_errors(parsed)
467 |         # Don't fail if auth or permissions block; just assert structure is parsable
468 |         if err["has_error"]:
469 |             assert isinstance(err["error_message"], (str, dict))
470 |         else:
471 |             # Expect data list on success
472 |             data = parsed.get("data") if isinstance(parsed, dict) else None
473 |             assert data is not None
474 | 
475 | def main():
476 |     tester = TestInsightsActionsAndValuesE2E()
477 |     print("🚀 Insights Actions E2E (manual)")
478 |     # Basic smoke run
479 |     try:
480 |         tester.test_tool_exists_and_has_required_fields()
481 |         print("✅ Tool schema ok")
482 |         tester.test_get_insights_account_level()
483 |         print("✅ Account-level insights request executed")
484 |     except AssertionError as e:
485 |         print(f"❌ Assertion failed: {e}")
486 |     except Exception as e:
487 |         print(f"❌ Error: {e}")
488 | 
489 | if __name__ == "__main__":
490 |     main()
491 | 
492 | 
493 | def extract_purchase_data(insights_data):
494 |     """
495 |     Helper function to extract purchase data from insights response.
496 |     
497 |     Args:
498 |         insights_data: The data array from insights response
499 |         
500 |     Returns:
501 |         dict: Dictionary with purchase count and value
502 |     """
503 |     if not insights_data or len(insights_data) == 0:
504 |         return {"purchase_count": 0, "purchase_value": 0.0}
505 |     
506 |     data_point = insights_data[0]
507 |     
508 |     # Extract purchase count from actions
509 |     actions = data_point.get('actions', [])
510 |     purchase_actions = [action for action in actions if action.get('action_type') == 'purchase']
511 |     purchase_count = int(purchase_actions[0].get('value', 0)) if purchase_actions else 0
512 |     
513 |     # Extract purchase value from action_values
514 |     action_values = data_point.get('action_values', [])
515 |     purchase_values = [action_value for action_value in action_values if action_value.get('action_type') == 'purchase']
516 |     purchase_value = float(purchase_values[0].get('value', 0)) if purchase_values else 0.0
517 |     
518 |     return {
519 |         "purchase_count": purchase_count,
520 |         "purchase_value": purchase_value
521 |     }
522 | 
523 | 
524 | class TestPurchaseDataExtraction:
525 |     """Test suite for purchase data extraction helper function"""
526 |     
527 |     def test_extract_purchase_data_with_purchases(self):
528 |         """Test extraction when purchase data is present"""
529 |         insights_data = [{
530 |             "actions": [
531 |                 {"action_type": "purchase", "value": "5"},
532 |                 {"action_type": "lead", "value": "3"}
533 |             ],
534 |             "action_values": [
535 |                 {"action_type": "purchase", "value": "500.00"},
536 |                 {"action_type": "lead", "value": "150.00"}
537 |             ]
538 |         }]
539 |         
540 |         result = extract_purchase_data(insights_data)
541 |         
542 |         assert result["purchase_count"] == 5
543 |         assert result["purchase_value"] == 500.0
544 |     
545 |     def test_extract_purchase_data_without_purchases(self):
546 |         """Test extraction when no purchase data is present"""
547 |         insights_data = [{
548 |             "actions": [
549 |                 {"action_type": "lead", "value": "3"},
550 |                 {"action_type": "view_content", "value": "20"}
551 |             ],
552 |             "action_values": [
553 |                 {"action_type": "lead", "value": "150.00"},
554 |                 {"action_type": "view_content", "value": "0.00"}
555 |             ]
556 |         }]
557 |         
558 |         result = extract_purchase_data(insights_data)
559 |         
560 |         assert result["purchase_count"] == 0
561 |         assert result["purchase_value"] == 0.0
562 |     
563 |     def test_extract_purchase_data_empty_data(self):
564 |         """Test extraction with empty data"""
565 |         insights_data = []
566 |         
567 |         result = extract_purchase_data(insights_data)
568 |         
569 |         assert result["purchase_count"] == 0
570 |         assert result["purchase_value"] == 0.0 
```

--------------------------------------------------------------------------------
/meta_ads_mcp/core/auth.py:
--------------------------------------------------------------------------------

```python
  1 | """Authentication related functionality for Meta Ads API."""
  2 | 
  3 | from typing import Any, Dict, Optional
  4 | import time
  5 | import platform
  6 | import pathlib
  7 | import os
  8 | import webbrowser
  9 | import asyncio
 10 | import json
 11 | from .utils import logger
 12 | import requests
 13 | 
 14 | # Import from the new callback server module
 15 | from .callback_server import (
 16 |     start_callback_server,
 17 |     shutdown_callback_server,
 18 |     token_container,
 19 |     callback_server_port
 20 | )
 21 | 
 22 | # Import the new Pipeboard authentication
 23 | from .pipeboard_auth import pipeboard_auth_manager
 24 | 
 25 | # Auth constants
 26 | # Scope includes pages_show_list and pages_read_engagement to fix issue #16
 27 | # where get_account_pages failed for regular users due to missing page permissions
 28 | AUTH_SCOPE = "business_management,public_profile,pages_show_list,pages_read_engagement"
 29 | AUTH_REDIRECT_URI = "http://localhost:8888/callback"
 30 | AUTH_RESPONSE_TYPE = "token"
 31 | 
 32 | # Log important configuration information
 33 | logger.info("Authentication module initialized")
 34 | logger.info(f"Auth scope: {AUTH_SCOPE}")
 35 | logger.info(f"Default redirect URI: {AUTH_REDIRECT_URI}")
 36 | 
 37 | # Global flag for authentication state
 38 | needs_authentication = False
 39 | 
 40 | # Meta configuration singleton
 41 | class MetaConfig:
 42 |     _instance = None
 43 |     
 44 |     def __new__(cls):
 45 |         if cls._instance is None:
 46 |             logger.debug("Creating new MetaConfig instance")
 47 |             cls._instance = super(MetaConfig, cls).__new__(cls)
 48 |             cls._instance.app_id = os.environ.get("META_APP_ID", "779761636818489")
 49 |             logger.info(f"MetaConfig initialized with app_id from env/default: {cls._instance.app_id}")
 50 |         return cls._instance
 51 |     
 52 |     def set_app_id(self, app_id):
 53 |         """Set the Meta App ID for API calls"""
 54 |         logger.info(f"Setting Meta App ID: {app_id}")
 55 |         self.app_id = app_id
 56 |         # Also update environment variable for modules that might read directly from it
 57 |         os.environ["META_APP_ID"] = app_id
 58 |         logger.debug(f"Updated META_APP_ID environment variable: {os.environ.get('META_APP_ID')}")
 59 |     
 60 |     def get_app_id(self):
 61 |         """Get the current Meta App ID"""
 62 |         # Check if we have one set
 63 |         if hasattr(self, 'app_id') and self.app_id:
 64 |             logger.debug(f"Using app_id from instance: {self.app_id}")
 65 |             return self.app_id
 66 |         
 67 |         # If not, try environment variable
 68 |         env_app_id = os.environ.get("META_APP_ID", "")
 69 |         if env_app_id:
 70 |             logger.debug(f"Using app_id from environment: {env_app_id}")
 71 |             # Update our instance for future use
 72 |             self.app_id = env_app_id
 73 |             return env_app_id
 74 |         
 75 |         logger.warning("No app_id found in instance or environment variables")
 76 |         return ""
 77 |     
 78 |     def is_configured(self):
 79 |         """Check if the Meta configuration is complete"""
 80 |         app_id = self.get_app_id()
 81 |         configured = bool(app_id)
 82 |         logger.debug(f"MetaConfig.is_configured() = {configured} (app_id: {app_id})")
 83 |         return configured
 84 | 
 85 | # Create singleton instance
 86 | meta_config = MetaConfig()
 87 | 
 88 | class TokenInfo:
 89 |     """Stores token information including expiration"""
 90 |     def __init__(self, access_token: str, expires_in: Optional[int] = None, user_id: Optional[str] = None):
 91 |         self.access_token = access_token
 92 |         self.expires_in = expires_in
 93 |         self.user_id = user_id
 94 |         self.created_at = int(time.time())
 95 |         logger.debug(f"TokenInfo created. Expires in: {expires_in if expires_in else 'Not specified'}")
 96 |     
 97 |     def is_expired(self) -> bool:
 98 |         """Check if the token is expired"""
 99 |         if not self.expires_in:
100 |             return False  # If no expiration is set, assume it's not expired
101 |         
102 |         current_time = int(time.time())
103 |         return current_time > (self.created_at + self.expires_in)
104 |     
105 |     def serialize(self) -> Dict[str, Any]:
106 |         """Convert to a dictionary for storage"""
107 |         return {
108 |             "access_token": self.access_token,
109 |             "expires_in": self.expires_in,
110 |             "user_id": self.user_id,
111 |             "created_at": self.created_at
112 |         }
113 |     
114 |     @classmethod
115 |     def deserialize(cls, data: Dict[str, Any]) -> 'TokenInfo':
116 |         """Create from a stored dictionary"""
117 |         token = cls(
118 |             access_token=data.get("access_token", ""),
119 |             expires_in=data.get("expires_in"),
120 |             user_id=data.get("user_id")
121 |         )
122 |         token.created_at = data.get("created_at", int(time.time()))
123 |         return token
124 | 
125 | 
126 | class AuthManager:
127 |     """Manages authentication with Meta APIs"""
128 |     def __init__(self, app_id: str, redirect_uri: str = AUTH_REDIRECT_URI):
129 |         self.app_id = app_id
130 |         self.redirect_uri = redirect_uri
131 |         self.token_info = None
132 |         # Check for Pipeboard token first
133 |         self.use_pipeboard = bool(os.environ.get("PIPEBOARD_API_TOKEN", ""))
134 |         if not self.use_pipeboard:
135 |             self._load_cached_token()
136 |     
137 |     def _get_token_cache_path(self) -> pathlib.Path:
138 |         """Get the platform-specific path for token cache file"""
139 |         if platform.system() == "Windows":
140 |             base_path = pathlib.Path(os.environ.get("APPDATA", ""))
141 |         elif platform.system() == "Darwin":  # macOS
142 |             base_path = pathlib.Path.home() / "Library" / "Application Support"
143 |         else:  # Assume Linux/Unix
144 |             base_path = pathlib.Path.home() / ".config"
145 |         
146 |         # Create directory if it doesn't exist
147 |         cache_dir = base_path / "meta-ads-mcp"
148 |         cache_dir.mkdir(parents=True, exist_ok=True)
149 |         
150 |         return cache_dir / "token_cache.json"
151 |     
152 |     def _load_cached_token(self) -> bool:
153 |         """Load token from cache if available"""
154 |         cache_path = self._get_token_cache_path()
155 |         
156 |         if not cache_path.exists():
157 |             return False
158 |         
159 |         try:
160 |             with open(cache_path, "r") as f:
161 |                 data = json.load(f)
162 |                 
163 |                 # Validate the cached data structure
164 |                 required_fields = ["access_token", "created_at"]
165 |                 if not all(field in data for field in required_fields):
166 |                     logger.warning("Cached token data is missing required fields")
167 |                     return False
168 |                 
169 |                 # Check if the token looks valid (basic format check)
170 |                 if not data.get("access_token") or len(data["access_token"]) < 20:
171 |                     logger.warning("Cached token appears malformed")
172 |                     return False
173 |                 
174 |                 self.token_info = TokenInfo.deserialize(data)
175 |                 
176 |                 # Check if token is expired
177 |                 if self.token_info.is_expired():
178 |                     logger.info("Cached token is expired, removing cache file")
179 |                     # Remove the expired cache file
180 |                     try:
181 |                         cache_path.unlink()
182 |                         logger.info(f"Removed expired token cache: {cache_path}")
183 |                     except Exception as e:
184 |                         logger.warning(f"Could not remove expired cache file: {e}")
185 |                     self.token_info = None
186 |                     return False
187 |                 
188 |                 # Additional validation: check if token is too old (more than 60 days)
189 |                 current_time = int(time.time())
190 |                 if self.token_info.created_at and (current_time - self.token_info.created_at) > (60 * 24 * 3600):
191 |                     logger.warning("Cached token is too old (more than 60 days), removing cache file")
192 |                     try:
193 |                         cache_path.unlink()
194 |                         logger.info(f"Removed old token cache: {cache_path}")
195 |                     except Exception as e:
196 |                         logger.warning(f"Could not remove old cache file: {e}")
197 |                     self.token_info = None
198 |                     return False
199 |                 
200 |                 logger.info(f"Loaded cached token (expires in {(self.token_info.created_at + self.token_info.expires_in) - int(time.time())} seconds)")
201 |                 return True
202 |         except Exception as e:
203 |             logger.error(f"Error loading cached token: {e}")
204 |             # If there's any error reading the cache, try to remove the corrupted file
205 |             try:
206 |                 cache_path.unlink()
207 |                 logger.info(f"Removed corrupted token cache: {cache_path}")
208 |             except Exception as cleanup_error:
209 |                 logger.warning(f"Could not remove corrupted cache file: {cleanup_error}")
210 |             return False
211 |     
212 |     def _save_token_to_cache(self) -> None:
213 |         """Save token to cache file"""
214 |         if not self.token_info:
215 |             return
216 |         
217 |         cache_path = self._get_token_cache_path()
218 |         
219 |         try:
220 |             with open(cache_path, "w") as f:
221 |                 json.dump(self.token_info.serialize(), f)
222 |             logger.info(f"Token cached at: {cache_path}")
223 |         except Exception as e:
224 |             logger.error(f"Error saving token to cache: {e}")
225 |     
226 |     def get_auth_url(self) -> str:
227 |         """Generate the Facebook OAuth URL for desktop app flow"""
228 |         return (
229 |             f"https://www.facebook.com/v22.0/dialog/oauth?"
230 |             f"client_id={self.app_id}&"
231 |             f"redirect_uri={self.redirect_uri}&"
232 |             f"scope={AUTH_SCOPE}&"
233 |             f"response_type={AUTH_RESPONSE_TYPE}"
234 |         )
235 |     
236 |     def authenticate(self, force_refresh: bool = False) -> Optional[str]:
237 |         """
238 |         Authenticate with Meta APIs
239 |         
240 |         Args:
241 |             force_refresh: Force token refresh even if cached token exists
242 |             
243 |         Returns:
244 |             Access token if successful, None otherwise
245 |         """
246 |         # If Pipeboard auth is available, use that instead
247 |         if self.use_pipeboard:
248 |             logger.info("Using Pipeboard authentication")
249 |             return pipeboard_auth_manager.get_access_token(force_refresh=force_refresh)
250 |         
251 |         # Otherwise, use the original OAuth flow
252 |         # Check if we already have a valid token
253 |         if not force_refresh and self.token_info and not self.token_info.is_expired():
254 |             return self.token_info.access_token
255 |         
256 |         # Start the callback server if not already running
257 |         try:
258 |             port = start_callback_server()
259 |             
260 |             # Update redirect URI with the actual port
261 |             self.redirect_uri = f"http://localhost:{port}/callback"
262 |             
263 |             # Generate the auth URL
264 |             auth_url = self.get_auth_url()
265 |             
266 |             # Open browser with auth URL
267 |             logger.info(f"Opening browser with URL: {auth_url}")
268 |             webbrowser.open(auth_url)
269 |             
270 |             # We don't wait for the token here anymore
271 |             # The token will be processed by the callback server
272 |             # Just return None to indicate we've started the flow
273 |             return None
274 |         except Exception as e:
275 |             logger.error(f"Failed to start callback server: {e}")
276 |             logger.info("Callback server disabled. OAuth authentication flow cannot be used.")
277 |             return None
278 |     
279 |     def get_access_token(self) -> Optional[str]:
280 |         """
281 |         Get the current access token, refreshing if necessary
282 |         
283 |         Returns:
284 |             Access token if available, None otherwise
285 |         """
286 |         # If using Pipeboard, always delegate to the Pipeboard auth manager
287 |         if self.use_pipeboard:
288 |             return pipeboard_auth_manager.get_access_token()
289 |             
290 |         if not self.token_info or self.token_info.is_expired():
291 |             return None
292 |         
293 |         return self.token_info.access_token
294 |         
295 |     def invalidate_token(self) -> None:
296 |         """Invalidate the current token, usually because it has expired or is invalid"""
297 |         # If using Pipeboard, delegate to the Pipeboard auth manager
298 |         if self.use_pipeboard:
299 |             pipeboard_auth_manager.invalidate_token()
300 |             return
301 |             
302 |         if self.token_info:
303 |             logger.info(f"Invalidating token: {self.token_info.access_token[:10]}...")
304 |             self.token_info = None
305 |             
306 |             # Signal that authentication is needed
307 |             global needs_authentication
308 |             needs_authentication = True
309 |             
310 |             # Remove the cached token file
311 |             try:
312 |                 cache_path = self._get_token_cache_path()
313 |                 if cache_path.exists():
314 |                     os.remove(cache_path)
315 |                     logger.info(f"Removed cached token file: {cache_path}")
316 |             except Exception as e:
317 |                 logger.error(f"Error removing cached token file: {e}")
318 |     
319 |     def clear_token(self) -> None:
320 |         """Alias for invalidate_token for consistency with other APIs"""
321 |         self.invalidate_token()
322 | 
323 | 
324 | def process_token_response(token_container):
325 |     """Process the token response from Facebook."""
326 |     global needs_authentication, auth_manager
327 |     
328 |     if token_container and token_container.get('token'):
329 |         logger.info("Processing token response from Facebook OAuth")
330 |         
331 |         # Exchange the short-lived token for a long-lived token
332 |         short_lived_token = token_container['token']
333 |         long_lived_token_info = exchange_token_for_long_lived(short_lived_token)
334 |         
335 |         if long_lived_token_info:
336 |             logger.info(f"Successfully exchanged for long-lived token (expires in {long_lived_token_info.expires_in} seconds)")
337 |             
338 |             try:
339 |                 auth_manager.token_info = long_lived_token_info
340 |                 logger.info(f"Long-lived token info set in auth_manager, expires in {long_lived_token_info.expires_in} seconds")
341 |             except NameError:
342 |                 logger.error("auth_manager not defined when trying to process token")
343 |                 
344 |             try:
345 |                 logger.info("Attempting to save long-lived token to cache")
346 |                 auth_manager._save_token_to_cache()
347 |                 logger.info(f"Long-lived token successfully saved to cache at {auth_manager._get_token_cache_path()}")
348 |             except Exception as e:
349 |                 logger.error(f"Error saving token to cache: {e}")
350 |                 
351 |             needs_authentication = False
352 |             return True
353 |         else:
354 |             # Fall back to the short-lived token if exchange fails
355 |             logger.warning("Failed to exchange for long-lived token, using short-lived token instead")
356 |             token_info = TokenInfo(
357 |                 access_token=short_lived_token,
358 |                 expires_in=token_container.get('expires_in', 0)
359 |             )
360 |             
361 |             try:
362 |                 auth_manager.token_info = token_info
363 |                 logger.info(f"Short-lived token info set in auth_manager, expires in {token_info.expires_in} seconds")
364 |             except NameError:
365 |                 logger.error("auth_manager not defined when trying to process token")
366 |                 
367 |             try:
368 |                 logger.info("Attempting to save token to cache")
369 |                 auth_manager._save_token_to_cache()
370 |                 logger.info(f"Token successfully saved to cache at {auth_manager._get_token_cache_path()}")
371 |             except Exception as e:
372 |                 logger.error(f"Error saving token to cache: {e}")
373 |                 
374 |             needs_authentication = False
375 |             return True
376 |     else:
377 |         logger.warning("Received empty token in process_token_response")
378 |         needs_authentication = True
379 |         return False
380 | 
381 | 
382 | def exchange_token_for_long_lived(short_lived_token):
383 |     """
384 |     Exchange a short-lived token for a long-lived token (60 days validity).
385 |     
386 |     Args:
387 |         short_lived_token: The short-lived access token received from OAuth flow
388 |         
389 |     Returns:
390 |         TokenInfo object with the long-lived token, or None if exchange failed
391 |     """
392 |     logger.info("Attempting to exchange short-lived token for long-lived token")
393 |     
394 |     try:
395 |         # Get the app ID from the configuration
396 |         app_id = meta_config.get_app_id()
397 |         
398 |         # Get the app secret - this should be securely stored
399 |         app_secret = os.environ.get("META_APP_SECRET", "")
400 |         
401 |         if not app_id or not app_secret:
402 |             logger.error("Missing app_id or app_secret for token exchange")
403 |             return None
404 |             
405 |         # Make the API request to exchange the token
406 |         url = "https://graph.facebook.com/v22.0/oauth/access_token"
407 |         params = {
408 |             "grant_type": "fb_exchange_token",
409 |             "client_id": app_id,
410 |             "client_secret": app_secret,
411 |             "fb_exchange_token": short_lived_token
412 |         }
413 |         
414 |         logger.debug(f"Making token exchange request to {url}")
415 |         response = requests.get(url, params=params)
416 |         
417 |         if response.status_code == 200:
418 |             data = response.json()
419 |             logger.debug(f"Token exchange response: {data}")
420 |             
421 |             # Create TokenInfo from the response
422 |             # The response includes access_token and expires_in (in seconds)
423 |             new_token = data.get("access_token")
424 |             expires_in = data.get("expires_in")
425 |             
426 |             if new_token:
427 |                 logger.info(f"Received long-lived token, expires in {expires_in} seconds (~{expires_in//86400} days)")
428 |                 return TokenInfo(
429 |                     access_token=new_token,
430 |                     expires_in=expires_in
431 |                 )
432 |             else:
433 |                 logger.error("No access_token in exchange response")
434 |                 return None
435 |         else:
436 |             logger.error(f"Token exchange failed with status {response.status_code}: {response.text}")
437 |             return None
438 |     except Exception as e:
439 |         logger.error(f"Error exchanging token: {e}")
440 |         return None
441 | 
442 | 
443 | async def get_current_access_token() -> Optional[str]:
444 |     """Get the current access token from auth manager"""
445 |     # Check for environment variable first - this takes highest precedence
446 |     env_token = os.environ.get("META_ACCESS_TOKEN")
447 |     if env_token:
448 |         logger.debug("Using access token from META_ACCESS_TOKEN environment variable")
449 |         # Basic validation
450 |         if len(env_token) < 20:  # Most Meta tokens are much longer
451 |             logger.error(f"TOKEN VALIDATION FAILED: Token from environment variable appears malformed (length: {len(env_token)})")
452 |             return None
453 |         return env_token
454 |         
455 |     # Use the singleton auth manager
456 |     global auth_manager
457 |     
458 |     # Log the function call and current app ID
459 |     logger.debug("get_current_access_token() called")
460 |     app_id = meta_config.get_app_id()
461 |     logger.debug(f"Current app_id: {app_id}")
462 |     
463 |     # Check if using Pipeboard authentication
464 |     using_pipeboard = auth_manager.use_pipeboard
465 |     
466 |     # Check if app_id is valid - but only if not using Pipeboard authentication
467 |     if not app_id and not using_pipeboard:
468 |         logger.error("TOKEN VALIDATION FAILED: No valid app_id configured")
469 |         logger.error("Please set META_APP_ID environment variable or configure via meta_config.set_app_id()")
470 |         return None
471 |     
472 |     # Attempt to get access token
473 |     try:
474 |         token = auth_manager.get_access_token()
475 |         
476 |         if token:
477 |             # Add basic token validation - check if it looks like a valid token
478 |             if len(token) < 20:  # Most Meta tokens are much longer
479 |                 logger.error(f"TOKEN VALIDATION FAILED: Token appears malformed (length: {len(token)})")
480 |                 auth_manager.invalidate_token()
481 |                 return None
482 |                 
483 |             logger.debug(f"Access token found in auth_manager (starts with: {token[:10]}...)")
484 |             return token
485 |         else:
486 |             logger.warning("No valid access token available in auth_manager")
487 |             
488 |             # Check why token might be missing
489 |             if hasattr(auth_manager, 'token_info') and auth_manager.token_info:
490 |                 if auth_manager.token_info.is_expired():
491 |                     logger.error("TOKEN VALIDATION FAILED: Token is expired")
492 |                     # Add expiration details
493 |                     if hasattr(auth_manager.token_info, 'expires_in') and auth_manager.token_info.expires_in:
494 |                         expiry_time = auth_manager.token_info.created_at + auth_manager.token_info.expires_in
495 |                         current_time = int(time.time())
496 |                         expired_seconds_ago = current_time - expiry_time
497 |                         logger.error(f"Token expired {expired_seconds_ago} seconds ago")
498 |                 elif not auth_manager.token_info.access_token:
499 |                     logger.error("TOKEN VALIDATION FAILED: Token object exists but access_token is empty")
500 |                 else:
501 |                     logger.error("TOKEN VALIDATION FAILED: Token exists but was rejected for unknown reason")
502 |             else:
503 |                 logger.error("TOKEN VALIDATION FAILED: No token information available")
504 |                 
505 |             # Suggest next steps for troubleshooting
506 |             logger.error("To fix: Try re-authenticating or check if your token has been revoked")
507 |             return None
508 |     except Exception as e:
509 |         logger.error(f"Error getting access token: {str(e)}")
510 |         import traceback
511 |         logger.error(f"Token validation stacktrace: {traceback.format_exc()}")
512 |         return None
513 | 
514 | 
515 | def login():
516 |     """
517 |     Start the login flow to authenticate with Meta
518 |     """
519 |     print("Starting Meta Ads authentication flow...")
520 |     
521 |     try:
522 |         # Start the callback server first
523 |         try:
524 |             port = start_callback_server()
525 |         except Exception as callback_error:
526 |             print(f"Error: {callback_error}")
527 |             print("Callback server is disabled. Please use alternative authentication methods:")
528 |             print("- Set PIPEBOARD_API_TOKEN environment variable for Pipeboard authentication")
529 |             print("- Or provide a direct META_ACCESS_TOKEN environment variable")
530 |             return
531 |         
532 |         # Get the auth URL and open the browser
533 |         auth_url = auth_manager.get_auth_url()
534 |         print(f"Opening browser with URL: {auth_url}")
535 |         webbrowser.open(auth_url)
536 |         
537 |         # Wait for token to be received
538 |         print("Waiting for authentication to complete...")
539 |         max_wait = 300  # 5 minutes
540 |         wait_interval = 2  # 2 seconds
541 |         
542 |         for _ in range(max_wait // wait_interval):
543 |             if token_container["token"]:
544 |                 token = token_container["token"]
545 |                 print("Authentication successful!")
546 |                 # Verify token works by getting basic user info
547 |                 try:
548 |                     from .api import make_api_request
549 |                     result = asyncio.run(make_api_request("me", token, {}))
550 |                     print(f"Authenticated as: {result.get('name', 'Unknown')} (ID: {result.get('id', 'Unknown')})")
551 |                     return
552 |                 except Exception as e:
553 |                     print(f"Warning: Could not verify token: {e}")
554 |                     return
555 |             time.sleep(wait_interval)
556 |         
557 |         print("Authentication timed out. Please try again.")
558 |     except Exception as e:
559 |         print(f"Error during authentication: {e}")
560 |         print(f"Direct authentication URL: {auth_manager.get_auth_url()}")
561 |         print("You can manually open this URL in your browser to complete authentication.")
562 | 
563 | # Initialize auth manager with a placeholder - will be updated at runtime
564 | META_APP_ID = os.environ.get("META_APP_ID", "YOUR_META_APP_ID")
565 | 
566 | # Create the auth manager
567 | auth_manager = AuthManager(META_APP_ID) 
```

--------------------------------------------------------------------------------
/meta_ads_mcp/core/pipeboard_auth.py:
--------------------------------------------------------------------------------

```python
  1 | """Authentication with Meta Ads API via pipeboard.co."""
  2 | 
  3 | import os
  4 | import json
  5 | import time
  6 | import requests
  7 | from pathlib import Path
  8 | import platform
  9 | from typing import Optional, Dict, Any
 10 | from .utils import logger
 11 | 
 12 | # Enable more detailed logging
 13 | import logging
 14 | logger.setLevel(logging.DEBUG)
 15 | 
 16 | # Base URL for pipeboard API
 17 | PIPEBOARD_API_BASE = "https://pipeboard.co/api"
 18 | 
 19 | # Debug message about API base URL
 20 | logger.info(f"Pipeboard API base URL: {PIPEBOARD_API_BASE}")
 21 | 
 22 | class TokenInfo:
 23 |     """Stores token information including expiration"""
 24 |     def __init__(self, access_token: str, expires_at: Optional[str] = None, token_type: Optional[str] = None):
 25 |         self.access_token = access_token
 26 |         self.expires_at = expires_at
 27 |         self.token_type = token_type
 28 |         self.created_at = int(time.time())
 29 |         logger.debug(f"TokenInfo created. Expires at: {expires_at if expires_at else 'Not specified'}")
 30 |     
 31 |     def is_expired(self) -> bool:
 32 |         """Check if the token is expired"""
 33 |         if not self.expires_at:
 34 |             logger.debug("No expiration date set for token, assuming not expired")
 35 |             return False  # If no expiration is set, assume it's not expired
 36 |         
 37 |         # Parse ISO 8601 date format to timestamp
 38 |         try:
 39 |             # Convert the expires_at string to a timestamp
 40 |             # Format is like "2023-12-31T23:59:59.999Z" or "2023-12-31T23:59:59.999+00:00"
 41 |             from datetime import datetime
 42 |             
 43 |             # Remove the Z suffix if present and handle +00:00 format
 44 |             expires_at_str = self.expires_at
 45 |             if expires_at_str.endswith('Z'):
 46 |                 expires_at_str = expires_at_str[:-1]  # Remove Z
 47 |                 
 48 |             # Handle microseconds if present
 49 |             if '.' in expires_at_str:
 50 |                 datetime_format = "%Y-%m-%dT%H:%M:%S.%f"
 51 |             else:
 52 |                 datetime_format = "%Y-%m-%dT%H:%M:%S"
 53 |                 
 54 |             # Handle timezone offset
 55 |             timezone_offset = "+00:00"
 56 |             if "+" in expires_at_str:
 57 |                 expires_at_str, timezone_offset = expires_at_str.split("+")
 58 |                 timezone_offset = "+" + timezone_offset
 59 |             
 60 |             # Parse the datetime without timezone info
 61 |             expires_datetime = datetime.strptime(expires_at_str, datetime_format)
 62 |             
 63 |             # Convert to timestamp (assume UTC)
 64 |             expires_timestamp = expires_datetime.timestamp()
 65 |             current_time = time.time()
 66 |             
 67 |             # Check if token is expired and log result
 68 |             is_expired = current_time > expires_timestamp
 69 |             time_diff = expires_timestamp - current_time
 70 |             if is_expired:
 71 |                 logger.debug(f"Token is expired! Current time: {datetime.fromtimestamp(current_time)}, " 
 72 |                              f"Expires at: {datetime.fromtimestamp(expires_timestamp)}, "
 73 |                              f"Expired {abs(time_diff):.0f} seconds ago")
 74 |             else:
 75 |                 logger.debug(f"Token is still valid. Expires at: {datetime.fromtimestamp(expires_timestamp)}, "
 76 |                              f"Time remaining: {time_diff:.0f} seconds")
 77 |             
 78 |             return is_expired
 79 |         except Exception as e:
 80 |             logger.error(f"Error parsing expiration date: {e}")
 81 |             # Log the actual value to help diagnose format issues
 82 |             logger.error(f"Invalid expires_at value: '{self.expires_at}'")
 83 |             # Log detailed error information
 84 |             import traceback
 85 |             logger.error(f"Traceback: {traceback.format_exc()}")
 86 |             return False  # If we can't parse the date, assume it's not expired
 87 |     
 88 |     def serialize(self) -> Dict[str, Any]:
 89 |         """Convert to a dictionary for storage"""
 90 |         return {
 91 |             "access_token": self.access_token,
 92 |             "expires_at": self.expires_at,
 93 |             "token_type": self.token_type,
 94 |             "created_at": self.created_at
 95 |         }
 96 |     
 97 |     @classmethod
 98 |     def deserialize(cls, data: Dict[str, Any]) -> 'TokenInfo':
 99 |         """Create from a stored dictionary"""
100 |         logger.debug(f"Deserializing token data with keys: {', '.join(data.keys())}")
101 |         if 'expires_at' in data:
102 |             logger.debug(f"Token expires_at from cache: {data['expires_at']}")
103 |         
104 |         token = cls(
105 |             access_token=data.get("access_token", ""),
106 |             expires_at=data.get("expires_at"),
107 |             token_type=data.get("token_type")
108 |         )
109 |         token.created_at = data.get("created_at", int(time.time()))
110 |         return token
111 | 
112 | 
113 | class PipeboardAuthManager:
114 |     """Manages authentication with Meta APIs via pipeboard.co"""
115 |     def __init__(self):
116 |         self.api_token = os.environ.get("PIPEBOARD_API_TOKEN", "")
117 |         logger.debug(f"PipeboardAuthManager initialized with API token: {self.api_token[:5]}..." if self.api_token else "No API token")
118 |         if self.api_token:
119 |             logger.info("Pipeboard authentication enabled. Will use pipeboard.co for Meta authentication.")
120 |         else:
121 |             logger.info("Pipeboard authentication not enabled. Set PIPEBOARD_API_TOKEN environment variable to enable.")
122 |         self.token_info = None
123 |         # Note: Token caching is disabled to always fetch fresh tokens from Pipeboard
124 |     
125 |     def _get_token_cache_path(self) -> Path:
126 |         """Get the platform-specific path for token cache file"""
127 |         if platform.system() == "Windows":
128 |             base_path = Path(os.environ.get("APPDATA", ""))
129 |         elif platform.system() == "Darwin":  # macOS
130 |             base_path = Path.home() / "Library" / "Application Support"
131 |         else:  # Assume Linux/Unix
132 |             base_path = Path.home() / ".config"
133 |         
134 |         # Create directory if it doesn't exist
135 |         cache_dir = base_path / "meta-ads-mcp"
136 |         cache_dir.mkdir(parents=True, exist_ok=True)
137 |         
138 |         cache_path = cache_dir / "pipeboard_token_cache.json"
139 |         logger.debug(f"Token cache path: {cache_path}")
140 |         return cache_path
141 |     
142 |     def _load_cached_token(self) -> bool:
143 |         """Load token from cache if available"""
144 |         cache_path = self._get_token_cache_path()
145 |         
146 |         if not cache_path.exists():
147 |             logger.debug(f"Token cache file not found at {cache_path}")
148 |             return False
149 |         
150 |         try:
151 |             with open(cache_path, "r") as f:
152 |                 logger.debug(f"Reading token cache from {cache_path}")
153 |                 data = json.load(f)
154 |                 
155 |                 # Validate the cached data structure
156 |                 required_fields = ["access_token"]
157 |                 if not all(field in data for field in required_fields):
158 |                     logger.warning("Cached token data is missing required fields")
159 |                     return False
160 |                 
161 |                 # Check if the token looks valid (basic format check)
162 |                 if not data.get("access_token") or len(data["access_token"]) < 20:
163 |                     logger.warning("Cached token appears malformed")
164 |                     return False
165 |                 
166 |                 self.token_info = TokenInfo.deserialize(data)
167 |                 
168 |                 # Log token details (partial token for security)
169 |                 masked_token = self.token_info.access_token[:10] + "..." + self.token_info.access_token[-5:] if self.token_info.access_token else "None"
170 |                 logger.debug(f"Loaded token: {masked_token}")
171 |                 
172 |                 # Check if token is expired
173 |                 if self.token_info.is_expired():
174 |                     logger.info("Cached token is expired, removing cache file")
175 |                     # Remove the expired cache file
176 |                     try:
177 |                         cache_path.unlink()
178 |                         logger.info(f"Removed expired token cache: {cache_path}")
179 |                     except Exception as e:
180 |                         logger.warning(f"Could not remove expired cache file: {e}")
181 |                     self.token_info = None
182 |                     return False
183 |                 
184 |                 # Additional validation: check if token is too old (more than 60 days)
185 |                 current_time = int(time.time())
186 |                 if self.token_info.created_at and (current_time - self.token_info.created_at) > (60 * 24 * 3600):
187 |                     logger.warning("Cached token is too old (more than 60 days), removing cache file")
188 |                     try:
189 |                         cache_path.unlink()
190 |                         logger.info(f"Removed old token cache: {cache_path}")
191 |                     except Exception as e:
192 |                         logger.warning(f"Could not remove old cache file: {e}")
193 |                     self.token_info = None
194 |                     return False
195 |                 
196 |                 logger.info(f"Loaded cached token (expires at {self.token_info.expires_at})")
197 |                 return True
198 |         except json.JSONDecodeError as e:
199 |             logger.error(f"Error parsing token cache file: {e}")
200 |             logger.debug("Token cache file might be corrupted, trying to read raw content")
201 |             try:
202 |                 with open(cache_path, "r") as f:
203 |                     raw_content = f.read()
204 |                 logger.debug(f"Raw cache file content (first 100 chars): {raw_content[:100]}")
205 |             except Exception as e2:
206 |                 logger.error(f"Could not read raw cache file: {e2}")
207 |             # If there's any error reading the cache, try to remove the corrupted file
208 |             try:
209 |                 cache_path.unlink()
210 |                 logger.info(f"Removed corrupted token cache: {cache_path}")
211 |             except Exception as cleanup_error:
212 |                 logger.warning(f"Could not remove corrupted cache file: {cleanup_error}")
213 |             return False
214 |         except Exception as e:
215 |             logger.error(f"Error loading cached token: {e}")
216 |             # If there's any error reading the cache, try to remove the corrupted file
217 |             try:
218 |                 cache_path.unlink()
219 |                 logger.info(f"Removed corrupted token cache: {cache_path}")
220 |             except Exception as cleanup_error:
221 |                 logger.warning(f"Could not remove corrupted cache file: {cleanup_error}")
222 |             return False
223 |     
224 |     def _save_token_to_cache(self) -> None:
225 |         """Save token to cache file"""
226 |         if not self.token_info:
227 |             logger.debug("No token to save to cache")
228 |             return
229 |         
230 |         cache_path = self._get_token_cache_path()
231 |         
232 |         try:
233 |             token_data = self.token_info.serialize()
234 |             logger.debug(f"Saving token to cache. Expires at: {token_data.get('expires_at')}")
235 |             
236 |             with open(cache_path, "w") as f:
237 |                 json.dump(token_data, f)
238 |             logger.info(f"Token cached at: {cache_path}")
239 |         except Exception as e:
240 |             logger.error(f"Error saving token to cache: {e}")
241 |     
242 |     def initiate_auth_flow(self) -> Dict[str, str]:
243 |         """
244 |         Initiate the Meta OAuth flow via pipeboard.co
245 |         
246 |         Returns:
247 |             Dict with loginUrl and status info
248 |         """
249 |         if not self.api_token:
250 |             logger.error("No PIPEBOARD_API_TOKEN environment variable set")
251 |             raise ValueError("No PIPEBOARD_API_TOKEN environment variable set")
252 |             
253 |         # Exactly match the format used in meta_auth_test.sh
254 |         url = f"{PIPEBOARD_API_BASE}/meta/auth?api_token={self.api_token}"
255 |         headers = {
256 |             "Content-Type": "application/json"
257 |         }
258 |         
259 |         logger.info(f"Initiating auth flow with POST request to {url}")
260 |         
261 |         try:
262 |             # Make the POST request exactly as in the working meta_auth_test.sh script
263 |             response = requests.post(url, headers=headers)
264 |             logger.info(f"Auth flow response status: {response.status_code}")
265 |             
266 |             # Better error handling
267 |             if response.status_code != 200:
268 |                 logger.error(f"Auth flow error: HTTP {response.status_code}")
269 |                 error_text = response.text if response.text else "No response content"
270 |                 logger.error(f"Response content: {error_text}")
271 |                 if response.status_code == 404:
272 |                     raise ValueError(f"Pipeboard API endpoint not found. Check if the server is running at {PIPEBOARD_API_BASE}")
273 |                 elif response.status_code == 401:
274 |                     raise ValueError(f"Unauthorized: Invalid API token. Check your PIPEBOARD_API_TOKEN.")
275 |                 
276 |                 response.raise_for_status()
277 |             
278 |             # Parse the response    
279 |             try:
280 |                 data = response.json()
281 |                 logger.info(f"Received response keys: {', '.join(data.keys())}")
282 |             except json.JSONDecodeError:
283 |                 logger.error(f"Could not parse JSON response: {response.text}")
284 |                 raise ValueError(f"Invalid JSON response from auth endpoint: {response.text[:100]}")
285 |             
286 |             # Log auth flow response (without sensitive information)
287 |             if 'loginUrl' in data:
288 |                 logger.info(f"Auth flow initiated successfully with login URL: {data['loginUrl'][:30]}...")
289 |             else:
290 |                 logger.warning(f"Auth flow response missing loginUrl field. Response keys: {', '.join(data.keys())}")
291 |             
292 |             return data
293 |         except requests.exceptions.ConnectionError as e:
294 |             logger.error(f"Connection error to Pipeboard: {e}")
295 |             logger.debug(f"Attempting to connect to: {PIPEBOARD_API_BASE}")
296 |             raise
297 |         except requests.exceptions.Timeout as e:
298 |             logger.error(f"Timeout connecting to Pipeboard: {e}")
299 |             raise
300 |         except requests.exceptions.RequestException as e:
301 |             logger.error(f"Error initiating auth flow: {e}")
302 |             raise
303 |         except Exception as e:
304 |             logger.error(f"Unexpected error initiating auth flow: {e}")
305 |             raise
306 |     
307 |     def get_access_token(self, force_refresh: bool = False) -> Optional[str]:
308 |         """
309 |         Get the current access token, refreshing if necessary or if forced
310 |         
311 |         Args:
312 |             force_refresh: Force token refresh even if cached token exists
313 |             
314 |         Returns:
315 |             Access token if available, None otherwise
316 |         """
317 |         # First check if API token is configured
318 |         if not self.api_token:
319 |             logger.error("TOKEN VALIDATION FAILED: No Pipeboard API token configured")
320 |             logger.error("Please set PIPEBOARD_API_TOKEN environment variable")
321 |             return None
322 |             
323 |         logger.info("Getting fresh token from Pipeboard (caching disabled)")
324 |         
325 |         # If force refresh or no token/expired token, get a new one from Pipeboard
326 |         try:
327 |             # Make a request to get the token, using the same URL format as initiate_auth_flow
328 |             url = f"{PIPEBOARD_API_BASE}/meta/token?api_token={self.api_token}"
329 |             headers = {
330 |                 "Content-Type": "application/json"
331 |             }
332 |             
333 |             logger.info(f"Requesting token from {url}")
334 |             
335 |             # Add timeout for better error messages
336 |             try:
337 |                 response = requests.get(url, headers=headers, timeout=10)
338 |             except requests.exceptions.Timeout:
339 |                 logger.error("TOKEN VALIDATION FAILED: Timeout while connecting to Pipeboard API")
340 |                 logger.error(f"Could not connect to {PIPEBOARD_API_BASE} within 10 seconds")
341 |                 return None
342 |             except requests.exceptions.ConnectionError:
343 |                 logger.error("TOKEN VALIDATION FAILED: Connection error with Pipeboard API")
344 |                 logger.error(f"Could not connect to {PIPEBOARD_API_BASE} - check if service is running")
345 |                 return None
346 |                 
347 |             logger.info(f"Token request response status: {response.status_code}")
348 |             
349 |             # Better error handling with response content
350 |             if response.status_code != 200:
351 |                 logger.error(f"TOKEN VALIDATION FAILED: HTTP error {response.status_code}")
352 |                 error_text = response.text if response.text else "No response content"
353 |                 logger.error(f"Response content: {error_text}")
354 |                 
355 |                 # Add more specific error messages for common status codes
356 |                 if response.status_code == 401:
357 |                     logger.error("Authentication failed: Invalid Pipeboard API token")
358 |                 elif response.status_code == 404:
359 |                     logger.error("Endpoint not found: Check if Pipeboard API service is running correctly")
360 |                 elif response.status_code == 400:
361 |                     logger.error("Bad request: The request to Pipeboard API was malformed")
362 |                     
363 |                 response.raise_for_status()
364 |                 
365 |             try:
366 |                 data = response.json()
367 |                 logger.info(f"Received token response with keys: {', '.join(data.keys())}")
368 |             except json.JSONDecodeError:
369 |                 logger.error("TOKEN VALIDATION FAILED: Invalid JSON response from Pipeboard API")
370 |                 logger.error(f"Response content (first 100 chars): {response.text[:100]}")
371 |                 return None
372 |             
373 |             # Validate response data
374 |             if "access_token" not in data:
375 |                 logger.error("TOKEN VALIDATION FAILED: No access_token in Pipeboard API response")
376 |                 logger.error(f"Response keys: {', '.join(data.keys())}")
377 |                 if "error" in data:
378 |                     logger.error(f"Error details: {data['error']}")
379 |                 else:
380 |                     logger.error("No error information available in response")
381 |                 return None
382 |                 
383 |             # Create new token info
384 |             self.token_info = TokenInfo(
385 |                 access_token=data.get("access_token"),
386 |                 expires_at=data.get("expires_at"),
387 |                 token_type=data.get("token_type", "bearer")
388 |             )
389 |             
390 |             # Note: Token caching is disabled
391 |             
392 |             masked_token = self.token_info.access_token[:10] + "..." + self.token_info.access_token[-5:] if self.token_info.access_token else "None"
393 |             logger.info(f"Successfully retrieved access token: {masked_token}")
394 |             return self.token_info.access_token
395 |         except requests.RequestException as e:
396 |             status_code = e.response.status_code if hasattr(e, 'response') and e.response else None
397 |             response_text = e.response.text if hasattr(e, 'response') and e.response else "No response"
398 |             
399 |             if status_code == 401:
400 |                 logger.error(f"Unauthorized: Check your PIPEBOARD_API_TOKEN. Response: {response_text}")
401 |             elif status_code == 404:
402 |                 logger.error(f"No token available: You might need to complete authorization first. Response: {response_text}")
403 |                 # Return None so caller can handle the auth flow
404 |                 return None
405 |             else:
406 |                 logger.error(f"Error getting access token (status {status_code}): {e}")
407 |                 logger.error(f"Response content: {response_text}")
408 |             return None
409 |         except Exception as e:
410 |             logger.error(f"Unexpected error getting access token: {e}")
411 |             return None
412 |         
413 |     def invalidate_token(self) -> None:
414 |         """Invalidate the current token, usually because it has expired or is invalid"""
415 |         if self.token_info:
416 |             logger.info(f"Invalidating token: {self.token_info.access_token[:10]}...")
417 |             self.token_info = None
418 |             
419 |             # Remove the cached token file
420 |             try:
421 |                 cache_path = self._get_token_cache_path()
422 |                 if cache_path.exists():
423 |                     os.remove(cache_path)
424 |                     logger.info(f"Removed cached token file: {cache_path}")
425 |                 else:
426 |                     logger.debug(f"No token cache file to remove: {cache_path}")
427 |             except Exception as e:
428 |                 logger.error(f"Error removing cached token file: {e}")
429 |         else:
430 |             logger.debug("No token to invalidate")
431 | 
432 |     def test_token_validity(self) -> bool:
433 |         """
434 |         Test if the current token is valid with the Meta Graph API
435 |         
436 |         Returns:
437 |             True if valid, False otherwise
438 |         """
439 |         if not self.token_info or not self.token_info.access_token:
440 |             logger.debug("No token to test")
441 |             logger.error("TOKEN VALIDATION FAILED: Missing token to test")
442 |             return False
443 |             
444 |         # Log token details for debugging (partial token for security)
445 |         masked_token = self.token_info.access_token[:5] + "..." + self.token_info.access_token[-5:] if self.token_info.access_token else "None"
446 |         token_type = self.token_info.token_type if hasattr(self.token_info, 'token_type') and self.token_info.token_type else "bearer"
447 |         logger.debug(f"Testing token validity (token: {masked_token}, type: {token_type})")
448 |             
449 |         try:
450 |             # Make a simple request to the /me endpoint to test the token
451 |             META_GRAPH_API_VERSION = "v22.0"
452 |             url = f"https://graph.facebook.com/{META_GRAPH_API_VERSION}/me"
453 |             headers = {"Authorization": f"Bearer {self.token_info.access_token}"}
454 |             
455 |             logger.debug(f"Testing token validity with request to {url}")
456 |             
457 |             # Add timeout and better error handling
458 |             try:
459 |                 response = requests.get(url, headers=headers, timeout=10)
460 |             except requests.exceptions.Timeout:
461 |                 logger.error("TOKEN VALIDATION FAILED: Timeout while connecting to Meta API")
462 |                 logger.error("The Graph API did not respond within 10 seconds")
463 |                 return False
464 |             except requests.exceptions.ConnectionError:
465 |                 logger.error("TOKEN VALIDATION FAILED: Connection error with Meta API")
466 |                 logger.error("Could not establish connection to Graph API - check network connectivity")
467 |                 return False
468 |             
469 |             if response.status_code == 200:
470 |                 data = response.json()
471 |                 logger.debug(f"Token is valid. User ID: {data.get('id')}")
472 |                 # Add more useful user information for debugging
473 |                 user_info = f"User ID: {data.get('id')}"
474 |                 if 'name' in data:
475 |                     user_info += f", Name: {data.get('name')}"
476 |                 logger.info(f"Meta API token validated successfully ({user_info})")
477 |                 return True
478 |             else:
479 |                 logger.error(f"TOKEN VALIDATION FAILED: API returned status {response.status_code}")
480 |                 
481 |                 # Try to parse the error response for more detailed information
482 |                 try:
483 |                     error_data = response.json()
484 |                     if 'error' in error_data:
485 |                         error_obj = error_data.get('error', {})
486 |                         error_code = error_obj.get('code', 'unknown')
487 |                         error_message = error_obj.get('message', 'Unknown error')
488 |                         logger.error(f"Meta API error: Code {error_code} - {error_message}")
489 |                         
490 |                         # Add specific guidance for common error codes
491 |                         if error_code == 190:
492 |                             logger.error("Error indicates the token is invalid or has expired")
493 |                         elif error_code == 4:
494 |                             logger.error("Error indicates rate limiting - too many requests")
495 |                         elif error_code == 200:
496 |                             logger.error("Error indicates API permissions or configuration issue")
497 |                     else:
498 |                         logger.error(f"No error object in response: {error_data}")
499 |                 except json.JSONDecodeError:
500 |                     logger.error(f"Could not parse error response: {response.text[:200]}")
501 |                 
502 |                 return False
503 |         except Exception as e:
504 |             logger.error(f"TOKEN VALIDATION FAILED: Unexpected error: {str(e)}")
505 |             
506 |             # Add stack trace for debugging complex issues
507 |             import traceback
508 |             logger.error(f"Stack trace: {traceback.format_exc()}")
509 |             
510 |             return False
511 | 
512 | 
513 | # Create singleton instance
514 | pipeboard_auth_manager = PipeboardAuthManager() 
```
Page 4/6FirstPrevNextLast