#
tokens: 3940/50000 13/13 files
lines: off (toggle) GitHub
raw markdown copy
# Directory Structure

```
├── .gitignore
├── Cargo.lock
├── Cargo.toml
├── extension.toml
├── pdf_rag
│   ├── .aiignore
│   ├── .python-version
│   ├── pyproject.toml
│   ├── README.md
│   ├── src
│   │   └── pdf_rag
│   │       ├── __init__.py
│   │       ├── env.py
│   │       ├── rag.py
│   │       └── server.py
│   └── uv.lock
├── README.md
└── src
    └── lib.rs
```

# Files

--------------------------------------------------------------------------------
/pdf_rag/.python-version:
--------------------------------------------------------------------------------

```
3.12

```

--------------------------------------------------------------------------------
/pdf_rag/.aiignore:
--------------------------------------------------------------------------------

```
.env
.venv
.idea
pdfsearch.sqlite

```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
/target
__pycache__


.idea
.venv
.ruff_cache

*.wasm
*.sqlite

**/.env

```

--------------------------------------------------------------------------------
/pdf_rag/README.md:
--------------------------------------------------------------------------------

```markdown
# About

The heart of the extension - this is the MCP server logic.


```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# PDF Search for Zed

A document search extension for Zed that lets you semantically search through a
PDF document and use the results in Zed's AI Assistant.

## Prerequisites

This extension currently requires:

1. An `OpenAI` API key (to generate embeddings)
2. `uv` installed on your system

**Note:** While the current setup requires an OpenAI API key for generating embeddings, we plan to implement a self-contained alternative in future versions. Community feedback will help prioritize these improvements.

## Quick Start

1. Clone the repository

```bash
git clone https://github.com/freespirit/pdfsearch-zed.git
```

2. Set up the Python environment for the MCP server:

```bash
cd pdfsearch-zed/pdf_rag
uv venv
uv sync
```

3. [Install Dev Extension](https://zed.dev/docs/extensions/developing-extensions) in Zed

4. Build the search db

```bash
cd /path/to/pdfsearch-zed/pdf_rag

echo "OPENAI_API_KEY=sk-..." > src/pdf_rag/.env

# This may take a couple of minutes, depending on the documents' size
# You can provide multiple files and directories as arguments.
#  - files would be chunked.
#  - a directory would be considered as if its files contains chunks.
#    E.g. they won't be further split.
uv run src/pdf_rag/rag.py build "file1.pdf" "dir1" "file2.md" ...
```

5. Configure Zed

```json
"context_servers": {
    "pdfsearch-context-server": {
        "settings": {
            "extension_path": "/path/to/pdfsearch-zed"
        }
    }
}
```

## Usage

1. Open Zed's AI Assistant panel
2. Type `/pdfsearch` followed by your search query
3. The extension will search the PDF and add relevant sections to the AI
   Assistant's context

## Future Improvements

- [x] Self-contained vector store
- [ ] Self-contained embeddings
- [ ] Automated index building on first run
- [ ] Configurable result size
- [x] Support for multiple PDFs
- [x] Optional: Additional file formats beyond PDF

## Project Structure

- `pdf_rag/`: Python-based MCP server implementation
- `src/`: Zed extension code
- `extension.toml` and `Cargo.toml`: Zed extension configuration files

## Known Limitations

- Manual index building is required before first use
- Requires external services (OpenAI)

```

--------------------------------------------------------------------------------
/Cargo.toml:
--------------------------------------------------------------------------------

```toml
[package]
name = "pdfsearch"
version = "0.1.0"
edition = "2021"

[lib]
crate-type = ["cdylib"]

[dependencies]
serde = "1.0.217"
zed_extension_api = "0.3.0"


```

--------------------------------------------------------------------------------
/pdf_rag/src/pdf_rag/__init__.py:
--------------------------------------------------------------------------------

```python
import asyncio

from . import env, server


def main():
    """Main entry point for the package."""
    asyncio.run(server.main())


__all__ = ["main", "server"]

```

--------------------------------------------------------------------------------
/extension.toml:
--------------------------------------------------------------------------------

```toml
id = "pdfsearch"
name = "PDF Search"
description = "A PDF search (RAG-style) MCP for Zed"
version = "0.2.0"
schema_version = 1
authors = ["stano <TBD email>"]
repository = "https://github.com/freespirit/pdfsearch-zed"


[context_servers.my-context-server]
name = "PDF Search"
```

--------------------------------------------------------------------------------
/pdf_rag/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[project]
name = "pdf_rag"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
    "httpx>=0.28.1",
    "libsql-experimental==0.0.41",
    "mcp[cli]>=1.6.0",
    "openai>=1.58.1",
    "pypdf>=5.1.0",
    "tiktoken>=0.8.0",
]

[build-system]
requires = [ "hatchling",]
build-backend = "hatchling.build"

[project.scripts]
pdf_rag = "pdf_rag:main"

```

--------------------------------------------------------------------------------
/pdf_rag/src/pdf_rag/env.py:
--------------------------------------------------------------------------------

```python
import os


def load_env_file(file_path='.env'):
    with open(file_path, 'r') as file:
        for line in file:
            line = line.strip()
            if not line or line.startswith('#'):
                continue

            if '=' in line:
                key, value = line.split('=', 1)
                key = key.strip()
                value = value.strip()

                # Remove quotes if present
                if value and (value[0] == value[-1] == '"' or value[0] == value[-1] == "'"):
                    value = value[1:-1]

                os.environ[key] = value

```

--------------------------------------------------------------------------------
/src/lib.rs:
--------------------------------------------------------------------------------

```rust
use std::path::Path;

use serde::Deserialize;
use zed_extension_api::{
    self as zed, serde_json, settings::ContextServerSettings, ContextServerId,
};

#[derive(Debug, Deserialize)]
struct PdfSearchContextServerSettings {
    extension_path: String,
}

struct MyExtension {}

impl zed::Extension for MyExtension {
    fn new() -> Self
    where
        Self: Sized,
    {
        Self {}
    }

    fn context_server_command(
        &mut self,
        context_server_id: &ContextServerId,
        project: &zed::Project,
    ) -> zed::Result<zed::Command> {
        let settings = ContextServerSettings::for_project("pdfsearch-context-server", project)?;
        let Some(settings) = settings.settings else {
            return Err("Missing `context_servers` settings for `pdfsearch-context-server`".into());
        };
        let settings: PdfSearchContextServerSettings =
            serde_json::from_value(settings).map_err(|e| e.to_string())?;

        let mcp_python_module = String::from("pdf_rag");
        let extension_path = Path::new(settings.extension_path.as_str());
        let mcp_server_path = extension_path.join(&mcp_python_module);

        Ok(zed::Command {
            command: "uv".to_string(),
            args: vec![
                format!("--directory={}", mcp_server_path.to_string_lossy()),
                "run".to_string(),
                mcp_python_module,
            ],
            env: vec![],
        })
    }
}

zed::register_extension!(MyExtension);

```

--------------------------------------------------------------------------------
/pdf_rag/src/pdf_rag/server.py:
--------------------------------------------------------------------------------

```python
import asyncio

import mcp.server.stdio
import mcp.types as types
from mcp.server import NotificationOptions, Server
from mcp.server.models import InitializationOptions

from pdf_rag.env import load_env_file
from pdf_rag.rag import RAG

load_env_file()

server = Server("pdfsearch-server")


@server.list_prompts()
async def list_prompts() -> list[types.Prompt]:
    return [
        types.Prompt(
            name="pdfsearch",
            description="Do a RAG-style expansion of your prompt, enriching it with relevant information from the PDF.",
            arguments=[
                types.PromptArgument(
                    name="input",
                    description="What to look for in the document.",
                    required=True,
                )
            ]
        )
    ]


@server.list_tools()
async def list_tools() -> list[types.Tool]:
    return [
        types.Tool(
            name="pdfsearch",
            description="Retrieve relevant information from a document.",
            inputSchema={
                "type": "object",
                "required": ["query"],
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "User provided query (to search for in the documents)",
                    }
                },
            },
        )
    ]


@server.get_prompt()
async def get_prompt(
        name: str,
        arguments: dict[str, str] | None = None
) -> types.GetPromptResult:
    if name != "pdfsearch":
        raise ValueError(f"Prompt not found: {name}")

    user_input = arguments.get("input") if arguments else ""
    result = await _search(user_input)
    return types.GetPromptResult(
        messages=[
            types.PromptMessage(
                role="user",
                content=types.TextContent(
                    type="text",
                    text=result,
                ),
            )
        ]
    )


@server.call_tool()
async def call_a_tool(
        name: str,
        arguments: dict
) -> list[types.TextContent | types.ImageContent | types.EmbeddedResource]:
    if name != "pdfsearch":
        raise ValueError(f"Unknown tool: {name}")
    # if "query" not in arguments:
    #     raise ValueError("Missing required argument 'query'")

    result = f"received: {arguments}"
    try:
        user_input = arguments["query"]
        result = await _search(user_input)
    except Exception as e:
        result = str(e.__repr__())

    return [types.TextContent(type="text", text=result)]


async def _search(user_input):
    # TODO figure out when to build the vector db
    rag = RAG()
    related_chunks = await rag.search(user_input)
    response = ""
    for chunk in related_chunks:
        response += "<text>\n"
        response += chunk
        response += "</text>\n"
    response += "\n"
    return response


async def main():
    # Run the server using stdin/stdout streams
    async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="pdf_rag",
                server_version="0.1.0",
                capabilities=server.get_capabilities(
                    notification_options=NotificationOptions(),
                    experimental_capabilities={},
                ),
            ),
        )


# This is needed if you'd like to connect to a custom client
if __name__ == "__main__":
    asyncio.run(main())

```

--------------------------------------------------------------------------------
/pdf_rag/src/pdf_rag/rag.py:
--------------------------------------------------------------------------------

```python
"""THe RAG backbone of this MCP server"""

import argparse
import asyncio
from pathlib import Path
from typing import List

import libsql_experimental as libsql
import tiktoken
from openai import AsyncOpenAI
from pypdf import PdfReader
from tqdm import tqdm
from typing_extensions import Tuple

from pdf_rag.env import load_env_file

VECTOR_COLLECTION_NAME = "document_chunks"
EMBEDDING_DIMENSIONS = 1024

QUERY_DROP = f"DROP TABLE IF EXISTS {VECTOR_COLLECTION_NAME}"

QUERY_CREATE = (
    f"CREATE TABLE IF NOT EXISTS {VECTOR_COLLECTION_NAME} ("
    f"  text TEXT,"
    f"  embedding F32_BLOB({EMBEDDING_DIMENSIONS})"
    f");"
)

QUERY_INSERT = (
    f"INSERT INTO {VECTOR_COLLECTION_NAME} (text, embedding) VALUES (?, vector32(?))"
)

QUERY_SEARCH = (
    "SELECT text, vector_distance_cos(embedding, vector32(?)) "
    f"  FROM {VECTOR_COLLECTION_NAME} "
    f"  ORDER BY vector_distance_cos(embedding, vector32(?)) ASC "
    f"  LIMIT 10;"
)

load_env_file()


class RAG:
    """Basic document RAG system.

    Splits a document in chunks and later retrieves the most relevant chunks for a given query.
    """

    openai: AsyncOpenAI
    db_file: Path

    def __init__(self, db_path: str = "pdfsearch.sqlite"):
        self.openai = AsyncOpenAI()
        self.db_file = Path(db_path)

    def build_search_db(self):
        conn = libsql.connect(str(self.db_file.absolute()))
        conn.execute(QUERY_DROP)
        conn.execute(QUERY_CREATE)
        conn.commit()

    def add_knowledge(self, text: str, embedding: List[float]):
        conn = libsql.connect(str(self.db_file.absolute()))
        conn.execute(QUERY_INSERT, (text, str(embedding)))
        conn.commit()

    async def search(self, query: str) -> List[str]:
        embedding = await embed(query, self.openai)

        conn = libsql.connect(str(self.db_file.absolute()))
        search_result = conn.execute(
            QUERY_SEARCH,
            (str(embedding), str(embedding)),
        ).fetchall()
        conn.commit()

        for row in search_result:
            print(row[0][:25], row[1])

        return [row[0] for row in search_result]


def chunkify(
    text: str, max_tokens: int = 512, overlap: int = 64, tokenizer=None
) -> List[str]:
    """Split text into token-based chunks with overlap."""
    if not text.strip():
        return []

    if tokenizer is None:
        tokenizer = tiktoken.get_encoding("cl100k_base")

    # Tokenize the entire text first
    tokenized_text = tokenizer.encode(text)
    num_tokens = len(tokenized_text)
    chunks = []

    # Iterate over the tokenized text in `max_tokens` increments with specified overlap
    start_idx = 0
    while start_idx < num_tokens:
        end_idx = min(start_idx + max_tokens, num_tokens)
        chunk_tokens = tokenized_text[start_idx:end_idx]
        chunk_text = tokenizer.decode(chunk_tokens)  # Decode the tokens back to text
        chunks.append(chunk_text)

        start_idx += max_tokens - overlap  # Move window forward with overlap

    return chunks


async def embed(text: str, client: AsyncOpenAI) -> list[float]:
    response = await client.embeddings.create(
        input=text, model="text-embedding-3-small", dimensions=1024
    )
    return response.data[0].embedding


async def embed_pdf(
    file_path: Path, should_split: bool = True
) -> List[Tuple[str, List[float]]]:
    pdf_text = ""
    with open(file_path, "rb") as pdf_file:
        reader = PdfReader(pdf_file)
        for page in reader.pages:
            pdf_text += page.extract_text(extraction_mode="plain")

    chunks = chunkify(text=pdf_text) if should_split else [pdf_text]
    tasks = [
        embed(chunk, AsyncOpenAI())
        for chunk in tqdm(chunks, desc=f"Embedding {file_path.name}")
    ]
    embeddings = await asyncio.gather(*tasks)

    return list(zip(chunks, embeddings))


async def embed_text(
    text: str,
    should_split: bool = True,
) -> List[Tuple[str, List[float]]]:
    chunks = chunkify(text=text) if should_split else [text]
    tasks = [
        embed(chunk, AsyncOpenAI())
        for chunk in tqdm(chunks, desc=f"Embedding {text[:25]}")
    ]
    embeddings = await asyncio.gather(*tasks)

    return list(zip(chunks, embeddings))


async def main():
    parser = argparse.ArgumentParser(description="PDF document RAG system")
    parser.add_argument(
        "action",
        choices=["build", "search", "chunkify"],
        help="Action to perform",
    )
    parser.add_argument(
        "inputs", nargs="+", help="Input files/directories or a search query"
    )
    args = parser.parse_args()

    if args.action == "build":
        rag = RAG()
        rag.build_search_db()

        # Process each input path
        total_chunks = 0
        for input_path in args.inputs:
            path = Path(input_path)

            if path.is_dir():
                tasks = []
                texts = []
                for text_file in tqdm(
                    path.glob("*.txt"), desc=f"Embedding files in {path.name}"
                ):
                    text = text_file.read_text()
                    tasks.append(embed(text, AsyncOpenAI()))
                    texts.append(text)

                embeddings = await asyncio.gather(*tasks)
                for text, embedding in zip(texts, embeddings):
                    rag.add_knowledge(text, embedding)

            elif path.is_file() and path.suffix.lower() == ".pdf":
                embeddings = await embed_pdf(path)
                for chunk, embedding in embeddings:
                    rag.add_knowledge(chunk, embedding)
                total_chunks += len(embeddings)
            else:  # Assume a single file in other text format - txt, md...
                text = path.read_text()
                embeddings = await embed_text(text)
                for text, embedding in embeddings:
                    rag.add_knowledge(text, embedding)
                    total_chunks += 1

        print(f"Inserted {total_chunks} chunks of text")

    elif args.action == "search":
        rag = RAG()
        result = rag.search(args.inputs[0])

    elif args.action == "chunkify":
        pdf_text = ""
        with open(args.inputs[0], "rb") as pdf_file:
            reader = PdfReader(pdf_file)
            for page in reader.pages:
                pdf_text += page.extract_text(extraction_mode="plain")

        chunks = chunkify(text=pdf_text)


if __name__ == "__main__":
    asyncio.run(main())

```