# Directory Structure

```
├── .gitignore
├── .python-version
├── assets
│   ├── use-backlinks-mcp-on-cursor.png
│   └── use-keyword-mcp-on-cursor.png
├── LICENSE
├── main.py
├── pyproject.toml
├── README_CN.md
├── README.md
├── src
│   └── seo_mcp
│       ├── __init__.py
│       ├── backlinks.py
│       ├── keywords.py
│       ├── logger.py
│       ├── server.py
│       └── traffic.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------

```
3.10
```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
.pytest_cache/

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Jupyter Notebook
.ipynb_checkpoints

# VS Code
.vscode/

# PyCharm
.idea/

# Signature cache
signature_cache.json 

temp/

logs/
```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# SEO MCP

An MCP (Model Context Protocol) SEO tool service based on Ahrefs data, featuring backlink analysis, keyword research, traffic estimation, and more.

[中文](./README_CN.md)

## Overview

This service provides an API to retrieve SEO data from Ahrefs. It handles the entire process, including solving the CAPTCHA, authentication, and data retrieval. The results are cached to improve performance and reduce API costs.

> This MCP service is for educational purposes only. Please do not misuse it. This project is inspired by `@哥飞社群`.

## Features

- 🔍 Backlink Analysis

  - Get detailed backlink data for any domain
  - View domain rating, anchor text, and link attributes
  - Filter educational and government domains

- 🎯 Keyword Research

  - Generate keyword ideas from a seed keyword
  - Get keyword difficulty score
  - View search volume and trends

- 📊 Traffic Analysis

  - Estimate website traffic
  - View traffic history and trends
  - Analyze popular pages and country distribution
  - Track keyword rankings

- 🚀 Performance Optimization

  - Use CapSolver to automatically solve CAPTCHA
  - Response caching

## Installation

### Prerequisites

- Python 3.10 or higher
- CapSolver account and API key ([register here](https://dashboard.capsolver.com/passport/register?inviteCode=1dTH7WQSfHD0))

### Install from PyPI

```bash
pip install seo-mcp
```

Or use `uv`:

```bash
uv pip install seo-mcp
```

### Manual Installation

1. Clone the repository:

   ```bash
   git clone https://github.com/cnych/seo-mcp.git
   cd seo-mcp
   ```

2. Install dependencies:

   ```bash
   pip install -e .
   # Or
   uv pip install -e .
   ```

3. Set the CapSolver API key:

   ```bash
   export CAPSOLVER_API_KEY="your-capsolver-api-key"
   ```

## Usage

### Run the service

You can run the service in the following ways:

#### Use in Cursor IDE

In the Cursor settings, switch to the MCP tab, click the `+Add new global MCP server` button, and enter the following configuration:

```json
{
  "mcpServers": {
    "SEO MCP": {
      "command": "uvx",
      "args": ["--python", "3.10", "seo-mcp"],
      "env": {
        "CAPSOLVER_API_KEY": "CAP-xxxxxx"
      }
    }
  }
}
```

You can also create a `.cursor/mcp.json` file in the project root directory, with the same content.

### API Reference

The service provides the following MCP tools:

#### `get_backlinks_list(domain: str)`

Get the backlinks of a domain.

**Parameters:**

- `domain` (string): The domain to analyze (e.g. "example.com")

**Returns:**

```json
{
  "overview": {
    "domainRating": 76,
    "backlinks": 1500,
    "refDomains": 300
  },
  "backlinks": [
    {
      "anchor": "Example link",
      "domainRating": 76,
      "title": "Page title",
      "urlFrom": "https://referringsite.com/page",
      "urlTo": "https://example.com/page",
      "edu": false,
      "gov": false
    }
  ]
}
```
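The feature list above mentions filtering educational and government domains. As an illustration of consuming this payload (not part of the service itself), a client could post-process the result like this; `filter_edu_gov` is a hypothetical helper:

```python
# Hypothetical post-processing of a `get_backlinks_list` result:
# keep only backlinks whose referring page is flagged .edu or .gov.
def filter_edu_gov(result: dict) -> list[dict]:
    """Return only the backlinks marked as educational or governmental."""
    return [b for b in result.get("backlinks", []) if b.get("edu") or b.get("gov")]

sample = {
    "overview": {"domainRating": 76},
    "backlinks": [
        {"urlFrom": "https://cs.example.edu/page", "edu": True, "gov": False},
        {"urlFrom": "https://blog.example.com/post", "edu": False, "gov": False},
    ],
}
print(filter_edu_gov(sample))  # only the .edu referrer survives
```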

#### `keyword_generator(keyword: str, country: str = "us", search_engine: str = "Google")`

Generate keyword ideas.

**Parameters:**

- `keyword` (string): The seed keyword
- `country` (string): Country code (default: "us")
- `search_engine` (string): Search engine (default: "Google")

**Returns:**

```json
[
  {
    "keyword": "Example keyword",
    "volume": 1000,
    "difficulty": 45,
    "cpc": 2.5
  }
]
```
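As a sketch of how a client might rank this payload (assuming the flat shape shown above), ideas can be sorted by volume after dropping high-difficulty entries; `easy_wins` is illustrative and not part of the package:

```python
# Illustrative ranking of `keyword_generator` output: keep ideas at or
# below a difficulty threshold, then sort by search volume, descending.
def easy_wins(ideas: list[dict], max_difficulty: int = 30) -> list[dict]:
    picked = [i for i in ideas if i.get("difficulty", 100) <= max_difficulty]
    return sorted(picked, key=lambda i: i.get("volume", 0), reverse=True)

ideas = [
    {"keyword": "seo tools", "volume": 5000, "difficulty": 72, "cpc": 4.1},
    {"keyword": "free seo checker", "volume": 1200, "difficulty": 18, "cpc": 1.2},
    {"keyword": "seo for beginners", "volume": 900, "difficulty": 25, "cpc": 0.8},
]
print([i["keyword"] for i in easy_wins(ideas)])
# → ['free seo checker', 'seo for beginners']
```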

#### `get_traffic(domain_or_url: str, country: str = "None", mode: str = "subdomains")`

Get the traffic estimation.

**Parameters:**

- `domain_or_url` (string): The domain or URL to analyze
- `country` (string): Country filter, passed as a string (default: the literal string "None", i.e. no country filter)
- `mode` (string): Analysis mode ("subdomains" or "exact")

**Returns:**

```json
{
  "traffic_history": [...],
  "traffic": {
    "trafficMonthlyAvg": 50000,
    "costMontlyAvg": 25000
  },
  "top_pages": [...],
  "top_countries": [...],
  "top_keywords": [...]
}
```

#### `keyword_difficulty(keyword: str, country: str = "us")`

Get the keyword difficulty score.

**Parameters:**

- `keyword` (string): The keyword to analyze
- `country` (string): Country code (default: "us")

**Returns:**

```json
{
  "difficulty": 45,
  "serp": [...],
  "related": [...]
}
```
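For context, a client could map the numeric `difficulty` score onto the labels Ahrefs commonly uses; the thresholds below are an assumption about those bands, not part of this service's API:

```python
# Hedged helper: map a 0-100 difficulty score to commonly cited
# Ahrefs-style labels. The band boundaries are assumptions.
def difficulty_label(score: int) -> str:
    if score <= 10:
        return "Easy"
    if score <= 30:
        return "Medium"
    if score <= 70:
        return "Hard"
    return "Super hard"

print(difficulty_label(45))  # → Hard
```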

## Development

For development:

```bash
git clone https://github.com/cnych/seo-mcp.git
cd seo-mcp
uv sync
```

## How it works

1. The user sends a request through MCP
2. The service uses CapSolver to solve the Cloudflare Turnstile CAPTCHA
3. The service gets the authentication token from Ahrefs
4. The service retrieves the requested SEO data
5. The service processes and returns the formatted results
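The steps above can be sketched as a small pipeline. The function names and callables below are illustrative stand-ins, not the real implementations in `server.py`:

```python
from typing import Callable, Optional

def run_pipeline(
    domain: str,
    solve_captcha: Callable[[str], Optional[str]],    # e.g. a CapSolver wrapper
    fetch_data: Callable[[str, str], Optional[dict]], # (token, domain) -> SEO data
) -> Optional[dict]:
    """Illustrative flow: solve the CAPTCHA, then fetch data with the token."""
    site_url = f"https://ahrefs.com/backlink-checker/?input={domain}&mode=subdomains"
    token = solve_captcha(site_url)   # steps 2-3: CAPTCHA + auth token
    if not token:
        return None
    return fetch_data(token, domain)  # steps 4-5: retrieve and format results

# Stubbed-out usage (no network involved):
result = run_pipeline(
    "example.com",
    solve_captcha=lambda url: "fake-token",
    fetch_data=lambda token, domain: {"domain": domain, "backlinks": []},
)
print(result)  # → {'domain': 'example.com', 'backlinks': []}
```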

## Troubleshooting

- **CapSolver API key error**: Check the `CAPSOLVER_API_KEY` environment variable
- **Rate limiting**: Reduce request frequency
- **No results**: The domain may not be indexed by Ahrefs
- **Other issues**: See the [GitHub repository](https://github.com/cnych/seo-mcp)

## License

MIT License - See LICENSE file

```

--------------------------------------------------------------------------------
/src/seo_mcp/__init__.py:
--------------------------------------------------------------------------------

```python
"""
SEO MCP - A FastMCP service for retrieving SEO information for any domain using Ahrefs' data.
"""

__version__ = "0.2.4"

```

--------------------------------------------------------------------------------
/main.py:
--------------------------------------------------------------------------------

```python
from seo_mcp.server import main as server_main

def main():
    """Entry point for the backlinks-mcp package"""
    server_main()

if __name__ == "__main__":
    main()
```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[build-system]
requires = ["pdm-backend>=2.4.0"]
build-backend = "pdm.backend"

[project]
name = "seo-mcp"
version = "0.2.4"
description = "A free SEO tool MCP (Model Control Protocol) service based on Ahrefs data. Includes features such as backlinks, keyword ideas, and more."
readme = "README.md"
authors = [
    {name = "cnych", email = "[email protected]"}
]
license = {text = "MIT"}
classifiers = [
    "Programming Language :: Python :: 3",
    "License :: OSI Approved :: MIT License",
    "Operating System :: OS Independent",
]
requires-python = ">=3.10"
dependencies = [
    "fastmcp>=2.0.0",
    "requests>=2.32.3",
    "pydantic>=2.5.0",
    "pydantic-core>=2.14.0"
]

[project.scripts]
seo-mcp = "seo_mcp.server:main"

[project.urls]
"Homepage" = "https://github.com/cnych/seo-mcp"
"Bug Tracker" = "https://github.com/cnych/seo-mcp/issues"

[tool.setuptools]
packages = ["seo_mcp"]
package-dir = {"" = "src"}

```

--------------------------------------------------------------------------------
/src/seo_mcp/logger.py:
--------------------------------------------------------------------------------

```python
"""
A general logging module providing a unified logging function
"""
import logging
import os
from datetime import datetime


# Note: os.environ.get returns a string, and any non-empty string is truthy,
# so the value must be parsed explicitly for the flag to work.
DEBUG = os.environ.get("DEBUG", "False").lower() in ("true", "1", "yes")


def setup_logger(name: str, log_dir: str = "logs", level: int = logging.INFO) -> logging.Logger:
    """
    Setup a logger for the given name
    
    Args:
        name: The name of the logger
        log_dir: The directory to save the log files
        level: The level of the logger
    
    Returns:
        logging.Logger: The configured logger
    """
    if not DEBUG:
        return logging.getLogger(name)
    
    # Create the log directory
    os.makedirs(log_dir, exist_ok=True)
    
    # Create the log file name, format: module_name_YYYYMMDD.log
    log_file = os.path.join(log_dir, f"{name}_{datetime.now().strftime('%Y%m%d')}.log")
    
    # Create the logger
    logger = logging.getLogger(name)
    logger.setLevel(level)
    
    # If the logger already has handlers, don't add a new handler
    if not logger.handlers:
        # Create the file handler
        file_handler = logging.FileHandler(log_file, encoding='utf-8')
        file_handler.setLevel(level)
        
        # Create the console handler
        console_handler = logging.StreamHandler()
        console_handler.setLevel(level)
        
        # Create the formatter
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        file_handler.setFormatter(formatter)
        console_handler.setFormatter(formatter)
        
        # Add the handlers to the logger
        logger.addHandler(file_handler)
        logger.addHandler(console_handler)
    
    return logger
```

--------------------------------------------------------------------------------
/src/seo_mcp/traffic.py:
--------------------------------------------------------------------------------

```python
"""
Check the estimated search traffic for any website. Try Ahrefs' free traffic checker.
"""

from typing import Optional, Dict, Any, Literal, List
import requests
import json


def check_traffic(token: str, domain_or_url: str, mode: Literal["subdomains", "exact"] = "subdomains", country: str = "None") -> Optional[Dict[str, Any]]:
    """
    Check the estimated search traffic for any website.
    
    Args:
        domain_or_url (str): The domain or URL to query
        token (str): Verification token
        mode (str): Query mode, default is "subdomains"
        country (str): Country, default is "None"
    
    Returns:
        Optional[Dict[str, Any]]: Dictionary containing traffic data, returns None if request fails
    """
    if not token:
        return None
    
    url = "https://ahrefs.com/v4/stGetFreeTrafficOverview"
    
    # Serialize the parameters to a JSON string and pass them as a single `input` query parameter
    params = {
        "input": json.dumps({
            "captcha": token,
            "country": country,
            "protocol": "None",
            "mode": mode,
            "url": domain_or_url
        })
    }
    
    headers = {
        "accept": "*/*",
        "content-type": "application/json",
        "referer": f"https://ahrefs.com/traffic-checker/?input={domain_or_url}&mode={mode}"
    }

    try:
        response = requests.get(url, params=params, headers=headers)
        if response.status_code != 200:
            return None
        
        data: Optional[List[Any]] = response.json()

        # Validate the response format: expect ["Ok", {...}]
        if not isinstance(data, list) or len(data) < 2 or data[0] != "Ok":
            return None
        
        # Extract the payload
        traffic_data = data[1]
        
        # Format the result
        result = {
            "traffic_history": traffic_data.get("traffic_history", []),
            "traffic": {
                "trafficMonthlyAvg": traffic_data.get("traffic", {}).get("trafficMonthlyAvg", 0),
                "costMontlyAvg": traffic_data.get("traffic", {}).get("costMontlyAvg", 0)
            },
            "top_pages": traffic_data.get("top_pages", []),
            "top_countries": traffic_data.get("top_countries", []),
            "top_keywords": traffic_data.get("top_keywords", [])
        }
        
        return result
    except Exception:
        return None
```

--------------------------------------------------------------------------------
/src/seo_mcp/server.py:
--------------------------------------------------------------------------------

```python
"""
SEO MCP Server: A free SEO tool MCP (Model Context Protocol) service based on Ahrefs data. Includes features such as backlinks, keyword ideas, and more.
"""
import requests
import time
import os
import urllib.parse
from typing import Dict, List, Optional, Any, Literal

from fastmcp import FastMCP

from seo_mcp.backlinks import get_backlinks, load_signature_from_cache, get_signature_and_overview
from seo_mcp.keywords import get_keyword_ideas, get_keyword_difficulty
from seo_mcp.traffic import check_traffic


mcp = FastMCP("SEO MCP")

# CapSolver website: https://dashboard.capsolver.com/passport/register?inviteCode=1dTH7WQSfHD0
# Get API Key from environment variable - must be set for production use
api_key = os.environ.get("CAPSOLVER_API_KEY")


def get_capsolver_token(site_url: str) -> Optional[str]:
    """
    Use CapSolver to solve the captcha and get a token
    
    Args:
        site_url: Site URL to query
        
    Returns:
        Verification token or None if failed
    """
    if not api_key:
        return None
    
    payload = {
        "clientKey": api_key,
        "task": {
            "type": 'AntiTurnstileTaskProxyLess',
            "websiteKey": "0x4AAAAAAAAzi9ITzSN9xKMi",  # site key of your target site: ahrefs.com,
            "websiteURL": site_url,
            "metadata": {
                "action": ""  # optional
            }
        }
    }
    res = requests.post("https://api.capsolver.com/createTask", json=payload)
    resp = res.json()
    task_id = resp.get("taskId")
    if not task_id:
        return None
 
    # Poll for the result, capped so a stuck task cannot hang the server forever
    for _ in range(120):
        time.sleep(1)
        payload = {"clientKey": api_key, "taskId": task_id}
        res = requests.post("https://api.capsolver.com/getTaskResult", json=payload)
        resp = res.json()
        status = resp.get("status")
        if status == "ready":
            return resp.get("solution", {}).get("token")
        if status == "failed" or resp.get("errorId"):
            return None
    return None  # timed out waiting for the solver


@mcp.tool()
def get_backlinks_list(domain: str) -> Optional[Dict[str, Any]]:
    """
    Get backlinks list for the specified domain
    Args:
        domain (str): The domain to query
    Returns:
        List of backlinks for the domain, containing title, URL, domain rating, etc.
    """
    # Try to get signature from cache
    signature, valid_until, overview_data = load_signature_from_cache(domain)
    
    # If no valid signature in cache, get a new one
    if not signature or not valid_until:
        # Step 1: Get token
        site_url = f"https://ahrefs.com/backlink-checker/?input={domain}&mode=subdomains"
        token = get_capsolver_token(site_url)
        if not token:
            raise Exception(f"Failed to get verification token for domain: {domain}")
        
        # Step 2: Get signature and validUntil
        signature, valid_until, overview_data = get_signature_and_overview(token, domain)
        if not signature or not valid_until:
            raise Exception(f"Failed to get signature for domain: {domain}")
    
    # Step 3: Get backlinks list
    backlinks = get_backlinks(signature, valid_until, domain)
    return {
        "overview": overview_data,
        "backlinks": backlinks
    }


@mcp.tool()
def keyword_generator(keyword: str, country: str = "us", search_engine: str = "Google") -> Optional[List[Any]]:
    """
    Get keyword ideas for the specified keyword
    """
    site_url = f"https://ahrefs.com/keyword-generator/?country={country}&input={urllib.parse.quote(keyword)}"
    token = get_capsolver_token(site_url)
    if not token:
        raise Exception(f"Failed to get verification token for keyword: {keyword}")
    return get_keyword_ideas(token, keyword, country, search_engine)


@mcp.tool()
def get_traffic(domain_or_url: str, country: str = "None", mode: Literal["subdomains", "exact"] = "subdomains") -> Optional[Dict[str, Any]]:
    """
    Check the estimated search traffic for any website. 

    Args:
        domain_or_url (str): The domain or URL to query
        country (str): The country to query, default is "None"
        mode (["subdomains", "exact"]): The mode to use for the query
    Returns:
        Traffic data for the specified domain or URL
    """
    site_url = f"https://ahrefs.com/traffic-checker/?input={domain_or_url}&mode={mode}"
    token = get_capsolver_token(site_url)
    if not token:
        raise Exception(f"Failed to get verification token for domain: {domain_or_url}")
    return check_traffic(token, domain_or_url, mode, country)


@mcp.tool()
def keyword_difficulty(keyword: str, country: str = "us") -> Optional[Dict[str, Any]]:
    """
    Get keyword difficulty for the specified keyword
    """
    site_url = f"https://ahrefs.com/keyword-difficulty/?country={country}&input={urllib.parse.quote(keyword)}"
    token = get_capsolver_token(site_url)
    if not token:
        raise Exception(f"Failed to get verification token for keyword: {keyword}")
    return get_keyword_difficulty(token, keyword, country)


def main():
    """Run the MCP server"""
    mcp.run()


if __name__ == "__main__":
    main()

```

--------------------------------------------------------------------------------
/src/seo_mcp/keywords.py:
--------------------------------------------------------------------------------

```python
from typing import List, Optional, Any, Dict

import requests


def format_keyword_ideas(keyword_data: Optional[List[Any]]) -> List[Any]:
    if not keyword_data or len(keyword_data) < 2:
        return ["\n❌ No valid keyword ideas retrieved"]
    
    data = keyword_data[1]

    result = []
    
    # Regular keyword ideas
    if "allIdeas" in data and "results" in data["allIdeas"]:
        all_ideas = data["allIdeas"]["results"]
        # total = data["allIdeas"].get("total", 0)
        for idea in all_ideas:
            simplified_idea = {
                "keyword": idea.get('keyword', 'No keyword'),
                "country": idea.get('country', '-'),
                "difficulty": idea.get('difficultyLabel', 'Unknown'),
                "volume": idea.get('volumeLabel', 'Unknown'),
                "updatedAt": idea.get('updatedAt', '-')
            }
            result.append({
                "label": "keyword ideas",
                "value": simplified_idea
            })
    
    # Question-style keyword ideas
    if "questionIdeas" in data and "results" in data["questionIdeas"]:
        question_ideas = data["questionIdeas"]["results"]
        # total = data["questionIdeas"].get("total", 0)
        for idea in question_ideas:
            simplified_idea = {
                "keyword": idea.get('keyword', 'No keyword'),
                "country": idea.get('country', '-'),
                "difficulty": idea.get('difficultyLabel', 'Unknown'),
                "volume": idea.get('volumeLabel', 'Unknown'),
                "updatedAt": idea.get('updatedAt', '-')
            }
            result.append({
                "label": "question ideas",
                "value": simplified_idea
            })
    
    if not result:
        return ["\n❌ No valid keyword ideas retrieved"]
    
    return result


def get_keyword_ideas(token: str, keyword: str, country: str = "us", search_engine: str = "Google") -> Optional[List[Any]]:
    if not token:
        return None
    
    url = "https://ahrefs.com/v4/stGetFreeKeywordIdeas"
    payload = {
        "withQuestionIdeas": True,
        "captcha": token,
        "searchEngine": search_engine,
        "country": country,
        "keyword": ["Some", keyword]
    }
    
    headers = {
        "Content-Type": "application/json"
    }
    
    response = requests.post(url, json=payload, headers=headers)
    if response.status_code != 200:
        return None
    
    data = response.json()

    return format_keyword_ideas(data)


def get_keyword_difficulty(token: str, keyword: str, country: str = "us") -> Optional[Dict[str, Any]]:
    """
    Get keyword difficulty information
    
    Args:
        token (str): Verification token
        keyword (str): Keyword to query
        country (str): Country/region code, default is "us"
        
    Returns:
        Optional[Dict[str, Any]]: Dictionary containing keyword difficulty information, returns None if request fails
    """
    if not token:
        return None
    
    url = "https://ahrefs.com/v4/stGetFreeSerpOverviewForKeywordDifficultyChecker"
    
    payload = {
        "captcha": token,
        "country": country,
        "keyword": keyword
    }
    
    headers = {
        "accept": "*/*",
        "content-type": "application/json; charset=utf-8",
        "referer": f"https://ahrefs.com/keyword-difficulty/?country={country}&input={keyword}"
    }
    
    try:
        response = requests.post(url, json=payload, headers=headers)
        if response.status_code != 200:
            return None
        
        data: Optional[List[Any]] = response.json()
        # Validate the response format: expect ["Ok", {...}]
        if not isinstance(data, list) or len(data) < 2 or data[0] != "Ok":
            return None
        
        # Extract the payload
        kd_data = data[1]
        
        # Format the result
        result = {
            "difficulty": kd_data.get("difficulty", 0),  # Keyword difficulty
            "shortage": kd_data.get("shortage", 0),      # Keyword shortage
            "lastUpdate": kd_data.get("lastUpdate", ""), # Last update time
            "serp": {
                "results": []
            }
        }
        
        # Process the SERP results
        if "serp" in kd_data and "results" in kd_data["serp"]:
            serp_results = []
            for item in kd_data["serp"]["results"]:
                # Only keep organic search results
                if item.get("content") and item["content"][0] == "organic":
                    organic_data = item["content"][1]
                    if "link" in organic_data and organic_data["link"][0] == "Some":
                        link_data = organic_data["link"][1]
                        result_item = {
                            "title": link_data.get("title", ""),
                            "url": link_data.get("url", [None, {}])[1].get("url", ""),
                            "position": item.get("pos", 0)
                        }
                        
                        # Attach metric data, if present
                        if "metrics" in link_data and link_data["metrics"]:
                            metrics = link_data["metrics"]
                            result_item.update({
                                "domainRating": metrics.get("domainRating", 0),
                                "urlRating": metrics.get("urlRating", 0),
                                "traffic": metrics.get("traffic", 0),
                                "keywords": metrics.get("keywords", 0),
                                "topKeyword": metrics.get("topKeyword", ""),
                                "topVolume": metrics.get("topVolume", 0)
                            })
                        
                        serp_results.append(result_item)
            
            result["serp"]["results"] = serp_results
        
        return result
    except Exception:
        return None

```

--------------------------------------------------------------------------------
/src/seo_mcp/backlinks.py:
--------------------------------------------------------------------------------

```python
from typing import Any, List, Optional, Dict, Tuple, cast
import os
import json
import time
from datetime import datetime
import requests

# Cache file path for storing signatures
SIGNATURE_CACHE_FILE = "signature_cache.json"


def iso_to_timestamp(iso_date_string: str) -> float:
    """
    Convert ISO 8601 format datetime string to timestamp
    Example: "2025-04-12T14:59:18Z" -> 1744916358.0
    """
    # Handle UTC time represented by "Z"
    if iso_date_string.endswith('Z'):
        iso_date_string = iso_date_string[:-1] + '+00:00'
    dt = datetime.fromisoformat(iso_date_string)
    return dt.timestamp()



def save_signature_to_cache(signature: str, valid_until: str, overview_data: Dict[str, Any], domain: str) -> bool:
    """
    Save signature information to local cache file
    
    Args:
        signature: Obtained signature
        valid_until: Signature expiration time
        overview_data: Overview data returned alongside the signature
        domain: Domain name
        
    Returns:
        True if saved successfully, False otherwise
    """
    # Read existing cache
    cache_data: Dict[str, Dict[str, Any]] = {}
    if os.path.exists(SIGNATURE_CACHE_FILE):
        try:
            with open(SIGNATURE_CACHE_FILE, 'r') as f:
                cache_data = json.load(f)
        except Exception:
            pass
    
    # Update cache for current domain
    cache_data[domain] = {
        "signature": signature,
        "valid_until": valid_until,
        "overview_data": overview_data,
        "timestamp": datetime.now().timestamp()
    }
    
    try:
        with open(SIGNATURE_CACHE_FILE, 'w') as f:
            json.dump(cache_data, f)
        return True
    except Exception:
        return False


def load_signature_from_cache(domain: str) -> Tuple[Optional[str], Optional[str], Optional[Dict[str, Any]]]:
    """
    Load signature information for a specific domain from the local cache file.
    
    Args:
        domain: Domain to query
        
    Returns:
        (signature, valid_until, overview_data) tuple, or (None, None, None) if there is no valid cache
    """
    if not os.path.exists(SIGNATURE_CACHE_FILE):
        return None, None, None
    
    try:
        with open(SIGNATURE_CACHE_FILE, 'r') as f:
            cache_data = json.load(f)
        
        # Check if cache exists for current domain
        if domain not in cache_data:
            return None, None, None
        
        domain_cache = cache_data[domain]
        
        # Check if signature is expired
        valid_until = domain_cache.get("valid_until")
        
        if valid_until:
            # Convert ISO date string to timestamp for comparison
            valid_until_timestamp = iso_to_timestamp(valid_until)
            current_time = time.time()
            
            if current_time < valid_until_timestamp:
                return domain_cache.get("signature"), valid_until, domain_cache.get("overview_data")
            else:
                return None, None, None
        else:
            return None, None, None
    except Exception:
        return None, None, None



def get_signature_and_overview(token: str, domain: str) -> Tuple[Optional[str], Optional[str], Optional[Dict[str, Any]]]:
    """
    Get signature and validUntil parameters using the token
    
    Args:
        token: Verification token
        domain: Domain to query
        
    Returns:
        (signature, valid_until, overview_data) tuple, or (None, None, None) if failed
    """
    url = "https://ahrefs.com/v4/stGetFreeBacklinksOverview"
    payload = {
        "captcha": token,
        "mode": "subdomains",
        "url": domain
    }
    
    headers = {
        "Content-Type": "application/json"
    }
    
    response = requests.post(url, json=payload, headers=headers)
    if response.status_code != 200:
        return None, None, None
    
    data = response.json()
    
    try:
        # Assuming data format is always ['Ok', {signature object}]
        if isinstance(data, list) and len(cast(List[Any], data)) > 1:
            second_element: Dict[str, Any] = cast(Dict[str, Any], data[1])
            signature: str = cast(str, second_element['signedInput']['signature'])
            valid_until: str = cast(str, second_element['signedInput']['input']['validUntil'])
            overview_data: Dict[str, Any] = cast(Dict[str, Any], second_element['data'])
            
            # Save the new signature to cache
            save_signature_to_cache(signature, valid_until, overview_data, domain)
            
            return signature, valid_until, overview_data
        else:
            return None, None, None
    except Exception:
        return None, None, None
    

def format_backlinks(backlinks_data: List[Any], domain: str) -> List[Any]:
    """
    Format backlinks data
    """
    if backlinks_data and len(backlinks_data) > 1 and "topBacklinks" in backlinks_data[1]:
        backlinks = backlinks_data[1]["topBacklinks"]["backlinks"]
        # Only keep necessary fields
        simplified_backlinks = []
        for backlink in backlinks:
            simplified_backlink = {
                "anchor": backlink.get("anchor", ""),
                "domainRating": backlink.get("domainRating", 0),
                "title": backlink.get("title", ""),
                "urlFrom": backlink.get("urlFrom", ""),
                "urlTo": backlink.get("urlTo", ""),
                "edu": backlink.get("edu", False),
                "gov": backlink.get("gov", False),
            }
            simplified_backlinks.append(simplified_backlink)
        return simplified_backlinks
    else:
        return []
    

def get_backlinks(signature: str, valid_until: str, domain: str) -> Optional[List[Any]]:
    if not signature or not valid_until:
        return None
    
    url = "https://ahrefs.com/v4/stGetFreeBacklinksList"
    payload = {
        "reportType": "TopBacklinks",
        "signedInput": {
            "signature": signature,
            "input": {
                "validUntil": valid_until,
                "mode": "subdomains",
                "url": f"{domain}/"
            }
        }
    }
    
    headers = {
        "Content-Type": "application/json"
    }
    
    response = requests.post(url, json=payload, headers=headers)
    if response.status_code != 200:
        return None
    
    data = response.json()

    return format_backlinks(data, domain)



def get_backlinks_overview(signature: str, valid_until: str, domain: str) -> Optional[Dict[str, Any]]:
    """
    Retrieve backlinks overview data for a domain using Ahrefs API.
    
    Args:
        signature: The authentication signature
        valid_until: Signature expiration timestamp
        domain: The domain to get overview data for
        
    Returns:
        Dictionary containing backlinks overview data or None if request fails
    """
    if not signature or not valid_until:
        print("ERROR: No signature or valid_until, cannot proceed")
        return None
    
    url = "https://ahrefs.com/v4/stGetFreeBacklinksOverview"
    payload = {
        "captcha": signature,
        "mode": "subdomains",
        "url": domain
    }
    
    headers = {
        "Content-Type": "application/json",
        "accept": "*/*",
        "sec-fetch-site": "same-origin"
    }
    
    try:
        response = requests.post(url, json=payload, headers=headers)
        if response.status_code != 200:
            print(f"ERROR: Failed to get backlinks overview, status code: {response.status_code}, response: {response.text}")
            return None
        
        data = response.json()
        return data
    except Exception:
        return None

```