This is page 4 of 6. Use http://codebase.md/nictuku/meta-ads-mcp?lines=true&page={x} to view the full context.
# Directory Structure
```
├── .github
│ └── workflows
│ ├── publish-mcp.yml
│ ├── publish.yml
│ └── test.yml
├── .gitignore
├── .python-version
├── .uv.toml
├── CUSTOM_META_APP.md
├── Dockerfile
├── examples
│ ├── example_http_client.py
│ └── README.md
├── future_improvements.md
├── images
│ └── meta-ads-example.png
├── LICENSE
├── LOCAL_INSTALLATION.md
├── meta_ads_auth.sh
├── meta_ads_mcp
│ ├── __init__.py
│ ├── __main__.py
│ └── core
│ ├── __init__.py
│ ├── accounts.py
│ ├── ads_library.py
│ ├── ads.py
│ ├── adsets.py
│ ├── api.py
│ ├── auth.py
│ ├── authentication.py
│ ├── budget_schedules.py
│ ├── callback_server.py
│ ├── campaigns.py
│ ├── duplication.py
│ ├── http_auth_integration.py
│ ├── insights.py
│ ├── openai_deep_research.py
│ ├── pipeboard_auth.py
│ ├── reports.py
│ ├── resources.py
│ ├── server.py
│ ├── targeting.py
│ └── utils.py
├── META_API_NOTES.md
├── poetry.lock
├── pyproject.toml
├── README.md
├── RELEASE.md
├── requirements.txt
├── server.json
├── setup.py
├── smithery.yaml
├── STREAMABLE_HTTP_SETUP.md
└── tests
├── __init__.py
├── conftest.py
├── e2e_account_info_search_issue.py
├── README_REGRESSION_TESTS.md
├── README.md
├── test_account_info_access_fix.py
├── test_account_search.py
├── test_budget_update_e2e.py
├── test_budget_update.py
├── test_create_ad_creative_simple.py
├── test_create_simple_creative_e2e.py
├── test_dsa_beneficiary.py
├── test_dsa_integration.py
├── test_duplication_regression.py
├── test_duplication.py
├── test_dynamic_creatives.py
├── test_estimate_audience_size_e2e.py
├── test_estimate_audience_size.py
├── test_get_account_pages.py
├── test_get_ad_creatives_fix.py
├── test_get_ad_image_quality_improvements.py
├── test_get_ad_image_regression.py
├── test_http_transport.py
├── test_insights_actions_and_values_e2e.py
├── test_insights_pagination.py
├── test_integration_openai_mcp.py
├── test_is_dynamic_creative_adset.py
├── test_mobile_app_adset_creation.py
├── test_mobile_app_adset_issue.py
├── test_openai_mcp_deep_research.py
├── test_openai.py
├── test_page_discovery_integration.py
├── test_page_discovery.py
├── test_targeting_search_e2e.py
├── test_targeting.py
├── test_update_ad_creative_id.py
└── test_upload_ad_image.py
```
# Files
--------------------------------------------------------------------------------
/tests/test_get_ad_image_quality_improvements.py:
--------------------------------------------------------------------------------
```python
1 | """Tests for get_ad_image quality improvements.
2 |
3 | These tests verify that the get_ad_image function now correctly prioritizes
4 | high-quality ad creative images over profile thumbnails.
5 |
6 | Key improvements tested:
7 | 1. Prioritizes image_urls_for_viewing over thumbnail_url
8 | 2. Uses image_url as second priority
9 | 3. Uses object_story_spec.link_data.picture as third priority
10 | 4. Only uses thumbnail_url as a last resort
11 | 5. Adds clearer logging to show which image source is being used
12 | """
13 |
14 | import pytest
15 | import json
16 | from unittest.mock import AsyncMock, patch, MagicMock
17 | from meta_ads_mcp.core.ads import get_ad_image
18 | from meta_ads_mcp.core.utils import extract_creative_image_urls
19 |
20 |
21 | class TestGetAdImageQualityImprovements:
22 | """Test cases for image quality improvements in get_ad_image function."""
23 |
24 | @pytest.mark.asyncio
25 | async def test_prioritizes_image_urls_for_viewing_over_thumbnail(self):
26 | """Test that image_urls_for_viewing is prioritized over thumbnail_url."""
27 |
28 | # Mock responses for creative with both high-quality and thumbnail URLs
29 | mock_ad_data = {
30 | "account_id": "act_123456789",
31 | "creative": {"id": "creative_123456789"}
32 | }
33 |
34 | mock_creative_details = {
35 | "id": "creative_123456789",
36 | "name": "Test Creative"
37 | # No image_hash - triggers fallback
38 | }
39 |
40 | # Mock get_ad_creatives response with both URLs
41 | mock_get_ad_creatives_response = json.dumps({
42 | "data": [
43 | {
44 | "id": "creative_123456789",
45 | "name": "Test Creative",
46 | "status": "ACTIVE",
47 | "thumbnail_url": "https://example.com/thumbnail_64x64.jpg", # Low quality
48 | "image_url": "https://example.com/full_image.jpg", # Medium quality
49 | "image_urls_for_viewing": [
50 | "https://example.com/high_quality_image.jpg", # Highest quality
51 | "https://example.com/alt_high_quality.jpg"
52 | ],
53 | "object_story_spec": {
54 | "link_data": {
55 | "picture": "https://example.com/object_story_picture.jpg"
56 | }
57 | }
58 | }
59 | ]
60 | })
61 |
62 | # Mock PIL Image processing
63 | mock_pil_image = MagicMock()
64 | mock_pil_image.mode = "RGB"
65 | mock_pil_image.convert.return_value = mock_pil_image
66 |
67 | mock_byte_stream = MagicMock()
68 | mock_byte_stream.getvalue.return_value = b"fake_jpeg_data"
69 |
70 | with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api, \
71 | patch('meta_ads_mcp.core.ads.get_ad_creatives', new_callable=AsyncMock) as mock_get_creatives, \
72 | patch('meta_ads_mcp.core.ads.download_image', new_callable=AsyncMock) as mock_download, \
73 | patch('meta_ads_mcp.core.ads.PILImage.open') as mock_pil_open, \
74 | patch('meta_ads_mcp.core.ads.io.BytesIO') as mock_bytesio:
75 |
76 | mock_api.side_effect = [mock_ad_data, mock_creative_details]
77 | mock_get_creatives.return_value = mock_get_ad_creatives_response
78 | mock_download.return_value = b"fake_image_bytes"
79 | mock_pil_open.return_value = mock_pil_image
80 | mock_bytesio.return_value = mock_byte_stream
81 |
82 | # This should prioritize image_urls_for_viewing[0]
83 | result = await get_ad_image(access_token="test_token", ad_id="test_ad_id")
84 |
85 | # Verify it used the highest quality URL
86 | assert result is not None
87 | mock_download.assert_called_once_with("https://example.com/high_quality_image.jpg")
88 |
89 | @pytest.mark.asyncio
90 | async def test_falls_back_to_image_url_when_image_urls_for_viewing_unavailable(self):
91 | """Test fallback to image_url when image_urls_for_viewing is not available."""
92 |
93 | # Mock responses for creative without image_urls_for_viewing
94 | mock_ad_data = {
95 | "account_id": "act_123456789",
96 | "creative": {"id": "creative_123456789"}
97 | }
98 |
99 | mock_creative_details = {
100 | "id": "creative_123456789",
101 | "name": "Test Creative"
102 | }
103 |
104 | # Mock get_ad_creatives response without image_urls_for_viewing
105 | mock_get_ad_creatives_response = json.dumps({
106 | "data": [
107 | {
108 | "id": "creative_123456789",
109 | "name": "Test Creative",
110 | "status": "ACTIVE",
111 | "thumbnail_url": "https://example.com/thumbnail.jpg",
112 | "image_url": "https://example.com/full_image.jpg", # Should be used
113 | "object_story_spec": {
114 | "link_data": {
115 | "picture": "https://example.com/object_story_picture.jpg"
116 | }
117 | }
118 | }
119 | ]
120 | })
121 |
122 | # Mock PIL Image processing
123 | mock_pil_image = MagicMock()
124 | mock_pil_image.mode = "RGB"
125 | mock_pil_image.convert.return_value = mock_pil_image
126 |
127 | mock_byte_stream = MagicMock()
128 | mock_byte_stream.getvalue.return_value = b"fake_jpeg_data"
129 |
130 | with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api, \
131 | patch('meta_ads_mcp.core.ads.get_ad_creatives', new_callable=AsyncMock) as mock_get_creatives, \
132 | patch('meta_ads_mcp.core.ads.download_image', new_callable=AsyncMock) as mock_download, \
133 | patch('meta_ads_mcp.core.ads.PILImage.open') as mock_pil_open, \
134 | patch('meta_ads_mcp.core.ads.io.BytesIO') as mock_bytesio:
135 |
136 | mock_api.side_effect = [mock_ad_data, mock_creative_details]
137 | mock_get_creatives.return_value = mock_get_ad_creatives_response
138 | mock_download.return_value = b"fake_image_bytes"
139 | mock_pil_open.return_value = mock_pil_image
140 | mock_bytesio.return_value = mock_byte_stream
141 |
142 | # This should fall back to image_url
143 | result = await get_ad_image(access_token="test_token", ad_id="test_ad_id")
144 |
145 | # Verify it used image_url
146 | assert result is not None
147 | mock_download.assert_called_once_with("https://example.com/full_image.jpg")
148 |
149 | @pytest.mark.asyncio
150 | async def test_falls_back_to_object_story_spec_picture_when_image_url_unavailable(self):
151 | """Test fallback to object_story_spec.link_data.picture when image_url is not available."""
152 |
153 | # Mock responses for creative without image_url
154 | mock_ad_data = {
155 | "account_id": "act_123456789",
156 | "creative": {"id": "creative_123456789"}
157 | }
158 |
159 | mock_creative_details = {
160 | "id": "creative_123456789",
161 | "name": "Test Creative"
162 | }
163 |
164 | # Mock get_ad_creatives response without image_url
165 | mock_get_ad_creatives_response = json.dumps({
166 | "data": [
167 | {
168 | "id": "creative_123456789",
169 | "name": "Test Creative",
170 | "status": "ACTIVE",
171 | "thumbnail_url": "https://example.com/thumbnail.jpg",
172 | "object_story_spec": {
173 | "link_data": {
174 | "picture": "https://example.com/object_story_picture.jpg" # Should be used
175 | }
176 | }
177 | }
178 | ]
179 | })
180 |
181 | # Mock PIL Image processing
182 | mock_pil_image = MagicMock()
183 | mock_pil_image.mode = "RGB"
184 | mock_pil_image.convert.return_value = mock_pil_image
185 |
186 | mock_byte_stream = MagicMock()
187 | mock_byte_stream.getvalue.return_value = b"fake_jpeg_data"
188 |
189 | with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api, \
190 | patch('meta_ads_mcp.core.ads.get_ad_creatives', new_callable=AsyncMock) as mock_get_creatives, \
191 | patch('meta_ads_mcp.core.ads.download_image', new_callable=AsyncMock) as mock_download, \
192 | patch('meta_ads_mcp.core.ads.PILImage.open') as mock_pil_open, \
193 | patch('meta_ads_mcp.core.ads.io.BytesIO') as mock_bytesio:
194 |
195 | mock_api.side_effect = [mock_ad_data, mock_creative_details]
196 | mock_get_creatives.return_value = mock_get_ad_creatives_response
197 | mock_download.return_value = b"fake_image_bytes"
198 | mock_pil_open.return_value = mock_pil_image
199 | mock_bytesio.return_value = mock_byte_stream
200 |
201 | # This should fall back to object_story_spec.link_data.picture
202 | result = await get_ad_image(access_token="test_token", ad_id="test_ad_id")
203 |
204 | # Verify it used object_story_spec.link_data.picture
205 | assert result is not None
206 | mock_download.assert_called_once_with("https://example.com/object_story_picture.jpg")
207 |
208 | @pytest.mark.asyncio
209 | async def test_uses_thumbnail_url_only_as_last_resort(self):
210 | """Test that thumbnail_url is only used when no other options are available."""
211 |
212 | # Mock responses for creative with only thumbnail_url
213 | mock_ad_data = {
214 | "account_id": "act_123456789",
215 | "creative": {"id": "creative_123456789"}
216 | }
217 |
218 | mock_creative_details = {
219 | "id": "creative_123456789",
220 | "name": "Test Creative"
221 | }
222 |
223 | # Mock get_ad_creatives response with only thumbnail_url
224 | mock_get_ad_creatives_response = json.dumps({
225 | "data": [
226 | {
227 | "id": "creative_123456789",
228 | "name": "Test Creative",
229 | "status": "ACTIVE",
230 | "thumbnail_url": "https://example.com/thumbnail_only.jpg" # Only option
231 | }
232 | ]
233 | })
234 |
235 | # Mock PIL Image processing
236 | mock_pil_image = MagicMock()
237 | mock_pil_image.mode = "RGB"
238 | mock_pil_image.convert.return_value = mock_pil_image
239 |
240 | mock_byte_stream = MagicMock()
241 | mock_byte_stream.getvalue.return_value = b"fake_jpeg_data"
242 |
243 | with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api, \
244 | patch('meta_ads_mcp.core.ads.get_ad_creatives', new_callable=AsyncMock) as mock_get_creatives, \
245 | patch('meta_ads_mcp.core.ads.download_image', new_callable=AsyncMock) as mock_download, \
246 | patch('meta_ads_mcp.core.ads.PILImage.open') as mock_pil_open, \
247 | patch('meta_ads_mcp.core.ads.io.BytesIO') as mock_bytesio:
248 |
249 | mock_api.side_effect = [mock_ad_data, mock_creative_details]
250 | mock_get_creatives.return_value = mock_get_ad_creatives_response
251 | mock_download.return_value = b"fake_image_bytes"
252 | mock_pil_open.return_value = mock_pil_image
253 | mock_bytesio.return_value = mock_byte_stream
254 |
255 | # This should use thumbnail_url as last resort
256 | result = await get_ad_image(access_token="test_token", ad_id="test_ad_id")
257 |
258 | # Verify it used thumbnail_url
259 | assert result is not None
260 | mock_download.assert_called_once_with("https://example.com/thumbnail_only.jpg")
261 |
262 | def test_extract_creative_image_urls_prioritizes_quality(self):
263 | """Test that extract_creative_image_urls correctly prioritizes image quality."""
264 |
265 | # Test creative with multiple image URLs
266 | test_creative = {
267 | "id": "creative_123456789",
268 | "name": "Test Creative",
269 | "thumbnail_url": "https://example.com/thumbnail.jpg", # Lowest priority
270 | "image_url": "https://example.com/image.jpg", # Medium priority
271 | "image_urls_for_viewing": [
272 | "https://example.com/high_quality_1.jpg", # Highest priority
273 | "https://example.com/high_quality_2.jpg"
274 | ],
275 | "object_story_spec": {
276 | "link_data": {
277 | "picture": "https://example.com/object_story_picture.jpg" # High priority
278 | }
279 | }
280 | }
281 |
282 | # Extract URLs
283 | urls = extract_creative_image_urls(test_creative)
284 |
285 | # Verify correct priority order
286 | assert len(urls) >= 4
287 | assert urls[0] == "https://example.com/high_quality_1.jpg" # First priority
288 | assert urls[1] == "https://example.com/high_quality_2.jpg" # Second priority
289 | assert "https://example.com/image.jpg" in urls # Medium priority
290 | assert "https://example.com/object_story_picture.jpg" in urls # High priority
291 | assert urls[-1] == "https://example.com/thumbnail.jpg" # Last priority
292 |
293 | def test_extract_creative_image_urls_handles_missing_fields(self):
294 | """Test that extract_creative_image_urls handles missing fields gracefully."""
295 |
296 | # Test creative with minimal fields
297 | test_creative = {
298 | "id": "creative_123456789",
299 | "name": "Minimal Creative",
300 | "thumbnail_url": "https://example.com/thumbnail.jpg"
301 | }
302 |
303 | # Extract URLs
304 | urls = extract_creative_image_urls(test_creative)
305 |
306 | # Should still work with only thumbnail_url
307 | assert len(urls) == 1
308 | assert urls[0] == "https://example.com/thumbnail.jpg"
309 |
310 | def test_extract_creative_image_urls_removes_duplicates(self):
311 | """Test that extract_creative_image_urls removes duplicate URLs."""
312 |
313 | # Test creative with duplicate URLs
314 | test_creative = {
315 | "id": "creative_123456789",
316 | "name": "Duplicate URLs Creative",
317 | "thumbnail_url": "https://example.com/same_url.jpg",
318 | "image_url": "https://example.com/same_url.jpg", # Duplicate
319 | "image_urls_for_viewing": [
320 | "https://example.com/same_url.jpg", # Duplicate
321 | "https://example.com/unique_url.jpg"
322 | ]
323 | }
324 |
325 | # Extract URLs
326 | urls = extract_creative_image_urls(test_creative)
327 |
328 | # Should remove duplicates while preserving order
329 | assert len(urls) == 2
330 | assert urls[0] == "https://example.com/same_url.jpg" # First occurrence
331 | assert urls[1] == "https://example.com/unique_url.jpg"
332 |
333 | @pytest.mark.asyncio
334 | async def test_get_ad_image_with_real_world_example(self):
335 | """Test with a real-world example that mimics the actual API response structure."""
336 |
337 | # Mock responses based on real API data
338 | mock_ad_data = {
339 | "account_id": "act_15975950",
340 | "creative": {"id": "606995022142818"}
341 | }
342 |
343 | mock_creative_details = {
344 | "id": "606995022142818",
345 | "name": "Test Creative"
346 | }
347 |
348 | # Mock get_ad_creatives response based on real data
349 | mock_get_ad_creatives_response = json.dumps({
350 | "data": [
351 | {
352 | "id": "606995022142818",
353 | "name": "Test Creative",
354 | "status": "ACTIVE",
355 | "thumbnail_url": "https://external.fbsb6-1.fna.fbcdn.net/emg1/v/t13/13476424677788553381?url=https%3A%2F%2Fwww.facebook.com%2Fads%2Fimage%2F%3Fd%3DAQLuJ5l4AROBvIUchp4g4JXxIT5uAZiAsgHQkD8Iw7BeVtkXNUUfs3leWpqQplJCJdixVIg3mq9KichJ64eRfM-r8aY4GtVQp8TvS_HBByJ8fGg_Cs7Kb8YkN4IDwJ4iQIIkMx30LycCKzuYtp9M-vOk&fb_obo=1&utld=facebook.com&stp=c0.5000x0.5000f_dst-emg0_p64x64_q75_tt6&edm=AEuWsiQEAAAA&_nc_gid=_QBCRbZxDq-i1ZiGEXxW2w&_nc_eui2=AeEbQXzmAdoqWLIXjuTDJ0xAoThZu47BlQqhOFm7jsGVCloP48Ep6Y_qIA5tcqrcSDff5f_k8xGzFIpD7PnUws8c&_nc_oc=Adn3GeYlXxbfEeY0wCBSgNdlwO80wXt5R5bgY2NozdroZ6CRSaXIaOSjVSK9S1LsqsY4GL_0dVzU80RY8QMucEkZ&ccb=13-1&oh=06_Q3-1AcBKUD0rfLGATAveIM5hMSWG9c7DsJzq2arvOl8W4Bpn&oe=688C87B2&_nc_sid=58080a",
356 | "image_url": "https://scontent.fbsb6-1.fna.fbcdn.net/v/t45.1600-4/518574136_1116014047008737_2492837958169838537_n.png?stp=dst-jpg_tt6&_nc_cat=109&ccb=1-7&_nc_sid=890911&_nc_eui2=AeHbHqoiAUgF0QeX-tvUoDjYeTyJad_QEPF5PIlp39AQ8dP8cvOlHwiJjny8AUv7xxAlYyy5BGCqFU_oVM9CI7ln&_nc_ohc=VTTYlMOAWZoQ7kNvwGjLMW5&_nc_oc=AdnYDrpNrLovWZC_RG4tvoICGPjBNfzNJimhx-4SKW4BU2i_yzL00dX0-OiYEYokq394g8xR-1a-OuVDAm4HsSJy&_nc_zt=1&_nc_ht=scontent.fbsb6-1.fna&edm=AEuWsiQEAAAA&_nc_gid=_QBCRbZxDq-i1ZiGEXxW2w&oh=00_AfTujKmF365FnGgcokkkdWnK-vmnzQK8Icvlk0kB8SKM3g&oe=68906FC4",
357 | "image_urls_for_viewing": [
358 | "https://scontent.fbsb6-1.fna.fbcdn.net/v/t45.1600-4/518574136_1116014047008737_2492837958169838537_n.png?stp=dst-jpg_tt6&_nc_cat=109&ccb=1-7&_nc_sid=890911&_nc_eui2=AeHbHqoiAUgF0QeX-tvUoDjYeTyJad_QEPF5PIlp39AQ8dP8cvOlHwiJjny8AUv7xxAlYyy5BGCqFU_oVM9CI7ln&_nc_ohc=VTTYlMOAWZoQ7kNvwGjLMW5&_nc_oc=AdnYDrpNrLovWZC_RG4tvoICGPjBNfzNJimhx-4SKW4BU2i_yzL00dX0-OiYEYokq394g8xR-1a-OuVDAm4HsSJy&_nc_zt=1&_nc_ht=scontent.fbsb6-1.fna&edm=AEuWsiQEAAAA&_nc_gid=_QBCRbZxDq-i1ZiGEXxW2w&oh=00_AfTujKmF365FnGgcokkkdWnK-vmnzQK8Icvlk0kB8SKM3g&oe=68906FC4",
359 | "https://external.fbsb6-1.fna.fbcdn.net/emg1/v/t13/13476424677788553381?url=https%3A%2F%2Fwww.facebook.com%2Fads%2Fimage%2F%3Fd%3DAQLuJ5l4AROBvIUchp4g4JXxIT5uAZiAsgHQkD8Iw7BeVtkXNUUfs3leWpqQplJCJdixVIg3mq9KichJ64eRfM-r8aY4GtVQp8TvS_HBByJ8fGg_Cs7Kb8YkN4IDwJ4iQIIkMx30LycCKzuYtp9M-vOk&fb_obo=1&utld=facebook.com&stp=c0.5000x0.5000f_dst-emg0_p64x64_q75_tt6&edm=AEuWsiQEAAAA&_nc_gid=_QBCRbZxDq-i1ZiGEXxW2w&_nc_eui2=AeEbQXzmAdoqWLIXjuTDJ0xAoThZu47BlQqhOFm7jsGVCloP48Ep6Y_qIA5tcqrcSDff5f_k8xGzFIpD7PnUws8c&_nc_oc=Adn3GeYlXxbfEeY0wCBSgNdlwO80wXt5R5bgY2NozdroZ6CRSaXIaOSjVSK9S1LsqsY4GL_0dVzU80RY8QMucEkZ&ccb=13-1&oh=06_Q3-1AcBKUD0rfLGATAveIM5hMSWG9c7DsJzq2arvOl8W4Bpn&oe=688C87B2&_nc_sid=58080a"
360 | ]
361 | }
362 | ]
363 | })
364 |
365 | # Mock PIL Image processing
366 | mock_pil_image = MagicMock()
367 | mock_pil_image.mode = "RGB"
368 | mock_pil_image.convert.return_value = mock_pil_image
369 |
370 | mock_byte_stream = MagicMock()
371 | mock_byte_stream.getvalue.return_value = b"fake_jpeg_data"
372 |
373 | with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api, \
374 | patch('meta_ads_mcp.core.ads.get_ad_creatives', new_callable=AsyncMock) as mock_get_creatives, \
375 | patch('meta_ads_mcp.core.ads.download_image', new_callable=AsyncMock) as mock_download, \
376 | patch('meta_ads_mcp.core.ads.PILImage.open') as mock_pil_open, \
377 | patch('meta_ads_mcp.core.ads.io.BytesIO') as mock_bytesio:
378 |
379 | mock_api.side_effect = [mock_ad_data, mock_creative_details]
380 | mock_get_creatives.return_value = mock_get_ad_creatives_response
381 | mock_download.return_value = b"fake_image_bytes"
382 | mock_pil_open.return_value = mock_pil_image
383 | mock_bytesio.return_value = mock_byte_stream
384 |
385 | # This should use the first image_urls_for_viewing URL (high quality)
386 | result = await get_ad_image(access_token="test_token", ad_id="test_ad_id")
387 |
388 | # Verify it used the high-quality URL (not the thumbnail)
389 | assert result is not None
390 | expected_url = "https://scontent.fbsb6-1.fna.fbcdn.net/v/t45.1600-4/518574136_1116014047008737_2492837958169838537_n.png?stp=dst-jpg_tt6&_nc_cat=109&ccb=1-7&_nc_sid=890911&_nc_eui2=AeHbHqoiAUgF0QeX-tvUoDjYeTyJad_QEPF5PIlp39AQ8dP8cvOlHwiJjny8AUv7xxAlYyy5BGCqFU_oVM9CI7ln&_nc_ohc=VTTYlMOAWZoQ7kNvwGjLMW5&_nc_oc=AdnYDrpNrLovWZC_RG4tvoICGPjBNfzNJimhx-4SKW4BU2i_yzL00dX0-OiYEYokq394g8xR-1a-OuVDAm4HsSJy&_nc_zt=1&_nc_ht=scontent.fbsb6-1.fna&edm=AEuWsiQEAAAA&_nc_gid=_QBCRbZxDq-i1ZiGEXxW2w&oh=00_AfTujKmF365FnGgcokkkdWnK-vmnzQK8Icvlk0kB8SKM3g&oe=68906FC4"
391 | mock_download.assert_called_once_with(expected_url)
```
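The assertions above pin down a single fallback order for choosing an image URL. For orientation only, here is a minimal sketch of a helper that would satisfy those assertions; it is inferred from the tests rather than copied from the project, and the real logic in `extract_creative_image_urls` (meta_ads_mcp/core/utils.py) may differ in its details.

```python
from typing import Any, Dict, List


def extract_image_urls_sketch(creative: Dict[str, Any]) -> List[str]:
    """Illustrative sketch only: collect candidate image URLs in the priority
    order the tests above assert (image_urls_for_viewing, then image_url, then
    object_story_spec.link_data.picture, then thumbnail_url), de-duplicated
    while preserving first-occurrence order."""
    candidates: List[str] = []

    # 1. Highest priority: full-size viewing URLs
    candidates.extend(creative.get("image_urls_for_viewing") or [])

    # 2. Direct image_url
    if creative.get("image_url"):
        candidates.append(creative["image_url"])

    # 3. Picture embedded in object_story_spec.link_data
    picture = (
        creative.get("object_story_spec", {})
        .get("link_data", {})
        .get("picture")
    )
    if picture:
        candidates.append(picture)

    # 4. Last resort: the (often 64x64) profile thumbnail
    if creative.get("thumbnail_url"):
        candidates.append(creative["thumbnail_url"])

    # Drop duplicates while keeping order, matching
    # test_extract_creative_image_urls_removes_duplicates
    return list(dict.fromkeys(candidates))
```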
--------------------------------------------------------------------------------
/tests/test_get_ad_image_regression.py:
--------------------------------------------------------------------------------
```python
1 | """Regression tests for get_ad_image function fixes.
2 |
3 | Tests for multiple issues that were fixed:
4 |
5 | 1. JSON Parsing Error: 'TypeError: the JSON object must be str, bytes or bytearray, not dict'
6 | - Caused by wrong parameter order and incorrect JSON parsing
7 | - Fixed by correcting parameter order and JSON parsing logic
8 |
9 | 2. Missing Image Hash Support: "Error: No image hashes found in creative"
10 | - Many modern creatives don't have image_hash but have direct URLs
11 | - Fixed by adding direct URL fallback using image_urls_for_viewing and thumbnail_url
12 |
13 | 3. Image Quality Issue: Function was returning profile thumbnails instead of ad creative images
14 | - Fixed by prioritizing image_urls_for_viewing over thumbnail_url
15 | - Added proper fallback hierarchy: image_urls_for_viewing > image_url > object_story_spec.picture > thumbnail_url
16 |
17 | The fixes enable get_ad_image to work with both traditional hash-based and modern URL-based creatives,
18 | and ensure high-quality images are returned instead of thumbnails.
19 | """
20 |
21 | import pytest
22 | import json
23 | from unittest.mock import AsyncMock, patch, MagicMock
24 | from meta_ads_mcp.core.ads import get_ad_image
25 |
26 |
27 | @pytest.mark.asyncio
28 | class TestGetAdImageRegressionFix:
29 | """Regression test cases for the get_ad_image JSON parsing bug fix."""
30 |
31 | async def test_get_ad_image_json_parsing_regression_fix(self):
32 | """Regression test: ensure get_ad_image doesn't throw JSON parsing error."""
33 |
34 | # Mock responses for the main API flow
35 | mock_ad_data = {
36 | "account_id": "act_123456789",
37 | "creative": {"id": "creative_123456789"}
38 | }
39 |
40 | mock_creative_details = {
41 | "id": "creative_123456789",
42 | "name": "Test Creative",
43 | "image_hash": "test_hash_123"
44 | }
45 |
46 | mock_image_data = {
47 | "data": [{
48 | "hash": "test_hash_123",
49 | "url": "https://example.com/image.jpg",
50 | "width": 1200,
51 | "height": 628,
52 | "name": "test_image.jpg",
53 | "status": "ACTIVE"
54 | }]
55 | }
56 |
57 | # Mock PIL Image processing to return a valid Image object
58 | mock_pil_image = MagicMock()
59 | mock_pil_image.mode = "RGB"
60 | mock_pil_image.convert.return_value = mock_pil_image
61 |
62 | mock_byte_stream = MagicMock()
63 | mock_byte_stream.getvalue.return_value = b"fake_jpeg_data"
64 |
65 | with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api, \
66 | patch('meta_ads_mcp.core.ads.download_image', new_callable=AsyncMock) as mock_download, \
67 | patch('meta_ads_mcp.core.ads.PILImage.open') as mock_pil_open, \
68 | patch('meta_ads_mcp.core.ads.io.BytesIO') as mock_bytesio:
69 |
70 | mock_api.side_effect = [mock_ad_data, mock_creative_details, mock_image_data]
71 | mock_download.return_value = b"fake_image_bytes"
72 | mock_pil_open.return_value = mock_pil_image
73 | mock_bytesio.return_value = mock_byte_stream
74 |
75 | # This should NOT raise "the JSON object must be str, bytes or bytearray, not dict"
76 | # Previously this would fail with: TypeError: the JSON object must be str, bytes or bytearray, not dict
77 | result = await get_ad_image(access_token="test_token", ad_id="120228922871870272")
78 |
79 | # Verify we get an Image object (success) - the exact test depends on the mocking
80 | # The key is that we don't get the JSON parsing error
81 | assert result is not None
82 |
83 | # The main regression check: if we got here without an exception, the JSON parsing is fixed
84 | # We might get different results based on mocking, but the critical JSON parsing should work
85 |
86 | async def test_get_ad_image_fallback_path_json_parsing(self):
87 |         """Test that the fallback path through get_ad_creatives handles JSON parsing correctly."""
88 |
89 | # Mock responses that trigger the fallback path (no direct image hash)
90 | mock_ad_data = {
91 | "account_id": "act_123456789",
92 | "creative": {"id": "creative_123456789"}
93 | }
94 |
95 | mock_creative_details = {
96 | "id": "creative_123456789",
97 | "name": "Test Creative"
98 | # No image_hash - this will trigger the fallback
99 | }
100 |
101 | # Mock get_ad_creatives response (wrapped format that caused the original bug)
102 | mock_get_ad_creatives_response = json.dumps({
103 | "data": json.dumps({
104 | "data": [
105 | {
106 | "id": "creative_123456789",
107 | "name": "Test Creative",
108 | "object_story_spec": {
109 | "link_data": {
110 | "image_hash": "fallback_hash_123"
111 | }
112 | }
113 | }
114 | ]
115 | })
116 | })
117 |
118 | mock_image_data = {
119 | "data": [{
120 | "hash": "fallback_hash_123",
121 | "url": "https://example.com/fallback_image.jpg",
122 | "width": 1200,
123 | "height": 628
124 | }]
125 | }
126 |
127 | # Mock PIL Image processing
128 | mock_pil_image = MagicMock()
129 | mock_pil_image.mode = "RGB"
130 | mock_pil_image.convert.return_value = mock_pil_image
131 |
132 | mock_byte_stream = MagicMock()
133 | mock_byte_stream.getvalue.return_value = b"fake_jpeg_data"
134 |
135 | with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api, \
136 | patch('meta_ads_mcp.core.ads.get_ad_creatives', new_callable=AsyncMock) as mock_get_creatives, \
137 | patch('meta_ads_mcp.core.ads.download_image', new_callable=AsyncMock) as mock_download, \
138 | patch('meta_ads_mcp.core.ads.PILImage.open') as mock_pil_open, \
139 | patch('meta_ads_mcp.core.ads.io.BytesIO') as mock_bytesio:
140 |
141 | mock_api.side_effect = [mock_ad_data, mock_creative_details, mock_image_data]
142 | mock_get_creatives.return_value = mock_get_ad_creatives_response
143 | mock_download.return_value = b"fake_image_bytes"
144 | mock_pil_open.return_value = mock_pil_image
145 | mock_bytesio.return_value = mock_byte_stream
146 |
147 | # This should handle the wrapped JSON response correctly
148 | # Previously would fail: TypeError: the JSON object must be str, bytes or bytearray, not dict
149 | result = await get_ad_image(access_token="test_token", ad_id="120228922871870272")
150 |
151 | # Verify the fallback path worked - key is no JSON parsing exception
152 | assert result is not None
153 | # Verify get_ad_creatives was called (fallback path was triggered)
154 | mock_get_creatives.assert_called_once()
155 |
156 | async def test_get_ad_image_no_ad_id(self):
157 | """Test get_ad_image with no ad_id provided."""
158 |
159 | result = await get_ad_image(access_token="test_token", ad_id=None)
160 |
161 | # Should return error string, not throw JSON parsing error
162 | assert isinstance(result, str)
163 | assert "Error: No ad ID provided" in result
164 |
165 | async def test_get_ad_image_parameter_order_regression(self):
166 | """Regression test: ensure get_ad_creatives is called with correct parameter order."""
167 |
168 | # This test ensures we don't regress to calling get_ad_creatives(ad_id, "", access_token)
169 | # which was the original bug
170 |
171 | mock_ad_data = {
172 | "account_id": "act_123456789",
173 | "creative": {"id": "creative_123456789"}
174 | }
175 |
176 | mock_creative_details = {
177 | "id": "creative_123456789",
178 | "name": "Test Creative"
179 | # No image_hash to trigger fallback
180 | }
181 |
182 | with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api, \
183 | patch('meta_ads_mcp.core.ads.get_ad_creatives', new_callable=AsyncMock) as mock_get_creatives:
184 |
185 | mock_api.side_effect = [mock_ad_data, mock_creative_details]
186 | mock_get_creatives.return_value = json.dumps({"data": json.dumps({"data": []})})
187 |
188 | # Call get_ad_image - it should reach the fallback path
189 | result = await get_ad_image(access_token="test_token", ad_id="test_ad_id")
190 |
191 | # Verify get_ad_creatives was called with correct parameter names (not positional)
192 | mock_get_creatives.assert_called_once_with(ad_id="test_ad_id", access_token="test_token")
193 |
194 | # The key regression test: this should not have raised a JSON parsing error
195 |
196 | async def test_get_ad_image_direct_url_fallback_with_image_urls_for_viewing(self):
197 | """Test direct URL fallback using image_urls_for_viewing when no image_hash found."""
198 |
199 | # Mock responses for modern creative without image_hash
200 | mock_ad_data = {
201 | "account_id": "act_123456789",
202 | "creative": {"id": "creative_123456789"}
203 | }
204 |
205 | mock_creative_details = {
206 | "id": "creative_123456789",
207 | "name": "Modern Creative"
208 | # No image_hash - this will trigger fallback
209 | }
210 |
211 | # Mock get_ad_creatives response with direct URLs
212 | mock_get_ad_creatives_response = json.dumps({
213 | "data": [
214 | {
215 | "id": "creative_123456789",
216 | "name": "Modern Creative",
217 | "status": "ACTIVE",
218 | "thumbnail_url": "https://example.com/thumb.jpg",
219 | "image_urls_for_viewing": [
220 | "https://example.com/full_image.jpg",
221 | "https://example.com/alt_image.jpg"
222 | ]
223 | }
224 | ]
225 | })
226 |
227 | # Mock PIL Image processing
228 | mock_pil_image = MagicMock()
229 | mock_pil_image.mode = "RGB"
230 | mock_pil_image.convert.return_value = mock_pil_image
231 |
232 | mock_byte_stream = MagicMock()
233 | mock_byte_stream.getvalue.return_value = b"fake_jpeg_data"
234 |
235 | with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api, \
236 | patch('meta_ads_mcp.core.ads.get_ad_creatives', new_callable=AsyncMock) as mock_get_creatives, \
237 | patch('meta_ads_mcp.core.ads.download_image', new_callable=AsyncMock) as mock_download, \
238 | patch('meta_ads_mcp.core.ads.PILImage.open') as mock_pil_open, \
239 | patch('meta_ads_mcp.core.ads.io.BytesIO') as mock_bytesio:
240 |
241 | mock_api.side_effect = [mock_ad_data, mock_creative_details]
242 | mock_get_creatives.return_value = mock_get_ad_creatives_response
243 | mock_download.return_value = b"fake_image_bytes"
244 | mock_pil_open.return_value = mock_pil_image
245 | mock_bytesio.return_value = mock_byte_stream
246 |
247 | # This should use direct URL fallback successfully
248 | result = await get_ad_image(access_token="test_token", ad_id="test_ad_id")
249 |
250 | # Verify it used the direct URL approach
251 | assert result is not None
252 | mock_get_creatives.assert_called_once()
253 | mock_download.assert_called_once_with("https://example.com/full_image.jpg")
254 |
255 | async def test_get_ad_image_direct_url_fallback_with_thumbnail_url_only(self):
256 |         """Test direct URL fallback using thumbnail_url when image_urls_for_viewing is not available."""
257 |
258 | # Mock responses for creative with only thumbnail_url
259 | mock_ad_data = {
260 | "account_id": "act_123456789",
261 | "creative": {"id": "creative_123456789"}
262 | }
263 |
264 | mock_creative_details = {
265 | "id": "creative_123456789",
266 | "name": "Thumbnail Only Creative"
267 | # No image_hash
268 | }
269 |
270 | # Mock get_ad_creatives response with only thumbnail_url
271 | mock_get_ad_creatives_response = json.dumps({
272 | "data": [
273 | {
274 | "id": "creative_123456789",
275 | "name": "Thumbnail Only Creative",
276 | "status": "ACTIVE",
277 | "thumbnail_url": "https://example.com/thumb_only.jpg"
278 | # No image_urls_for_viewing
279 | }
280 | ]
281 | })
282 |
283 | # Mock PIL Image processing
284 | mock_pil_image = MagicMock()
285 | mock_pil_image.mode = "RGB"
286 | mock_pil_image.convert.return_value = mock_pil_image
287 |
288 | mock_byte_stream = MagicMock()
289 | mock_byte_stream.getvalue.return_value = b"fake_jpeg_data"
290 |
291 | with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api, \
292 | patch('meta_ads_mcp.core.ads.get_ad_creatives', new_callable=AsyncMock) as mock_get_creatives, \
293 | patch('meta_ads_mcp.core.ads.download_image', new_callable=AsyncMock) as mock_download, \
294 | patch('meta_ads_mcp.core.ads.PILImage.open') as mock_pil_open, \
295 | patch('meta_ads_mcp.core.ads.io.BytesIO') as mock_bytesio:
296 |
297 | mock_api.side_effect = [mock_ad_data, mock_creative_details]
298 | mock_get_creatives.return_value = mock_get_ad_creatives_response
299 | mock_download.return_value = b"fake_image_bytes"
300 | mock_pil_open.return_value = mock_pil_image
301 | mock_bytesio.return_value = mock_byte_stream
302 |
303 | # This should fall back to thumbnail_url
304 | result = await get_ad_image(access_token="test_token", ad_id="test_ad_id")
305 |
306 | # Verify it used the thumbnail URL
307 | assert result is not None
308 | mock_download.assert_called_once_with("https://example.com/thumb_only.jpg")
309 |
310 | async def test_get_ad_image_no_direct_urls_available(self):
311 | """Test error handling when no direct URLs are available."""
312 |
313 | # Mock responses for creative without any URLs
314 | mock_ad_data = {
315 | "account_id": "act_123456789",
316 | "creative": {"id": "creative_123456789"}
317 | }
318 |
319 | mock_creative_details = {
320 | "id": "creative_123456789",
321 | "name": "No URLs Creative"
322 | # No image_hash
323 | }
324 |
325 | # Mock get_ad_creatives response without URLs
326 | mock_get_ad_creatives_response = json.dumps({
327 | "data": [
328 | {
329 | "id": "creative_123456789",
330 | "name": "No URLs Creative",
331 | "status": "ACTIVE"
332 | # No thumbnail_url or image_urls_for_viewing
333 | }
334 | ]
335 | })
336 |
337 | with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api, \
338 | patch('meta_ads_mcp.core.ads.get_ad_creatives', new_callable=AsyncMock) as mock_get_creatives:
339 |
340 | mock_api.side_effect = [mock_ad_data, mock_creative_details]
341 | mock_get_creatives.return_value = mock_get_ad_creatives_response
342 |
343 | # This should return appropriate error
344 | result = await get_ad_image(access_token="test_token", ad_id="test_ad_id")
345 |
346 | # Should get error about no URLs
347 | assert isinstance(result, str)
348 | assert "No image URLs found" in result
349 |
350 | async def test_get_ad_image_direct_url_download_failure(self):
351 | """Test error handling when direct URL download fails."""
352 |
353 | # Mock responses for creative with URLs but download failure
354 | mock_ad_data = {
355 | "account_id": "act_123456789",
356 | "creative": {"id": "creative_123456789"}
357 | }
358 |
359 | mock_creative_details = {
360 | "id": "creative_123456789",
361 | "name": "Download Fail Creative"
362 | }
363 |
364 | mock_get_ad_creatives_response = json.dumps({
365 | "data": [
366 | {
367 | "id": "creative_123456789",
368 | "name": "Download Fail Creative",
369 | "image_urls_for_viewing": ["https://example.com/broken_image.jpg"]
370 | }
371 | ]
372 | })
373 |
374 | with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api, \
375 | patch('meta_ads_mcp.core.ads.get_ad_creatives', new_callable=AsyncMock) as mock_get_creatives, \
376 | patch('meta_ads_mcp.core.ads.download_image', new_callable=AsyncMock) as mock_download:
377 |
378 | mock_api.side_effect = [mock_ad_data, mock_creative_details]
379 | mock_get_creatives.return_value = mock_get_ad_creatives_response
380 | mock_download.return_value = None # Simulate download failure
381 |
382 | # This should return download error
383 | result = await get_ad_image(access_token="test_token", ad_id="test_ad_id")
384 |
385 | # Should get error about download failure
386 | assert isinstance(result, str)
387 | assert "Failed to download image from direct URL" in result
388 |
389 | async def test_get_ad_image_quality_improvement_prioritizes_high_quality(self):
390 | """Test that the image quality improvement correctly prioritizes high-quality images over thumbnails."""
391 |
392 | # Mock responses for creative with both high-quality and thumbnail URLs
393 | mock_ad_data = {
394 | "account_id": "act_123456789",
395 | "creative": {"id": "creative_123456789"}
396 | }
397 |
398 | mock_creative_details = {
399 | "id": "creative_123456789",
400 | "name": "Quality Test Creative"
401 | }
402 |
403 | # Mock get_ad_creatives response with both URLs
404 | mock_get_ad_creatives_response = json.dumps({
405 | "data": [
406 | {
407 | "id": "creative_123456789",
408 | "name": "Quality Test Creative",
409 | "status": "ACTIVE",
410 | "thumbnail_url": "https://example.com/thumbnail_64x64.jpg", # Low quality thumbnail
411 | "image_url": "https://example.com/full_image.jpg", # Medium quality
412 | "image_urls_for_viewing": [
413 | "https://example.com/high_quality_image.jpg", # Highest quality
414 | "https://example.com/alt_high_quality.jpg"
415 | ],
416 | "object_story_spec": {
417 | "link_data": {
418 | "picture": "https://example.com/object_story_picture.jpg"
419 | }
420 | }
421 | }
422 | ]
423 | })
424 |
425 | # Mock PIL Image processing
426 | mock_pil_image = MagicMock()
427 | mock_pil_image.mode = "RGB"
428 | mock_pil_image.convert.return_value = mock_pil_image
429 |
430 | mock_byte_stream = MagicMock()
431 | mock_byte_stream.getvalue.return_value = b"fake_jpeg_data"
432 |
433 | with patch('meta_ads_mcp.core.ads.make_api_request', new_callable=AsyncMock) as mock_api, \
434 | patch('meta_ads_mcp.core.ads.get_ad_creatives', new_callable=AsyncMock) as mock_get_creatives, \
435 | patch('meta_ads_mcp.core.ads.download_image', new_callable=AsyncMock) as mock_download, \
436 | patch('meta_ads_mcp.core.ads.PILImage.open') as mock_pil_open, \
437 | patch('meta_ads_mcp.core.ads.io.BytesIO') as mock_bytesio:
438 |
439 | mock_api.side_effect = [mock_ad_data, mock_creative_details]
440 | mock_get_creatives.return_value = mock_get_ad_creatives_response
441 | mock_download.return_value = b"fake_image_bytes"
442 | mock_pil_open.return_value = mock_pil_image
443 | mock_bytesio.return_value = mock_byte_stream
444 |
445 | # This should prioritize image_urls_for_viewing[0] over thumbnail_url
446 | result = await get_ad_image(access_token="test_token", ad_id="test_ad_id")
447 |
448 | # Verify it used the highest quality URL, not the thumbnail
449 | assert result is not None
450 | mock_download.assert_called_once_with("https://example.com/high_quality_image.jpg")
451 |
452 |             # Verify it did NOT use the thumbnail URL: the single download call
453 |             # must have received the high-quality URL, never the 64x64 thumbnail
454 |             assert "thumbnail_64x64" not in mock_download.call_args[0][0]
```
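The regression tests above exercise two response shapes from `get_ad_creatives`: a plain JSON string whose `data` field is a list, and a wrapped form whose `data` field is itself a JSON-encoded string, the shape that originally triggered `TypeError: the JSON object must be str, bytes or bytearray, not dict`. The following is a minimal, illustrative sketch of tolerant unwrapping that handles both shapes; the actual parsing in `meta_ads_mcp/core/ads.py` may be implemented differently.

```python
import json
from typing import Any, Dict, List


def unwrap_creatives_response_sketch(raw: Any) -> List[Dict[str, Any]]:
    """Illustrative sketch only: tolerant unwrapping of the response shapes
    exercised by the regression tests above."""
    # Only decode when we actually have a string; passing a dict to
    # json.loads is what raised the original TypeError.
    parsed = json.loads(raw) if isinstance(raw, (str, bytes, bytearray)) else raw

    data = parsed.get("data", [])
    if isinstance(data, str):
        # Wrapped format: "data" holds another JSON document with its own "data" list
        data = json.loads(data).get("data", [])
    return data or []
```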
--------------------------------------------------------------------------------
/tests/test_dsa_integration.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env python3
2 | """
3 | Integration test for DSA beneficiary functionality.
4 |
5 | This test demonstrates the complete DSA beneficiary implementation working end-to-end,
6 | including detection, parameter support, and error handling.
7 | """
8 |
9 | import pytest
10 | import json
11 | from unittest.mock import AsyncMock, patch
12 |
13 | from meta_ads_mcp.core.adsets import create_adset, get_adset_details
14 | from meta_ads_mcp.core.accounts import get_account_info
15 |
16 |
17 | class TestDSAIntegration:
18 | """Integration tests for DSA beneficiary functionality"""
19 |
20 | @pytest.mark.asyncio
21 | async def test_dsa_beneficiary_complete_workflow(self):
22 | """Test complete DSA beneficiary workflow from account detection to ad set creation"""
23 |
24 | # Step 1: Get account info and detect DSA requirement
25 | mock_account_response = {
26 | "id": "act_701351919139047",
27 | "name": "Test European Account",
28 | "account_status": 1,
29 | "business_country_code": "DE", # Germany - DSA compliant
30 | "business_city": "Berlin",
31 | "currency": "EUR"
32 | }
33 |
34 | with patch('meta_ads_mcp.core.accounts.make_api_request', new_callable=AsyncMock) as mock_account_api:
35 | with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
36 | mock_auth.return_value = "test_access_token"
37 | mock_account_api.return_value = mock_account_response
38 |
39 | # Get account info and verify DSA detection
40 | result = await get_account_info(account_id="act_701351919139047")
41 |
42 | # Handle new return format (dictionary instead of JSON string)
43 | if isinstance(result, dict):
44 | result_data = result
45 | else:
46 | result_data = json.loads(result)
47 |
48 | # Verify DSA requirement is detected
49 | assert result_data["business_country_code"] == "DE"
50 | assert result_data["dsa_required"] == True
51 | assert "DSA (Digital Services Act)" in result_data["dsa_compliance_note"]
52 |
53 | # Step 2: Create ad set with DSA beneficiary
54 | mock_adset_response = {
55 | "id": "23842588888640185",
56 | "name": "Test European Ad Set",
57 | "status": "PAUSED",
58 | "dsa_beneficiary": "Test Organization GmbH"
59 | }
60 |
61 | with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_adset_api:
62 | with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
63 | mock_auth.return_value = "test_access_token"
64 | mock_adset_api.return_value = mock_adset_response
65 |
66 | # Create ad set with DSA beneficiary
67 | result = await create_adset(
68 | account_id="act_701351919139047",
69 | campaign_id="23842588888640184",
70 | name="Test European Ad Set",
71 | optimization_goal="LINK_CLICKS",
72 | billing_event="IMPRESSIONS",
73 | dsa_beneficiary="Test Organization GmbH"
74 | )
75 |
76 | result_data = json.loads(result)
77 |
78 | # Verify successful creation with DSA beneficiary
79 | assert result_data["id"] == "23842588888640185"
80 | assert result_data["dsa_beneficiary"] == "Test Organization GmbH"
81 |
82 | # Verify API call included DSA beneficiary parameter
83 | mock_adset_api.assert_called_once()
84 | call_args = mock_adset_api.call_args
85 | params = call_args[0][2] # Third argument is params
86 | assert "dsa_beneficiary" in params
87 | assert params["dsa_beneficiary"] == "Test Organization GmbH"
88 |
89 | @pytest.mark.asyncio
90 | async def test_dsa_beneficiary_error_handling_integration(self):
91 | """Test DSA beneficiary error handling in real-world scenarios"""
92 |
93 | # Test 1: Missing DSA beneficiary for European account
94 | mock_error_response = {
95 | "error": {
96 | "message": "Enter the person or organization that benefits from ads in this ad set",
97 | "type": "OAuthException",
98 | "code": 100
99 | }
100 | }
101 |
102 | with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api:
103 | with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
104 | mock_auth.return_value = "test_access_token"
105 | mock_api.side_effect = Exception(json.dumps(mock_error_response))
106 |
107 | result = await create_adset(
108 | account_id="act_701351919139047",
109 | campaign_id="23842588888640184",
110 | name="Test European Ad Set",
111 | optimization_goal="LINK_CLICKS",
112 | billing_event="IMPRESSIONS"
113 | # No DSA beneficiary provided
114 | )
115 |
116 | result_data = json.loads(result)
117 |
118 | # Handle response wrapped in 'data' field by meta_api_tool decorator
119 | if "data" in result_data:
120 | actual_data = json.loads(result_data["data"])
121 | else:
122 | actual_data = result_data
123 |
124 | # Verify error is properly handled
125 | assert "error" in actual_data
126 | assert "benefits from ads" in actual_data["error"]
127 |
128 | # Test 2: Permission error for DSA beneficiary
129 | with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api:
130 | with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
131 | mock_auth.return_value = "test_access_token"
132 | mock_api.side_effect = Exception("Permission denied: business_management permission required")
133 |
134 | result = await create_adset(
135 | account_id="act_701351919139047",
136 | campaign_id="23842588888640184",
137 | name="Test European Ad Set",
138 | optimization_goal="LINK_CLICKS",
139 | billing_event="IMPRESSIONS",
140 | dsa_beneficiary="Test Organization GmbH"
141 | )
142 |
143 | result_data = json.loads(result)
144 |
145 | # Handle response wrapped in 'data' field by meta_api_tool decorator
146 | if "data" in result_data:
147 | actual_data = json.loads(result_data["data"])
148 | else:
149 | actual_data = result_data
150 |
151 | # Verify permission error is handled
152 | assert "error" in actual_data
153 | assert "permission" in actual_data["error"].lower()
154 |
155 | @pytest.mark.asyncio
156 | async def test_dsa_beneficiary_regional_compliance_integration(self):
157 | """Test DSA beneficiary compliance across different regions"""
158 |
159 | # Test 1: European account (DSA required)
160 | mock_de_account_response = {
161 | "id": "act_de",
162 | "name": "German Account",
163 | "business_country_code": "DE",
164 | "currency": "EUR"
165 | }
166 |
167 | with patch('meta_ads_mcp.core.accounts.make_api_request', new_callable=AsyncMock) as mock_api:
168 | with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
169 | mock_auth.return_value = "test_access_token"
170 | mock_api.return_value = mock_de_account_response
171 |
172 | result = await get_account_info(account_id="act_de")
173 |
174 | # Handle new return format (dictionary instead of JSON string)
175 | if isinstance(result, dict):
176 | result_data = result
177 | else:
178 | result_data = json.loads(result)
179 |
180 | # Verify DSA requirement for German account
181 | assert result_data["business_country_code"] == "DE"
182 | assert result_data["dsa_required"] == True
183 |
184 | # Test 2: US account (DSA not required)
185 | mock_us_account_response = {
186 | "id": "act_us",
187 | "name": "US Account",
188 | "business_country_code": "US",
189 | "currency": "USD"
190 | }
191 |
192 | with patch('meta_ads_mcp.core.accounts.make_api_request', new_callable=AsyncMock) as mock_api:
193 | with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
194 | mock_auth.return_value = "test_access_token"
195 | mock_api.return_value = mock_us_account_response
196 |
197 | result = await get_account_info(account_id="act_us")
198 |
199 | # Handle new return format (dictionary instead of JSON string)
200 | if isinstance(result, dict):
201 | result_data = result
202 | else:
203 | result_data = json.loads(result)
204 |
205 | # Verify no DSA requirement for US account
206 | assert result_data["business_country_code"] == "US"
207 | assert result_data["dsa_required"] == False
208 |
209 | @pytest.mark.asyncio
210 | async def test_dsa_beneficiary_parameter_formats_integration(self):
211 | """Test different DSA beneficiary parameter formats in real scenarios"""
212 |
213 | test_cases = [
214 | "Test Organization GmbH",
215 | "Test Organization, Inc.",
216 | "Test Organization Ltd.",
217 | "Test Organization AG",
218 | "Test Organization BV",
219 | "Test Organization SARL"
220 | ]
221 |
222 | for beneficiary_name in test_cases:
223 | mock_response = {
224 | "id": "23842588888640185",
225 | "name": "Test Ad Set",
226 | "status": "PAUSED",
227 | "dsa_beneficiary": beneficiary_name
228 | }
229 |
230 | with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api:
231 | with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
232 | mock_auth.return_value = "test_access_token"
233 | mock_api.return_value = mock_response
234 |
235 | result = await create_adset(
236 | account_id="act_701351919139047",
237 | campaign_id="23842588888640184",
238 | name="Test Ad Set",
239 | optimization_goal="LINK_CLICKS",
240 | billing_event="IMPRESSIONS",
241 | dsa_beneficiary=beneficiary_name
242 | )
243 |
244 | result_data = json.loads(result)
245 |
246 | # Verify successful creation with different formats
247 | assert result_data["id"] == "23842588888640185"
248 | assert result_data["dsa_beneficiary"] == beneficiary_name
249 |
250 | @pytest.mark.asyncio
251 | async def test_dsa_beneficiary_retrieval_integration(self):
252 | """Test complete workflow including retrieving DSA beneficiary information"""
253 |
254 | # Step 1: Create ad set with DSA beneficiary
255 | mock_create_response = {
256 | "id": "120229746629010183",
257 | "name": "Test Ad Set with DSA",
258 | "status": "PAUSED"
259 | }
260 |
261 | with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api:
262 | with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
263 | mock_auth.return_value = "test_access_token"
264 | mock_api.return_value = mock_create_response
265 |
266 | # Create ad set
267 | result = await create_adset(
268 | account_id="act_701351919139047",
269 | campaign_id="120229656904980183",
270 | name="Test Ad Set with DSA",
271 | optimization_goal="LINK_CLICKS",
272 | billing_event="IMPRESSIONS",
273 | dsa_beneficiary="Test Organization Inc"
274 | )
275 |
276 | result_data = json.loads(result)
277 | adset_id = result_data["id"]
278 |
279 | # Verify creation was successful
280 | assert adset_id == "120229746629010183"
281 |
282 | # Step 2: Retrieve ad set details including DSA beneficiary
283 | mock_details_response = {
284 | "id": "120229746629010183",
285 | "name": "Test Ad Set with DSA",
286 | "campaign_id": "120229656904980183",
287 | "status": "PAUSED",
288 | "daily_budget": "1000",
289 | "targeting": {
290 | "geo_locations": {"countries": ["US"]},
291 | "age_min": 25,
292 | "age_max": 65
293 | },
294 | "bid_amount": 200,
295 | "optimization_goal": "LINK_CLICKS",
296 | "billing_event": "IMPRESSIONS",
297 | "dsa_beneficiary": "Test Organization Inc"
298 | }
299 |
300 | with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api:
301 | with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
302 | mock_auth.return_value = "test_access_token"
303 | mock_api.return_value = mock_details_response
304 |
305 | # Retrieve ad set details
306 | result = await get_adset_details(adset_id=adset_id)
307 | result_data = json.loads(result)
308 |
309 | # Verify DSA beneficiary field is retrieved correctly
310 | assert result_data["id"] == "120229746629010183"
311 | assert "dsa_beneficiary" in result_data
312 | assert result_data["dsa_beneficiary"] == "Test Organization Inc"
313 |
314 | # Verify API call included dsa_beneficiary in fields
315 | mock_api.assert_called_once()
316 | call_args = mock_api.call_args
317 | assert "dsa_beneficiary" in str(call_args)
318 |
319 | @pytest.mark.asyncio
320 | async def test_dsa_beneficiary_us_account_integration(self):
321 | """Test DSA beneficiary behavior for US accounts (optional parameter)"""
322 |
323 | # Step 1: Verify US account doesn't require DSA
324 | mock_us_account_response = {
325 | "id": "act_701351919139047",
326 | "name": "US Business Account",
327 | "business_country_code": "US",
328 | "account_status": 1
329 | }
330 |
331 | with patch('meta_ads_mcp.core.accounts.make_api_request', new_callable=AsyncMock) as mock_api:
332 | with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
333 | mock_auth.return_value = "test_access_token"
334 | mock_api.return_value = mock_us_account_response
335 |
336 | result = await get_account_info(account_id="act_701351919139047")
337 | result_data = json.loads(result)
338 |
339 | # Verify US account doesn't require DSA
340 | assert result_data["business_country_code"] == "US"
341 | assert result_data["dsa_required"] == False
342 |
343 | # Step 2: Create ad set without DSA beneficiary (should work for US)
344 | mock_create_response = {
345 | "id": "120229746624860183",
346 | "name": "Test US Ad Set",
347 | "status": "PAUSED"
348 | }
349 |
350 | with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api:
351 | with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
352 | mock_auth.return_value = "test_access_token"
353 | mock_api.return_value = mock_create_response
354 |
355 | result = await create_adset(
356 | account_id="act_701351919139047",
357 | campaign_id="120229656904980183",
358 | name="Test US Ad Set",
359 | optimization_goal="LINK_CLICKS",
360 | billing_event="IMPRESSIONS"
361 | # No DSA beneficiary provided
362 | )
363 |
364 | result_data = json.loads(result)
365 |
366 | # Verify creation was successful without DSA beneficiary
367 | assert result_data["id"] == "120229746624860183"
368 |
369 | # Step 3: Retrieve ad set details (should not have dsa_beneficiary field)
370 | mock_details_response = {
371 | "id": "120229746624860183",
372 | "name": "Test US Ad Set",
373 | "campaign_id": "120229656904980183",
374 | "status": "PAUSED",
375 | "daily_budget": "1000",
376 | "targeting": {
377 | "geo_locations": {"countries": ["US"]}
378 | },
379 | "bid_amount": 200,
380 | "optimization_goal": "LINK_CLICKS",
381 | "billing_event": "IMPRESSIONS"
382 | # No dsa_beneficiary field
383 | }
384 |
385 | with patch('meta_ads_mcp.core.adsets.make_api_request', new_callable=AsyncMock) as mock_api:
386 | with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
387 | mock_auth.return_value = "test_access_token"
388 | mock_api.return_value = mock_details_response
389 |
390 | result = await get_adset_details(adset_id="120229746624860183")
391 | result_data = json.loads(result)
392 |
393 | # Verify ad set details are retrieved correctly
394 | assert result_data["id"] == "120229746624860183"
395 | assert "dsa_beneficiary" not in result_data # Should not be present for US accounts
396 |
397 | @pytest.mark.asyncio
398 | async def test_account_info_requires_account_id(self):
399 | """Test that get_account_info requires an account_id parameter"""
400 |
401 | with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
402 | mock_auth.return_value = "test_access_token"
403 |
404 | # Test without account_id parameter
405 | result = await get_account_info(account_id=None)
406 |
407 | # Handle new return format (dictionary instead of JSON string)
408 | if isinstance(result, dict):
409 | result_data = result
410 | else:
411 | result_data = json.loads(result)
412 |
413 | # Verify error message for missing account_id
414 | assert "error" in result_data
415 | assert "Account ID is required" in result_data["error"]["message"]
416 | assert "Please specify an account_id parameter" in result_data["error"]["details"]
417 | assert "example" in result_data["error"]
418 |
419 | @pytest.mark.asyncio
420 | async def test_account_info_inaccessible_account_error(self):
421 | """Test that get_account_info provides helpful error for inaccessible accounts"""
422 |
423 | # Mock permission error for direct account access (first API call)
424 | mock_permission_error = {
425 | "error": {
426 | "message": "Insufficient access privileges",
427 | "type": "OAuthException",
428 | "code": 200
429 | }
430 | }
431 |
432 | # Mock accessible accounts response (second API call)
433 | mock_accessible_accounts = {
434 | "data": [
435 | {"id": "act_123", "name": "Test Account 1"},
436 | {"id": "act_456", "name": "Test Account 2"}
437 | ]
438 | }
439 |
440 | with patch('meta_ads_mcp.core.accounts.make_api_request', new_callable=AsyncMock) as mock_api:
441 | with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as mock_auth:
442 | mock_auth.return_value = "test_access_token"
443 | # First call returns permission error, second call returns accessible accounts
444 | mock_api.side_effect = [mock_permission_error, mock_accessible_accounts]
445 |
446 | result = await get_account_info(account_id="act_inaccessible")
447 |
448 | # Handle new return format (dictionary instead of JSON string)
449 | if isinstance(result, dict):
450 | result_data = result
451 | else:
452 | result_data = json.loads(result)
453 |
454 | # Verify helpful error message for inaccessible account
455 | assert "error" in result_data
456 | assert "not accessible to your user account" in result_data["error"]["message"]
457 | assert "accessible_accounts" in result_data["error"]
458 | assert "suggestion" in result_data["error"]
459 | assert len(result_data["error"]["accessible_accounts"]) == 2
```
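The last two tests above pin down the error payload that `get_account_info` should return when the requested account is outside the user's access list: a message containing "not accessible to your user account", an `accessible_accounts` list, and a `suggestion`. A minimal sketch of a payload that would satisfy those assertions (illustrative only; apart from the keys and substrings the tests check, the wording here is an assumption, not the library's actual output):

```python
# Illustrative payload satisfying the assertions in the tests above.
# Only the checked keys and substrings are grounded in the source;
# the exact message and suggestion text are hypothetical.
example_error_payload = {
    "error": {
        "message": "Account act_inaccessible is not accessible to your user account",
        "accessible_accounts": [
            {"id": "act_123", "name": "Test Account 1"},
            {"id": "act_456", "name": "Test Account 2"},
        ],
        "suggestion": "Use one of the accessible account IDs listed above",
    }
}
```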
--------------------------------------------------------------------------------
/tests/test_mobile_app_adset_creation.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env python3
2 | """
3 | Unit Tests for Mobile App Adset Creation Functionality
4 |
5 | This test suite validates the mobile app parameters implementation for the
6 | create_adset function in meta_ads_mcp/core/adsets.py.
7 |
8 | Test cases cover:
9 | - Mobile app adset creation success scenarios
10 | - promoted_object parameter validation and formatting
11 | - destination_type parameter validation
12 | - Mobile app specific error handling
13 | - Cross-platform mobile app support (iOS, Android)
14 | - Integration with APP_INSTALLS optimization goal
15 |
16 | Usage:
17 | uv run python -m pytest tests/test_mobile_app_adset_creation.py -v
18 |
19 | Related to Issue #008: Missing Mobile App Parameters in create_adset Function
20 | """
21 |
22 | import pytest
23 | import json
24 | import asyncio
25 | from unittest.mock import AsyncMock, patch, MagicMock
26 | from typing import Dict, Any, List
27 |
28 | # Import the function to test
29 | from meta_ads_mcp.core.adsets import create_adset
30 |
31 |
32 | class TestMobileAppAdsetCreation:
33 | """Test suite for mobile app adset creation functionality"""
34 |
35 | @pytest.fixture
36 | def mock_api_request(self):
37 | """Mock for the make_api_request function"""
38 | with patch('meta_ads_mcp.core.adsets.make_api_request') as mock:
39 | mock.return_value = {
40 | "id": "test_mobile_adset_id",
41 | "name": "Test Mobile App Adset",
42 | "optimization_goal": "APP_INSTALLS",
43 | "promoted_object": {
44 | "application_id": "123456789012345",
45 | "object_store_url": "https://apps.apple.com/app/id123456789"
46 | },
47 | "destination_type": "APP_STORE"
48 | }
49 | yield mock
50 |
51 | @pytest.fixture
52 | def mock_auth_manager(self):
53 | """Mock for the authentication manager"""
54 | with patch('meta_ads_mcp.core.api.auth_manager') as mock, \
55 | patch('meta_ads_mcp.core.auth.get_current_access_token') as mock_get_token:
56 | # Mock a valid access token
57 | mock.get_current_access_token.return_value = "test_access_token"
58 | mock.is_token_valid.return_value = True
59 | mock.app_id = "test_app_id"
60 | mock_get_token.return_value = "test_access_token"
61 | yield mock
62 |
63 | @pytest.fixture
64 | def valid_mobile_app_params(self):
65 | """Valid mobile app parameters for testing"""
66 | return {
67 | "account_id": "act_123456789",
68 | "campaign_id": "campaign_123456789",
69 | "name": "Test Mobile App Adset",
70 | "optimization_goal": "APP_INSTALLS",
71 | "billing_event": "IMPRESSIONS",
72 | "targeting": {
73 | "age_min": 18,
74 | "age_max": 65,
75 | "app_install_state": "not_installed",
76 | "geo_locations": {"countries": ["US"]},
77 | "user_device": ["Android_Smartphone", "iPhone"],
78 | "user_os": ["Android", "iOS"]
79 | }
80 | }
81 |
82 | @pytest.fixture
83 | def ios_promoted_object(self):
84 | """Valid iOS app promoted object"""
85 | return {
86 | "application_id": "123456789012345",
87 | "object_store_url": "https://apps.apple.com/app/id123456789",
88 | "custom_event_type": "APP_INSTALL"
89 | }
90 |
91 | @pytest.fixture
92 | def android_promoted_object(self):
93 | """Valid Android app promoted object"""
94 | return {
95 | "application_id": "987654321098765",
96 | "object_store_url": "https://play.google.com/store/apps/details?id=com.example.app",
97 | "custom_event_type": "APP_INSTALL"
98 | }
99 |
100 | @pytest.fixture
101 | def promoted_object_with_pixel(self):
102 | """Promoted object with Facebook pixel for tracking"""
103 | return {
104 | "application_id": "123456789012345",
105 | "object_store_url": "https://apps.apple.com/app/id123456789",
106 | "custom_event_type": "APP_INSTALL",
107 | "pixel_id": "pixel_123456789"
108 | }
109 |
110 | # Test: Mobile App Adset Creation Success
111 | @pytest.mark.asyncio
112 | async def test_mobile_app_adset_creation_success_ios(
113 | self, mock_api_request, mock_auth_manager, valid_mobile_app_params, ios_promoted_object
114 | ):
115 | """Test successful iOS mobile app adset creation"""
116 |
117 | result = await create_adset(
118 | **valid_mobile_app_params,
119 | promoted_object=ios_promoted_object,
120 | destination_type="APP_STORE"
121 | )
122 |
123 | # Parse the result
124 | result_data = json.loads(result)
125 |
126 | # Verify the API was called with correct parameters
127 | mock_api_request.assert_called_once()
128 | call_args = mock_api_request.call_args
129 |
130 | # Check endpoint (first argument)
131 | assert call_args[0][0] == f"{valid_mobile_app_params['account_id']}/adsets"
132 |
133 | # Check parameters (third argument)
134 | params = call_args[0][2]
135 | assert 'promoted_object' in params
136 | assert 'destination_type' in params
137 |
138 | # Verify promoted_object is properly JSON-encoded
139 | promoted_obj_param = json.loads(params['promoted_object']) if isinstance(params['promoted_object'], str) else params['promoted_object']
140 | assert promoted_obj_param['application_id'] == ios_promoted_object['application_id']
141 | assert promoted_obj_param['object_store_url'] == ios_promoted_object['object_store_url']
142 |
143 | # Verify destination_type
144 | assert params['destination_type'] == "APP_STORE"
145 |
146 | # Verify response structure
147 | assert 'id' in result_data
148 | assert result_data['optimization_goal'] == "APP_INSTALLS"
149 |
150 | @pytest.mark.asyncio
151 | async def test_mobile_app_adset_creation_success_android(
152 | self, mock_api_request, mock_auth_manager, valid_mobile_app_params, android_promoted_object
153 | ):
154 | """Test successful Android mobile app adset creation"""
155 |
156 | result = await create_adset(
157 | **valid_mobile_app_params,
158 | promoted_object=android_promoted_object,
159 | destination_type="APP_STORE"
160 | )
161 |
162 | # Parse the result
163 | result_data = json.loads(result)
164 |
165 | # Verify the API was called
166 | mock_api_request.assert_called_once()
167 | call_args = mock_api_request.call_args
168 | params = call_args[0][2]
169 |
170 | # Verify Android-specific promoted_object
171 | promoted_obj_param = json.loads(params['promoted_object']) if isinstance(params['promoted_object'], str) else params['promoted_object']
172 | assert promoted_obj_param['application_id'] == android_promoted_object['application_id']
173 | assert "play.google.com" in promoted_obj_param['object_store_url']
174 |
175 | # Verify response
176 | assert 'id' in result_data
177 |
178 | @pytest.mark.asyncio
179 | async def test_mobile_app_adset_with_pixel_tracking(
180 | self, mock_api_request, mock_auth_manager, valid_mobile_app_params, promoted_object_with_pixel
181 | ):
182 | """Test mobile app adset creation with Facebook pixel tracking"""
183 |
184 | result = await create_adset(
185 | **valid_mobile_app_params,
186 | promoted_object=promoted_object_with_pixel,
187 | destination_type="APP_STORE"
188 | )
189 |
190 | # Verify pixel_id is included
191 | call_args = mock_api_request.call_args
192 | params = call_args[0][2]
193 | promoted_obj_param = json.loads(params['promoted_object']) if isinstance(params['promoted_object'], str) else params['promoted_object']
194 |
195 | assert 'pixel_id' in promoted_obj_param
196 | assert promoted_obj_param['pixel_id'] == promoted_object_with_pixel['pixel_id']
197 |
198 | # Test: Parameter Validation
199 | @pytest.mark.asyncio
200 | async def test_invalid_promoted_object_missing_application_id(
201 | self, mock_api_request, mock_auth_manager, valid_mobile_app_params
202 | ):
203 | """Test validation error for promoted_object missing application_id"""
204 |
205 | invalid_promoted_object = {
206 | "object_store_url": "https://apps.apple.com/app/id123456789",
207 | "custom_event_type": "APP_INSTALL"
208 | }
209 |
210 | result = await create_adset(
211 | **valid_mobile_app_params,
212 | promoted_object=invalid_promoted_object,
213 | destination_type="APP_STORE"
214 | )
215 |
216 | result_data = json.loads(result)
217 |
218 | # Should return validation error - check for data wrapper format
219 | if "data" in result_data:
220 | error_data = json.loads(result_data["data"])
221 | assert 'error' in error_data
222 | assert 'application_id' in error_data['error'].lower()
223 | else:
224 | assert 'error' in result_data
225 | assert 'application_id' in result_data['error'].lower()
226 |
227 | @pytest.mark.asyncio
228 | async def test_invalid_promoted_object_missing_store_url(
229 | self, mock_api_request, mock_auth_manager, valid_mobile_app_params
230 | ):
231 | """Test validation error for promoted_object missing object_store_url"""
232 |
233 | invalid_promoted_object = {
234 | "application_id": "123456789012345",
235 | "custom_event_type": "APP_INSTALL"
236 | }
237 |
238 | result = await create_adset(
239 | **valid_mobile_app_params,
240 | promoted_object=invalid_promoted_object,
241 | destination_type="APP_STORE"
242 | )
243 |
244 | result_data = json.loads(result)
245 |
246 | # Should return validation error - check for data wrapper format
247 | if "data" in result_data:
248 | error_data = json.loads(result_data["data"])
249 | assert 'error' in error_data
250 | assert 'object_store_url' in error_data['error'].lower()
251 | else:
252 | assert 'error' in result_data
253 | assert 'object_store_url' in result_data['error'].lower()
254 |
255 | @pytest.mark.asyncio
256 | async def test_invalid_destination_type(
257 | self, mock_api_request, mock_auth_manager, valid_mobile_app_params, ios_promoted_object
258 | ):
259 | """Test validation error for invalid destination_type value"""
260 |
261 | result = await create_adset(
262 | **valid_mobile_app_params,
263 | promoted_object=ios_promoted_object,
264 | destination_type="INVALID_TYPE"
265 | )
266 |
267 | result_data = json.loads(result)
268 |
269 | # Should return validation error - check for data wrapper format
270 | if "data" in result_data:
271 | error_data = json.loads(result_data["data"])
272 | assert 'error' in error_data
273 | assert 'destination_type' in error_data['error'].lower()
274 | else:
275 | assert 'error' in result_data
276 | assert 'destination_type' in result_data['error'].lower()
277 |
278 | @pytest.mark.asyncio
279 | async def test_app_installs_requires_promoted_object(
280 | self, mock_api_request, mock_auth_manager, valid_mobile_app_params
281 | ):
282 | """Test that APP_INSTALLS optimization goal requires promoted_object"""
283 |
284 | result = await create_adset(
285 | **valid_mobile_app_params,
286 | # Missing promoted_object
287 | destination_type="APP_STORE"
288 | )
289 |
290 | result_data = json.loads(result)
291 |
292 | # Should return validation error - check for data wrapper format
293 | if "data" in result_data:
294 | error_data = json.loads(result_data["data"])
295 | assert 'error' in error_data
296 | assert 'promoted_object' in error_data['error'].lower()
297 | assert 'app_installs' in error_data['error'].lower()
298 | else:
299 | assert 'error' in result_data
300 | assert 'promoted_object' in result_data['error'].lower()
301 | assert 'app_installs' in result_data['error'].lower()
302 |
303 | # Test: Cross-platform Support
304 | @pytest.mark.asyncio
305 | async def test_ios_app_store_url_validation(
306 | self, mock_api_request, mock_auth_manager, valid_mobile_app_params
307 | ):
308 | """Test iOS App Store URL format validation"""
309 |
310 | ios_promoted_object = {
311 | "application_id": "123456789012345",
312 | "object_store_url": "https://apps.apple.com/app/id123456789",
313 | "custom_event_type": "APP_INSTALL"
314 | }
315 |
316 | result = await create_adset(
317 | **valid_mobile_app_params,
318 | promoted_object=ios_promoted_object,
319 | destination_type="APP_STORE"
320 | )
321 |
322 | result_data = json.loads(result)
323 |
324 | # Should succeed for valid iOS URL
325 | assert 'error' not in result_data or result_data.get('error') is None
326 |
327 | @pytest.mark.asyncio
328 | async def test_google_play_url_validation(
329 | self, mock_api_request, mock_auth_manager, valid_mobile_app_params
330 | ):
331 | """Test Google Play Store URL format validation"""
332 |
333 | android_promoted_object = {
334 | "application_id": "987654321098765",
335 | "object_store_url": "https://play.google.com/store/apps/details?id=com.example.app",
336 | "custom_event_type": "APP_INSTALL"
337 | }
338 |
339 | result = await create_adset(
340 | **valid_mobile_app_params,
341 | promoted_object=android_promoted_object,
342 | destination_type="APP_STORE"
343 | )
344 |
345 | result_data = json.loads(result)
346 |
347 | # Should succeed for valid Google Play URL
348 | assert 'error' not in result_data or result_data.get('error') is None
349 |
350 | @pytest.mark.asyncio
351 | async def test_invalid_store_url_format(
352 | self, mock_api_request, mock_auth_manager, valid_mobile_app_params
353 | ):
354 | """Test validation error for invalid app store URL format"""
355 |
356 | invalid_promoted_object = {
357 | "application_id": "123456789012345",
358 | "object_store_url": "https://example.com/invalid-url",
359 | "custom_event_type": "APP_INSTALL"
360 | }
361 |
362 | result = await create_adset(
363 | **valid_mobile_app_params,
364 | promoted_object=invalid_promoted_object,
365 | destination_type="APP_STORE"
366 | )
367 |
368 | result_data = json.loads(result)
369 |
370 | # Should return validation error for invalid URL - check for data wrapper format
371 | if "data" in result_data:
372 | error_data = json.loads(result_data["data"])
373 | assert 'error' in error_data
374 | assert 'store url' in error_data['error'].lower() or 'object_store_url' in error_data['error'].lower()
375 | else:
376 | assert 'error' in result_data
377 | assert 'store url' in result_data['error'].lower() or 'object_store_url' in result_data['error'].lower()
378 |
379 | # Test: Destination Type Variations
380 | @pytest.mark.asyncio
381 | async def test_deeplink_destination_type(
382 | self, mock_api_request, mock_auth_manager, valid_mobile_app_params, ios_promoted_object
383 | ):
384 | """Test DEEPLINK destination_type"""
385 |
386 | result = await create_adset(
387 | **valid_mobile_app_params,
388 | promoted_object=ios_promoted_object,
389 | destination_type="DEEPLINK"
390 | )
391 |
392 | call_args = mock_api_request.call_args
393 | params = call_args[0][2]
394 | assert params['destination_type'] == "DEEPLINK"
395 |
396 | @pytest.mark.asyncio
397 | async def test_app_install_destination_type(
398 | self, mock_api_request, mock_auth_manager, valid_mobile_app_params, ios_promoted_object
399 | ):
400 | """Test APP_INSTALL destination_type"""
401 |
402 | result = await create_adset(
403 | **valid_mobile_app_params,
404 | promoted_object=ios_promoted_object,
405 | destination_type="APP_INSTALL"
406 | )
407 |
408 | call_args = mock_api_request.call_args
409 | params = call_args[0][2]
410 | assert params['destination_type'] == "APP_INSTALL"
411 |
412 | @pytest.mark.asyncio
413 | async def test_on_ad_destination_type_for_lead_generation(
414 | self, mock_api_request, mock_auth_manager, valid_mobile_app_params
415 | ):
416 | """Test ON_AD destination_type for lead generation campaigns (Issue #009 fix)"""
417 |
418 | # Create a lead generation adset configuration (without promoted_object since it's for lead gen, not mobile apps)
419 | lead_gen_params = valid_mobile_app_params.copy()
420 | lead_gen_params.update({
421 | "optimization_goal": "LEAD_GENERATION",
422 | "billing_event": "IMPRESSIONS"
423 | })
424 |
425 | result = await create_adset(
426 | **lead_gen_params,
427 | destination_type="ON_AD"
428 | )
429 |
430 | # Should pass validation and include destination_type in API call
431 | call_args = mock_api_request.call_args
432 | params = call_args[0][2]
433 | assert params['destination_type'] == "ON_AD"
434 |
435 | @pytest.mark.asyncio
436 | async def test_on_ad_validation_passes(
437 | self, mock_api_request, mock_auth_manager, valid_mobile_app_params
438 | ):
439 | """Test that ON_AD destination_type passes validation (Issue #009 regression test)"""
440 |
441 | # Use parameters that work with ON_AD (lead generation, not mobile app)
442 | lead_gen_params = valid_mobile_app_params.copy()
443 | lead_gen_params.update({
444 | "optimization_goal": "LEAD_GENERATION",
445 | "billing_event": "IMPRESSIONS"
446 | })
447 |
448 | result = await create_adset(
449 | **lead_gen_params,
450 | destination_type="ON_AD"
451 | )
452 |
453 | result_data = json.loads(result)
454 |
455 | # Should NOT return a validation error about destination_type
456 | # Before the fix, this would return: "Invalid destination_type: ON_AD"
457 | if "data" in result_data:
458 | error_data = json.loads(result_data["data"])
459 | assert "error" not in error_data or "destination_type" not in error_data.get("error", "").lower()
460 | else:
461 | assert "error" not in result_data or "destination_type" not in result_data.get("error", "").lower()
462 |
463 | # Test: Error Handling
464 | @pytest.mark.asyncio
465 | async def test_meta_api_error_handling(
466 | self, mock_auth_manager, valid_mobile_app_params, ios_promoted_object
467 | ):
468 | """Test handling of Meta API errors for mobile app adsets"""
469 |
470 | with patch('meta_ads_mcp.core.adsets.make_api_request') as mock_api:
471 | # Mock Meta API error response
472 | mock_api.side_effect = Exception("HTTP Error: 400 - Select a dataset and conversion event for your ad set")
473 |
474 | result = await create_adset(
475 | **valid_mobile_app_params,
476 | promoted_object=ios_promoted_object,
477 | destination_type="APP_STORE"
478 | )
479 |
480 | result_data = json.loads(result)
481 |
482 | # Should handle the error gracefully - check for data wrapper format
483 | if "data" in result_data:
484 | error_data = json.loads(result_data["data"])
485 | assert 'error' in error_data
486 | # Check for error text in either error message or details
487 | error_text = error_data.get('error', '').lower()
488 | details_text = error_data.get('details', '').lower()
489 | assert 'dataset' in error_text or 'conversion event' in error_text or \
490 | 'dataset' in details_text or 'conversion event' in details_text
491 | else:
492 | assert 'error' in result_data
493 | error_text = result_data.get('error', '').lower()
494 | details_text = result_data.get('details', '').lower()
495 | assert 'dataset' in error_text or 'conversion event' in error_text or \
496 | 'dataset' in details_text or 'conversion event' in details_text
497 |
498 | # Test: Backward Compatibility
499 | @pytest.mark.asyncio
500 | async def test_backward_compatibility_non_mobile_campaigns(
501 | self, mock_api_request, mock_auth_manager
502 | ):
503 | """Test that non-mobile campaigns still work without mobile app parameters"""
504 |
505 | non_mobile_params = {
506 | "account_id": "act_123456789",
507 | "campaign_id": "campaign_123456789",
508 | "name": "Test Web Adset",
509 | "optimization_goal": "LINK_CLICKS",
510 | "billing_event": "LINK_CLICKS",
511 | "targeting": {
512 | "age_min": 18,
513 | "age_max": 65,
514 | "geo_locations": {"countries": ["US"]}
515 | }
516 | }
517 |
518 | result = await create_adset(**non_mobile_params)
519 |
520 | # Should work without mobile app parameters
521 | mock_api_request.assert_called_once()
522 | call_args = mock_api_request.call_args
523 | params = call_args[0][2]
524 |
525 | # Should not include mobile app parameters
526 | assert 'promoted_object' not in params
527 | assert 'destination_type' not in params
528 |
529 | @pytest.mark.asyncio
530 | async def test_optional_mobile_parameters(
531 | self, mock_api_request, mock_auth_manager, valid_mobile_app_params, ios_promoted_object
532 | ):
533 | """Test that mobile app parameters are optional for non-APP_INSTALLS campaigns"""
534 |
535 | non_app_install_params = valid_mobile_app_params.copy()
536 | non_app_install_params['optimization_goal'] = "REACH"
537 |
538 | result = await create_adset(
539 | **non_app_install_params,
540 | # Mobile app parameters should be optional for non-APP_INSTALLS
541 | promoted_object=ios_promoted_object,
542 | destination_type="APP_STORE"
543 | )
544 |
545 | # Should work and include mobile parameters if provided
546 | mock_api_request.assert_called_once()
547 | call_args = mock_api_request.call_args
548 | params = call_args[0][2]
549 |
550 | # Mobile parameters should be included if provided
551 | assert 'promoted_object' in params
552 | assert 'destination_type' in params
```
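The fixtures above define the call shape these tests exercise. For orientation, a minimal sketch of calling `create_adset` directly for an iOS app-install ad set, with argument values mirroring the fixtures (the account, campaign, and application IDs and the store URL are placeholders, and a real run needs a valid Meta access token rather than the mocked auth used in the tests):

```python
import asyncio
import json

from meta_ads_mcp.core.adsets import create_adset

async def main():
    # Values mirror the test fixtures above; IDs and URLs are placeholders.
    result = await create_adset(
        account_id="act_123456789",
        campaign_id="campaign_123456789",
        name="iOS App Install Adset",
        optimization_goal="APP_INSTALLS",
        billing_event="IMPRESSIONS",
        targeting={
            "age_min": 18,
            "geo_locations": {"countries": ["US"]},
            "user_os": ["iOS"],
        },
        promoted_object={
            "application_id": "123456789012345",
            "object_store_url": "https://apps.apple.com/app/id123456789",
            "custom_event_type": "APP_INSTALL",
        },
        destination_type="APP_STORE",
    )
    # As in the tests, the tool returns a JSON string.
    print(json.dumps(json.loads(result), indent=2))

if __name__ == "__main__":
    asyncio.run(main())
```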
--------------------------------------------------------------------------------
/tests/test_targeting_search_e2e.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env python3
2 | """
3 | End-to-End Targeting Search Test for Meta Ads MCP
4 |
5 | This test validates that the targeting search tools correctly find and return
6 | targeting options (interests, behaviors, demographics, geo locations) from the
7 | Meta Ads API through a pre-authenticated MCP server.
8 |
9 | Test functions:
10 | - search_interests
11 | - get_interest_suggestions
12 | - validate_interests
13 | - search_behaviors
14 | - search_demographics
15 | - search_geo_locations
16 | """
17 |
18 | import requests
19 | import json
20 | import os
21 | import sys
22 | from typing import Dict, Any, List
23 |
24 | # Load environment variables from .env file
25 | try:
26 | from dotenv import load_dotenv
27 | load_dotenv()
28 | print("✅ Loaded environment variables from .env file")
29 | except ImportError:
30 | print("⚠️ python-dotenv not installed, using system environment variables only")
31 |
32 | class TargetingSearchTester:
33 | """Test suite focused on targeting search functionality"""
34 |
35 | def __init__(self, base_url: str = "http://localhost:8080"):
36 | self.base_url = base_url.rstrip('/')
37 | self.endpoint = f"{self.base_url}/mcp/"
38 | self.request_id = 1
39 |
40 | # Test data for validation
41 | self.test_queries = {
42 | "interests": ["baseball", "cooking", "travel"],
43 | "geo_locations": ["New York", "California", "Japan"],
44 | "interest_suggestions": ["Basketball", "Soccer"],
45 | "demographics": ["life_events", "industries", "family_statuses"]
46 | }
47 |
48 | def _make_request(self, method: str, params: Dict[str, Any] = None,
49 | headers: Dict[str, str] = None) -> Dict[str, Any]:
50 | """Make a JSON-RPC request to the MCP server"""
51 |
52 | default_headers = {
53 | "Content-Type": "application/json",
54 | "Accept": "application/json, text/event-stream",
55 | "User-Agent": "Targeting-Search-Test-Client/1.0"
56 | }
57 |
58 | if headers:
59 | default_headers.update(headers)
60 |
61 | payload = {
62 | "jsonrpc": "2.0",
63 | "method": method,
64 | "id": self.request_id
65 | }
66 |
67 | if params:
68 | payload["params"] = params
69 |
70 | try:
71 | response = requests.post(
72 | self.endpoint,
73 | headers=default_headers,
74 | json=payload,
75 | timeout=15
76 | )
77 |
78 | self.request_id += 1
79 |
80 | return {
81 | "status_code": response.status_code,
82 | "headers": dict(response.headers),
83 | "json": response.json() if response.status_code == 200 else None,
84 | "text": response.text,
85 | "success": response.status_code == 200
86 | }
87 |
88 | except requests.exceptions.RequestException as e:
89 | return {
90 | "status_code": 0,
91 | "headers": {},
92 | "json": None,
93 | "text": str(e),
94 | "success": False,
95 | "error": str(e)
96 | }
97 |
98 | def test_search_interests(self) -> Dict[str, Any]:
99 | """Test search_interests functionality"""
100 |
101 | print(f"\n🔍 Testing search_interests function")
102 | results = {}
103 |
104 | for query in self.test_queries["interests"]:
105 | print(f" 🔎 Searching for interests: '{query}'")
106 |
107 | result = self._make_request("tools/call", {
108 | "name": "search_interests",
109 | "arguments": {
110 | "query": query,
111 | "limit": 5
112 | }
113 | })
114 |
115 | if not result["success"]:
116 | results[query] = {
117 | "success": False,
118 | "error": result.get("text", "Unknown error")
119 | }
120 | print(f" ❌ Failed: {result.get('text', 'Unknown error')}")
121 | continue
122 |
123 | # Parse response
124 | response_data = result["json"]["result"]
125 | content = response_data.get("content", [{}])[0].get("text", "")
126 |
127 | try:
128 | parsed_content = json.loads(content)
129 |
130 | if "error" in parsed_content:
131 | results[query] = {
132 | "success": False,
133 | "error": parsed_content["error"]
134 | }
135 | print(f" ❌ API Error: {parsed_content['error']}")
136 | continue
137 |
138 | interests = parsed_content.get("data", [])
139 |
140 | results[query] = {
141 | "success": True,
142 | "count": len(interests),
143 | "interests": interests[:3], # Keep first 3 for display
144 | "has_required_fields": all(
145 | "id" in interest and "name" in interest
146 | for interest in interests
147 | )
148 | }
149 |
150 | print(f" ✅ Found {len(interests)} interests")
151 | for interest in interests[:3]:
152 | print(f" • {interest.get('name', 'N/A')} (ID: {interest.get('id', 'N/A')})")
153 |
154 | except json.JSONDecodeError:
155 | results[query] = {
156 | "success": False,
157 | "error": "Invalid JSON response",
158 | "raw_content": content
159 | }
160 | print(f" ❌ Invalid JSON: {content}")
161 |
162 | return results
163 |
164 | def test_get_interest_suggestions(self) -> Dict[str, Any]:
165 | """Test get_interest_suggestions functionality"""
166 |
167 | print(f"\n🔍 Testing get_interest_suggestions function")
168 |
169 | interest_list = self.test_queries["interest_suggestions"]
170 | print(f" 🔎 Getting suggestions for: {interest_list}")
171 |
172 | result = self._make_request("tools/call", {
173 | "name": "get_interest_suggestions",
174 | "arguments": {
175 | "interest_list": interest_list,
176 | "limit": 5
177 | }
178 | })
179 |
180 | if not result["success"]:
181 | return {
182 | "success": False,
183 | "error": result.get("text", "Unknown error")
184 | }
185 |
186 | # Parse response
187 | response_data = result["json"]["result"]
188 | content = response_data.get("content", [{}])[0].get("text", "")
189 |
190 | try:
191 | parsed_content = json.loads(content)
192 |
193 | if "error" in parsed_content:
194 | return {
195 | "success": False,
196 | "error": parsed_content["error"]
197 | }
198 |
199 | suggestions = parsed_content.get("data", [])
200 |
201 | result_data = {
202 | "success": True,
203 | "count": len(suggestions),
204 | "suggestions": suggestions[:3], # Keep first 3 for display
205 | "has_required_fields": all(
206 | "id" in suggestion and "name" in suggestion
207 | for suggestion in suggestions
208 | )
209 | }
210 |
211 | print(f" ✅ Found {len(suggestions)} suggestions")
212 | for suggestion in suggestions[:3]:
213 | print(f" • {suggestion.get('name', 'N/A')} (ID: {suggestion.get('id', 'N/A')})")
214 |
215 | return result_data
216 |
217 | except json.JSONDecodeError:
218 | return {
219 | "success": False,
220 | "error": "Invalid JSON response",
221 | "raw_content": content
222 | }
223 |
224 | def test_validate_interests(self) -> Dict[str, Any]:
225 | """Test validate_interests functionality"""
226 |
227 | print(f"\n🔍 Testing validate_interests function")
228 |
229 | # Test with known valid and invalid interest names
230 | test_interests = ["Japan", "Basketball", "invalidinterestname12345"]
231 | print(f" 🔎 Validating interests: {test_interests}")
232 |
233 | result = self._make_request("tools/call", {
234 | "name": "validate_interests",
235 | "arguments": {
236 | "interest_list": test_interests
237 | }
238 | })
239 |
240 | if not result["success"]:
241 | return {
242 | "success": False,
243 | "error": result.get("text", "Unknown error")
244 | }
245 |
246 | # Parse response
247 | response_data = result["json"]["result"]
248 | content = response_data.get("content", [{}])[0].get("text", "")
249 |
250 | try:
251 | parsed_content = json.loads(content)
252 |
253 | if "error" in parsed_content:
254 | return {
255 | "success": False,
256 | "error": parsed_content["error"]
257 | }
258 |
259 | validations = parsed_content.get("data", [])
260 |
261 | result_data = {
262 | "success": True,
263 | "count": len(validations),
264 | "validations": validations,
265 | "has_valid_interests": any(
266 | validation.get("valid", False) for validation in validations
267 | ),
268 | "has_invalid_interests": any(
269 | not validation.get("valid", True) for validation in validations
270 | )
271 | }
272 |
273 | print(f" ✅ Validated {len(validations)} interests")
274 | for validation in validations:
275 | status = "✅" if validation.get("valid") else "❌"
276 | print(f" {status} {validation.get('name', 'N/A')}")
277 |
278 | return result_data
279 |
280 | except json.JSONDecodeError:
281 | return {
282 | "success": False,
283 | "error": "Invalid JSON response",
284 | "raw_content": content
285 | }
286 |
287 | def test_search_behaviors(self) -> Dict[str, Any]:
288 | """Test search_behaviors functionality"""
289 |
290 | print(f"\n🔍 Testing search_behaviors function")
291 |
292 | result = self._make_request("tools/call", {
293 | "name": "search_behaviors",
294 | "arguments": {
295 | "limit": 5
296 | }
297 | })
298 |
299 | if not result["success"]:
300 | return {
301 | "success": False,
302 | "error": result.get("text", "Unknown error")
303 | }
304 |
305 | # Parse response
306 | response_data = result["json"]["result"]
307 | content = response_data.get("content", [{}])[0].get("text", "")
308 |
309 | try:
310 | parsed_content = json.loads(content)
311 |
312 | if "error" in parsed_content:
313 | return {
314 | "success": False,
315 | "error": parsed_content["error"]
316 | }
317 |
318 | behaviors = parsed_content.get("data", [])
319 |
320 | result_data = {
321 | "success": True,
322 | "count": len(behaviors),
323 | "behaviors": behaviors[:3], # Keep first 3 for display
324 | "has_required_fields": all(
325 | "id" in behavior and "name" in behavior
326 | for behavior in behaviors
327 | )
328 | }
329 |
330 | print(f" ✅ Found {len(behaviors)} behaviors")
331 | for behavior in behaviors[:3]:
332 | print(f" • {behavior.get('name', 'N/A')} (ID: {behavior.get('id', 'N/A')})")
333 |
334 | return result_data
335 |
336 | except json.JSONDecodeError:
337 | return {
338 | "success": False,
339 | "error": "Invalid JSON response",
340 | "raw_content": content
341 | }
342 |
343 | def test_search_demographics(self) -> Dict[str, Any]:
344 | """Test search_demographics functionality"""
345 |
346 | print(f"\n🔍 Testing search_demographics function")
347 | results = {}
348 |
349 | for demo_class in self.test_queries["demographics"]:
350 | print(f" 🔎 Searching demographics class: '{demo_class}'")
351 |
352 | result = self._make_request("tools/call", {
353 | "name": "search_demographics",
354 | "arguments": {
355 | "demographic_class": demo_class,
356 | "limit": 3
357 | }
358 | })
359 |
360 | if not result["success"]:
361 | results[demo_class] = {
362 | "success": False,
363 | "error": result.get("text", "Unknown error")
364 | }
365 | print(f" ❌ Failed: {result.get('text', 'Unknown error')}")
366 | continue
367 |
368 | # Parse response
369 | response_data = result["json"]["result"]
370 | content = response_data.get("content", [{}])[0].get("text", "")
371 |
372 | try:
373 | parsed_content = json.loads(content)
374 |
375 | if "error" in parsed_content:
376 | results[demo_class] = {
377 | "success": False,
378 | "error": parsed_content["error"]
379 | }
380 | print(f" ❌ API Error: {parsed_content['error']}")
381 | continue
382 |
383 | demographics = parsed_content.get("data", [])
384 |
385 | results[demo_class] = {
386 | "success": True,
387 | "count": len(demographics),
388 | "demographics": demographics[:2], # Keep first 2 for display
389 | "has_required_fields": all(
390 | "id" in demo and "name" in demo
391 | for demo in demographics
392 | )
393 | }
394 |
395 | print(f" ✅ Found {len(demographics)} {demo_class}")
396 | for demo in demographics[:2]:
397 | print(f" • {demo.get('name', 'N/A')} (ID: {demo.get('id', 'N/A')})")
398 |
399 | except json.JSONDecodeError:
400 | results[demo_class] = {
401 | "success": False,
402 | "error": "Invalid JSON response",
403 | "raw_content": content
404 | }
405 | print(f" ❌ Invalid JSON: {content}")
406 |
407 | return results
408 |
409 | def test_search_geo_locations(self) -> Dict[str, Any]:
410 | """Test search_geo_locations functionality"""
411 |
412 | print(f"\n🔍 Testing search_geo_locations function")
413 | results = {}
414 |
415 | for query in self.test_queries["geo_locations"]:
416 | print(f" 🔎 Searching for locations: '{query}'")
417 |
418 | result = self._make_request("tools/call", {
419 | "name": "search_geo_locations",
420 | "arguments": {
421 | "query": query,
422 | "location_types": ["country", "region", "city"],
423 | "limit": 3
424 | }
425 | })
426 |
427 | if not result["success"]:
428 | results[query] = {
429 | "success": False,
430 | "error": result.get("text", "Unknown error")
431 | }
432 | print(f" ❌ Failed: {result.get('text', 'Unknown error')}")
433 | continue
434 |
435 | # Parse response
436 | response_data = result["json"]["result"]
437 | content = response_data.get("content", [{}])[0].get("text", "")
438 |
439 | try:
440 | parsed_content = json.loads(content)
441 |
442 | if "error" in parsed_content:
443 | results[query] = {
444 | "success": False,
445 | "error": parsed_content["error"]
446 | }
447 | print(f" ❌ API Error: {parsed_content['error']}")
448 | continue
449 |
450 | locations = parsed_content.get("data", [])
451 |
452 | results[query] = {
453 | "success": True,
454 | "count": len(locations),
455 | "locations": locations[:3], # Keep first 3 for display
456 | "has_required_fields": all(
457 | "key" in location and "name" in location and "type" in location
458 | for location in locations
459 | )
460 | }
461 |
462 | print(f" ✅ Found {len(locations)} locations")
463 | for location in locations[:3]:
464 | print(f" • {location.get('name', 'N/A')} ({location.get('type', 'N/A')}, Key: {location.get('key', 'N/A')})")
465 |
466 | except json.JSONDecodeError:
467 | results[query] = {
468 | "success": False,
469 | "error": "Invalid JSON response",
470 | "raw_content": content
471 | }
472 | print(f" ❌ Invalid JSON: {content}")
473 |
474 | return results
475 |
476 | def run_targeting_search_tests(self) -> bool:
477 | """Run comprehensive targeting search tests"""
478 |
479 | print("🚀 Meta Ads Targeting Search End-to-End Test Suite")
480 | print("="*60)
481 |
482 | # Check server availability
483 | try:
484 | response = requests.get(f"{self.base_url}/", timeout=5)
485 | server_running = response.status_code in [200, 404]
486 | except:
486 |         except Exception:
487 | server_running = False
488 |
489 | if not server_running:
490 | print("❌ Server is not running at", self.base_url)
491 | print(" Please start the server with:")
492 | print(" python3 -m meta_ads_mcp --transport streamable-http --port 8080")
493 | return False
494 |
495 | print("✅ Server is running")
496 | print("🔐 Using implicit authentication from server")
497 |
498 | # Test 1: Search Interests
499 | print("\n" + "="*60)
500 | print("📋 PHASE 1: Testing Interest Search")
501 | print("="*60)
502 |
503 | interests_results = self.test_search_interests()
504 | interests_success = any(
505 | result.get("success") and result.get("count", 0) > 0
506 | for result in interests_results.values()
507 | )
508 |
509 | # Test 2: Interest Suggestions
510 | print("\n" + "="*60)
511 | print("📋 PHASE 2: Testing Interest Suggestions")
512 | print("="*60)
513 |
514 | suggestions_result = self.test_get_interest_suggestions()
515 | suggestions_success = suggestions_result.get("success") and suggestions_result.get("count", 0) > 0
516 |
517 | # Test 3: Interest Validation
518 | print("\n" + "="*60)
519 | print("📋 PHASE 3: Testing Interest Validation")
520 | print("="*60)
521 |
522 | validation_result = self.test_validate_interests()
523 | validation_success = (validation_result.get("success") and
524 | validation_result.get("has_valid_interests") and
525 | validation_result.get("has_invalid_interests"))
526 |
527 | # Test 4: Behavior Search
528 | print("\n" + "="*60)
529 | print("📋 PHASE 4: Testing Behavior Search")
530 | print("="*60)
531 |
532 | behaviors_result = self.test_search_behaviors()
533 | behaviors_success = behaviors_result.get("success") and behaviors_result.get("count", 0) > 0
534 |
535 | # Test 5: Demographics Search
536 | print("\n" + "="*60)
537 | print("📋 PHASE 5: Testing Demographics Search")
538 | print("="*60)
539 |
540 | demographics_results = self.test_search_demographics()
541 | demographics_success = any(
542 | result.get("success") and result.get("count", 0) > 0
543 | for result in demographics_results.values()
544 | )
545 |
546 | # Test 6: Geo Location Search
547 | print("\n" + "="*60)
548 | print("📋 PHASE 6: Testing Geo Location Search")
549 | print("="*60)
550 |
551 | geo_results = self.test_search_geo_locations()
552 | geo_success = any(
553 | result.get("success") and result.get("count", 0) > 0
554 | for result in geo_results.values()
555 | )
556 |
557 | # Final assessment
558 | print("\n" + "="*60)
559 | print("📊 FINAL RESULTS")
560 | print("="*60)
561 |
562 | all_tests = [
563 | ("Interest Search", interests_success),
564 | ("Interest Suggestions", suggestions_success),
565 | ("Interest Validation", validation_success),
566 | ("Behavior Search", behaviors_success),
567 | ("Demographics Search", demographics_success),
568 | ("Geo Location Search", geo_success)
569 | ]
570 |
571 | passed_tests = sum(1 for _, success in all_tests if success)
572 | total_tests = len(all_tests)
573 |
574 | for test_name, success in all_tests:
575 | status = "✅ PASSED" if success else "❌ FAILED"
576 | print(f" • {test_name}: {status}")
577 |
578 | overall_success = passed_tests >= 4 # At least 4 out of 6 tests should pass
579 |
580 | if overall_success:
581 | print(f"\n✅ Targeting search tests: SUCCESS ({passed_tests}/{total_tests} passed)")
582 | print(" • Core targeting search functionality is working")
583 | print(" • Meta Ads API integration is functional")
584 | return True
585 | else:
586 | print(f"\n❌ Targeting search tests: FAILED ({passed_tests}/{total_tests} passed)")
587 | print(" • Some targeting search functions are not working properly")
588 | return False
589 |
590 |
591 | def main():
592 | """Main test execution"""
593 | tester = TargetingSearchTester()
594 | success = tester.run_targeting_search_tests()
595 |
596 | if success:
597 | print("\n🎉 All targeting search tests passed!")
598 | else:
599 | print("\n⚠️ Some targeting search tests failed - see details above")
600 |
601 | sys.exit(0 if success else 1)
602 |
603 |
604 | if __name__ == "__main__":
605 | main()
```
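`TargetingSearchTester._make_request` is a thin wrapper over a JSON-RPC 2.0 POST to the MCP endpoint. A standalone sketch of the same `tools/call` request for `search_interests`, assuming the server is already running and authenticated locally (started with `python3 -m meta_ads_mcp --transport streamable-http --port 8080`, as the test's own hint suggests):

```python
import json
import requests

# Mirrors TargetingSearchTester._make_request for a single search_interests call.
payload = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "tools/call",
    "params": {
        "name": "search_interests",
        "arguments": {"query": "baseball", "limit": 5},
    },
}

resp = requests.post(
    "http://localhost:8080/mcp/",
    headers={
        "Content-Type": "application/json",
        "Accept": "application/json, text/event-stream",
    },
    json=payload,
    timeout=15,
)

# The tool result arrives as JSON text inside result.content[0].text.
body = resp.json()["result"]["content"][0]["text"]
print(json.dumps(json.loads(body), indent=2))
```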
--------------------------------------------------------------------------------
/tests/test_insights_actions_and_values_e2e.py:
--------------------------------------------------------------------------------
```python
1 | #!/usr/bin/env python3
2 | """
3 | Unit and E2E Tests for Insights Actions and Action Values Functionality
4 |
5 | This test suite validates the actions and action_values field implementation for the
6 | get_insights function in meta_ads_mcp/core/insights.py.
7 |
8 | Test cases cover:
9 | - Actions and action_values field inclusion in API requests
10 | - Different levels of aggregation (ad, adset, campaign, account)
11 | - Time range handling with actions and action_values
12 | - Error handling and validation
13 | - Purchase data extraction from actions and action_values
14 | """
15 |
16 | import pytest
17 | import json
18 | import asyncio
19 | import requests
20 | from unittest.mock import AsyncMock, patch, MagicMock
21 | from typing import Dict, Any, List
22 |
23 | # Import the function to test
24 | from meta_ads_mcp.core.insights import get_insights
25 |
26 |
27 | class TestInsightsActionsAndValues:
28 | """Test suite for actions and action_values in insights"""
29 |
30 | @pytest.fixture
31 | def mock_api_request(self):
32 | """Mock for the make_api_request function"""
33 | with patch('meta_ads_mcp.core.insights.make_api_request') as mock:
34 | mock.return_value = {
35 | "data": [
36 | {
37 | "campaign_id": "test_campaign_id",
38 | "campaign_name": "Test Campaign",
39 | "impressions": "1000",
40 | "clicks": "50",
41 | "spend": "100.00",
42 | "actions": [
43 | {"action_type": "purchase", "value": "5"},
44 | {"action_type": "lead", "value": "3"},
45 | {"action_type": "view_content", "value": "20"}
46 | ],
47 | "action_values": [
48 | {"action_type": "purchase", "value": "500.00"},
49 | {"action_type": "lead", "value": "150.00"},
50 | {"action_type": "view_content", "value": "0.00"}
51 | ],
52 | "cost_per_action_type": [
53 | {"action_type": "purchase", "value": "20.00"},
54 | {"action_type": "lead", "value": "33.33"}
55 | ]
56 | }
57 | ],
58 | "paging": {}
59 | }
60 | yield mock
61 |
62 | @pytest.fixture
63 | def mock_auth_manager(self):
64 | """Mock for the authentication manager"""
65 | with patch('meta_ads_mcp.core.api.auth_manager') as mock, \
66 | patch('meta_ads_mcp.core.auth.get_current_access_token') as mock_get_token:
67 | # Mock a valid access token
68 | mock.get_current_access_token.return_value = "test_access_token"
69 | mock.is_token_valid.return_value = True
70 | mock.app_id = "test_app_id"
71 | mock_get_token.return_value = "test_access_token"
72 | yield mock
73 |
74 | @pytest.fixture
75 | def valid_campaign_id(self):
76 | """Valid campaign ID for testing"""
77 | return "123456789"
78 |
79 | @pytest.fixture
80 | def valid_account_id(self):
81 | """Valid account ID for testing"""
82 | return "act_701351919139047"
83 |
84 | @pytest.mark.asyncio
85 | async def test_actions_and_action_values_included_in_fields(self, mock_api_request, mock_auth_manager, valid_campaign_id):
86 | """Test that actions and action_values are included in the fields parameter"""
87 |
88 | result = await get_insights(
89 | object_id=valid_campaign_id,
90 | time_range="last_30d",
91 | level="campaign"
92 | )
93 |
94 | # Parse the result
95 | result_data = json.loads(result)
96 |
97 | # Verify the API was called with correct parameters
98 | mock_api_request.assert_called_once()
99 | call_args = mock_api_request.call_args
100 |
101 | # Check that the endpoint is correct (first argument)
102 | assert call_args[0][0] == f"{valid_campaign_id}/insights"
103 |
104 | # Check that actions and action_values are included in fields parameter
105 | params = call_args[0][2] # Third positional argument is params
106 | assert 'fields' in params
107 |
108 | fields = params['fields']
109 | assert 'actions' in fields
110 | assert 'action_values' in fields
111 | assert 'cost_per_action_type' in fields
112 |
113 | # Verify the response structure
114 | assert 'data' in result_data
115 | assert len(result_data['data']) > 0
116 | assert 'actions' in result_data['data'][0]
117 | assert 'action_values' in result_data['data'][0]
118 |
119 | @pytest.mark.asyncio
120 | async def test_purchase_data_extraction(self, mock_api_request, mock_auth_manager, valid_campaign_id):
121 | """Test that purchase data can be extracted from actions and action_values"""
122 |
123 | result = await get_insights(
124 | object_id=valid_campaign_id,
125 | time_range="last_30d",
126 | level="campaign"
127 | )
128 |
129 | # Parse the result
130 | result_data = json.loads(result)
131 |
132 | # Get the first data point
133 | data_point = result_data['data'][0]
134 |
135 | # Extract purchase data from actions
136 | actions = data_point.get('actions', [])
137 | purchase_actions = [action for action in actions if action.get('action_type') == 'purchase']
138 |
139 | # Extract purchase data from action_values
140 | action_values = data_point.get('action_values', [])
141 | purchase_values = [action_value for action_value in action_values if action_value.get('action_type') == 'purchase']
142 |
143 | # Verify purchase data exists
144 | assert len(purchase_actions) > 0, "No purchase actions found"
145 | assert len(purchase_values) > 0, "No purchase action_values found"
146 |
147 | # Verify purchase data values
148 | purchase_count = purchase_actions[0].get('value')
149 | purchase_value = purchase_values[0].get('value')
150 |
151 | assert purchase_count == "5", f"Expected purchase count 5, got {purchase_count}"
152 | assert purchase_value == "500.00", f"Expected purchase value 500.00, got {purchase_value}"
153 |
154 | @pytest.mark.asyncio
155 | async def test_actions_at_adset_level(self, mock_api_request, mock_auth_manager, valid_campaign_id):
156 | """Test actions and action_values at adset level"""
157 |
158 | result = await get_insights(
159 | object_id=valid_campaign_id,
160 | time_range="last_30d",
161 | level="adset"
162 | )
163 |
164 | # Parse the result
165 | result_data = json.loads(result)
166 |
167 | # Verify the API was called with correct parameters
168 | mock_api_request.assert_called_once()
169 | call_args = mock_api_request.call_args
170 |
171 | # Check that the level parameter is correct
172 | params = call_args[0][2]
173 | assert params['level'] == 'adset'
174 |
175 | # Verify the response structure
176 | assert 'data' in result_data
177 |
178 | @pytest.mark.asyncio
179 | async def test_actions_at_ad_level(self, mock_api_request, mock_auth_manager, valid_campaign_id):
180 | """Test actions and action_values at ad level"""
181 |
182 | result = await get_insights(
183 | object_id=valid_campaign_id,
184 | time_range="last_30d",
185 | level="ad"
186 | )
187 |
188 | # Parse the result
189 | result_data = json.loads(result)
190 |
191 | # Verify the API was called with correct parameters
192 | mock_api_request.assert_called_once()
193 | call_args = mock_api_request.call_args
194 |
195 | # Check that the level parameter is correct
196 | params = call_args[0][2]
197 | assert params['level'] == 'ad'
198 |
199 | # Verify the response structure
200 | assert 'data' in result_data
201 |
202 | @pytest.mark.asyncio
203 | async def test_actions_with_custom_time_range(self, mock_api_request, mock_auth_manager, valid_campaign_id):
204 | """Test actions and action_values with custom time range"""
205 |
206 | custom_time_range = {"since": "2024-01-01", "until": "2024-01-31"}
207 |
208 | result = await get_insights(
209 | object_id=valid_campaign_id,
210 | time_range=custom_time_range,
211 | level="campaign"
212 | )
213 |
214 | # Parse the result
215 | result_data = json.loads(result)
216 |
217 | # Verify the API was called with correct parameters
218 | mock_api_request.assert_called_once()
219 | call_args = mock_api_request.call_args
220 |
221 | # Check that time_range is properly formatted
222 | params = call_args[0][2]
223 | assert 'time_range' in params
224 | assert params['time_range'] == json.dumps(custom_time_range)
225 |
226 | # Verify the response structure
227 | assert 'data' in result_data
228 |
229 | @pytest.mark.asyncio
230 | async def test_actions_with_breakdown(self, mock_api_request, mock_auth_manager, valid_campaign_id):
231 | """Test actions and action_values with breakdown dimension"""
232 |
233 | result = await get_insights(
234 | object_id=valid_campaign_id,
235 | time_range="last_30d",
236 | level="campaign",
237 | breakdown="age"
238 | )
239 |
240 | # Parse the result
241 | result_data = json.loads(result)
242 |
243 | # Verify the API was called with correct parameters
244 | mock_api_request.assert_called_once()
245 | call_args = mock_api_request.call_args
246 |
247 | # Check that breakdown is included
248 | params = call_args[0][2]
249 | assert 'breakdowns' in params
250 | assert params['breakdowns'] == 'age'
251 |
252 | # Verify the response structure
253 | assert 'data' in result_data
254 |
255 | @pytest.mark.asyncio
256 | async def test_actions_without_object_id(self, mock_api_request, mock_auth_manager):
257 | """Test error handling when no object_id is provided"""
258 |
259 | result = await get_insights(
260 | time_range="last_30d",
261 | level="campaign"
262 | )
263 |
264 | # Parse the result. The decorator returns a dict error for missing required args
265 | if isinstance(result, dict):
266 | result_data = result
267 | else:
268 | result_data = json.loads(result)
269 |
270 | assert 'error' in result_data
271 | assert "missing 1 required positional argument: 'object_id'" in result_data['error']
272 |
273 | # Verify API was not called
274 | mock_api_request.assert_not_called()
275 |
276 | @pytest.mark.asyncio
277 | async def test_actions_with_invalid_time_range(self, mock_api_request, mock_auth_manager, valid_campaign_id):
278 | """Test error handling with invalid time range"""
279 |
280 | invalid_time_range = {"since": "2024-01-01"} # Missing "until"
281 |
282 | result = await get_insights(
283 | object_id=valid_campaign_id,
284 | time_range=invalid_time_range,
285 | level="campaign"
286 | )
287 |
288 | # Parse the result
289 | result_data = json.loads(result)
290 |
291 | # The error response is wrapped in a 'data' field
292 | if 'data' in result_data:
293 | error_data = json.loads(result_data['data'])
294 | assert 'error' in error_data
295 | assert 'since' in error_data['error'] and 'until' in error_data['error']
296 | else:
297 | assert 'error' in result_data
298 | assert 'since' in result_data['error'] and 'until' in result_data['error']
299 |
300 | # Verify API was not called
301 | mock_api_request.assert_not_called()
302 |
303 | @pytest.mark.asyncio
304 | async def test_actions_api_error_handling(self, mock_api_request, mock_auth_manager, valid_campaign_id):
305 | """Test error handling when API call fails"""
306 |
307 | # Mock API to raise an exception
308 | mock_api_request.side_effect = Exception("API Error")
309 |
310 | result = await get_insights(
311 | object_id=valid_campaign_id,
312 | time_range="last_30d",
313 | level="campaign"
314 | )
315 |
316 | # Parse the result
317 | # The API error handling returns a dict directly, not a JSON string
318 | if isinstance(result, dict):
319 | result_data = result
320 | else:
321 | result_data = json.loads(result)
322 |
323 | # Verify error response
324 | assert 'error' in result_data
325 | assert 'API Error' in result_data['error']
326 |
327 | @pytest.mark.asyncio
328 | async def test_actions_fields_completeness(self, mock_api_request, mock_auth_manager, valid_campaign_id):
329 | """Test that all required fields are included in the request"""
330 |
331 | result = await get_insights(
332 | object_id=valid_campaign_id,
333 | time_range="last_30d",
334 | level="campaign"
335 | )
336 |
337 | # Verify the API was called with correct parameters
338 | mock_api_request.assert_called_once()
339 | call_args = mock_api_request.call_args
340 |
341 | # Check that all required fields are included
342 | params = call_args[0][2]
343 | fields = params['fields']
344 |
345 | # Required fields for actions and action_values
346 | required_fields = [
347 | 'account_id', 'account_name', 'campaign_id', 'campaign_name',
348 | 'adset_id', 'adset_name', 'ad_id', 'ad_name',
349 | 'impressions', 'clicks', 'spend', 'cpc', 'cpm', 'ctr',
350 | 'reach', 'frequency', 'actions', 'action_values', 'conversions',
351 | 'unique_clicks', 'cost_per_action_type'
352 | ]
353 |
354 | for field in required_fields:
355 | assert field in fields, f"Field '{field}' not found in fields parameter"
356 |
357 | @pytest.mark.asyncio
358 | async def test_multiple_action_types(self, mock_api_request, mock_auth_manager, valid_campaign_id):
359 | """Test handling of multiple action types in the response"""
360 |
361 | result = await get_insights(
362 | object_id=valid_campaign_id,
363 | time_range="last_30d",
364 | level="campaign"
365 | )
366 |
367 | # Parse the result
368 | result_data = json.loads(result)
369 |
370 | # Get the first data point
371 | data_point = result_data['data'][0]
372 |
373 | # Check that multiple action types are present
374 | actions = data_point.get('actions', [])
375 | action_types = [action.get('action_type') for action in actions]
376 |
377 | assert 'purchase' in action_types, "Purchase action type not found"
378 | assert 'lead' in action_types, "Lead action type not found"
379 | assert 'view_content' in action_types, "View content action type not found"
380 |
381 | # Check action_values has corresponding entries
382 | action_values = data_point.get('action_values', [])
383 | action_value_types = [action_value.get('action_type') for action_value in action_values]
384 |
385 | assert 'purchase' in action_value_types, "Purchase action_value type not found"
386 | assert 'lead' in action_value_types, "Lead action_value type not found"
387 |
388 |
389 | @pytest.mark.e2e
390 | @pytest.mark.skip(reason="E2E test - run manually only")
391 | class TestInsightsActionsAndValuesE2E:
392 | """E2E tests for actions and action_values via MCP HTTP server"""
393 |
394 | def __init__(self, base_url: str = "http://localhost:8080"):
395 | self.base_url = base_url.rstrip('/')
396 | self.endpoint = f"{self.base_url}/mcp/"
397 | self.request_id = 1
398 | # Default account from workspace rules
399 | self.account_id = "act_701351919139047"
400 |
401 | def _make_request(self, method: str, params: Dict[str, Any] = None) -> Dict[str, Any]:
402 | headers = {
403 | "Content-Type": "application/json",
404 | "Accept": "application/json, text/event-stream",
405 | "User-Agent": "Insights-E2E-Test-Client/1.0"
406 | }
407 | payload = {
408 | "jsonrpc": "2.0",
409 | "method": method,
410 | "id": self.request_id
411 | }
412 | if params:
413 | payload["params"] = params
414 | try:
415 | resp = requests.post(self.endpoint, headers=headers, json=payload, timeout=30)
416 | self.request_id += 1
417 | return {
418 | "status_code": resp.status_code,
419 | "json": resp.json() if resp.status_code == 200 else None,
420 | "text": resp.text,
421 | "success": resp.status_code == 200
422 | }
423 | except requests.exceptions.RequestException as e:
424 | return {"status_code": 0, "json": None, "text": str(e), "success": False, "error": str(e)}
425 |
426 | def _check_for_errors(self, parsed_content: Dict[str, Any]) -> Dict[str, Any]:
427 | if "data" in parsed_content:
428 | data = parsed_content["data"]
429 | if isinstance(data, dict) and 'error' in data:
430 | return {"has_error": True, "error_message": data['error'], "format": "wrapped_dict"}
431 | if isinstance(data, str):
432 | try:
433 | error_data = json.loads(data)
434 | if 'error' in error_data:
435 | return {"has_error": True, "error_message": error_data['error'], "format": "wrapped_json"}
436 | except json.JSONDecodeError:
437 | pass
438 | if 'error' in parsed_content:
439 | return {"has_error": True, "error_message": parsed_content['error'], "format": "direct"}
440 | return {"has_error": False}
441 |
442 | def test_tool_exists_and_has_required_fields(self):
443 | result = self._make_request("tools/list", {})
444 | assert result["success"], f"tools/list failed: {result.get('text', '')}"
445 | tools = result["json"]["result"].get("tools", [])
446 | tool = next((t for t in tools if t["name"] == "get_insights"), None)
447 | assert tool is not None, "get_insights tool not found"
448 | props = tool.get("inputSchema", {}).get("properties", {})
449 | for req in ["object_id", "time_range", "breakdown", "level"]:
450 | assert req in props, f"Missing parameter in schema: {req}"
451 |
452 | def test_get_insights_account_level(self):
453 | params = {
454 | "name": "get_insights",
455 | "arguments": {
456 | "object_id": self.account_id,
457 | "time_range": "last_30d",
458 | "level": "account"
459 | }
460 | }
461 | result = self._make_request("tools/call", params)
462 | assert result["success"], f"tools/call failed: {result.get('text', '')}"
463 | response_data = result["json"]["result"]
464 | content = response_data.get("content", [{}])[0].get("text", "")
465 | parsed = json.loads(content)
466 | err = self._check_for_errors(parsed)
467 | # Don't fail if auth or permissions block; just assert structure is parsable
468 | if err["has_error"]:
469 | assert isinstance(err["error_message"], (str, dict))
470 | else:
471 | # Expect data list on success
472 | data = parsed.get("data") if isinstance(parsed, dict) else None
473 | assert data is not None
474 |
475 | def main():
476 | tester = TestInsightsActionsAndValuesE2E()
477 | print("🚀 Insights Actions E2E (manual)")
478 | # Basic smoke run
479 | try:
480 | tester.test_tool_exists_and_has_required_fields()
481 | print("✅ Tool schema ok")
482 | tester.test_get_insights_account_level()
483 | print("✅ Account-level insights request executed")
484 | except AssertionError as e:
485 | print(f"❌ Assertion failed: {e}")
486 | except Exception as e:
487 | print(f"❌ Error: {e}")
488 |
489 | if __name__ == "__main__":
490 | main()
491 |
492 |
493 | def extract_purchase_data(insights_data):
494 | """
495 | Helper function to extract purchase data from insights response.
496 |
497 | Args:
498 | insights_data: The data array from insights response
499 |
500 | Returns:
501 | dict: Dictionary with purchase count and value
502 | """
503 | if not insights_data or len(insights_data) == 0:
504 | return {"purchase_count": 0, "purchase_value": 0.0}
505 |
506 | data_point = insights_data[0]
507 |
508 | # Extract purchase count from actions
509 | actions = data_point.get('actions', [])
510 | purchase_actions = [action for action in actions if action.get('action_type') == 'purchase']
511 | purchase_count = int(purchase_actions[0].get('value', 0)) if purchase_actions else 0
512 |
513 | # Extract purchase value from action_values
514 | action_values = data_point.get('action_values', [])
515 | purchase_values = [action_value for action_value in action_values if action_value.get('action_type') == 'purchase']
516 | purchase_value = float(purchase_values[0].get('value', 0)) if purchase_values else 0.0
517 |
518 | return {
519 | "purchase_count": purchase_count,
520 | "purchase_value": purchase_value
521 | }
522 |
523 |
524 | class TestPurchaseDataExtraction:
525 | """Test suite for purchase data extraction helper function"""
526 |
527 | def test_extract_purchase_data_with_purchases(self):
528 | """Test extraction when purchase data is present"""
529 | insights_data = [{
530 | "actions": [
531 | {"action_type": "purchase", "value": "5"},
532 | {"action_type": "lead", "value": "3"}
533 | ],
534 | "action_values": [
535 | {"action_type": "purchase", "value": "500.00"},
536 | {"action_type": "lead", "value": "150.00"}
537 | ]
538 | }]
539 |
540 | result = extract_purchase_data(insights_data)
541 |
542 | assert result["purchase_count"] == 5
543 | assert result["purchase_value"] == 500.0
544 |
545 | def test_extract_purchase_data_without_purchases(self):
546 | """Test extraction when no purchase data is present"""
547 | insights_data = [{
548 | "actions": [
549 | {"action_type": "lead", "value": "3"},
550 | {"action_type": "view_content", "value": "20"}
551 | ],
552 | "action_values": [
553 | {"action_type": "lead", "value": "150.00"},
554 | {"action_type": "view_content", "value": "0.00"}
555 | ]
556 | }]
557 |
558 | result = extract_purchase_data(insights_data)
559 |
560 | assert result["purchase_count"] == 0
561 | assert result["purchase_value"] == 0.0
562 |
563 | def test_extract_purchase_data_empty_data(self):
564 | """Test extraction with empty data"""
565 | insights_data = []
566 |
567 | result = extract_purchase_data(insights_data)
568 |
569 | assert result["purchase_count"] == 0
570 | assert result["purchase_value"] == 0.0
```
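For reference, a minimal sketch of how the pieces in this file fit together outside of pytest: post a `tools/call` request for `get_insights` to a locally running Streamable HTTP server, unwrap the JSON text embedded in the MCP content array (the same structure the tests above parse), and feed the `data` array into `extract_purchase_data`. The endpoint URL, headers, and account ID below are placeholders, not values taken from the test class.

```python
import json
import requests

from tests.test_insights_actions_and_values_e2e import extract_purchase_data

ENDPOINT = "http://localhost:8080/mcp"  # hypothetical local Streamable HTTP endpoint
HEADERS = {"Content-Type": "application/json"}

payload = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "tools/call",
    "params": {
        "name": "get_insights",
        "arguments": {"object_id": "act_123", "time_range": "last_30d", "level": "account"},
    },
}

resp = requests.post(ENDPOINT, headers=HEADERS, json=payload, timeout=30)
resp.raise_for_status()

# The tool result embeds the insights JSON as text inside the MCP content array.
content_text = resp.json()["result"]["content"][0]["text"]
parsed = json.loads(content_text)

print(extract_purchase_data(parsed.get("data", [])))
# e.g. {'purchase_count': 5, 'purchase_value': 500.0}
```

Running this assumes the repository root is on the import path so that `tests` resolves as a package.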
--------------------------------------------------------------------------------
/meta_ads_mcp/core/auth.py:
--------------------------------------------------------------------------------
```python
1 | """Authentication related functionality for Meta Ads API."""
2 |
3 | from typing import Any, Dict, Optional
4 | import time
5 | import platform
6 | import pathlib
7 | import os
8 | import webbrowser
9 | import asyncio
10 | import json
11 | from .utils import logger
12 | import requests
13 |
14 | # Import from the new callback server module
15 | from .callback_server import (
16 | start_callback_server,
17 | shutdown_callback_server,
18 | token_container,
19 | callback_server_port
20 | )
21 |
22 | # Import the new Pipeboard authentication
23 | from .pipeboard_auth import pipeboard_auth_manager
24 |
25 | # Auth constants
26 | # Scope includes pages_show_list and pages_read_engagement to fix issue #16
27 | # where get_account_pages failed for regular users due to missing page permissions
28 | AUTH_SCOPE = "business_management,public_profile,pages_show_list,pages_read_engagement"
29 | AUTH_REDIRECT_URI = "http://localhost:8888/callback"
30 | AUTH_RESPONSE_TYPE = "token"
31 |
32 | # Log important configuration information
33 | logger.info("Authentication module initialized")
34 | logger.info(f"Auth scope: {AUTH_SCOPE}")
35 | logger.info(f"Default redirect URI: {AUTH_REDIRECT_URI}")
36 |
37 | # Global flag for authentication state
38 | needs_authentication = False
39 |
40 | # Meta configuration singleton
41 | class MetaConfig:
42 | _instance = None
43 |
44 | def __new__(cls):
45 | if cls._instance is None:
46 | logger.debug("Creating new MetaConfig instance")
47 | cls._instance = super(MetaConfig, cls).__new__(cls)
48 | cls._instance.app_id = os.environ.get("META_APP_ID", "779761636818489")
49 | logger.info(f"MetaConfig initialized with app_id from env/default: {cls._instance.app_id}")
50 | return cls._instance
51 |
52 | def set_app_id(self, app_id):
53 | """Set the Meta App ID for API calls"""
54 | logger.info(f"Setting Meta App ID: {app_id}")
55 | self.app_id = app_id
56 | # Also update environment variable for modules that might read directly from it
57 | os.environ["META_APP_ID"] = app_id
58 | logger.debug(f"Updated META_APP_ID environment variable: {os.environ.get('META_APP_ID')}")
59 |
60 | def get_app_id(self):
61 | """Get the current Meta App ID"""
62 | # Check if we have one set
63 | if hasattr(self, 'app_id') and self.app_id:
64 | logger.debug(f"Using app_id from instance: {self.app_id}")
65 | return self.app_id
66 |
67 | # If not, try environment variable
68 | env_app_id = os.environ.get("META_APP_ID", "")
69 | if env_app_id:
70 | logger.debug(f"Using app_id from environment: {env_app_id}")
71 | # Update our instance for future use
72 | self.app_id = env_app_id
73 | return env_app_id
74 |
75 | logger.warning("No app_id found in instance or environment variables")
76 | return ""
77 |
78 | def is_configured(self):
79 | """Check if the Meta configuration is complete"""
80 | app_id = self.get_app_id()
81 | configured = bool(app_id)
82 | logger.debug(f"MetaConfig.is_configured() = {configured} (app_id: {app_id})")
83 | return configured
84 |
85 | # Create singleton instance
86 | meta_config = MetaConfig()
87 |
88 | class TokenInfo:
89 | """Stores token information including expiration"""
90 | def __init__(self, access_token: str, expires_in: Optional[int] = None, user_id: Optional[str] = None):
91 | self.access_token = access_token
92 | self.expires_in = expires_in
93 | self.user_id = user_id
94 | self.created_at = int(time.time())
95 | logger.debug(f"TokenInfo created. Expires in: {expires_in if expires_in else 'Not specified'}")
96 |
97 | def is_expired(self) -> bool:
98 | """Check if the token is expired"""
99 | if not self.expires_in:
100 | return False # If no expiration is set, assume it's not expired
101 |
102 | current_time = int(time.time())
103 | return current_time > (self.created_at + self.expires_in)
104 |
105 | def serialize(self) -> Dict[str, Any]:
106 | """Convert to a dictionary for storage"""
107 | return {
108 | "access_token": self.access_token,
109 | "expires_in": self.expires_in,
110 | "user_id": self.user_id,
111 | "created_at": self.created_at
112 | }
113 |
114 | @classmethod
115 | def deserialize(cls, data: Dict[str, Any]) -> 'TokenInfo':
116 | """Create from a stored dictionary"""
117 | token = cls(
118 | access_token=data.get("access_token", ""),
119 | expires_in=data.get("expires_in"),
120 | user_id=data.get("user_id")
121 | )
122 | token.created_at = data.get("created_at", int(time.time()))
123 | return token
124 |
125 |
126 | class AuthManager:
127 | """Manages authentication with Meta APIs"""
128 | def __init__(self, app_id: str, redirect_uri: str = AUTH_REDIRECT_URI):
129 | self.app_id = app_id
130 | self.redirect_uri = redirect_uri
131 | self.token_info = None
132 | # Check for Pipeboard token first
133 | self.use_pipeboard = bool(os.environ.get("PIPEBOARD_API_TOKEN", ""))
134 | if not self.use_pipeboard:
135 | self._load_cached_token()
136 |
137 | def _get_token_cache_path(self) -> pathlib.Path:
138 | """Get the platform-specific path for token cache file"""
139 | if platform.system() == "Windows":
140 | base_path = pathlib.Path(os.environ.get("APPDATA", ""))
141 | elif platform.system() == "Darwin": # macOS
142 | base_path = pathlib.Path.home() / "Library" / "Application Support"
143 | else: # Assume Linux/Unix
144 | base_path = pathlib.Path.home() / ".config"
145 |
146 | # Create directory if it doesn't exist
147 | cache_dir = base_path / "meta-ads-mcp"
148 | cache_dir.mkdir(parents=True, exist_ok=True)
149 |
150 | return cache_dir / "token_cache.json"
151 |
152 | def _load_cached_token(self) -> bool:
153 | """Load token from cache if available"""
154 | cache_path = self._get_token_cache_path()
155 |
156 | if not cache_path.exists():
157 | return False
158 |
159 | try:
160 | with open(cache_path, "r") as f:
161 | data = json.load(f)
162 |
163 | # Validate the cached data structure
164 | required_fields = ["access_token", "created_at"]
165 | if not all(field in data for field in required_fields):
166 | logger.warning("Cached token data is missing required fields")
167 | return False
168 |
169 | # Check if the token looks valid (basic format check)
170 | if not data.get("access_token") or len(data["access_token"]) < 20:
171 | logger.warning("Cached token appears malformed")
172 | return False
173 |
174 | self.token_info = TokenInfo.deserialize(data)
175 |
176 | # Check if token is expired
177 | if self.token_info.is_expired():
178 | logger.info("Cached token is expired, removing cache file")
179 | # Remove the expired cache file
180 | try:
181 | cache_path.unlink()
182 | logger.info(f"Removed expired token cache: {cache_path}")
183 | except Exception as e:
184 | logger.warning(f"Could not remove expired cache file: {e}")
185 | self.token_info = None
186 | return False
187 |
188 | # Additional validation: check if token is too old (more than 60 days)
189 | current_time = int(time.time())
190 | if self.token_info.created_at and (current_time - self.token_info.created_at) > (60 * 24 * 3600):
191 | logger.warning("Cached token is too old (more than 60 days), removing cache file")
192 | try:
193 | cache_path.unlink()
194 | logger.info(f"Removed old token cache: {cache_path}")
195 | except Exception as e:
196 | logger.warning(f"Could not remove old cache file: {e}")
197 | self.token_info = None
198 | return False
199 |
200 |             logger.info(f"Loaded cached token (expires in {(self.token_info.created_at + self.token_info.expires_in) - int(time.time())} seconds)" if self.token_info.expires_in else "Loaded cached token (no expiration set)")
201 | return True
202 | except Exception as e:
203 | logger.error(f"Error loading cached token: {e}")
204 | # If there's any error reading the cache, try to remove the corrupted file
205 | try:
206 | cache_path.unlink()
207 | logger.info(f"Removed corrupted token cache: {cache_path}")
208 | except Exception as cleanup_error:
209 | logger.warning(f"Could not remove corrupted cache file: {cleanup_error}")
210 | return False
211 |
212 | def _save_token_to_cache(self) -> None:
213 | """Save token to cache file"""
214 | if not self.token_info:
215 | return
216 |
217 | cache_path = self._get_token_cache_path()
218 |
219 | try:
220 | with open(cache_path, "w") as f:
221 | json.dump(self.token_info.serialize(), f)
222 | logger.info(f"Token cached at: {cache_path}")
223 | except Exception as e:
224 | logger.error(f"Error saving token to cache: {e}")
225 |
226 | def get_auth_url(self) -> str:
227 | """Generate the Facebook OAuth URL for desktop app flow"""
228 | return (
229 | f"https://www.facebook.com/v22.0/dialog/oauth?"
230 | f"client_id={self.app_id}&"
231 | f"redirect_uri={self.redirect_uri}&"
232 | f"scope={AUTH_SCOPE}&"
233 | f"response_type={AUTH_RESPONSE_TYPE}"
234 | )
235 |
236 | def authenticate(self, force_refresh: bool = False) -> Optional[str]:
237 | """
238 | Authenticate with Meta APIs
239 |
240 | Args:
241 | force_refresh: Force token refresh even if cached token exists
242 |
243 | Returns:
244 | Access token if successful, None otherwise
245 | """
246 | # If Pipeboard auth is available, use that instead
247 | if self.use_pipeboard:
248 | logger.info("Using Pipeboard authentication")
249 | return pipeboard_auth_manager.get_access_token(force_refresh=force_refresh)
250 |
251 | # Otherwise, use the original OAuth flow
252 | # Check if we already have a valid token
253 | if not force_refresh and self.token_info and not self.token_info.is_expired():
254 | return self.token_info.access_token
255 |
256 | # Start the callback server if not already running
257 | try:
258 | port = start_callback_server()
259 |
260 | # Update redirect URI with the actual port
261 | self.redirect_uri = f"http://localhost:{port}/callback"
262 |
263 | # Generate the auth URL
264 | auth_url = self.get_auth_url()
265 |
266 | # Open browser with auth URL
267 | logger.info(f"Opening browser with URL: {auth_url}")
268 | webbrowser.open(auth_url)
269 |
270 | # We don't wait for the token here anymore
271 | # The token will be processed by the callback server
272 | # Just return None to indicate we've started the flow
273 | return None
274 | except Exception as e:
275 | logger.error(f"Failed to start callback server: {e}")
276 | logger.info("Callback server disabled. OAuth authentication flow cannot be used.")
277 | return None
278 |
279 | def get_access_token(self) -> Optional[str]:
280 | """
281 | Get the current access token, refreshing if necessary
282 |
283 | Returns:
284 | Access token if available, None otherwise
285 | """
286 | # If using Pipeboard, always delegate to the Pipeboard auth manager
287 | if self.use_pipeboard:
288 | return pipeboard_auth_manager.get_access_token()
289 |
290 | if not self.token_info or self.token_info.is_expired():
291 | return None
292 |
293 | return self.token_info.access_token
294 |
295 | def invalidate_token(self) -> None:
296 | """Invalidate the current token, usually because it has expired or is invalid"""
297 | # If using Pipeboard, delegate to the Pipeboard auth manager
298 | if self.use_pipeboard:
299 | pipeboard_auth_manager.invalidate_token()
300 | return
301 |
302 | if self.token_info:
303 | logger.info(f"Invalidating token: {self.token_info.access_token[:10]}...")
304 | self.token_info = None
305 |
306 | # Signal that authentication is needed
307 | global needs_authentication
308 | needs_authentication = True
309 |
310 | # Remove the cached token file
311 | try:
312 | cache_path = self._get_token_cache_path()
313 | if cache_path.exists():
314 | os.remove(cache_path)
315 | logger.info(f"Removed cached token file: {cache_path}")
316 | except Exception as e:
317 | logger.error(f"Error removing cached token file: {e}")
318 |
319 | def clear_token(self) -> None:
320 | """Alias for invalidate_token for consistency with other APIs"""
321 | self.invalidate_token()
322 |
323 |
324 | def process_token_response(token_container):
325 | """Process the token response from Facebook."""
326 | global needs_authentication, auth_manager
327 |
328 | if token_container and token_container.get('token'):
329 | logger.info("Processing token response from Facebook OAuth")
330 |
331 | # Exchange the short-lived token for a long-lived token
332 | short_lived_token = token_container['token']
333 | long_lived_token_info = exchange_token_for_long_lived(short_lived_token)
334 |
335 | if long_lived_token_info:
336 | logger.info(f"Successfully exchanged for long-lived token (expires in {long_lived_token_info.expires_in} seconds)")
337 |
338 | try:
339 | auth_manager.token_info = long_lived_token_info
340 | logger.info(f"Long-lived token info set in auth_manager, expires in {long_lived_token_info.expires_in} seconds")
341 | except NameError:
342 | logger.error("auth_manager not defined when trying to process token")
343 |
344 | try:
345 | logger.info("Attempting to save long-lived token to cache")
346 | auth_manager._save_token_to_cache()
347 | logger.info(f"Long-lived token successfully saved to cache at {auth_manager._get_token_cache_path()}")
348 | except Exception as e:
349 | logger.error(f"Error saving token to cache: {e}")
350 |
351 | needs_authentication = False
352 | return True
353 | else:
354 | # Fall back to the short-lived token if exchange fails
355 | logger.warning("Failed to exchange for long-lived token, using short-lived token instead")
356 | token_info = TokenInfo(
357 | access_token=short_lived_token,
358 | expires_in=token_container.get('expires_in', 0)
359 | )
360 |
361 | try:
362 | auth_manager.token_info = token_info
363 | logger.info(f"Short-lived token info set in auth_manager, expires in {token_info.expires_in} seconds")
364 | except NameError:
365 | logger.error("auth_manager not defined when trying to process token")
366 |
367 | try:
368 | logger.info("Attempting to save token to cache")
369 | auth_manager._save_token_to_cache()
370 | logger.info(f"Token successfully saved to cache at {auth_manager._get_token_cache_path()}")
371 | except Exception as e:
372 | logger.error(f"Error saving token to cache: {e}")
373 |
374 | needs_authentication = False
375 | return True
376 | else:
377 | logger.warning("Received empty token in process_token_response")
378 | needs_authentication = True
379 | return False
380 |
381 |
382 | def exchange_token_for_long_lived(short_lived_token):
383 | """
384 | Exchange a short-lived token for a long-lived token (60 days validity).
385 |
386 | Args:
387 | short_lived_token: The short-lived access token received from OAuth flow
388 |
389 | Returns:
390 | TokenInfo object with the long-lived token, or None if exchange failed
391 | """
392 | logger.info("Attempting to exchange short-lived token for long-lived token")
393 |
394 | try:
395 | # Get the app ID from the configuration
396 | app_id = meta_config.get_app_id()
397 |
398 | # Get the app secret - this should be securely stored
399 | app_secret = os.environ.get("META_APP_SECRET", "")
400 |
401 | if not app_id or not app_secret:
402 | logger.error("Missing app_id or app_secret for token exchange")
403 | return None
404 |
405 | # Make the API request to exchange the token
406 | url = "https://graph.facebook.com/v22.0/oauth/access_token"
407 | params = {
408 | "grant_type": "fb_exchange_token",
409 | "client_id": app_id,
410 | "client_secret": app_secret,
411 | "fb_exchange_token": short_lived_token
412 | }
413 |
414 | logger.debug(f"Making token exchange request to {url}")
415 | response = requests.get(url, params=params)
416 |
417 | if response.status_code == 200:
418 | data = response.json()
419 | logger.debug(f"Token exchange response: {data}")
420 |
421 | # Create TokenInfo from the response
422 | # The response includes access_token and expires_in (in seconds)
423 | new_token = data.get("access_token")
424 | expires_in = data.get("expires_in")
425 |
426 | if new_token:
427 |                 logger.info(f"Received long-lived token, expires in {expires_in} seconds (~{expires_in // 86400} days)" if expires_in else "Received long-lived token (no expires_in in response)")
428 | return TokenInfo(
429 | access_token=new_token,
430 | expires_in=expires_in
431 | )
432 | else:
433 | logger.error("No access_token in exchange response")
434 | return None
435 | else:
436 | logger.error(f"Token exchange failed with status {response.status_code}: {response.text}")
437 | return None
438 | except Exception as e:
439 | logger.error(f"Error exchanging token: {e}")
440 | return None
441 |
442 |
443 | async def get_current_access_token() -> Optional[str]:
444 | """Get the current access token from auth manager"""
445 | # Check for environment variable first - this takes highest precedence
446 | env_token = os.environ.get("META_ACCESS_TOKEN")
447 | if env_token:
448 | logger.debug("Using access token from META_ACCESS_TOKEN environment variable")
449 | # Basic validation
450 | if len(env_token) < 20: # Most Meta tokens are much longer
451 | logger.error(f"TOKEN VALIDATION FAILED: Token from environment variable appears malformed (length: {len(env_token)})")
452 | return None
453 | return env_token
454 |
455 | # Use the singleton auth manager
456 | global auth_manager
457 |
458 | # Log the function call and current app ID
459 | logger.debug("get_current_access_token() called")
460 | app_id = meta_config.get_app_id()
461 | logger.debug(f"Current app_id: {app_id}")
462 |
463 | # Check if using Pipeboard authentication
464 | using_pipeboard = auth_manager.use_pipeboard
465 |
466 | # Check if app_id is valid - but only if not using Pipeboard authentication
467 | if not app_id and not using_pipeboard:
468 | logger.error("TOKEN VALIDATION FAILED: No valid app_id configured")
469 | logger.error("Please set META_APP_ID environment variable or configure via meta_config.set_app_id()")
470 | return None
471 |
472 | # Attempt to get access token
473 | try:
474 | token = auth_manager.get_access_token()
475 |
476 | if token:
477 | # Add basic token validation - check if it looks like a valid token
478 | if len(token) < 20: # Most Meta tokens are much longer
479 | logger.error(f"TOKEN VALIDATION FAILED: Token appears malformed (length: {len(token)})")
480 | auth_manager.invalidate_token()
481 | return None
482 |
483 | logger.debug(f"Access token found in auth_manager (starts with: {token[:10]}...)")
484 | return token
485 | else:
486 | logger.warning("No valid access token available in auth_manager")
487 |
488 | # Check why token might be missing
489 | if hasattr(auth_manager, 'token_info') and auth_manager.token_info:
490 | if auth_manager.token_info.is_expired():
491 | logger.error("TOKEN VALIDATION FAILED: Token is expired")
492 | # Add expiration details
493 | if hasattr(auth_manager.token_info, 'expires_in') and auth_manager.token_info.expires_in:
494 | expiry_time = auth_manager.token_info.created_at + auth_manager.token_info.expires_in
495 | current_time = int(time.time())
496 | expired_seconds_ago = current_time - expiry_time
497 | logger.error(f"Token expired {expired_seconds_ago} seconds ago")
498 | elif not auth_manager.token_info.access_token:
499 | logger.error("TOKEN VALIDATION FAILED: Token object exists but access_token is empty")
500 | else:
501 | logger.error("TOKEN VALIDATION FAILED: Token exists but was rejected for unknown reason")
502 | else:
503 | logger.error("TOKEN VALIDATION FAILED: No token information available")
504 |
505 | # Suggest next steps for troubleshooting
506 | logger.error("To fix: Try re-authenticating or check if your token has been revoked")
507 | return None
508 | except Exception as e:
509 | logger.error(f"Error getting access token: {str(e)}")
510 | import traceback
511 | logger.error(f"Token validation stacktrace: {traceback.format_exc()}")
512 | return None
513 |
514 |
515 | def login():
516 | """
517 | Start the login flow to authenticate with Meta
518 | """
519 | print("Starting Meta Ads authentication flow...")
520 |
521 | try:
522 | # Start the callback server first
523 | try:
524 | port = start_callback_server()
525 | except Exception as callback_error:
526 | print(f"Error: {callback_error}")
527 | print("Callback server is disabled. Please use alternative authentication methods:")
528 | print("- Set PIPEBOARD_API_TOKEN environment variable for Pipeboard authentication")
529 | print("- Or provide a direct META_ACCESS_TOKEN environment variable")
530 | return
531 |
532 | # Get the auth URL and open the browser
533 | auth_url = auth_manager.get_auth_url()
534 | print(f"Opening browser with URL: {auth_url}")
535 | webbrowser.open(auth_url)
536 |
537 | # Wait for token to be received
538 | print("Waiting for authentication to complete...")
539 | max_wait = 300 # 5 minutes
540 | wait_interval = 2 # 2 seconds
541 |
542 | for _ in range(max_wait // wait_interval):
543 | if token_container["token"]:
544 | token = token_container["token"]
545 | print("Authentication successful!")
546 | # Verify token works by getting basic user info
547 | try:
548 | from .api import make_api_request
549 | result = asyncio.run(make_api_request("me", token, {}))
550 | print(f"Authenticated as: {result.get('name', 'Unknown')} (ID: {result.get('id', 'Unknown')})")
551 | return
552 | except Exception as e:
553 | print(f"Warning: Could not verify token: {e}")
554 | return
555 | time.sleep(wait_interval)
556 |
557 | print("Authentication timed out. Please try again.")
558 | except Exception as e:
559 | print(f"Error during authentication: {e}")
560 | print(f"Direct authentication URL: {auth_manager.get_auth_url()}")
561 | print("You can manually open this URL in your browser to complete authentication.")
562 |
563 | # Initialize auth manager with a placeholder - will be updated at runtime
564 | META_APP_ID = os.environ.get("META_APP_ID", "YOUR_META_APP_ID")
565 |
566 | # Create the auth manager
567 | auth_manager = AuthManager(META_APP_ID)
```
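To summarize the resolution order implemented above, a minimal usage sketch (assuming the package is importable and one of the supported credentials is configured; the `asyncio.run` wrapper is illustrative, since `get_current_access_token` is a coroutine):

```python
import asyncio

from meta_ads_mcp.core.auth import get_current_access_token, login

# Precedence implemented in get_current_access_token: META_ACCESS_TOKEN env var first,
# then Pipeboard (when PIPEBOARD_API_TOKEN is set), then the locally cached OAuth token.
token = asyncio.run(get_current_access_token())

if token is None:
    # No usable token: start the browser-based OAuth flow and wait for the callback server.
    login()
    token = asyncio.run(get_current_access_token())

print("have token:", bool(token))
```

Whether the second call sees a token depends on the callback server having processed the OAuth response and populated `auth_manager.token_info`.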
--------------------------------------------------------------------------------
/meta_ads_mcp/core/pipeboard_auth.py:
--------------------------------------------------------------------------------
```python
1 | """Authentication with Meta Ads API via pipeboard.co."""
2 |
3 | import os
4 | import json
5 | import time
6 | import requests
7 | from pathlib import Path
8 | import platform
9 | from typing import Optional, Dict, Any
10 | from .utils import logger
11 |
12 | # Enable more detailed logging
13 | import logging
14 | logger.setLevel(logging.DEBUG)
15 |
16 | # Base URL for pipeboard API
17 | PIPEBOARD_API_BASE = "https://pipeboard.co/api"
18 |
19 | # Debug message about API base URL
20 | logger.info(f"Pipeboard API base URL: {PIPEBOARD_API_BASE}")
21 |
22 | class TokenInfo:
23 | """Stores token information including expiration"""
24 | def __init__(self, access_token: str, expires_at: Optional[str] = None, token_type: Optional[str] = None):
25 | self.access_token = access_token
26 | self.expires_at = expires_at
27 | self.token_type = token_type
28 | self.created_at = int(time.time())
29 | logger.debug(f"TokenInfo created. Expires at: {expires_at if expires_at else 'Not specified'}")
30 |
31 | def is_expired(self) -> bool:
32 | """Check if the token is expired"""
33 | if not self.expires_at:
34 | logger.debug("No expiration date set for token, assuming not expired")
35 | return False # If no expiration is set, assume it's not expired
36 |
37 | # Parse ISO 8601 date format to timestamp
38 | try:
39 | # Convert the expires_at string to a timestamp
40 | # Format is like "2023-12-31T23:59:59.999Z" or "2023-12-31T23:59:59.999+00:00"
41 | from datetime import datetime
42 |
43 | # Remove the Z suffix if present and handle +00:00 format
44 | expires_at_str = self.expires_at
45 | if expires_at_str.endswith('Z'):
46 | expires_at_str = expires_at_str[:-1] # Remove Z
47 |
48 | # Handle microseconds if present
49 | if '.' in expires_at_str:
50 | datetime_format = "%Y-%m-%dT%H:%M:%S.%f"
51 | else:
52 | datetime_format = "%Y-%m-%dT%H:%M:%S"
53 |
54 | # Handle timezone offset
55 | timezone_offset = "+00:00"
56 | if "+" in expires_at_str:
57 | expires_at_str, timezone_offset = expires_at_str.split("+")
58 | timezone_offset = "+" + timezone_offset
59 |
60 | # Parse the datetime without timezone info
61 | expires_datetime = datetime.strptime(expires_at_str, datetime_format)
62 |
63 | # Convert to timestamp (assume UTC)
64 | expires_timestamp = expires_datetime.timestamp()
65 | current_time = time.time()
66 |
67 | # Check if token is expired and log result
68 | is_expired = current_time > expires_timestamp
69 | time_diff = expires_timestamp - current_time
70 | if is_expired:
71 | logger.debug(f"Token is expired! Current time: {datetime.fromtimestamp(current_time)}, "
72 | f"Expires at: {datetime.fromtimestamp(expires_timestamp)}, "
73 | f"Expired {abs(time_diff):.0f} seconds ago")
74 | else:
75 | logger.debug(f"Token is still valid. Expires at: {datetime.fromtimestamp(expires_timestamp)}, "
76 | f"Time remaining: {time_diff:.0f} seconds")
77 |
78 | return is_expired
79 | except Exception as e:
80 | logger.error(f"Error parsing expiration date: {e}")
81 | # Log the actual value to help diagnose format issues
82 | logger.error(f"Invalid expires_at value: '{self.expires_at}'")
83 | # Log detailed error information
84 | import traceback
85 | logger.error(f"Traceback: {traceback.format_exc()}")
86 | return False # If we can't parse the date, assume it's not expired
87 |
88 | def serialize(self) -> Dict[str, Any]:
89 | """Convert to a dictionary for storage"""
90 | return {
91 | "access_token": self.access_token,
92 | "expires_at": self.expires_at,
93 | "token_type": self.token_type,
94 | "created_at": self.created_at
95 | }
96 |
97 | @classmethod
98 | def deserialize(cls, data: Dict[str, Any]) -> 'TokenInfo':
99 | """Create from a stored dictionary"""
100 | logger.debug(f"Deserializing token data with keys: {', '.join(data.keys())}")
101 | if 'expires_at' in data:
102 | logger.debug(f"Token expires_at from cache: {data['expires_at']}")
103 |
104 | token = cls(
105 | access_token=data.get("access_token", ""),
106 | expires_at=data.get("expires_at"),
107 | token_type=data.get("token_type")
108 | )
109 | token.created_at = data.get("created_at", int(time.time()))
110 | return token
111 |
112 |
113 | class PipeboardAuthManager:
114 | """Manages authentication with Meta APIs via pipeboard.co"""
115 | def __init__(self):
116 | self.api_token = os.environ.get("PIPEBOARD_API_TOKEN", "")
117 | logger.debug(f"PipeboardAuthManager initialized with API token: {self.api_token[:5]}..." if self.api_token else "No API token")
118 | if self.api_token:
119 | logger.info("Pipeboard authentication enabled. Will use pipeboard.co for Meta authentication.")
120 | else:
121 | logger.info("Pipeboard authentication not enabled. Set PIPEBOARD_API_TOKEN environment variable to enable.")
122 | self.token_info = None
123 | # Note: Token caching is disabled to always fetch fresh tokens from Pipeboard
124 |
125 | def _get_token_cache_path(self) -> Path:
126 | """Get the platform-specific path for token cache file"""
127 | if platform.system() == "Windows":
128 | base_path = Path(os.environ.get("APPDATA", ""))
129 | elif platform.system() == "Darwin": # macOS
130 | base_path = Path.home() / "Library" / "Application Support"
131 | else: # Assume Linux/Unix
132 | base_path = Path.home() / ".config"
133 |
134 | # Create directory if it doesn't exist
135 | cache_dir = base_path / "meta-ads-mcp"
136 | cache_dir.mkdir(parents=True, exist_ok=True)
137 |
138 | cache_path = cache_dir / "pipeboard_token_cache.json"
139 | logger.debug(f"Token cache path: {cache_path}")
140 | return cache_path
141 |
142 | def _load_cached_token(self) -> bool:
143 | """Load token from cache if available"""
144 | cache_path = self._get_token_cache_path()
145 |
146 | if not cache_path.exists():
147 | logger.debug(f"Token cache file not found at {cache_path}")
148 | return False
149 |
150 | try:
151 | with open(cache_path, "r") as f:
152 | logger.debug(f"Reading token cache from {cache_path}")
153 | data = json.load(f)
154 |
155 | # Validate the cached data structure
156 | required_fields = ["access_token"]
157 | if not all(field in data for field in required_fields):
158 | logger.warning("Cached token data is missing required fields")
159 | return False
160 |
161 | # Check if the token looks valid (basic format check)
162 | if not data.get("access_token") or len(data["access_token"]) < 20:
163 | logger.warning("Cached token appears malformed")
164 | return False
165 |
166 | self.token_info = TokenInfo.deserialize(data)
167 |
168 | # Log token details (partial token for security)
169 | masked_token = self.token_info.access_token[:10] + "..." + self.token_info.access_token[-5:] if self.token_info.access_token else "None"
170 | logger.debug(f"Loaded token: {masked_token}")
171 |
172 | # Check if token is expired
173 | if self.token_info.is_expired():
174 | logger.info("Cached token is expired, removing cache file")
175 | # Remove the expired cache file
176 | try:
177 | cache_path.unlink()
178 | logger.info(f"Removed expired token cache: {cache_path}")
179 | except Exception as e:
180 | logger.warning(f"Could not remove expired cache file: {e}")
181 | self.token_info = None
182 | return False
183 |
184 | # Additional validation: check if token is too old (more than 60 days)
185 | current_time = int(time.time())
186 | if self.token_info.created_at and (current_time - self.token_info.created_at) > (60 * 24 * 3600):
187 | logger.warning("Cached token is too old (more than 60 days), removing cache file")
188 | try:
189 | cache_path.unlink()
190 | logger.info(f"Removed old token cache: {cache_path}")
191 | except Exception as e:
192 | logger.warning(f"Could not remove old cache file: {e}")
193 | self.token_info = None
194 | return False
195 |
196 | logger.info(f"Loaded cached token (expires at {self.token_info.expires_at})")
197 | return True
198 | except json.JSONDecodeError as e:
199 | logger.error(f"Error parsing token cache file: {e}")
200 | logger.debug("Token cache file might be corrupted, trying to read raw content")
201 | try:
202 | with open(cache_path, "r") as f:
203 | raw_content = f.read()
204 | logger.debug(f"Raw cache file content (first 100 chars): {raw_content[:100]}")
205 | except Exception as e2:
206 | logger.error(f"Could not read raw cache file: {e2}")
207 | # If there's any error reading the cache, try to remove the corrupted file
208 | try:
209 | cache_path.unlink()
210 | logger.info(f"Removed corrupted token cache: {cache_path}")
211 | except Exception as cleanup_error:
212 | logger.warning(f"Could not remove corrupted cache file: {cleanup_error}")
213 | return False
214 | except Exception as e:
215 | logger.error(f"Error loading cached token: {e}")
216 | # If there's any error reading the cache, try to remove the corrupted file
217 | try:
218 | cache_path.unlink()
219 | logger.info(f"Removed corrupted token cache: {cache_path}")
220 | except Exception as cleanup_error:
221 | logger.warning(f"Could not remove corrupted cache file: {cleanup_error}")
222 | return False
223 |
224 | def _save_token_to_cache(self) -> None:
225 | """Save token to cache file"""
226 | if not self.token_info:
227 | logger.debug("No token to save to cache")
228 | return
229 |
230 | cache_path = self._get_token_cache_path()
231 |
232 | try:
233 | token_data = self.token_info.serialize()
234 | logger.debug(f"Saving token to cache. Expires at: {token_data.get('expires_at')}")
235 |
236 | with open(cache_path, "w") as f:
237 | json.dump(token_data, f)
238 | logger.info(f"Token cached at: {cache_path}")
239 | except Exception as e:
240 | logger.error(f"Error saving token to cache: {e}")
241 |
242 | def initiate_auth_flow(self) -> Dict[str, str]:
243 | """
244 | Initiate the Meta OAuth flow via pipeboard.co
245 |
246 | Returns:
247 | Dict with loginUrl and status info
248 | """
249 | if not self.api_token:
250 | logger.error("No PIPEBOARD_API_TOKEN environment variable set")
251 | raise ValueError("No PIPEBOARD_API_TOKEN environment variable set")
252 |
253 | # Exactly match the format used in meta_auth_test.sh
254 | url = f"{PIPEBOARD_API_BASE}/meta/auth?api_token={self.api_token}"
255 | headers = {
256 | "Content-Type": "application/json"
257 | }
258 |
259 | logger.info(f"Initiating auth flow with POST request to {url}")
260 |
261 | try:
262 | # Make the POST request exactly as in the working meta_auth_test.sh script
263 | response = requests.post(url, headers=headers)
264 | logger.info(f"Auth flow response status: {response.status_code}")
265 |
266 | # Better error handling
267 | if response.status_code != 200:
268 | logger.error(f"Auth flow error: HTTP {response.status_code}")
269 | error_text = response.text if response.text else "No response content"
270 | logger.error(f"Response content: {error_text}")
271 | if response.status_code == 404:
272 | raise ValueError(f"Pipeboard API endpoint not found. Check if the server is running at {PIPEBOARD_API_BASE}")
273 | elif response.status_code == 401:
274 |                     raise ValueError("Unauthorized: Invalid API token. Check your PIPEBOARD_API_TOKEN.")
275 |
276 | response.raise_for_status()
277 |
278 | # Parse the response
279 | try:
280 | data = response.json()
281 | logger.info(f"Received response keys: {', '.join(data.keys())}")
282 | except json.JSONDecodeError:
283 | logger.error(f"Could not parse JSON response: {response.text}")
284 | raise ValueError(f"Invalid JSON response from auth endpoint: {response.text[:100]}")
285 |
286 | # Log auth flow response (without sensitive information)
287 | if 'loginUrl' in data:
288 | logger.info(f"Auth flow initiated successfully with login URL: {data['loginUrl'][:30]}...")
289 | else:
290 | logger.warning(f"Auth flow response missing loginUrl field. Response keys: {', '.join(data.keys())}")
291 |
292 | return data
293 | except requests.exceptions.ConnectionError as e:
294 | logger.error(f"Connection error to Pipeboard: {e}")
295 | logger.debug(f"Attempting to connect to: {PIPEBOARD_API_BASE}")
296 | raise
297 | except requests.exceptions.Timeout as e:
298 | logger.error(f"Timeout connecting to Pipeboard: {e}")
299 | raise
300 | except requests.exceptions.RequestException as e:
301 | logger.error(f"Error initiating auth flow: {e}")
302 | raise
303 | except Exception as e:
304 | logger.error(f"Unexpected error initiating auth flow: {e}")
305 | raise
306 |
307 | def get_access_token(self, force_refresh: bool = False) -> Optional[str]:
308 | """
309 | Get the current access token, refreshing if necessary or if forced
310 |
311 | Args:
312 | force_refresh: Force token refresh even if cached token exists
313 |
314 | Returns:
315 | Access token if available, None otherwise
316 | """
317 | # First check if API token is configured
318 | if not self.api_token:
319 | logger.error("TOKEN VALIDATION FAILED: No Pipeboard API token configured")
320 | logger.error("Please set PIPEBOARD_API_TOKEN environment variable")
321 | return None
322 |
323 | logger.info("Getting fresh token from Pipeboard (caching disabled)")
324 |
325 | # If force refresh or no token/expired token, get a new one from Pipeboard
326 | try:
327 | # Make a request to get the token, using the same URL format as initiate_auth_flow
328 | url = f"{PIPEBOARD_API_BASE}/meta/token?api_token={self.api_token}"
329 | headers = {
330 | "Content-Type": "application/json"
331 | }
332 |
333 | logger.info(f"Requesting token from {url}")
334 |
335 | # Add timeout for better error messages
336 | try:
337 | response = requests.get(url, headers=headers, timeout=10)
338 | except requests.exceptions.Timeout:
339 | logger.error("TOKEN VALIDATION FAILED: Timeout while connecting to Pipeboard API")
340 | logger.error(f"Could not connect to {PIPEBOARD_API_BASE} within 10 seconds")
341 | return None
342 | except requests.exceptions.ConnectionError:
343 | logger.error("TOKEN VALIDATION FAILED: Connection error with Pipeboard API")
344 | logger.error(f"Could not connect to {PIPEBOARD_API_BASE} - check if service is running")
345 | return None
346 |
347 | logger.info(f"Token request response status: {response.status_code}")
348 |
349 | # Better error handling with response content
350 | if response.status_code != 200:
351 | logger.error(f"TOKEN VALIDATION FAILED: HTTP error {response.status_code}")
352 | error_text = response.text if response.text else "No response content"
353 | logger.error(f"Response content: {error_text}")
354 |
355 | # Add more specific error messages for common status codes
356 | if response.status_code == 401:
357 | logger.error("Authentication failed: Invalid Pipeboard API token")
358 | elif response.status_code == 404:
359 | logger.error("Endpoint not found: Check if Pipeboard API service is running correctly")
360 | elif response.status_code == 400:
361 | logger.error("Bad request: The request to Pipeboard API was malformed")
362 |
363 | response.raise_for_status()
364 |
365 | try:
366 | data = response.json()
367 | logger.info(f"Received token response with keys: {', '.join(data.keys())}")
368 | except json.JSONDecodeError:
369 | logger.error("TOKEN VALIDATION FAILED: Invalid JSON response from Pipeboard API")
370 | logger.error(f"Response content (first 100 chars): {response.text[:100]}")
371 | return None
372 |
373 | # Validate response data
374 | if "access_token" not in data:
375 | logger.error("TOKEN VALIDATION FAILED: No access_token in Pipeboard API response")
376 | logger.error(f"Response keys: {', '.join(data.keys())}")
377 | if "error" in data:
378 | logger.error(f"Error details: {data['error']}")
379 | else:
380 | logger.error("No error information available in response")
381 | return None
382 |
383 | # Create new token info
384 | self.token_info = TokenInfo(
385 | access_token=data.get("access_token"),
386 | expires_at=data.get("expires_at"),
387 | token_type=data.get("token_type", "bearer")
388 | )
389 |
390 | # Note: Token caching is disabled
391 |
392 | masked_token = self.token_info.access_token[:10] + "..." + self.token_info.access_token[-5:] if self.token_info.access_token else "None"
393 | logger.info(f"Successfully retrieved access token: {masked_token}")
394 | return self.token_info.access_token
395 | except requests.RequestException as e:
396 | status_code = e.response.status_code if hasattr(e, 'response') and e.response else None
397 | response_text = e.response.text if hasattr(e, 'response') and e.response else "No response"
398 |
399 | if status_code == 401:
400 | logger.error(f"Unauthorized: Check your PIPEBOARD_API_TOKEN. Response: {response_text}")
401 | elif status_code == 404:
402 | logger.error(f"No token available: You might need to complete authorization first. Response: {response_text}")
403 | # Return None so caller can handle the auth flow
404 | return None
405 | else:
406 | logger.error(f"Error getting access token (status {status_code}): {e}")
407 | logger.error(f"Response content: {response_text}")
408 | return None
409 | except Exception as e:
410 | logger.error(f"Unexpected error getting access token: {e}")
411 | return None
412 |
413 | def invalidate_token(self) -> None:
414 | """Invalidate the current token, usually because it has expired or is invalid"""
415 | if self.token_info:
416 | logger.info(f"Invalidating token: {self.token_info.access_token[:10]}...")
417 | self.token_info = None
418 |
419 | # Remove the cached token file
420 | try:
421 | cache_path = self._get_token_cache_path()
422 | if cache_path.exists():
423 | os.remove(cache_path)
424 | logger.info(f"Removed cached token file: {cache_path}")
425 | else:
426 | logger.debug(f"No token cache file to remove: {cache_path}")
427 | except Exception as e:
428 | logger.error(f"Error removing cached token file: {e}")
429 | else:
430 | logger.debug("No token to invalidate")
431 |
432 | def test_token_validity(self) -> bool:
433 | """
434 | Test if the current token is valid with the Meta Graph API
435 |
436 | Returns:
437 | True if valid, False otherwise
438 | """
439 | if not self.token_info or not self.token_info.access_token:
440 | logger.debug("No token to test")
441 | logger.error("TOKEN VALIDATION FAILED: Missing token to test")
442 | return False
443 |
444 | # Log token details for debugging (partial token for security)
445 | masked_token = self.token_info.access_token[:5] + "..." + self.token_info.access_token[-5:] if self.token_info.access_token else "None"
446 | token_type = self.token_info.token_type if hasattr(self.token_info, 'token_type') and self.token_info.token_type else "bearer"
447 | logger.debug(f"Testing token validity (token: {masked_token}, type: {token_type})")
448 |
449 | try:
450 | # Make a simple request to the /me endpoint to test the token
451 | META_GRAPH_API_VERSION = "v22.0"
452 | url = f"https://graph.facebook.com/{META_GRAPH_API_VERSION}/me"
453 | headers = {"Authorization": f"Bearer {self.token_info.access_token}"}
454 |
455 | logger.debug(f"Testing token validity with request to {url}")
456 |
457 | # Add timeout and better error handling
458 | try:
459 | response = requests.get(url, headers=headers, timeout=10)
460 | except requests.exceptions.Timeout:
461 | logger.error("TOKEN VALIDATION FAILED: Timeout while connecting to Meta API")
462 | logger.error("The Graph API did not respond within 10 seconds")
463 | return False
464 | except requests.exceptions.ConnectionError:
465 | logger.error("TOKEN VALIDATION FAILED: Connection error with Meta API")
466 | logger.error("Could not establish connection to Graph API - check network connectivity")
467 | return False
468 |
469 | if response.status_code == 200:
470 | data = response.json()
471 | logger.debug(f"Token is valid. User ID: {data.get('id')}")
472 | # Add more useful user information for debugging
473 | user_info = f"User ID: {data.get('id')}"
474 | if 'name' in data:
475 | user_info += f", Name: {data.get('name')}"
476 | logger.info(f"Meta API token validated successfully ({user_info})")
477 | return True
478 | else:
479 | logger.error(f"TOKEN VALIDATION FAILED: API returned status {response.status_code}")
480 |
481 | # Try to parse the error response for more detailed information
482 | try:
483 | error_data = response.json()
484 | if 'error' in error_data:
485 | error_obj = error_data.get('error', {})
486 | error_code = error_obj.get('code', 'unknown')
487 | error_message = error_obj.get('message', 'Unknown error')
488 | logger.error(f"Meta API error: Code {error_code} - {error_message}")
489 |
490 | # Add specific guidance for common error codes
491 | if error_code == 190:
492 | logger.error("Error indicates the token is invalid or has expired")
493 | elif error_code == 4:
494 | logger.error("Error indicates rate limiting - too many requests")
495 | elif error_code == 200:
496 | logger.error("Error indicates API permissions or configuration issue")
497 | else:
498 | logger.error(f"No error object in response: {error_data}")
499 | except json.JSONDecodeError:
500 | logger.error(f"Could not parse error response: {response.text[:200]}")
501 |
502 | return False
503 | except Exception as e:
504 | logger.error(f"TOKEN VALIDATION FAILED: Unexpected error: {str(e)}")
505 |
506 | # Add stack trace for debugging complex issues
507 | import traceback
508 | logger.error(f"Stack trace: {traceback.format_exc()}")
509 |
510 | return False
511 |
512 |
513 | # Create singleton instance
514 | pipeboard_auth_manager = PipeboardAuthManager()
```
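For completeness, a minimal sketch of the Pipeboard flow exposed by this module (assumes `PIPEBOARD_API_TOKEN` is set; the polling loop and its timeout are illustrative, not part of the module):

```python
import time
import webbrowser

from meta_ads_mcp.core.pipeboard_auth import pipeboard_auth_manager

# Step 1: ask pipeboard.co for a Meta login URL and open it in a browser.
auth = pipeboard_auth_manager.initiate_auth_flow()
if "loginUrl" in auth:
    webbrowser.open(auth["loginUrl"])

# Step 2: poll for the Meta access token once the user has authorized the app.
token = None
for _ in range(30):  # roughly one minute; purely illustrative
    token = pipeboard_auth_manager.get_access_token(force_refresh=True)
    if token:
        break
    time.sleep(2)

# Step 3: optionally verify the token against the Graph API /me endpoint.
if token and pipeboard_auth_manager.test_token_validity():
    print("Pipeboard-issued Meta token is valid")
```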