# Directory Structure

```
├── _config.yml
├── .env.example
├── .gitignore
├── .python-version
├── .vscode
│   └── settings.json
├── CHANGELOG.md
├── database
│   └── .gitkeep
├── Dockerfile
├── LICENSE
├── Makefile
├── pyproject.toml
├── README.md
├── smithery.yaml
├── src
│   └── mcp_pinecone
│       ├── __init__.py
│       ├── chunking.py
│       ├── constants.py
│       ├── pinecone.py
│       ├── prompts.py
│       ├── server.py
│       ├── tools.py
│       └── utils.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/database/.gitkeep:
--------------------------------------------------------------------------------

```

```

--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------

```
3.12

```

--------------------------------------------------------------------------------
/.env.example:
--------------------------------------------------------------------------------

```
PINECONE_API_KEY=
PINECONE_INDEX_NAME=

```

--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------

```
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info

# Mac OS
.DS_Store

# Virtual environments
.venv

# Apple Notes database for development
database/*
!database/.gitkeep

# Environment variables
.env

```

--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------

```markdown
# Pinecone Model Context Protocol Server for Claude Desktop.

[![smithery badge](https://smithery.ai/badge/mcp-pinecone)](https://smithery.ai/server/mcp-pinecone)

[![PyPI - Downloads](https://img.shields.io/pypi/dd/mcp-pinecone?style=flat)](https://pypi.org/project/mcp-pinecone/)

Read and write to a Pinecone index.


## Components

```mermaid
flowchart TB
    subgraph Client["MCP Client (e.g., Claude Desktop)"]
        UI[User Interface]
    end

    subgraph MCPServer["MCP Server (pinecone-mcp)"]
        Server[Server Class]
        
        subgraph Handlers["Request Handlers"]
            ListRes[list_resources]
            ReadRes[read_resource]
            ListTools[list_tools]
            CallTool[call_tool]
            GetPrompt[get_prompt]
            ListPrompts[list_prompts]
        end
        
        subgraph Tools["Implemented Tools"]
            SemSearch[semantic-search]
            ReadDoc[read-document]
            ListDocs[list-documents]
            PineconeStats[pinecone-stats]
            ProcessDoc[process-document]
        end
    end

    subgraph PineconeService["Pinecone Service"]
        PC[Pinecone Client]
        subgraph PineconeFunctions["Pinecone Operations"]
            Search[search_records]
            Upsert[upsert_records]
            Fetch[fetch_records]
            List[list_records]
            Embed[generate_embeddings]
        end
        Index[(Pinecone Index)]
    end

    %% Connections
    UI --> Server
    Server --> Handlers
    
    ListTools --> Tools
    CallTool --> Tools
    
    Tools --> PC
    PC --> PineconeFunctions
    PineconeFunctions --> Index
    
    %% Data flow for semantic search
    SemSearch --> Search
    Search --> Embed
    Embed --> Index
    
    %% Data flow for document operations
    ProcessDoc --> Upsert
    ReadDoc --> Fetch
    ListRes --> List

    classDef primary fill:#2563eb,stroke:#1d4ed8,color:white
    classDef secondary fill:#4b5563,stroke:#374151,color:white
    classDef storage fill:#059669,stroke:#047857,color:white
    
    class Server,PC primary
    class Tools,Handlers secondary
    class Index storage
```

### Resources

The server exposes the Pinecone index for reading and writing: each vector is surfaced as a resource with a `pinecone://vectors/{vector_id}` URI, so clients can list and read indexed documents.

### Tools

- `semantic-search`: Search for records in the Pinecone index.
- `read-document`: Read a document from the Pinecone index.
- `list-documents`: List all documents in the Pinecone index.
- `pinecone-stats`: Get stats about the Pinecone index, including the number of records, dimensions, and namespaces.
- `process-document`: Process a document into chunks and upsert them into the Pinecone index. This performs the overall steps of chunking, embedding, and upserting.

Note: embeddings are generated via Pinecone's inference API, and chunking is done with a token-based chunker that borrows heavily from LangChain's text-splitting approach (and was debugged with Claude's help).
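
As a rough illustration, the tool input schemas in `src/mcp_pinecone/tools.py` translate into call arguments like the following. The IDs, namespace, and metadata values below are made up; only the field names come from the schemas.

```python
# Hypothetical tool-call arguments (illustrative values only)
process_document_args = {
    "document_id": "notes-2025-01-04",                                  # required
    "text": "Pinecone is a managed vector database...",                 # required
    "metadata": {"title": "Pinecone notes", "category": "databases"},   # required
    "namespace": "personal-notes",                                      # optional
}

semantic_search_args = {
    "query": "What is Pinecone?",   # required
    "top_k": 5,                     # optional, defaults to 10
    "namespace": "personal-notes",  # optional
}
```
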
## Quickstart

### Installing via Smithery

To install Pinecone MCP Server for Claude Desktop automatically via [Smithery](https://smithery.ai/server/mcp-pinecone):

```bash
npx -y @smithery/cli install mcp-pinecone --client claude
```

### Install the server

We recommend using [uv](https://docs.astral.sh/uv/getting-started/installation/) to install the server locally for Claude.

```bash
uv tool install mcp-pinecone
```
OR
```bash
uv pip install mcp-pinecone
```

Add your config as described below.

#### Claude Desktop

On MacOS: `~/Library/Application\ Support/Claude/claude_desktop_config.json`
On Windows: `%APPDATA%/Claude/claude_desktop_config.json`

Note: You might need to use the direct path to `uv`. Use `which uv` to find the path.


__Development/Unpublished Servers Configuration__
  
```json
"mcpServers": {
  "mcp-pinecone": {
    "command": "uv",
    "args": [
      "--directory",
      "{project_dir}",
      "run",
      "mcp-pinecone"
    ]
  }
}
```


__Published Servers Configuration__
  
```json
"mcpServers": {
  "mcp-pinecone": {
    "command": "uvx",
    "args": [
      "--index-name",
      "{your-index-name}",
      "--api-key",
      "{your-secret-api-key}",
      "mcp-pinecone"
    ]
  }
}
```

#### Sign up to Pinecone

You can sign up for a Pinecone account [here](https://www.pinecone.io/).

#### Get an API key

Create a new index in Pinecone, replacing `{your-index-name}` and get an API key from the Pinecone dashboard, replacing `{your-secret-api-key}` in the config.

## Development

### Building and Publishing

To prepare the package for distribution:

1. Sync dependencies and update lockfile:
```bash
uv sync
```

2. Build package distributions:
```bash
uv build
```

This will create source and wheel distributions in the `dist/` directory.

3. Publish to PyPI:
```bash
uv publish
```

Note: You'll need to set PyPI credentials via environment variables or command flags:
- Token: `--token` or `UV_PUBLISH_TOKEN`
- Or username/password: `--username`/`UV_PUBLISH_USERNAME` and `--password`/`UV_PUBLISH_PASSWORD`

### Debugging

Since MCP servers run over stdio, debugging can be challenging. For the best debugging
experience, we strongly recommend using the [MCP Inspector](https://github.com/modelcontextprotocol/inspector).


You can launch the MCP Inspector via [`npm`](https://docs.npmjs.com/downloading-and-installing-node-js-and-npm) with this command:

```bash
npx @modelcontextprotocol/inspector uv --directory {project_dir} run mcp-pinecone
```


Upon launching, the Inspector will display a URL that you can access in your browser to begin debugging.

## License

This project is licensed under the MIT License. See the [LICENSE](LICENSE) file for details.

## Source Code

The source code is available on [GitHub](https://github.com/sirmews/mcp-pinecone).

## Contributing

Send your ideas and feedback to me on [Bluesky](https://bsky.app/profile/perfectlycromulent.bsky.social) or by opening an issue.

```

--------------------------------------------------------------------------------
/_config.yml:
--------------------------------------------------------------------------------

```yaml
remote_theme: pages-themes/[email protected]
plugins:
- jekyll-remote-theme

```

--------------------------------------------------------------------------------
/.vscode/settings.json:
--------------------------------------------------------------------------------

```json
{
    "[python]": {
        "editor.formatOnSave": true,
        "editor.defaultFormatter": "charliermarsh.ruff"
    }
}
```

--------------------------------------------------------------------------------
/src/mcp_pinecone/__init__.py:
--------------------------------------------------------------------------------

```python
from . import server
import asyncio


def main():
    asyncio.run(server.main())


# Optionally expose other important items at package level
__all__ = ["main", "server"]

```

--------------------------------------------------------------------------------
/src/mcp_pinecone/utils.py:
--------------------------------------------------------------------------------

```python
class MCPToolError(Exception):
    """Custom exception for MCP tool errors"""

    def __init__(self, code: int, message: str):
        self.code = code
        self.message = message
        super().__init__(message)


def is_valid_vector_uri(uri: str) -> bool:
    """
    Validate vector URI format

    Parameters:
        uri: The URI to validate.

    Returns:
        bool: True if the URI is valid, False otherwise.
    """
    try:
        if not uri.startswith("pinecone://vectors/"):
            return False
        vector_id = uri.split("/")[-1]
        return bool(vector_id.strip())  # Ensure non-empty ID
    except Exception:
        return False

```

--------------------------------------------------------------------------------
/smithery.yaml:
--------------------------------------------------------------------------------

```yaml
# Smithery configuration file: https://smithery.ai/docs/config#smitheryyaml

startCommand:
  type: stdio
  configSchema:
    # JSON Schema defining the configuration options for the MCP.
    type: object
    required:
      - indexName
      - apiKey
    properties:
      indexName:
        type: string
        description: The name of the Pinecone index.
      apiKey:
        type: string
        description: The API key for accessing Pinecone.
  # A function that produces the CLI command to start the MCP on stdio.
  commandFunction: |-
    config => ({command: 'uv', args: ['run', 'mcp-pinecone', '--index-name', config.indexName, '--api-key', config.apiKey]})

```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[project]
name = "mcp-pinecone"
version = "0.1.8"
description = "Read and write to Pinecone from Claude Desktop with Model Context Protocol."
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
 "httpx>=0.28.0",
 "jsonschema>=4.23.0",
 "mcp>=1.0.0",
 "pinecone>=5.4.1",
 "python-dotenv>=1.0.1",
 "tiktoken>=0.8.0",
]
classifiers = [
    "Programming Language :: Python :: 3",
    "License :: OSI Approved :: MIT License",
    "Operating System :: MacOS",
]
[[project.authors]]
name = "Navishkar Rao"
email = "[email protected]"

[build-system]
requires = [ "hatchling",]
build-backend = "hatchling.build"

[project.scripts]
mcp-pinecone = "mcp_pinecone:main"

[tool.mcp-pinecone]
server_name = "mcp-pinecone"

[project.urls]
Homepage = "https://sirmews.github.io/mcp-pinecone/"
Issues = "https://github.com/sirmews/mcp-pinecone/issues"

```

--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------

```dockerfile
# Generated by https://smithery.ai. See: https://smithery.ai/docs/config#dockerfile
# Use a Python image with uv pre-installed
FROM ghcr.io/astral-sh/uv:python3.12-bookworm-slim AS uv

# Set the working directory
WORKDIR /app

# Copy the project files to the working directory
ADD . /app

# Enable bytecode compilation
ENV UV_COMPILE_BYTECODE=1

# Copy from the cache instead of linking since it's a mounted volume
ENV UV_LINK_MODE=copy

# Sync the dependencies and lockfile
RUN --mount=type=cache,target=/root/.cache/uv \
    --mount=type=bind,source=uv.lock,target=uv.lock \
    --mount=type=bind,source=pyproject.toml,target=pyproject.toml \
    uv sync --frozen --no-install-project --no-dev --no-editable

# Install the project
RUN --mount=type=cache,target=/root/.cache/uv \
    uv sync --frozen --no-dev --no-editable

FROM python:3.12-slim-bookworm

WORKDIR /app
 
COPY --from=uv /root/.local /root/.local
COPY --from=uv --chown=app:app /app/.venv /app/.venv

# Place executables in the environment at the front of the path
ENV PATH="/app/.venv/bin:$PATH"

# Entry point for running the MCP server
ENTRYPOINT ["uv", "run", "mcp-pinecone"]

```

--------------------------------------------------------------------------------
/CHANGELOG.md:
--------------------------------------------------------------------------------

```markdown
# Changelog

All notable changes to the MCP-Pinecone project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.1.8] - 2025-01-04
### Added
- Added `pinecone-store` prompt to store documents in Pinecone
- Added `pinecone-stats` tool to get stats about the Pinecone index
### Changed
- Refactoring across the codebase to make it more modular and easier to extend
- Removed `embed-document` tool
- Removed `chunk-document` tool

## [0.1.7] - 2025-01-01
### Added
- Updated prompt to `pinecone-query` because Zed can't use tools.

## [0.1.6] - 2024-12-31
### Added
- Added `chunk_enabled` argument to `process-document` tool to enable/disable chunking. Defaults to false.
- Added `list-documents` tool to list all documents in a namespace

## [0.1.5] - 2024-12-29
### Added
- Added `process-document` tool to combine chunking, embedding, and upserting documents into Pinecone
- Added `chunk-document` tool to explicitly chunk documents into chunks
- Added `embed-document` tool to explicitly embed documents into Pinecone
- Mention Pinecone api in README

## [0.1.4] - 2024-12-20
### Added
- Added `langchain` dependency for chunking
- Auto chunk documents by markdown headers

## [0.1.3] - 2024-12-20
### Added
- Namespace support for all vector operations (search, read, upsert)
- Explicit namespace parameter in tool schemas

### Changed
- Updated MCP package to latest version

## [0.1.0 - 0.1.2]
### Added
- Initial public release
- Basic Pinecone integration with MCP
- Semantic search capabilities
- Document reading and writing
- Metadata support
```

--------------------------------------------------------------------------------
/src/mcp_pinecone/constants.py:
--------------------------------------------------------------------------------

```python
# Index name
import os
import argparse
from dotenv import load_dotenv

load_dotenv()


def get_pinecone_config():
    parser = argparse.ArgumentParser(description="Pinecone MCP Configuration")
    parser.add_argument(
        "--index-name",
        default=None,
        help="Name of the Pinecone index to use. Will use environment variable PINECONE_INDEX_NAME if not provided.",
    )
    parser.add_argument(
        "--api-key",
        default=None,
        help="API key for Pinecone. Will use environment variable PINECONE_API_KEY if not provided.",
    )
    args = parser.parse_args()

    # Use command line arguments if provided, otherwise fall back to environment variables
    index_name = args.index_name or os.getenv("PINECONE_INDEX_NAME")
    api_key = args.api_key or os.getenv("PINECONE_API_KEY")

    # Set default index name if none provided
    if not index_name:
        index_name = "mcp-pinecone-index"
        print(f"No index name provided, using default: {index_name}")

    # Validate API key
    if not api_key:
        raise ValueError(
            "Pinecone API key is required. Provide it via --api-key argument or PINECONE_API_KEY environment variable"
        )

    return index_name, api_key


# Get configuration values
PINECONE_INDEX_NAME, PINECONE_API_KEY = get_pinecone_config()

# Validate configuration after loading
if not PINECONE_INDEX_NAME or not PINECONE_API_KEY:
    raise ValueError(
        "Missing required configuration. Ensure PINECONE_INDEX_NAME and PINECONE_API_KEY "
        "are set either via environment variables or command line arguments."
    )

# Inference API model name
INFERENCE_MODEL = "multilingual-e5-large"

# Inference API embedding dimension
INFERENCE_DIMENSION = 1024

# Export values for use in other modules
__all__ = [
    "PINECONE_INDEX_NAME",
    "PINECONE_API_KEY",
    "INFERENCE_MODEL",
    "INFERENCE_DIMENSION",
]

```

--------------------------------------------------------------------------------
/src/mcp_pinecone/server.py:
--------------------------------------------------------------------------------

```python
import logging
from typing import Union
from mcp.server.models import InitializationOptions
import mcp.types as types
from mcp.server import NotificationOptions, Server
from pydantic import AnyUrl
import mcp.server.stdio
from .pinecone import PineconeClient
from .tools import register_tools
from .prompts import register_prompts
import importlib.metadata

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("pinecone-mcp")

pinecone_client = None
server = Server("pinecone-mcp")


@server.list_resources()
async def handle_list_resources() -> list[types.Resource]:
    try:
        if pinecone_client is None:
            logger.error("Pinecone client is not initialized")
            return []
        records = pinecone_client.list_records()

        resources = []
        for record in records.get("vectors", []):
            # If metadata is None, use empty dict
            metadata = record.get("metadata") or {}
            description = (
                metadata.get("text", "")[:100] + "..." if metadata.get("text") else ""
            )
            resources.append(
                types.Resource(
                    uri=f"pinecone://vectors/{record['id']}",
                    name=metadata.get("title", f"Vector {record['id']}"),
                    description=description,
                    metadata=metadata,
                    mimeType=metadata.get("content_type", "text/plain"),
                )
            )
        return resources
    except Exception as e:
        logger.error(f"Error listing resources: {e}")
        return []


@server.read_resource()
async def handle_read_resource(uri: AnyUrl) -> Union[str, bytes]:
    if not str(uri).startswith("pinecone://vectors/"):
        raise ValueError(f"Unsupported URI scheme: {uri}")

    try:
        vector_id = str(uri).split("/")[-1]
        record = pinecone_client.fetch_records([vector_id])

        if not record or "records" not in record or not record["records"]:
            raise ValueError(f"Vector not found: {vector_id}")

        vector_data = record["records"][0]
        metadata = vector_data.get("metadata", {})
        content_type = metadata.get("content_type", "text/plain")

        if content_type.startswith("text/"):
            return format_text_content(vector_data)
        else:
            return format_binary_content(vector_data)
    except Exception as e:
        raise RuntimeError(f"Pinecone error: {str(e)}")


def format_text_content(vector_data: dict) -> str:
    metadata = vector_data.get("metadata", {})
    output = []

    if "title" in metadata:
        output.append(f"Title: {metadata['title']}")
    output.append(f"ID: {vector_data.get('id')}")

    for key, value in metadata.items():
        if key not in ["title", "text", "content_type"]:
            output.append(f"{key}: {value}")

    output.append("")

    if "text" in metadata:
        output.append(metadata["text"])

    return "\n".join(output)


def format_binary_content(vector_data: dict) -> bytes:
    content = vector_data.get("metadata", {}).get("content", b"")
    if isinstance(content, str):
        content = content.encode("utf-8")
    return content


async def main():
    logger.info("Starting Pinecone MCP server")

    global pinecone_client
    pinecone_client = PineconeClient()

    # Register tools and prompts
    register_tools(server, pinecone_client)
    register_prompts(server, pinecone_client)

    async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
        await server.run(
            read_stream,
            write_stream,
            InitializationOptions(
                server_name="pinecone-mcp",
                server_version=importlib.metadata.version("mcp-pinecone"),
                capabilities=server.get_capabilities(
                    notification_options=NotificationOptions(resources_changed=True),
                    experimental_capabilities={},
                ),
            ),
        )

```
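
The resource handlers above expose each vector as `pinecone://vectors/{id}` and format text content from its metadata. Below is a minimal sketch of `format_text_content`, using made-up vector data shaped like a fetched Pinecone record (importing the package loads the Pinecone configuration, so `PINECONE_API_KEY` must be set in the environment):

```python
# Illustrative only: the vector data below is fabricated.
from mcp_pinecone.server import format_text_content

vector_data = {
    "id": "notes-2025-01-04#chunk1",
    "metadata": {
        "title": "Pinecone notes",
        "category": "databases",
        "text": "Pinecone is a managed vector database...",
    },
}

# Prints the title and ID, any extra metadata keys, then the stored text.
print(format_text_content(vector_data))
```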

--------------------------------------------------------------------------------
/src/mcp_pinecone/prompts.py:
--------------------------------------------------------------------------------

```python
import logging
from enum import Enum
import mcp.types as types
from mcp.server import Server
from .pinecone import PineconeClient
from datetime import datetime


logger = logging.getLogger("pinecone-mcp")


class PromptName(str, Enum):
    PINECONE_QUERY = "pinecone-query"
    PINECONE_STORE = "pinecone-store"


ServerPrompts = [
    types.Prompt(
        name=PromptName.PINECONE_QUERY,
        description="Search Pinecone index and construct an answer based on relevant pinecone documents",
        arguments=[
            types.PromptArgument(
                name="query",
                description="The question to answer, or the context to search for",
                required=True,
            )
        ],
    ),
    types.Prompt(
        name=PromptName.PINECONE_STORE,
        description="Store content as document in Pinecone",
        arguments=[
            types.PromptArgument(
                name="content",
                description="The content to store as a Pinecone document",
                required=True,
            ),
            types.PromptArgument(
                name="namespace",
                description="The namespace to store the document in",
                required=False,
            ),
        ],
    ),
]


def register_prompts(server: Server, pinecone_client: PineconeClient):
    @server.list_prompts()
    async def handle_list_prompts() -> list[types.Prompt]:
        return ServerPrompts

    @server.get_prompt()
    async def handle_get_prompt(
        name: str, arguments: dict[str, str] | None
    ) -> types.GetPromptResult:
        try:
            if name == PromptName.PINECONE_QUERY:
                return pinecone_query(arguments, pinecone_client)
            elif name == PromptName.PINECONE_STORE:
                return pinecone_store(arguments, pinecone_client)
            else:
                raise ValueError(f"Unknown prompt: {name}")

        except Exception as e:
            logger.error(f"Error calling prompt {name}: {e}")
            raise


def pinecone_store(
    arguments: dict | None, pinecone_client: PineconeClient
) -> types.GetPromptResult:
    """
    Store content as document in Pinecone
    """
    content = arguments.get("content")
    namespace = arguments.get("namespace")

    metadata = {
        "date": datetime.now().isoformat(),
    }

    if not content:
        raise ValueError("Content required")

    return types.GetPromptResult(
        messages=[
            types.PromptMessage(
                role="user",
                content=types.TextContent(
                    type="text",
                    text=f"The namespace is {namespace if namespace else 'not specified'}. \n"
                    "If the namespace is not specified, use pinecone-stats to find an appropriate namespace or use the default namespace.",
                ),
            ),
            types.PromptMessage(
                role="user",
                content=types.TextContent(
                    type="text",
                    text=f"Based on the content, generate metadata that can be relevant to the content and used for filtering. \n"
                    "The metadata should be a dictionary with keys and values that are relevant to the content. \n"
                    f"Append the metdata to {metadata} \n",
                ),
            ),
            types.PromptMessage(
                role="user",
                content=types.TextContent(
                    type="text",
                    text=f"Run the process-document tool with the content: {content} \n"
                    "Include generated metadata in the document. \n"
                    f"Store in the {namespace} if specified",
                ),
            ),
        ]
    )


def pinecone_query(
    arguments: dict | None, pinecone_client: PineconeClient
) -> types.GetPromptResult:
    """
    Search Pinecone index and construct an answer based on relevant pinecone documents
    """
    query = arguments.get("query")
    if not query:
        raise ValueError("Query required")

    return types.GetPromptResult(
        messages=[
            types.PromptMessage(
                role="user",
                content=types.TextContent(
                    type="text",
                    text="First use pinecone-stats to get a list of namespaces that might contain relevant documents. Ignore if a namespace is specified in the query",
                ),
            ),
            types.PromptMessage(
                role="user",
                content=types.TextContent(
                    type="text",
                    text=f"Do a semantic search for the query: {query} with the chosen namespace",
                ),
            ),
        ]
    )


__all__ = [
    "register_prompts",
]

```

--------------------------------------------------------------------------------
/src/mcp_pinecone/tools.py:
--------------------------------------------------------------------------------

```python
import json
import logging
from typing import Dict, Any, TypedDict
from enum import Enum
from typing import Union, Sequence
import mcp.types as types
from mcp.server import Server
from .pinecone import PineconeClient, PineconeRecord
from .utils import MCPToolError
from .chunking import create_chunker, Chunk


logger = logging.getLogger("pinecone-mcp")


class ToolName(str, Enum):
    SEMANTIC_SEARCH = "semantic-search"
    READ_DOCUMENT = "read-document"
    PROCESS_DOCUMENT = "process-document"
    LIST_DOCUMENTS = "list-documents"
    PINECONE_STATS = "pinecone-stats"


ServerTools = [
    types.Tool(
        name=ToolName.SEMANTIC_SEARCH,
        description="Search pinecone for documents",
        inputSchema={
            "type": "object",
            "properties": {
                "query": {"type": "string"},
                "top_k": {"type": "integer", "default": 10},
                "namespace": {
                    "type": "string",
                    "description": "Optional namespace to search in",
                },
                "category": {"type": "string"},
                "tags": {"type": "array", "items": {"type": "string"}},
                "date_range": {
                    "type": "object",
                    "properties": {
                        "start": {"type": "string", "format": "date"},
                        "end": {"type": "string", "format": "date"},
                    },
                },
            },
            "required": ["query"],
        },
    ),
    types.Tool(
        name=ToolName.READ_DOCUMENT,
        description="Read a document from pinecone",
        inputSchema={
            "type": "object",
            "properties": {
                "document_id": {"type": "string"},
                "namespace": {
                    "type": "string",
                    "description": "Optional namespace to read from",
                },
            },
            "required": ["document_id"],
        },
    ),
    types.Tool(
        name=ToolName.PROCESS_DOCUMENT,
        description="Process a document. This will optionally chunk, then embed, and upsert the document into pinecone.",
        inputSchema={
            "type": "object",
            "properties": {
                "document_id": {"type": "string"},
                "text": {"type": "string"},
                "metadata": {"type": "object"},
                "namespace": {
                    "type": "string",
                    "description": "Optional namespace to store the document in",
                },
            },
            "required": ["document_id", "text", "metadata"],
        },
    ),
    types.Tool(
        name=ToolName.LIST_DOCUMENTS,
        description="List all documents in the knowledge base by namespace",
        inputSchema={
            "type": "object",
            "properties": {
                "namespace": {
                    "type": "string",
                    "description": "Namespace to list documents in",
                }
            },
            "required": ["namespace"],
        },
    ),
    types.Tool(
        name=ToolName.PINECONE_STATS,
        description="Get stats about the Pinecone index specified in this server",
        inputSchema={
            "type": "object",
            "properties": {},
            "required": [],
        },
    ),
]


def register_tools(server: Server, pinecone_client: PineconeClient):
    @server.list_tools()
    async def handle_list_tools() -> list[types.Tool]:
        return ServerTools

    @server.call_tool()
    async def handle_call_tool(
        name: str, arguments: dict | None
    ) -> Sequence[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
        try:
            if name == ToolName.SEMANTIC_SEARCH:
                return semantic_search(arguments, pinecone_client)
            if name == ToolName.PINECONE_STATS:
                return pinecone_stats(pinecone_client)
            if name == ToolName.READ_DOCUMENT:
                return read_document(arguments, pinecone_client)
            if name == ToolName.PROCESS_DOCUMENT:
                return process_document(arguments, pinecone_client)
            if name == ToolName.LIST_DOCUMENTS:
                return list_documents(arguments, pinecone_client)

        except Exception as e:
            logger.error(f"Error calling tool {name}: {e}")
            raise


def list_documents(
    arguments: dict | None, pinecone_client: PineconeClient
) -> list[types.TextContent]:
    """
    List all documents in the knowledge base by namespace
    """
    namespace = arguments.get("namespace")
    results = pinecone_client.list_records(namespace=namespace)
    return [types.TextContent(type="text", text=json.dumps(results))]


def pinecone_stats(pinecone_client: PineconeClient) -> list[types.TextContent]:
    """
    Get stats about the Pinecone index specified in this server
    """
    stats = pinecone_client.stats()
    return [types.TextContent(type="text", text=json.dumps(stats))]


def semantic_search(
    arguments: dict | None, pinecone_client: PineconeClient
) -> list[types.TextContent]:
    """
    Read a document from the pinecone knowledge base
    """
    query = arguments.get("query")
    top_k = arguments.get("top_k", 10)
    filters = arguments.get("filters", {})
    namespace = arguments.get("namespace")

    results = pinecone_client.search_records(
        query=query,
        top_k=top_k,
        filter=filters,
        include_metadata=True,
        namespace=namespace,
    )

    matches = results.get("matches", [])

    # Format results with rich context
    formatted_text = "Retrieved Contexts:\n\n"
    for i, match in enumerate(matches, 1):
        metadata = match.get("metadata", {})
        formatted_text += f"Result {i} | Similarity: {match['score']:.3f} | Document ID: {match['id']}\n"
        formatted_text += f"{metadata.get('text', '').strip()}\n"
        formatted_text += "-" * 10 + "\n\n"

    return [types.TextContent(type="text", text=formatted_text)]


def process_document(
    arguments: dict | None, pinecone_client: PineconeClient
) -> list[types.TextContent]:
    """
    Process a document by chunking, embedding, and upserting it into the knowledge base. Returns the document ID.
    """
    document_id = arguments.get("document_id")
    text = arguments.get("text")
    namespace = arguments.get("namespace")
    metadata = arguments.get("metadata", {})

    chunker = create_chunker(chunk_type="smart")
    chunks = chunker.chunk_document(document_id, text, metadata)

    embed_result = embed_document(chunks, pinecone_client)

    embedded_chunks = embed_result.get("embedded_chunks", None)

    if embedded_chunks is None:
        # MCPToolError expects a code and a message (-32603 = JSON-RPC internal error)
        raise MCPToolError(code=-32603, message="No embedded chunks found")

    upsert_documents(embedded_chunks, pinecone_client, namespace)

    return [
        types.TextContent(
            type="text",
            text=f"Successfully processed document. The document ID is {document_id}",
        )
    ]


class EmbeddingResult(TypedDict):
    embedded_chunks: list[PineconeRecord]
    total_embedded: int


def embed_document(
    chunks: list[Chunk], pinecone_client: PineconeClient
) -> EmbeddingResult:
    """
    Embed a list of chunks.
    Uses the Pinecone client to generate embeddings with the inference API.
    """
    embedded_chunks = []
    for chunk in chunks:
        content = chunk.content
        chunk_id = chunk.id
        metadata = chunk.metadata

        if not content or not chunk_id:
            logger.warning(f"Skipping invalid chunk: {chunk}")
            continue

        embedding = pinecone_client.generate_embeddings(content)
        record = PineconeRecord(
            id=chunk_id,
            embedding=embedding,
            text=content,
            metadata=metadata,
        )
        embedded_chunks.append(record)
    return EmbeddingResult(
        embedded_chunks=embedded_chunks,
        total_embedded=len(embedded_chunks),
    )


def read_document(
    arguments: dict | None, pinecone_client: PineconeClient
) -> list[types.TextContent]:
    """
    Read a single Pinecone document by ID
    """
    document_id = arguments.get("document_id")
    namespace = arguments.get("namespace")
    if not document_id:
        raise ValueError("document_id is required")

    # Fetch the record from Pinecone (optionally scoped to a namespace)
    record = pinecone_client.fetch_records([document_id], namespace=namespace)

    # Get the vector data for this document
    vector = record.vectors.get(document_id)
    if not vector:
        raise ValueError(f"Document {document_id} not found")

    # Get metadata from the vector
    metadata = vector.metadata if hasattr(vector, "metadata") else {}

    # Format the document content
    formatted_content = []
    formatted_content.append(f"Document ID: {document_id}")
    formatted_content.append("")  # Empty line for spacing

    if metadata:
        formatted_content.append("Metadata:")
        for key, value in metadata.items():
            formatted_content.append(f"{key}: {value}")

    return [types.TextContent(type="text", text="\n".join(formatted_content))]


def upsert_documents(
    records: list[PineconeRecord],
    pinecone_client: PineconeClient,
    namespace: str | None = None,
) -> Dict[str, Any]:
    """
    Upsert a list of Pinecone records into the knowledge base.
    """
    result = pinecone_client.upsert_records(records, namespace=namespace)
    return result


__all__ = [
    "register_tools",
]

```
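
For orientation, this is roughly the pipeline `process-document` runs internally (chunk, then embed, then upsert), written out with the module's own helpers. It is a sketch, not part of the server: it assumes `PINECONE_API_KEY` (and ideally `PINECONE_INDEX_NAME`) are set, and `PineconeClient()` will connect to, or create, the configured index.

```python
# Sketch of the chunk -> embed -> upsert pipeline; values are illustrative.
from mcp_pinecone.pinecone import PineconeClient
from mcp_pinecone.chunking import create_chunker
from mcp_pinecone.tools import embed_document, upsert_documents

client = PineconeClient()
chunker = create_chunker(chunk_type="smart")

chunks = chunker.chunk_document(
    document_id="doc-1",
    content="Some long document text...",
    metadata={"title": "Example document"},
)
records = embed_document(chunks, client)["embedded_chunks"]
upsert_documents(records, client, namespace="examples")
```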

--------------------------------------------------------------------------------
/src/mcp_pinecone/chunking.py:
--------------------------------------------------------------------------------

```python
"""
Smart document chunking with token awareness and recursive splitting.
Provides configurable text splitting strategies optimized for LLM context windows.
"""

from typing import List, Dict, Any, Optional
from pydantic import BaseModel, Field, model_validator
import tiktoken
import logging
from abc import ABC, abstractmethod

logger = logging.getLogger("smart_chunker")


class ChunkingError(Exception):
    """Base exception for chunking errors"""

    pass


class Chunk(BaseModel):
    """Represents a document chunk with metadata"""

    id: str
    content: str
    metadata: Dict[str, Any]

    def to_dict(self) -> dict:
        """Convert to dictionary format for embed-document"""
        return {"id": self.id, "content": self.content, "metadata": self.metadata}


class ChunkingConfig(BaseModel):
    """Configuration for chunking behavior"""

    target_tokens: int = Field(
        default=512,
        description="Target chunk size in tokens",
        gt=0,  # Must be positive
    )
    max_tokens: int = Field(
        default=1000,
        description="Maximum allowed tokens per chunk",
        gt=0,
    )
    overlap_tokens: int = Field(
        default=50,
        description="Number of tokens to overlap",
        ge=0,
    )
    tokenizer_model: str = Field(
        default="cl100k_base", description="Tokenizer model to use"
    )

    # Separators in priority order
    separators: List[str] = Field(
        default=[
            "\n\n",  # Paragraphs
            "\n",  # Lines
            ". ",  # Sentences
            "? ",  # Questions
            "! ",  # Exclamations
            ", ",  # Clauses
            " ",  # Words
            "",  # Characters
        ],
        description="Separators in order of preference",
    )

    @model_validator(mode="after")
    def validate_tokens(self):
        """Ensure overlap tokens are less than target tokens"""
        if self.overlap_tokens >= self.target_tokens:
            raise ValueError("overlap_tokens must be less than target_tokens")
        if self.max_tokens < self.target_tokens:
            raise ValueError(
                "max_tokens must be greater than or equal to target_tokens"
            )
        return self


class BaseChunker(ABC):
    """
    Abstract base for all chunking strategies.
    We can add more chunking strategies here as we learn more approaches for certain document types.
    """

    @abstractmethod
    def chunk_document(
        self, document_id: str, content: str, metadata: Dict[str, Any]
    ) -> List[Chunk]:
        pass


class SmartChunker(BaseChunker):
    """
    Intelligent chunking implementation that combines:
    - Token awareness
    - Recursive splitting
    - Smart overlap handling
    - Configurable behavior
    This is inspired by approaches highlighted in https://js.langchain.com/docs/concepts/text_splitters/
    In order to keep dependencies minimal, we're not using LangChain here.
    Just taking inspiration from their approaches.
    """

    def __init__(self, config: Optional[ChunkingConfig] = None):
        self.config = config or ChunkingConfig()
        self.tokenizer = tiktoken.get_encoding(self.config.tokenizer_model)

    def count_tokens(self, text: str) -> int:
        """
        Get exact token count for text
        """
        return len(self.tokenizer.encode(text))

    def create_chunk(
        self,
        document_id: str,
        content: str,
        chunk_number: int,
        total_chunks: int,
        base_metadata: Dict[str, Any],
    ) -> Chunk:
        """Create a chunk with complete metadata"""
        token_count = self.count_tokens(content)

        metadata = {
            "document_id": document_id,
            "chunk_number": chunk_number,
            "total_chunks": total_chunks,
            "token_count": token_count,
            "char_count": len(content),
            "chunk_type": "smart",
            **base_metadata,
        }

        return Chunk(
            id=f"{document_id}#chunk{chunk_number}",
            content=content.strip(),
            metadata=metadata,
        )

    def chunk_document(
        self, document_id: str, content: str, metadata: Dict[str, Any]
    ) -> List[Chunk]:
        """
        Chunk document with intelligent boundary detection and token awareness
        This works by recursively splitting the document into chunks with overlap
        and then trying to find the best boundaries using progressively smaller separators
        """
        if not content or not content.strip():
            raise ChunkingError("Cannot chunk empty content")
        if not document_id:
            raise ChunkingError("Document ID is required")
        try:
            # Get initial splits
            chunks = self._split_with_overlap(
                content,
                self.config.separators,
                self.config.target_tokens,
                self.config.overlap_tokens,
            )

            # Convert to chunk objects with metadata
            processed_chunks = []
            for i, text in enumerate(chunks, 1):
                chunk = self.create_chunk(
                    document_id=document_id,
                    content=text,
                    chunk_number=i,
                    total_chunks=len(chunks),
                    base_metadata=metadata,
                )
                processed_chunks.append(chunk)

            # Log stats
            total_tokens = sum(c.metadata["token_count"] for c in processed_chunks)
            avg_tokens = total_tokens / len(processed_chunks)
            logger.info(
                f"Split document {document_id} into {len(processed_chunks)} chunks. "
                f"Average tokens per chunk: {avg_tokens:.0f}"
            )

            return processed_chunks

        except Exception as e:
            raise ChunkingError(f"Error chunking document {document_id}: {e}")

    def _split_with_overlap(
        self, text: str, separators: List[str], target_tokens: int, overlap_tokens: int
    ) -> List[str]:
        """
        Split text recursively while handling overlap

        Args:
            text: The text to split
            separators: List of separators to try, in order of preference
            target_tokens: Target number of tokens per chunk
            overlap_tokens: Number of tokens to overlap between chunks

        Returns:
            List of text chunks with overlap

        Raises:
            ChunkingError: If text cannot be split into chunks
        """

        # Base case - text is small enough
        text_tokens = self.count_tokens(text)
        if text_tokens <= target_tokens:
            return [text]

        # Try each separator in order
        for separator in separators:
            splits = text.split(separator)

            # Skip if separator doesn't help
            if len(splits) == 1:
                continue

            # Process splits with overlap
            chunks = []
            current_chunk = []
            current_tokens = 0

            for split in splits:
                split_tokens = self.count_tokens(split)

                # Check if adding split would exceed target
                if current_tokens + split_tokens > target_tokens and current_chunk:
                    # Add current chunk
                    chunks.append(separator.join(current_chunk))

                    # Start a new chunk, seeding it with overlap taken from
                    # the end of the chunk we just emitted
                    overlap_tokens_remaining = overlap_tokens
                    prev_splits = current_chunk
                    current_chunk = []
                    for prev_split in reversed(prev_splits):
                        prev_tokens = self.count_tokens(prev_split)
                        if overlap_tokens_remaining - prev_tokens < 0:
                            break
                        current_chunk.insert(0, prev_split)
                        overlap_tokens_remaining -= prev_tokens

                    current_tokens = self.count_tokens(separator.join(current_chunk))

                current_chunk.append(split)
                current_tokens += split_tokens

            # Add final chunk
            if current_chunk:
                chunks.append(separator.join(current_chunk))

            # If we found valid splits, return them
            if chunks:
                return chunks

        # If no good splits found, fall back to token boundary
        return self._split_by_tokens(text, target_tokens, overlap_tokens)

    def _split_by_tokens(
        self, text: str, target_tokens: int, overlap_tokens: int
    ) -> List[str]:
        """
        Split on token boundaries as a last resort
        This is a simple approach that splits the document into chunks of the target size
        with an overlap of the overlap size.
        """
        tokens = self.tokenizer.encode(text)
        chunks = []

        for i in range(0, len(tokens), target_tokens - overlap_tokens):
            chunk_tokens = tokens[i : i + target_tokens]
            chunk_text = self.tokenizer.decode(chunk_tokens)
            chunks.append(chunk_text)

        return chunks


# Factory for creating chunkers
def create_chunker(
    chunk_type: str = "smart", config: Optional[ChunkingConfig] = None
) -> BaseChunker:
    """Create appropriate chunker based on type"""
    chunkers = {"smart": lambda: SmartChunker(config)}

    if chunk_type not in chunkers:
        raise ValueError(f"Unknown chunker type: {chunk_type}")

    return chunkers[chunk_type]()


__all__ = [
    "Chunk",
    "ChunkingConfig",
    "BaseChunker",
    "SmartChunker",
    "create_chunker",
]

```
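
The chunker itself has no Pinecone dependency, so it can be exercised on its own. Below is a small sketch with arbitrary config values (importing the package still loads the server configuration, so `PINECONE_API_KEY` must be set in the environment):

```python
# Illustrative chunker usage; config values and text are arbitrary.
from mcp_pinecone.chunking import ChunkingConfig, create_chunker

config = ChunkingConfig(target_tokens=256, max_tokens=512, overlap_tokens=32)
chunker = create_chunker(chunk_type="smart", config=config)

chunks = chunker.chunk_document(
    document_id="doc-1",
    content="First paragraph...\n\nSecond paragraph...\n\nThird paragraph...",
    metadata={"source": "example"},
)
for chunk in chunks:
    print(chunk.id, chunk.metadata["token_count"])
```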

--------------------------------------------------------------------------------
/src/mcp_pinecone/pinecone.py:
--------------------------------------------------------------------------------

```python
from pinecone import Pinecone, ServerlessSpec, FetchResponse, UpsertResponse
from typing import List, Dict, Any, Optional, Union

from pydantic import BaseModel
from .constants import (
    INFERENCE_DIMENSION,
    PINECONE_INDEX_NAME,
    PINECONE_API_KEY,
    INFERENCE_MODEL,
)
from dotenv import load_dotenv
import logging

load_dotenv()

logger = logging.getLogger(__name__)


# Pydantic model for a Pinecone record
class PineconeRecord(BaseModel):
    """
    Represents a record in Pinecone
    """

    id: str
    embedding: List[float]
    text: str
    metadata: Dict[str, Any]

    def to_dict(self) -> dict:
        """
        Convert to dictionary format for JSON serialization
        """
        return {
            "id": self.id,
            "embedding": self.embedding,
            "text": self.text,
            "metadata": self.metadata,
        }


class PineconeClient:
    """
    A client for interacting with Pinecone.
    """

    def __init__(self):
        self.pc = Pinecone(api_key=PINECONE_API_KEY)
        # Initialize index after checking/creating
        self.ensure_index_exists()
        desc = self.pc.describe_index(PINECONE_INDEX_NAME)
        self.index = self.pc.Index(
            name=PINECONE_INDEX_NAME,
            host=desc.host,  # Get the proper host from the index description
        )

    def ensure_index_exists(self):
        """
        Check if index exists, create if it doesn't.
        """
        try:
            indexes = self.pc.list_indexes()

            exists = any(index["name"] == PINECONE_INDEX_NAME for index in indexes)
            if exists:
                logger.warning(f"Index {PINECONE_INDEX_NAME} already exists")
                return

            self.create_index()

        except Exception as e:
            logger.error(f"Error checking/creating index: {e}")
            raise

    def create_index(self):
        """
        Create a serverless index with integrated inference.
        """
        try:
            return self.pc.create_index(
                name=PINECONE_INDEX_NAME,
                dimension=INFERENCE_DIMENSION,
                metric="cosine",
                deletion_protection="disabled",  # Consider enabling for production
                spec=ServerlessSpec(cloud="aws", region="us-east-1"),
            )
        except Exception as e:
            logger.error(f"Failed to create index: {e}")
            raise

    def generate_embeddings(self, text: str) -> List[float]:
        """
        Generate embeddings for a given text using Pinecone Inference API.

        Parameters:
            text: The text to generate embeddings for.

        Returns:
            List[float]: The embeddings for the text.
        """
        response = self.pc.inference.embed(
            model=INFERENCE_MODEL,
            inputs=[text],
            parameters={"input_type": "passage", "truncate": "END"},
        )
        # if the response is empty, raise an error
        if not response.data:
            raise ValueError(f"Failed to generate embeddings for text: {text}")
        return response.data[0].values

    def upsert_records(
        self,
        records: List[PineconeRecord],
        namespace: Optional[str] = None,
    ) -> UpsertResponse:
        """
        Upsert records into the Pinecone index.

        Parameters:
            records: List of records to upsert.
            namespace: Optional namespace to upsert into.

        Returns:
            Dict[str, Any]: The response from Pinecone.
        """
        try:
            vectors = []
            for record in records:
                # Don't continue if there's no vector values
                if not record.embedding:
                    continue

                vector_values = record.embedding
                raw_text = record.text
                record_id = record.id
                metadata = record.metadata

                logger.info(f"Record: {metadata}")

                # Add raw text to metadata
                metadata["text"] = raw_text
                vectors.append((record_id, vector_values, metadata))

            return self.index.upsert(vectors=vectors, namespace=namespace)

        except Exception as e:
            logger.error(f"Error upserting records: {e}")
            raise

    def search_records(
        self,
        query: Union[str, List[float]],
        top_k: int = 10,
        namespace: Optional[str] = None,
        filter: Optional[Dict] = None,
        include_metadata: bool = True,
    ) -> Dict[str, Any]:
        """
        Search records using integrated inference.

        Parameters:
            query: The query to search for.
            top_k: The number of results to return.
            namespace: Optional namespace to search in.
            filter: Optional filter to apply to the search.
            include_metadata: Whether to include metadata in the search results.

        Returns:
            Dict[str, Any]: The search results from Pinecone.
        """
        try:
            # If query is text, use our custom function to get embeddings
            if isinstance(query, str):
                vector = self.generate_embeddings(query)
            else:
                vector = query

            return self.index.query(
                vector=vector,
                top_k=top_k,
                namespace=namespace,
                include_metadata=include_metadata,
                filter=filter,
            )
        except Exception as e:
            logger.error(f"Error searching records: {e}")
            raise

    def stats(self) -> Dict[str, Any]:
        """
        Get detailed statistics about the index including:
        - Total vector count
        - Index dimension
        - Index fullness
        - Namespace-specific statistics

        Returns:
            Dict[str, Any]: A dictionary containing:
                - namespaces: Dict mapping namespace names to their statistics
                - dimension: Dimension of the indexed vectors
                - index_fullness: Fullness of the index (0-1 scale)
                - total_vector_count: Total number of vectors across all namespaces

        """
        try:
            stats = self.index.describe_index_stats()
            # Convert namespaces to dict - each NamespaceSummary needs to be converted to dict
            namespaces_dict = {}
            for ns_name, ns_summary in stats.namespaces.items():
                namespaces_dict[ns_name] = {
                    "vector_count": ns_summary.vector_count,
                }

            return {
                "namespaces": namespaces_dict,
                "dimension": stats.dimension,
                "index_fullness": stats.index_fullness,
                "total_vector_count": stats.total_vector_count,
            }
        except Exception as e:
            logger.error(f"Error getting stats: {e}")
            raise

    def delete_records(
        self, ids: List[str], namespace: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Delete records by ID

        Parameters:
            ids: List of record IDs to delete
            namespace: Optional namespace to delete from
        """
        try:
            return self.index.delete(ids=ids, namespace=namespace)
        except Exception as e:
            logger.error(f"Error deleting records: {e}")
            raise

    def fetch_records(
        self, ids: List[str], namespace: Optional[str] = None
    ) -> FetchResponse:
        """
        Fetch specific records by ID

        Parameters:
            ids: List of record IDs to fetch
            namespace: Optional namespace to fetch from

        Returns:
            FetchResponse: The response from Pinecone.

        Raises:
            Exception: If there is an error fetching the records.
        """
        try:
            return self.index.fetch(ids=ids, namespace=namespace)
        except Exception as e:
            logger.error(f"Error fetching records: {e}")
            raise

    def list_records(
        self,
        prefix: Optional[str] = None,
        limit: int = 100,
        namespace: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        List records in the index using pagination.

        Parameters:
            prefix: Optional prefix to filter records by.
            limit: The number of records to return per page.
            namespace: Optional namespace to list records from.
        """
        try:
            # Using list_paginated for single-page results
            response = self.index.list_paginated(
                prefix=prefix, limit=limit, namespace=namespace
            )

            # Check if response is None
            if response is None:
                logger.error("Received None response from Pinecone list_paginated")
                return {"vectors": [], "namespace": namespace, "pagination_token": None}

            # Handle the case where vectors might be None
            vectors = response.vectors if hasattr(response, "vectors") else []

            return {
                "vectors": [
                    {
                        "id": getattr(v, "id", None),
                        "metadata": getattr(v, "metadata", {}),
                    }
                    for v in vectors
                ],
                "namespace": getattr(response, "namespace", namespace),
                "pagination_token": getattr(response.pagination, "next", None)
                if hasattr(response, "pagination")
                else None,
            }
        except Exception as e:
            logger.error(f"Error listing records: {e}")
            # Return empty result instead of raising
            return {"vectors": [], "namespace": namespace, "pagination_token": None}

```
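
A brief sketch of using `PineconeClient` directly, mirroring how `tools.py` consumes it. The namespace and query are made up; the client needs `PINECONE_API_KEY` (and `PINECONE_INDEX_NAME`) configured and will create the index if it does not exist.

```python
# Illustrative direct usage of PineconeClient.
from mcp_pinecone.pinecone import PineconeClient

client = PineconeClient()

# Index-level statistics: namespaces, dimension, fullness, total vector count.
print(client.stats())

# Semantic search: the query string is embedded via the inference API first.
results = client.search_records(query="managed vector databases", top_k=3, namespace="examples")
for match in results.get("matches", []):
    print(match["id"], match["score"])
```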