# Directory Structure
```
├── .dockerignore
├── .github
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.yml
│ │ ├── config.yml
│ │ ├── feature_request.yml
│ │ └── question.yml
│ ├── pull_request_template.md
│ └── workflows
│ ├── deploy.yml
│ ├── ping-server.yml
│ └── tests.yml
├── .gitignore
├── .python-version
├── assets
│ ├── cursor-usage.png
│ ├── langdock-usage.png
│ └── paperclip.svg
├── CONTRIBUTING.md
├── docker-compose.prod.yml
├── docker-compose.yml
├── Dockerfile
├── LICENSE.md
├── README.md
├── requirements.txt
├── src
│ ├── core
│ │ ├── __init__.py
│ │ ├── arxiv.py
│ │ ├── openalex.py
│ │ ├── osf.py
│ │ └── providers.py
│ ├── prompts.py
│ ├── server.py
│ ├── tools.py
│ └── utils
│ ├── __init__.py
│ ├── pdf2md.py
│ └── sanitize_api_queries.py
└── tests
├── __init__.py
├── test_metadata_retrieval.py
└── test_pdf_retrieval.py
```
# Files
--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------
```
3.12.7
```
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
venv
env
.venv
.env
__pycache__/
*.xlsx
*.csv
.DS_Store
.luarc.json
*node_modules/
```
--------------------------------------------------------------------------------
/.dockerignore:
--------------------------------------------------------------------------------
```
# Git
.git
.gitignore
.gitattributes
# Docker
Dockerfile*
docker-compose*
.dockerignore
# Documentation
*.md
docs/
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# Virtual environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
.conda/
# IDEs
.cursor/
.vscode/
.idea/
*.swp
*.swo
*~
# OS
.DS_Store
.DS_Store?
._*
.Spotlight-V100
.Trashes
ehthumbs.db
Thumbs.db
# Testing
.tox/
.nox/
.coverage
.pytest_cache/
htmlcov/
.cache
tests/
# Jupyter Notebook
.ipynb_checkpoints
# Environment variables
.env*
env.prod.template
# Logs
*.log
logs/
# Temporary files
*.tmp
*.temp
.tmp/
.temp/
```
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
<div align="center">
<img src="assets/paperclip.svg" alt="Paperclip Logo" width="48" height="48">
# Paperclip MCP Server
</div>
> 📎 Paperclip is a Model Context Protocol (MCP) server that enables searching and retrieving research papers from arXiv, the Open Science Framework (OSF) API, and OpenAlex.
[](https://github.com/matsjfunke/paperclip/actions/workflows/tests.yml)
[](https://github.com/matsjfunke/paperclip/actions/workflows/ping-server.yml)
[](LICENSE.md)
## Quick Start
Set up the Paperclip MCP server in your MCP host via the server URL `https://paperclip.matsjfunke.com/mcp`; no authentication is needed.
Example JSON configuration for Cursor:
```json
{
"mcpServers": {
"paperclip": {
"url": "https://paperclip.matsjfunke.com/mcp"
}
}
}
```
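You can also verify the connection from Python with the `fastmcp` client (the same library the server and its health-check workflow use). This is a minimal sketch; the printed tool names depend on the server's current configuration:
```python
import asyncio
from fastmcp.client.client import Client

async def main():
    # Connect to the hosted Paperclip MCP server, check it responds, and list its tools
    async with Client("https://paperclip.matsjfunke.com/mcp") as client:
        await client.ping()
        tools = await client.list_tools()
        print([tool.name for tool in tools])

if __name__ == "__main__":
    asyncio.run(main())
```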
## Table of Contents
- [Quick Start](#quick-start)
- [Usage Examples](#usage-examples)
- [Supported Paper providers](#supported-paper-providers)
- [Preprint Providers to be added](#preprint-providers-to-be-added)
- [Contributing](#contributing)
## Usage Examples
Here are examples of Paperclip integrated with popular MCP clients:
**Cursor IDE:**
![Paperclip usage in Cursor](assets/cursor-usage.png)
**Langdock:**
![Paperclip usage in Langdock](assets/langdock-usage.png)
## Supported Paper providers
- [AfricArXiv](https://africarxiv.org)
- [AgriXiv](https://agrirxiv.org)
- [ArabXiv](https://arabixiv.org)
- [arXiv](https://arxiv.org)
- [BioHackrXiv](http://guide.biohackrxiv.org/about.html)
- [BodoArXiv](https://bodoarxiv.wordpress.com)
- [COP Preprints](https://www.collegeofphlebology.com)
- [EarthArXiv](https://eartharxiv.org)
- [EcoEvoRxiv](https://www.ecoevorxiv.com)
- [ECSarxiv](https://ecsarxiv.org)
- [EdArXiv](https://edarxiv.org)
- [EngrXiv](https://engrxiv.org)
- [FocusArchive](https://osf.io/preprints/focusarchive)
- [Frenxiv](https://frenxiv.org)
- [INArxiv](https://rinarxiv.lipi.go.id)
- [IndiaRxiv](https://osf.io/preprints/indiarxiv)
- [Law Archive](https://library.law.yale.edu/research/law-archive)
- [LawArXiv](https://osf.io/preprints/lawarxiv)
- [LISSA](https://osf.io/preprints/lissa)
- [LiveData](https://osf.io/preprints/livedata)
- [MarXiv](https://osf.io/preprints/marxiv)
- [MediArXiv](https://mediarxiv.com)
- [MetaArXiv](https://osf.io/preprints/metaarxiv)
- [MindRxiv](https://osf.io/preprints/mindrxiv)
- [NewAddictionSx](https://osf.io/preprints/newaddictionsx)
- [NutriXiv](https://niblunc.org)
- [OpenAlex](https://openalex.org)
- [OSF Preprints](https://osf.io/preprints/osf)
- [PaleoRxiv](https://osf.io/preprints/paleorxiv)
- [PsyArXiv](https://psyarxiv.com)
- [SocArXiv](https://socopen.org/welcome)
- [SportRxiv](http://sportrxiv.org)
- [Thesis Commons](https://osf.io/preprints/thesiscommons)
## Preprint Providers to be added
[List of preprint repositories](https://en.wikipedia.org/wiki/List_of_preprint_repositories)
- bioRxiv & medRxiv share the same underlying API structure (`https://api.biorxiv.org/pubs/[server]/[interval]/[cursor]`, where `[server]` can be `biorxiv` or `medrxiv`); see the sketch after this list
- ChemRxiv
- [hal open science](https://hal.science/?lang=en)
- [research square](https://www.researchsquare.com/)
- [osf preprints](https://osf.io/preprints)
- [preprints.org](https://preprints.org)
- [science open](https://www.scienceopen.com/)
- [SSRN](https://www.ssrn.com/index.cfm/en/the-lancet/)
- [synthical](https://synthical.com/feed/new)
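A rough sketch of calling the bioRxiv/medRxiv pattern mentioned above (the interval and cursor values are placeholders; check the API documentation for the exact formats):
```python
import requests

# Illustrative only: fills in https://api.biorxiv.org/pubs/[server]/[interval]/[cursor]
server = "biorxiv"  # or "medrxiv"
interval = "2024-01-01/2024-06-30"  # assumed date-range format
cursor = 0
response = requests.get(f"https://api.biorxiv.org/pubs/{server}/{interval}/{cursor}", timeout=30)
response.raise_for_status()
print(response.json())
```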
## Contributing
Interested in contributing to Paperclip? Check out our [Contributing Guide](CONTRIBUTING.md) for development setup instructions, testing procedures, and more!
```
--------------------------------------------------------------------------------
/LICENSE.md:
--------------------------------------------------------------------------------
```markdown
MIT License
Copyright (c) 2025 Mats Julius Funke
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
```
--------------------------------------------------------------------------------
/CONTRIBUTING.md:
--------------------------------------------------------------------------------
```markdown
# Contributing to Paperclip
Thank you for your interest in contributing to Paperclip! This guide will help you get started with development.
## Development Setup
### Prerequisites
- Python 3.12+
- pip
### Installation
1. **Fork and clone the repository**
- Fork this repository on GitHub
- Clone your fork:
```bash
git clone https://github.com/YOUR_USERNAME/paperclip.git
cd paperclip
```
2. **Create and activate virtual environment**
```bash
python -m venv .venv
source .venv/bin/activate # On Windows: .venv\Scripts\activate
```
3. **Install dependencies**
```bash
pip install -r requirements.txt
```
4. **Add dependencies**
```bash
pip install <new-lib>
pip freeze > requirements.txt
```
### Running the Server with Hot Reload
```bash
# Run with hot reload
watchmedo auto-restart --patterns="*.py" --recursive -- python src/server.py
# Run Server using fastmcp
fastmcp run src/server.py --transport http --host 0.0.0.0 --port 8000
# use docker compose
docker-compose up --build
```
The server will automatically restart when you make changes to any `.py` files.
## Testing
Use the [MCP Inspector](https://inspector.modelcontextprotocol.io/) to interact with the server.
```bash
pnpx @modelcontextprotocol/inspector
```
### Unit Tests
Run the unit tests to verify the functionality of individual components:
```bash
# Run all tests
python -m unittest discover tests -v
```
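You can also run a single test module by its dotted name, e.g. `python -m unittest tests.test_metadata_retrieval -v`.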
## Contributing Changes
### Creating a Pull Request
1. **Create a feature branch**
```bash
git checkout -b feat/your-feature-name
# or for bug fixes:
git checkout -b fix/issue-description
```
2. **Make your changes**
- Write your code following the existing style
- Add tests for new functionality
- Update documentation as needed
3. **Commit your changes and push to your fork**
```bash
git push origin feat/your-feature-name
```
4. **Open a Pull Request**
- Go to the original repository on GitHub
- Click "New Pull Request"
- Select your branch from your fork
- Fill out the PR template with:
- Clear description of changes
- Link to related issues (if applicable)
- Testing steps you've performed
### Pull Request Guidelines
- **Keep PRs focused**: One feature or fix per PR
- **Write clear descriptions**: Explain what changes you made and why
- **Test your changes**: Ensure all tests pass before submitting
- **Update documentation**: Add or update docs for new features
- **Be responsive**: Address feedback and questions promptly
```
--------------------------------------------------------------------------------
/tests/__init__.py:
--------------------------------------------------------------------------------
```python
# Tests package for Paperclip MCP Server
```
--------------------------------------------------------------------------------
/.github/ISSUE_TEMPLATE/config.yml:
--------------------------------------------------------------------------------
```yaml
blank_issues_enabled: true
contact_links:
- name: 📖 Documentation
url: https://github.com/matsjfunke/paperclip/blob/main/README.md
about: Check the README for documentation
```
--------------------------------------------------------------------------------
/src/utils/__init__.py:
--------------------------------------------------------------------------------
```python
"""
Utility functions for the paperclip MCP server.
"""
from .pdf2md import extract_pdf_to_markdown
from .sanitize_api_queries import sanitize_api_queries
__all__ = ["sanitize_api_queries", "extract_pdf_to_markdown"]
```
--------------------------------------------------------------------------------
/docker-compose.yml:
--------------------------------------------------------------------------------
```yaml
version: "3.8"
services:
paperclip:
build:
context: .
image: paperclip-image
container_name: paperclip
ports:
- 8000:8000
volumes:
- ./:/app # mount local backend dir to /app in container to enable live reloading of code changes
command: watchmedo auto-restart --patterns="*.py" --recursive -- python src/server.py --transport http --host 0.0.0.0 --port 8000
```
--------------------------------------------------------------------------------
/assets/paperclip.svg:
--------------------------------------------------------------------------------
```
<svg xmlns="http://www.w3.org/2000/svg" width="48" height="48" viewBox="0 0 24 24" fill="none" stroke="white" stroke-width="2" stroke-linecap="round" stroke-linejoin="round" class="icon icon-tabler icons-tabler-outline icon-tabler-paperclip"><path stroke="none" d="M0 0h24v24H0z" fill="none"/><path d="M15 7l-6.5 6.5a1.5 1.5 0 0 0 3 3l6.5 -6.5a3 3 0 0 0 -6 -6l-6.5 6.5a4.5 4.5 0 0 0 9 9l6.5 -6.5" /></svg>
```
--------------------------------------------------------------------------------
/.github/pull_request_template.md:
--------------------------------------------------------------------------------
```markdown
## Description
Brief description of what this PR does.
## Changes Made
- [ ] List specific changes
- [ ] Include any new features
- [ ] Mention any bug fixes
## Testing
- [ ] All existing tests pass
- [ ] Added tests for new functionality (if applicable)
- [ ] Tested base functionality manually with MCP Inspector
## Related Issues
Closes #[issue-number] (if applicable)
## Additional Notes
Any additional context or considerations for reviewers.
```
--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------
```dockerfile
FROM python:3.12-slim-bullseye
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Update sources list and install packages (assuming these are needed for your app)
RUN apt-get update && \
apt-get upgrade -y && \
apt-get install -y --no-install-recommends \
libgl1-mesa-glx \
libglib2.0-0 \
&& apt-get clean && \
rm -rf /var/lib/apt/lists/*
RUN pip install --upgrade pip
COPY ./src .
EXPOSE 8000
```
--------------------------------------------------------------------------------
/.github/workflows/tests.yml:
--------------------------------------------------------------------------------
```yaml
name: Tests
on:
push:
branches: [main]
workflow_dispatch:
jobs:
test:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Set up Python 3.12
uses: actions/setup-python@v4
with:
python-version: 3.12
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
- name: Run all tests
run: |
python -m unittest discover tests -v
```
--------------------------------------------------------------------------------
/src/core/__init__.py:
--------------------------------------------------------------------------------
```python
"""
Core package for paperclip MCP server.
"""
from .arxiv import (
fetch_arxiv_papers,
fetch_single_arxiv_paper_metadata,
)
from .osf import (
fetch_osf_preprints,
fetch_osf_providers,
fetch_single_osf_preprint_metadata,
)
from .openalex import (
fetch_openalex_papers,
fetch_single_openalex_paper_metadata,
)
from .providers import get_all_providers, validate_provider, fetch_osf_providers
__all__ = [
"fetch_arxiv_papers",
"fetch_osf_preprints",
"fetch_osf_providers",
"fetch_single_arxiv_paper_metadata",
"fetch_single_osf_preprint_metadata",
"fetch_openalex_papers",
"fetch_single_openalex_paper_metadata",
"get_all_providers",
"validate_provider",
"fetch_osf_providers",
]
```
--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------
```
annotated-types==0.7.0
anyio==4.9.0
attrs==25.3.0
Authlib==1.6.1
certifi==2025.8.3
cffi==1.17.1
charset-normalizer==3.4.2
click==8.2.1
cryptography==45.0.5
cyclopts==3.22.5
dnspython==2.7.0
docstring_parser==0.17.0
docutils==0.22
email_validator==2.2.0
exceptiongroup==1.3.0
fastmcp==2.11.0
h11==0.16.0
httpcore==1.0.9
httpx==0.28.1
httpx-sse==0.4.1
idna==3.10
isodate==0.7.2
jsonschema==4.25.0
jsonschema-path==0.3.4
jsonschema-specifications==2025.4.1
lazy-object-proxy==1.11.0
markdown-it-py==3.0.0
MarkupSafe==3.0.2
mcp==1.12.3
mdurl==0.1.2
more-itertools==10.7.0
openapi-core==0.19.5
openapi-pydantic==0.5.1
openapi-schema-validator==0.6.3
openapi-spec-validator==0.7.2
parse==1.20.2
pathable==0.4.4
pycparser==2.22
pydantic==2.11.7
pydantic-settings==2.10.1
pydantic_core==2.33.2
Pygments==2.19.2
PyMuPDF==1.26.3
pymupdf4llm==0.0.27
pyperclip==1.9.0
python-dotenv==1.1.1
python-multipart==0.0.20
PyYAML==6.0.2
referencing==0.36.2
requests==2.32.4
rfc3339-validator==0.1.4
rich==14.1.0
rich-rst==1.3.1
rpds-py==0.26.0
six==1.17.0
sniffio==1.3.1
sse-starlette==3.0.2
starlette==0.47.2
typing-inspection==0.4.1
typing_extensions==4.14.1
urllib3==2.5.0
uvicorn==0.35.0
watchdog==6.0.0
Werkzeug==3.1.1
```
--------------------------------------------------------------------------------
/src/prompts.py:
--------------------------------------------------------------------------------
```python
from fastmcp import FastMCP
prompt_mcp = FastMCP()
@prompt_mcp.prompt
def list_paper_providers() -> str:
"""List all available paper providers."""
return "List all available paper providers."
@prompt_mcp.prompt
def find_attention_is_all_you_need() -> str:
"""Finds the Attention is all you need paper in arxiv."""
return "Search for Attention is all you need in arxiv"
@prompt_mcp.prompt
def get_paper_by_id() -> str:
"""Prompt to use the get_paper_by_id tool."""
return "Retrieve the full content (including abstract, sections, and references) of the paper with ID: 1706.03762"
@prompt_mcp.prompt
def get_paper_metadata_by_id() -> str:
"""Prompt to use the get_paper_metadata_by_id tool."""
return "Retrieve the metadata of the paper with ID: 1706.03762"
@prompt_mcp.prompt
def get_paper_by_url() -> str:
"""Prompt to use the get_paper_by_url tool."""
return "Retrieve the full content (including abstract, sections, and references) of the paper with URL: https://arxiv.org/pdf/1706.03762"
@prompt_mcp.prompt
def search_across_providers() -> str:
"""Prompt for searching across all providers (not specifying a provider)."""
return "Search for papers across all providers with the query: MCP"
```
--------------------------------------------------------------------------------
/.github/ISSUE_TEMPLATE/bug_report.yml:
--------------------------------------------------------------------------------
```yaml
name: Bug Report
description: Report a bug or unexpected behavior
title: "[Bug]: "
labels: ["bug"]
body:
- type: markdown
attributes:
value: |
Thanks for taking the time to report a bug 🫶 ! Please fill out the information below to help us investigate.
- type: textarea
id: description
attributes:
label: Bug Description
description: A clear and concise description of what the bug is.
placeholder: Describe what happened and what you expected to happen instead.
validations:
required: true
- type: textarea
id: reproduction
attributes:
label: Steps to Reproduce
description: Steps to reproduce the behavior
validations:
required: true
- type: textarea
id: expected
attributes:
label: Expected Behavior
description: What you expected to happen
validations:
required: true
- type: textarea
id: actual
attributes:
label: Actual Behavior
description: What actually happened (include full error message if applicable)
validations:
required: true
- type: textarea
id: additional-context
attributes:
label: Additional Context
description: Any other context about the problem here.
```
--------------------------------------------------------------------------------
/src/server.py:
--------------------------------------------------------------------------------
```python
from typing import Annotated
import asyncio
from fastmcp import FastMCP
from core import (
fetch_arxiv_papers,
fetch_openalex_papers,
fetch_osf_preprints,
fetch_single_arxiv_paper_metadata,
fetch_single_openalex_paper_metadata,
fetch_single_osf_preprint_metadata,
fetch_osf_providers,
get_all_providers,
)
from utils.pdf2md import download_pdf_and_parse_to_markdown, download_paper_and_parse_to_markdown, extract_pdf_to_markdown
from prompts import prompt_mcp
from tools import tools_mcp
mcp = FastMCP(
name="Paperclip MCP Server",
instructions="""
This server provides tools to search, retrieve, and read academic papers from multiple sources.
- Search papers across providers with filters for query text, subjects, and publication date
- Read full paper content in markdown format
- Retrieve paper metadata without downloading content (e.g. title, authors, abstract, publication date, journal info, and download URLs)
""",
)
# Import subservers
async def setup():
await mcp.import_server(prompt_mcp, prefix="prompt")
await mcp.import_server(tools_mcp, prefix="tools")
if __name__ == "__main__":
asyncio.run(setup())
mcp.run(transport="http", host="0.0.0.0", port=8000)
```
--------------------------------------------------------------------------------
/.github/ISSUE_TEMPLATE/question.yml:
--------------------------------------------------------------------------------
```yaml
name: Question
description: Ask a question about paperclip
title: "[Question]: "
labels: ["question"]
body:
- type: markdown
attributes:
value: |
Have a question about paperclip? We're here to help! Please provide as much detail as possible.
- type: textarea
id: question
attributes:
label: Your Question
description: What would you like to know about paperclip?
placeholder: Ask your question here...
validations:
required: true
- type: textarea
id: context
attributes:
label: Context
description: |
What are you trying to achieve? Providing context helps us give better answers.
placeholder: |
e.g., "I'm trying to understand how paperclip handles..."
or "I want to use paperclip to..."
- type: checkboxes
id: checklist
attributes:
label: Checklist
description: Please confirm you've done the following
options:
- label: I've checked the README and documentation
required: true
- label: I've searched existing issues for similar questions
required: true
- type: textarea
id: additional-info
attributes:
label: Additional Information
description: Any other details that might be helpful
```
--------------------------------------------------------------------------------
/.github/ISSUE_TEMPLATE/feature_request.yml:
--------------------------------------------------------------------------------
```yaml
name: Feature Request
description: Suggest a new feature or improvement
title: "[Feature]: "
labels: ["enhancement"]
body:
- type: markdown
attributes:
value: |
Thanks for suggesting a new feature! Please fill out the information below to help us understand your request.
- type: textarea
id: summary
attributes:
label: Feature Summary
description: A clear and concise description of the feature you'd like to see added.
placeholder: Briefly describe the feature you're requesting.
validations:
required: true
- type: textarea
id: problem
attributes:
label: Problem or Use Case
description: What problem does this feature solve? What use case does it address?
placeholder: |
e.g., "I often need to... but currently paperclip doesn't support..."
or "It would be helpful if paperclip could..."
validations:
required: true
- type: textarea
id: solution
attributes:
label: Proposed Solution
description: How would you like this feature to work?
placeholder: |
Describe your ideal solution. Consider:
- What command/option would trigger this feature?
- What would the output look like?
- How should it interact with existing features?
validations:
required: true
- type: textarea
id: alternatives
attributes:
label: Alternatives Considered
description: Have you considered any alternative solutions or workarounds?
placeholder: |
e.g., "I currently work around this by..."
or "Other tools like X handle this by..."
- type: textarea
id: additional-context
attributes:
label: Additional Context
description: |
Any other context, screenshots, or examples that would help us understand your request.
```
--------------------------------------------------------------------------------
/src/utils/sanitize_api_queries.py:
--------------------------------------------------------------------------------
```python
"""
Text processing utilities for API interactions.
"""
import re
def sanitize_api_queries(text: str, max_length: int = 200) -> str:
"""
Clean text for API queries by removing problematic characters and formatting.
Args:
text: The text to clean
max_length: Maximum allowed length (default 200)
Returns:
Cleaned text suitable for API queries
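    Example (illustrative):
        sanitize_api_queries('Attention: "Is All" You Need?') -> 'Attention - "Is All" You Need'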
"""
if not text:
return text
# Remove or replace problematic characters
cleaned = text
# Replace various quote types with simple quotes or remove them
cleaned = cleaned.replace('"', '"').replace('"', '"').replace(""", "'").replace(""", "'")
# Remove or replace other problematic special characters
cleaned = cleaned.replace(" ", " ") # Non-breaking space
cleaned = cleaned.replace("\u00a0", " ") # Unicode non-breaking space
cleaned = cleaned.replace("\n", " ").replace("\r", " ").replace("\t", " ") # Line breaks and tabs
# Replace multiple spaces with single space
cleaned = re.sub(r"\s+", " ", cleaned)
# Remove leading/trailing whitespace
cleaned = cleaned.strip()
# Handle length limits
if len(cleaned) > max_length:
cleaned = cleaned[: max_length - 3] + "..."
# Remove or replace characters that commonly cause URL encoding issues
problematic_chars = ["<", ">", "{", "}", "|", "\\", "^", "`", "[", "]"]
for char in problematic_chars:
cleaned = cleaned.replace(char, "")
# Replace colons which seem to cause OSF API issues
cleaned = cleaned.replace(":", " -")
# Replace other potentially problematic punctuation
cleaned = cleaned.replace(";", ",") # Semicolons to commas
cleaned = cleaned.replace("?", "") # Remove question marks
cleaned = cleaned.replace("!", "") # Remove exclamation marks
cleaned = cleaned.replace("#", "") # Remove hashtags
cleaned = cleaned.replace("%", "") # Remove percent signs
# Clean up any double spaces created by replacements
cleaned = re.sub(r"\s+", " ", cleaned).strip()
return cleaned
```
--------------------------------------------------------------------------------
/src/core/providers.py:
--------------------------------------------------------------------------------
```python
from typing import Any, Dict, List
import requests
def fetch_osf_providers() -> List[Dict[str, Any]]:
"""Fetch current list of valid OSF preprint providers from API"""
url = "https://api.osf.io/v2/preprint_providers/"
response = requests.get(url)
response.raise_for_status()
data = response.json()
# Create provider objects from the response
providers = []
for provider in data["data"]:
provider_obj = {
"id": provider["id"],
"type": "osf",
"description": provider["attributes"]["description"],
"taxonomies": provider["relationships"]["taxonomies"]["links"]["related"]["href"],
"preprints": provider["relationships"]["preprints"]["links"]["related"]["href"],
}
providers.append(provider_obj)
return sorted(providers, key=lambda p: p["id"])
def get_external_providers() -> List[Dict[str, Any]]:
"""Get list of external (non-OSF) preprint providers"""
return [
{
"id": "arxiv",
"type": "standalone",
"description": "arXiv is a free distribution service and an open-access archive for scholarly articles in physics, mathematics, computer science, quantitative biology, quantitative finance, statistics, electrical engineering and systems science, and economics.",
},
{
"id": "openalex",
"type": "standalone",
"description": "OpenAlex is a comprehensive index of scholarly works across all disciplines.",
},
]
def get_all_providers() -> List[Dict[str, Any]]:
"""Get combined list of all available providers"""
osf_providers = fetch_osf_providers()
external_providers = get_external_providers()
all_providers = osf_providers + external_providers
return sorted(all_providers, key=lambda p: p["id"].lower())
def validate_provider(provider_id: str) -> bool:
"""Validate if a provider ID exists in the given providers list"""
valid_ids = [p["id"] for p in get_all_providers()]
return provider_id in valid_ids
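# Example (illustrative): validate_provider("arxiv") returns True, while an unknown ID returns False.
# Note: this makes a live request to the OSF API via get_all_providers().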
```
--------------------------------------------------------------------------------
/.github/workflows/deploy.yml:
--------------------------------------------------------------------------------
```yaml
name: Deploy to VPS
on:
push:
branches: [main]
workflow_dispatch:
jobs:
deploy:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Deploy to VPS
uses: appleboy/[email protected]
with:
host: ${{ secrets.VPS_HOST }}
username: ${{ secrets.VPS_USERNAME }}
key: ${{ secrets.VPS_SSH_KEY }}
passphrase: ${{ secrets.VPS_SSH_KEY_PASSPHRASE }}
script: |
cd /opt/paperclip
git fetch origin main
git reset --hard origin/main
echo "🔄 Stopping containers..."
docker-compose -f docker-compose.prod.yml down
echo "🏗️ Building containers..."
docker-compose -f docker-compose.prod.yml build --no-cache
echo "🚀 Starting containers..."
docker-compose -f docker-compose.prod.yml up -d
echo "⏳ Waiting for containers to start..."
sleep 10
echo "📊 Container status:"
docker-compose -f docker-compose.prod.yml ps --format "table {{.Service}}\t{{.Status}}\t{{.Ports}}"
echo "📋 Recent logs from all services (filtered):"
docker-compose -f docker-compose.prod.yml logs --tail=30 | grep -E "(ERROR|WARN|INFO|Ready|Starting|Listening)" | head -50
echo "🧹 Cleaning up..."
docker system prune -f
- name: Show specific service logs on failure
if: failure()
uses: appleboy/[email protected]
with:
host: ${{ secrets.VPS_HOST }}
username: ${{ secrets.VPS_USERNAME }}
key: ${{ secrets.VPS_SSH_KEY }}
passphrase: ${{ secrets.VPS_SSH_KEY_PASSPHRASE }}
script: |
cd /opt/paperclip
echo "🔍 Filtered logs for debugging:"
echo "--- Traefik status ---"
docker-compose -f docker-compose.prod.yml logs --tail=50 traefik | grep -E "(ERROR|WARN|Ready|Starting|tls|certificate)" | head -30
echo "--- Paperclip MCP Server status ---"
docker-compose -f docker-compose.prod.yml logs --tail=50 paperclip-mcp | grep -E "(ERROR|WARN|Ready|Starting|Listening|Build)" | head -30
```
--------------------------------------------------------------------------------
/.github/workflows/ping-server.yml:
--------------------------------------------------------------------------------
```yaml
name: Health Check
on:
schedule:
# Run once per day at 10:00 UTC
- cron: "0 10 * * *"
workflow_dispatch:
jobs:
ping:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: "3.11"
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install fastmcp==2.11.0
- name: Create ping script
run: |
cat > ping_server.py << 'EOF'
import asyncio
import sys
import os
from datetime import datetime
from fastmcp.client.client import Client
async def ping_server():
server_url = 'https://paperclip.matsjfunke.com/mcp'
print(f"🏓 Pinging MCP server at: {server_url}")
print(f"⏰ Timestamp: {datetime.now().isoformat()}")
try:
# Create client instance
client = Client(server_url)
# Connect and ping
async with client:
print("✅ Successfully connected to server")
# Send ping
ping_result = await client.ping()
if ping_result:
print("🎯 Ping successful! Server is responsive")
return True
else:
print("❌ Ping failed! Server did not respond properly")
return False
except Exception as e:
print(f"💥 Error connecting to server: {str(e)}")
print(f"🔧 Error type: {type(e).__name__}")
return False
if __name__ == "__main__":
result = asyncio.run(ping_server())
if not result:
sys.exit(1)
EOF
- name: Run ping test
run: python ping_server.py
- name: Report ping failure
if: failure()
run: |
echo "🚨 Server ping failed!"
echo "⚠️ This could indicate:"
echo " - Server is down or not responding"
echo " - Network connectivity issues"
echo " - Server is overloaded"
echo " - Configuration problems"
echo ""
echo "🔍 Check the deploy workflow and server logs for more details"
- name: Report ping success
if: success()
run: |
echo "✅ Server ping successful!"
echo "🟢 Paperclip server is healthy and responsive"
```
--------------------------------------------------------------------------------
/tests/test_metadata_retrieval.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Unit tests for metadata retrieval functionality.
"""
import unittest
import sys
import os
# Add src to path to import server modules
sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'src'))
from core import (
fetch_single_arxiv_paper_metadata,
fetch_single_openalex_paper_metadata,
fetch_single_osf_preprint_metadata,
)
class TestMetadataRetrieval(unittest.TestCase):
"""Test class for paper metadata retrieval."""
def setUp(self):
"""Set up test fixtures."""
self.osf_id = "2stpg"
self.openalex_id = "W4385245566"
self.arxiv_id = "1709.06308v1"
# Expected results based on tmp.py output
self.expected_osf_title = "The Economy of Attention and the Novel"
self.expected_openalex_title = "Attention Is All You Need"
self.expected_arxiv_title = "Exploring Human-like Attention Supervision in Visual Question Answering"
def test_osf_metadata_retrieval(self):
"""Test OSF paper metadata retrieval."""
result = fetch_single_osf_preprint_metadata(self.osf_id)
# Assert that result is a dictionary and not an error
self.assertIsInstance(result, dict)
self.assertNotIn("status", result) or result.get("status") != "error"
# Assert title and ID
self.assertEqual(result.get("title"), self.expected_osf_title)
self.assertEqual(result.get("id"), self.osf_id)
def test_openalex_metadata_retrieval(self):
"""Test OpenAlex paper metadata retrieval."""
result = fetch_single_openalex_paper_metadata(self.openalex_id)
# Assert that result is a dictionary and not an error
self.assertIsInstance(result, dict)
self.assertNotIn("status", result) or result.get("status") != "error"
# Assert title and ID
self.assertEqual(result.get("title"), self.expected_openalex_title)
self.assertEqual(result.get("id"), self.openalex_id)
def test_arxiv_metadata_retrieval(self):
"""Test ArXiv paper metadata retrieval."""
result = fetch_single_arxiv_paper_metadata(self.arxiv_id)
# Assert that result is a dictionary and not an error
self.assertIsInstance(result, dict)
self.assertNotIn("status", result) or result.get("status") != "error"
# Assert title and ID
self.assertEqual(result.get("title"), self.expected_arxiv_title)
self.assertEqual(result.get("id"), self.arxiv_id)
def test_metadata_contains_required_fields(self):
"""Test that metadata contains essential fields."""
result = fetch_single_arxiv_paper_metadata(self.arxiv_id)
# Assert required fields are present
self.assertIn("title", result)
self.assertIn("id", result)
self.assertIsNotNone(result.get("title"))
self.assertIsNotNone(result.get("id"))
if __name__ == "__main__":
unittest.main()
```
--------------------------------------------------------------------------------
/docker-compose.prod.yml:
--------------------------------------------------------------------------------
```yaml
version: "3.8"
services:
traefik:
image: traefik:v3.0
container_name: traefik
command:
- "--api.insecure=false" # Disable insecure API dashboard for production security
- "--providers.docker=true" # Auto-discover services via Docker labels
# Only expose services that explicitly set traefik.enable=true (security best practice)
- "--providers.docker.exposedbydefault=false"
- "--entrypoints.web.address=:80" # HTTP entrypoint redirects to HTTPS
- "--entrypoints.websecure.address=:443" # HTTPS entrypoint
- "--certificatesresolvers.myresolver.acme.tlschallenge=true" # Automatic SSL certificate generation via Let's Encrypt TLS challenge
- "--certificatesresolvers.myresolver.acme.email=mats.funke@gmail.com" # Email required for Let's Encrypt certificate notifications and recovery
- "--certificatesresolvers.myresolver.acme.storage=/letsencrypt/acme.json" # SSL certificates persist across container restarts
# Force HTTP to HTTPS redirect for security (all traffic must be encrypted)
- "--entrypoints.web.http.redirections.entrypoint.to=websecure"
- "--entrypoints.web.http.redirections.entrypoint.scheme=https"
ports:
- "80:80"
- "443:443"
volumes:
- /var/run/docker.sock:/var/run/docker.sock:ro
- traefik_letsencrypt:/letsencrypt # SSL certificates persist across container restarts
networks:
- web
restart: unless-stopped
paperclip-mcp:
build:
context: .
dockerfile: Dockerfile
container_name: paperclip-mcp
command: python server.py
labels:
- "traefik.enable=true"
# Define service first to avoid Traefik auto-generating conflicting services
- "traefik.http.services.paperclip-mcp.loadbalancer.server.port=8000"
# MCP server route - accessible via HTTPS
- "traefik.http.routers.paperclip-mcp.rule=Host(`paperclip.matsjfunke.com`)"
- "traefik.http.routers.paperclip-mcp.entrypoints=websecure"
- "traefik.http.routers.paperclip-mcp.tls.certresolver=myresolver"
- "traefik.http.routers.paperclip-mcp.service=paperclip-mcp"
# CORS headers required for MCP protocol compatibility with AI clients
- "traefik.http.middlewares.mcp-cors.headers.accesscontrolallowmethods=GET,POST,OPTIONS,PUT,DELETE"
- "traefik.http.middlewares.mcp-cors.headers.accesscontrolallowheaders=Content-Type,Authorization,Accept,Origin,User-Agent,DNT,Cache-Control,X-Mx-ReqToken,Keep-Alive,X-Requested-With,If-Modified-Since,mcp-session-id"
- "traefik.http.middlewares.mcp-cors.headers.accesscontrolalloworiginlist=*"
- "traefik.http.middlewares.mcp-cors.headers.accesscontrolmaxage=86400"
# Apply CORS middleware to the router
- "traefik.http.routers.paperclip-mcp.middlewares=mcp-cors"
networks:
- web
restart: unless-stopped
depends_on:
- traefik
environment:
- PYTHONPATH=/app
volumes:
# Named volume for Let's Encrypt certificates persistence across container restarts
traefik_letsencrypt:
networks:
# Internal network for container communication (external=false for security)
web:
external: false
```
--------------------------------------------------------------------------------
/tests/test_pdf_retrieval.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""
Unit tests for PDF retrieval functionality.
"""
import unittest
import sys
import os
import asyncio
# Add src to path to import server modules
sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'src'))
from core import (
fetch_single_arxiv_paper_metadata,
fetch_single_openalex_paper_metadata,
fetch_single_osf_preprint_metadata,
)
from utils.pdf2md import download_paper_and_parse_to_markdown
class TestPdfRetrieval(unittest.TestCase):
"""Test class for paper PDF retrieval and content extraction."""
def setUp(self):
"""Set up test fixtures."""
self.osf_id = "2stpg"
self.openalex_id = "W4385245566"
self.arxiv_id = "1709.06308v1"
# Expected content starts based on tmp_pdf.py output
self.expected_osf_start = "#### The Economy of Attention and the Novel"
self.expected_openalex_start = "Skip to main content"
self.expected_arxiv_start = "## **Exploring Human-like Attention Supervision in Visual Question Answering**"
def test_osf_pdf_retrieval(self):
"""Test OSF paper PDF retrieval and content extraction."""
metadata = fetch_single_osf_preprint_metadata(self.osf_id)
result = asyncio.run(download_paper_and_parse_to_markdown(
metadata=metadata,
pdf_url_field="download_url",
paper_id=self.osf_id,
write_images=False
))
# Assert that result is successful
self.assertIsInstance(result, dict)
self.assertEqual(result.get("status"), "success")
# Assert content is retrieved and has expected start
content = result.get("content", "")
self.assertGreater(len(content), 1000) # Should have substantial content
self.assertTrue(content.startswith(self.expected_osf_start))
def test_openalex_pdf_retrieval(self):
"""Test OpenAlex paper PDF retrieval and content extraction."""
metadata = fetch_single_openalex_paper_metadata(self.openalex_id)
result = asyncio.run(download_paper_and_parse_to_markdown(
metadata=metadata,
pdf_url_field="pdf_url",
paper_id=self.openalex_id,
write_images=False
))
# Assert that result is successful
self.assertIsInstance(result, dict)
self.assertEqual(result.get("status"), "success")
# Assert content is retrieved and has expected start
content = result.get("content", "")
self.assertGreater(len(content), 1000) # Should have substantial content
self.assertTrue(content.startswith(self.expected_openalex_start))
def test_arxiv_pdf_retrieval(self):
"""Test ArXiv paper PDF retrieval and content extraction."""
metadata = fetch_single_arxiv_paper_metadata(self.arxiv_id)
result = asyncio.run(download_paper_and_parse_to_markdown(
metadata=metadata,
pdf_url_field="download_url",
paper_id=self.arxiv_id,
write_images=False
))
# Assert that result is successful
self.assertIsInstance(result, dict)
self.assertEqual(result.get("status"), "success")
# Assert content is retrieved and has expected start
content = result.get("content", "")
self.assertGreater(len(content), 1000) # Should have substantial content
self.assertTrue(content.startswith(self.expected_arxiv_start))
def test_pdf_content_contains_markdown(self):
"""Test that PDF content is properly converted to markdown."""
metadata = fetch_single_arxiv_paper_metadata(self.arxiv_id)
result = asyncio.run(download_paper_and_parse_to_markdown(
metadata=metadata,
pdf_url_field="download_url",
paper_id=self.arxiv_id,
write_images=False
))
# Assert successful retrieval
self.assertEqual(result.get("status"), "success")
content = result.get("content", "")
# Assert markdown characteristics are present
self.assertIn("##", content) # Should contain markdown headers
self.assertIn("**", content) # Should contain bold text
self.assertGreater(len(content.split('\n')), 50) # Should have many lines
def test_pdf_retrieval_includes_metadata(self):
"""Test that PDF retrieval includes paper metadata."""
metadata = fetch_single_osf_preprint_metadata(self.osf_id)
result = asyncio.run(download_paper_and_parse_to_markdown(
metadata=metadata,
pdf_url_field="download_url",
paper_id=self.osf_id,
write_images=False
))
# Assert successful retrieval
self.assertEqual(result.get("status"), "success")
# Assert metadata is included
result_metadata = result.get("metadata", {})
self.assertIsInstance(result_metadata, dict)
self.assertIn("title", result_metadata)
self.assertIn("id", result_metadata)
if __name__ == "__main__":
unittest.main()
```
--------------------------------------------------------------------------------
/src/core/arxiv.py:
--------------------------------------------------------------------------------
```python
import xml.etree.ElementTree as ET
from typing import Any, Dict, Optional
from urllib.parse import quote, urlencode
import requests
from utils import sanitize_api_queries
def fetch_arxiv_papers(
query: Optional[str] = None,
category: Optional[str] = None,
author: Optional[str] = None,
title: Optional[str] = None,
max_results: int = 100,
start_index: int = 0,
) -> Dict[str, Any]:
"""
Fetch papers from arXiv API using various search parameters.
Args:
query: General search query
category: arXiv category (e.g., 'cs.AI', 'physics.gen-ph')
author: Author name to search for
title: Title keywords to search for
        max_results: Maximum number of results to return (default 100; each request is capped at 20)
start_index: Starting index for pagination (default 0)
Returns:
Dictionary containing papers data from arXiv API
"""
# Build search query
search_parts = []
if query:
search_parts.append(f"all:{sanitize_api_queries(query, max_length=200)}")
if category:
search_parts.append(f"cat:{sanitize_api_queries(category, max_length=50)}")
if author:
search_parts.append(f"au:{sanitize_api_queries(author, max_length=100)}")
if title:
search_parts.append(f"ti:{sanitize_api_queries(title, max_length=200)}")
if not search_parts:
# Default search if no parameters provided
search_query = "all:*"
else:
search_query = " AND ".join(search_parts)
# Build API URL
base_url = "http://export.arxiv.org/api/query"
params = {"search_query": search_query, "start": start_index, "max_results": min(max_results, 20)}
query_string = urlencode(params, safe=":", quote_via=quote)
url = f"{base_url}?{query_string}"
try:
response = requests.get(url, timeout=30)
response.raise_for_status()
# Parse XML response
root = ET.fromstring(response.content)
# Extract namespace
ns = {"atom": "http://www.w3.org/2005/Atom", "arxiv": "http://arxiv.org/schemas/atom"}
papers = []
for entry in root.findall("atom:entry", ns):
paper = _parse_arxiv_entry(entry, ns)
papers.append(paper)
return {
"data": papers,
"meta": {"total_results": len(papers), "start_index": start_index, "max_results": max_results, "search_query": search_query},
}
except requests.exceptions.RequestException as e:
raise ValueError(f"Request failed: {str(e)}")
except ET.ParseError as e:
raise ValueError(f"Failed to parse arXiv response: {str(e)}")
def _parse_arxiv_entry(entry, ns):
"""Parse a single arXiv entry from XML."""
# Extract basic info
arxiv_id = entry.find("atom:id", ns).text.split("/")[-1] if entry.find("atom:id", ns) is not None else ""
title = entry.find("atom:title", ns).text.strip() if entry.find("atom:title", ns) is not None else ""
summary = entry.find("atom:summary", ns).text.strip() if entry.find("atom:summary", ns) is not None else ""
published = entry.find("atom:published", ns).text if entry.find("atom:published", ns) is not None else ""
updated = entry.find("atom:updated", ns).text if entry.find("atom:updated", ns) is not None else ""
# Extract authors
authors = []
for author in entry.findall("atom:author", ns):
name_elem = author.find("atom:name", ns)
if name_elem is not None:
authors.append(name_elem.text)
# Extract categories
categories = []
for category in entry.findall("atom:category", ns):
term = category.get("term")
if term:
categories.append(term)
# Extract links (PDF, abstract)
pdf_url = ""
abstract_url = ""
for link in entry.findall("atom:link", ns):
if link.get("type") == "application/pdf":
pdf_url = link.get("href", "")
elif link.get("rel") == "alternate":
abstract_url = link.get("href", "")
# Extract DOI if available
doi = ""
doi_elem = entry.find("arxiv:doi", ns)
if doi_elem is not None:
doi = doi_elem.text
return {
"id": arxiv_id,
"title": title,
"summary": summary,
"authors": authors,
"categories": categories,
"published": published,
"updated": updated,
"pdf_url": pdf_url,
"abstract_url": abstract_url,
"doi": doi,
}
def fetch_single_arxiv_paper_metadata(paper_id: str) -> Dict[str, Any]:
"""
Fetch metadata for a single arXiv paper by ID.
Args:
paper_id: arXiv paper ID (e.g., '2301.00001' or 'cs.AI/0001001')
Returns:
Dictionary containing paper metadata
"""
# Validate paper exists first
pdf_url = f"https://arxiv.org/pdf/{paper_id}"
response = requests.head(pdf_url, timeout=10)
if response.status_code != 200:
raise ValueError(f"arXiv paper not found: {paper_id}")
# Fetch metadata from API
try:
api_url = f"http://export.arxiv.org/api/query?id_list={paper_id}"
response = requests.get(api_url, timeout=30)
response.raise_for_status()
# Parse XML response
root = ET.fromstring(response.content)
ns = {"atom": "http://www.w3.org/2005/Atom", "arxiv": "http://arxiv.org/schemas/atom"}
entry = root.find("atom:entry", ns)
if entry is None:
raise ValueError(f"No metadata found for paper: {paper_id}")
metadata = _parse_arxiv_entry(entry, ns)
metadata["download_url"] = pdf_url
return metadata
except requests.exceptions.RequestException as e:
raise ValueError(f"Failed to fetch paper metadata: {str(e)}")
except ET.ParseError as e:
raise ValueError(f"Failed to parse arXiv response: {str(e)}")
```
--------------------------------------------------------------------------------
/src/utils/pdf2md.py:
--------------------------------------------------------------------------------
```python
"""
PDF processing utilities using pymupdf4llm.
download_paper_and_parse_to_markdown() download_pdf_and_parse_to_markdown()
(with metadata) (direct URL)
| |
v v
Extract PDF URL from metadata Generate filename from URL
| |
+-------------------+---------------------+
|
v
_download_and_parse_pdf_core()
|
v
requests.get(pdf_url)
|
v
extract_pdf_to_markdown()
|
v
Return (content, size, message)
|
+-----------+-----------+
| |
v v
Format response Format response
with metadata with pdf_url
The shared core logic eliminates code duplication while maintaining
distinct interfaces for metadata-based vs direct URL workflows.
"""
import os
from typing import Optional
import tempfile
import httpx
import requests
import pymupdf4llm as pdfmd
async def extract_pdf_to_markdown(file_input, filename: Optional[str] = None, write_images: bool = False) -> str:
"""
Extract PDF content to markdown using pymupdf4llm.
Args:
file_input: Can be either:
- A file path (str) to an existing PDF
- File bytes/content (bytes) that will be written to temp file
- A file object with .read() method (for async file handling)
filename: Optional filename to use for temp file (only used when file_input is bytes/file object)
write_images: Whether to extract and write images (default: False)
Returns:
Markdown content as string
"""
temp_path = None
try:
# Handle different input types
if isinstance(file_input, str) and os.path.exists(file_input):
# Direct file path
md = pdfmd.to_markdown(file_input, write_images=write_images)
return md
elif isinstance(file_input, bytes):
# File bytes - write to temp file
temp_filename = filename or "temp_pdf.pdf"
temp_path = f"/tmp/{temp_filename}"
with open(temp_path, "wb") as f:
f.write(file_input)
md = pdfmd.to_markdown(temp_path, write_images=write_images)
return md
elif hasattr(file_input, "read"):
# File object (like FastAPI UploadFile)
temp_filename = filename or getattr(file_input, "filename", "temp_pdf.pdf")
temp_path = f"/tmp/{temp_filename}"
# Handle both sync and async file objects
if hasattr(file_input, "__aiter__") or hasattr(file_input.read, "__call__"):
try:
# Try async read first
content = await file_input.read()
except TypeError:
# Fall back to sync read
content = file_input.read()
else:
content = file_input.read()
with open(temp_path, "wb") as f:
f.write(content)
md = pdfmd.to_markdown(temp_path, write_images=write_images)
return md
else:
raise ValueError(f"Unsupported file_input type: {type(file_input)}")
finally:
# Clean up temporary file
if temp_path and os.path.exists(temp_path):
try:
os.unlink(temp_path)
except Exception:
pass # Ignore cleanup errors
async def _download_and_parse_pdf_core(
pdf_url: str,
filename: str = "paper.pdf",
write_images: bool = False
) -> tuple[str, int, str]:
# Download PDF
pdf_response = requests.get(pdf_url, timeout=60)
pdf_response.raise_for_status()
# Parse PDF to markdown
markdown_content = await extract_pdf_to_markdown(
pdf_response.content,
filename=filename,
write_images=write_images
)
file_size = len(pdf_response.content)
message = f"Successfully parsed PDF content ({file_size} bytes)"
return markdown_content, file_size, message
async def download_paper_and_parse_to_markdown(
metadata: dict,
pdf_url_field: str = "download_url",
paper_id: str = "",
write_images: bool = False
) -> dict:
# Extract PDF URL from metadata
pdf_url = metadata.get(pdf_url_field)
if not pdf_url:
return {
"status": "error",
"message": f"No PDF URL found in metadata field '{pdf_url_field}'",
"metadata": metadata
}
try:
filename = f"{paper_id}.pdf" if paper_id else "paper.pdf"
markdown_content, file_size, message = await _download_and_parse_pdf_core(
pdf_url, filename, write_images
)
return {
"status": "success",
"metadata": metadata,
"content": markdown_content,
"file_size": file_size,
"message": message,
}
except requests.exceptions.RequestException as e:
return {
"status": "error",
"message": f"Network error: {str(e)}",
"metadata": metadata
}
except Exception as e:
return {
"status": "error",
"message": f"Error parsing PDF: {str(e)}",
"metadata": metadata
}
async def download_pdf_and_parse_to_markdown(pdf_url: str, write_images: bool = False) -> dict:
try:
filename = pdf_url.split('/')[-1] if '/' in pdf_url else "paper.pdf"
if not filename.endswith('.pdf'):
filename = "paper.pdf"
markdown_content, file_size, message = await _download_and_parse_pdf_core(
pdf_url, filename, write_images
)
return {
"status": "success",
"content": markdown_content,
"file_size": file_size,
"pdf_url": pdf_url,
"message": message,
}
except requests.exceptions.RequestException as e:
return {
"status": "error",
"message": f"Network error downloading PDF: {str(e)}",
"pdf_url": pdf_url
}
except Exception as e:
return {
"status": "error",
"message": f"Error parsing PDF: {str(e)}",
"pdf_url": pdf_url
}
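# Example (illustrative): parse a public arXiv PDF directly from its URL.
#   import asyncio
#   result = asyncio.run(download_pdf_and_parse_to_markdown("https://arxiv.org/pdf/1706.03762"))
#   On success, result["status"] == "success" and result["content"] holds the markdown text.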
```
--------------------------------------------------------------------------------
/src/tools.py:
--------------------------------------------------------------------------------
```python
from typing import Annotated
from fastmcp import FastMCP
from core import (
fetch_arxiv_papers,
fetch_openalex_papers,
fetch_osf_preprints,
fetch_single_arxiv_paper_metadata,
fetch_single_openalex_paper_metadata,
fetch_single_osf_preprint_metadata,
fetch_osf_providers,
get_all_providers,
)
from utils.pdf2md import download_pdf_and_parse_to_markdown, download_paper_and_parse_to_markdown
tools_mcp = FastMCP()
@tools_mcp.tool(
name="list_providers",
description="Get the complete list of all available academic paper providers. Includes preprint servers (ArXiv, Open Science Framework (OSF) discipline-specific servers). Returns provider IDs for use with search_papers.",
)
async def list_providers() -> dict:
"""
Call the osf api and hardcode other supported providers.
"""
providers = get_all_providers()
return {
"providers": providers,
"total_count": len(providers),
}
@tools_mcp.tool(
name="search_papers",
description="Find papers using supported filters. And retrieve their metadata.",
)
async def search_papers(
query: Annotated[str | None, "Text search query for title, author, content"] = None,
provider: Annotated[str | None, "Provider ID to filter preprints (e.g., psyarxiv, socarxiv, arxiv, openalex, osf)"] = None,
subjects: Annotated[str | None, "Subject categories to filter by (e.g., psychology, neuroscience)"] = None,
date_published_gte: Annotated[str | None, "Filter preprints published on or after this date (e.g., 2024-01-01)"] = None,
) -> dict:
if provider and provider not in [p["id"] for p in get_all_providers()]:
return {
"error": f"Provider: {provider} not found. Please use list_preprint_providers to get the complete list of all available providers.",
}
if not provider:
all_results = []
arxiv_results = fetch_arxiv_papers(query=query, category=subjects)
all_results.append(arxiv_results)
openalex_results = fetch_openalex_papers(
query=query,
concepts=subjects,
date_published_gte=date_published_gte
)
all_results.append(openalex_results)
osf_results = fetch_osf_preprints(
provider_id="osf",
subjects=subjects,
date_published_gte=date_published_gte,
query=query,
)
all_results.append(osf_results)
return {
"papers": all_results,
"total_count": len(all_results),
"providers_searched": ["arxiv", "openalex", "osf"],
}
if provider == "osf" or provider in [p["id"] for p in fetch_osf_providers()]:
return fetch_osf_preprints( provider_id=provider,
subjects=subjects,
date_published_gte=date_published_gte,
query=query,
)
elif provider == "arxiv":
return fetch_arxiv_papers(
query=query,
category=subjects,
)
elif provider == "openalex":
return fetch_openalex_papers(
query=query,
concepts=subjects,
date_published_gte=date_published_gte,
)
@tools_mcp.tool(
name="get_paper_by_id",
description="Download and convert an academic paper to markdown format by its ID. Returns full paper content including title, abstract, sections, and references. Supports ArXiv (e.g., '2407.06405v1'), OpenAlex (e.g., 'W4385245566'), and OSF IDs.",
)
async def get_paper_by_id(paper_id: str) -> dict:
try:
# Check if it's an OpenAlex paper ID (starts with 'W' followed by numbers)
if paper_id.startswith("W") and paper_id[1:].isdigit():
# OpenAlex paper ID format (e.g., "W4385245566")
metadata = fetch_single_openalex_paper_metadata(paper_id)
return await download_paper_and_parse_to_markdown(
metadata=metadata,
pdf_url_field="pdf_url",
paper_id=paper_id,
write_images=False
)
# Check if it's an arXiv paper ID (contains 'v' followed by version number or matches arXiv format)
elif "." in paper_id and ("v" in paper_id or len(paper_id.split(".")[0]) == 4):
# arXiv paper ID format (e.g., "2407.06405v1" or "cs.AI/0001001")
metadata = fetch_single_arxiv_paper_metadata(paper_id)
return await download_paper_and_parse_to_markdown(
metadata=metadata,
pdf_url_field="download_url",
paper_id=paper_id,
write_images=False
)
else:
# OSF paper ID format
metadata = fetch_single_osf_preprint_metadata(paper_id)
# Handle error case from OSF metadata function
if isinstance(metadata, dict) and metadata.get("status") == "error":
return metadata
return await download_paper_and_parse_to_markdown(
metadata=metadata,
pdf_url_field="download_url",
paper_id=paper_id,
write_images=False
)
except ValueError as e:
return {"status": "error", "message": str(e), "metadata": {}}
@tools_mcp.tool(
name="get_paper_metadata_by_id",
description="Get metadata for an academic paper by its ID without downloading full content. Returns title, authors, abstract, publication date, journal info, and download URLs. Supports ArXiv, OpenAlex, and OSF IDs.",
)
async def get_paper_metadata_by_id(preprint_id: str) -> dict:
# Check if it's an OpenAlex paper ID (starts with 'W' followed by numbers)
if preprint_id.startswith("W") and preprint_id[1:].isdigit():
# OpenAlex paper ID format (e.g., "W4385245566")
return fetch_single_openalex_paper_metadata(preprint_id)
# Check if it's an arXiv paper ID (contains 'v' followed by version number or matches arXiv format)
elif "." in preprint_id and ("v" in preprint_id or len(preprint_id.split(".")[0]) == 4):
# arXiv paper ID format (e.g., "2407.06405v1" or "cs.AI/0001001")
return fetch_single_arxiv_paper_metadata(preprint_id)
else:
# OSF paper ID format
return fetch_single_osf_preprint_metadata(preprint_id)
@tools_mcp.tool(
name="get_paper_content_by_url",
description="Download and convert the PDF of a paper to markdown format from a direct PDF URL. Returns full paper content parsed from the PDF including title, abstract, sections, and references.",
)
async def get_paper_content_by_url(pdf_url: str) -> dict:
return await download_pdf_and_parse_to_markdown(pdf_url)
```
--------------------------------------------------------------------------------
/src/core/openalex.py:
--------------------------------------------------------------------------------
```python
import requests
from typing import Any, Dict, Optional
from urllib.parse import urlencode
from utils import sanitize_api_queries
def fetch_openalex_papers(
query: Optional[str] = None,
author: Optional[str] = None,
title: Optional[str] = None,
publisher: Optional[str] = None,
institution: Optional[str] = None,
concepts: Optional[str] = None,
date_published_gte: Optional[str] = None,
max_results: int = 20,
page: int = 1,
) -> Dict[str, Any]:
"""
Fetch papers from the OpenAlex API using various search parameters.
Args:
query: General search query (full-text search)
author: Author name to search for
title: Title keywords to search for
publisher: Publisher name to search for
institution: Institution name to search for
concepts: Concepts to filter by (e.g., 'computer science', 'artificial intelligence')
date_published_gte: Published date greater than or equal to (YYYY-MM-DD)
max_results: Maximum number of results to return (default 20, max 200)
page: Page number for pagination (default 1)
Returns:
Dictionary containing papers data from OpenAlex API
"""
base_url = "https://api.openalex.org/works"
filters = {}
if query:
filters["search"] = sanitize_api_queries(query, max_length=500)
if author:
filters["filter"] = f"authors.author_name.search:{sanitize_api_queries(author, max_length=200)}"
if title:
if "filter" in filters:
filters["filter"] += f",{sanitize_api_queries(title, max_length=500)}"
else:
filters["filter"] = f"title.search:{sanitize_api_queries(title, max_length=500)}"
if publisher:
if "filter" in filters:
filters["filter"] += f",publisher.search:{sanitize_api_queries(publisher, max_length=200)}"
else:
filters["filter"] = f"publisher.search:{sanitize_api_queries(publisher, max_length=200)}"
if institution:
if "filter" in filters:
filters["filter"] += f",institutions.institution_name.search:{sanitize_api_queries(institution, max_length=200)}"
else:
filters["filter"] = f"institutions.institution_name.search:{sanitize_api_queries(institution, max_length=200)}"
if concepts:
# OpenAlex concepts can be tricky, simple search might work for now
if "filter" in filters:
filters["filter"] += f",concepts.display_name.search:{sanitize_api_queries(concepts, max_length=200)}"
else:
filters["filter"] = f"concepts.display_name.search:{sanitize_api_queries(concepts, max_length=200)}"
    if date_published_gte:
        if "filter" in filters:
            filters["filter"] += f",from_publication_date:{date_published_gte}"
        else:
            filters["filter"] = f"from_publication_date:{date_published_gte}"
# Add pagination and results limit
filters["per_page"] = min(max_results, 200) # OpenAlex max per_page is 200
filters["page"] = page
try:
query_string = urlencode(filters, safe=":,") # Allow colons and commas in filter values
url = f"{base_url}?{query_string}"
response = requests.get(url, timeout=30)
response.raise_for_status()
data = response.json()
papers = []
for result in data.get("results", []):
paper = _parse_openalex_work(result)
papers.append(paper)
return {
"data": papers,
"meta": {
"total_results": data.get("meta", {}).get("count", 0),
"page": page,
"per_page": filters["per_page"],
"search_query": query, # Only include general query for simplicity
},
"links": data.get("meta", {}).get("next_page", ""),
}
except requests.exceptions.RequestException as e:
raise ValueError(f"Request failed: {str(e)}")
def _parse_openalex_work(work_data: Dict[str, Any]) -> Dict[str, Any]:
"""Parse a single OpenAlex work entry."""
# Extract authors
authors = []
for authorship in work_data.get("authorships", []):
author = authorship.get("author", {})
if author and author.get("display_name"):
authors.append(author["display_name"])
# Extract concepts
concepts = []
for concept in work_data.get("concepts", []):
if concept.get("display_name"):
concepts.append(concept["display_name"])
# Extract PDF URL from primary location or alternative locations
pdf_url = ""
primary_location = work_data.get("primary_location") or {}
if primary_location and primary_location.get("pdf_url"):
pdf_url = primary_location["pdf_url"]
elif primary_location and primary_location.get("landing_page_url"):
pdf_url = primary_location.get("landing_page_url", "")
else:
# Check all locations for a PDF URL if primary doesn't have one
for location in work_data.get("locations", []):
if location.get("pdf_url"):
pdf_url = location["pdf_url"]
break
# Extract abstract from inverted index
abstract = ""
abstract_inverted_index = work_data.get("abstract_inverted_index", {})
if abstract_inverted_index:
abstract = _reconstruct_abstract_from_inverted_index(abstract_inverted_index)
# Extract OpenAlex ID from URL
openalex_id = work_data.get("id", "")
if openalex_id.startswith("https://openalex.org/"):
openalex_id = openalex_id.replace("https://openalex.org/", "")
# Get primary location source info
primary_source = ""
if primary_location and primary_location.get("source"):
source = primary_location.get("source") or {}
primary_source = source.get("display_name", "")
return {
"id": openalex_id,
"doi": work_data.get("doi", ""),
"title": work_data.get("title", "") or work_data.get("display_name", ""),
"abstract": abstract,
"authors": authors,
"publication_date": work_data.get("publication_date", ""),
"publication_year": work_data.get("publication_year"),
"cited_by_count": work_data.get("cited_by_count", 0),
"concepts": concepts,
"primary_location_url": (work_data.get("primary_location") or {}).get("landing_page_url", ""),
"primary_source": primary_source,
"pdf_url": pdf_url,
"open_access_status": (work_data.get("open_access") or {}).get("oa_status", "closed"),
"is_open_access": (work_data.get("primary_location") or {}).get("is_oa", False),
"type": work_data.get("type", ""),
"relevance_score": work_data.get("relevance_score", 0),
}
def _reconstruct_abstract_from_inverted_index(inverted_index: Dict[str, Any]) -> str:
"""Reconstruct abstract text from OpenAlex's inverted index format."""
if not inverted_index:
return ""
try:
# Create a list to hold words at their positions
word_positions = []
for word, positions in inverted_index.items():
if isinstance(positions, list):
for position in positions:
word_positions.append((position, word))
# Sort by position and reconstruct text
word_positions.sort(key=lambda x: x[0])
abstract_words = [word for _, word in word_positions]
return " ".join(abstract_words)
except Exception:
# If reconstruction fails, return empty string
return ""
def fetch_single_openalex_paper_metadata(paper_id: str) -> Dict[str, Any]:
"""
Fetch metadata for a single OpenAlex paper by ID.
Args:
paper_id: OpenAlex paper ID (e.g., 'W2741809809')
Returns:
Dictionary containing paper metadata
"""
base_url = "https://api.openalex.org/works"
url = f"{base_url}/{paper_id}"
try:
response = requests.get(url, timeout=30)
response.raise_for_status()
work_data = response.json()
if not work_data.get("id"):
raise ValueError(f"No metadata found for paper: {paper_id}")
metadata = _parse_openalex_work(work_data)
return metadata
except requests.exceptions.RequestException as e:
raise ValueError(f"Failed to fetch paper metadata: {str(e)}")
```
--------------------------------------------------------------------------------
/src/core/osf.py:
--------------------------------------------------------------------------------
```python
from typing import Any, Dict, Optional
from urllib.parse import quote, urlencode
import requests
from utils import sanitize_api_queries
from .providers import fetch_osf_providers, validate_provider
def fetch_osf_preprints(
provider_id: Optional[str] = None,
subjects: Optional[str] = None,
date_published_gte: Optional[str] = None,
query: Optional[str] = None,
) -> Dict[str, Any]:
"""
NOTE: The OSF API only supports a limited set of filters. Many common filters
like title, DOI, creator, etc. are NOT supported by the OSF API.
When query is provided, uses the trove search endpoint which supports full-text search.
Args:
provider_id: The provider ID (e.g., 'psyarxiv', 'socarxiv')
subjects: Subject filter (e.g., 'psychology', 'neuroscience')
date_published_gte: Published date greater than or equal to (YYYY-MM-DD)
query: Text search query for title, author, content (uses trove endpoint)
Returns:
Dictionary containing preprints data from OSF API or trove search
"""
# If query is provided, use trove search endpoint
if query:
return fetch_osf_preprints_via_trove(query, provider_id)
# Build query parameters (only using OSF API supported filters)
filters = {}
if provider_id:
filters["filter[provider]"] = sanitize_api_queries(provider_id, max_length=50)
if subjects:
filters["filter[subjects]"] = sanitize_api_queries(subjects, max_length=100)
if date_published_gte:
filters["filter[date_published][gte]"] = date_published_gte # Dates don't need cleaning
# Build URL with filters
base_url = "https://api.osf.io/v2/preprints/"
if filters:
query_string = urlencode(filters, safe="", quote_via=quote)
url = f"{base_url}?{query_string}"
else:
url = base_url
try:
response = requests.get(url, timeout=30)
response.raise_for_status()
return response.json()
except requests.exceptions.HTTPError as e:
if response.status_code == 400:
if len(filters) > 1:
simple_filters = {}
if provider_id:
simple_filters["filter[provider]"] = sanitize_api_queries(provider_id, max_length=50)
simple_query = urlencode(simple_filters, safe="", quote_via=quote)
simple_url = f"{base_url}?{simple_query}"
try:
simple_response = requests.get(simple_url, timeout=30)
simple_response.raise_for_status()
result = simple_response.json()
# Add a note about the simplified search
if "meta" not in result:
result["meta"] = {}
result["meta"][
"search_note"
] = f"Original search failed (400 error), showing all results for provider '{provider_id}'. You may need to filter results manually."
return result
                except (requests.exceptions.RequestException, ValueError):
                    pass
raise ValueError(f"Bad request (400) - The search parameters may be invalid. Original error: {str(e)}")
else:
raise e
except requests.exceptions.RequestException as e:
raise ValueError(f"Request failed: {str(e)}")
def fetch_osf_preprints_via_trove(query: str, provider_id: Optional[str] = None) -> Dict[str, Any]:
"""
Fetch preprints using the trove search endpoint and transform to standard format.
"""
from urllib.parse import quote_plus
# Build trove search URL
base_url = "https://share.osf.io/trove/index-card-search"
params = {
"cardSearchFilter[resourceType]": "Preprint",
"cardSearchText[*,creator.name,isContainedBy.creator.name]": sanitize_api_queries(query, max_length=200),
"page[size]": "20", # Match our default page size
"sort": "-relevance",
}
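    # Illustrative request (query value is made up): query="memory consolidation" is sent URL-encoded as
    # cardSearchText[*,creator.name,isContainedBy.creator.name]=memory+consolidation against
    # https://share.osf.io/trove/index-card-search, sorted by relevance with 20 results per page.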
# Validate provider if specified (we'll filter results later)
if provider_id:
if not validate_provider(provider_id):
osf_providers = fetch_osf_providers()
valid_ids = [p["id"] for p in osf_providers]
raise ValueError(f"Invalid OSF provider: {provider_id}. Valid OSF providers: {valid_ids}")
# Build query string manually to handle complex parameter names
query_parts = []
for key, value in params.items():
query_parts.append(f"{quote_plus(key)}={quote_plus(str(value))}")
query_string = "&".join(query_parts)
url = f"{base_url}?{query_string}"
try:
headers = {"Accept": "application/json"}
response = requests.get(url, headers=headers, timeout=30)
response.raise_for_status()
trove_data = response.json()
# Transform trove format to standard OSF API format
transformed_data = []
for item in trove_data.get("data", []):
# Extract OSF ID from @id field
osf_id = ""
if "@id" in item and "osf.io/" in item["@id"]:
osf_id = item["@id"].split("/")[-1]
# Filter by provider if specified
if provider_id:
# Check if this item is from the specified provider
publisher_info = item.get("publisher", [])
if isinstance(publisher_info, list) and len(publisher_info) > 0:
publisher_id = publisher_info[0].get("@id", "")
                    # Match the provider against the publisher URL (e.g., "https://osf.io/preprints/psyarxiv" contains "psyarxiv")
if provider_id not in publisher_id:
continue # Skip this item if it doesn't match the provider
else:
continue # Skip if no publisher info
# Transform to standard format
transformed_item = {
"id": osf_id,
"type": "preprints",
"attributes": {
"title": extract_first_value(item.get("title", [])),
"description": extract_first_value(item.get("description", [])),
"date_created": extract_first_value(item.get("dateCreated", [])),
"date_published": extract_first_value(item.get("dateAccepted", [])),
"date_modified": extract_first_value(item.get("dateModified", [])),
"doi": extract_doi_from_identifiers(item.get("identifier", [])),
"tags": [kw.get("@value", "") for kw in item.get("keyword", [])],
"subjects": [subj.get("prefLabel", [{}])[0].get("@value", "") for subj in item.get("subject", [])],
},
"relationships": {},
"links": {"self": item.get("@id", "")},
}
transformed_data.append(transformed_item)
# Return in standard OSF API format
return {
"data": transformed_data,
"meta": {
"version": "2.0", # Match OSF API version
"total": trove_data.get("meta", {}).get("total", len(transformed_data)),
"search_note": f"Results from trove search for query: '{query}'",
},
"links": {
"first": trove_data.get("links", {}).get("first", ""),
"next": trove_data.get("links", {}).get("next", ""),
"last": "",
"prev": "",
"meta": "",
},
}
except requests.exceptions.RequestException as e:
raise ValueError(f"Trove search failed: {str(e)}")
def extract_first_value(field_list):
"""Extract the first @value from a field list."""
if isinstance(field_list, list) and len(field_list) > 0:
if isinstance(field_list[0], dict) and "@value" in field_list[0]:
return field_list[0]["@value"]
elif isinstance(field_list[0], str):
return field_list[0]
return ""
def extract_doi_from_identifiers(identifiers):
"""Extract DOI from identifier list."""
for identifier in identifiers:
if isinstance(identifier, dict) and "@value" in identifier:
value = identifier["@value"]
if "doi.org" in value or value.startswith("10."):
return value
return ""
def fetch_single_osf_preprint_metadata(preprint_id: str) -> Dict[str, Any]:
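    """
    Fetch metadata for a single OSF preprint by ID, including the primary file's download URL.
    Args:
        preprint_id: OSF preprint ID (the short GUID from the preprint URL)
    Returns:
        Dictionary containing preprint metadata, or an error dict when no download URL is available
    """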
try:
preprint_url = f"https://api.osf.io/v2/preprints/{preprint_id}"
response = requests.get(preprint_url, timeout=30)
response.raise_for_status()
preprint_data = response.json()
primary_file_url = preprint_data["data"]["relationships"]["primary_file"]["links"]["related"]["href"]
file_response = requests.get(primary_file_url, timeout=30)
file_response.raise_for_status()
file_data = file_response.json()
# Get the download URL
download_url = file_data["data"]["links"]["download"]
# Prepare metadata first
attributes = preprint_data["data"]["attributes"]
metadata = {
"id": preprint_id,
"title": attributes.get("title", ""),
"description": attributes.get("description", ""),
"date_created": attributes.get("date_created", ""),
"date_published": attributes.get("date_published", ""),
"date_modified": attributes.get("date_modified", ""),
"is_published": attributes.get("is_published", False),
"is_preprint_orphan": attributes.get("is_preprint_orphan", False),
"license_record": attributes.get("license_record", {}),
"doi": attributes.get("doi", ""),
"tags": attributes.get("tags", []),
"subjects": attributes.get("subjects", []),
"download_url": download_url,
}
if not download_url:
return {"status": "error", "message": "Download URL not available", "metadata": metadata}
return metadata
except requests.exceptions.RequestException as e:
raise ValueError(f"Failed to fetch preprint metadata: {str(e)}")
```