This is page 2 of 5. Use http://codebase.md/nictuku/meta-ads-mcp?page={x} to view the full context.
# Directory Structure
```
├── .github
│ └── workflows
│ ├── publish-mcp.yml
│ ├── publish.yml
│ └── test.yml
├── .gitignore
├── .python-version
├── .uv.toml
├── CUSTOM_META_APP.md
├── Dockerfile
├── examples
│ ├── example_http_client.py
│ └── README.md
├── future_improvements.md
├── images
│ └── meta-ads-example.png
├── LICENSE
├── LOCAL_INSTALLATION.md
├── meta_ads_auth.sh
├── meta_ads_mcp
│ ├── __init__.py
│ ├── __main__.py
│ └── core
│ ├── __init__.py
│ ├── accounts.py
│ ├── ads_library.py
│ ├── ads.py
│ ├── adsets.py
│ ├── api.py
│ ├── auth.py
│ ├── authentication.py
│ ├── budget_schedules.py
│ ├── callback_server.py
│ ├── campaigns.py
│ ├── duplication.py
│ ├── http_auth_integration.py
│ ├── insights.py
│ ├── openai_deep_research.py
│ ├── pipeboard_auth.py
│ ├── reports.py
│ ├── resources.py
│ ├── server.py
│ ├── targeting.py
│ └── utils.py
├── META_API_NOTES.md
├── poetry.lock
├── pyproject.toml
├── README.md
├── RELEASE.md
├── requirements.txt
├── server.json
├── setup.py
├── smithery.yaml
├── STREAMABLE_HTTP_SETUP.md
└── tests
├── __init__.py
├── conftest.py
├── e2e_account_info_search_issue.py
├── README_REGRESSION_TESTS.md
├── README.md
├── test_account_info_access_fix.py
├── test_account_search.py
├── test_budget_update_e2e.py
├── test_budget_update.py
├── test_create_ad_creative_simple.py
├── test_create_simple_creative_e2e.py
├── test_dsa_beneficiary.py
├── test_dsa_integration.py
├── test_duplication_regression.py
├── test_duplication.py
├── test_dynamic_creatives.py
├── test_estimate_audience_size_e2e.py
├── test_estimate_audience_size.py
├── test_get_account_pages.py
├── test_get_ad_creatives_fix.py
├── test_get_ad_image_quality_improvements.py
├── test_get_ad_image_regression.py
├── test_http_transport.py
├── test_insights_actions_and_values_e2e.py
├── test_insights_pagination.py
├── test_integration_openai_mcp.py
├── test_is_dynamic_creative_adset.py
├── test_mobile_app_adset_creation.py
├── test_mobile_app_adset_issue.py
├── test_openai_mcp_deep_research.py
├── test_openai.py
├── test_page_discovery_integration.py
├── test_page_discovery.py
├── test_targeting_search_e2e.py
├── test_targeting.py
├── test_update_ad_creative_id.py
└── test_upload_ad_image.py
```
# Files
--------------------------------------------------------------------------------
/tests/test_page_discovery_integration.py:
--------------------------------------------------------------------------------
```python
"""
Integration tests for page discovery functionality.
Tests the complete workflow from page discovery to ad creative creation.
"""
import pytest
import json
from unittest.mock import AsyncMock, patch
from meta_ads_mcp.core.ads import create_ad_creative, search_pages_by_name, get_account_pages
class TestPageDiscoveryIntegration:
    """Integration tests for page discovery functionality.

    Each test patches ``_discover_pages_for_account`` and/or
    ``make_api_request`` inside ``meta_ads_mcp.core.ads`` and passes an
    explicit ``access_token`` to the tool under test. Tool results are JSON
    strings that may be wrapped in an MCP ``{"data": "<json string>"}``
    envelope; ``_unwrap`` handles both shapes identically for every test.
    """

    @staticmethod
    def _unwrap(result):
        """Decode a tool result and strip the optional MCP ``data`` envelope.

        The envelope is only unwrapped when ``data`` is a JSON *string*. A
        ``data`` list/dict is the payload itself (as in the search results)
        and is returned untouched rather than being fed to ``json.loads``.

        Args:
            result: JSON string returned by the tool under test.

        Returns:
            The decoded payload with the envelope removed when present.
        """
        decoded = json.loads(result)
        if isinstance(decoded, dict) and isinstance(decoded.get("data"), str):
            return json.loads(decoded["data"])
        return decoded

    @staticmethod
    def _discovered_page(page_name):
        """Build a successful page-discovery result for the given page name."""
        return {
            "success": True,
            "page_id": "123456789",
            "page_name": page_name,
            "source": "tracking_specs"
        }

    @staticmethod
    def _image_not_found_error():
        """Build the mocked API error returned when the creative image is missing."""
        return {
            "error": {
                "message": "Invalid parameter",
                "type": "OAuthException",
                "code": 100,
                "error_subcode": 2446386,
                "error_user_title": "Image Not Found",
                "error_user_msg": "The image you selected is not available."
            }
        }

    @pytest.mark.asyncio
    async def test_end_to_end_page_discovery_in_create_ad_creative(self):
        """Test that create_ad_creative automatically discovers pages when no page_id is provided."""
        with patch('meta_ads_mcp.core.ads._discover_pages_for_account') as mock_discover, \
             patch('meta_ads_mcp.core.ads.make_api_request') as mock_api, \
             patch('meta_ads_mcp.core.auth.get_current_access_token') as mock_get_token:
            # Provide a valid access token to bypass authentication
            mock_get_token.return_value = "test_token_123"
            mock_discover.return_value = self._discovered_page("Test Page")
            # The creative request fails on the (invalid) image; that is expected
            mock_api.return_value = self._image_not_found_error()

            # Call create_ad_creative without providing page_id
            result = await create_ad_creative(
                account_id="act_123456789",
                name="Test Creative",
                image_hash="test_hash_123",
                message="Test message",
                access_token="test_token_123"  # Provide explicit token
            )
            actual_result = self._unwrap(result)

            # The function attempted to create a creative (and failed on the image)
            assert "error" in actual_result
            assert "Image Not Found" in actual_result["error"]["error_user_title"]
            # Verify that page discovery was called
            mock_discover.assert_called_once_with("act_123456789", "test_token_123")

    @pytest.mark.asyncio
    async def test_search_pages_by_name_integration(self):
        """Test the complete search_pages_by_name function with real-like data."""
        with patch('meta_ads_mcp.core.ads._discover_pages_for_account') as mock_discover, \
             patch('meta_ads_mcp.core.auth.get_current_access_token') as mock_get_token:
            # Provide a valid access token to bypass authentication
            mock_get_token.return_value = "test_token_123"
            mock_discover.return_value = self._discovered_page("Injury Payouts")

            # Test searching for pages
            result = await search_pages_by_name(
                account_id="act_123456789",
                search_term="Injury",
                access_token="test_token_123"  # Provide explicit token
            )
            actual_result = self._unwrap(result)

            # Verify the search results
            assert len(actual_result["data"]) == 1
            assert actual_result["data"][0]["id"] == "123456789"
            assert actual_result["data"][0]["name"] == "Injury Payouts"
            assert actual_result["search_term"] == "Injury"
            assert actual_result["total_found"] == 1
            assert actual_result["total_available"] == 1

    @pytest.mark.asyncio
    async def test_create_ad_creative_with_manual_page_id(self):
        """Test that create_ad_creative works with manually provided page_id."""
        with patch('meta_ads_mcp.core.ads.make_api_request') as mock_api, \
             patch('meta_ads_mcp.core.auth.get_current_access_token') as mock_get_token:
            # Provide a valid access token to bypass authentication
            mock_get_token.return_value = "test_token_123"
            mock_api.return_value = self._image_not_found_error()

            # Call create_ad_creative with a manual page_id
            result = await create_ad_creative(
                account_id="act_123456789",
                name="Test Creative",
                image_hash="test_hash_123",
                page_id="123456789",  # Manual page ID
                message="Test message",
                access_token="test_token_123"  # Provide explicit token
            )
            actual_result = self._unwrap(result)

            # Verify that the function attempted to create a creative
            assert "error" in actual_result
            assert "Image Not Found" in actual_result["error"]["error_user_title"]
            # Verify that make_api_request was called for creating the creative
            mock_api.assert_called_once()

    @pytest.mark.asyncio
    async def test_create_ad_creative_no_pages_found(self):
        """Test create_ad_creative when no pages are found."""
        # Mock the page discovery to return no pages
        mock_discovery_result = {
            "success": False,
            "message": "No suitable pages found for this account"
        }
        with patch('meta_ads_mcp.core.ads._discover_pages_for_account') as mock_discover, \
             patch('meta_ads_mcp.core.auth.get_current_access_token') as mock_get_token:
            # Provide a valid access token to bypass authentication
            mock_get_token.return_value = "test_token_123"
            mock_discover.return_value = mock_discovery_result

            # Call create_ad_creative without providing page_id
            result = await create_ad_creative(
                account_id="act_123456789",
                name="Test Creative",
                image_hash="test_hash_123",
                message="Test message",
                access_token="test_token_123"  # Provide explicit token
            )
            actual_result = self._unwrap(result)

            # Verify that the function returned an error about no pages found
            assert "error" in actual_result
            assert "No page ID provided and no suitable pages found" in actual_result["error"]
            assert "suggestions" in actual_result

    @pytest.mark.asyncio
    async def test_search_pages_by_name_no_search_term(self):
        """Test search_pages_by_name without a search term (should return all pages)."""
        with patch('meta_ads_mcp.core.ads._discover_pages_for_account') as mock_discover, \
             patch('meta_ads_mcp.core.auth.get_current_access_token') as mock_get_token:
            # Provide a valid access token to bypass authentication
            mock_get_token.return_value = "test_token_123"
            mock_discover.return_value = self._discovered_page("Test Page")

            # Test searching without a search term
            result = await search_pages_by_name(
                account_id="act_123456789",
                access_token="test_token_123"  # Provide explicit token
            )
            actual_result = self._unwrap(result)

            # Verify the results
            assert len(actual_result["data"]) == 1
            assert actual_result["data"][0]["id"] == "123456789"
            assert actual_result["data"][0]["name"] == "Test Page"
            assert actual_result["total_available"] == 1
            assert "note" in actual_result

    @pytest.mark.asyncio
    async def test_search_pages_by_name_no_matches(self):
        """Test search_pages_by_name when no pages match the search term."""
        with patch('meta_ads_mcp.core.ads._discover_pages_for_account') as mock_discover, \
             patch('meta_ads_mcp.core.auth.get_current_access_token') as mock_get_token:
            # Provide a valid access token to bypass authentication
            mock_get_token.return_value = "test_token_123"
            mock_discover.return_value = self._discovered_page("Test Page")

            # Test searching for a term that doesn't match
            result = await search_pages_by_name(
                account_id="act_123456789",
                search_term="Nonexistent",
                access_token="test_token_123"  # Provide explicit token
            )
            actual_result = self._unwrap(result)

            # Verify that no pages were found
            assert len(actual_result["data"]) == 0
            assert actual_result["search_term"] == "Nonexistent"
            assert actual_result["total_found"] == 0
            assert actual_result["total_available"] == 1
if __name__ == "__main__":
pytest.main([__file__])
```
--------------------------------------------------------------------------------
/tests/e2e_account_info_search_issue.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
E2E Test for Account Info Search Issue
This test reproduces the issue reported by a user where get_account_info
cannot find account ID 414174661097171, while get_ad_accounts can see it.
Usage:
1. Start the server: uv run python -m meta_ads_mcp --transport streamable-http --port 8080
2. Run test: uv run python tests/e2e_account_info_search_issue.py
Or with pytest (manual only):
uv run python -m pytest tests/e2e_account_info_search_issue.py -v -m e2e
"""
import pytest
import requests
import json
from typing import Dict, Any
@pytest.mark.e2e
@pytest.mark.skip(reason="E2E test - run manually only")
class TestAccountInfoSearchIssue:
    """E2E test for account info search issue.

    Sends JSON-RPC ``tools/call`` requests to an MCP server and compares what
    ``get_ad_accounts`` and ``get_account_info`` report for one target
    account. The ``test_*`` methods return result dictionaries (consumed by
    ``run_validation``) instead of asserting.

    NOTE(review): pytest does not collect test classes that define
    ``__init__``; the constructor is kept because ``run_validation``
    instantiates this class directly -- confirm pytest collection is not
    relied upon for this file.
    """

    def __init__(self, base_url: str = "http://localhost:8080"):
        """Store the server endpoint, request counter and target account id.

        Args:
            base_url: Base URL of the MCP server; a trailing slash is dropped.
        """
        self.base_url = base_url.rstrip('/')
        self.endpoint = f"{self.base_url}/mcp/"
        self.request_id = 1
        self.target_account_id = "414174661097171"

    @staticmethod
    def _tool_text(rpc_response) -> str:
        """Return the ``text`` of the first content item of a tool response.

        Any missing or unexpectedly-typed level (no ``result``, empty
        ``content``, non-string ``text``) yields ``""`` so the caller's JSON
        parsing reports a structured failure instead of raising
        ``KeyError``/``IndexError``/``AttributeError``.
        """
        result = rpc_response.get("result") if isinstance(rpc_response, dict) else None
        content = result.get("content") if isinstance(result, dict) else None
        first = content[0] if isinstance(content, list) and content else None
        text = first.get("text", "") if isinstance(first, dict) else ""
        return text if isinstance(text, str) else ""

    @staticmethod
    def _extract_accounts(parsed_content) -> list:
        """Pull the list of accounts out of a parsed ``get_ad_accounts`` payload.

        Accepts a bare list, ``{"data": [...]}``, ``{"data": {"data": [...]}}``
        and the same shapes with ``data`` encoded as a JSON string. Anything
        else yields an empty list.
        """
        if isinstance(parsed_content, list):
            return parsed_content
        if not isinstance(parsed_content, dict) or "data" not in parsed_content:
            return []
        data = parsed_content["data"]
        # Handle case where data is a JSON string that needs parsing
        if isinstance(data, str):
            try:
                data = json.loads(data)
            except json.JSONDecodeError:
                return []
        if isinstance(data, dict) and "data" in data:
            data = data["data"]
        return data if isinstance(data, list) else []

    @staticmethod
    def _bare_account_id(account) -> str:
        """Return the account's id as a string without the ``act_`` prefix."""
        # str() keeps numeric ids usable; `or ""` covers a null/missing id.
        return str(account.get("id") or "").replace("act_", "")

    def test_get_ad_accounts_can_see_target_account(self):
        """Verify get_ad_accounts can see account 414174661097171"""
        print(f"\n🔍 Testing if get_ad_accounts can see account {self.target_account_id}")
        params = {
            "name": "get_ad_accounts",
            "arguments": {}
        }
        result = self._make_request("tools/call", params)
        if not result["success"]:
            return {
                "success": False,
                "error": f"Request failed: {result.get('error', 'Unknown error')}",
                "status_code": result["status_code"]
            }

        content = self._tool_text(result["json"])
        try:
            parsed_content = json.loads(content)
        except json.JSONDecodeError as e:
            return {
                "success": False,
                "error": f"Could not parse get_ad_accounts response: {str(e)}",
                "raw_content": content
            }

        # Check for errors first
        error_info = self._check_for_errors(parsed_content)
        if error_info["has_error"]:
            return {
                "success": False,
                "error": f"get_ad_accounts returned error: {error_info['error_message']}",
                "error_format": error_info["format"]
            }

        accounts = self._extract_accounts(parsed_content)

        # Search for target account
        found_account = next(
            (
                account
                for account in accounts
                if isinstance(account, dict)
                and self._bare_account_id(account) == self.target_account_id
            ),
            None,
        )
        result_data = {
            "success": True,
            "total_accounts": len(accounts),
            "target_account_found": found_account is not None,
            "found_account_details": found_account if found_account else None,
            "all_account_ids": [
                self._bare_account_id(acc)
                for acc in accounts
                if isinstance(acc, dict) and acc.get("id")
            ]
        }
        print(f"✅ get_ad_accounts results:")
        print(f" Total accounts found: {result_data['total_accounts']}")
        print(f" Target account {self.target_account_id} found: {result_data['target_account_found']}")
        if found_account:
            print(f" Account details: {found_account}")
        return result_data

    def test_get_account_info_cannot_find_target_account(self):
        """Verify get_account_info cannot find account 414174661097171"""
        print(f"\n🔍 Testing if get_account_info can find account {self.target_account_id}")
        params = {
            "name": "get_account_info",
            "arguments": {
                "account_id": self.target_account_id
            }
        }
        result = self._make_request("tools/call", params)
        if not result["success"]:
            return {
                "success": False,
                "error": f"Request failed: {result.get('error', 'Unknown error')}",
                "status_code": result["status_code"]
            }

        content = self._tool_text(result["json"])
        try:
            parsed_content = json.loads(content)
        except json.JSONDecodeError as e:
            return {
                "success": False,
                "error": f"Could not parse get_account_info response: {str(e)}",
                "raw_content": content
            }

        # Check for errors
        error_info = self._check_for_errors(parsed_content)
        result_data = {
            "success": True,
            "has_error": error_info["has_error"],
            "error_message": error_info.get("error_message", ""),
            "error_format": error_info.get("format", ""),
            "raw_response": parsed_content
        }
        print(f"✅ get_account_info results:")
        print(f" Has error: {result_data['has_error']}")
        if result_data['has_error']:
            print(f" Error message: {result_data['error_message']}")
            print(f" Error format: {result_data['error_format']}")
        else:
            print(f" Unexpected success: {parsed_content}")
        return result_data

    def _check_for_errors(self, parsed_content: Dict[str, Any]) -> Dict[str, Any]:
        """Properly handle both wrapped and direct error formats.

        Args:
            parsed_content: Decoded tool payload. Non-dict payloads (lists,
                scalars) cannot carry an error key and report no error.

        Returns:
            ``{"has_error": False}`` or a dict with ``has_error``,
            ``error_message``, ``error_details`` and ``format`` (one of
            ``wrapped_dict``, ``wrapped_json``, ``direct``).
        """
        if not isinstance(parsed_content, dict):
            return {"has_error": False}

        # Check for data wrapper format first
        if "data" in parsed_content:
            data = parsed_content["data"]
            # Handle case where data is already parsed (dict/list)
            if isinstance(data, dict) and 'error' in data:
                return {
                    "has_error": True,
                    "error_message": data['error'],
                    "error_details": data.get('details', ''),
                    "format": "wrapped_dict"
                }
            # Handle case where data is a JSON string that needs parsing
            if isinstance(data, str):
                try:
                    error_data = json.loads(data)
                except json.JSONDecodeError:
                    # Data field exists but isn't valid JSON
                    error_data = None
                # Only a decoded object can hold an "error" key; a decoded
                # string or list would make the key lookup raise TypeError.
                if isinstance(error_data, dict) and 'error' in error_data:
                    return {
                        "has_error": True,
                        "error_message": error_data['error'],
                        "error_details": error_data.get('details', ''),
                        "format": "wrapped_json"
                    }

        # Check for direct error format
        if 'error' in parsed_content:
            return {
                "has_error": True,
                "error_message": parsed_content['error'],
                "error_details": parsed_content.get('details', ''),
                "format": "direct"
            }
        return {"has_error": False}

    def _make_request(self, method: str, params: Dict[str, Any] = None) -> Dict[str, Any]:
        """Make a JSON-RPC request to the MCP server.

        Args:
            method: JSON-RPC method name.
            params: Optional JSON-RPC params; omitted from the payload when falsy.

        Returns:
            Dict with ``status_code``, ``json`` (decoded body on HTTP 200,
            otherwise ``None``), ``text`` and ``success``; on a ``requests``
            exception ``status_code`` is 0 and an ``error`` string is added.
        """
        headers = {
            "Content-Type": "application/json",
            "Accept": "application/json, text/event-stream",
            "User-Agent": "E2E-Test-Client/1.0"
        }
        payload = {
            "jsonrpc": "2.0",
            "method": method,
            "id": self.request_id
        }
        if params:
            payload["params"] = params
        try:
            response = requests.post(
                self.endpoint,
                headers=headers,
                json=payload,
                timeout=30
            )
            self.request_id += 1
            return {
                "status_code": response.status_code,
                "json": response.json() if response.status_code == 200 else None,
                "text": response.text,
                "success": response.status_code == 200
            }
        except requests.exceptions.RequestException as e:
            return {
                "status_code": 0,
                "json": None,
                "text": str(e),
                "success": False,
                "error": str(e)
            }
def run_validation():
    """Run both account checks against the server and print a summary.

    Instantiates ``TestAccountInfoSearchIssue`` with its defaults, calls its
    two ``test_*`` methods, prints each outcome and finally reports whether
    the combination "get_ad_accounts sees the account, get_account_info
    errors" was observed.
    """
    rule = "=" * 60

    print("🚀 Starting Account Info Search Issue Validation")
    print(f"Target Account ID: 414174661097171")
    print(rule)

    tester = TestAccountInfoSearchIssue()
    # Test 1: Check if get_ad_accounts can see the account
    accounts_result = tester.test_get_ad_accounts_can_see_target_account()
    # Test 2: Check if get_account_info can find the account
    account_info_result = tester.test_get_account_info_cannot_find_target_account()

    print("\n" + rule)
    print("📊 VALIDATION SUMMARY")
    print(rule)

    if not accounts_result["success"]:
        print(f"❌ get_ad_accounts: Failed - {accounts_result['error']}")
    else:
        print(f"✅ get_ad_accounts: Found {accounts_result['total_accounts']} total accounts")
        if accounts_result["target_account_found"]:
            print(f"✅ get_ad_accounts: Target account 414174661097171 IS visible")
        else:
            print(f"❌ get_ad_accounts: Target account 414174661097171 is NOT visible")
            print(f" Available account IDs: {accounts_result.get('all_account_ids', [])}")

    if not account_info_result["success"]:
        print(f"❌ get_account_info: Test failed - {account_info_result['error']}")
    elif account_info_result["has_error"]:
        print(f"❌ get_account_info: Cannot find account (Error: {account_info_result['error_message']})")
    else:
        print(f"✅ get_account_info: Successfully found account")

    # Determine if issue is confirmed: every one of these flags must be truthy
    issue_confirmed = all((
        accounts_result.get("success", False),
        accounts_result.get("target_account_found", False),
        account_info_result.get("success", False),
        account_info_result.get("has_error", False),
    ))

    print("\n" + rule)
    if issue_confirmed:
        summary_lines = (
            "🐛 ISSUE CONFIRMED:",
            " - get_ad_accounts CAN see the account",
            " - get_account_info CANNOT find the account",
            " - This validates the user's complaint",
        )
    else:
        summary_lines = (
            "🤔 ISSUE NOT CONFIRMED:",
            " - The behavior may be different than reported",
            " - Check individual test results above",
        )
    for line in summary_lines:
        print(line)
    print(rule)
if __name__ == "__main__":
run_validation()
```
--------------------------------------------------------------------------------
/meta_ads_mcp/core/http_auth_integration.py:
--------------------------------------------------------------------------------
```python
"""
FastMCP HTTP Authentication Integration for Meta Ads MCP
This module provides direct integration with FastMCP to inject authentication
from HTTP headers into the tool execution context.
"""
import asyncio
import contextvars
from typing import Optional
from .utils import logger
import json
# Use context variables instead of thread-local storage for better async support.
# Both variables default to None, meaning no token has been set in the current
# context; they are read and written through FastMCPAuthIntegration below.
_auth_token: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar('auth_token', default=None)
_pipeboard_token: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar('pipeboard_token', default=None)
class FastMCPAuthIntegration:
    """Direct integration with FastMCP for HTTP authentication.

    A namespace of static helpers that (a) store and read the access token
    and the Pipeboard token in the module-level context variables
    ``_auth_token`` / ``_pipeboard_token`` and (b) pull those tokens out of
    a mapping of HTTP request headers.
    """

    @staticmethod
    def _header_value(headers, name: str) -> Optional[str]:
        """Return the first non-empty value of header *name*, ignoring case.

        HTTP field names are case-insensitive, but a plain ``dict`` is not,
        so the exact spelling is tried first and then every key is compared
        case-insensitively.

        Args:
            headers: Mapping of header names to values.
            name: Header name to look up.

        Returns:
            The header value, or None when absent or empty.
        """
        exact = headers.get(name)
        if exact:
            return exact
        wanted = name.lower()
        for key, value in headers.items():
            if value and isinstance(key, str) and key.lower() == wanted:
                return value
        return None

    @staticmethod
    def set_auth_token(token: str) -> None:
        """Set authentication token for the current context

        Args:
            token: Access token to use for this request
        """
        _auth_token.set(token)

    @staticmethod
    def get_auth_token() -> Optional[str]:
        """Get authentication token for the current context

        Returns:
            Access token if set, None otherwise
        """
        return _auth_token.get(None)

    @staticmethod
    def set_pipeboard_token(token: str) -> None:
        """Set Pipeboard token for the current context

        Args:
            token: Pipeboard API token to use for this request
        """
        _pipeboard_token.set(token)

    @staticmethod
    def get_pipeboard_token() -> Optional[str]:
        """Get Pipeboard token for the current context

        Returns:
            Pipeboard token if set, None otherwise
        """
        return _pipeboard_token.get(None)

    @staticmethod
    def clear_auth_token() -> None:
        """Clear authentication token for the current context"""
        _auth_token.set(None)

    @staticmethod
    def clear_pipeboard_token() -> None:
        """Clear Pipeboard token for the current context"""
        _pipeboard_token.set(None)

    @staticmethod
    def extract_token_from_headers(headers: dict) -> Optional[str]:
        """Extract token from HTTP headers

        Sources are checked in priority order: ``Authorization: Bearer ...``,
        then ``X-META-ACCESS-TOKEN``, then the legacy ``X-PIPEBOARD-API-TOKEN``.
        Header names are matched case-insensitively.

        Args:
            headers: HTTP request headers

        Returns:
            Token if found, None otherwise
        """
        lookup = FastMCPAuthIntegration._header_value

        # Check for Bearer token in Authorization header (primary method)
        auth_header = lookup(headers, 'Authorization')
        if auth_header and auth_header.lower().startswith('bearer '):
            # 7 == len('bearer '): drop the scheme, keep the credential
            token = auth_header[7:].strip()
            logger.debug("Found Bearer token in Authorization header")
            return token

        # Check for direct Meta access token
        meta_token = lookup(headers, 'X-META-ACCESS-TOKEN')
        if meta_token:
            return meta_token

        # Check for Pipeboard token (legacy support, to be removed)
        pipeboard_token = lookup(headers, 'X-PIPEBOARD-API-TOKEN')
        if pipeboard_token:
            logger.debug("Found Pipeboard token in legacy headers")
            return pipeboard_token
        return None

    @staticmethod
    def extract_pipeboard_token_from_headers(headers: dict) -> Optional[str]:
        """Extract Pipeboard token from HTTP headers

        ``X-Pipeboard-Token`` takes precedence over the legacy
        ``X-PIPEBOARD-API-TOKEN``. Header names are matched case-insensitively.

        Args:
            headers: HTTP request headers

        Returns:
            Pipeboard token if found, None otherwise
        """
        lookup = FastMCPAuthIntegration._header_value

        # Check for Pipeboard token in X-Pipeboard-Token header (duplication API pattern)
        pipeboard_token = lookup(headers, 'X-Pipeboard-Token')
        if pipeboard_token:
            logger.debug("Found Pipeboard token in X-Pipeboard-Token header")
            return pipeboard_token

        # Check for legacy Pipeboard token header
        legacy_token = lookup(headers, 'X-PIPEBOARD-API-TOKEN')
        if legacy_token:
            logger.debug("Found Pipeboard token in legacy X-PIPEBOARD-API-TOKEN header")
            return legacy_token
        return None
def patch_fastmcp_server(mcp_server):
    """Wrap ``mcp_server.run`` so HTTP auth patching is installed on startup.

    The replacement ``run`` calls ``setup_http_auth_patching()`` when the
    transport is ``"streamable-http"`` and then delegates to the original
    ``run`` with the same arguments.

    Args:
        mcp_server: FastMCP server instance to patch
    """
    logger.info("Patching FastMCP server for HTTP authentication")

    # Captured before replacement so the wrapper can delegate to it.
    run_before_patch = mcp_server.run

    def patched_run(transport="stdio", **kwargs):
        """Enhanced run method that sets up HTTP auth integration"""
        logger.debug(f"Starting FastMCP with transport: {transport}")
        uses_streamable_http = transport == "streamable-http"
        if uses_streamable_http:
            logger.debug("Setting up HTTP authentication for streamable-http transport")
            setup_http_auth_patching()
        return run_before_patch(transport=transport, **kwargs)

    mcp_server.run = patched_run
    logger.info("FastMCP server patching complete")
def setup_http_auth_patching():
    """Replace ``get_current_access_token`` with a context-aware version.

    The replacement returns the token stored by
    ``FastMCPAuthIntegration.set_auth_token`` for the current context when
    one is set, and otherwise awaits the implementation that was installed
    in ``auth`` at the time this function ran. The replacement is assigned
    onto the ``auth``, ``api`` and ``authentication`` modules.
    """
    logger.info("Setting up HTTP authentication patching")

    # Imported here rather than at module top.
    from . import auth, api, authentication

    # Whatever is installed right now becomes the fallback.
    fallback_get_token = auth.get_current_access_token

    async def get_current_access_token_with_http_support() -> Optional[str]:
        """Enhanced get_current_access_token that checks HTTP context first"""
        # `or` short-circuits: the fallback is only awaited without a context token.
        return FastMCPAuthIntegration.get_auth_token() or await fallback_get_token()

    # Each module holds its own reference to the function, so each is rebound.
    for patched_module in (auth, api, authentication):
        patched_module.get_current_access_token = get_current_access_token_with_http_support

    logger.info("Auth system patching complete - patched in auth, api, and authentication modules")
# Global instance for easy access
# (a module-level FastMCPAuthIntegration object created once at import time)
fastmcp_auth = FastMCPAuthIntegration()
# Forward declaration of setup_starlette_middleware
def setup_starlette_middleware(app):
    """No-op placeholder; the full definition later in this module rebinds the name."""
def setup_fastmcp_http_auth(mcp_server):
    """Setup HTTP authentication integration with FastMCP

    Two things are patched on ``mcp_server``:

    1. ``run`` (via ``patch_fastmcp_server``), so the access-token lookup is
       patched when the server starts.
    2. The method that builds the Starlette app (``streamable_http_app`` when
       ``settings.json_response`` is truthy, otherwise ``sse_app``), so the
       auth middleware is added to the app instance that method returns.

    Args:
        mcp_server: FastMCP server instance to configure
    """
    logger.info("Setting up FastMCP HTTP authentication integration")

    # 1. Patch FastMCP's run method to ensure our get_current_access_token patch is applied
    # This remains crucial for the token to be picked up by tool calls.
    patch_fastmcp_server(mcp_server)  # This patches mcp_server.run

    # 2. Patch the methods that provide the Starlette app instance
    # This ensures our middleware is added to the app Uvicorn will actually serve.
    app_provider_methods = []
    if mcp_server.settings.json_response:
        if hasattr(mcp_server, "streamable_http_app") and callable(mcp_server.streamable_http_app):
            app_provider_methods.append("streamable_http_app")
        else:
            logger.warning("mcp_server.streamable_http_app not found or not callable, cannot patch for JSON responses.")
    else:  # SSE
        if hasattr(mcp_server, "sse_app") and callable(mcp_server.sse_app):
            app_provider_methods.append("sse_app")
        else:
            logger.warning("mcp_server.sse_app not found or not callable, cannot patch for SSE responses.")

    if not app_provider_methods:
        logger.error("No suitable app provider method (streamable_http_app or sse_app) found on mcp_server. Cannot add HTTP Auth middleware.")
        # Fallback or error handling might be needed here if this is critical

    def make_patched_provider(provider_name, original_provider):
        """Build a wrapper bound to exactly one provider method.

        A closure created directly in the loop body would look up the loop
        variables when it is *called*, so every wrapper would see the last
        iteration's values; passing them as arguments binds them per wrapper.
        """
        def new_patched_app_provider_method(*args, **kwargs):
            # Call the original method to get/create the Starlette app
            app = original_provider(*args, **kwargs)
            if app:
                logger.debug(f"Original {provider_name} returned app: {type(app)}. Adding AuthInjectionMiddleware.")
                # Now, add our middleware to this specific app instance
                setup_starlette_middleware(app)
            else:
                logger.error(f"Original {provider_name} returned None or a non-app object.")
            return app
        return new_patched_app_provider_method

    for method_name in app_provider_methods:
        original_app_provider_method = getattr(mcp_server, method_name)
        setattr(
            mcp_server,
            method_name,
            make_patched_provider(method_name, original_app_provider_method),
        )
        logger.debug(f"Patched mcp_server.{method_name} to inject AuthInjectionMiddleware.")

    # Middleware registration happens inside the patched app provider methods,
    # so no separate request-middleware setup step is performed here.
    logger.info("FastMCP HTTP authentication integration setup attempt complete.")
# Remove the old setup_request_middleware function as its logic is integrated above
# def setup_request_middleware(mcp_server): ... (delete this function)
# --- AuthInjectionMiddleware definition ---
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
import json # Ensure json is imported if not already at the top
class AuthInjectionMiddleware(BaseHTTPMiddleware):
    """Middleware that copies auth tokens from request headers into context.

    For each request the access token and the Pipeboard token are extracted
    from the headers, stored through ``FastMCPAuthIntegration`` while the
    request is handled, and cleared again afterwards (also when the
    downstream handler raises).
    """

    async def dispatch(self, request: Request, call_next):
        """Inject tokens, run the downstream handler, then clear the tokens.

        Args:
            request: Incoming Starlette request.
            call_next: Callable that forwards the request down the stack.

        Returns:
            The response produced by ``call_next``.
        """
        logger.debug(f"HTTP Auth Middleware: Processing request to {request.url.path}")
        logger.debug(f"HTTP Auth Middleware: Request headers: {list(request.headers.keys())}")

        # Extract both types of tokens for dual-header authentication
        header_map = dict(request.headers)
        auth_token = FastMCPAuthIntegration.extract_token_from_headers(header_map)
        pipeboard_token = FastMCPAuthIntegration.extract_pipeboard_token_from_headers(header_map)

        # NOTE(review): the debug messages below include the first 10
        # characters of each token -- confirm that is acceptable for your logs.
        if auth_token:
            logger.debug(f"HTTP Auth Middleware: Extracted auth token: {auth_token[:10]}...")
            logger.debug("Injecting auth token into request context")
            FastMCPAuthIntegration.set_auth_token(auth_token)
        if pipeboard_token:
            logger.debug(f"HTTP Auth Middleware: Extracted Pipeboard token: {pipeboard_token[:10]}...")
            logger.debug("Injecting Pipeboard token into request context")
            FastMCPAuthIntegration.set_pipeboard_token(pipeboard_token)
        if not (auth_token or pipeboard_token):
            logger.warning("HTTP Auth Middleware: No authentication tokens found in headers")

        try:
            return await call_next(request)
        finally:
            # Clear only the tokens that this request set
            if auth_token:
                FastMCPAuthIntegration.clear_auth_token()
            if pipeboard_token:
                FastMCPAuthIntegration.clear_pipeboard_token()
def setup_starlette_middleware(app):
    """Add AuthInjectionMiddleware to the Starlette app if not already present.

    A falsy ``app`` is logged as an error and ignored. A failure raised by
    ``app.add_middleware`` is logged (with traceback) rather than propagated.

    Args:
        app: Starlette app instance
    """
    if not app:
        logger.error("Cannot setup Starlette middleware, app is None.")
        return

    # app.user_middleware contains middleware added by app.add_middleware();
    # each entry exposes the middleware class as `.cls`.
    is_registered = any(
        entry.cls == AuthInjectionMiddleware for entry in app.user_middleware
    )
    if is_registered:
        logger.debug("AuthInjectionMiddleware already present in Starlette app's middleware stack.")
        return

    try:
        app.add_middleware(AuthInjectionMiddleware)
        logger.info("AuthInjectionMiddleware added to Starlette app successfully.")
    except Exception as e:
        logger.error(f"Failed to add AuthInjectionMiddleware to Starlette app: {e}", exc_info=True)
```
--------------------------------------------------------------------------------
/tests/test_mobile_app_adset_issue.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
E2E Test for mobile app adset creation issue (Issue #008)
This test validates that the create_adset tool supports required parameters
for mobile app campaigns:
- promoted_object configuration
- destination_type settings
- Conversion event dataset linking
- Custom event type specification
Expected Meta API error when parameters are missing:
"Select a dataset and conversion event for your ad set (Code 100)"
Usage (Manual execution only):
1. Start the server: uv run python -m meta_ads_mcp --transport streamable-http --port 8080
2. Run test: uv run python tests/test_mobile_app_adset_issue.py
Or with pytest (explicit E2E flag required):
uv run python -m pytest tests/test_mobile_app_adset_issue.py -v -m e2e
Note: This test is marked as E2E and will NOT run automatically in CI.
It must be executed manually to validate mobile app campaign functionality.
"""
import pytest
import requests
import json
import time
import sys
import os
from typing import Dict, Any
# Add project root to path for imports
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
class MobileAppAdsetTester:
    """Test suite for mobile app adset creation functionality.

    Sends JSON-RPC requests to the MCP server's ``/mcp/`` endpoint and reports
    every check as a plain dictionary that always carries a ``success`` flag,
    so callers can branch on the outcome instead of catching exceptions.
    """

    def __init__(self, base_url: str = "http://localhost:8080"):
        self.base_url = base_url.rstrip('/')
        self.endpoint = f"{self.base_url}/mcp/"
        # JSON-RPC request id; advanced after each request that got an HTTP response.
        self.request_id = 1

    def _make_request(self, method: str, params: Dict[str, Any] = None,
                      headers: Dict[str, str] = None) -> Dict[str, Any]:
        """Make a JSON-RPC request to the MCP server.

        Args:
            method: JSON-RPC method name (e.g. ``tools/list``).
            params: Optional method parameters; left out of the payload when empty.
            headers: Optional extra HTTP headers, merged over the defaults.

        Returns:
            A dict with ``status_code``, ``headers``, ``json`` (parsed body for
            HTTP 200, otherwise ``None``), ``text`` and ``success``. Transport
            failures are reported with ``status_code`` 0 plus an ``error`` entry.
        """
        # Default headers for MCP protocol with streamable HTTP transport
        default_headers = {
            "Content-Type": "application/json",
            "Accept": "application/json, text/event-stream",
            "User-Agent": "MobileApp-Test-Client/1.0"
        }
        if headers:
            default_headers.update(headers)

        payload = {
            "jsonrpc": "2.0",
            "method": method,
            "id": self.request_id
        }
        if params:
            payload["params"] = params

        try:
            response = requests.post(
                self.endpoint,
                headers=default_headers,
                json=payload,
                timeout=30  # Increased timeout for API calls
            )
            self.request_id += 1
            return {
                "status_code": response.status_code,
                "headers": dict(response.headers),
                "json": response.json() if response.status_code == 200 else None,
                "text": response.text,
                "success": response.status_code == 200
            }
        except requests.exceptions.RequestException as e:
            return {
                "status_code": 0,
                "headers": {},
                "json": None,
                "text": str(e),
                "success": False,
                "error": str(e)
            }

    @staticmethod
    def _rpc_result(response: Dict[str, Any]) -> Any:
        """Return the JSON-RPC ``result`` object of a response, or ``None``.

        A JSON-RPC error reply carries ``error`` instead of ``result``, so the
        member cannot be assumed to exist even when the HTTP status was 200.
        """
        body = response.get("json")
        if not isinstance(body, dict):
            return None
        rpc_result = body.get("result")
        return rpc_result if isinstance(rpc_result, dict) else None

    def test_create_adset_tool_exists(self) -> Dict[str, Any]:
        """Test that create_adset tool exists and check its parameters.

        Returns:
            On success: ``success``, the ``tool`` definition, the list of
            ``missing_mobile_app_params`` and ``has_mobile_app_support``.
            On failure: ``success`` False and an ``error`` message.
        """
        result = self._make_request("tools/list", {})
        if not result["success"]:
            return {"success": False, "error": "Failed to get tools list"}

        rpc_result = self._rpc_result(result)
        if rpc_result is None:
            return {
                "success": False,
                "error": f"tools/list response did not contain a result: {result.get('text', '')}"
            }

        tools = rpc_result.get("tools", [])
        create_adset_tool = next(
            (tool for tool in tools if isinstance(tool, dict) and tool.get("name") == "create_adset"),
            None
        )
        if not create_adset_tool:
            return {"success": False, "error": "create_adset tool not found"}

        # Check if mobile app specific parameters are supported
        input_schema = create_adset_tool.get("inputSchema", {})
        properties = input_schema.get("properties", {})
        mobile_app_params = ["promoted_object", "destination_type"]
        missing_params = [param for param in mobile_app_params if param not in properties]

        return {
            "success": True,
            "tool": create_adset_tool,
            "missing_mobile_app_params": missing_params,
            "has_mobile_app_support": len(missing_params) == 0
        }

    def test_reproduce_mobile_app_error(self) -> Dict[str, Any]:
        """Reproduce mobile app adset creation error scenario.

        Calls ``create_adset`` without ``promoted_object``/``destination_type``
        and inspects the tool's text payload for the Meta error this test is
        after (code 100 whose user message mentions "conversion event").

        Returns:
            ``success`` True with ``reproduced_error`` (and either the error
            details or the ``unexpected_response``) when the reply could be
            parsed; ``success`` False with an ``error`` message otherwise.
        """
        # Test parameters for mobile app campaign
        test_params = {
            "name": "create_adset",
            "arguments": {
                "account_id": "act_123456789012345",  # Generic test account
                "campaign_id": "120230566078340163",  # This will likely be invalid but that's OK for testing
                "name": "test mobile app ad set",
                "status": "PAUSED",
                "targeting": {
                    "age_max": 65,
                    "age_min": 18,
                    "app_install_state": "not_installed",
                    "geo_locations": {
                        "countries": ["DE"],
                        "location_types": ["home", "recent"]
                    },
                    "user_device": ["Android_Smartphone", "Android_Tablet"],
                    "user_os": ["Android"],
                    "brand_safety_content_filter_levels": ["FACEBOOK_STANDARD", "AN_STANDARD"],
                    "targeting_automation": {"advantage_audience": 1}
                },
                "optimization_goal": "APP_INSTALLS",
                "billing_event": "IMPRESSIONS"
            }
        }

        result = self._make_request("tools/call", test_params)
        if not result["success"]:
            return {
                "success": False,
                "error": f"MCP call failed: {result.get('text', 'Unknown error')}"
            }

        # Parse the response
        response_data = self._rpc_result(result)
        if response_data is None:
            return {
                "success": False,
                "error": f"tools/call response did not contain a result: {result.get('text', '')}"
            }

        # An empty content list is treated like a missing one, so it ends up
        # in the "failed to parse" path below rather than raising IndexError.
        content_items = response_data.get("content") or [{}]
        first_item = content_items[0] if isinstance(content_items[0], dict) else {}
        content = first_item.get("text", "")

        try:
            parsed_content = json.loads(content)
        except json.JSONDecodeError as e:
            return {
                "success": False,
                "error": f"Failed to parse response: {e}",
                "raw_content": content
            }

        # Check if this is an error response
        if isinstance(parsed_content, dict) and "error" in parsed_content:
            error_details = parsed_content["error"]
            if isinstance(error_details, dict) and "details" in error_details:
                meta_error = error_details["details"]
                # Check for the specific error we're looking for
                if isinstance(meta_error, dict) and isinstance(meta_error.get("error"), dict):
                    error_code = meta_error["error"].get("code")
                    error_message = meta_error["error"].get("error_user_msg", "")
                    is_dataset_error = (
                        error_code == 100 and
                        "conversion event" in str(error_message or "").lower()
                    )
                    return {
                        "success": True,
                        "reproduced_error": is_dataset_error,
                        "error_code": error_code,
                        "error_message": error_message,
                        "full_response": parsed_content
                    }

        return {
            "success": True,
            "reproduced_error": False,
            "unexpected_response": parsed_content
        }
# Pytest E2E test class - marked to prevent automatic execution
@pytest.mark.e2e
@pytest.mark.skip(reason="E2E test - requires running MCP server - execute manually only")
class TestMobileAppAdsetIssueE2E:
    """Pytest wrapper for the Issue #008 mobile app adset checks.

    The class is both marked ``e2e`` and unconditionally skipped, so it only
    documents the checks; run the module directly to execute them.
    """

    def setup_method(self):
        """Give every test its own tester aimed at the default local server."""
        self.tester = MobileAppAdsetTester()

    def test_create_adset_tool_has_mobile_app_params(self):
        """The create_adset tool must exist; skip if mobile app parameters are absent."""
        outcome = self.tester.test_create_adset_tool_exists()
        assert outcome["success"], f"Tool test failed: {outcome.get('error', 'Unknown error')}"

        absent = outcome["missing_mobile_app_params"]
        supported = outcome["has_mobile_app_support"]

        # Missing parameters are the condition under investigation, so they are
        # reported as a skip rather than a failure. pytest.skip() raises, which
        # is why no else-branch is needed below.
        if absent:
            pytest.skip(f"Missing mobile app parameters: {absent}")

        # Parameters are present - mobile app support is available
        assert supported, "Tool should have mobile app support when parameters are present"

    def test_reproduce_mobile_app_error_scenario(self):
        """The error-reproduction call must complete; its outcome is only printed."""
        outcome = self.tester.test_reproduce_mobile_app_error()
        assert outcome["success"], f"Error reproduction test failed: {outcome.get('error', 'Unknown error')}"

        # Informational only: the actual reply depends on authentication and
        # server state, so neither outcome is asserted.
        if not outcome.get("reproduced_error"):
            print("Different response received (may indicate parameters are working or auth issues)")
            return
        print(f"Reproduced error - Code: {outcome.get('error_code')}, Message: {outcome.get('error_message')}")
def main():
"""Run mobile app adset creation tests (manual execution)"""
print("🚀 Mobile App Adset Creation E2E Test")
print("=" * 50)
print("⚠️ This is an E2E test - requires MCP server running on localhost:8080")
print(" Start server with: uv run python -m meta_ads_mcp --transport streamable-http --port 8080")
print()
tester = MobileAppAdsetTester()
# Test 1: Check if create_adset tool exists and has mobile app parameters
print("\n🧪 Test 1: Checking create_adset tool parameters...")
tool_test = tester.test_create_adset_tool_exists()
if tool_test["success"]:
missing_params = tool_test["missing_mobile_app_params"]
if missing_params:
print(f"❌ Missing mobile app parameters: {missing_params}")
print("⚠️ Mobile app campaigns may not work without these parameters")
else:
print("✅ All mobile app parameters are present")
else:
print(f"❌ Tool test failed: {tool_test['error']}")
# Test 2: Try to reproduce mobile app error scenario
print("\n🧪 Test 2: Testing mobile app campaign creation...")
error_test = tester.test_reproduce_mobile_app_error()
if error_test["success"]:
if error_test.get("reproduced_error"):
print("✅ Successfully reproduced the error!")
print(f" Error Code: {error_test['error_code']}")
print(f" Error Message: {error_test['error_message']}")
else:
print("⚠️ Error not reproduced - different response received")
if "unexpected_response" in error_test:
print(f" Response: {json.dumps(error_test['unexpected_response'], indent=2)}")
else:
print(f"❌ Error reproduction test failed: {error_test['error']}")
# Summary
print("\n🏁 TEST SUMMARY")
print("=" * 30)
if tool_test["success"]:
missing_params = tool_test["missing_mobile_app_params"]
issue_confirmed = len(missing_params) > 0
fix_validated = len(missing_params) == 0
if fix_validated:
print("✅ MOBILE APP SUPPORT VALIDATED")
print(" All required mobile app parameters are present")
print(" Mobile app campaigns should work correctly!")
elif issue_confirmed:
print("❌ MOBILE APP SUPPORT INCOMPLETE")
print(f" Missing parameters: {missing_params}")
print(" Mobile app campaigns may fail without these parameters")
else:
print("❓ STATUS UNCLEAR")
print(" Could not determine mobile app parameter status")
else:
print("❌ TEST FAILED")
print(" Could not connect to MCP server or validate tools")
return 0
if __name__ == "__main__":
sys.exit(main())
```
--------------------------------------------------------------------------------
/tests/test_http_transport.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
HTTP Transport Integration Tests for Meta Ads MCP
This test suite validates the complete HTTP transport functionality including:
- MCP protocol compliance (initialize, tools/list, tools/call)
- Authentication header processing
- JSON-RPC request/response format
- Error handling and validation
Usage:
1. Start the server: python -m meta_ads_mcp --transport streamable-http --port 8080
2. Run tests: python -m pytest tests/test_http_transport.py -v
Or run directly:
python tests/test_http_transport.py
"""
import requests
import json
import time
import sys
import os
from typing import Dict, Any, Optional
# Add project root to path for imports
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
class HTTPTransportTester:
    """Test suite for Meta Ads MCP HTTP transport.

    Every ``test_*`` helper sends one JSON-RPC request to the server's
    ``/mcp/`` endpoint and returns the response summary produced by
    ``_make_request``; the ``run_*`` methods chain those helpers and print a
    human-readable report.
    """

    def __init__(self, base_url: str = "http://localhost:8080"):
        self.base_url = base_url.rstrip('/')
        self.endpoint = f"{self.base_url}/mcp/"
        # JSON-RPC request id; advanced after each request that got an HTTP response.
        self.request_id = 1

    def _make_request(self, method: str, params: Optional[Dict[str, Any]] = None,
                      headers: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
        """Make a JSON-RPC request to the MCP server.

        Args:
            method: JSON-RPC method name.
            params: Optional method parameters; left out of the payload when empty.
            headers: Optional extra HTTP headers, merged over the defaults.

        Returns:
            A dict with ``status_code``, ``headers``, ``json`` (parsed body for
            HTTP 200, otherwise ``None``), ``text`` and ``success``. Transport
            failures are reported with ``status_code`` 0 plus an ``error`` entry.
        """
        # Default headers for MCP protocol
        default_headers = {
            "Content-Type": "application/json",
            "Accept": "application/json, text/event-stream",
            "User-Agent": "MCP-Test-Client/1.0"
        }
        if headers:
            default_headers.update(headers)

        payload = {
            "jsonrpc": "2.0",
            "method": method,
            "id": self.request_id
        }
        if params:
            payload["params"] = params

        try:
            response = requests.post(
                self.endpoint,
                headers=default_headers,
                json=payload,
                timeout=10
            )
            self.request_id += 1
            return {
                "status_code": response.status_code,
                "headers": dict(response.headers),
                "json": response.json() if response.status_code == 200 else None,
                "text": response.text,
                "success": response.status_code == 200
            }
        except requests.exceptions.RequestException as e:
            return {
                "status_code": 0,
                "headers": {},
                "json": None,
                "text": str(e),
                "success": False,
                "error": str(e)
            }

    def test_server_availability(self) -> bool:
        """Test if the server is running and accessible.

        Returns:
            True when a GET on the base URL answers with 200 or 404, False on
            any other status or when the request itself fails.
        """
        try:
            response = requests.get(f"{self.base_url}/", timeout=5)
        except requests.exceptions.RequestException:
            # Only transport failures mean "not available"; Ctrl-C and other
            # non-request errors are left to propagate.
            return False
        # We expect a 404 for the root path, but it means the server is running
        return response.status_code in [200, 404]

    def test_mcp_initialize(self, auth_headers: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
        """Test MCP initialize method."""
        return self._make_request("initialize", {
            "protocolVersion": "2024-11-05",
            "capabilities": {
                "roots": {"listChanged": True},
                "sampling": {}
            },
            "clientInfo": {
                "name": "meta-ads-test-client",
                "version": "1.0.0"
            }
        }, auth_headers)

    def test_tools_list(self, auth_headers: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
        """Test tools/list method."""
        return self._make_request("tools/list", {}, auth_headers)

    def test_tool_call(self, tool_name: str, arguments: Optional[Dict[str, Any]] = None,
                       auth_headers: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
        """Test tools/call method.

        Args:
            tool_name: Name of the tool to invoke.
            arguments: Optional tool arguments; omitted from the call when empty.
            auth_headers: Optional authentication headers for the request.
        """
        params = {"name": tool_name}
        if arguments:
            params["arguments"] = arguments
        return self._make_request("tools/call", params, auth_headers)

    def run_protocol_flow_test(self, auth_headers: Optional[Dict[str, str]] = None,
                               scenario_name: str = "Default") -> Dict[str, bool]:
        """Run complete MCP protocol flow test.

        Executes initialize, tools/list and a ``get_ad_accounts`` tool call in
        order and stops at the first step that fails.

        Returns:
            Mapping of step name (``initialize``, ``tools_list``, ``tool_call``)
            to its success flag; steps after a failure are absent.
        """
        results = {}

        print(f"\n🧪 Testing: {scenario_name}")
        print("="*50)

        # Test 1: Initialize
        print("🔍 Testing MCP Initialize Request")
        init_result = self.test_mcp_initialize(auth_headers)
        results["initialize"] = init_result["success"]
        if not init_result["success"]:
            print(f"❌ Initialize failed: {init_result.get('text', 'Unknown error')}")
            return results
        print("✅ Initialize successful")
        if init_result["json"] and "result" in init_result["json"]:
            server_info = init_result["json"]["result"].get("serverInfo", {})
            print(f" Server: {server_info.get('name', 'unknown')} v{server_info.get('version', 'unknown')}")

        # Test 2: Tools List
        print("\n🔍 Testing Tools List Request")
        tools_result = self.test_tools_list(auth_headers)
        results["tools_list"] = tools_result["success"]
        if not tools_result["success"]:
            print(f"❌ Tools list failed: {tools_result.get('text', 'Unknown error')}")
            return results
        print("✅ Tools list successful")
        if tools_result["json"] and "result" in tools_result["json"]:
            tools = tools_result["json"]["result"].get("tools", [])
            print(f" Found {len(tools)} tools")

        # Test 3: Tool Call
        print("\n🔍 Testing Tool Call: get_ad_accounts")
        tool_result = self.test_tool_call("get_ad_accounts", {"limit": 3}, auth_headers)
        results["tool_call"] = tool_result["success"]
        if not tool_result["success"]:
            print(f"❌ Tool call failed: {tool_result.get('text', 'Unknown error')}")
            return results
        print("✅ Tool call successful")

        # Check if it's an authentication error (expected with test tokens)
        if tool_result["json"] and "result" in tool_result["json"]:
            # An empty content list is treated like a missing one so the
            # report does not crash on a tool that returned nothing.
            content_items = tool_result["json"]["result"].get("content") or [{}]
            content = content_items[0].get("text", "")
            if "Authentication Required" in content:
                print(" 📋 Result: Authentication required (expected with test tokens)")
            else:
                print(f" 📋 Result: {content[:100]}...")

        print("\n📊 Scenario Results:")
        print(f" Initialize: {'✅' if results['initialize'] else '❌'}")
        print(f" Tools List: {'✅' if results['tools_list'] else '❌'}")
        print(f" Tool Call: {'✅' if results['tool_call'] else '❌'}")
        return results

    def run_comprehensive_test_suite(self) -> bool:
        """Run complete test suite with multiple authentication scenarios.

        Returns:
            True when every scenario and every get_ads filtering case
            succeeded; False otherwise, including when the server is not
            reachable.
        """
        print("🚀 Meta Ads MCP HTTP Transport Test Suite")
        print("="*60)

        # Check server availability first
        print("🔍 Checking server status...")
        if not self.test_server_availability():
            print("❌ Server is not running at", self.base_url)
            print(" Please start the server with:")
            print(" python -m meta_ads_mcp --transport streamable-http --port 8080 --host localhost")
            return False
        print("✅ Server is running")

        all_results = {}

        # Test scenarios
        scenarios = [
            {
                "name": "No Authentication",
                "headers": None
            },
            {
                "name": "Bearer Token (Primary Path)",
                "headers": {"Authorization": "Bearer test_pipeboard_token_12345"}
            },
            {
                "name": "Custom Meta App ID (Fallback Path)",
                "headers": {"X-META-APP-ID": "123456789012345"}
            },
            {
                "name": "Both Auth Methods",
                "headers": {
                    "Authorization": "Bearer test_pipeboard_token_12345",
                    "X-META-APP-ID": "123456789012345"
                }
            }
        ]

        # Run tests for each scenario
        for scenario in scenarios:
            results = self.run_protocol_flow_test(
                auth_headers=scenario["headers"],
                scenario_name=scenario["name"]
            )
            all_results[scenario["name"]] = results

        # Run specific get_ads filtering tests
        print("\n🧪 Testing get_ads filtering functionality")
        print("="*50)
        ads_filter_results = self.test_get_ads_filtering()
        all_results["get_ads_filtering"] = ads_filter_results

        # Summary
        print("\n🏁 TEST SUITE COMPLETED")
        print("="*30)
        all_passed = True
        for scenario_name, results in all_results.items():
            if isinstance(results, dict):
                scenario_success = all(results.values())
            else:
                scenario_success = results
            status = "✅ SUCCESS" if scenario_success else "❌ FAILED"
            print(f"{scenario_name}: {status}")
            if not scenario_success:
                all_passed = False

        print(f"\n📊 Overall Result: {'✅ ALL TESTS PASSED' if all_passed else '❌ SOME TESTS FAILED'}")
        if all_passed:
            print("\n🎉 Meta Ads MCP HTTP transport is fully functional!")
            print(" • MCP protocol compliance: Complete")
            print(" • Authentication integration: Working")
            print(" • All tools accessible via HTTP")
            print(" • get_ads filtering: Working correctly")
            print(" • Ready for production use")
        return all_passed

    def test_get_ads_filtering(self) -> Dict[str, bool]:
        """Test get_ads function with different filtering parameters.

        Returns:
            Mapping of case name (``no_filters``, ``campaign_filter``,
            ``adset_filter``, ``priority_test``) to the success flag of the
            corresponding tools/call request.
        """
        results = {}

        # Test with basic auth headers for these tests
        auth_headers = {"Authorization": "Bearer test_pipeboard_token_12345"}

        # (result key, text for the "Testing ..." line, text for the outcome line, arguments)
        cases = [
            # get_ads without filters (should use account endpoint)
            ("no_filters", "without filters", "without filters", {
                "account_id": "act_123456789",
                "limit": 5
            }),
            # get_ads with campaign_id filter (should use campaign endpoint)
            ("campaign_filter", "with campaign_id filter", "with campaign_id filter", {
                "account_id": "act_123456789",
                "campaign_id": "123456789012345",
                "limit": 5
            }),
            # get_ads with adset_id filter (should use adset endpoint)
            ("adset_filter", "with adset_id filter", "with adset_id filter", {
                "account_id": "act_123456789",
                "adset_id": "120228975637820183",
                "limit": 5
            }),
            # get_ads with both campaign_id and adset_id (adset_id should take priority)
            ("priority_test", "with both campaign_id and adset_id (adset_id priority)", "priority test", {
                "account_id": "act_123456789",
                "campaign_id": "123456789012345",
                "adset_id": "120228975637820183",
                "limit": 5
            }),
        ]

        for key, announcement, label, arguments in cases:
            print(f"🔍 Testing get_ads {announcement}")
            outcome = self.test_tool_call("get_ads", arguments, auth_headers)
            results[key] = outcome["success"]
            if outcome["success"]:
                print(f"✅ get_ads {label} successful")
            else:
                print(f"❌ get_ads {label} failed: {outcome.get('text', 'Unknown error')}")

        return results
def main():
    """Run the full HTTP transport suite and exit with 0 on success, 1 on failure."""
    suite_passed = HTTPTransportTester().run_comprehensive_test_suite()
    exit_code = 0 if suite_passed else 1
    sys.exit(exit_code)


if __name__ == "__main__":
    main()
```
--------------------------------------------------------------------------------
/tests/test_update_ad_creative_id.py:
--------------------------------------------------------------------------------
```python
"""Tests for update_ad function with creative_id parameter.
Tests for the enhanced update_ad function that supports:
- Updating ad creative via creative_id parameter
- Combining creative updates with other parameters (status, bid_amount, tracking_specs)
- Error handling for invalid creative_id values
- Validation of creative permissions and compatibility
"""
import pytest
import json
from unittest.mock import AsyncMock, patch
from meta_ads_mcp.core.ads import update_ad
@pytest.mark.asyncio
class TestUpdateAdCreativeId:
    """Test cases for update_ad function with creative_id parameter."""

    # Patch target: the request helper as looked up from the ads module.
    _API_TARGET = 'meta_ads_mcp.core.ads.make_api_request'

    @staticmethod
    def _ok_response():
        """Build a fresh successful API reply for the ad under test."""
        return {
            "success": True,
            "id": "test_ad_123"
        }

    @staticmethod
    def _api_error(message, code):
        """Build an API error reply with the given message and code."""
        return {
            "error": {
                "message": message,
                "type": "OAuthException",
                "code": code
            }
        }

    @staticmethod
    def _assert_success(raw):
        """Assert that the JSON string returned by update_ad reports success."""
        decoded = json.loads(raw)
        assert "success" in decoded
        assert decoded["success"] is True

    @staticmethod
    def _error_payload(raw):
        """Decode an update_ad result and return the dict expected to hold 'error'.

        The error may arrive wrapped as a JSON string under a 'data' key, so
        that envelope is unwrapped when present.
        """
        decoded = json.loads(raw)
        if "data" in decoded:
            return json.loads(decoded["data"])
        return decoded

    def _assert_error_mentions(self, raw, fragment):
        """Assert the result carries an error whose text contains ``fragment``."""
        payload = self._error_payload(raw)
        assert "error" in payload
        assert fragment in payload["error"]

    async def _assert_api_error_surfaces(self, api_error, creative_id):
        """Run update_ad against a mocked API error and assert an error is reported."""
        with patch(self._API_TARGET, new_callable=AsyncMock) as api:
            api.return_value = api_error
            raw = await update_ad(
                ad_id="test_ad_123",
                creative_id=creative_id,
                access_token="test_token"
            )
        assert "error" in self._error_payload(raw)

    async def test_update_ad_creative_id_success(self):
        """Test successfully updating ad with new creative_id."""
        with patch(self._API_TARGET, new_callable=AsyncMock) as api:
            api.return_value = self._ok_response()
            raw = await update_ad(
                ad_id="test_ad_123",
                creative_id="new_creative_456",
                access_token="test_token"
            )
        self._assert_success(raw)

        # Verify API was called with correct parameters
        api.assert_called_once()
        positional = api.call_args[0]
        # Check endpoint
        assert positional[0] == "test_ad_123"
        # Check parameters (third positional argument is params)
        sent = positional[2]
        assert "creative" in sent
        assert json.loads(sent["creative"])["creative_id"] == "new_creative_456"

    async def test_update_ad_creative_id_with_other_params(self):
        """Test updating creative_id along with status and bid_amount."""
        with patch(self._API_TARGET, new_callable=AsyncMock) as api:
            api.return_value = self._ok_response()
            raw = await update_ad(
                ad_id="test_ad_123",
                creative_id="new_creative_456",
                status="ACTIVE",
                bid_amount=150,
                access_token="test_token"
            )
        self._assert_success(raw)

        # Verify API was called with all parameters
        api.assert_called_once()
        sent = api.call_args[0][2]
        for expected_key in ("creative", "status", "bid_amount"):
            assert expected_key in sent
        assert json.loads(sent["creative"])["creative_id"] == "new_creative_456"
        assert sent["status"] == "ACTIVE"
        assert sent["bid_amount"] == "150"

    async def test_update_ad_creative_id_with_tracking_specs(self):
        """Test updating creative_id along with tracking_specs."""
        tracking_specs = [{"action.type": "offsite_conversion", "fb_pixel": ["123456"]}]
        with patch(self._API_TARGET, new_callable=AsyncMock) as api:
            api.return_value = self._ok_response()
            raw = await update_ad(
                ad_id="test_ad_123",
                creative_id="new_creative_456",
                tracking_specs=tracking_specs,
                access_token="test_token"
            )
        self._assert_success(raw)

        # Verify API was called with all parameters
        api.assert_called_once()
        sent = api.call_args[0][2]
        assert "creative" in sent
        assert "tracking_specs" in sent
        assert json.loads(sent["creative"])["creative_id"] == "new_creative_456"
        # tracking_specs should be JSON encoded
        assert json.loads(sent["tracking_specs"]) == tracking_specs

    async def test_update_ad_invalid_creative_id(self):
        """Test updating ad with invalid creative_id."""
        # Simulate API error for invalid creative_id
        await self._assert_api_error_surfaces(
            self._api_error("Invalid creative ID", 100),
            "invalid_creative_999"
        )

    async def test_update_ad_nonexistent_creative_id(self):
        """Test updating ad with non-existent creative_id."""
        # Simulate API error for non-existent creative_id
        await self._assert_api_error_surfaces(
            self._api_error("Creative does not exist", 803),
            "nonexistent_creative_999"
        )

    async def test_update_ad_creative_id_from_different_account(self):
        """Test updating ad with creative_id from different account."""
        # Simulate API error for cross-account creative access
        await self._assert_api_error_surfaces(
            self._api_error("Creative does not belong to this account", 200),
            "other_account_creative_456"
        )

    async def test_update_ad_no_parameters(self):
        """Test update_ad with no parameters provided."""
        raw = await update_ad(
            ad_id="test_ad_123",
            access_token="test_token"
        )
        self._assert_error_mentions(raw, "No update parameters provided")

    async def test_update_ad_missing_ad_id(self):
        """Test update_ad with missing ad_id."""
        raw = await update_ad(
            ad_id="",
            creative_id="new_creative_456",
            access_token="test_token"
        )
        self._assert_error_mentions(raw, "Ad ID is required")

    async def test_update_ad_creative_id_only(self):
        """Test updating only the creative_id without other parameters."""
        with patch(self._API_TARGET, new_callable=AsyncMock) as api:
            api.return_value = self._ok_response()
            raw = await update_ad(
                ad_id="test_ad_123",
                creative_id="new_creative_456",
                access_token="test_token"
            )
        self._assert_success(raw)

        # Verify API was called with only creative parameter
        api.assert_called_once()
        sent = api.call_args[0][2]
        assert "creative" in sent
        # No status, bid_amount or tracking_specs may leak into the request
        for unexpected_key in ("status", "bid_amount", "tracking_specs"):
            assert unexpected_key not in sent
        assert json.loads(sent["creative"])["creative_id"] == "new_creative_456"

    async def test_update_ad_creative_compatibility_validation(self):
        """Test creative compatibility validation with different creative types."""
        # This test simulates an API error for incompatible creative types
        await self._assert_api_error_surfaces(
            self._api_error("Creative format not compatible with ad set placement", 1487),
            "incompatible_creative_456"
        )

    async def test_update_ad_api_error_handling(self):
        """Test general API error handling for update_ad."""
        with patch(self._API_TARGET, new_callable=AsyncMock) as api:
            # Simulate API request throwing an exception
            api.side_effect = Exception("API connection failed")
            raw = await update_ad(
                ad_id="test_ad_123",
                creative_id="new_creative_456",
                access_token="test_token"
            )
        self._assert_error_mentions(raw, "API connection failed")
```
--------------------------------------------------------------------------------
/meta_ads_mcp/core/campaigns.py:
--------------------------------------------------------------------------------
```python
"""Campaign-related functionality for Meta Ads API."""
import json
from typing import List, Optional, Dict, Any, Union
from .api import meta_api_tool, make_api_request
from .accounts import get_ad_accounts
from .server import mcp_server
@mcp_server.tool()
@meta_api_tool
async def get_campaigns(account_id: str, access_token: Optional[str] = None, limit: int = 10, status_filter: str = "", after: str = "") -> str:
    """
    Get campaigns for a Meta Ads account with optional filtering.

    Note: By default, the Meta API returns a subset of available fields.
    Other fields like 'effective_status', 'special_ad_categories',
    'lifetime_budget', 'spend_cap', 'budget_remaining', 'promoted_object',
    'source_campaign_id', etc., might be available but require specifying them
    in the API call (currently not exposed by this tool's parameters).

    Args:
        account_id: Meta Ads account ID (format: act_XXXXXXXXX)
        access_token: Meta API access token (optional - will use cached token if not provided)
        limit: Maximum number of campaigns to return (default: 10)
        status_filter: Filter by effective status (e.g., 'ACTIVE', 'PAUSED', 'ARCHIVED').
                       Maps to the 'effective_status' API parameter, which expects an array
                       (this function handles the required JSON formatting). Leave empty for all statuses.
        after: Pagination cursor to get the next set of results
    """
    # Guard: an account must be named explicitly; none is looked up here.
    if not account_id:
        return json.dumps({"error": "No account ID specified"}, indent=2)

    requested_fields = "id,name,objective,status,daily_budget,lifetime_budget,buying_type,start_time,stop_time,created_time,updated_time,bid_strategy"
    query = {"fields": requested_fields, "limit": limit}

    # Optional query parameters are only sent when the caller supplied them.
    if status_filter:
        # The single status is wrapped in a list and sent as a JSON string.
        query["effective_status"] = json.dumps([status_filter])
    if after:
        query["after"] = after

    campaigns = await make_api_request(f"{account_id}/campaigns", access_token, query)
    return json.dumps(campaigns, indent=2)
@mcp_server.tool()
@meta_api_tool
async def get_campaign_details(campaign_id: str, access_token: Optional[str] = None) -> str:
    """
    Get detailed information about a specific campaign.

    Note: This function requests a specific set of fields ('id,name,objective,status,...').
    The Meta API offers many other fields for campaigns (e.g., 'effective_status', 'source_campaign_id', etc.)
    that could be added to the 'fields' parameter in the code if needed.

    Args:
        campaign_id: Meta Ads campaign ID
        access_token: Meta API access token (optional - will use cached token if not provided)
    """
    # Guard: without a campaign ID there is no endpoint to request.
    if not campaign_id:
        return json.dumps({"error": "No campaign ID provided"}, indent=2)

    requested_fields = "id,name,objective,status,daily_budget,lifetime_budget,buying_type,start_time,stop_time,created_time,updated_time,bid_strategy,special_ad_categories,special_ad_category_country,budget_remaining,configured_status"

    # The campaign ID itself is the endpoint path.
    details = await make_api_request(f"{campaign_id}", access_token, {"fields": requested_fields})
    return json.dumps(details, indent=2)
@mcp_server.tool()
@meta_api_tool
async def create_campaign(
    account_id: str,
    name: str,
    objective: str,
    access_token: Optional[str] = None,
    status: str = "PAUSED",
    special_ad_categories: Optional[List[str]] = None,
    daily_budget: Optional[int] = None,
    lifetime_budget: Optional[int] = None,
    buying_type: Optional[str] = None,
    bid_strategy: Optional[str] = None,
    bid_cap: Optional[int] = None,
    spend_cap: Optional[int] = None,
    campaign_budget_optimization: Optional[bool] = None,
    ab_test_control_setups: Optional[List[Dict[str, Any]]] = None,
    use_adset_level_budgets: bool = False
) -> str:
    """
    Create a new campaign in a Meta Ads account.

    Args:
        account_id: Meta Ads account ID (format: act_XXXXXXXXX)
        name: Campaign name
        objective: Campaign objective (ODAX, outcome-based). Must be one of:
                   OUTCOME_AWARENESS, OUTCOME_TRAFFIC, OUTCOME_ENGAGEMENT,
                   OUTCOME_LEADS, OUTCOME_SALES, OUTCOME_APP_PROMOTION.
                   Note: Legacy objectives like BRAND_AWARENESS, LINK_CLICKS,
                   CONVERSIONS, APP_INSTALLS, etc. are not valid for new
                   campaigns and will cause a 400 error. Use the outcome-based
                   values above (e.g., BRAND_AWARENESS → OUTCOME_AWARENESS).
        access_token: Meta API access token (optional - will use cached token if not provided)
        status: Initial campaign status (default: PAUSED)
        special_ad_categories: List of special ad categories if applicable
        daily_budget: Daily budget in account currency (in cents) (only used if use_adset_level_budgets=False).
                      If neither daily_budget nor lifetime_budget is given, a default of 1000 is sent.
        lifetime_budget: Lifetime budget in account currency (in cents) (only used if use_adset_level_budgets=False)
        buying_type: Buying type (e.g., 'AUCTION')
        bid_strategy: Bid strategy. Must be one of: 'LOWEST_COST_WITHOUT_CAP', 'LOWEST_COST_WITH_BID_CAP', 'COST_CAP', 'LOWEST_COST_WITH_MIN_ROAS'.
        bid_cap: Bid cap in account currency (in cents)
        spend_cap: Spending limit for the campaign in account currency (in cents)
        campaign_budget_optimization: Whether to enable campaign budget optimization (only used if use_adset_level_budgets=False)
        ab_test_control_setups: Settings for A/B testing (e.g., [{"name":"Creative A", "ad_format":"SINGLE_IMAGE"}])
        use_adset_level_budgets: If True, budgets will be set at the ad set level instead of campaign level (default: False)

    Returns:
        JSON string with the API response, or with an "error" key when input
        validation fails or the request raises.
    """
    # Check required parameters
    if not account_id:
        return json.dumps({"error": "No account ID provided"}, indent=2)
    if not name:
        return json.dumps({"error": "No campaign name provided"}, indent=2)
    if not objective:
        return json.dumps({"error": "No campaign objective provided"}, indent=2)

    # special_ad_categories is always sent, so default it to an empty list
    if special_ad_categories is None:
        special_ad_categories = []

    # Fall back to a fixed daily budget when the caller supplied none and the
    # budget is managed at campaign level. Kept as an int to match the
    # parameter annotation; it is stringified below like any other budget.
    if not daily_budget and not lifetime_budget and not use_adset_level_budgets:
        daily_budget = 1000  # 1000 cents, i.e. 10.00 in the account currency

    endpoint = f"{account_id}/campaigns"

    params = {
        "name": name,
        "objective": objective,
        "status": status,
        "special_ad_categories": json.dumps(special_ad_categories)  # Sent as a JSON-encoded array
    }

    # Only set campaign-level budgets if we're not using ad set level budgets
    if not use_adset_level_budgets:
        if daily_budget is not None:
            params["daily_budget"] = str(daily_budget)
        if lifetime_budget is not None:
            params["lifetime_budget"] = str(lifetime_budget)
        if campaign_budget_optimization is not None:
            params["campaign_budget_optimization"] = "true" if campaign_budget_optimization else "false"

    # Optional bidding / spend settings
    if buying_type:
        params["buying_type"] = buying_type
    if bid_strategy:
        params["bid_strategy"] = bid_strategy
    if bid_cap is not None:
        params["bid_cap"] = str(bid_cap)
    if spend_cap is not None:
        params["spend_cap"] = str(spend_cap)
    if ab_test_control_setups:
        params["ab_test_control_setups"] = json.dumps(ab_test_control_setups)

    try:
        data = await make_api_request(endpoint, access_token, params, method="POST")

        # Annotate the response only when it is a non-error payload: the
        # request helper may hand back an {"error": ...} dict instead of
        # raising, and claiming "Campaign created" on that would be misleading.
        if use_adset_level_budgets and isinstance(data, dict) and "error" not in data:
            data["budget_strategy"] = "ad_set_level"
            data["note"] = "Campaign created with ad set level budgets. Set budgets when creating ad sets within this campaign."

        return json.dumps(data, indent=2)
    except Exception as e:
        error_msg = str(e)
        return json.dumps({
            "error": "Failed to create campaign",
            "details": error_msg,
            "params_sent": params
        }, indent=2)
@mcp_server.tool()
@meta_api_tool
async def update_campaign(
    campaign_id: str,
    access_token: Optional[str] = None,
    name: Optional[str] = None,
    status: Optional[str] = None,
    special_ad_categories: Optional[List[str]] = None,
    daily_budget: Optional[int] = None,
    lifetime_budget: Optional[int] = None,
    bid_strategy: Optional[str] = None,
    bid_cap: Optional[int] = None,
    spend_cap: Optional[int] = None,
    campaign_budget_optimization: Optional[bool] = None,
    objective: Optional[str] = None,  # Add objective if it's updatable
    use_adset_level_budgets: Optional[bool] = None,  # Add other updatable fields as needed based on API docs
) -> str:
    """
    Update an existing campaign in a Meta Ads account.

    Args:
        campaign_id: Meta Ads campaign ID
        access_token: Meta API access token (optional - will use cached token if not provided)
        name: New campaign name
        status: New campaign status (e.g., 'ACTIVE', 'PAUSED')
        special_ad_categories: List of special ad categories if applicable
        daily_budget: New daily budget in account currency (in cents) as a string.
                     Set to empty string "" to remove the daily budget.
        lifetime_budget: New lifetime budget in account currency (in cents) as a string.
                        Set to empty string "" to remove the lifetime budget.
        bid_strategy: New bid strategy
        bid_cap: New bid cap in account currency (in cents) as a string
        spend_cap: New spending limit for the campaign in account currency (in cents) as a string
        campaign_budget_optimization: Enable/disable campaign budget optimization
        objective: New campaign objective (Note: May not always be updatable)
        use_adset_level_budgets: If True, removes campaign-level budgets to switch to ad set level budgets

    Returns:
        JSON string with the API response, or with an "error" key when no
        campaign ID / no update parameters were given or the request raises.
    """
    if not campaign_id:
        return json.dumps({"error": "No campaign ID provided"}, indent=2)

    endpoint = f"{campaign_id}"

    params = {}

    # Add parameters to the request only if they are provided
    if name is not None:
        params["name"] = name
    if status is not None:
        params["status"] = status
    if special_ad_categories is not None:
        # Note: Updating special_ad_categories might have specific API rules or might not be allowed after creation.
        # The API might require an empty list `[]` to clear categories. Check Meta Docs.
        params["special_ad_categories"] = json.dumps(special_ad_categories)

    if use_adset_level_budgets:
        # Switching to ad set level budgets: blank out both campaign-level
        # budgets and force budget optimization off if the caller mentioned it.
        params["daily_budget"] = ""
        params["lifetime_budget"] = ""
        if campaign_budget_optimization is not None:
            params["campaign_budget_optimization"] = "false"
    else:
        # Regular budget update (use_adset_level_budgets is None or False).
        # str() maps "" to "", so an empty string still clears the budget.
        if daily_budget is not None:
            params["daily_budget"] = str(daily_budget)
        if lifetime_budget is not None:
            params["lifetime_budget"] = str(lifetime_budget)
        if campaign_budget_optimization is not None:
            params["campaign_budget_optimization"] = "true" if campaign_budget_optimization else "false"

    if bid_strategy is not None:
        params["bid_strategy"] = bid_strategy
    if bid_cap is not None:
        params["bid_cap"] = str(bid_cap)
    if spend_cap is not None:
        params["spend_cap"] = str(spend_cap)
    if objective is not None:
        params["objective"] = objective  # Caution: Objective changes might reset learning or be restricted

    if not params:
        return json.dumps({"error": "No update parameters provided"}, indent=2)

    try:
        # Updates are sent as POST requests to the campaign node
        data = await make_api_request(endpoint, access_token, params, method="POST")

        # Annotate the response only when it is a non-error payload: the
        # request helper may hand back an {"error": ...} dict instead of
        # raising, and claiming "Campaign updated" on that would be misleading.
        if use_adset_level_budgets and isinstance(data, dict) and "error" not in data:
            data["budget_strategy"] = "ad_set_level"
            data["note"] = "Campaign updated to use ad set level budgets. Set budgets when creating ad sets within this campaign."

        return json.dumps(data, indent=2)
    except Exception as e:
        error_msg = str(e)
        # Include campaign_id in error for better context
        return json.dumps({
            "error": f"Failed to update campaign {campaign_id}",
            "details": error_msg,
            "params_sent": params  # Be careful about logging sensitive data if any
        }, indent=2)
```
--------------------------------------------------------------------------------
/tests/test_page_discovery.py:
--------------------------------------------------------------------------------
```python
"""
Test page discovery functionality for Meta Ads MCP.
"""
import pytest
import json
from unittest.mock import AsyncMock, patch
from meta_ads_mcp.core.ads import _discover_pages_for_account, search_pages_by_name, _search_pages_by_name_core
class TestPageDiscovery:
    """Test page discovery functionality."""

    ACCOUNT_ID = "act_123456789"
    TOKEN = "test_token"
    API_TARGET = 'meta_ads_mcp.core.ads.make_api_request'
    DISCOVER_TARGET = 'meta_ads_mcp.core.ads._discover_pages_for_account'

    @staticmethod
    def _ads_payload(page_ids):
        """Return an ads response with one ad whose tracking spec lists page_ids."""
        return {
            "data": [
                {"id": "123456789", "tracking_specs": [{"page": list(page_ids)}]}
            ]
        }

    @staticmethod
    def _empty():
        """Return a fresh empty list response."""
        return {"data": []}

    async def _discover(self, *api_responses):
        """Run discovery with make_api_request yielding api_responses in order."""
        with patch(self.API_TARGET) as api_mock:
            api_mock.side_effect = list(api_responses)
            return await _discover_pages_for_account(self.ACCOUNT_ID, self.TOKEN)

    async def _core_search(self, discovery_result, *search_term):
        """Run the core search with discovery stubbed; return the decoded JSON."""
        with patch(self.DISCOVER_TARGET) as discover_mock:
            discover_mock.return_value = discovery_result
            raw = await _search_pages_by_name_core(self.TOKEN, self.ACCOUNT_ID, *search_term)
        return json.loads(raw)

    @staticmethod
    def _found_page():
        """Return a successful discovery result for a single page."""
        return {
            "success": True,
            "page_id": "123456789",
            "page_name": "Test Page",
            "source": "tracking_specs"
        }

    @pytest.mark.asyncio
    async def test_discover_pages_from_tracking_specs(self):
        """Test page discovery from tracking specs (most reliable method)."""
        page_details = {
            "id": "987654321",
            "name": "Test Page",
            "username": "testpage",
            "category": "Test Category"
        }
        # First response answers the ads lookup, second the page-details lookup.
        outcome = await self._discover(
            self._ads_payload(["987654321", "111222333"]),
            page_details,
        )

        assert outcome["success"] is True
        # Either ID is acceptable because set ordering is not guaranteed.
        assert outcome["page_id"] in ["987654321", "111222333"]
        assert outcome["page_name"] == "Test Page"
        assert outcome["source"] == "tracking_specs"

    @pytest.mark.asyncio
    async def test_discover_pages_from_client_pages(self):
        """Test page discovery from client_pages endpoint."""
        client_pages = {
            "data": [
                {"id": "555666777", "name": "Client Page", "username": "clientpage"}
            ]
        }
        # No ads come back, so discovery falls through to client pages.
        outcome = await self._discover(self._empty(), client_pages)

        assert outcome["success"] is True
        assert outcome["page_id"] == "555666777"
        assert outcome["page_name"] == "Client Page"
        assert outcome["source"] == "client_pages"

    @pytest.mark.asyncio
    async def test_discover_pages_from_assigned_pages(self):
        """Test page discovery from assigned_pages endpoint."""
        assigned_pages = {"data": [{"id": "888999000", "name": "Assigned Page"}]}
        # Ads and client pages are both empty before assigned pages answer.
        outcome = await self._discover(self._empty(), self._empty(), assigned_pages)

        assert outcome["success"] is True
        assert outcome["page_id"] == "888999000"
        assert outcome["page_name"] == "Assigned Page"
        assert outcome["source"] == "assigned_pages"

    @pytest.mark.asyncio
    async def test_discover_pages_no_pages_found(self):
        """Test page discovery when no pages are found."""
        # Ads, client pages and assigned pages all come back empty.
        outcome = await self._discover(self._empty(), self._empty(), self._empty())

        assert outcome["success"] is False
        assert "No suitable pages found" in outcome["message"]

    @pytest.mark.asyncio
    async def test_discover_pages_with_invalid_page_ids(self):
        """Test page discovery with invalid page IDs in tracking_specs."""
        outcome = await self._discover(
            self._ads_payload(["invalid_id", "not_numeric", "123abc"]),
            self._empty(),  # No client pages
            self._empty(),  # No assigned pages
        )

        assert outcome["success"] is False
        assert "No suitable pages found" in outcome["message"]

    @pytest.mark.asyncio
    async def test_discover_pages_api_error_handling(self):
        """Test page discovery with API errors."""
        with patch(self.API_TARGET) as api_mock:
            api_mock.side_effect = Exception("API Error")
            outcome = await _discover_pages_for_account(self.ACCOUNT_ID, self.TOKEN)

        assert outcome["success"] is False
        assert "Error during page discovery" in outcome["message"]

    @pytest.mark.asyncio
    async def test_search_pages_by_name_logic(self):
        """Test the core search logic without authentication interference."""
        pages = {
            "data": [
                {"id": "111", "name": "Test Page 1"},
                {"id": "222", "name": "Another Test Page"},
                {"id": "333", "name": "Different Page"}
            ]
        }
        needle = "test"
        # Case-insensitive substring filter, applied directly to the fixture.
        matches = [
            entry for entry in pages["data"]
            if needle in entry.get("name", "").lower()
        ]

        assert len(matches) == 2
        assert matches[0]["name"] == "Test Page 1"
        assert matches[1]["name"] == "Another Test Page"

    @pytest.mark.asyncio
    async def test_search_pages_by_name_no_matches(self):
        """Test search logic with no matching results."""
        pages = {
            "data": [
                {"id": "111", "name": "Test Page 1"},
                {"id": "222", "name": "Another Test Page"}
            ]
        }
        needle = "nonexistent"
        matches = [
            entry for entry in pages["data"]
            if needle in entry.get("name", "").lower()
        ]

        assert len(matches) == 0

    @pytest.mark.asyncio
    async def test_search_pages_by_name_core_success(self):
        """Test the core search function with successful page discovery."""
        payload = await self._core_search(self._found_page(), "test")

        assert len(payload["data"]) == 1
        assert payload["data"][0]["id"] == "123456789"
        assert payload["data"][0]["name"] == "Test Page"
        assert payload["search_term"] == "test"
        assert payload["total_found"] == 1
        assert payload["total_available"] == 1

    @pytest.mark.asyncio
    async def test_search_pages_by_name_core_no_pages(self):
        """Test the core search function when no pages are found."""
        payload = await self._core_search(
            {"success": False, "message": "No pages found"},
            "test",
        )

        assert len(payload["data"]) == 0
        assert "No pages found" in payload["message"]

    @pytest.mark.asyncio
    async def test_search_pages_by_name_core_no_search_term(self):
        """Test the core search function without search term."""
        payload = await self._core_search(self._found_page())

        assert len(payload["data"]) == 1
        assert payload["total_available"] == 1
        assert "note" in payload

    @pytest.mark.asyncio
    async def test_search_pages_by_name_core_exception_handling(self):
        """Test the core search function with exception handling."""
        with patch(self.DISCOVER_TARGET) as discover_mock:
            discover_mock.side_effect = Exception("Test exception")
            raw = await _search_pages_by_name_core(self.TOKEN, self.ACCOUNT_ID, "test")

        payload = json.loads(raw)
        assert "error" in payload
        assert "Failed to search pages by name" in payload["error"]

    @pytest.mark.asyncio
    async def test_discover_pages_with_multiple_page_ids(self):
        """Test page discovery with multiple page IDs in tracking_specs."""
        candidate_ids = ["111222333", "444555666", "777888999"]
        page_details = {
            "id": "111222333",
            "name": "First Page",
            "username": "firstpage"
        }
        outcome = await self._discover(self._ads_payload(candidate_ids), page_details)

        assert outcome["success"] is True
        # Any of the listed IDs may be picked.
        assert outcome["page_id"] in candidate_ids
        assert outcome["page_name"] == "First Page"

    @pytest.mark.asyncio
    async def test_discover_pages_with_mixed_valid_invalid_ids(self):
        """Test page discovery with mixed valid and invalid page IDs."""
        page_details = {
            "id": "123456789",
            "name": "Valid Page",
            "username": "validpage"
        }
        outcome = await self._discover(
            self._ads_payload(["invalid", "123456789", "not_numeric", "987654321"]),
            page_details,
        )

        assert outcome["success"] is True
        # Only the numeric IDs are acceptable results.
        assert outcome["page_id"] in ["123456789", "987654321"]
        assert outcome["page_name"] == "Valid Page"

    @pytest.mark.asyncio
    async def test_search_pages_by_name_case_insensitive(self):
        """Test search function with case insensitive matching."""
        # Upper- and lower-case terms must both match "Test Page".
        for term in ("TEST", "test"):
            payload = await self._core_search(self._found_page(), term)
            assert len(payload["data"]) == 1
            assert payload["total_found"] == 1
# Allow running this test module directly: hand this file to pytest.
if __name__ == "__main__":
    pytest.main([__file__])
```
--------------------------------------------------------------------------------
/tests/test_account_info_access_fix.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Unit tests for get_account_info accessibility fix.
This module tests the fix for the issue where get_account_info couldn't access
accounts that were visible in get_ad_accounts but not in the limited direct
accessibility list.
The fix changes the logic to try fetching account info directly first,
rather than pre-checking against a limited accessibility list.
"""
import pytest
import json
from unittest.mock import AsyncMock, patch
from meta_ads_mcp.core.accounts import get_account_info
class TestAccountInfoAccessFix:
    """Test cases for the get_account_info accessibility fix"""

    API_TARGET = 'meta_ads_mcp.core.accounts.make_api_request'
    AUTH_TARGET = 'meta_ads_mcp.core.auth.get_current_access_token'
    TOKEN = "test_access_token"
    # Field list expected on the direct account lookup.
    ACCOUNT_FIELDS = "id,name,account_id,account_status,amount_spent,balance,currency,age,business_city,business_country_code,timezone_name"

    @staticmethod
    def _as_dict(result):
        """Decode the tool result, which may be a JSON string or already a dict."""
        return json.loads(result) if isinstance(result, str) else result

    @pytest.mark.asyncio
    async def test_account_info_direct_access_success(self):
        """Test that get_account_info works when direct API call succeeds"""
        account_payload = {
            "id": "act_414174661097171",
            "name": "Venture Hunting & Outdoors",
            "account_id": "414174661097171",
            "account_status": 1,
            "amount_spent": "5818510",
            "balance": "97677",
            "currency": "AUD",
            "timezone_name": "Australia/Brisbane",
            "business_country_code": "AU"
        }

        with patch(self.API_TARGET, new_callable=AsyncMock) as api_mock, \
                patch(self.AUTH_TARGET, new_callable=AsyncMock) as auth_mock:
            auth_mock.return_value = self.TOKEN
            api_mock.return_value = account_payload

            info = self._as_dict(await get_account_info(account_id="414174661097171"))

            # The account details come back without an error wrapper.
            assert "error" not in info
            assert info["id"] == "act_414174661097171"
            assert info["name"] == "Venture Hunting & Outdoors"
            assert info["account_id"] == "414174661097171"
            assert info["currency"] == "AUD"
            assert info["timezone_name"] == "Australia/Brisbane"

            # DSA compliance fields are added to the response.
            assert "dsa_required" in info
            assert info["dsa_required"] is False  # AU is not European
            assert "dsa_compliance_note" in info

            # Exactly one request, against the act_-prefixed account node.
            api_mock.assert_called_once_with(
                "act_414174661097171",
                self.TOKEN,
                {"fields": self.ACCOUNT_FIELDS}
            )

    @pytest.mark.asyncio
    async def test_account_info_permission_error_with_helpful_message(self):
        """Test that permission errors provide helpful error messages with accessible accounts"""
        permission_error = {
            "error": {
                "message": "Insufficient privileges to access the object",
                "type": "OAuthException",
                "code": 200
            }
        }
        reachable_accounts = {
            "data": [
                {"id": "act_123456", "name": "Accessible Account 1"},
                {"id": "act_789012", "name": "Accessible Account 2"},
                {"id": "act_345678", "name": "Accessible Account 3"}
            ]
        }

        with patch(self.API_TARGET, new_callable=AsyncMock) as api_mock, \
                patch(self.AUTH_TARGET, new_callable=AsyncMock) as auth_mock:
            auth_mock.return_value = self.TOKEN
            # Direct lookup is denied; the follow-up call lists reachable accounts.
            api_mock.side_effect = [permission_error, reachable_accounts]

            info = self._as_dict(await get_account_info(account_id="414174661097171"))

            # The error is enriched with guidance and the reachable accounts.
            assert "error" in info
            error_body = info["error"]
            assert "not accessible to your user account" in error_body["message"]
            assert "accessible_accounts" in error_body
            assert "suggestion" in error_body
            assert error_body["total_accessible_accounts"] == 3

            listed = error_body["accessible_accounts"]
            assert len(listed) == 3
            assert listed[0]["id"] == "act_123456"
            assert listed[0]["name"] == "Accessible Account 1"

            assert api_mock.call_count == 2

            # First call: direct account access attempt
            api_mock.assert_any_call(
                "act_414174661097171",
                self.TOKEN,
                {"fields": self.ACCOUNT_FIELDS}
            )

            # Second call: get accessible accounts for error message
            api_mock.assert_any_call(
                "me/adaccounts",
                self.TOKEN,
                {
                    "fields": "id,name,account_id,account_status,amount_spent,balance,currency,age,business_city,business_country_code",
                    "limit": 50
                }
            )

    @pytest.mark.asyncio
    async def test_account_info_non_permission_error_passthrough(self):
        """Test that non-permission errors are passed through unchanged"""
        api_error = {
            "error": {
                "message": "Invalid account ID format",
                "type": "GraphAPIException",
                "code": 100
            }
        }

        with patch(self.API_TARGET, new_callable=AsyncMock) as api_mock, \
                patch(self.AUTH_TARGET, new_callable=AsyncMock) as auth_mock:
            auth_mock.return_value = self.TOKEN
            api_mock.return_value = api_error

            info = self._as_dict(await get_account_info(account_id="invalid_id"))

            # The original error is returned unchanged...
            assert info == api_error

            # ...and no second call is made to list accessible accounts.
            api_mock.assert_called_once()

    @pytest.mark.asyncio
    async def test_account_info_missing_account_id_error(self):
        """Test that missing account_id parameter returns appropriate error"""
        with patch(self.AUTH_TARGET, new_callable=AsyncMock) as auth_mock:
            auth_mock.return_value = self.TOKEN

            info = self._as_dict(await get_account_info(account_id=None))

            assert "error" in info
            assert "Account ID is required" in info["error"]["message"]
            assert "Please specify an account_id parameter" in info["error"]["details"]

    @pytest.mark.asyncio
    async def test_account_info_act_prefix_handling(self):
        """Test that account_id prefix handling works correctly"""
        account_payload = {
            "id": "act_123456789",
            "name": "Test Account",
            "account_id": "123456789"
        }

        with patch(self.API_TARGET, new_callable=AsyncMock) as api_mock, \
                patch(self.AUTH_TARGET, new_callable=AsyncMock) as auth_mock:
            auth_mock.return_value = self.TOKEN
            api_mock.return_value = account_payload

            # Pass the bare numeric ID, without the act_ prefix.
            await get_account_info(account_id="123456789")

            # The request must target the prefixed account node.
            api_mock.assert_called_once_with(
                "act_123456789",  # Should have act_ prefix added
                self.TOKEN,
                {"fields": self.ACCOUNT_FIELDS}
            )

    @pytest.mark.asyncio
    async def test_account_info_european_dsa_detection(self):
        """Test that DSA requirements are properly detected for European accounts"""
        german_account = {
            "id": "act_999888777",
            "name": "German Test Account",
            "account_id": "999888777",
            "business_country_code": "DE"  # Germany - should trigger DSA
        }

        with patch(self.API_TARGET, new_callable=AsyncMock) as api_mock, \
                patch(self.AUTH_TARGET, new_callable=AsyncMock) as auth_mock:
            auth_mock.return_value = self.TOKEN
            api_mock.return_value = german_account

            info = self._as_dict(await get_account_info(account_id="999888777"))

            assert "dsa_required" in info
            assert info["dsa_required"] is True  # DE is European
            assert "dsa_compliance_note" in info
            assert "European DSA" in info["dsa_compliance_note"]
class TestAccountInfoAccessRegression:
    """Regression tests to ensure the fix doesn't break existing functionality"""

    @pytest.mark.asyncio
    async def test_regression_basic_account_info_still_works(self):
        """Regression test: ensure basic account info functionality still works"""
        account_payload = {
            "id": "act_123456789",
            "name": "Basic Test Account",
            "account_id": "123456789",
            "account_status": 1,
            "currency": "USD",
            "business_country_code": "US"
        }

        with patch('meta_ads_mcp.core.accounts.make_api_request', new_callable=AsyncMock) as api_mock, \
                patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as auth_mock:
            auth_mock.return_value = "test_access_token"
            api_mock.return_value = account_payload

            raw = await get_account_info(account_id="act_123456789")

            # The tool may return either a JSON string or a dict.
            info = json.loads(raw) if isinstance(raw, str) else raw

            assert "error" not in info
            assert info["id"] == "act_123456789"
            assert info["name"] == "Basic Test Account"

    def test_account_info_fix_comparison(self):
        """
        Documentation test: explains what the fix changed

        BEFORE: get_account_info checked accessibility first against limited list (50 accounts)
        AFTER: get_account_info tries direct API call first, only shows error if API fails

        This allows accounts visible through business manager (like 414174661097171)
        to work properly even if they're not in the limited direct accessibility list.
        """
        # Purely descriptive: nothing from the package is executed here.
        before = "Pre-check accessibility against limited 50 account list"
        after = "Try direct API call first, handle permission errors gracefully"
        assert before != after

        # get_ad_accounts surfaced 107 accounts through business manager,
        # whereas "me/adaccounts" listed only 50 directly accessible ones.
        visible_count, direct_count = 107, 50
        assert visible_count > direct_count

        # Account 414174661097171 was among the 107 but not the 50;
        # the fix lets get_account_info work for such accounts.
--------------------------------------------------------------------------------
/tests/test_targeting.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Unit tests for targeting search functionality in Meta Ads MCP.
"""
import pytest
import json
from unittest.mock import AsyncMock, patch
from meta_ads_mcp.core.targeting import (
search_interests,
get_interest_suggestions,
estimate_audience_size,
search_behaviors,
search_demographics,
search_geo_locations
)
class TestSearchInterests:
    """Test cases for search_interests function"""

    API_TARGET = 'meta_ads_mcp.core.targeting.make_api_request'

    @pytest.mark.asyncio
    async def test_search_interests_success(self):
        """Test successful interest search"""
        interests = {
            "data": [
                {
                    "id": "6003139266461",
                    "name": "Movies",
                    "audience_size": 1234567890,
                    "path": ["Entertainment", "Movies"]
                },
                {
                    "id": "6003397425735",
                    "name": "Tennis",
                    "audience_size": 987654321,
                    "path": ["Sports", "Tennis"]
                }
            ]
        }

        with patch(self.API_TARGET, new_callable=AsyncMock) as api_mock:
            api_mock.return_value = interests

            raw = await search_interests(access_token="test_token", query="movies", limit=10)

            # The search endpoint receives the query, type and limit.
            api_mock.assert_called_once_with(
                "search",
                "test_token",
                {"type": "adinterest", "q": "movies", "limit": 10}
            )

            # The API payload is returned as-is, JSON encoded.
            decoded = json.loads(raw)
            assert decoded == interests
            assert len(decoded["data"]) == 2
            assert decoded["data"][0]["name"] == "Movies"

    @pytest.mark.asyncio
    async def test_search_interests_no_query(self):
        """Test search_interests with empty query parameter"""
        raw = await search_interests(
            query="",  # Required parameter supplied, but empty
            access_token="test_token"
        )

        outer = json.loads(raw)

        # The @meta_api_tool decorator wraps errors in a 'data' field
        assert "data" in outer
        inner = json.loads(outer["data"])
        assert "error" in inner
        assert inner["error"] == "No search query provided"

    @pytest.mark.asyncio
    async def test_search_interests_default_limit(self):
        """Test search_interests with default limit"""
        empty_result = {"data": []}

        # Patch both the API request and the auth lookup so the decorator
        # resolves a token without touching real credentials.
        with patch(self.API_TARGET, new_callable=AsyncMock) as api_mock, \
                patch('meta_ads_mcp.core.auth.get_current_access_token') as auth_mock:
            auth_mock.return_value = "test_token"
            api_mock.return_value = empty_result

            raw = await search_interests(query="test")

            # With no explicit limit, 25 is sent.
            api_mock.assert_called_once_with(
                "search",
                "test_token",
                {"type": "adinterest", "q": "test", "limit": 25}
            )

            # The result decodes to a payload with a 'data' key.
            decoded = json.loads(raw)
            assert "data" in decoded
class TestGetInterestSuggestions:
    """Unit tests for the ``get_interest_suggestions`` targeting tool."""

    # Dotted path of the API helper replaced during these tests.
    _API_TARGET = 'meta_ads_mcp.core.targeting.make_api_request'

    @pytest.mark.asyncio
    async def test_get_interest_suggestions_success(self):
        """Suggestions returned by the API are relayed as JSON, unchanged."""
        api_payload = {
            "data": [
                {
                    "id": "6003022269556",
                    "name": "Rugby football",
                    "audience_size": 13214830,
                    "path": [],
                    "description": None
                },
                {
                    "id": "6003146664949",
                    "name": "Netball",
                    "audience_size": 4333770,
                    "path": [],
                    "description": None
                }
            ]
        }
        # The interest list is expected to reach the API as a JSON-encoded string.
        expected_params = {
            "type": "adinterestsuggestion",
            "interest_list": '["Basketball", "Soccer"]',
            "limit": 15
        }

        with patch(self._API_TARGET, new_callable=AsyncMock) as api_mock:
            api_mock.return_value = api_payload
            raw = await get_interest_suggestions(
                access_token="test_token",
                interest_list=["Basketball", "Soccer"],
                limit=15
            )

        api_mock.assert_called_once_with("search", "test_token", expected_params)

        parsed = json.loads(raw)
        assert parsed == api_payload
        assert len(parsed["data"]) == 2

    @pytest.mark.asyncio
    async def test_get_interest_suggestions_no_list(self):
        """An empty interest list yields a validation error."""
        raw = await get_interest_suggestions(
            interest_list=[],  # required parameter supplied, but deliberately empty
            access_token="test_token"
        )

        outer = json.loads(raw)
        # The @meta_api_tool decorator wraps errors in a 'data' field
        assert "data" in outer
        inner = json.loads(outer["data"])
        assert "error" in inner
        assert inner["error"] == "No interest list provided"
class TestEstimateAudienceSizeBackwardsCompatibility:
    """Tests for the interest-validation mode of ``estimate_audience_size``."""

    # Dotted path of the API helper replaced during these tests.
    _API_TARGET = 'meta_ads_mcp.core.targeting.make_api_request'

    @pytest.mark.asyncio
    async def test_validate_interests_by_name_success(self):
        """Validation by interest name forwards a JSON-encoded name list."""
        api_payload = {
            "data": [
                {
                    "name": "Japan",
                    "valid": True,
                    "id": 6003700426513,
                    "audience_size": 68310258
                },
                {
                    "name": "nonexistantkeyword",
                    "valid": False
                }
            ]
        }
        expected_params = {
            "type": "adinterestvalid",
            "interest_list": '["Japan", "nonexistantkeyword"]'
        }

        with patch(self._API_TARGET, new_callable=AsyncMock) as api_mock:
            api_mock.return_value = api_payload
            raw = await estimate_audience_size(
                access_token="test_token",
                interest_list=["Japan", "nonexistantkeyword"]
            )

        api_mock.assert_called_once_with("search", "test_token", expected_params)

        parsed = json.loads(raw)
        assert parsed == api_payload
        first, second = parsed["data"][0], parsed["data"][1]
        assert first["valid"] is True
        assert second["valid"] is False

    @pytest.mark.asyncio
    async def test_validate_interests_by_fbid_success(self):
        """Validation by FBID forwards a JSON-encoded id list."""
        api_payload = {
            "data": [
                {
                    "id": "6003700426513",
                    "valid": True,
                    "audience_size": 68310258
                }
            ]
        }
        expected_params = {
            "type": "adinterestvalid",
            "interest_fbid_list": '["6003700426513"]'
        }

        with patch(self._API_TARGET, new_callable=AsyncMock) as api_mock:
            api_mock.return_value = api_payload
            raw = await estimate_audience_size(
                access_token="test_token",
                interest_fbid_list=["6003700426513"]
            )

        api_mock.assert_called_once_with("search", "test_token", expected_params)

        assert json.loads(raw) == api_payload

    @pytest.mark.asyncio
    async def test_validate_interests_no_input(self):
        """Calling with neither list yields the legacy validation error."""
        raw = await estimate_audience_size(access_token="test_token")

        outer = json.loads(raw)
        # The @meta_api_tool decorator wraps errors in a 'data' field
        assert "data" in outer
        inner = json.loads(outer["data"])
        assert "error" in inner
        assert inner["error"] == "No interest list or FBID list provided"
class TestSearchBehaviors:
    """Unit tests for the ``search_behaviors`` targeting tool."""

    @pytest.mark.asyncio
    async def test_search_behaviors_success(self):
        """Behavior categories from the API are relayed as JSON, unchanged."""
        business_travelers = {
            "id": 6007101597783,
            "name": "Business Travelers",
            "audience_size_lower_bound": 1000000,
            "audience_size_upper_bound": 2000000,
            "path": ["Travel", "Business Travel"],
            "description": "People who travel for business",
            "type": "behaviors"
        }
        api_payload = {"data": [business_travelers]}
        expected_params = {
            "type": "adTargetingCategory",
            "class": "behaviors",
            "limit": 25
        }

        with patch('meta_ads_mcp.core.targeting.make_api_request', new_callable=AsyncMock) as api_mock:
            api_mock.return_value = api_payload
            raw = await search_behaviors(access_token="test_token", limit=25)

        # One call to the search endpoint, scoped to the behaviors class.
        api_mock.assert_called_once_with("search", "test_token", expected_params)

        assert json.loads(raw) == api_payload
class TestSearchDemographics:
    """Unit tests for the ``search_demographics`` targeting tool."""

    @pytest.mark.asyncio
    async def test_search_demographics_success(self):
        """The requested demographic class is forwarded as the ``class`` parameter."""
        parents = {
            "id": 6015559470583,
            "name": "Parents (All)",
            "audience_size_lower_bound": 500000000,
            "audience_size_upper_bound": 750000000,
            "path": ["Family", "Parents"],
            "description": "Parents of children of any age",
            "type": "demographics"
        }
        api_payload = {"data": [parents]}
        expected_params = {
            "type": "adTargetingCategory",
            "class": "life_events",
            "limit": 30
        }

        with patch('meta_ads_mcp.core.targeting.make_api_request', new_callable=AsyncMock) as api_mock:
            api_mock.return_value = api_payload
            raw = await search_demographics(
                access_token="test_token",
                demographic_class="life_events",
                limit=30
            )

        api_mock.assert_called_once_with("search", "test_token", expected_params)

        assert json.loads(raw) == api_payload
class TestSearchGeoLocations:
    """Unit tests for the ``search_geo_locations`` targeting tool."""

    # Dotted paths of the collaborators replaced during these tests.
    _API_TARGET = 'meta_ads_mcp.core.targeting.make_api_request'
    _AUTH_TARGET = 'meta_ads_mcp.core.auth.get_current_access_token'

    @pytest.mark.asyncio
    async def test_search_geo_locations_success(self):
        """Location types are forwarded JSON-encoded alongside the query."""
        api_payload = {
            "data": [
                {
                    "key": "US",
                    "name": "United States",
                    "type": "country",
                    "supports_city": True,
                    "supports_region": True
                },
                {
                    "key": "3847",
                    "name": "California",
                    "type": "region",
                    "country_code": "US",
                    "country_name": "United States",
                    "supports_city": True,
                    "supports_region": True
                }
            ]
        }
        expected_params = {
            "type": "adgeolocation",
            "q": "United States",
            "location_types": '["country", "region"]',
            "limit": 10
        }

        with patch(self._API_TARGET, new_callable=AsyncMock) as api_mock:
            api_mock.return_value = api_payload
            raw = await search_geo_locations(
                access_token="test_token",
                query="United States",
                location_types=["country", "region"],
                limit=10
            )

        api_mock.assert_called_once_with("search", "test_token", expected_params)

        assert json.loads(raw) == api_payload

    @pytest.mark.asyncio
    async def test_search_geo_locations_no_query(self):
        """An empty query yields a validation error."""
        raw = await search_geo_locations(
            query="",  # required parameter supplied, but deliberately empty
            access_token="test_token"
        )

        outer = json.loads(raw)
        # The @meta_api_tool decorator wraps errors in a 'data' field
        assert "data" in outer
        inner = json.loads(outer["data"])
        assert "error" in inner
        assert inner["error"] == "No search query provided"

    @pytest.mark.asyncio
    async def test_search_geo_locations_no_location_types(self):
        """Without a type filter, ``location_types`` is absent from the request."""
        api_payload = {"data": []}
        expected_params = {"type": "adgeolocation", "q": "test", "limit": 25}

        # Both the API request and the token lookup are patched so the call
        # can run without an explicit access token.
        with patch(self._API_TARGET, new_callable=AsyncMock) as api_mock, patch(self._AUTH_TARGET) as auth_mock:
            auth_mock.return_value = "test_token"
            api_mock.return_value = api_payload

            raw = await search_geo_locations(query="test")

            api_mock.assert_called_once_with("search", "test_token", expected_params)

            # The result must still be a well-formed JSON document.
            parsed = json.loads(raw)
            assert "data" in parsed
```
--------------------------------------------------------------------------------
/meta_ads_mcp/core/adsets.py:
--------------------------------------------------------------------------------
```python
"""Ad Set-related functionality for Meta Ads API."""
import json
from typing import Optional, Dict, Any, List
from .api import meta_api_tool, make_api_request
from .accounts import get_ad_accounts
from .server import mcp_server
@mcp_server.tool()
@meta_api_tool
async def get_adsets(account_id: str, access_token: Optional[str] = None, limit: int = 10, campaign_id: str = "") -> str:
    """
    Get ad sets for a Meta Ads account with optional filtering by campaign.

    Args:
        account_id: Meta Ads account ID (format: act_XXXXXXXXX)
        access_token: Meta API access token (optional - will use cached token if not provided)
        limit: Maximum number of ad sets to return (default: 10)
        campaign_id: Optional campaign ID to filter by
    """
    # An account ID is mandatory even when a campaign filter is supplied.
    if not account_id:
        return json.dumps({"error": "No account ID specified"}, indent=2)

    # Campaign filtering is done by querying the campaign's own /adsets edge;
    # without a campaign the account's edge is used. The requested fields and
    # limit are the same either way.
    owner_id = campaign_id if campaign_id else account_id
    endpoint = f"{owner_id}/adsets"
    params = {
        "fields": "id,name,campaign_id,status,daily_budget,lifetime_budget,targeting,bid_amount,bid_strategy,optimization_goal,billing_event,start_time,end_time,created_time,updated_time,is_dynamic_creative,frequency_control_specs{event,interval_days,max_frequency}",
        "limit": limit
    }

    response = await make_api_request(endpoint, access_token, params)
    return json.dumps(response, indent=2)
@mcp_server.tool()
@meta_api_tool
async def get_adset_details(adset_id: str, access_token: Optional[str] = None) -> str:
    """
    Get detailed information about a specific ad set.

    Args:
        adset_id: Meta Ads ad set ID
        access_token: Meta API access token (optional - will use cached token if not provided)

    Returns:
        JSON string with the ad set fields. When a successful response lacks
        frequency_control_specs, a '_meta' note explaining the absence is added.

    Example:
        To call this function through MCP, pass the adset_id as the first argument:
        {
            "args": "YOUR_ADSET_ID"
        }
    """
    if not adset_id:
        return json.dumps({"error": "No ad set ID provided"}, indent=2)

    endpoint = f"{adset_id}"
    # Explicitly prioritize frequency_control_specs in the fields request
    params = {
        "fields": "id,name,campaign_id,status,frequency_control_specs{event,interval_days,max_frequency},daily_budget,lifetime_budget,targeting,bid_amount,bid_strategy,optimization_goal,billing_event,start_time,end_time,created_time,updated_time,attribution_spec,destination_type,promoted_object,pacing_type,budget_remaining,dsa_beneficiary,is_dynamic_creative"
    }

    data = await make_api_request(endpoint, access_token, params)

    # Only annotate genuine ad set payloads: an error response also lacks
    # frequency_control_specs, and telling the caller "no frequency caps are
    # set" there would be misleading. The dict check avoids failing on an
    # unexpected non-mapping response.
    if isinstance(data, dict) and 'error' not in data and 'frequency_control_specs' not in data:
        data['_meta'] = {
            'note': 'No frequency_control_specs field was returned by the API. This means either no frequency caps are set or the API did not include this field in the response.'
        }

    return json.dumps(data, indent=2)
def _is_supported_store_url(store_url: Any) -> bool:
    """Return True when *store_url* is hosted on a recognised app store domain."""
    # Local import keeps this block self-contained; urllib is standard library.
    from urllib.parse import urlparse

    if not isinstance(store_url, str):
        return False

    store_domains = (
        "apps.apple.com",    # iOS App Store
        "play.google.com",   # Google Play Store
        "itunes.apple.com",  # Alternative iOS format
    )
    candidate = store_url.strip()
    # urlparse only recognises the host when a network-location marker is
    # present, so scheme-less input such as "apps.apple.com/app/id1" is
    # normalised first.
    if "://" not in candidate:
        candidate = "//" + candidate.lstrip("/")
    try:
        host = (urlparse(candidate).hostname or "").lower()
    except ValueError:
        # Raised for malformed network locations such as an unbalanced "[".
        return False
    # Compare the actual host rather than searching the whole URL, so a store
    # domain that merely appears in a path or query string does not count.
    return any(host == domain or host.endswith("." + domain) for domain in store_domains)


def _app_install_promoted_object_error(promoted_object: Any) -> Optional[Dict[str, Any]]:
    """Return an error payload for an invalid APP_INSTALLS promoted_object, else None."""
    if not promoted_object:
        return {
            "error": "promoted_object is required for APP_INSTALLS optimization goal",
            "details": "Mobile app campaigns must specify which app is being promoted",
            "required_fields": ["application_id", "object_store_url"]
        }
    if not isinstance(promoted_object, dict):
        return {
            "error": "promoted_object must be a dictionary",
            "example": {"application_id": "123456789012345", "object_store_url": "https://apps.apple.com/app/id123456789"}
        }
    if "application_id" not in promoted_object:
        return {
            "error": "promoted_object missing required field: application_id",
            "details": "application_id is the Facebook app ID for your mobile app"
        }
    if "object_store_url" not in promoted_object:
        return {
            "error": "promoted_object missing required field: object_store_url",
            "details": "object_store_url should be the App Store or Google Play URL for your app"
        }
    store_url = promoted_object["object_store_url"]
    if not _is_supported_store_url(store_url):
        return {
            "error": "Invalid object_store_url format",
            "details": "URL must be from App Store (apps.apple.com) or Google Play (play.google.com)",
            "provided_url": store_url
        }
    return None


@mcp_server.tool()
@meta_api_tool
async def create_adset(
    account_id: str,
    campaign_id: str,
    name: str,
    optimization_goal: str,
    billing_event: str,
    status: str = "PAUSED",
    daily_budget: Optional[int] = None,
    lifetime_budget: Optional[int] = None,
    targeting: Optional[Dict[str, Any]] = None,
    bid_amount: Optional[int] = None,
    bid_strategy: Optional[str] = None,
    start_time: Optional[str] = None,
    end_time: Optional[str] = None,
    dsa_beneficiary: Optional[str] = None,
    promoted_object: Optional[Dict[str, Any]] = None,
    destination_type: Optional[str] = None,
    is_dynamic_creative: Optional[bool] = None,
    access_token: Optional[str] = None
) -> str:
    """
    Create a new ad set in a Meta Ads account.

    Args:
        account_id: Meta Ads account ID (format: act_XXXXXXXXX)
        campaign_id: Meta Ads campaign ID this ad set belongs to
        name: Ad set name
        optimization_goal: Conversion optimization goal (e.g., 'LINK_CLICKS', 'REACH', 'CONVERSIONS', 'APP_INSTALLS')
        billing_event: How you're charged (e.g., 'IMPRESSIONS', 'LINK_CLICKS')
        status: Initial ad set status (default: PAUSED)
        daily_budget: Daily budget in account currency (in cents); sent to the API as a string
        lifetime_budget: Lifetime budget in account currency (in cents); sent to the API as a string
        targeting: Targeting specifications including age, location, interests, etc.
                  Use targeting_automation.advantage_audience=1 for automatic audience finding.
                  When omitted, a default of ages 18-65 in the US with advantage audience is used.
        bid_amount: Bid amount in account currency (in cents)
        bid_strategy: Bid strategy (e.g., 'LOWEST_COST', 'LOWEST_COST_WITH_BID_CAP')
        start_time: Start time in ISO 8601 format (e.g., '2023-12-01T12:00:00-0800')
        end_time: End time in ISO 8601 format
        dsa_beneficiary: DSA beneficiary (person/organization benefiting from ads) for European compliance
        promoted_object: Mobile app configuration for APP_INSTALLS campaigns. Required fields: application_id, object_store_url.
                        Optional fields: custom_event_type, pixel_id, page_id.
                        Example: {"application_id": "123456789012345", "object_store_url": "https://apps.apple.com/app/id123456789"}
        destination_type: Where users are directed after clicking the ad (e.g., 'APP_STORE', 'DEEPLINK', 'APP_INSTALL', 'ON_AD').
                          Required for mobile app campaigns and lead generation campaigns.
                          Use 'ON_AD' for lead generation campaigns where user interaction happens within the ad.
        is_dynamic_creative: Enable Dynamic Creative for this ad set (required when using dynamic creatives with asset_feed_spec/dynamic_creative_spec).
        access_token: Meta API access token (optional - will use cached token if not provided)

    Returns:
        JSON string containing either the API response or an "error" object
        describing which validation or request step failed.
    """
    # Check required parameters; the first missing one determines the error.
    required_fields = (
        (account_id, "No account ID provided"),
        (campaign_id, "No campaign ID provided"),
        (name, "No ad set name provided"),
        (optimization_goal, "No optimization goal provided"),
        (billing_event, "No billing event provided"),
    )
    for value, message in required_fields:
        if not value:
            return json.dumps({"error": message}, indent=2)

    # Validate mobile app parameters for APP_INSTALLS campaigns
    if optimization_goal == "APP_INSTALLS":
        promoted_object_error = _app_install_promoted_object_error(promoted_object)
        if promoted_object_error is not None:
            return json.dumps(promoted_object_error, indent=2)

    # Validate destination_type if provided
    if destination_type:
        valid_destination_types = ["APP_STORE", "DEEPLINK", "APP_INSTALL", "ON_AD"]
        if destination_type not in valid_destination_types:
            return json.dumps({
                "error": f"Invalid destination_type: {destination_type}",
                "valid_values": valid_destination_types
            }, indent=2)

    # Fall back to basic targeting when the caller supplies none
    if not targeting:
        targeting = {
            "age_min": 18,
            "age_max": 65,
            "geo_locations": {"countries": ["US"]},
            "targeting_automation": {"advantage_audience": 1}
        }

    endpoint = f"{account_id}/adsets"

    params = {
        "name": name,
        "campaign_id": campaign_id,
        "status": status,
        "optimization_goal": optimization_goal,
        "billing_event": billing_event,
        "targeting": json.dumps(targeting)  # Properly format as JSON string
    }

    # Budgets and bids are sent as strings. `is not None` keeps an explicit 0.
    if daily_budget is not None:
        params["daily_budget"] = str(daily_budget)
    if lifetime_budget is not None:
        params["lifetime_budget"] = str(lifetime_budget)
    if bid_amount is not None:
        params["bid_amount"] = str(bid_amount)

    # Optional pass-through parameters
    if bid_strategy:
        params["bid_strategy"] = bid_strategy
    if start_time:
        params["start_time"] = start_time
    if end_time:
        params["end_time"] = end_time
    if dsa_beneficiary:
        params["dsa_beneficiary"] = dsa_beneficiary

    # Mobile app parameters
    if promoted_object:
        params["promoted_object"] = json.dumps(promoted_object)
    if destination_type:
        params["destination_type"] = destination_type

    # Enable Dynamic Creative if requested
    if is_dynamic_creative is not None:
        params["is_dynamic_creative"] = "true" if bool(is_dynamic_creative) else "false"

    try:
        data = await make_api_request(endpoint, access_token, params, method="POST")
        return json.dumps(data, indent=2)
    except Exception as e:
        error_msg = str(e)
        lowered = error_msg.lower()

        # Classify the failure so the caller gets actionable guidance. The
        # checks run from most to least specific, and the first match wins.
        if "permission" in lowered or "insufficient" in lowered:
            return json.dumps({
                "error": "Insufficient permissions to set DSA beneficiary. Please ensure you have business_management permissions.",
                "details": error_msg,
                "params_sent": params,
                "permission_required": True
            }, indent=2)
        elif "dsa_beneficiary" in lowered and ("not supported" in lowered or "parameter" in lowered):
            return json.dumps({
                "error": "DSA beneficiary parameter not supported in this API version. Please set DSA beneficiary manually in Facebook Ads Manager.",
                "details": error_msg,
                "params_sent": params,
                "manual_setup_required": True
            }, indent=2)
        elif "benefits from ads" in error_msg or "DSA beneficiary" in error_msg:
            return json.dumps({
                "error": "DSA beneficiary required for European compliance. Please provide the person or organization that benefits from ads in this ad set.",
                "details": error_msg,
                "params_sent": params,
                "dsa_required": True
            }, indent=2)
        else:
            return json.dumps({
                "error": "Failed to create ad set",
                "details": error_msg,
                "params_sent": params
            }, indent=2)
@mcp_server.tool()
@meta_api_tool
async def update_adset(adset_id: str, frequency_control_specs: Optional[List[Dict[str, Any]]] = None, bid_strategy: Optional[str] = None,
                        bid_amount: Optional[int] = None, status: Optional[str] = None, targeting: Optional[Dict[str, Any]] = None,
                        optimization_goal: Optional[str] = None, daily_budget: Optional[int] = None, lifetime_budget: Optional[int] = None,
                        is_dynamic_creative: Optional[bool] = None,
                        access_token: Optional[str] = None) -> str:
    """
    Update an ad set with new settings including frequency caps and budgets.

    Only parameters that are not None are sent; if none are given, an error is returned.

    Args:
        adset_id: Meta Ads ad set ID
        frequency_control_specs: List of frequency control specifications
                                 (e.g. [{"event": "IMPRESSIONS", "interval_days": 7, "max_frequency": 3}])
        bid_strategy: Bid strategy (e.g., 'LOWEST_COST_WITH_BID_CAP')
        bid_amount: Bid amount in account currency (in cents for USD)
        status: Update ad set status (ACTIVE, PAUSED, etc.)
        targeting: Complete targeting specifications (will replace existing targeting)
                  (e.g. {"targeting_automation":{"advantage_audience":1}, "geo_locations": {"countries": ["US"]}})
        optimization_goal: Conversion optimization goal (e.g., 'LINK_CLICKS', 'CONVERSIONS', 'APP_INSTALLS', etc.)
        daily_budget: Daily budget in account currency (in cents); sent to the API as a string
        lifetime_budget: Lifetime budget in account currency (in cents); sent to the API as a string
        is_dynamic_creative: Enable/disable Dynamic Creative for this ad set.
        access_token: Meta API access token (optional - will use cached token if not provided)
    """
    if not adset_id:
        return json.dumps({"error": "No ad set ID provided"}, indent=2)

    def _encode_targeting(spec):
        # Dicts are JSON-encoded; anything else is assumed to be a string already.
        return json.dumps(spec) if isinstance(spec, dict) else spec

    def _encode_flag(flag):
        return "true" if bool(flag) else "false"

    # (request key, supplied value, optional encoder), in the order the keys
    # are added to the request.
    candidates = (
        ('frequency_control_specs', frequency_control_specs, None),
        ('bid_strategy', bid_strategy, None),
        ('bid_amount', bid_amount, str),
        ('status', status, None),
        ('optimization_goal', optimization_goal, None),
        ('targeting', targeting, _encode_targeting),
        ('daily_budget', daily_budget, str),
        ('lifetime_budget', lifetime_budget, str),
        ('is_dynamic_creative', is_dynamic_creative, _encode_flag),
    )
    params = {
        key: (encode(value) if encode is not None else value)
        for key, value, encode in candidates
        if value is not None
    }

    if not params:
        return json.dumps({"error": "No update parameters provided"}, indent=2)

    endpoint = f"{adset_id}"

    try:
        # Use POST method for updates as per Meta API documentation
        response = await make_api_request(endpoint, access_token, params, method="POST")
    except Exception as exc:
        # Include adset_id in error for better context
        failure = {
            "error": f"Failed to update ad set {adset_id}",
            "details": str(exc),
            "params_sent": params
        }
        return json.dumps(failure, indent=2)
    return json.dumps(response, indent=2)
```
--------------------------------------------------------------------------------
/tests/test_openai_mcp_deep_research.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
OpenAI MCP Deep Research Integration Tests
This test suite validates the OpenAI MCP specification compliance:
- search tool: Returns list of IDs based on query
- fetch tool: Returns complete record data by ID
- ChatGPT Deep Research compatibility
- Integration with existing authentication
Usage:
1. Start the server: python -m meta_ads_mcp --transport streamable-http --port 8080
2. Run tests: python -m pytest tests/test_openai_mcp_deep_research.py -v
Or run directly:
python tests/test_openai_mcp_deep_research.py
"""
import requests
import json
import time
import sys
import os
from typing import Dict, Any, Optional, List
# Load environment variables from .env file
try:
from dotenv import load_dotenv
load_dotenv()
print("✅ Loaded environment variables from .env file")
except ImportError:
print("⚠️ python-dotenv not installed, using system environment variables only")
print(" Install with: pip install python-dotenv")
# Add project root to path for imports
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
class OpenAIMCPTester:
"""Test suite for OpenAI MCP Deep Research compatibility"""
def __init__(self, base_url: str = "http://localhost:8080"):
self.base_url = base_url.rstrip('/')
self.endpoint = f"{self.base_url}/mcp/"
self.request_id = 1
def _make_request(self, method: str, params: Optional[Dict[str, Any]] = None,
                  headers: Optional[Dict[str, str]] = None) -> Dict[str, Any]:
    """Make a JSON-RPC request to the MCP server.

    Args:
        method: JSON-RPC method name (e.g. "tools/list").
        params: Optional JSON-RPC params; omitted from the payload when empty.
        headers: Optional extra HTTP headers merged over the defaults.

    Returns:
        A dict with "status_code", "headers", "json", "text" and "success".
        "success" is True only for an HTTP 200 whose body parsed as JSON.
        An "error" entry is added when the request could not be sent
        ("status_code" is then 0) or when a 200 body was not valid JSON.
    """
    # Default headers for MCP protocol with streamable HTTP transport
    request_headers = {
        "Content-Type": "application/json",
        "Accept": "application/json, text/event-stream",
        "User-Agent": "OpenAI-MCP-Test-Client/1.0"
    }
    if headers:
        request_headers.update(headers)

    payload = {
        "jsonrpc": "2.0",
        "method": method,
        "id": self.request_id
    }
    if params:
        payload["params"] = params

    try:
        response = requests.post(
            self.endpoint,
            headers=request_headers,
            json=payload,
            timeout=10
        )
    except requests.exceptions.RequestException as e:
        return {
            "status_code": 0,
            "headers": {},
            "json": None,
            "text": str(e),
            "success": False,
            "error": str(e)
        }

    # The id is consumed once the server has answered.
    self.request_id += 1

    succeeded = response.status_code == 200
    body = None
    parse_error = None
    if succeeded:
        try:
            body = response.json()
        except ValueError as e:
            # A 200 response may carry a non-JSON body (the Accept header
            # also allows text/event-stream). Report it as a failed call but
            # keep the real status code and text. ValueError covers both the
            # stdlib and the requests JSON decode errors.
            succeeded = False
            parse_error = f"Response body is not valid JSON: {e}"

    result = {
        "status_code": response.status_code,
        "headers": dict(response.headers),
        "json": body,
        "text": response.text,
        "success": succeeded
    }
    if parse_error is not None:
        result["error"] = parse_error
    return result
def test_search_tool_exists(self, auth_headers: Dict[str, str] = None) -> Dict[str, Any]:
"""Test that the search tool is available in tools list"""
result = self._make_request("tools/list", {}, auth_headers)
if not result["success"]:
return {"success": False, "error": "Failed to get tools list"}
tools = result["json"]["result"].get("tools", [])
search_tool = next((tool for tool in tools if tool["name"] == "search"), None)
return {
"success": search_tool is not None,
"tool": search_tool,
"all_tools": [tool["name"] for tool in tools]
}
def test_fetch_tool_exists(self, auth_headers: Dict[str, str] = None) -> Dict[str, Any]:
"""Test that the fetch tool is available in tools list"""
result = self._make_request("tools/list", {}, auth_headers)
if not result["success"]:
return {"success": False, "error": "Failed to get tools list"}
tools = result["json"]["result"].get("tools", [])
fetch_tool = next((tool for tool in tools if tool["name"] == "fetch"), None)
return {
"success": fetch_tool is not None,
"tool": fetch_tool,
"all_tools": [tool["name"] for tool in tools]
}
def test_search_tool_call(self, query: str, auth_headers: Dict[str, str] = None) -> Dict[str, Any]:
"""Test calling the search tool with a query"""
result = self._make_request("tools/call", {
"name": "search",
"arguments": {"query": query}
}, auth_headers)
if not result["success"]:
return {"success": False, "error": result.get("text", "Unknown error")}
# Parse the tool response
response_data = result["json"]["result"]
content = response_data.get("content", [{}])[0].get("text", "")
try:
parsed_content = json.loads(content)
ids = parsed_content.get("ids", [])
return {
"success": True,
"ids": ids,
"raw_content": content,
"id_count": len(ids)
}
except json.JSONDecodeError:
return {
"success": False,
"error": "Search tool did not return valid JSON",
"raw_content": content
}
def test_fetch_tool_call(self, record_id: str, auth_headers: Dict[str, str] = None) -> Dict[str, Any]:
"""Test calling the fetch tool with an ID"""
result = self._make_request("tools/call", {
"name": "fetch",
"arguments": {"id": record_id}
}, auth_headers)
if not result["success"]:
return {"success": False, "error": result.get("text", "Unknown error")}
# Parse the tool response
response_data = result["json"]["result"]
content = response_data.get("content", [{}])[0].get("text", "")
try:
parsed_content = json.loads(content)
return {
"success": True,
"record": parsed_content,
"raw_content": content,
"has_required_fields": all(field in parsed_content for field in ["id", "title", "text"])
}
except json.JSONDecodeError:
return {
"success": False,
"error": "Fetch tool did not return valid JSON",
"raw_content": content
}
def test_search_fetch_workflow(self, auth_headers: Dict[str, str] = None) -> Dict[str, Any]:
"""Test the complete search->fetch workflow that ChatGPT Deep Research expects"""
# Step 1: Search for something that will return account IDs
search_result = self.test_search_tool_call("Yves", auth_headers)
if not search_result["success"]:
return {
"success": False,
"step": "search",
"error": search_result.get("error", "Search failed")
}
if not search_result["ids"]:
return {
"success": False,
"step": "search",
"error": "Search returned no IDs"
}
# Step 2: Fetch the first ID
first_id = search_result["ids"][0]
fetch_result = self.test_fetch_tool_call(first_id, auth_headers)
if not fetch_result["success"]:
return {
"success": False,
"step": "fetch",
"error": fetch_result.get("error", "Fetch failed"),
"searched_id": first_id
}
return {
"success": True,
"search_ids": search_result["ids"],
"fetched_record": fetch_result["record"],
"workflow_complete": True
}
def test_openai_specification_compliance(self, auth_headers: Dict[str, str] = None) -> Dict[str, bool]:
"""Test compliance with OpenAI's MCP specification for Deep Research"""
results = {}
print("\n🧪 Testing OpenAI MCP Specification Compliance")
print("="*55)
# Test 1: Both required tools exist
print("🔍 Checking required tools exist")
search_exists = self.test_search_tool_exists(auth_headers)
fetch_exists = self.test_fetch_tool_exists(auth_headers)
results["search_tool_exists"] = search_exists["success"]
results["fetch_tool_exists"] = fetch_exists["success"]
if not search_exists["success"]:
print("❌ Search tool not found")
print(f" Available tools: {search_exists.get('all_tools', [])}")
return results
if not fetch_exists["success"]:
print("❌ Fetch tool not found")
print(f" Available tools: {fetch_exists.get('all_tools', [])}")
return results
print("✅ Both search and fetch tools found")
# Test 2: Search tool returns proper format
print("\n🔍 Testing search tool format")
search_result = self.test_search_tool_call("Yves", auth_headers)
results["search_format_valid"] = search_result["success"]
if not search_result["success"]:
print(f"❌ Search tool failed: {search_result.get('error', 'Unknown error')}")
return results
print(f"✅ Search tool returns valid format with {search_result['id_count']} IDs")
# Test 3: Fetch tool returns proper format
if search_result["ids"]:
print("\n🔍 Testing fetch tool format")
first_id = search_result["ids"][0]
fetch_result = self.test_fetch_tool_call(first_id, auth_headers)
results["fetch_format_valid"] = fetch_result["success"]
results["fetch_has_required_fields"] = fetch_result.get("has_required_fields", False)
if not fetch_result["success"]:
print(f"❌ Fetch tool failed: {fetch_result.get('error', 'Unknown error')}")
return results
print("✅ Fetch tool returns valid format")
if fetch_result["has_required_fields"]:
print("✅ Fetch response includes required fields (id, title, text)")
else:
print("⚠️ Fetch response missing some required fields")
else:
print("⚠️ Cannot test fetch tool - no IDs returned by search")
results["fetch_format_valid"] = False
results["fetch_has_required_fields"] = False
# Test 4: Complete workflow
print("\n🔍 Testing complete search->fetch workflow")
workflow_result = self.test_search_fetch_workflow(auth_headers)
results["workflow_complete"] = workflow_result["success"]
if workflow_result["success"]:
print("✅ Complete workflow successful")
else:
print(f"❌ Workflow failed at {workflow_result.get('step', 'unknown')} step")
print(f" Error: {workflow_result.get('error', 'Unknown error')}")
return results
def test_page_search_functionality(self, auth_headers: Dict[str, str] = None) -> Dict[str, Any]:
"""Test that the search function includes page searching when query mentions pages"""
print("\n🔍 Testing page search functionality")
# Test 1: Search with page-related query that matches an account name
page_search_result = self.test_search_tool_call("Injury Payouts pages", auth_headers)
if not page_search_result["success"]:
return {
"success": False,
"error": f"Page search failed: {page_search_result.get('error', 'Unknown error')}"
}
# Check if page records are included in results
page_ids = [id for id in page_search_result["ids"] if id.startswith("page:")]
result = {
"success": True,
"page_search_works": len(page_ids) > 0,
"page_ids_found": len(page_ids),
"total_ids": len(page_search_result["ids"]),
"page_ids": page_ids
}
if len(page_ids) > 0:
print(f"✅ Page search working - found {len(page_ids)} page records")
# Test 2: Fetch a page record
first_page_id = page_ids[0]
fetch_result = self.test_fetch_tool_call(first_page_id, auth_headers)
if fetch_result["success"]:
print(f"✅ Page fetch working - retrieved page record: {first_page_id}")
result["page_fetch_works"] = True
result["fetched_page_data"] = fetch_result.get("record", {})
else:
print(f"❌ Page fetch failed: {fetch_result.get('error', 'Unknown error')}")
result["page_fetch_works"] = False
else:
print("⚠️ No page records found in search results")
result["page_fetch_works"] = False
return result
def run_openai_compliance_test_suite(self) -> bool:
    """Run the complete OpenAI MCP compliance suite against ``self.base_url``.

    Probes the server first, then runs the specification checks and the
    page-search checks for every authentication scenario and prints a
    per-scenario summary.

    Returns:
        True when every recorded check of every scenario is truthy; False
        when the server cannot be reached or any check failed.
    """
    print("🚀 OpenAI MCP Deep Research Compliance Test Suite")
    print("="*60)

    # Check server availability first. Only request-level failures mean
    # "server down"; a bare except would also swallow Ctrl-C / SystemExit.
    try:
        response = requests.get(f"{self.base_url}/", timeout=5)
        server_running = response.status_code in [200, 404]
    except requests.RequestException:
        server_running = False

    if not server_running:
        print("❌ Server is not running at", self.base_url)
        print(" Please start the server with:")
        print(" python -m meta_ads_mcp --transport streamable-http --port 8080")
        return False
    print("✅ Server is running")

    # Test with no authentication (server handles auth implicitly)
    auth_scenarios = [
        {
            "name": "No Authentication",
            "headers": None
        }
    ]

    all_results = {}
    for scenario in auth_scenarios:
        print(f"\n📋 Testing with: {scenario['name']}")
        print("-" * 40)
        results = self.test_openai_specification_compliance(scenario["headers"])

        # Add page search test
        page_results = self.test_page_search_functionality(scenario["headers"])
        results["page_search_functionality"] = page_results.get("page_search_works", False)
        results["page_fetch_functionality"] = page_results.get("page_fetch_works", False)
        all_results[scenario["name"]] = results

    # Summary
    print("\n🏁 OPENAI MCP COMPLIANCE TEST RESULTS")
    print("="*40)
    overall_success = True
    for scenario_name, results in all_results.items():
        # An empty result set counts as non-compliant rather than vacuously true
        scenario_success = all(results.values()) if results else False
        status = "✅ COMPLIANT" if scenario_success else "❌ NON-COMPLIANT"
        print(f"{scenario_name}: {status}")
        if not scenario_success and results:
            for test_name, test_result in results.items():
                if not test_result:
                    print(f" ❌ {test_name}")
        if not scenario_success:
            overall_success = False

    print(f"\n📊 Overall OpenAI MCP Compliance: {'✅ COMPLIANT' if overall_success else '❌ NON-COMPLIANT'}")
    if overall_success:
        print("\n🎉 Server is fully compatible with OpenAI's MCP specification!")
        print(" • ChatGPT Deep Research: Ready")
        print(" • Search tool: Compliant (includes page search)")
        print(" • Fetch tool: Compliant")
        print(" • Workflow: Complete")
        print(" • Page Search: Enhanced")
    else:
        print("\n⚠️ Server needs updates for OpenAI MCP compliance")
        print(" See failed tests above for required changes")
    return overall_success
def main():
    """Run the compliance suite and exit with status 0 on success, 1 otherwise."""
    suite_passed = OpenAIMCPTester().run_openai_compliance_test_suite()
    exit_code = 0 if suite_passed else 1
    sys.exit(exit_code)


# Only run the suite when executed as a script, not on import
if __name__ == "__main__":
    main()
```
--------------------------------------------------------------------------------
/meta_ads_mcp/core/api.py:
--------------------------------------------------------------------------------
```python
"""Core API functionality for Meta Ads API."""
from typing import Any, Dict, Optional, Callable
import json
import httpx
import asyncio
import functools
import os
from . import auth
from .auth import needs_authentication, auth_manager, start_callback_server, shutdown_callback_server
from .utils import logger
# Constants
# Graph API version segment; it is interpolated into the base URL below.
META_GRAPH_API_VERSION = "v22.0"
# Every endpoint passed to make_api_request is appended to this base.
META_GRAPH_API_BASE = f"https://graph.facebook.com/{META_GRAPH_API_VERSION}"
# Sent as the User-Agent header on every Graph API request.
USER_AGENT = "meta-ads-mcp/1.0"
# Log key environment and configuration at startup
# (runs once, at import time; only the presence of META_APP_ID is logged, not its value)
logger.info("Core API module initialized")
logger.info(f"Graph API Version: {META_GRAPH_API_VERSION}")
logger.info(f"META_APP_ID env var present: {'Yes' if os.environ.get('META_APP_ID') else 'No'}")
class GraphAPIError(Exception):
    """Error reported by the Meta Graph API.

    Constructing the exception has side effects: the error is logged and,
    when its ``code`` is 190, 102 or 4, ``auth_manager`` is asked to
    invalidate the current token.

    Attributes are set in ``__init__``: ``error_data`` holds the payload as
    given, ``message`` its ``message`` entry (or a generic fallback).
    """

    def __init__(self, error_data: Dict[str, Any]):
        message = error_data.get('message', 'Unknown Graph API error')
        self.error_data = error_data
        self.message = message
        super().__init__(message)

        # Log error details
        logger.error(f"Graph API Error: {self.message}")
        logger.debug(f"Error details: {error_data}")

        # Nothing more to do unless the payload carries an error code
        if "code" not in error_data:
            return

        # Codes this module treats as authentication failures
        error_code = error_data["code"]
        if error_code in [190, 102, 4]:
            logger.warning(f"Auth error detected (code: {error_data['code']}). Invalidating token.")
            auth_manager.invalidate_token()
def _mask_access_token(params):
    """Return a copy of *params* that is safe to log (token value replaced)."""
    return {k: "***TOKEN***" if k == "access_token" else v for k, v in params.items()}


def _json_encode_params(params):
    """Return a copy of *params* with dict/list values serialized to JSON strings."""
    return {
        k: json.dumps(v) if isinstance(v, (dict, list)) else v
        for k, v in params.items()
    }


async def make_api_request(
    endpoint: str,
    access_token: str,
    params: Optional[Dict[str, Any]] = None,
    method: str = "GET"
) -> Dict[str, Any]:
    """
    Make a request to the Meta Graph API.

    Failures are reported through the return value rather than raised: HTTP
    errors, transport errors and an unsupported ``method`` all yield a dict
    with an ``"error"`` key. Some error responses additionally invalidate the
    token held by ``auth_manager``.

    Args:
        endpoint: API endpoint path (without base URL)
        access_token: Meta API access token
        params: Additional query parameters. The dict is not modified.
        method: HTTP method (GET, POST, DELETE)

    Returns:
        API response as a dictionary. A successful non-JSON response is
        returned as ``{"text_response": ..., "status_code": ...}``.
    """
    # Validate access token before proceeding
    if not access_token:
        logger.error("API request attempted with blank access token")
        return {
            "error": {
                "message": "Authentication Required",
                "details": "A valid access token is required to access the Meta API",
                "action_required": "Please authenticate first"
            }
        }

    url = f"{META_GRAPH_API_BASE}/{endpoint}"
    headers = {
        "User-Agent": USER_AGENT,
    }

    # Work on a copy: adding the token (and JSON-encoding values for POST)
    # must not leak into the dict the caller passed in.
    request_params = dict(params) if params else {}
    request_params["access_token"] = access_token

    # Logging the request (masking token for security)
    logger.debug(f"API Request: {method} {url}")
    logger.debug(f"Request params: {_mask_access_token(request_params)}")

    # Check for app_id in params
    app_id = auth_manager.app_id
    logger.debug(f"Current app_id from auth_manager: {app_id}")

    async with httpx.AsyncClient() as client:
        try:
            if method == "GET":
                # For GET, JSON-encode dict/list params (e.g., targeting_spec) to proper strings
                encoded_params = _json_encode_params(request_params)
                response = await client.get(url, params=encoded_params, headers=headers, timeout=30.0)
            elif method == "POST":
                # For Meta API, POST requests need form data, not a JSON body;
                # nested values (targeting etc.) travel as JSON strings.
                form_data = _json_encode_params(request_params)
                logger.debug(f"POST params (prepared): {_mask_access_token(form_data)}")
                response = await client.post(url, data=form_data, headers=headers, timeout=30.0)
            elif method == "DELETE":
                response = await client.delete(url, params=request_params, headers=headers, timeout=30.0)
            else:
                # Raised inside the try on purpose: callers receive it as an error dict
                raise ValueError(f"Unsupported HTTP method: {method}")

            response.raise_for_status()
            logger.debug(f"API Response status: {response.status_code}")

            # Ensure the response is JSON and return it as a dictionary
            try:
                return response.json()
            except json.JSONDecodeError:
                # If not JSON, return text content in a structured format
                return {
                    "text_response": response.text,
                    "status_code": response.status_code
                }

        except httpx.HTTPStatusError as e:
            try:
                error_info = e.response.json()
            except ValueError:
                # Error body was not JSON; keep the raw text instead
                error_info = {"status_code": e.response.status_code, "text": e.response.text}

            logger.error(f"HTTP Error: {e.response.status_code} - {error_info}")

            # Check for authentication errors
            if e.response.status_code == 401 or e.response.status_code == 403:
                logger.warning("Detected authentication error (401/403)")
                auth_manager.invalidate_token()
            elif isinstance(error_info, dict) and "error" in error_info:
                error_obj = error_info.get("error", {})
                # Check for specific FB API errors related to auth
                if isinstance(error_obj, dict) and error_obj.get("code") in [190, 102, 4, 200, 10]:
                    logger.warning(f"Detected Facebook API auth error: {error_obj.get('code')}")
                    # Log more details about app ID related errors
                    if error_obj.get("code") == 200 and "Provide valid app ID" in (error_obj.get("message") or ""):
                        logger.error("Meta API authentication configuration issue")
                        logger.error(f"Current app_id: {app_id}")
                        # Provide a clearer error message without the confusing "Provide valid app ID" message.
                        # This is a configuration problem, so the token is deliberately left untouched.
                        return {
                            "error": {
                                "message": "Meta API authentication configuration issue. Please check your app credentials.",
                                "original_error": error_obj.get("message"),
                                "code": error_obj.get("code")
                            }
                        }
                    auth_manager.invalidate_token()

            # Include full details for technical users
            full_response = {
                "headers": dict(e.response.headers),
                "status_code": e.response.status_code,
                "url": str(e.response.url),
                "reason": getattr(e.response, "reason_phrase", "Unknown reason"),
                "request_method": e.request.method,
                "request_url": str(e.request.url)
            }

            # Return a properly structured error object
            return {
                "error": {
                    "message": f"HTTP Error: {e.response.status_code}",
                    "details": error_info,
                    "full_response": full_response
                }
            }

        except Exception as e:
            # Boundary handler: transport failures etc. become an error dict
            logger.error(f"Request Error: {str(e)}")
            return {"error": {"message": str(e)}}
def _auth_required_response(app_id, using_pipeboard, auth_url):
    """Build the JSON error string returned when no access token is available."""
    app_id_configured = bool(app_id) and app_id != "YOUR_META_APP_ID"

    # Provide different guidance based on authentication method
    if using_pipeboard:
        return json.dumps({
            "error": {
                "message": "Pipeboard Authentication Required",
                "details": {
                    "description": "Your Pipeboard API token is invalid or has expired",
                    "action_required": "Update your Pipeboard token",
                    "setup_url": "https://pipeboard.co/setup",
                    "token_url": "https://pipeboard.co/api-tokens",
                    "configuration_status": {
                        "app_id_configured": app_id_configured,
                        "pipeboard_enabled": True,
                    },
                    "troubleshooting": "Go to https://pipeboard.co/setup to verify your account setup, then visit https://pipeboard.co/api-tokens to obtain a new API token",
                    "setup_link": "[Verify your Pipeboard account setup](https://pipeboard.co/setup)",
                    "token_link": "[Get a new Pipeboard API token](https://pipeboard.co/api-tokens)"
                }
            }
        }, indent=2)

    return json.dumps({
        "error": {
            "message": "Authentication Required",
            "details": {
                "description": "You need to authenticate with the Meta API before using this tool",
                "action_required": "Please authenticate first",
                "auth_url": auth_url,
                "configuration_status": {
                    "app_id_configured": app_id_configured,
                    "pipeboard_enabled": False,
                },
                "troubleshooting": "Check logs for TOKEN VALIDATION FAILED messages",
                "markdown_link": f"[Click here to authenticate with Meta Ads API]({auth_url})"
            }
        }
    }, indent=2)


def _find_graph_error(result_dict):
    """Return the nested Graph API error object of an error payload, or None.

    make_api_request stores the raw Graph response under
    ``error -> details``, so that location is checked first; a top-level
    ``details`` entry is accepted as well.
    """
    candidates = []
    error_section = result_dict.get("error")
    if isinstance(error_section, dict):
        candidates.append(error_section.get("details"))
    candidates.append(result_dict.get("details"))

    for details in candidates:
        if isinstance(details, dict) and isinstance(details.get("error"), dict):
            return details["error"]
    return None


# Generic wrapper for all Meta API tools
def meta_api_tool(func):
    """Decorator for Meta API tools that handles authentication and error handling.

    The wrapped coroutine:

    * fills the ``access_token`` keyword argument from
      ``auth.get_current_access_token()`` when it is absent or falsy (only
      the keyword argument is inspected);
    * returns a JSON "authentication required" error string, without calling
      the tool, when no token can be obtained;
    * serializes dict results with ``json.dumps(..., indent=2)``;
    * returns string results that are not JSON, or that are JSON objects
      with an ``"error"`` key, inside a ``{"data": <string>}`` envelope;
    * turns any exception into ``{"error": "<message>"}``.
    """
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        try:
            # Log function call
            logger.debug(f"Function call: {func.__name__}")
            logger.debug(f"Args: {args}")
            # Log kwargs without sensitive info
            safe_kwargs = {k: ('***TOKEN***' if k == 'access_token' else v) for k, v in kwargs.items()}
            logger.debug(f"Kwargs: {safe_kwargs}")

            # Log app ID information
            app_id = auth_manager.app_id
            logger.debug(f"Current app_id: {app_id}")
            logger.debug(f"META_APP_ID env var: {os.environ.get('META_APP_ID')}")

            # If access_token is not in kwargs or not kwargs['access_token'], try to get it from auth_manager
            if not kwargs.get('access_token'):
                try:
                    access_token = await auth.get_current_access_token()
                    if access_token:
                        kwargs['access_token'] = access_token
                        logger.debug("Using access token from auth_manager")
                    else:
                        logger.warning("No access token available from auth_manager")
                        # Add more details about why token might be missing
                        if (auth_manager.app_id == "YOUR_META_APP_ID" or not auth_manager.app_id) and not auth_manager.use_pipeboard:
                            logger.error("TOKEN VALIDATION FAILED: No valid app_id configured")
                            logger.error("Please set META_APP_ID environment variable or configure in your code")
                        elif auth_manager.use_pipeboard:
                            logger.error("TOKEN VALIDATION FAILED: Pipeboard authentication enabled but no valid token available")
                            logger.error("Complete authentication via Pipeboard service or check PIPEBOARD_API_TOKEN")
                        else:
                            logger.error("Check logs above for detailed token validation failures")
                except Exception as e:
                    # Best effort: a failing lookup falls through to the
                    # "authentication required" response below.
                    logger.error(f"Error getting access token: {str(e)}")
                    # Add stack trace for better debugging
                    import traceback
                    logger.error(f"Stack trace: {traceback.format_exc()}")

            # Final validation - if we still don't have a valid token, return authentication required
            if not kwargs.get('access_token'):
                logger.warning("No access token available, authentication needed")

                # Add more specific troubleshooting information
                auth_url = auth_manager.get_auth_url()
                app_id = auth_manager.app_id
                using_pipeboard = auth_manager.use_pipeboard

                logger.error("TOKEN VALIDATION SUMMARY:")
                logger.error(f"- Current app_id: '{app_id}'")
                logger.error(f"- Environment META_APP_ID: '{os.environ.get('META_APP_ID', 'Not set')}'")
                logger.error(f"- Pipeboard API token configured: {'Yes' if os.environ.get('PIPEBOARD_API_TOKEN') else 'No'}")
                logger.error(f"- Using Pipeboard authentication: {'Yes' if using_pipeboard else 'No'}")

                # Check for common configuration issues - but only if not using Pipeboard
                if not using_pipeboard and (app_id == "YOUR_META_APP_ID" or not app_id):
                    logger.error("ISSUE DETECTED: No valid Meta App ID configured")
                    logger.error("ACTION REQUIRED: Set META_APP_ID environment variable with a valid App ID")
                elif using_pipeboard:
                    logger.error("ISSUE DETECTED: Pipeboard authentication configured but no valid token available")
                    logger.error("ACTION REQUIRED: Complete authentication via Pipeboard service")

                return _auth_required_response(app_id, using_pipeboard, auth_url)

            # Call the original function
            result = await func(*args, **kwargs)

            # If the result is a string (JSON), try to parse it to check for errors
            if isinstance(result, str):
                try:
                    result_dict = json.loads(result)
                except ValueError:
                    # Not JSON: wrap it in a dictionary
                    return json.dumps({"data": result}, indent=2)

                if isinstance(result_dict, dict) and "error" in result_dict:
                    logger.error(f"Error in API response: {result_dict['error']}")

                    # If this is an app ID error, replace the confusing
                    # Graph message with a more user-friendly one.
                    graph_error = _find_graph_error(result_dict)
                    if graph_error is not None:
                        graph_message = graph_error.get("message")
                        if (
                            graph_error.get("code") == 200
                            and isinstance(graph_message, str)
                            and "Provide valid app ID" in graph_message
                        ):
                            logger.error("Meta API authentication configuration issue")
                            logger.error(f"Current app_id: {app_id}")
                            return json.dumps({
                                "error": {
                                    "message": "Meta API Configuration Issue",
                                    "details": {
                                        "description": "Your Meta API app is not properly configured",
                                        "action_required": "Check your META_APP_ID environment variable",
                                        "current_app_id": app_id,
                                        "original_error": graph_message
                                    }
                                }
                            }, indent=2)

                    # Error payloads go back inside a "data" envelope. This
                    # used to happen only by accident (a KeyError from a
                    # wrong lookup path was swallowed and the payload
                    # wrapped); it is kept explicitly because consumers
                    # already parse that shape.
                    return json.dumps({"data": result}, indent=2)

                return result

            # If result is already a dictionary, ensure it's properly serialized
            if isinstance(result, dict):
                return json.dumps(result, indent=2)

            return result
        except Exception as e:
            # Boundary handler: tools report failures as JSON, never raise
            logger.error(f"Error in {func.__name__}: {str(e)}")
            return json.dumps({"error": str(e)}, indent=2)

    return wrapper
```
--------------------------------------------------------------------------------
/tests/test_get_account_pages.py:
--------------------------------------------------------------------------------
```python
"""
Tests for the updated get_account_pages function with multi-approach strategy.
"""
import pytest
import json
from unittest.mock import AsyncMock, patch
from meta_ads_mcp.core.ads import get_account_pages
class TestGetAccountPages:
    """Exercise get_account_pages across its page-discovery approaches."""

    @pytest.mark.asyncio
    async def test_get_account_pages_multi_approach_success(self):
        """Pages reported by different endpoints are merged into one result."""
        user_pages = {
            "data": [
                {
                    "id": "111111111",
                    "name": "Personal Page",
                    "category": "Personal Blog",
                    "fan_count": 100
                }
            ]
        }
        client_pages = {
            "data": [
                {
                    "id": "222222222",
                    "name": "Client Page",
                    "category": "Business",
                    "fan_count": 500
                }
            ]
        }
        adcreatives = {
            "data": [
                {
                    "id": "creative_123",
                    "object_story_spec": {"page_id": "333333333"}
                }
            ]
        }

        # Detail records served when a discovered page ID is requested directly
        page_details = {
            "111111111": {
                "id": "111111111",
                "name": "Personal Page",
                "username": "personalpage",
                "category": "Personal Blog",
                "fan_count": 100,
                "link": "https://facebook.com/personalpage",
                "verification_status": "not_verified"
            },
            "222222222": {
                "id": "222222222",
                "name": "Client Page",
                "username": "clientpage",
                "category": "Business",
                "fan_count": 500,
                "link": "https://facebook.com/clientpage",
                "verification_status": "verified"
            },
            "333333333": {
                "id": "333333333",
                "name": "Creative Page",
                "username": "creativepage",
                "category": "Creative",
                "fan_count": 1000,
                "link": "https://facebook.com/creativepage",
                "verification_status": "not_verified"
            }
        }

        # Endpoints that yield data; owned_pages, ads, promoted_objects,
        # campaigns and anything else come back as an empty listing.
        listing_responses = {
            "me/accounts": user_pages,
            "act_3182643988557192/client_pages": client_pages,
            "act_3182643988557192/adcreatives": adcreatives,
        }

        def mock_api_side_effect(endpoint, access_token, params):
            if endpoint in listing_responses:
                return listing_responses[endpoint]
            return page_details.get(endpoint, {"data": []})

        with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as token_mock, \
                patch('meta_ads_mcp.core.ads.make_api_request') as api_mock:
            token_mock.return_value = "test_access_token"
            api_mock.side_effect = mock_api_side_effect

            raw = await get_account_pages(account_id="act_3182643988557192")
            parsed = json.loads(raw)

            # Structure
            assert "data" in parsed
            assert "total_pages_found" in parsed

            # Three distinct pages, one per contributing source
            assert parsed["total_pages_found"] == 3
            pages = parsed["data"]
            assert len(pages) == 3
            assert all("id" in page for page in pages)

            names = [page.get("name") for page in pages]
            for expected_name in ("Personal Page", "Client Page", "Creative Page"):
                assert expected_name in names

            # Every page carries the basic fields
            for page in pages:
                assert "id" in page
                assert "name" in page

    @pytest.mark.asyncio
    async def test_get_account_pages_me_special_case(self):
        """account_id 'me' results in a single me/accounts request."""
        my_pages = {
            "data": [
                {
                    "id": "444444444",
                    "name": "My Personal Page",
                    "category": "Personal",
                    "fan_count": 50
                }
            ]
        }

        with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as token_mock, \
                patch('meta_ads_mcp.core.ads.make_api_request') as api_mock:
            token_mock.return_value = "test_access_token"
            api_mock.return_value = my_pages

            raw = await get_account_pages(account_id="me")
            parsed = json.loads(raw)

            # Exactly one call, to me/accounts, with the full field list
            api_mock.assert_called_once_with(
                "me/accounts",
                "test_access_token",
                {"fields": "id,name,username,category,fan_count,link,verification_status,picture"}
            )

            assert "data" in parsed
            assert len(parsed["data"]) == 1
            assert parsed["data"][0]["id"] == "444444444"

    @pytest.mark.asyncio
    async def test_get_account_pages_tracking_specs_discovery(self):
        """Page IDs listed in ad tracking specs are discovered and looked up."""
        ads_with_tracking = {
            "data": [
                {
                    "id": "ad_123",
                    "tracking_specs": [
                        {
                            "page": ["555555555", "666666666"]
                        }
                    ]
                }
            ]
        }
        accessible_page = {
            "id": "555555555",
            "name": "Tracking Page",
            "username": "trackingpage",
            "category": "Business"
        }

        def mock_api_side_effect(endpoint, access_token, params):
            # The ads request is recognised by the fields it asks for
            if "tracking_specs" in params.get("fields", ""):
                return ads_with_tracking
            if endpoint == "555555555":
                return accessible_page
            if endpoint == "666666666":
                return {"error": "Page not accessible"}
            return {"data": []}

        with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as token_mock, \
                patch('meta_ads_mcp.core.ads.make_api_request') as api_mock:
            token_mock.return_value = "test_access_token"
            api_mock.side_effect = mock_api_side_effect

            raw = await get_account_pages(account_id="act_123456789")
            parsed = json.loads(raw)

            # Both tracked page IDs are reported
            assert parsed["total_pages_found"] == 2
            pages = parsed["data"]
            assert len(pages) == 2

            # One entry resolved to details, the other kept its error
            resolved = next((entry for entry in pages if "error" not in entry), None)
            failed = next((entry for entry in pages if "error" in entry), None)
            assert resolved is not None
            assert failed is not None
            assert resolved["name"] == "Tracking Page"
            assert "not accessible" in failed["error"]

    @pytest.mark.asyncio
    async def test_get_account_pages_no_pages_found(self):
        """Empty listings from every endpoint produce the fallback message."""
        with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as token_mock, \
                patch('meta_ads_mcp.core.ads.make_api_request') as api_mock:
            token_mock.return_value = "test_access_token"
            api_mock.return_value = {"data": []}  # All endpoints return empty

            raw = await get_account_pages(account_id="act_123456789")
            parsed = json.loads(raw)

            assert "data" in parsed
            assert len(parsed["data"]) == 0
            assert "message" in parsed
            assert "No pages found" in parsed["message"]
            assert "suggestion" in parsed

    @pytest.mark.asyncio
    async def test_get_account_pages_error_handling(self):
        """Failing API calls end in the 'no pages found' result."""
        with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as token_mock, \
                patch('meta_ads_mcp.core.ads.make_api_request') as api_mock:
            token_mock.return_value = "test_access_token"
            api_mock.side_effect = Exception("API Error")

            raw = await get_account_pages(account_id="act_123456789")
            parsed = json.loads(raw)

            # The failure surfaces as an empty result with a message
            assert "data" in parsed
            assert len(parsed["data"]) == 0
            assert "message" in parsed
            assert "No pages found" in parsed["message"]

            # Debug information is optional; when present it must list the errors
            if "debug" in parsed:
                debug_info = parsed["debug"]
                assert "errors" in debug_info
                assert len(debug_info["errors"]) > 0
                assert any("API Error" in entry for entry in debug_info["errors"])

    @pytest.mark.asyncio
    async def test_get_account_pages_no_account_id(self):
        """A missing account ID yields an account-ID or authentication error."""
        raw = await get_account_pages(account_id=None)
        parsed = json.loads(raw)

        # Direct format: the decorator's authentication error is acceptable ...
        if "message" in parsed and "Authentication Required" in parsed["message"]:
            return
        # ... and so is the function's own account-ID error
        if "error" in parsed and "No account ID provided" in parsed["error"]:
            return

        # Wrapped format: the error is a JSON string under "data"
        if "data" in parsed:
            try:
                inner = json.loads(parsed["data"])
                if "error" in inner and "No account ID provided" in inner["error"]:
                    return
            except (json.JSONDecodeError, TypeError):
                pass

        # Fallback: either indication anywhere in the response
        rendered = str(parsed)
        assert "No account ID provided" in rendered or "Authentication Required" in rendered

    @pytest.mark.asyncio
    async def test_get_account_pages_account_id_validation_direct(self):
        """An empty-string account ID is rejected once a token is available."""
        # Only token retrieval is patched; the decorated function is called as usual
        with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as token_mock:
            token_mock.return_value = "test_token"

            # Empty string should be treated like a missing account ID
            raw = await get_account_pages(account_id="")
            parsed = json.loads(raw)

            wrapped = "data" in parsed and isinstance(parsed["data"], str)
            if wrapped:
                # The error is a nested JSON string
                inner = json.loads(parsed["data"])
                assert "error" in inner
                assert "No account ID provided" in inner["error"]
            else:
                # Direct response format
                assert "error" in parsed or "message" in parsed
                rendered = str(parsed)
                assert "No account ID provided" in rendered or "Authentication Required" in rendered

    @pytest.mark.asyncio
    async def test_get_account_pages_multiple_sources(self):
        """Pages coming from two different sources both appear in the result."""
        responses_by_endpoint = {
            "me/accounts": {"data": [{"id": "111111111"}]},
            "123456789/owned_pages": {"data": []},
            "act_123456789/client_pages": {"data": [{"id": "222222222"}]},
            "act_123456789/adcreatives": {"data": []},
            "act_123456789/ads": {"data": []},
            "act_123456789/promoted_objects": {"data": []},
            "act_123456789/campaigns": {"data": []},
            "111111111": {"id": "111111111", "name": "Page 1"},
            "222222222": {"id": "222222222", "name": "Page 2"}
        }

        def mock_api_side_effect(endpoint, access_token, params):
            return responses_by_endpoint.get(endpoint, {"data": []})

        with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as token_mock, \
                patch('meta_ads_mcp.core.ads.make_api_request') as api_mock:
            token_mock.return_value = "test_access_token"
            api_mock.side_effect = mock_api_side_effect

            raw = await get_account_pages(account_id="act_123456789")
            parsed = json.loads(raw)

            assert "data" in parsed
            assert "total_pages_found" in parsed
            assert parsed["total_pages_found"] == 2
            assert len(parsed["data"]) == 2

            names = [page.get("name") for page in parsed["data"]]
            assert "Page 1" in names
            assert "Page 2" in names

    @pytest.mark.asyncio
    async def test_get_account_pages_act_prefix_handling(self):
        """A bare numeric account ID gets the 'act_' prefix where needed."""
        with patch('meta_ads_mcp.core.auth.get_current_access_token', new_callable=AsyncMock) as token_mock, \
                patch('meta_ads_mcp.core.ads.make_api_request') as api_mock:
            token_mock.return_value = "test_access_token"
            api_mock.return_value = {"data": []}

            # Account ID deliberately passed without the 'act_' prefix
            await get_account_pages(account_id="123456789")

            recorded_calls = api_mock.call_args_list

            # Ad-account endpoints must have been addressed with the prefix
            prefixed = [c for c in recorded_calls if 'act_123456789' in str(c)]
            assert len(prefixed) > 0, "Should have made calls with 'act_' prefix"

            # Business endpoints use the raw account ID
            business = [c for c in recorded_calls if '123456789/owned_pages' in str(c)]
            assert len(business) > 0, "Should have made calls to business endpoints"


if __name__ == "__main__":
    pytest.main([__file__])
```
--------------------------------------------------------------------------------
/tests/test_budget_update.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Unit Tests for Budget Update Functionality
This test suite validates the budget update parameter implementation for the
update_adset function in meta_ads_mcp/core/adsets.py.
Test cases cover:
- Budget update success scenarios
- Budget validation (negative, zero, too high values)
- Budget update with other parameters
- Error handling and permissions
"""
import pytest
import json
import asyncio
from unittest.mock import AsyncMock, patch, MagicMock
from typing import Dict, Any, List
# Import the function to test
from meta_ads_mcp.core.adsets import update_adset
class TestBudgetUpdateFunctionality:
"""Test suite for budget update functionality"""
@pytest.fixture
def mock_api_request(self):
    """Patch make_api_request in the adsets module with a canned ad set reply."""
    canned_reply = {
        "id": "test_adset_id",
        "daily_budget": "5000",
        "status": "ACTIVE"
    }
    with patch('meta_ads_mcp.core.adsets.make_api_request') as api_mock:
        api_mock.return_value = canned_reply
        yield api_mock
@pytest.fixture
def mock_auth_manager(self):
    """Patch the API module's auth_manager and the token getter with valid-token stubs."""
    token = "test_access_token"
    with patch('meta_ads_mcp.core.api.auth_manager') as manager_mock, \
            patch('meta_ads_mcp.core.auth.get_current_access_token') as token_getter:
        # The manager reports a configured app and a valid token
        manager_mock.app_id = "test_app_id"
        manager_mock.is_token_valid.return_value = True
        manager_mock.get_current_access_token.return_value = token
        # The module-level getter hands out the same token
        token_getter.return_value = token
        yield manager_mock
@pytest.fixture
def valid_adset_id(self):
    """Provide the ad set ID used by the update calls."""
    adset_id = "123456789"
    return adset_id
@pytest.fixture
def valid_daily_budget(self):
    """Provide a daily budget as a string of cents ("5000" is $50.00)."""
    daily_budget_cents = "5000"
    return daily_budget_cents
@pytest.fixture
def valid_lifetime_budget(self):
    """Provide a lifetime budget as a string of cents ("50000" is $500.00)."""
    lifetime_budget_cents = "50000"
    return lifetime_budget_cents
@pytest.mark.asyncio
async def test_budget_update_success(self, mock_api_request, mock_auth_manager, valid_adset_id, valid_daily_budget):
    """A daily budget update reaches the API with the ad set ID and budget."""
    response_json = await update_adset(
        adset_id=valid_adset_id,
        daily_budget=valid_daily_budget
    )
    payload = json.loads(response_json)

    # One request only; inspect its positional arguments
    mock_api_request.assert_called_once()
    positional_args = mock_api_request.call_args[0]

    # First positional argument: the endpoint, i.e. the ad set ID
    assert positional_args[0] == valid_adset_id

    # Third positional argument: the request parameters
    sent_params = positional_args[2]
    assert 'daily_budget' in sent_params
    assert sent_params['daily_budget'] == valid_daily_budget

    # The mocked reply comes back to the caller
    assert 'id' in payload
    assert payload['id'] == "test_adset_id"
@pytest.mark.asyncio
async def test_lifetime_budget_update_success(self, mock_api_request, mock_auth_manager, valid_adset_id, valid_lifetime_budget):
    """A lifetime budget update is forwarded in the request parameters."""
    response_json = await update_adset(
        adset_id=valid_adset_id,
        lifetime_budget=valid_lifetime_budget
    )
    payload = json.loads(response_json)

    # One request only; the parameters are its third positional argument
    mock_api_request.assert_called_once()
    sent_params = mock_api_request.call_args[0][2]
    assert 'lifetime_budget' in sent_params
    assert sent_params['lifetime_budget'] == valid_lifetime_budget

    # The reply carries an id
    assert 'id' in payload
@pytest.mark.asyncio
async def test_both_budget_types_update(self, mock_api_request, mock_auth_manager, valid_adset_id, valid_daily_budget, valid_lifetime_budget):
    """Daily and lifetime budgets supplied together are both forwarded."""
    raw_result = await update_adset(
        adset_id=valid_adset_id,
        daily_budget=valid_daily_budget,
        lifetime_budget=valid_lifetime_budget,
    )
    response = json.loads(raw_result)

    # A single request should carry both budget fields
    mock_api_request.assert_called_once()
    sent_params = mock_api_request.call_args[0][2]  # third positional argument

    expected_budgets = {
        'daily_budget': valid_daily_budget,
        'lifetime_budget': valid_lifetime_budget,
    }
    for field in expected_budgets:
        assert field in sent_params
    for field, amount in expected_budgets.items():
        assert sent_params[field] == amount

    # The decoded result must carry an id
    assert 'id' in response
@pytest.mark.asyncio
async def test_budget_update_with_other_parameters(self, mock_api_request, mock_auth_manager, valid_adset_id, valid_daily_budget):
    """A budget update can be combined with status and bidding fields."""
    raw_result = await update_adset(
        adset_id=valid_adset_id,
        daily_budget=valid_daily_budget,
        status="PAUSED",
        bid_amount=1000,
        bid_strategy="LOWEST_COST_WITH_BID_CAP",
    )
    response = json.loads(raw_result)

    # Everything must be sent in one request
    mock_api_request.assert_called_once()
    sent_params = mock_api_request.call_args[0][2]  # third positional argument

    # bid_amount goes in as an int and is expected to be sent as a string
    expected_params = {
        'daily_budget': valid_daily_budget,
        'status': "PAUSED",
        'bid_amount': "1000",
        'bid_strategy': "LOWEST_COST_WITH_BID_CAP",
    }
    for field in expected_params:
        assert field in sent_params
    for field, value in expected_params.items():
        assert sent_params[field] == value

    # The decoded result must carry an id
    assert 'id' in response
@pytest.mark.asyncio
async def test_budget_update_with_numeric_values(self, mock_api_request, mock_auth_manager, valid_adset_id):
    """Integer budgets are expected to reach the API as strings."""
    raw_result = await update_adset(
        adset_id=valid_adset_id,
        daily_budget=5000,       # int on purpose
        lifetime_budget=50000,   # int on purpose
    )
    response = json.loads(raw_result)

    # One request, with the params dict as third positional argument
    mock_api_request.assert_called_once()
    sent_params = mock_api_request.call_args[0][2]

    # Both fields must be present and stringified
    expected_strings = {'daily_budget': "5000", 'lifetime_budget': "50000"}
    for field in expected_strings:
        assert field in sent_params
    for field, text in expected_strings.items():
        assert sent_params[field] == text

    # The decoded result must carry an id
    assert 'id' in response
@pytest.mark.asyncio
async def test_budget_update_with_zero_budget(self, mock_api_request, mock_auth_manager, valid_adset_id):
    """A budget of "0" is forwarded to the API rather than rejected locally."""
    raw_result = await update_adset(adset_id=valid_adset_id, daily_budget="0")
    response = json.loads(raw_result)

    # The zero budget must still trigger exactly one request
    mock_api_request.assert_called_once()
    sent_params = mock_api_request.call_args[0][2]  # third positional argument
    assert 'daily_budget' in sent_params
    assert sent_params['daily_budget'] == "0"

    # The decoded result must carry an id
    assert 'id' in response
@pytest.mark.asyncio
async def test_budget_update_with_high_budget(self, mock_api_request, mock_auth_manager, valid_adset_id):
    """A large budget value is forwarded to the API unchanged."""
    large_amount = "1000000"  # $10,000
    raw_result = await update_adset(adset_id=valid_adset_id, daily_budget=large_amount)
    response = json.loads(raw_result)

    # One request carrying the large value is expected
    mock_api_request.assert_called_once()
    sent_params = mock_api_request.call_args[0][2]  # third positional argument
    assert 'daily_budget' in sent_params
    assert sent_params['daily_budget'] == large_amount

    # The decoded result must carry an id
    assert 'id' in response
@pytest.mark.asyncio
async def test_budget_update_api_error_handling(self, mock_api_request, mock_auth_manager, valid_adset_id, valid_daily_budget):
    """An exception raised by the API call is reported as a structured error."""
    # Make the mocked API call fail
    mock_api_request.side_effect = Exception("API Error: Invalid budget amount")

    raw_result = await update_adset(
        adset_id=valid_adset_id,
        daily_budget=valid_daily_budget,
    )
    envelope = json.loads(raw_result)

    # The error payload is itself a JSON string stored under 'data'
    failure = json.loads(envelope['data'])
    for field in ('error', 'details', 'params_sent'):
        assert field in failure
    assert "Failed to update ad set" in failure['error']
    assert "API Error: Invalid budget amount" in failure['details']
    assert valid_adset_id in failure['error']

    # The attempted parameters are echoed back
    assert failure['params_sent']['daily_budget'] == valid_daily_budget
@pytest.mark.asyncio
async def test_budget_update_with_invalid_adset_id(self, mock_api_request, mock_auth_manager):
    """An empty ad set ID yields an error without contacting the API."""
    raw_result = await update_adset(
        adset_id="",  # deliberately empty
        daily_budget="5000",
    )
    envelope = json.loads(raw_result)

    # The error payload is a JSON string stored under 'data'
    failure = json.loads(envelope['data'])
    assert 'error' in failure
    assert "No ad set ID provided" in failure['error']

    # No request may have been issued
    mock_api_request.assert_not_called()
@pytest.mark.asyncio
async def test_budget_update_with_no_parameters(self, mock_api_request, mock_auth_manager, valid_adset_id):
    """Calling update_adset with only an ID yields an error and no API call."""
    # Only the ad set ID is supplied, nothing to update
    raw_result = await update_adset(adset_id=valid_adset_id)
    envelope = json.loads(raw_result)

    # The error payload is a JSON string stored under 'data'
    failure = json.loads(envelope['data'])
    assert 'error' in failure
    assert "No update parameters provided" in failure['error']

    # No request may have been issued
    mock_api_request.assert_not_called()
@pytest.mark.asyncio
async def test_budget_update_with_negative_budget(self, mock_api_request, mock_auth_manager, valid_adset_id):
    """A negative budget rejected by the (mocked) API surfaces as an error."""
    # Simulate the API refusing the negative amount
    mock_api_request.side_effect = Exception("API Error: Budget amount must be positive")

    raw_result = await update_adset(adset_id=valid_adset_id, daily_budget="-1000")
    envelope = json.loads(raw_result)

    # The error payload is a JSON string stored under 'data'
    failure = json.loads(envelope['data'])
    for field in ('error', 'details'):
        assert field in failure
    assert "API Error: Budget amount must be positive" in failure['details']
@pytest.mark.asyncio
async def test_budget_update_with_non_numeric_string(self, mock_api_request, mock_auth_manager, valid_adset_id):
    """A non-numeric budget rejected by the (mocked) API surfaces as an error."""
    # Simulate the API refusing the malformed amount
    mock_api_request.side_effect = Exception("API Error: Invalid budget format")

    raw_result = await update_adset(adset_id=valid_adset_id, daily_budget="invalid_budget")
    envelope = json.loads(raw_result)

    # The error payload is a JSON string stored under 'data'
    failure = json.loads(envelope['data'])
    for field in ('error', 'details'):
        assert field in failure
    assert "API Error: Invalid budget format" in failure['details']
@pytest.mark.asyncio
async def test_budget_update_with_permission_error(self, mock_api_request, mock_auth_manager, valid_adset_id, valid_daily_budget):
    """A permission failure raised by the (mocked) API surfaces as an error."""
    # Simulate the API denying the request
    mock_api_request.side_effect = Exception("API Error: (#100) Insufficient permissions")

    raw_result = await update_adset(
        adset_id=valid_adset_id,
        daily_budget=valid_daily_budget,
    )
    envelope = json.loads(raw_result)

    # The error payload is a JSON string stored under 'data'
    failure = json.loads(envelope['data'])
    for field in ('error', 'details'):
        assert field in failure
    assert "Insufficient permissions" in failure['details']
@pytest.mark.asyncio
async def test_budget_update_with_targeting(self, mock_api_request, mock_auth_manager, valid_adset_id, valid_daily_budget):
    """Budget and targeting can be updated together; targeting is sent as JSON."""
    targeting_spec = {
        "age_min": 25,
        "age_max": 45,
        "geo_locations": {"countries": ["US", "CA"]},
    }
    raw_result = await update_adset(
        adset_id=valid_adset_id,
        daily_budget=valid_daily_budget,
        targeting=targeting_spec,
    )
    response = json.loads(raw_result)

    # One request carrying both fields is expected
    mock_api_request.assert_called_once()
    sent_params = mock_api_request.call_args[0][2]  # third positional argument
    for field in ('daily_budget', 'targeting'):
        assert field in sent_params
    assert sent_params['daily_budget'] == valid_daily_budget

    # Targeting must decode from JSON back to the values supplied above
    decoded_targeting = json.loads(sent_params['targeting'])
    assert decoded_targeting['age_min'] == 25
    assert decoded_targeting['age_max'] == 45
    assert decoded_targeting['geo_locations']['countries'] == ["US", "CA"]

    # The decoded result must carry an id
    assert 'id' in response
class TestBudgetUpdateIntegration:
    """Integration tests for budget update functionality"""

    @pytest.mark.asyncio
    async def test_budget_update_workflow(self):
        """Run update_adset end to end with the API and auth layers patched out."""
        # A real run would need a live ad set ID and valid credentials, so the
        # network and authentication entry points are replaced with mocks and
        # only the call signature and parameter handling are exercised.
        with patch('meta_ads_mcp.core.adsets.make_api_request') as api_mock, \
             patch('meta_ads_mcp.core.api.auth_manager') as auth_mock, \
             patch('meta_ads_mcp.core.auth.get_current_access_token') as token_mock:
            # Authentication always succeeds with a fixed token
            auth_mock.app_id = "test_app_id"
            auth_mock.is_token_valid.return_value = True
            auth_mock.get_current_access_token.return_value = "test_access_token"
            token_mock.return_value = "test_access_token"
            # The API answers with a minimal ad set payload
            api_mock.return_value = {"id": "test_id", "daily_budget": "5000"}

            outcome = await update_adset(
                adset_id="test_adset_id",
                daily_budget="5000",
                lifetime_budget="50000",
            )

            # The call must complete and produce something
            assert outcome is not None

            # Accept either a JSON string or an already-decoded object
            payload = json.loads(outcome) if isinstance(outcome, str) else outcome
            assert 'id' in payload
# Allow running this test module directly: hand this file to pytest in verbose mode.
if __name__ == "__main__":
    pytest.main([__file__, "-v"])
```
--------------------------------------------------------------------------------
/meta_ads_mcp/core/server.py:
--------------------------------------------------------------------------------
```python
"""MCP server configuration for Meta Ads API."""
from mcp.server.fastmcp import FastMCP
import argparse
import os
import sys
import webbrowser
import json
from typing import Dict, Any, Optional
from .auth import login as login_auth
from .resources import list_resources, get_resource
from .utils import logger
from .pipeboard_auth import pipeboard_auth_manager
import time
# Initialize FastMCP server (module-level instance named "meta-ads"; main() below
# configures and runs this same object)
mcp_server = FastMCP("meta-ads")
# Register resource URIs: each URI is bound to the handler imported from .resources.
# The second URI contains a {resource_id} placeholder.
mcp_server.resource(uri="meta-ads://resources")(list_resources)
mcp_server.resource(uri="meta-ads://images/{resource_id}")(get_resource)
class StreamableHTTPHandler:
    """Handles stateless Streamable HTTP requests for Meta Ads MCP.

    No state is kept between calls: every request derives its authentication
    method from that request's headers and is answered with a JSON-RPC 2.0
    response dictionary.
    """

    def __init__(self):
        """Initialize handler with no session storage - all auth per request"""
        logger.debug("StreamableHTTPHandler initialized for stateless operation")

    @staticmethod
    def _get_header(request_headers: Dict[str, str], name: str) -> Optional[str]:
        """Return the first non-empty value of header ``name``, ignoring case.

        HTTP field names are case-insensitive, so ``AUTHORIZATION`` and
        ``authorization`` must be treated alike. The canonical and lowercase
        spellings are tried first so that, when a mapping holds several
        spellings at once, the precedence stays canonical -> lowercase -> other.

        Args:
            request_headers: HTTP request headers
            name: Header name in its canonical spelling

        Returns:
            The header value, or None when it is absent or empty
        """
        wanted = name.lower()
        for spelling in (name, wanted):
            value = request_headers.get(spelling)
            if value:
                return value
        # Fall back to a scan for any other capitalisation
        for key, value in request_headers.items():
            if value and isinstance(key, str) and key.lower() == wanted:
                return value
        return None

    def handle_request(self, request_headers: Dict[str, str], request_body: Dict[str, Any]) -> Dict[str, Any]:
        """Handle individual request with authentication

        Args:
            request_headers: HTTP request headers
            request_body: JSON-RPC request body

        Returns:
            JSON response with auth status and any tool results. Any exception
            raised while handling is converted into a JSON-RPC "Internal error"
            (-32603) response instead of propagating.
        """
        try:
            # Extract authentication configuration from headers
            auth_config = self.get_auth_config_from_headers(request_headers)
            auth_method = auth_config['auth_method']
            logger.debug(f"Auth method detected: {auth_method}")

            # Handle based on auth method
            if auth_method == 'bearer':
                return self.handle_bearer_request(auth_config, request_body)
            if auth_method == 'custom_meta_app':
                return self.handle_custom_app_request(auth_config, request_body)
            return self.handle_unauthenticated_request(request_body)
        except Exception as e:
            logger.error(f"Error handling request: {e}")
            # The failure may have been caused by a malformed (non-dict) body,
            # so the error path must not assume request_body has .get()
            request_id = request_body.get('id') if isinstance(request_body, dict) else None
            return {
                'jsonrpc': '2.0',
                'error': {
                    'code': -32603,
                    'message': 'Internal error',
                    'data': str(e)
                },
                'id': request_id
            }

    def get_auth_config_from_headers(self, request_headers: Dict[str, str]) -> Dict[str, Any]:
        """Extract authentication configuration from HTTP headers

        Only two headers are consulted: ``Authorization`` (Bearer token, the
        primary method) and ``X-META-APP-ID`` (custom Meta app, fallback that
        requires OAuth). App secrets and raw Meta access tokens are
        intentionally never read from headers here.

        Args:
            request_headers: HTTP request headers (names matched case-insensitively)

        Returns:
            Dictionary with auth method and relevant credentials
        """
        # PRIMARY: Check for Bearer token in Authorization header
        auth_header = self._get_header(request_headers, 'Authorization')
        if auth_header and auth_header.lower().startswith('bearer '):
            token = auth_header[7:].strip()
            if token:
                logger.info("Bearer authentication detected (primary path)")
                return {
                    'auth_method': 'bearer',
                    'bearer_token': token,
                    'requires_oauth': False  # Simple token-based auth
                }
            # "Bearer " followed by nothing is not a credential; keep looking
            logger.warning("Authorization header contains an empty Bearer token")

        # FALLBACK: Custom Meta app (minority of users)
        meta_app_id = self._get_header(request_headers, 'X-META-APP-ID')
        if meta_app_id:
            logger.debug("Custom Meta app authentication detected (fallback path)")
            return {
                'auth_method': 'custom_meta_app',
                'meta_app_id': meta_app_id,
                'requires_oauth': True  # Complex OAuth flow required
            }

        # No authentication provided
        logger.warning("No authentication method detected in headers")
        return {
            'auth_method': 'none',
            'requires_oauth': False
        }

    def handle_bearer_request(self, auth_config: Dict[str, Any], request_body: Dict[str, Any]) -> Dict[str, Any]:
        """Handle request with Bearer token (primary path)

        Args:
            auth_config: Authentication configuration from headers
            request_body: JSON-RPC request body

        Returns:
            JSON response ready for tool execution

        Raises:
            KeyError: If auth_config has no 'bearer_token' entry
        """
        logger.debug("Processing Bearer authenticated request")
        # Not used yet, but the lookup fails fast on a config without a token
        token = auth_config['bearer_token']
        # Token is ready to use immediately for API calls
        # TODO: In next phases, this will execute the actual tool call
        return {
            'jsonrpc': '2.0',
            'result': {
                'status': 'ready',
                'auth_method': 'bearer',
                'message': 'Authentication successful with Bearer token',
                'token_source': 'bearer_header'
            },
            'id': request_body.get('id')
        }

    def handle_custom_app_request(self, auth_config: Dict[str, Any], request_body: Dict[str, Any]) -> Dict[str, Any]:
        """Handle request with custom Meta app (fallback path)

        Args:
            auth_config: Authentication configuration from headers
            request_body: JSON-RPC request body

        Returns:
            JSON response indicating OAuth flow is required
        """
        logger.debug("Processing custom Meta app request (OAuth required)")
        # This may require OAuth flow initiation
        # Each request is independent - no session state
        return {
            'jsonrpc': '2.0',
            'result': {
                'status': 'oauth_required',
                'auth_method': 'custom_meta_app',
                'meta_app_id': auth_config['meta_app_id'],
                'message': 'OAuth flow required for custom Meta app authentication',
                'next_steps': 'Use get_login_link tool to initiate OAuth flow'
            },
            'id': request_body.get('id')
        }

    def handle_unauthenticated_request(self, request_body: Dict[str, Any]) -> Dict[str, Any]:
        """Handle request with no authentication

        Args:
            request_body: JSON-RPC request body

        Returns:
            JSON error response requesting authentication
        """
        logger.warning("Unauthenticated request received")
        return {
            'jsonrpc': '2.0',
            'error': {
                'code': -32600,
                'message': 'Authentication required',
                'data': {
                    'supported_methods': [
                        'Authorization: Bearer <token> (recommended)',
                        'X-META-APP-ID: Custom Meta app OAuth (advanced users)'
                    ],
                    'documentation': 'https://github.com/pipeboard-co/meta-ads-mcp'
                }
            },
            'id': request_body.get('id')
        }
def login_cli() -> None:
    """
    Command-line function to authenticate with Meta

    Announces the start of the flow in the log and on stdout, then hands over
    to login_auth (the ``login`` function imported from the auth module).
    """
    logger.info("Starting Meta Ads CLI authentication flow")
    print("Starting Meta Ads CLI authentication flow...")
    # Call the common login function
    login_auth()
def main() -> Optional[int]:
    """Main entry point for the package

    Steps, in order:
      1. Parse command-line arguments.
      2. Resolve the Meta app ID (--app-id takes precedence over the
         META_APP_ID environment variable) and store it on auth_manager and
         meta_config.
      3. Handle --version and --login, each of which returns early.
      4. If PIPEBOARD_API_TOKEN is set, look for an access token and, when
         none is found, open the browser login URL and poll for the token.
      5. Run the MCP server on the selected transport.

    Returns:
        0 after --version or --login; 1 if running the Streamable HTTP server
        raised an exception; otherwise None once mcp_server.run() returns.
    """
    # Log startup information
    logger.info("Meta Ads MCP server starting")
    logger.debug(f"Python version: {sys.version}")
    logger.debug(f"Args: {sys.argv}")

    # Initialize argument parser
    parser = argparse.ArgumentParser(
        description="Meta Ads MCP Server - Model Context Protocol server for Meta Ads API",
        epilog="For more information, see https://github.com/pipeboard-co/meta-ads-mcp"
    )
    parser.add_argument("--login", action="store_true", help="Authenticate with Meta and store the token")
    parser.add_argument("--app-id", type=str, help="Meta App ID (Client ID) for authentication")
    parser.add_argument("--version", action="store_true", help="Show the version of the package")

    # Transport configuration arguments
    parser.add_argument("--transport", type=str, choices=["stdio", "streamable-http"],
                        default="stdio",
                        help="Transport method: 'stdio' for MCP clients (default), 'streamable-http' for HTTP API access")
    parser.add_argument("--port", type=int, default=8080,
                        help="Port for Streamable HTTP transport (default: 8080, only used with --transport streamable-http)")
    parser.add_argument("--host", type=str, default="localhost",
                        help="Host for Streamable HTTP transport (default: localhost, only used with --transport streamable-http)")
    parser.add_argument("--sse-response", action="store_true",
                        help="Use SSE response format instead of JSON (default: JSON, only used with --transport streamable-http)")

    args = parser.parse_args()
    logger.debug(f"Parsed args: login={args.login}, app_id={args.app_id}, version={args.version}")
    logger.debug(f"Transport args: transport={args.transport}, port={args.port}, host={args.host}, sse_response={args.sse_response}")

    # Validate CLI argument combinations
    # (HTTP-only options are detected by comparing against their defaults, so
    # explicitly passing a default value is not reported)
    if args.transport == "stdio" and (args.port != 8080 or args.host != "localhost" or args.sse_response):
        logger.warning("HTTP transport arguments (--port, --host, --sse-response) are ignored when using stdio transport")
        print("Warning: HTTP transport arguments are ignored when using stdio transport")

    # Update app ID if provided as environment variable or command line arg
    from .auth import auth_manager, meta_config

    # Check environment variable first (early init)
    env_app_id = os.environ.get("META_APP_ID")
    if env_app_id:
        logger.debug(f"Found META_APP_ID in environment: {env_app_id}")
    else:
        logger.warning("META_APP_ID not found in environment variables")

    # Command line takes precedence
    if args.app_id:
        logger.info(f"Setting app_id from command line: {args.app_id}")
        auth_manager.app_id = args.app_id
        meta_config.set_app_id(args.app_id)
    elif env_app_id:
        logger.info(f"Setting app_id from environment: {env_app_id}")
        auth_manager.app_id = env_app_id
        meta_config.set_app_id(env_app_id)

    # Log the final app ID that will be used
    logger.info(f"Final app_id from meta_config: {meta_config.get_app_id()}")
    logger.info(f"Final app_id from auth_manager: {auth_manager.app_id}")
    logger.info(f"ENV META_APP_ID: {os.environ.get('META_APP_ID')}")

    # Show version if requested
    if args.version:
        from meta_ads_mcp import __version__
        logger.info(f"Displaying version: {__version__}")
        print(f"Meta Ads MCP v{__version__}")
        return 0

    # Handle login command
    if args.login:
        login_cli()
        return 0

    # Check for Pipeboard authentication and token
    pipeboard_api_token = os.environ.get("PIPEBOARD_API_TOKEN")
    if pipeboard_api_token:
        logger.info("Using Pipeboard authentication")
        print("✅ Pipeboard authentication enabled")
        # NOTE(review): this prints the first 8 and last 4 characters of the API
        # token to stdout - confirm that exposure is acceptable
        print(f" API token: {pipeboard_api_token[:8]}...{pipeboard_api_token[-4:]}")

        # Check for existing token
        token = pipeboard_auth_manager.get_access_token()
        if not token:
            logger.info("No valid Pipeboard token found. Initiating browser-based authentication flow.")
            print("No valid Meta token found. Opening browser for authentication...")
            try:
                # Initialize the auth flow and get the login URL
                auth_data = pipeboard_auth_manager.initiate_auth_flow()
                login_url = auth_data.get('loginUrl')
                if login_url:
                    logger.info(f"Opening browser with login URL: {login_url}")
                    webbrowser.open(login_url)
                    print("Please authorize the application in your browser.")
                    print("After authorization, the token will be automatically retrieved.")
                    print("Waiting for authentication to complete...")

                    # Poll for token completion
                    max_attempts = 30  # Try for 30 * 2 = 60 seconds
                    for attempt in range(max_attempts):
                        print(f"Waiting for authentication... ({attempt+1}/{max_attempts})")
                        # Try to get the token again
                        token = pipeboard_auth_manager.get_access_token(force_refresh=True)
                        if token:
                            print("Authentication successful!")
                            break
                        time.sleep(2)  # Wait 2 seconds between attempts

                    # A timeout is not fatal: the server is started regardless
                    if not token:
                        print("Authentication timed out. Starting server anyway.")
                        print("You may need to restart the server after completing authentication.")
                else:
                    logger.error("No login URL received from Pipeboard API")
                    print("Error: Could not get authentication URL. Check your API token.")
            except Exception as e:
                # Authentication problems are reported but do not stop startup
                logger.error(f"Error initiating browser-based authentication: {e}")
                print(f"Error: Could not start authentication: {e}")
        else:
            print(f"✅ Valid Pipeboard access token found")
            # NOTE(review): prints the first 10 and last 5 characters of the
            # access token to stdout - confirm that exposure is acceptable
            print(f" Token preview: {token[:10]}...{token[-5:]}")

    # Transport-specific server initialization and startup
    if args.transport == "streamable-http":
        logger.info(f"Starting MCP server with Streamable HTTP transport on {args.host}:{args.port}")
        logger.info("Mode: Stateless (no session persistence)")
        logger.info(f"Response format: {'SSE' if args.sse_response else 'JSON'}")
        logger.info("Primary auth method: Bearer Token (recommended)")
        logger.info("Fallback auth method: Custom Meta App OAuth (complex setup)")

        print(f"Starting Meta Ads MCP server with Streamable HTTP transport")
        print(f"Server will listen on {args.host}:{args.port}")
        print(f"Response format: {'SSE' if args.sse_response else 'JSON'}")
        print("Primary authentication: Bearer Token (via Authorization: Bearer <token> header)")
        print("Fallback authentication: Custom Meta App OAuth (via X-META-APP-ID header)")

        # Configure the existing server with streamable HTTP settings
        mcp_server.settings.host = args.host
        mcp_server.settings.port = args.port
        mcp_server.settings.stateless_http = True
        mcp_server.settings.json_response = not args.sse_response

        # Import all tool modules to ensure they are registered
        # (the imported names are not used here; the imports are done for
        # their registration side effects)
        logger.info("Ensuring all tools are registered for HTTP transport")
        from . import accounts, campaigns, adsets, ads, insights, authentication
        from . import ads_library, budget_schedules, reports, openai_deep_research

        # ✅ NEW: Setup HTTP authentication middleware
        logger.info("Setting up HTTP authentication middleware")
        try:
            from .http_auth_integration import setup_fastmcp_http_auth

            # Setup the FastMCP HTTP auth integration
            setup_fastmcp_http_auth(mcp_server)
            logger.info("FastMCP HTTP authentication integration setup successful")
            print("✅ FastMCP HTTP authentication integration enabled")
            print(" - Bearer tokens via Authorization: Bearer <token> header")
            print(" - Direct Meta tokens via X-META-ACCESS-TOKEN header")
        except Exception as e:
            # A failed auth integration is reported but the server still starts
            logger.error(f"Failed to setup FastMCP HTTP authentication integration: {e}")
            print(f"⚠️ FastMCP HTTP authentication integration setup failed: {e}")
            print(" Server will still start but may not support header-based auth")

        # Log final server configuration
        logger.info(f"FastMCP server configured with:")
        logger.info(f" - Host: {mcp_server.settings.host}")
        logger.info(f" - Port: {mcp_server.settings.port}")
        logger.info(f" - Stateless HTTP: {mcp_server.settings.stateless_http}")
        logger.info(f" - JSON Response: {mcp_server.settings.json_response}")
        logger.info(f" - Streamable HTTP Path: {mcp_server.settings.streamable_http_path}")

        # Start the FastMCP server with Streamable HTTP transport
        try:
            logger.info("Starting FastMCP server with Streamable HTTP transport")
            print(f"✅ Server configured successfully")
            print(f" URL: http://{args.host}:{args.port}{mcp_server.settings.streamable_http_path}/")
            print(f" Mode: {'Stateless' if mcp_server.settings.stateless_http else 'Stateful'}")
            print(f" Format: {'JSON' if mcp_server.settings.json_response else 'SSE'}")
            mcp_server.run(transport="streamable-http")
        except Exception as e:
            # Any exception escaping run() is turned into exit status 1
            logger.error(f"Error starting Streamable HTTP server: {e}")
            print(f"Error: Failed to start Streamable HTTP server: {e}")
            return 1
    else:
        # Default stdio transport
        logger.info("Starting MCP server with stdio transport")
        mcp_server.run(transport='stdio')
```