# Directory Structure

```
├── .dockerignore
├── .github
│   ├── ISSUE_TEMPLATE
│   │   ├── bug_report.yml
│   │   ├── config.yml
│   │   ├── feature_request.yml
│   │   └── question.yml
│   ├── pull_request_template.md
│   └── workflows
│       ├── deploy.yml
│       ├── ping-server.yml
│       └── tests.yml
├── .gitignore
├── .python-version
├── assets
│   ├── cursor-usage.png
│   ├── langdock-usage.png
│   └── paperclip.svg
├── CONTRIBUTING.md
├── docker-compose.prod.yml
├── docker-compose.yml
├── Dockerfile
├── LICENSE.md
├── README.md
├── requirements.txt
├── src
│   ├── core
│   │   ├── __init__.py
│   │   ├── arxiv.py
│   │   ├── openalex.py
│   │   ├── osf.py
│   │   └── providers.py
│   ├── prompts.py
│   ├── server.py
│   ├── tools.py
│   └── utils
│       ├── __init__.py
│       ├── pdf2md.py
│       └── sanitize_api_queries.py
└── tests
    ├── __init__.py
    ├── test_metadata_retrieval.py
    └── test_pdf_retrieval.py
```

# Files

--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------

```
3.12.7

```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
venv
env
.venv
.env
__pycache__/
*.xlsx
*.csv
.DS_Store
.luarc.json

*node_modules/

```

--------------------------------------------------------------------------------
/.dockerignore:
--------------------------------------------------------------------------------

```
# Git
.git
.gitignore
.gitattributes

# Docker
Dockerfile*
docker-compose*
.dockerignore

# Documentation
*.md
docs/

# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# Virtual environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
.conda/

# IDEs
.cursor/
.vscode/
.idea/
*.swp
*.swo
*~

# OS
.DS_Store
.DS_Store?
._*
.Spotlight-V100
.Trashes
ehthumbs.db
Thumbs.db

# Testing
.tox/
.nox/
.coverage
.pytest_cache/
htmlcov/
.cache
tests/

# Jupyter Notebook
.ipynb_checkpoints

# Environment variables
.env*
env.prod.template

# Logs
*.log
logs/

# Temporary files
*.tmp
*.temp
.tmp/
.temp/
```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
<div align="center">
  <img src="assets/paperclip.svg" alt="Paperclip Logo" width="48" height="48">
  
  # Paperclip MCP Server
</div>

> 📎 Paperclip is a Model Context Protocol (MCP) server that enables searching and retrieving research papers from arXiv, the Open Science Framework (OSF) API, and OpenAlex.

[![Tests](https://github.com/matsjfunke/paperclip/actions/workflows/tests.yml/badge.svg?branch=main)](https://github.com/matsjfunke/paperclip/actions/workflows/tests.yml)
[![Health Check](https://github.com/matsjfunke/paperclip/actions/workflows/ping-server.yml/badge.svg)](https://github.com/matsjfunke/paperclip/actions/workflows/ping-server.yml)
[![License](https://img.shields.io/badge/license-MIT-blue.svg)](LICENSE.md)

## Quick Start

Set up the Paperclip MCP server in your MCP host via the server URL `https://paperclip.matsjfunke.com/mcp`; no authentication is needed.

Example JSON for Cursor:

```json
{
  "mcpServers": {
    "paperclip": {
      "url": "https://paperclip.matsjfunke.com/mcp"
    }
  }
}
```
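
You can also connect programmatically. Below is a minimal sketch using the `fastmcp` Python client (the same client the health-check workflow uses); the `list_tools()` call is an assumption based on the standard MCP client API rather than something shown in this repo:

```python
import asyncio

from fastmcp.client.client import Client


async def main():
    # Connect to the hosted Paperclip server over HTTP
    async with Client("https://paperclip.matsjfunke.com/mcp") as client:
        await client.ping()                # verify the server is reachable
        tools = await client.list_tools()  # discover the available paper tools
        print([tool.name for tool in tools])


asyncio.run(main())
```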

## Table of Contents

- [Quick Start](#quick-start)
- [Usage Examples](#usage-examples)
- [Supported Paper Providers](#supported-paper-providers)
- [Preprint Providers to Be Added](#preprint-providers-to-be-added)
- [Contributing](#contributing)

## Usage Examples

Here are examples of Paperclip integrated with popular MCP clients:

**Cursor IDE:**

![Paperclip integration with Cursor](assets/cursor-usage.png)

**Langdock:**

![Paperclip integration with Langdock](assets/langdock-usage.png)

## Supported Paper Providers

- [AfricArXiv](https://africarxiv.org)
- [AgriXiv](https://agrirxiv.org)
- [ArabXiv](https://arabixiv.org)
- [arXiv](https://arxiv.org)
- [BioHackrXiv](http://guide.biohackrxiv.org/about.html)
- [BodoArXiv](https://bodoarxiv.wordpress.com)
- [COP Preprints](https://www.collegeofphlebology.com)
- [EarthArXiv](https://eartharxiv.org)
- [EcoEvoRxiv](https://www.ecoevorxiv.com)
- [ECSarxiv](https://ecsarxiv.org)
- [EdArXiv](https://edarxiv.org)
- [EngrXiv](https://engrxiv.org)
- [FocusArchive](https://osf.io/preprints/focusarchive)
- [Frenxiv](https://frenxiv.org)
- [INArxiv](https://rinarxiv.lipi.go.id)
- [IndiaRxiv](https://osf.io/preprints/indiarxiv)
- [Law Archive](https://library.law.yale.edu/research/law-archive)
- [LawArXiv](https://osf.io/preprints/lawarxiv)
- [LISSA](https://osf.io/preprints/lissa)
- [LiveData](https://osf.io/preprints/livedata)
- [MarXiv](https://osf.io/preprints/marxiv)
- [MediArXiv](https://mediarxiv.com)
- [MetaArXiv](https://osf.io/preprints/metaarxiv)
- [MindRxiv](https://osf.io/preprints/mindrxiv)
- [NewAddictionSx](https://osf.io/preprints/newaddictionsx)
- [NutriXiv](https://niblunc.org)
- [OpenAlex](https://openalex.org)
- [OSF Preprints](https://osf.io/preprints/osf)
- [PaleoRxiv](https://osf.io/preprints/paleorxiv)
- [PsyArXiv](https://psyarxiv.com)
- [SocArXiv](https://socopen.org/welcome)
- [SportRxiv](http://sportrxiv.org)
- [Thesis Commons](https://osf.io/preprints/thesiscommons)

## Preprint Providers to Be Added

[List of preprint repositories](https://en.wikipedia.org/wiki/List_of_preprint_repositories)

- bioRxiv & medRxiv, which share the same underlying API structure (`https://api.biorxiv.org/pubs/[server]/[interval]/[cursor]`, where `[server]` can be `biorxiv` or `medrxiv`) - see the sketch after this list
- ChemRxiv
- [hal open science](https://hal.science/?lang=en)
- [research square](https://www.researchsquare.com/)
- [osf preprints](https://osf.io/preprints)
- [preprints.org](https://preprints.org)
- [science open](https://www.scienceopen.com/)
- [SSRN](https://www.ssrn.com/index.cfm/en/the-lancet/)
- [synthical](https://synthical.com/feed/new)
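
For illustration, a minimal sketch of querying the shared bioRxiv/medRxiv endpoint with `requests`; the response field names (`collection`, `preprint_title`, `preprint_doi`) are assumptions based on the public bioRxiv API documentation, not something Paperclip implements yet:

```python
import requests

# Endpoint pattern from the list item above: /pubs/[server]/[interval]/[cursor]
# [server] is "biorxiv" or "medrxiv"; the interval here is a date range.
url = "https://api.biorxiv.org/pubs/biorxiv/2024-01-01/2024-01-31/0"

response = requests.get(url, timeout=30)
response.raise_for_status()
data = response.json()

# "collection" is the assumed key holding the list of paper records
for paper in data.get("collection", []):
    print(paper.get("preprint_title"), paper.get("preprint_doi"))
```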

## Contributing

Interested in contributing to Paperclip? Check out our [Contributing Guide](CONTRIBUTING.md) for development setup instructions, testing procedures, and more!

```

--------------------------------------------------------------------------------
/LICENSE.md:
--------------------------------------------------------------------------------

```markdown
MIT License

Copyright (c) 2025 Mats Julius Funke 

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

```

--------------------------------------------------------------------------------
/CONTRIBUTING.md:
--------------------------------------------------------------------------------

```markdown
# Contributing to Paperclip

Thank you for your interest in contributing to Paperclip! This guide will help you get started with development.

## Development Setup

### Prerequisites

- Python 3.12+
- pip

### Installation

1. **Fork and clone the repository**

   - Fork this repository on GitHub
   - Clone your fork:

   ```bash
   git clone https://github.com/YOUR_USERNAME/paperclip.git
   cd paperclip
   ```

2. **Create and activate virtual environment**

   ```bash
   python -m venv .venv
   source .venv/bin/activate  # On Windows: .venv\Scripts\activate
   ```

3. **Install dependencies**

   ```bash
   pip install -r requirements.txt
   ```

4. **Add new dependencies (when needed)**
   ```bash
   pip install <new-lib>
   pip freeze > requirements.txt
   ```

### Running the Server with Hot Reload

```bash
# Run with hot reload
watchmedo auto-restart --patterns="*.py" --recursive -- python src/server.py
# Run Server using fastmcp
fastmcp run src/server.py --transport http --host 0.0.0.0 --port 8000
# use docker compose
docker-compose up --build
```

The server will automatically restart when you make changes to any `.py` files.

## Testing

Use the [MCP Inspector](https://inspector.modelcontextprotocol.io/) to interact with the server.

```bash
pnpx @modelcontextprotocol/inspector
```

### Unit Tests

Run the unit tests to verify the functionality of individual components:

```bash
# Run all tests
python -m unittest discover tests -v
```

## Contributing Changes

### Creating a Pull Request

1. **Create a feature branch**

   ```bash
   git checkout -b feat/your-feature-name
   # or for bug fixes:
   git checkout -b fix/issue-description
   ```

2. **Make your changes**

   - Write your code following the existing style
   - Add tests for new functionality
   - Update documentation as needed

3. **Commit your changes and push to your fork**

   ```bash
   git push origin feat/your-feature-name
   ```

4. **Open a Pull Request**

   - Go to the original repository on GitHub
   - Click "New Pull Request"
   - Select your branch from your fork
   - Fill out the PR template with:
     - Clear description of changes
     - Link to related issues (if applicable)
     - Testing steps you've performed

### Pull Request Guidelines

- **Keep PRs focused**: One feature or fix per PR
- **Write clear descriptions**: Explain what changes you made and why
- **Test your changes**: Ensure all tests pass before submitting
- **Update documentation**: Add or update docs for new features
- **Be responsive**: Address feedback and questions promptly

```

--------------------------------------------------------------------------------
/tests/__init__.py:
--------------------------------------------------------------------------------

```python
# Tests package for Paperclip MCP Server
```

--------------------------------------------------------------------------------
/.github/ISSUE_TEMPLATE/config.yml:
--------------------------------------------------------------------------------

```yaml
blank_issues_enabled: true
contact_links:
  - name: 📖 Documentation
    url: https://github.com/matsjfunke/paperclip/blob/main/README.md
    about: Check the README for documentation

```

--------------------------------------------------------------------------------
/src/utils/__init__.py:
--------------------------------------------------------------------------------

```python
"""
Utility functions for the paperclip MCP server.
"""

from .pdf2md import extract_pdf_to_markdown
from .sanitize_api_queries import sanitize_api_queries

__all__ = ["sanitize_api_queries", "extract_pdf_to_markdown"]

```

--------------------------------------------------------------------------------
/docker-compose.yml:
--------------------------------------------------------------------------------

```yaml
version: "3.8"

services:
  paperclip:
    build:
      context: .
    image: paperclip-image
    container_name: paperclip
    ports:
      - 8000:8000
    volumes:
      - ./:/app # mount local backend dir to /app in container to enable live reloading of code changes
    command: watchmedo auto-restart --patterns="*.py" --recursive -- python src/server.py --transport http --host 0.0.0.0 --port 8000

```

--------------------------------------------------------------------------------
/assets/paperclip.svg:
--------------------------------------------------------------------------------

```
<svg  xmlns="http://www.w3.org/2000/svg"  width="48"  height="48"  viewBox="0 0 24 24"  fill="none"  stroke="white"  stroke-width="2"  stroke-linecap="round"  stroke-linejoin="round"  class="icon icon-tabler icons-tabler-outline icon-tabler-paperclip"><path stroke="none" d="M0 0h24v24H0z" fill="none"/><path d="M15 7l-6.5 6.5a1.5 1.5 0 0 0 3 3l6.5 -6.5a3 3 0 0 0 -6 -6l-6.5 6.5a4.5 4.5 0 0 0 9 9l6.5 -6.5" /></svg>
```

--------------------------------------------------------------------------------
/.github/pull_request_template.md:
--------------------------------------------------------------------------------

```markdown
## Description

Brief description of what this PR does.

## Changes Made

- [ ] List specific changes
- [ ] Include any new features
- [ ] Mention any bug fixes

## Testing

- [ ] All existing tests pass
- [ ] Added tests for new functionality (if applicable)
- [ ] Tested basic functionality manually with MCP Inspector

## Related Issues

Closes #[issue-number] (if applicable)

## Additional Notes

Any additional context or considerations for reviewers.

```

--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------

```dockerfile
FROM python:3.12-slim-bullseye

WORKDIR /app

# Install system graphics libraries needed at runtime
RUN apt-get update && \
    apt-get upgrade -y && \
    apt-get install -y --no-install-recommends \
    libgl1-mesa-glx \
    libglib2.0-0 \
    && apt-get clean && \
    rm -rf /var/lib/apt/lists/*

RUN pip install --upgrade pip
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY ./src .

EXPOSE 8000
```

--------------------------------------------------------------------------------
/.github/workflows/tests.yml:
--------------------------------------------------------------------------------

```yaml
name: Tests

on:
  push:
    branches: [main]
  workflow_dispatch:

jobs:
  test:
    runs-on: ubuntu-latest

    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Set up Python 3.12
        uses: actions/setup-python@v4
        with:
          python-version: 3.12

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt

      - name: Run all tests
        run: |
          python -m unittest discover tests -v

```

--------------------------------------------------------------------------------
/src/core/__init__.py:
--------------------------------------------------------------------------------

```python
"""
Core package for paperclip MCP server.
"""

from .arxiv import (
    fetch_arxiv_papers,
    fetch_single_arxiv_paper_metadata,
)
from .osf import (
    fetch_osf_preprints,
    fetch_single_osf_preprint_metadata,
)
from .openalex import (
    fetch_openalex_papers,
    fetch_single_openalex_paper_metadata,
)
from .providers import fetch_osf_providers, get_all_providers, validate_provider

__all__ = [
    "fetch_arxiv_papers",
    "fetch_osf_preprints",
    "fetch_osf_providers",
    "fetch_single_arxiv_paper_metadata",
    "fetch_single_osf_preprint_metadata",
    "fetch_openalex_papers",
    "fetch_single_openalex_paper_metadata",
    "get_all_providers",
    "validate_provider",
]

```

--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------

```
annotated-types==0.7.0
anyio==4.9.0
attrs==25.3.0
Authlib==1.6.1
certifi==2025.8.3
cffi==1.17.1
charset-normalizer==3.4.2
click==8.2.1
cryptography==45.0.5
cyclopts==3.22.5
dnspython==2.7.0
docstring_parser==0.17.0
docutils==0.22
email_validator==2.2.0
exceptiongroup==1.3.0
fastmcp==2.11.0
h11==0.16.0
httpcore==1.0.9
httpx==0.28.1
httpx-sse==0.4.1
idna==3.10
isodate==0.7.2
jsonschema==4.25.0
jsonschema-path==0.3.4
jsonschema-specifications==2025.4.1
lazy-object-proxy==1.11.0
markdown-it-py==3.0.0
MarkupSafe==3.0.2
mcp==1.12.3
mdurl==0.1.2
more-itertools==10.7.0
openapi-core==0.19.5
openapi-pydantic==0.5.1
openapi-schema-validator==0.6.3
openapi-spec-validator==0.7.2
parse==1.20.2
pathable==0.4.4
pycparser==2.22
pydantic==2.11.7
pydantic-settings==2.10.1
pydantic_core==2.33.2
Pygments==2.19.2
PyMuPDF==1.26.3
pymupdf4llm==0.0.27
pyperclip==1.9.0
python-dotenv==1.1.1
python-multipart==0.0.20
PyYAML==6.0.2
referencing==0.36.2
requests==2.32.4
rfc3339-validator==0.1.4
rich==14.1.0
rich-rst==1.3.1
rpds-py==0.26.0
six==1.17.0
sniffio==1.3.1
sse-starlette==3.0.2
starlette==0.47.2
typing-inspection==0.4.1
typing_extensions==4.14.1
urllib3==2.5.0
uvicorn==0.35.0
watchdog==6.0.0
Werkzeug==3.1.1

```

--------------------------------------------------------------------------------
/src/prompts.py:
--------------------------------------------------------------------------------

```python
from fastmcp import FastMCP


prompt_mcp = FastMCP()

@prompt_mcp.prompt  
def list_paper_providers() -> str:
    """List all available paper providers."""
    return "List all available paper providers."

@prompt_mcp.prompt
def find_attention_is_all_you_need() -> str:
    """Finds the Attention is all you need paper in arxiv."""
    return "Search for Attention is all you need in arxiv"

@prompt_mcp.prompt
def get_paper_by_id() -> str:
    """Prompt to use the get_paper_by_id tool."""
    return "Retrieve the full content (including abstract, sections, and references) of the paper with ID: 1706.03762"

@prompt_mcp.prompt
def get_paper_metadata_by_id() -> str:
    """Prompt to use the get_paper_metadata_by_id tool."""
    return "Retrieve the metadata of the paper with ID: 1706.03762"

@prompt_mcp.prompt
def get_paper_by_url() -> str:
    """Prompt to use the get_paper_by_url tool."""
    return "Retrieve the full content (including abstract, sections, and references) of the paper with URL: https://arxiv.org/pdf/1706.03762"

@prompt_mcp.prompt
def search_across_providers() -> str:
    """Prompt for searching across all providers (not specifying a provider)."""
    return "Search for papers across all providers with the query: MCP"
```

--------------------------------------------------------------------------------
/.github/ISSUE_TEMPLATE/bug_report.yml:
--------------------------------------------------------------------------------

```yaml
name: Bug Report
description: Report a bug or unexpected behavior
title: "[Bug]: "
labels: ["bug"]
body:
  - type: markdown
    attributes:
      value: |
        Thanks for taking the time to report a bug 🫶 ! Please fill out the information below to help us investigate.

  - type: textarea
    id: description
    attributes:
      label: Bug Description
      description: A clear and concise description of what the bug is.
      placeholder: Describe what happened and what you expected to happen instead.
    validations:
      required: true

  - type: textarea
    id: reproduction
    attributes:
      label: Steps to Reproduce
      description: Steps to reproduce the behavior
    validations:
      required: true

  - type: textarea
    id: expected
    attributes:
      label: Expected Behavior
      description: What you expected to happen
    validations:
      required: true

  - type: textarea
    id: actual
    attributes:
      label: Actual Behavior
      description: What actually happened (include full error message if applicable)
    validations:
      required: true

  - type: textarea
    id: additional-context
    attributes:
      label: Additional Context
      description: Any other context about the problem here.

```

--------------------------------------------------------------------------------
/src/server.py:
--------------------------------------------------------------------------------

```python
import asyncio

from fastmcp import FastMCP

from prompts import prompt_mcp
from tools import tools_mcp

mcp = FastMCP(
    name="Paperclip MCP Server",
    instructions="""
        This server provides tools to search, retrieve, and read academic papers from multiple sources.
        - Search papers across providers with filters for query text, subjects, and publication date
        - Read full paper content in markdown format
        - Retrieve paper metadata without downloading content (e.g. title, authors, abstract, publication date, journal info, and download URLs)
    """,
)


# Import subservers
async def setup():
    await mcp.import_server(prompt_mcp, prefix="prompt")
    await mcp.import_server(tools_mcp, prefix="tools")

if __name__ == "__main__":
    asyncio.run(setup())
    mcp.run(transport="http", host="0.0.0.0", port=8000)

```

--------------------------------------------------------------------------------
/.github/ISSUE_TEMPLATE/question.yml:
--------------------------------------------------------------------------------

```yaml
name: Question
description: Ask a question about paperclip
title: "[Question]: "
labels: ["question"]
body:
  - type: markdown
    attributes:
      value: |
        Have a question about paperclip? We're here to help! Please provide as much detail as possible.

  - type: textarea
    id: question
    attributes:
      label: Your Question
      description: What would you like to know about paperclip?
      placeholder: Ask your question here...
    validations:
      required: true

  - type: textarea
    id: context
    attributes:
      label: Context
      description: |
        What are you trying to achieve? Providing context helps us give better answers.
      placeholder: |
        e.g., "I'm trying to understand how paperclip handles..."
        or "I want to use paperclip to..."

  - type: checkboxes
    id: checklist
    attributes:
      label: Checklist
      description: Please confirm you've done the following
      options:
        - label: I've checked the README and documentation
          required: true
        - label: I've searched existing issues for similar questions
          required: true

  - type: textarea
    id: additional-info
    attributes:
      label: Additional Information
      description: Any other details that might be helpful

```

--------------------------------------------------------------------------------
/.github/ISSUE_TEMPLATE/feature_request.yml:
--------------------------------------------------------------------------------

```yaml
name: Feature Request
description: Suggest a new feature or improvement
title: "[Feature]: "
labels: ["enhancement"]
body:
  - type: markdown
    attributes:
      value: |
        Thanks for suggesting a new feature! Please fill out the information below to help us understand your request.

  - type: textarea
    id: summary
    attributes:
      label: Feature Summary
      description: A clear and concise description of the feature you'd like to see added.
      placeholder: Briefly describe the feature you're requesting.
    validations:
      required: true

  - type: textarea
    id: problem
    attributes:
      label: Problem or Use Case
      description: What problem does this feature solve? What use case does it address?
      placeholder: |
        e.g., "I often need to... but currently paperclip doesn't support..."
        or "It would be helpful if paperclip could..."
    validations:
      required: true

  - type: textarea
    id: solution
    attributes:
      label: Proposed Solution
      description: How would you like this feature to work?
      placeholder: |
        Describe your ideal solution. Consider:
        - What command/option would trigger this feature?
        - What would the output look like?
        - How should it interact with existing features?
    validations:
      required: true

  - type: textarea
    id: alternatives
    attributes:
      label: Alternatives Considered
      description: Have you considered any alternative solutions or workarounds?
      placeholder: |
        e.g., "I currently work around this by..."
        or "Other tools like X handle this by..."

  - type: textarea
    id: additional-context
    attributes:
      label: Additional Context
      description: |
        Any other context, screenshots, or examples that would help us understand your request.

```

--------------------------------------------------------------------------------
/src/utils/sanitize_api_queries.py:
--------------------------------------------------------------------------------

```python
"""
Text processing utilities for API interactions.
"""

import re


def sanitize_api_queries(text: str, max_length: int = 200) -> str:
    """
    Clean text for API queries by removing problematic characters and formatting.

    Args:
        text: The text to clean
        max_length: Maximum allowed length (default 200)

    Returns:
        Cleaned text suitable for API queries
    """
    if not text:
        return text

    # Remove or replace problematic characters
    cleaned = text

    # Replace curly/smart quotes with plain ASCII quotes
    cleaned = cleaned.replace("\u201c", '"').replace("\u201d", '"').replace("\u2018", "'").replace("\u2019", "'")

    # Remove or replace other problematic special characters
    cleaned = cleaned.replace("&nbsp;", " ")  # Non-breaking space
    cleaned = cleaned.replace("\u00a0", " ")  # Unicode non-breaking space
    cleaned = cleaned.replace("\n", " ").replace("\r", " ").replace("\t", " ")  # Line breaks and tabs

    # Replace multiple spaces with single space
    cleaned = re.sub(r"\s+", " ", cleaned)

    # Remove leading/trailing whitespace
    cleaned = cleaned.strip()

    # Handle length limits
    if len(cleaned) > max_length:
        cleaned = cleaned[: max_length - 3] + "..."

    # Remove or replace characters that commonly cause URL encoding issues
    problematic_chars = ["<", ">", "{", "}", "|", "\\", "^", "`", "[", "]"]
    for char in problematic_chars:
        cleaned = cleaned.replace(char, "")

    # Replace colons which seem to cause OSF API issues
    cleaned = cleaned.replace(":", " -")

    # Replace other potentially problematic punctuation
    cleaned = cleaned.replace(";", ",")  # Semicolons to commas
    cleaned = cleaned.replace("?", "")  # Remove question marks
    cleaned = cleaned.replace("!", "")  # Remove exclamation marks
    cleaned = cleaned.replace("#", "")  # Remove hashtags
    cleaned = cleaned.replace("%", "")  # Remove percent signs

    # Clean up any double spaces created by replacements
    cleaned = re.sub(r"\s+", " ", cleaned).strip()

    return cleaned

```

--------------------------------------------------------------------------------
/src/core/providers.py:
--------------------------------------------------------------------------------

```python
from typing import Any, Dict, List

import requests


def fetch_osf_providers() -> List[Dict[str, Any]]:
    """Fetch current list of valid OSF preprint providers from API"""
    url = "https://api.osf.io/v2/preprint_providers/"
    response = requests.get(url)
    response.raise_for_status()
    data = response.json()

    # Create provider objects from the response
    providers = []
    for provider in data["data"]:
        provider_obj = {
            "id": provider["id"],
            "type": "osf",
            "description": provider["attributes"]["description"],
            "taxonomies": provider["relationships"]["taxonomies"]["links"]["related"]["href"],
            "preprints": provider["relationships"]["preprints"]["links"]["related"]["href"],
        }
        providers.append(provider_obj)

    return sorted(providers, key=lambda p: p["id"])


def get_external_providers() -> List[Dict[str, Any]]:
    """Get list of external (non-OSF) preprint providers"""
    return [
        {
            "id": "arxiv",
            "type": "standalone",
            "description": "arXiv is a free distribution service and an open-access archive for scholarly articles in physics, mathematics, computer science, quantitative biology, quantitative finance, statistics, electrical engineering and systems science, and economics.",
        },
        {
            "id": "openalex",
            "type": "standalone",
            "description": "OpenAlex is a comprehensive index of scholarly works across all disciplines.",
        },
    ]


def get_all_providers() -> List[Dict[str, Any]]:
    """Get combined list of all available providers"""
    osf_providers = fetch_osf_providers()
    external_providers = get_external_providers()
    all_providers = osf_providers + external_providers
    return sorted(all_providers, key=lambda p: p["id"].lower())


def validate_provider(provider_id: str) -> bool:
    """Validate if a provider ID exists in the given providers list"""
    valid_ids = [p["id"] for p in get_all_providers()]
    return provider_id in valid_ids

```

--------------------------------------------------------------------------------
/.github/workflows/deploy.yml:
--------------------------------------------------------------------------------

```yaml
name: Deploy to VPS

on:
  push:
    branches: [main]
  workflow_dispatch:

jobs:
  deploy:
    runs-on: ubuntu-latest

    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Deploy to VPS
        uses: appleboy/[email protected]
        with:
          host: ${{ secrets.VPS_HOST }}
          username: ${{ secrets.VPS_USERNAME }}
          key: ${{ secrets.VPS_SSH_KEY }}
          passphrase: ${{ secrets.VPS_SSH_KEY_PASSPHRASE }}
          script: |
            cd /opt/paperclip
            git fetch origin main
            git reset --hard origin/main

            echo "🔄 Stopping containers..."
            docker-compose -f docker-compose.prod.yml down

            echo "🏗️ Building containers..."
            docker-compose -f docker-compose.prod.yml build --no-cache

            echo "🚀 Starting containers..."
            docker-compose -f docker-compose.prod.yml up -d

            echo "⏳ Waiting for containers to start..."
            sleep 10

            echo "📊 Container status:"
            docker-compose -f docker-compose.prod.yml ps --format "table {{.Service}}\t{{.Status}}\t{{.Ports}}"

            echo "📋 Recent logs from all services (filtered):"
            docker-compose -f docker-compose.prod.yml logs --tail=30 | grep -E "(ERROR|WARN|INFO|Ready|Starting|Listening)" | head -50

            echo "🧹 Cleaning up..."
            docker system prune -f

      - name: Show specific service logs on failure
        if: failure()
        uses: appleboy/[email protected]
        with:
          host: ${{ secrets.VPS_HOST }}
          username: ${{ secrets.VPS_USERNAME }}
          key: ${{ secrets.VPS_SSH_KEY }}
          passphrase: ${{ secrets.VPS_SSH_KEY_PASSPHRASE }}
          script: |
            cd /opt/paperclip
            echo "🔍 Filtered logs for debugging:"
            echo "--- Traefik status ---"
            docker-compose -f docker-compose.prod.yml logs --tail=50 traefik | grep -E "(ERROR|WARN|Ready|Starting|tls|certificate)" | head -30
            echo "--- Paperclip MCP Server status ---"
            docker-compose -f docker-compose.prod.yml logs --tail=50 paperclip-mcp | grep -E "(ERROR|WARN|Ready|Starting|Listening|Build)" | head -30

```

--------------------------------------------------------------------------------
/.github/workflows/ping-server.yml:
--------------------------------------------------------------------------------

```yaml
name: Health Check

on:
  schedule:
    # Run once per day at 10:00 UTC
    - cron: "0 10 * * *"
  workflow_dispatch:

jobs:
  ping:
    runs-on: ubuntu-latest

    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: "3.11"

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install fastmcp==2.11.0

      - name: Create ping script
        run: |
          cat > ping_server.py << 'EOF'
          import asyncio
          import sys
          import os
          from datetime import datetime
          from fastmcp.client.client import Client

          async def ping_server():
              server_url = 'https://paperclip.matsjfunke.com/mcp'
              
              print(f"🏓 Pinging MCP server at: {server_url}")
              print(f"⏰ Timestamp: {datetime.now().isoformat()}")
              
              try:
                  # Create client instance
                  client = Client(server_url)
                  
                  # Connect and ping
                  async with client:
                      print("✅ Successfully connected to server")
                      
                      # Send ping
                      ping_result = await client.ping()
                      
                      if ping_result:
                          print("🎯 Ping successful! Server is responsive")
                          return True
                      else:
                          print("❌ Ping failed! Server did not respond properly")
                          return False
                          
              except Exception as e:
                  print(f"💥 Error connecting to server: {str(e)}")
                  print(f"🔧 Error type: {type(e).__name__}")
                  return False

          if __name__ == "__main__":
              result = asyncio.run(ping_server())
              if not result:
                  sys.exit(1)
          EOF

      - name: Run ping test
        run: python ping_server.py

      - name: Report ping failure
        if: failure()
        run: |
          echo "🚨 Server ping failed!"
          echo "⚠️  This could indicate:"
          echo "   - Server is down or not responding"
          echo "   - Network connectivity issues"
          echo "   - Server is overloaded"
          echo "   - Configuration problems"
          echo ""
          echo "🔍 Check the deploy workflow and server logs for more details"

      - name: Report ping success
        if: success()
        run: |
          echo "✅ Server ping successful!"
          echo "🟢 Paperclip server is healthy and responsive"

```

--------------------------------------------------------------------------------
/tests/test_metadata_retrieval.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Unit tests for metadata retrieval functionality.
"""

import unittest
import sys
import os

# Add src to path to import server modules
sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'src'))

from core import (
    fetch_single_arxiv_paper_metadata,
    fetch_single_openalex_paper_metadata,
    fetch_single_osf_preprint_metadata,
)


class TestMetadataRetrieval(unittest.TestCase):
    """Test class for paper metadata retrieval."""

    def setUp(self):
        """Set up test fixtures."""
        self.osf_id = "2stpg"
        self.openalex_id = "W4385245566" 
        self.arxiv_id = "1709.06308v1"
        
        # Expected results based on tmp.py output
        self.expected_osf_title = "The Economy of Attention and the Novel"
        self.expected_openalex_title = "Attention Is All You Need"
        self.expected_arxiv_title = "Exploring Human-like Attention Supervision in Visual Question Answering"

    def test_osf_metadata_retrieval(self):
        """Test OSF paper metadata retrieval."""
        result = fetch_single_osf_preprint_metadata(self.osf_id)
        
        # Assert that result is a dictionary and not an error
        self.assertIsInstance(result, dict)
        self.assertNotIn("status", result) or result.get("status") != "error"
        
        # Assert title and ID
        self.assertEqual(result.get("title"), self.expected_osf_title)
        self.assertEqual(result.get("id"), self.osf_id)

    def test_openalex_metadata_retrieval(self):
        """Test OpenAlex paper metadata retrieval.""" 
        result = fetch_single_openalex_paper_metadata(self.openalex_id)
        
        # Assert that result is a dictionary and not an error
        self.assertIsInstance(result, dict)
        self.assertNotIn("status", result) or result.get("status") != "error"
        
        # Assert title and ID
        self.assertEqual(result.get("title"), self.expected_openalex_title)
        self.assertEqual(result.get("id"), self.openalex_id)

    def test_arxiv_metadata_retrieval(self):
        """Test ArXiv paper metadata retrieval."""
        result = fetch_single_arxiv_paper_metadata(self.arxiv_id)
        
        # Assert that result is a dictionary and not an error
        self.assertIsInstance(result, dict)
        self.assertNotIn("status", result) or result.get("status") != "error"
        
        # Assert title and ID
        self.assertEqual(result.get("title"), self.expected_arxiv_title)
        self.assertEqual(result.get("id"), self.arxiv_id)

    def test_metadata_contains_required_fields(self):
        """Test that metadata contains essential fields."""
        result = fetch_single_arxiv_paper_metadata(self.arxiv_id)
        
        # Assert required fields are present
        self.assertIn("title", result)
        self.assertIn("id", result)
        self.assertIsNotNone(result.get("title"))
        self.assertIsNotNone(result.get("id"))


if __name__ == "__main__":
    unittest.main()
```

--------------------------------------------------------------------------------
/docker-compose.prod.yml:
--------------------------------------------------------------------------------

```yaml
version: "3.8"

services:
  traefik:
    image: traefik:v3.0
    container_name: traefik
    command:
      - "--api.insecure=false" # Disable insecure API dashboard for production security
      - "--providers.docker=true" # Auto-discover services via Docker labels
      # Only expose services that explicitly set traefik.enable=true (security best practice)
      - "--providers.docker.exposedbydefault=false"
      - "--entrypoints.web.address=:80" # HTTP entrypoint redirects to HTTPS
      - "--entrypoints.websecure.address=:443" # HTTPS entrypoint
      - "--certificatesresolvers.myresolver.acme.tlschallenge=true" # Automatic SSL certificate generation via Let's Encrypt TLS challenge
      - "--certificatesresolvers.myresolver.acme.email=mats.funke@gmail.com" # Email required for Let's Encrypt certificate notifications and recovery
      - "--certificatesresolvers.myresolver.acme.storage=/letsencrypt/acme.json" # SSL certificates persist across container restarts
      # Force HTTP to HTTPS redirect for security (all traffic must be encrypted)
      - "--entrypoints.web.http.redirections.entrypoint.to=websecure"
      - "--entrypoints.web.http.redirections.entrypoint.scheme=https"
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock:ro
      - traefik_letsencrypt:/letsencrypt # SSL certificates persist across container restarts
    networks:
      - web
    restart: unless-stopped

  paperclip-mcp:
    build:
      context: .
      dockerfile: Dockerfile
    container_name: paperclip-mcp
    command: python server.py
    labels:
      - "traefik.enable=true"

      # Define service first to avoid Traefik auto-generating conflicting services
      - "traefik.http.services.paperclip-mcp.loadbalancer.server.port=8000"

      # MCP server route - accessible via HTTPS
      - "traefik.http.routers.paperclip-mcp.rule=Host(`paperclip.matsjfunke.com`)"
      - "traefik.http.routers.paperclip-mcp.entrypoints=websecure"
      - "traefik.http.routers.paperclip-mcp.tls.certresolver=myresolver"
      - "traefik.http.routers.paperclip-mcp.service=paperclip-mcp"

      # CORS headers required for MCP protocol compatibility with AI clients
      - "traefik.http.middlewares.mcp-cors.headers.accesscontrolallowmethods=GET,POST,OPTIONS,PUT,DELETE"
      - "traefik.http.middlewares.mcp-cors.headers.accesscontrolallowheaders=Content-Type,Authorization,Accept,Origin,User-Agent,DNT,Cache-Control,X-Mx-ReqToken,Keep-Alive,X-Requested-With,If-Modified-Since,mcp-session-id"
      - "traefik.http.middlewares.mcp-cors.headers.accesscontrolalloworiginlist=*"
      - "traefik.http.middlewares.mcp-cors.headers.accesscontrolmaxage=86400"

      # Apply CORS middleware to the router
      - "traefik.http.routers.paperclip-mcp.middlewares=mcp-cors"

    networks:
      - web
    restart: unless-stopped
    depends_on:
      - traefik
    environment:
      - PYTHONPATH=/app

volumes:
  # Named volume for Let's Encrypt certificates persistence across container restarts
  traefik_letsencrypt:

networks:
  # Internal network for container communication (external=false for security)
  web:
    external: false

```

--------------------------------------------------------------------------------
/tests/test_pdf_retrieval.py:
--------------------------------------------------------------------------------

```python
#!/usr/bin/env python3
"""
Unit tests for PDF retrieval functionality.
"""

import unittest
import sys
import os
import asyncio

# Add src to path to import server modules
sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'src'))

from core import (
    fetch_single_arxiv_paper_metadata,
    fetch_single_openalex_paper_metadata,
    fetch_single_osf_preprint_metadata,
)
from utils.pdf2md import download_paper_and_parse_to_markdown


class TestPdfRetrieval(unittest.TestCase):
    """Test class for paper PDF retrieval and content extraction."""

    def setUp(self):
        """Set up test fixtures."""
        self.osf_id = "2stpg"
        self.openalex_id = "W4385245566" 
        self.arxiv_id = "1709.06308v1"
        
        # Expected content starts based on tmp_pdf.py output
        self.expected_osf_start = "#### The Economy of Attention and the Novel"
        self.expected_openalex_start = "Skip to main content"
        self.expected_arxiv_start = "## **Exploring Human-like Attention Supervision in Visual Question Answering**"

    def test_osf_pdf_retrieval(self):
        """Test OSF paper PDF retrieval and content extraction."""
        metadata = fetch_single_osf_preprint_metadata(self.osf_id)
        result = asyncio.run(download_paper_and_parse_to_markdown(
            metadata=metadata,
            pdf_url_field="download_url",
            paper_id=self.osf_id,
            write_images=False
        ))
        
        # Assert that result is successful
        self.assertIsInstance(result, dict)
        self.assertEqual(result.get("status"), "success")
        
        # Assert content is retrieved and has expected start
        content = result.get("content", "")
        self.assertGreater(len(content), 1000)  # Should have substantial content
        self.assertTrue(content.startswith(self.expected_osf_start))

    def test_openalex_pdf_retrieval(self):
        """Test OpenAlex paper PDF retrieval and content extraction."""
        metadata = fetch_single_openalex_paper_metadata(self.openalex_id)
        result = asyncio.run(download_paper_and_parse_to_markdown(
            metadata=metadata,
            pdf_url_field="pdf_url",
            paper_id=self.openalex_id,
            write_images=False
        ))
        
        # Assert that result is successful
        self.assertIsInstance(result, dict)
        self.assertEqual(result.get("status"), "success")
        
        # Assert content is retrieved and has expected start
        content = result.get("content", "")
        self.assertGreater(len(content), 1000)  # Should have substantial content
        self.assertTrue(content.startswith(self.expected_openalex_start))

    def test_arxiv_pdf_retrieval(self):
        """Test ArXiv paper PDF retrieval and content extraction."""
        metadata = fetch_single_arxiv_paper_metadata(self.arxiv_id)
        result = asyncio.run(download_paper_and_parse_to_markdown(
            metadata=metadata,
            pdf_url_field="download_url",
            paper_id=self.arxiv_id,
            write_images=False
        ))
        
        # Assert that result is successful
        self.assertIsInstance(result, dict)
        self.assertEqual(result.get("status"), "success")
        
        # Assert content is retrieved and has expected start
        content = result.get("content", "")
        self.assertGreater(len(content), 1000)  # Should have substantial content
        self.assertTrue(content.startswith(self.expected_arxiv_start))

    def test_pdf_content_contains_markdown(self):
        """Test that PDF content is properly converted to markdown."""
        metadata = fetch_single_arxiv_paper_metadata(self.arxiv_id)
        result = asyncio.run(download_paper_and_parse_to_markdown(
            metadata=metadata,
            pdf_url_field="download_url",
            paper_id=self.arxiv_id,
            write_images=False
        ))
        
        # Assert successful retrieval
        self.assertEqual(result.get("status"), "success")
        
        content = result.get("content", "")
        
        # Assert markdown characteristics are present
        self.assertIn("##", content)  # Should contain markdown headers
        self.assertIn("**", content)  # Should contain bold text
        self.assertGreater(len(content.split('\n')), 50)  # Should have many lines

    def test_pdf_retrieval_includes_metadata(self):
        """Test that PDF retrieval includes paper metadata."""
        metadata = fetch_single_osf_preprint_metadata(self.osf_id)
        result = asyncio.run(download_paper_and_parse_to_markdown(
            metadata=metadata,
            pdf_url_field="download_url",
            paper_id=self.osf_id,
            write_images=False
        ))
        
        # Assert successful retrieval
        self.assertEqual(result.get("status"), "success")
        
        # Assert metadata is included
        result_metadata = result.get("metadata", {})
        self.assertIsInstance(result_metadata, dict)
        self.assertIn("title", result_metadata)
        self.assertIn("id", result_metadata)


if __name__ == "__main__":
    unittest.main()
```

--------------------------------------------------------------------------------
/src/core/arxiv.py:
--------------------------------------------------------------------------------

```python
import xml.etree.ElementTree as ET
from typing import Any, Dict, Optional
from urllib.parse import quote, urlencode

import requests

from utils import sanitize_api_queries


def fetch_arxiv_papers(
    query: Optional[str] = None,
    category: Optional[str] = None,
    author: Optional[str] = None,
    title: Optional[str] = None,
    max_results: int = 100,
    start_index: int = 0,
) -> Dict[str, Any]:
    """
    Fetch papers from arXiv API using various search parameters.

    Args:
        query: General search query
        category: arXiv category (e.g., 'cs.AI', 'physics.gen-ph')
        author: Author name to search for
        title: Title keywords to search for
        max_results: Maximum number of results to return (default 100; each request is capped at 20 results)
        start_index: Starting index for pagination (default 0)

    Returns:
        Dictionary containing papers data from arXiv API
    """
    # Build search query
    search_parts = []

    if query:
        search_parts.append(f"all:{sanitize_api_queries(query, max_length=200)}")
    if category:
        search_parts.append(f"cat:{sanitize_api_queries(category, max_length=50)}")
    if author:
        search_parts.append(f"au:{sanitize_api_queries(author, max_length=100)}")
    if title:
        search_parts.append(f"ti:{sanitize_api_queries(title, max_length=200)}")

    if not search_parts:
        # Default search if no parameters provided
        search_query = "all:*"
    else:
        search_query = " AND ".join(search_parts)

    # Build API URL
    base_url = "http://export.arxiv.org/api/query"
    params = {"search_query": search_query, "start": start_index, "max_results": min(max_results, 20)}

    query_string = urlencode(params, safe=":", quote_via=quote)
    url = f"{base_url}?{query_string}"

    try:
        response = requests.get(url, timeout=30)
        response.raise_for_status()

        # Parse XML response
        root = ET.fromstring(response.content)

        # Extract namespace
        ns = {"atom": "http://www.w3.org/2005/Atom", "arxiv": "http://arxiv.org/schemas/atom"}

        papers = []
        for entry in root.findall("atom:entry", ns):
            paper = _parse_arxiv_entry(entry, ns)
            papers.append(paper)

        return {
            "data": papers,
            "meta": {"total_results": len(papers), "start_index": start_index, "max_results": max_results, "search_query": search_query},
        }

    except requests.exceptions.RequestException as e:
        raise ValueError(f"Request failed: {str(e)}")
    except ET.ParseError as e:
        raise ValueError(f"Failed to parse arXiv response: {str(e)}")


def _parse_arxiv_entry(entry, ns):
    """Parse a single arXiv entry from XML."""
    # Extract basic info
    arxiv_id = entry.find("atom:id", ns).text.split("/")[-1] if entry.find("atom:id", ns) is not None else ""
    title = entry.find("atom:title", ns).text.strip() if entry.find("atom:title", ns) is not None else ""
    summary = entry.find("atom:summary", ns).text.strip() if entry.find("atom:summary", ns) is not None else ""
    published = entry.find("atom:published", ns).text if entry.find("atom:published", ns) is not None else ""
    updated = entry.find("atom:updated", ns).text if entry.find("atom:updated", ns) is not None else ""

    # Extract authors
    authors = []
    for author in entry.findall("atom:author", ns):
        name_elem = author.find("atom:name", ns)
        if name_elem is not None:
            authors.append(name_elem.text)

    # Extract categories
    categories = []
    for category in entry.findall("atom:category", ns):
        term = category.get("term")
        if term:
            categories.append(term)

    # Extract links (PDF, abstract)
    pdf_url = ""
    abstract_url = ""
    for link in entry.findall("atom:link", ns):
        if link.get("type") == "application/pdf":
            pdf_url = link.get("href", "")
        elif link.get("rel") == "alternate":
            abstract_url = link.get("href", "")

    # Extract DOI if available
    doi = ""
    doi_elem = entry.find("arxiv:doi", ns)
    if doi_elem is not None:
        doi = doi_elem.text

    return {
        "id": arxiv_id,
        "title": title,
        "summary": summary,
        "authors": authors,
        "categories": categories,
        "published": published,
        "updated": updated,
        "pdf_url": pdf_url,
        "abstract_url": abstract_url,
        "doi": doi,
    }


def fetch_single_arxiv_paper_metadata(paper_id: str) -> Dict[str, Any]:
    """
    Fetch metadata for a single arXiv paper by ID.

    Args:
        paper_id: arXiv paper ID (e.g., '2301.00001' or 'cs.AI/0001001')

    Returns:
        Dictionary containing paper metadata
    """
    # Validate paper exists first
    pdf_url = f"https://arxiv.org/pdf/{paper_id}"
    response = requests.head(pdf_url, timeout=10)
    if response.status_code != 200:
        raise ValueError(f"arXiv paper not found: {paper_id}")

    # Fetch metadata from API
    try:
        api_url = f"http://export.arxiv.org/api/query?id_list={paper_id}"
        response = requests.get(api_url, timeout=30)
        response.raise_for_status()

        # Parse XML response
        root = ET.fromstring(response.content)
        ns = {"atom": "http://www.w3.org/2005/Atom", "arxiv": "http://arxiv.org/schemas/atom"}

        entry = root.find("atom:entry", ns)
        if entry is None:
            raise ValueError(f"No metadata found for paper: {paper_id}")

        metadata = _parse_arxiv_entry(entry, ns)
        metadata["download_url"] = pdf_url

        return metadata

    except requests.exceptions.RequestException as e:
        raise ValueError(f"Failed to fetch paper metadata: {str(e)}")
    except ET.ParseError as e:
        raise ValueError(f"Failed to parse arXiv response: {str(e)}")
```

--------------------------------------------------------------------------------
/src/utils/pdf2md.py:
--------------------------------------------------------------------------------

```python
"""
PDF processing utilities using pymupdf4llm.

download_paper_and_parse_to_markdown()     download_pdf_and_parse_to_markdown()
(with metadata)                            (direct URL)
    |                                         |
    v                                         v
Extract PDF URL from metadata          Generate filename from URL
    |                                         |
    +-------------------+---------------------+
                        |
                        v
            _download_and_parse_pdf_core()
                        |
                        v
                requests.get(pdf_url)
                        |
                        v
            extract_pdf_to_markdown()
                        |
                        v
        Return (content, size, message)
                        |
            +-----------+-----------+
            |                       |
            v                       v
    Format response            Format response
    with metadata              with pdf_url

The shared core logic eliminates code duplication while maintaining 
distinct interfaces for metadata-based vs direct URL workflows.
"""

import os
from typing import Optional

import pymupdf4llm as pdfmd
import requests


async def extract_pdf_to_markdown(file_input, filename: Optional[str] = None, write_images: bool = False) -> str:
    """
    Extract PDF content to markdown using pymupdf4llm.

    Args:
        file_input: Can be either:
                   - A file path (str) to an existing PDF
                   - File bytes/content (bytes) that will be written to temp file
                   - A file object with .read() method (for async file handling)
        filename: Optional filename to use for temp file (only used when file_input is bytes/file object)
        write_images: Whether to extract and write images (default: False)

    Returns:
        Markdown content as string
    """
    temp_path = None

    try:
        # Handle different input types
        if isinstance(file_input, str) and os.path.exists(file_input):
            # Direct file path
            md = pdfmd.to_markdown(file_input, write_images=write_images)
            return md

        elif isinstance(file_input, bytes):
            # File bytes - write to temp file
            temp_filename = filename or "temp_pdf.pdf"
            temp_path = f"/tmp/{temp_filename}"
            with open(temp_path, "wb") as f:
                f.write(file_input)
            md = pdfmd.to_markdown(temp_path, write_images=write_images)
            return md

        elif hasattr(file_input, "read"):
            # File object (like FastAPI UploadFile)
            temp_filename = filename or getattr(file_input, "filename", "temp_pdf.pdf")
            temp_path = f"/tmp/{temp_filename}"

            # Handle both sync and async file objects
            if hasattr(file_input, "__aiter__") or hasattr(file_input.read, "__call__"):
                try:
                    # Try async read first
                    content = await file_input.read()
                except TypeError:
                    # Fall back to sync read
                    content = file_input.read()
            else:
                content = file_input.read()

            with open(temp_path, "wb") as f:
                f.write(content)
            md = pdfmd.to_markdown(temp_path, write_images=write_images)
            return md

        else:
            raise ValueError(f"Unsupported file_input type: {type(file_input)}")

    finally:
        # Clean up temporary file
        if temp_path and os.path.exists(temp_path):
            try:
                os.unlink(temp_path)
            except Exception:
                pass  # Ignore cleanup errors


async def _download_and_parse_pdf_core(
    pdf_url: str, 
    filename: str = "paper.pdf",
    write_images: bool = False
) -> tuple[str, int, str]:
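    """
    Shared core: download the PDF at pdf_url and convert it to markdown.

    Returns a (markdown_content, file_size, message) tuple. Network and
    parsing errors propagate so each caller can format its own error response.
    """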
    # Download PDF
    pdf_response = requests.get(pdf_url, timeout=60)
    pdf_response.raise_for_status()
    
    # Parse PDF to markdown
    markdown_content = await extract_pdf_to_markdown(
        pdf_response.content, 
        filename=filename, 
        write_images=write_images
    )
    
    file_size = len(pdf_response.content)
    message = f"Successfully parsed PDF content ({file_size} bytes)"
    
    return markdown_content, file_size, message


async def download_paper_and_parse_to_markdown(
    metadata: dict, 
    pdf_url_field: str = "download_url",
    paper_id: str = "",
    write_images: bool = False
) -> dict:
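    """
    Download the PDF referenced by metadata[pdf_url_field] and parse it to markdown.

    Returns a dict with status, the original metadata, the parsed content,
    the file size, and a message. Errors are returned in the dict rather
    than raised.
    """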
    # Extract PDF URL from metadata
    pdf_url = metadata.get(pdf_url_field)
    if not pdf_url:
        return {
            "status": "error", 
            "message": f"No PDF URL found in metadata field '{pdf_url_field}'", 
            "metadata": metadata
        }

    try:
        filename = f"{paper_id}.pdf" if paper_id else "paper.pdf"
        markdown_content, file_size, message = await _download_and_parse_pdf_core(
            pdf_url, filename, write_images
        )
        
        return {
            "status": "success",
            "metadata": metadata,
            "content": markdown_content,
            "file_size": file_size,
            "message": message,
        }

    except requests.exceptions.RequestException as e:
        return {
            "status": "error", 
            "message": f"Network error: {str(e)}", 
            "metadata": metadata
        }
    except Exception as e:
        return {
            "status": "error", 
            "message": f"Error parsing PDF: {str(e)}", 
            "metadata": metadata
        }


async def download_pdf_and_parse_to_markdown(pdf_url: str, write_images: bool = False) -> dict:
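    """
    Download a PDF from a direct URL and parse it to markdown.

    Returns a dict with status, the parsed content, the file size, the
    pdf_url, and a message. Errors are returned in the dict rather than raised.
    """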
    try:
        filename = pdf_url.split('/')[-1] if '/' in pdf_url else "paper.pdf"
        if not filename.endswith('.pdf'):
            filename = "paper.pdf"
            
        markdown_content, file_size, message = await _download_and_parse_pdf_core(
            pdf_url, filename, write_images
        )
        
        return {
            "status": "success",
            "content": markdown_content,
            "file_size": file_size,
            "pdf_url": pdf_url,
            "message": message,
        }

    except requests.exceptions.RequestException as e:
        return {
            "status": "error", 
            "message": f"Network error downloading PDF: {str(e)}", 
            "pdf_url": pdf_url
        }
    except Exception as e:
        return {
            "status": "error", 
            "message": f"Error parsing PDF: {str(e)}", 
            "pdf_url": pdf_url
        }

```
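
A minimal usage sketch for the direct-URL entry point (illustrative only; it assumes `src` is on `PYTHONPATH`, the dependencies from `requirements.txt` are installed, and the example arXiv URL is reachable at call time):

```python
# Minimal usage sketch (not part of the module). Assumes `src` is on
# PYTHONPATH, requirements are installed, and the example URL is reachable.
import asyncio

from utils.pdf2md import download_pdf_and_parse_to_markdown


async def main() -> None:
    result = await download_pdf_and_parse_to_markdown("https://arxiv.org/pdf/2407.06405v1")
    if result["status"] == "success":
        print(result["message"])        # e.g. "Successfully parsed PDF content (... bytes)"
        print(result["content"][:500])  # first 500 characters of the markdown
    else:
        print("Failed:", result["message"])


if __name__ == "__main__":
    asyncio.run(main())
```

The metadata-driven variant, `download_paper_and_parse_to_markdown`, follows the same pattern but is normally invoked through the MCP tools in `src/tools.py` rather than called directly.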

--------------------------------------------------------------------------------
/src/tools.py:
--------------------------------------------------------------------------------

```python

from typing import Annotated

from fastmcp import FastMCP

from core import (
    fetch_arxiv_papers,
    fetch_openalex_papers,
    fetch_osf_preprints,
    fetch_single_arxiv_paper_metadata,
    fetch_single_openalex_paper_metadata,
    fetch_single_osf_preprint_metadata,
    fetch_osf_providers,
    get_all_providers,
)
from utils.pdf2md import download_pdf_and_parse_to_markdown, download_paper_and_parse_to_markdown

tools_mcp = FastMCP()

@tools_mcp.tool(
    name="list_providers",
    description="Get the complete list of all available academic paper providers. Includes preprint servers (ArXiv, Open Science Framework (OSF) discipline-specific servers). Returns provider IDs for use with search_papers.",
)
async def list_providers() -> dict:
    """
    Fetch the provider list from the OSF API and add the other supported
    providers, which are hardcoded.
    """
    providers = get_all_providers()

    return {
        "providers": providers,
        "total_count": len(providers),
    }


@tools_mcp.tool(
    name="search_papers",
    description="Find papers using supported filters. And retrieve their metadata.",
)
async def search_papers(
    query: Annotated[str | None, "Text search query for title, author, content"] = None,
    provider: Annotated[str | None, "Provider ID to filter preprints (e.g., psyarxiv, socarxiv, arxiv, openalex, osf)"] = None,
    subjects: Annotated[str | None, "Subject categories to filter by (e.g., psychology, neuroscience)"] = None,
    date_published_gte: Annotated[str | None, "Filter preprints published on or after this date (e.g., 2024-01-01)"] = None,
) -> dict:
    if provider and provider not in [p["id"] for p in get_all_providers()]:
        return {
            "error": f"Provider: {provider} not found. Please use list_preprint_providers to get the complete list of all available providers.",
        }
    if not provider:
        all_results = []
        
        arxiv_results = fetch_arxiv_papers(query=query, category=subjects)
        all_results.append(arxiv_results)
    
        openalex_results = fetch_openalex_papers(
            query=query, 
            concepts=subjects, 
            date_published_gte=date_published_gte
        )
        all_results.append(openalex_results)
    
        osf_results = fetch_osf_preprints(
            provider_id="osf",
            subjects=subjects,
            date_published_gte=date_published_gte,
            query=query,
        )
        all_results.append(osf_results)
        
        return {
            "papers": all_results,
            # total_count is the number of provider result sets returned
            # (one dict each for arxiv, openalex, osf), not individual papers
            "total_count": len(all_results),
            "providers_searched": ["arxiv", "openalex", "osf"],
        }
    if provider == "osf" or provider in [p["id"] for p in fetch_osf_providers()]:
        return fetch_osf_preprints(
            provider_id=provider,
            subjects=subjects,
            date_published_gte=date_published_gte,
            query=query,
        )
    elif provider == "arxiv":
        return fetch_arxiv_papers(
            query=query,
            category=subjects,
        )
    elif provider == "openalex":
        return fetch_openalex_papers(
            query=query,
            concepts=subjects,
            date_published_gte=date_published_gte,
        )


@tools_mcp.tool(
    name="get_paper_by_id",
    description="Download and convert an academic paper to markdown format by its ID. Returns full paper content including title, abstract, sections, and references. Supports ArXiv (e.g., '2407.06405v1'), OpenAlex (e.g., 'W4385245566'), and OSF IDs.",
)
async def get_paper_by_id(paper_id: str) -> dict:
    try:
        # Check if it's an OpenAlex paper ID (starts with 'W' followed by numbers)
        if paper_id.startswith("W") and paper_id[1:].isdigit():
            # OpenAlex paper ID format (e.g., "W4385245566")
            metadata = fetch_single_openalex_paper_metadata(paper_id)
            return await download_paper_and_parse_to_markdown(
                metadata=metadata,
                pdf_url_field="pdf_url",
                paper_id=paper_id,
                write_images=False
            )
        # Check if it's an arXiv paper ID (contains 'v' followed by version number or matches arXiv format)
        elif "." in paper_id and ("v" in paper_id or len(paper_id.split(".")[0]) == 4):
            # arXiv paper ID format (e.g., "2407.06405v1" or "cs.AI/0001001")
            metadata = fetch_single_arxiv_paper_metadata(paper_id)
            return await download_paper_and_parse_to_markdown(
                metadata=metadata,
                pdf_url_field="download_url",
                paper_id=paper_id,
                write_images=False
            )
        else:
            # OSF paper ID format
            metadata = fetch_single_osf_preprint_metadata(paper_id)
            # Handle error case from OSF metadata function
            if isinstance(metadata, dict) and metadata.get("status") == "error":
                return metadata
            return await download_paper_and_parse_to_markdown(
                metadata=metadata,
                pdf_url_field="download_url",
                paper_id=paper_id,
                write_images=False
            )
    except ValueError as e:
        return {"status": "error", "message": str(e), "metadata": {}}


@tools_mcp.tool(
    name="get_paper_metadata_by_id",
    description="Get metadata for an academic paper by its ID without downloading full content. Returns title, authors, abstract, publication date, journal info, and download URLs. Supports ArXiv, OpenAlex, and OSF IDs.",
)
async def get_paper_metadata_by_id(preprint_id: str) -> dict:
    # Check if it's an OpenAlex paper ID (starts with 'W' followed by numbers)
    if preprint_id.startswith("W") and preprint_id[1:].isdigit():
        # OpenAlex paper ID format (e.g., "W4385245566")
        return fetch_single_openalex_paper_metadata(preprint_id)
    # Check if it's an arXiv paper ID (contains 'v' followed by version number or matches arXiv format)
    elif "." in preprint_id and ("v" in preprint_id or len(preprint_id.split(".")[0]) == 4):
        # arXiv paper ID format (e.g., "2407.06405v1" or "cs.AI/0001001")
        return fetch_single_arxiv_paper_metadata(preprint_id)
    else:
        # OSF paper ID format
        return fetch_single_osf_preprint_metadata(preprint_id)


@tools_mcp.tool(
    name="get_paper_content_by_url",
    description="Download and convert the PDF of a paper to markdown format from a direct PDF URL. Returns full paper content parsed from the PDF including title, abstract, sections, and references.",
)
async def get_paper_content_by_url(pdf_url: str) -> dict:
    return await download_pdf_and_parse_to_markdown(pdf_url)
```
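
`get_paper_by_id` and `get_paper_metadata_by_id` route IDs with the same heuristic; the sketch below restates it as a standalone function for clarity (`classify_paper_id` is illustrative and not part of the codebase):

```python
# Illustrative restatement of the ID-routing heuristic used by
# get_paper_by_id / get_paper_metadata_by_id; not part of the codebase.
def classify_paper_id(paper_id: str) -> str:
    # OpenAlex work IDs are "W" followed by digits, e.g. "W4385245566"
    if paper_id.startswith("W") and paper_id[1:].isdigit():
        return "openalex"
    # arXiv IDs contain a dot plus either a version suffix ("2407.06405v1")
    # or a 4-digit YYMM prefix ("2407.06405")
    if "." in paper_id and ("v" in paper_id or len(paper_id.split(".")[0]) == 4):
        return "arxiv"
    # Anything else is treated as an OSF preprint ID
    return "osf"


assert classify_paper_id("W4385245566") == "openalex"
assert classify_paper_id("2407.06405v1") == "arxiv"
assert classify_paper_id("abc12") == "osf"
```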

--------------------------------------------------------------------------------
/src/core/openalex.py:
--------------------------------------------------------------------------------

```python
import requests
from typing import Any, Dict, Optional
from urllib.parse import urlencode

from utils import sanitize_api_queries


def fetch_openalex_papers(
    query: Optional[str] = None,
    author: Optional[str] = None,
    title: Optional[str] = None,
    publisher: Optional[str] = None,
    institution: Optional[str] = None,
    concepts: Optional[str] = None,
    date_published_gte: Optional[str] = None,
    max_results: int = 20,
    page: int = 1,
) -> Dict[str, Any]:
    """
    Fetch papers from the OpenAlex API using various search parameters.

    Args:
        query: General search query (full-text search)
        author: Author name to search for
        title: Title keywords to search for
        publisher: Publisher name to search for
        institution: Institution name to search for
        concepts: Concepts to filter by (e.g., 'computer science', 'artificial intelligence')
        date_published_gte: Published date greater than or equal to (YYYY-MM-DD)
        max_results: Maximum number of results to return (default 20, max 200)
        page: Page number for pagination (default 1)

    Returns:
        Dictionary containing papers data from OpenAlex API
    """
    base_url = "https://api.openalex.org/works"
    filters = {}

    if query:
        filters["search"] = sanitize_api_queries(query, max_length=500)
    if author:
        filters["filter"] = f"authors.author_name.search:{sanitize_api_queries(author, max_length=200)}"
    if title:
        if "filter" in filters:
            filters["filter"] += f",{sanitize_api_queries(title, max_length=500)}"
        else:
            filters["filter"] = f"title.search:{sanitize_api_queries(title, max_length=500)}"
    if publisher:
        if "filter" in filters:
            filters["filter"] += f",publisher.search:{sanitize_api_queries(publisher, max_length=200)}"
        else:
            filters["filter"] = f"publisher.search:{sanitize_api_queries(publisher, max_length=200)}"
    if institution:
        if "filter" in filters:
            filters["filter"] += f",institutions.institution_name.search:{sanitize_api_queries(institution, max_length=200)}"
        else:
            filters["filter"] = f"institutions.institution_name.search:{sanitize_api_queries(institution, max_length=200)}"
    if concepts:
        # OpenAlex concepts can be tricky, simple search might work for now
        if "filter" in filters:
            filters["filter"] += f",concepts.display_name.search:{sanitize_api_queries(concepts, max_length=200)}"
        else:
            filters["filter"] = f"concepts.display_name.search:{sanitize_api_queries(concepts, max_length=200)}"
    if date_published_gte:
        # from_publication_date is inclusive, matching the _gte semantics
        if "filter" in filters:
            filters["filter"] += f",from_publication_date:{date_published_gte}"
        else:
            filters["filter"] = f"from_publication_date:{date_published_gte}"

    # Add pagination and results limit
    filters["per_page"] = min(max_results, 200)  # OpenAlex max per_page is 200
    filters["page"] = page

    try:
        query_string = urlencode(filters, safe=":,") # Allow colons and commas in filter values
        url = f"{base_url}?{query_string}"
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        data = response.json()

        papers = []
        for result in data.get("results", []):
            paper = _parse_openalex_work(result)
            papers.append(paper)

        return {
            "data": papers,
            "meta": {
                "total_results": data.get("meta", {}).get("count", 0),
                "page": page,
                "per_page": filters["per_page"],
                "search_query": query, # Only include general query for simplicity
            },
            "links": data.get("meta", {}).get("next_page", ""),
        }

    except requests.exceptions.RequestException as e:
        raise ValueError(f"Request failed: {str(e)}")


def _parse_openalex_work(work_data: Dict[str, Any]) -> Dict[str, Any]:
    """Parse a single OpenAlex work entry."""
    # Extract authors
    authors = []
    for authorship in work_data.get("authorships", []):
        author = authorship.get("author", {})
        if author and author.get("display_name"):
            authors.append(author["display_name"])

    # Extract concepts
    concepts = []
    for concept in work_data.get("concepts", []):
        if concept.get("display_name"):
            concepts.append(concept["display_name"])

    # Extract PDF URL from the primary location or alternative locations
    pdf_url = ""
    primary_location = work_data.get("primary_location") or {}
    if primary_location.get("pdf_url"):
        pdf_url = primary_location["pdf_url"]
    else:
        # Check all locations for a PDF URL if the primary location lacks one
        for location in work_data.get("locations", []):
            if location.get("pdf_url"):
                pdf_url = location["pdf_url"]
                break
    # Fall back to the landing page URL if no direct PDF link was found
    if not pdf_url and primary_location.get("landing_page_url"):
        pdf_url = primary_location["landing_page_url"]

    # Extract abstract from inverted index
    abstract = ""
    abstract_inverted_index = work_data.get("abstract_inverted_index", {})
    if abstract_inverted_index:
        abstract = _reconstruct_abstract_from_inverted_index(abstract_inverted_index)

    # Extract OpenAlex ID from URL
    openalex_id = work_data.get("id", "")
    if openalex_id.startswith("https://openalex.org/"):
        openalex_id = openalex_id.replace("https://openalex.org/", "")

    # Get primary location source info
    primary_source = ""
    if primary_location and primary_location.get("source"):
        source = primary_location.get("source") or {}
        primary_source = source.get("display_name", "")

    return {
        "id": openalex_id,
        "doi": work_data.get("doi", ""),
        "title": work_data.get("title", "") or work_data.get("display_name", ""),
        "abstract": abstract,
        "authors": authors,
        "publication_date": work_data.get("publication_date", ""),
        "publication_year": work_data.get("publication_year"),
        "cited_by_count": work_data.get("cited_by_count", 0),
        "concepts": concepts,
        "primary_location_url": (work_data.get("primary_location") or {}).get("landing_page_url", ""),
        "primary_source": primary_source,
        "pdf_url": pdf_url,
        "open_access_status": (work_data.get("open_access") or {}).get("oa_status", "closed"),
        "is_open_access": (work_data.get("primary_location") or {}).get("is_oa", False),
        "type": work_data.get("type", ""),
        "relevance_score": work_data.get("relevance_score", 0),
    }


def _reconstruct_abstract_from_inverted_index(inverted_index: Dict[str, Any]) -> str:
    """Reconstruct abstract text from OpenAlex's inverted index format."""
    if not inverted_index:
        return ""
    
    try:
        # Create a list to hold words at their positions
        word_positions = []
        
        for word, positions in inverted_index.items():
            if isinstance(positions, list):
                for position in positions:
                    word_positions.append((position, word))
        
        # Sort by position and reconstruct text
        word_positions.sort(key=lambda x: x[0])
        abstract_words = [word for _, word in word_positions]
        
        return " ".join(abstract_words)
    except Exception:
        # If reconstruction fails, return empty string
        return ""


def fetch_single_openalex_paper_metadata(paper_id: str) -> Dict[str, Any]:
    """
    Fetch metadata for a single OpenAlex paper by ID.

    Args:
        paper_id: OpenAlex paper ID (e.g., 'W2741809809')

    Returns:
        Dictionary containing paper metadata
    """
    base_url = "https://api.openalex.org/works"
    url = f"{base_url}/{paper_id}"

    try:
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        work_data = response.json()

        if not work_data.get("id"):
            raise ValueError(f"No metadata found for paper: {paper_id}")

        metadata = _parse_openalex_work(work_data)
        return metadata

    except requests.exceptions.RequestException as e:
        raise ValueError(f"Failed to fetch paper metadata: {str(e)}")
```
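
For reference, a worked example of the inverted-index reconstruction performed by `_reconstruct_abstract_from_inverted_index` (the sample index is invented for illustration):

```python
# OpenAlex stores abstracts as {word: [positions]}; reconstruct by sorting
# (position, word) pairs. The sample index below is invented for illustration.
inverted_index = {
    "Large": [0],
    "language": [1],
    "models": [2],
    "are": [3],
    "useful.": [4],
}

word_positions = sorted(
    (pos, word) for word, positions in inverted_index.items() for pos in positions
)
abstract = " ".join(word for _, word in word_positions)
print(abstract)  # -> "Large language models are useful."
```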

--------------------------------------------------------------------------------
/src/core/osf.py:
--------------------------------------------------------------------------------

```python
from typing import Any, Dict, Optional
from urllib.parse import quote, urlencode

import requests

from utils import sanitize_api_queries

from .providers import fetch_osf_providers, validate_provider


def fetch_osf_preprints(
    provider_id: Optional[str] = None,
    subjects: Optional[str] = None,
    date_published_gte: Optional[str] = None,
    query: Optional[str] = None,
) -> Dict[str, Any]:
    """
    NOTE: The OSF API only supports a limited set of filters. Many common filters
    like title, DOI, creator, etc. are NOT supported by the OSF API.

    When query is provided, uses the trove search endpoint which supports full-text search.

    Args:
        provider_id: The provider ID (e.g., 'psyarxiv', 'socarxiv')
        subjects: Subject filter (e.g., 'psychology', 'neuroscience')
        date_published_gte: Published date greater than or equal to (YYYY-MM-DD)
        query: Text search query for title, author, content (uses trove endpoint)

    Returns:
        Dictionary containing preprints data from OSF API or trove search
    """
    # If query is provided, use trove search endpoint
    if query:
        return fetch_osf_preprints_via_trove(query, provider_id)

    # Build query parameters (only using OSF API supported filters)
    filters = {}

    if provider_id:
        filters["filter[provider]"] = sanitize_api_queries(provider_id, max_length=50)
    if subjects:
        filters["filter[subjects]"] = sanitize_api_queries(subjects, max_length=100)
    if date_published_gte:
        filters["filter[date_published][gte]"] = date_published_gte  # Dates don't need cleaning

    # Build URL with filters
    base_url = "https://api.osf.io/v2/preprints/"
    if filters:
        query_string = urlencode(filters, safe="", quote_via=quote)
        url = f"{base_url}?{query_string}"
    else:
        url = base_url

    try:
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.HTTPError as e:
        if response.status_code == 400:
            if len(filters) > 1:
                simple_filters = {}
                if provider_id:
                    simple_filters["filter[provider]"] = sanitize_api_queries(provider_id, max_length=50)

                simple_query = urlencode(simple_filters, safe="", quote_via=quote)
                simple_url = f"{base_url}?{simple_query}"

                try:
                    simple_response = requests.get(simple_url, timeout=30)
                    simple_response.raise_for_status()
                    result = simple_response.json()

                    # Add a note about the simplified search
                    if "meta" not in result:
                        result["meta"] = {}
                    result["meta"][
                        "search_note"
                    ] = f"Original search failed (400 error), showing all results for provider '{provider_id}'. You may need to filter results manually."
                    return result
                except Exception:
                    pass

            raise ValueError(f"Bad request (400) - The search parameters may be invalid. Original error: {str(e)}")
        else:
            raise e
    except requests.exceptions.RequestException as e:
        raise ValueError(f"Request failed: {str(e)}")


def fetch_osf_preprints_via_trove(query: str, provider_id: Optional[str] = None) -> Dict[str, Any]:
    """
    Fetch preprints using the trove search endpoint and transform to standard format.
    """
    from urllib.parse import quote_plus

    # Build trove search URL
    base_url = "https://share.osf.io/trove/index-card-search"
    params = {
        "cardSearchFilter[resourceType]": "Preprint",
        "cardSearchText[*,creator.name,isContainedBy.creator.name]": sanitize_api_queries(query, max_length=200),
        "page[size]": "20",  # Match our default page size
        "sort": "-relevance",
    }

    # Validate provider if specified (we'll filter results later)
    if provider_id:
        if not validate_provider(provider_id):
            osf_providers = fetch_osf_providers()
            valid_ids = [p["id"] for p in osf_providers]
            raise ValueError(f"Invalid OSF provider: {provider_id}. Valid OSF providers: {valid_ids}")

    # Build query string manually to handle complex parameter names
    query_parts = []
    for key, value in params.items():
        query_parts.append(f"{quote_plus(key)}={quote_plus(str(value))}")
    query_string = "&".join(query_parts)
    url = f"{base_url}?{query_string}"

    try:
        headers = {"Accept": "application/json"}
        response = requests.get(url, headers=headers, timeout=30)
        response.raise_for_status()
        trove_data = response.json()

        # Transform trove format to standard OSF API format
        transformed_data = []
        for item in trove_data.get("data", []):
            # Extract OSF ID from @id field
            osf_id = ""
            if "@id" in item and "osf.io/" in item["@id"]:
                osf_id = item["@id"].split("/")[-1]

            # Filter by provider if specified
            if provider_id:
                # Check if this item is from the specified provider
                publisher_info = item.get("publisher", [])
                if isinstance(publisher_info, list) and len(publisher_info) > 0:
                    publisher_id = publisher_info[0].get("@id", "")
                    # Extract provider ID from publisher URL (e.g., "https://osf.io/preprints/psyarxiv" -> "psyarxiv")
                    if provider_id not in publisher_id:
                        continue  # Skip this item if it doesn't match the provider
                else:
                    continue  # Skip if no publisher info

            # Transform to standard format
            transformed_item = {
                "id": osf_id,
                "type": "preprints",
                "attributes": {
                    "title": extract_first_value(item.get("title", [])),
                    "description": extract_first_value(item.get("description", [])),
                    "date_created": extract_first_value(item.get("dateCreated", [])),
                    "date_published": extract_first_value(item.get("dateAccepted", [])),
                    "date_modified": extract_first_value(item.get("dateModified", [])),
                    "doi": extract_doi_from_identifiers(item.get("identifier", [])),
                    "tags": [kw.get("@value", "") for kw in item.get("keyword", [])],
                    "subjects": [subj.get("prefLabel", [{}])[0].get("@value", "") for subj in item.get("subject", [])],
                },
                "relationships": {},
                "links": {"self": item.get("@id", "")},
            }
            transformed_data.append(transformed_item)

        # Return in standard OSF API format
        return {
            "data": transformed_data,
            "meta": {
                "version": "2.0",  # Match OSF API version
                "total": trove_data.get("meta", {}).get("total", len(transformed_data)),
                "search_note": f"Results from trove search for query: '{query}'",
            },
            "links": {
                "first": trove_data.get("links", {}).get("first", ""),
                "next": trove_data.get("links", {}).get("next", ""),
                "last": "",
                "prev": "",
                "meta": "",
            },
        }

    except requests.exceptions.RequestException as e:
        raise ValueError(f"Trove search failed: {str(e)}")


def extract_first_value(field_list):
    """Extract the first @value from a field list."""
    if isinstance(field_list, list) and len(field_list) > 0:
        if isinstance(field_list[0], dict) and "@value" in field_list[0]:
            return field_list[0]["@value"]
        elif isinstance(field_list[0], str):
            return field_list[0]
    return ""


def extract_doi_from_identifiers(identifiers):
    """Extract DOI from identifier list."""
    for identifier in identifiers:
        if isinstance(identifier, dict) and "@value" in identifier:
            value = identifier["@value"]
            if "doi.org" in value or value.startswith("10."):
                return value
    return ""


def fetch_single_osf_preprint_metadata(preprint_id: str) -> Dict[str, Any]:
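    """
    Fetch metadata for a single OSF preprint, including the primary file's
    download URL (resolved via a second request to the OSF files API).
    """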
    try:
        preprint_url = f"https://api.osf.io/v2/preprints/{preprint_id}"
        response = requests.get(preprint_url, timeout=30)
        response.raise_for_status()
        preprint_data = response.json()

        primary_file_url = preprint_data["data"]["relationships"]["primary_file"]["links"]["related"]["href"]
        file_response = requests.get(primary_file_url, timeout=30)
        file_response.raise_for_status()
        file_data = file_response.json()

        # Get the download URL
        download_url = file_data["data"]["links"]["download"]

        # Prepare metadata first
        attributes = preprint_data["data"]["attributes"]
        metadata = {
            "id": preprint_id,
            "title": attributes.get("title", ""),
            "description": attributes.get("description", ""),
            "date_created": attributes.get("date_created", ""),
            "date_published": attributes.get("date_published", ""),
            "date_modified": attributes.get("date_modified", ""),
            "is_published": attributes.get("is_published", False),
            "is_preprint_orphan": attributes.get("is_preprint_orphan", False),
            "license_record": attributes.get("license_record", {}),
            "doi": attributes.get("doi", ""),
            "tags": attributes.get("tags", []),
            "subjects": attributes.get("subjects", []),
            "download_url": download_url,
        }

        if not download_url:
            return {"status": "error", "message": "Download URL not available", "metadata": metadata}

        return metadata
    except requests.exceptions.RequestException as e:
        raise ValueError(f"Failed to fetch preprint metadata: {str(e)}")

```
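
A small example of how the trove helper functions behave on a simplified payload (the item below is invented for illustration and assumes `src` is on `PYTHONPATH`):

```python
# Invented, simplified trove item used only to illustrate the helpers above.
from core.osf import extract_doi_from_identifiers, extract_first_value

item = {
    "title": [{"@value": "A Sample Preprint"}],
    "identifier": [
        {"@value": "https://osf.io/abcde"},
        {"@value": "https://doi.org/10.31234/osf.io/abcde"},
    ],
}

print(extract_first_value(item["title"]))                # -> "A Sample Preprint"
print(extract_doi_from_identifiers(item["identifier"]))  # -> "https://doi.org/10.31234/osf.io/abcde"
```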