# Directory Structure
```
├── _config.yml
├── .env.example
├── .gitignore
├── .python-version
├── .vscode
│   └── settings.json
├── CHANGELOG.md
├── database
│   └── .gitkeep
├── Dockerfile
├── LICENSE
├── Makefile
├── pyproject.toml
├── README.md
├── smithery.yaml
├── src
│   └── mcp_pinecone
│       ├── __init__.py
│       ├── chunking.py
│       ├── constants.py
│       ├── pinecone.py
│       ├── prompts.py
│       ├── server.py
│       ├── tools.py
│       └── utils.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/database/.gitkeep:
--------------------------------------------------------------------------------
```
```
--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------
```
3.12
```
--------------------------------------------------------------------------------
/.env.example:
--------------------------------------------------------------------------------
```
PINECONE_API_KEY=
PINECONE_INDEX_NAME=
```
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
# Python-generated files
__pycache__/
*.py[oc]
build/
dist/
wheels/
*.egg-info
# Mac OS
.DS_Store
# Virtual environments
.venv
# Apple Notes database for development
database/*
!database/.gitkeep
# Environment variables
.env
```
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
# Pinecone Model Context Protocol Server for Claude Desktop
[Smithery](https://smithery.ai/server/mcp-pinecone)
[PyPI](https://pypi.org/project/mcp-pinecone/)
Read and write to a Pinecone index.
## Components
```mermaid
flowchart TB
    subgraph Client["MCP Client (e.g., Claude Desktop)"]
        UI[User Interface]
    end
    subgraph MCPServer["MCP Server (pinecone-mcp)"]
        Server[Server Class]
        subgraph Handlers["Request Handlers"]
            ListRes[list_resources]
            ReadRes[read_resource]
            ListTools[list_tools]
            CallTool[call_tool]
            GetPrompt[get_prompt]
            ListPrompts[list_prompts]
        end
        subgraph Tools["Implemented Tools"]
            SemSearch[semantic-search]
            ReadDoc[read-document]
            ListDocs[list-documents]
            PineconeStats[pinecone-stats]
            ProcessDoc[process-document]
        end
    end
    subgraph PineconeService["Pinecone Service"]
        PC[Pinecone Client]
        subgraph PineconeFunctions["Pinecone Operations"]
            Search[search_records]
            Upsert[upsert_records]
            Fetch[fetch_records]
            List[list_records]
            Embed[generate_embeddings]
        end
        Index[(Pinecone Index)]
    end
    %% Connections
    UI --> Server
    Server --> Handlers
    ListTools --> Tools
    CallTool --> Tools
    Tools --> PC
    PC --> PineconeFunctions
    PineconeFunctions --> Index
    %% Data flow for semantic search
    SemSearch --> Search
    Search --> Embed
    Embed --> Index
    %% Data flow for document operations
    ProcessDoc --> Upsert
    ReadDoc --> Fetch
    ListRes --> List
    classDef primary fill:#2563eb,stroke:#1d4ed8,color:white
    classDef secondary fill:#4b5563,stroke:#374151,color:white
    classDef storage fill:#059669,stroke:#047857,color:white
    class Server,PC primary
    class Tools,Handlers secondary
    class Index storage
```
### Resources
The server implements the ability to read and write to a Pinecone index.
### Tools
- `semantic-search`: Search for records in the Pinecone index.
- `read-document`: Read a document from the Pinecone index.
- `list-documents`: List all documents in the Pinecone index.
- `pinecone-stats`: Get stats about the Pinecone index, including the number of records, dimensions, and namespaces.
- `process-document`: Process a document into chunks and upsert them into the Pinecone index. This performs the overall steps of chunking, embedding, and upserting.
Note: embeddings are generated via Pinecone's inference API, and chunking is done with a token-based chunker written by borrowing heavily from LangChain and debugging with Claude.
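The sketch below illustrates that same chunk, embed, and upsert flow using the package's own modules. It is illustrative only: it assumes `PINECONE_API_KEY` and `PINECONE_INDEX_NAME` are set in your environment, and the document ID, text, and namespace are made-up examples.
```python
from mcp_pinecone.chunking import create_chunker
from mcp_pinecone.pinecone import PineconeClient, PineconeRecord

client = PineconeClient()
chunker = create_chunker(chunk_type="smart")

# 1. Chunk the document with the token-based chunker
chunks = chunker.chunk_document(
    document_id="doc-1",
    content="Some long markdown document...",
    metadata={"title": "Example"},
)

# 2. Embed each chunk via Pinecone's inference API
records = [
    PineconeRecord(
        id=chunk.id,
        embedding=client.generate_embeddings(chunk.content),
        text=chunk.content,
        metadata=chunk.metadata,
    )
    for chunk in chunks
]

# 3. Upsert the embedded chunks into the index
client.upsert_records(records, namespace="example-namespace")
```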
## Quickstart
### Installing via Smithery
To install Pinecone MCP Server for Claude Desktop automatically via [Smithery](https://smithery.ai/server/mcp-pinecone):
```bash
npx -y @smithery/cli install mcp-pinecone --client claude
```
### Install the server
We recommend using [uv](https://docs.astral.sh/uv/getting-started/installation/) to install the server locally for Claude.
```
uv tool install mcp-pinecone
```
OR
```
uv pip install mcp-pinecone
```
Add your config as described below.
#### Claude Desktop
On macOS: `~/Library/Application Support/Claude/claude_desktop_config.json`
On Windows: `%APPDATA%/Claude/claude_desktop_config.json`
Note: You might need to use the direct path to `uv`. Use `which uv` to find the path.
__Development/Unpublished Servers Configuration__
```json
"mcpServers": {
"mcp-pinecone": {
"command": "uv",
"args": [
"--directory",
"{project_dir}",
"run",
"mcp-pinecone"
]
}
}
```
__Published Servers Configuration__
```json
"mcpServers": {
"mcp-pinecone": {
"command": "uvx",
"args": [
"--index-name",
"{your-index-name}",
"--api-key",
"{your-secret-api-key}",
"mcp-pinecone"
]
}
}
```
#### Sign up to Pinecone
You can sign up for a Pinecone account [here](https://www.pinecone.io/).
#### Get an API key
Create a new index in Pinecone and get an API key from the Pinecone dashboard. Use the index name to replace `{your-index-name}` and the API key to replace `{your-secret-api-key}` in the config.
## Development
### Building and Publishing
To prepare the package for distribution:
1. Sync dependencies and update lockfile:
```bash
uv sync
```
2. Build package distributions:
```bash
uv build
```
This will create source and wheel distributions in the `dist/` directory.
3. Publish to PyPI:
```bash
uv publish
```
Note: You'll need to set PyPI credentials via environment variables or command flags:
- Token: `--token` or `UV_PUBLISH_TOKEN`
- Or username/password: `--username`/`UV_PUBLISH_USERNAME` and `--password`/`UV_PUBLISH_PASSWORD`
### Debugging
Since MCP servers run over stdio, debugging can be challenging. For the best debugging
experience, we strongly recommend using the [MCP Inspector](https://github.com/modelcontextprotocol/inspector).
You can launch the MCP Inspector via [`npm`](https://docs.npmjs.com/downloading-and-installing-node-js-and-npm) with this command:
```bash
npx @modelcontextprotocol/inspector uv --directory {project_dir} run mcp-pinecone
```
Upon launching, the Inspector will display a URL that you can access in your browser to begin debugging.
## License
This project is licensed under the MIT License. See the [LICENSE](LICENSE) file for details.
## Source Code
The source code is available on [GitHub](https://github.com/sirmews/mcp-pinecone).
## Contributing
Send your ideas and feedback to me on [Bluesky](https://bsky.app/profile/perfectlycromulent.bsky.social) or by opening an issue.
```
--------------------------------------------------------------------------------
/_config.yml:
--------------------------------------------------------------------------------
```yaml
remote_theme: pages-themes/[email protected]
plugins:
- jekyll-remote-theme
```
--------------------------------------------------------------------------------
/.vscode/settings.json:
--------------------------------------------------------------------------------
```json
{
  "[python]": {
    "editor.formatOnSave": true,
    "editor.defaultFormatter": "charliermarsh.ruff"
  }
}
```
--------------------------------------------------------------------------------
/src/mcp_pinecone/__init__.py:
--------------------------------------------------------------------------------
```python
from . import server
import asyncio


def main():
    asyncio.run(server.main())


# Optionally expose other important items at package level
__all__ = ["main", "server"]
```
--------------------------------------------------------------------------------
/src/mcp_pinecone/utils.py:
--------------------------------------------------------------------------------
```python
class MCPToolError(Exception):
    """Custom exception for MCP tool errors"""

    def __init__(self, code: int, message: str):
        self.code = code
        self.message = message
        super().__init__(message)


def is_valid_vector_uri(uri: str) -> bool:
    """
    Validate vector URI format

    Parameters:
        uri: The URI to validate.

    Returns:
        bool: True if the URI is valid, False otherwise.
    """
    try:
        if not uri.startswith("pinecone://vectors/"):
            return False
        vector_id = uri.split("/")[-1]
        return bool(vector_id.strip())  # Ensure non-empty ID
    except Exception:
        return False
```
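A quick usage sketch for the URI validator above; the URIs are made-up examples.
```python
from mcp_pinecone.utils import is_valid_vector_uri

# Well-formed vector URI with a non-empty ID
assert is_valid_vector_uri("pinecone://vectors/doc-1#chunk1")

# Missing ID and wrong scheme are both rejected
assert not is_valid_vector_uri("pinecone://vectors/")
assert not is_valid_vector_uri("https://example.com/vectors/doc-1")
```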
--------------------------------------------------------------------------------
/smithery.yaml:
--------------------------------------------------------------------------------
```yaml
# Smithery configuration file: https://smithery.ai/docs/config#smitheryyaml
startCommand:
  type: stdio
  configSchema:
    # JSON Schema defining the configuration options for the MCP.
    type: object
    required:
      - indexName
      - apiKey
    properties:
      indexName:
        type: string
        description: The name of the Pinecone index.
      apiKey:
        type: string
        description: The API key for accessing Pinecone.
  commandFunction:
    # A function that produces the CLI command to start the MCP on stdio.
    |-
    config => ({command: 'uv', args: ['run', 'mcp-pinecone', '--index-name', config.indexName, '--api-key', config.apiKey]})
```
--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------
```toml
[project]
name = "mcp-pinecone"
version = "0.1.8"
description = "Read and write to Pinecone from Claude Desktop with Model Context Protocol."
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"httpx>=0.28.0",
"jsonschema>=4.23.0",
"mcp>=1.0.0",
"pinecone>=5.4.1",
"python-dotenv>=1.0.1",
"tiktoken>=0.8.0",
]
classifiers = [
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
"Operating System :: MacOS",
]
[[project.authors]]
name = "Navishkar Rao"
email = "[email protected]"
[build-system]
requires = [ "hatchling",]
build-backend = "hatchling.build"
[project.scripts]
mcp-pinecone = "mcp_pinecone:main"
[tool.mcp-pinecone]
server_name = "mcp-pinecone"
[project.urls]
Homepage = "https://sirmews.github.io/mcp-pinecone/"
Issues = "https://github.com/sirmews/mcp-pinecone/issues"
```
--------------------------------------------------------------------------------
/Dockerfile:
--------------------------------------------------------------------------------
```dockerfile
# Generated by https://smithery.ai. See: https://smithery.ai/docs/config#dockerfile
# Use a Python image with uv pre-installed
FROM ghcr.io/astral-sh/uv:python3.12-bookworm-slim AS uv
# Set the working directory
WORKDIR /app
# Copy the project files to the working directory
ADD . /app
# Enable bytecode compilation
ENV UV_COMPILE_BYTECODE=1
# Copy from the cache instead of linking since it's a mounted volume
ENV UV_LINK_MODE=copy
# Sync the dependencies and lockfile
RUN --mount=type=cache,target=/root/.cache/uv --mount=type=bind,source=uv.lock,target=uv.lock --mount=type=bind,source=pyproject.toml,target=pyproject.toml uv sync --frozen --no-install-project --no-dev --no-editable
# Install the project
RUN --mount=type=cache,target=/root/.cache/uv uv sync --frozen --no-dev --no-editable
FROM python:3.12-slim-bookworm
WORKDIR /app
COPY --from=uv /root/.local /root/.local
COPY --from=uv --chown=app:app /app/.venv /app/.venv
# Place executables in the environment at the front of the path
ENV PATH="/app/.venv/bin:$PATH"
# Entry point for running the MCP server (the console script is on PATH via the copied venv)
ENTRYPOINT ["mcp-pinecone"]
```
--------------------------------------------------------------------------------
/CHANGELOG.md:
--------------------------------------------------------------------------------
```markdown
# Changelog
All notable changes to the MCP-Pinecone project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [0.1.8] - 2025-01-04
### Added
- Added `pinecone-store` prompt to store documents in Pinecone
- Added `pinecone-stats` tool to get stats about the Pinecone index
### Changed
- Refactoring across the codebase to make it more modular and easier to extend
- Removed `embed-document` tool
- Removed `chunk-document` tool
## [0.1.7] - 2025-01-01
### Added
- Updated prompt to `pinecone-query` because Zed can't use tools.
## [0.1.6] - 2024-12-31
### Added
- Added `chunk_enabled` argument to `process-document` tool to enable/disable chunking. Defaults to false.
- Added `list-documents` tool to list all documents in a namespace
## [0.1.5] - 2024-12-29
### Added
- Added `process-document` tool to combine chunking, embedding, and upserting documents into Pinecone
- Added `chunk-document` tool to explicitly chunk documents into chunks
- Added `embed-document` tool to explicitly embed documents into Pinecone
- Mention Pinecone api in README
## [0.1.4] - 2024-12-20
### Added
- Added `langchain` dependency for chunking
- Auto chunk documents by markdown headers
## [0.1.3] - 2024-12-20
### Added
- Namespace support for all vector operations (search, read, upsert)
- Explicit namespace parameter in tool schemas
### Changed
- Updated MCP package to latest version
## [0.1.0 - 0.1.2]
### Added
- Initial public release
- Basic Pinecone integration with MCP
- Semantic search capabilities
- Document reading and writing
- Metadata support
```
--------------------------------------------------------------------------------
/src/mcp_pinecone/constants.py:
--------------------------------------------------------------------------------
```python
# Index name
import os
import argparse
from dotenv import load_dotenv

load_dotenv()


def get_pinecone_config():
    parser = argparse.ArgumentParser(description="Pinecone MCP Configuration")
    parser.add_argument(
        "--index-name",
        default=None,
        help="Name of the Pinecone index to use. Will use environment variable PINECONE_INDEX_NAME if not provided.",
    )
    parser.add_argument(
        "--api-key",
        default=None,
        help="API key for Pinecone. Will use environment variable PINECONE_API_KEY if not provided.",
    )
    args = parser.parse_args()

    # Use command line arguments if provided, otherwise fall back to environment variables
    index_name = args.index_name or os.getenv("PINECONE_INDEX_NAME")
    api_key = args.api_key or os.getenv("PINECONE_API_KEY")

    # Set default index name if none provided
    if not index_name:
        index_name = "mcp-pinecone-index"
        print(f"No index name provided, using default: {index_name}")

    # Validate API key
    if not api_key:
        raise ValueError(
            "Pinecone API key is required. Provide it via --api-key argument or PINECONE_API_KEY environment variable"
        )

    return index_name, api_key


# Get configuration values
PINECONE_INDEX_NAME, PINECONE_API_KEY = get_pinecone_config()

# Validate configuration after loading
if not PINECONE_INDEX_NAME or not PINECONE_API_KEY:
    raise ValueError(
        "Missing required configuration. Ensure PINECONE_INDEX_NAME and PINECONE_API_KEY "
        "are set either via environment variables or command line arguments."
    )

# Inference API model name
INFERENCE_MODEL = "multilingual-e5-large"

# Inference API embedding dimension
INFERENCE_DIMENSION = 1024

# Export values for use in other modules
__all__ = [
    "PINECONE_INDEX_NAME",
    "PINECONE_API_KEY",
    "INFERENCE_MODEL",
    "INFERENCE_DIMENSION",
]
```
--------------------------------------------------------------------------------
/src/mcp_pinecone/server.py:
--------------------------------------------------------------------------------
```python
import logging
from typing import Union
from mcp.server.models import InitializationOptions
import mcp.types as types
from mcp.server import NotificationOptions, Server
from pydantic import AnyUrl
import mcp.server.stdio
from .pinecone import PineconeClient
from .tools import register_tools
from .prompts import register_prompts
import importlib.metadata
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("pinecone-mcp")
pinecone_client = None
server = Server("pinecone-mcp")
@server.list_resources()
async def handle_list_resources() -> list[types.Resource]:
try:
if pinecone_client is None:
logger.error("Pinecone client is not initialized")
return []
records = pinecone_client.list_records()
resources = []
for record in records.get("vectors", []):
# If metadata is None, use empty dict
metadata = record.get("metadata") or {}
description = (
metadata.get("text", "")[:100] + "..." if metadata.get("text") else ""
)
resources.append(
types.Resource(
uri=f"pinecone://vectors/{record['id']}",
name=metadata.get("title", f"Vector {record['id']}"),
description=description,
metadata=metadata,
mimeType=metadata.get("content_type", "text/plain"),
)
)
return resources
except Exception as e:
logger.error(f"Error listing resources: {e}")
return []
@server.read_resource()
async def handle_read_resource(uri: AnyUrl) -> Union[str, bytes]:
if not str(uri).startswith("pinecone://vectors/"):
raise ValueError(f"Unsupported URI scheme: {uri}")
try:
vector_id = str(uri).split("/")[-1]
record = pinecone_client.fetch_records([vector_id])
if not record or "records" not in record or not record["records"]:
raise ValueError(f"Vector not found: {vector_id}")
vector_data = record["records"][0]
metadata = vector_data.get("metadata", {})
content_type = metadata.get("content_type", "text/plain")
if content_type.startswith("text/"):
return format_text_content(vector_data)
else:
return format_binary_content(vector_data)
except Exception as e:
raise RuntimeError(f"Pinecone error: {str(e)}")
def format_text_content(vector_data: dict) -> str:
metadata = vector_data.get("metadata", {})
output = []
if "title" in metadata:
output.append(f"Title: {metadata['title']}")
output.append(f"ID: {vector_data.get('id')}")
for key, value in metadata.items():
if key not in ["title", "text", "content_type"]:
output.append(f"{key}: {value}")
output.append("")
if "text" in metadata:
output.append(metadata["text"])
return "\n".join(output)
def format_binary_content(vector_data: dict) -> bytes:
content = vector_data.get("metadata", {}).get("content", b"")
if isinstance(content, str):
content = content.encode("utf-8")
return content
async def main():
logger.info("Starting Pinecone MCP server")
global pinecone_client
pinecone_client = PineconeClient()
# Register tools and prompts
register_tools(server, pinecone_client)
register_prompts(server, pinecone_client)
async with mcp.server.stdio.stdio_server() as (read_stream, write_stream):
await server.run(
read_stream,
write_stream,
InitializationOptions(
server_name="pinecone-mcp",
server_version=importlib.metadata.version("mcp-pinecone"),
capabilities=server.get_capabilities(
notification_options=NotificationOptions(resources_changed=True),
experimental_capabilities={},
),
),
)
```
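For reference, a small, hypothetical example of the text that `format_text_content` produces for a fetched vector; the field values are invented, and importing `mcp_pinecone.server` requires Pinecone configuration to be present.
```python
# Hypothetical vector payload as returned by fetch_records
vector_data = {
    "id": "doc-1#chunk1",
    "metadata": {
        "title": "Example note",
        "content_type": "text/plain",
        "chunk_number": 1,
        "text": "The actual chunk text goes here.",
    },
}

# format_text_content(vector_data) would return:
#
# Title: Example note
# ID: doc-1#chunk1
# chunk_number: 1
#
# The actual chunk text goes here.
```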
--------------------------------------------------------------------------------
/src/mcp_pinecone/prompts.py:
--------------------------------------------------------------------------------
```python
import logging
from enum import Enum
import mcp.types as types
from mcp.server import Server
from .pinecone import PineconeClient
from datetime import datetime
logger = logging.getLogger("pinecone-mcp")
class PromptName(str, Enum):
PINECONE_QUERY = "pinecone-query"
PINECONE_STORE = "pinecone-store"
ServerPrompts = [
types.Prompt(
name=PromptName.PINECONE_QUERY,
description="Search Pinecone index and construct an answer based on relevant pinecone documents",
arguments=[
types.PromptArgument(
name="query",
description="The question to answer, or the context to search for",
required=True,
)
],
),
types.Prompt(
name=PromptName.PINECONE_STORE,
description="Store content as document in Pinecone",
arguments=[
types.PromptArgument(
name="content",
description="The content to store as a Pinecone document",
required=True,
),
types.PromptArgument(
name="namespace",
description="The namespace to store the document in",
required=False,
),
],
),
]
def register_prompts(server: Server, pinecone_client: PineconeClient):
@server.list_prompts()
async def handle_list_prompts() -> list[types.Prompt]:
return ServerPrompts
@server.get_prompt()
async def handle_get_prompt(
name: str, arguments: dict[str, str] | None
) -> types.GetPromptResult:
try:
if name == PromptName.PINECONE_QUERY:
return pinecone_query(arguments, pinecone_client)
elif name == PromptName.PINECONE_STORE:
return pinecone_store(arguments, pinecone_client)
else:
raise ValueError(f"Unknown prompt: {name}")
except Exception as e:
logger.error(f"Error calling prompt {name}: {e}")
raise
def pinecone_store(
arguments: dict | None, pinecone_client: PineconeClient
) -> types.GetPromptResult:
"""
Store content as document in Pinecone
"""
content = arguments.get("content")
namespace = arguments.get("namespace")
metadata = {
"date": datetime.now().isoformat(),
}
if not content:
raise ValueError("Content required")
return types.GetPromptResult(
messages=[
types.PromptMessage(
role="user",
content=types.TextContent(
type="text",
text=f"The namespace is {namespace if namespace else 'not specified'}. \n"
"If the namespace is not specified, use pinecone-stats to find an appropriate namespace or use the default namespace.",
),
),
types.PromptMessage(
role="user",
content=types.TextContent(
type="text",
text=f"Based on the content, generate metadata that can be relevant to the content and used for filtering. \n"
"The metadata should be a dictionary with keys and values that are relevant to the content. \n"
f"Append the metdata to {metadata} \n",
),
),
types.PromptMessage(
role="user",
content=types.TextContent(
type="text",
text=f"Run the process-document tool with the content: {content} \n"
"Include generated metadata in the document. \n"
f"Store in the {namespace} if specified",
),
),
]
)
def pinecone_query(
arguments: dict | None, pinecone_client: PineconeClient
) -> types.GetPromptResult:
"""
Search Pinecone index and construct an answer based on relevant pinecone documents
"""
query = arguments.get("query")
if not query:
raise ValueError("Query required")
return types.GetPromptResult(
messages=[
types.PromptMessage(
role="user",
content=types.TextContent(
type="text",
text="First use pinecone-stats to get a list of namespaces that might contain relevant documents. Ignore if a namespace is specified in the query",
),
),
types.PromptMessage(
role="user",
content=types.TextContent(
type="text",
text=f"Do a semantic search for the query: {query} with the chosen namespace",
),
),
]
)
__all__ = [
"register_prompts",
]
```
--------------------------------------------------------------------------------
/src/mcp_pinecone/tools.py:
--------------------------------------------------------------------------------
```python
import json
import logging
from typing import Dict, Any, TypedDict
from enum import Enum
from typing import Union, Sequence
import mcp.types as types
from mcp.server import Server
from .pinecone import PineconeClient, PineconeRecord
from .utils import MCPToolError
from .chunking import create_chunker, Chunk
logger = logging.getLogger("pinecone-mcp")
class ToolName(str, Enum):
SEMANTIC_SEARCH = "semantic-search"
READ_DOCUMENT = "read-document"
PROCESS_DOCUMENT = "process-document"
LIST_DOCUMENTS = "list-documents"
PINECONE_STATS = "pinecone-stats"
ServerTools = [
types.Tool(
name=ToolName.SEMANTIC_SEARCH,
description="Search pinecone for documents",
inputSchema={
"type": "object",
"properties": {
"query": {"type": "string"},
"top_k": {"type": "integer", "default": 10},
"namespace": {
"type": "string",
"description": "Optional namespace to search in",
},
"category": {"type": "string"},
"tags": {"type": "array", "items": {"type": "string"}},
"date_range": {
"type": "object",
"properties": {
"start": {"type": "string", "format": "date"},
"end": {"type": "string", "format": "date"},
},
},
},
"required": ["query"],
},
),
types.Tool(
name=ToolName.READ_DOCUMENT,
description="Read a document from pinecone",
inputSchema={
"type": "object",
"properties": {
"document_id": {"type": "string"},
"namespace": {
"type": "string",
"description": "Optional namespace to read from",
},
},
"required": ["document_id"],
},
),
types.Tool(
name=ToolName.PROCESS_DOCUMENT,
description="Process a document. This will optionally chunk, then embed, and upsert the document into pinecone.",
inputSchema={
"type": "object",
"properties": {
"document_id": {"type": "string"},
"text": {"type": "string"},
"metadata": {"type": "object"},
"namespace": {
"type": "string",
"description": "Optional namespace to store the document in",
},
},
"required": ["document_id", "text", "metadata"],
},
),
types.Tool(
name=ToolName.LIST_DOCUMENTS,
description="List all documents in the knowledge base by namespace",
inputSchema={
"type": "object",
"properties": {
"namespace": {
"type": "string",
"description": "Namespace to list documents in",
}
},
"required": ["namespace"],
},
),
types.Tool(
name=ToolName.PINECONE_STATS,
description="Get stats about the Pinecone index specified in this server",
inputSchema={
"type": "object",
"properties": {},
"required": [],
},
),
]
def register_tools(server: Server, pinecone_client: PineconeClient):
@server.list_tools()
async def handle_list_tools() -> list[types.Tool]:
return ServerTools
@server.call_tool()
async def handle_call_tool(
name: str, arguments: dict | None
) -> Sequence[Union[types.TextContent, types.ImageContent, types.EmbeddedResource]]:
try:
if name == ToolName.SEMANTIC_SEARCH:
return semantic_search(arguments, pinecone_client)
if name == ToolName.PINECONE_STATS:
return pinecone_stats(pinecone_client)
if name == ToolName.READ_DOCUMENT:
return read_document(arguments, pinecone_client)
if name == ToolName.PROCESS_DOCUMENT:
return process_document(arguments, pinecone_client)
if name == ToolName.LIST_DOCUMENTS:
return list_documents(arguments, pinecone_client)
except Exception as e:
logger.error(f"Error calling tool {name}: {e}")
raise
def list_documents(
arguments: dict | None, pinecone_client: PineconeClient
) -> list[types.TextContent]:
"""
List all documents in the knowledge base by namespace
"""
namespace = arguments.get("namespace")
results = pinecone_client.list_records(namespace=namespace)
return [types.TextContent(type="text", text=json.dumps(results))]
def pinecone_stats(pinecone_client: PineconeClient) -> list[types.TextContent]:
"""
Get stats about the Pinecone index specified in this server
"""
stats = pinecone_client.stats()
return [types.TextContent(type="text", text=json.dumps(stats))]
def semantic_search(
arguments: dict | None, pinecone_client: PineconeClient
) -> list[types.TextContent]:
"""
Perform a semantic search over the pinecone knowledge base
"""
query = arguments.get("query")
top_k = arguments.get("top_k", 10)
filters = arguments.get("filters", {})
namespace = arguments.get("namespace")
results = pinecone_client.search_records(
query=query,
top_k=top_k,
filter=filters,
include_metadata=True,
namespace=namespace,
)
matches = results.get("matches", [])
# Format results with rich context
formatted_text = "Retrieved Contexts:\n\n"
for i, match in enumerate(matches, 1):
metadata = match.get("metadata", {})
formatted_text += f"Result {i} | Similarity: {match['score']:.3f} | Document ID: {match['id']}\n"
formatted_text += f"{metadata.get('text', '').strip()}\n"
formatted_text += "-" * 10 + "\n\n"
return [types.TextContent(type="text", text=formatted_text)]
def process_document(
arguments: dict | None, pinecone_client: PineconeClient
) -> list[types.TextContent]:
"""
Process a document by chunking, embedding, and upserting it into the knowledge base. Returns the document ID.
"""
document_id = arguments.get("document_id")
text = arguments.get("text")
namespace = arguments.get("namespace")
metadata = arguments.get("metadata", {})
chunker = create_chunker(chunk_type="smart")
chunks = chunker.chunk_document(document_id, text, metadata)
embed_result = embed_document(chunks, pinecone_client)
embedded_chunks = embed_result.get("embedded_chunks", None)
if embedded_chunks is None:
raise MCPToolError("No embedded chunks found")
upsert_documents(embedded_chunks, pinecone_client, namespace)
return [
types.TextContent(
type="text",
text=f"Successfully processed document. The document ID is {document_id}",
)
]
class EmbeddingResult(TypedDict):
embedded_chunks: list[PineconeRecord]
total_embedded: int
def embed_document(
chunks: list[Chunk], pinecone_client: PineconeClient
) -> EmbeddingResult:
"""
Embed a list of chunks.
Uses the Pinecone client to generate embeddings with the inference API.
"""
embedded_chunks = []
for chunk in chunks:
content = chunk.content
chunk_id = chunk.id
metadata = chunk.metadata
if not content or not chunk_id:
logger.warning(f"Skipping invalid chunk: {chunk}")
continue
embedding = pinecone_client.generate_embeddings(content)
record = PineconeRecord(
id=chunk_id,
embedding=embedding,
text=content,
metadata=metadata,
)
embedded_chunks.append(record)
return EmbeddingResult(
embedded_chunks=embedded_chunks,
total_embedded=len(embedded_chunks),
)
def read_document(
arguments: dict | None, pinecone_client: PineconeClient
) -> list[types.TextContent]:
"""
Read a single Pinecone document by ID
"""
document_id = arguments.get("document_id")
namespace = arguments.get("namespace")
if not document_id:
raise ValueError("document_id is required")
# Fetch the record using your existing fetch_records method
record = pinecone_client.fetch_records([document_id], namespace=namespace)
# Get the vector data for this document
vector = record.vectors.get(document_id)
if not vector:
raise ValueError(f"Document {document_id} not found")
# Get metadata from the vector
metadata = vector.metadata if hasattr(vector, "metadata") else {}
# Format the document content
formatted_content = []
formatted_content.append(f"Document ID: {document_id}")
formatted_content.append("") # Empty line for spacing
if metadata:
formatted_content.append("Metadata:")
for key, value in metadata.items():
formatted_content.append(f"{key}: {value}")
return [types.TextContent(type="text", text="\n".join(formatted_content))]
def upsert_documents(
records: list[PineconeRecord],
pinecone_client: PineconeClient,
namespace: str | None = None,
) -> Dict[str, Any]:
"""
Upsert a list of Pinecone records into the knowledge base.
"""
result = pinecone_client.upsert_records(records, namespace=namespace)
return result
__all__ = [
"register_tools",
]
```
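Since `jsonschema` is already a project dependency, arguments for a tool call can be checked against the `inputSchema` declared above. A minimal sketch, using an abridged copy of the `semantic-search` schema and made-up argument values:
```python
from jsonschema import validate

# Abridged copy of the semantic-search inputSchema from ServerTools
semantic_search_schema = {
    "type": "object",
    "properties": {
        "query": {"type": "string"},
        "top_k": {"type": "integer", "default": 10},
        "namespace": {"type": "string"},
    },
    "required": ["query"],
}

# Example arguments an MCP client might send; raises ValidationError if invalid
arguments = {"query": "notes about vector databases", "top_k": 5}
validate(instance=arguments, schema=semantic_search_schema)
```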
--------------------------------------------------------------------------------
/src/mcp_pinecone/chunking.py:
--------------------------------------------------------------------------------
```python
"""
Smart document chunking with token awareness and recursive splitting.
Provides configurable text splitting strategies optimized for LLM context windows.
"""
from typing import List, Dict, Any, Optional
from pydantic import BaseModel, Field, model_validator
import tiktoken
import logging
from abc import ABC, abstractmethod
logger = logging.getLogger("smart_chunker")
class ChunkingError(Exception):
"""Base exception for chunking errors"""
pass
class Chunk(BaseModel):
"""Represents a document chunk with metadata"""
id: str
content: str
metadata: Dict[str, Any]
def to_dict(self) -> dict:
"""Convert to dictionary format for embed-document"""
return {"id": self.id, "content": self.content, "metadata": self.metadata}
class ChunkingConfig(BaseModel):
"""Configuration for chunking behavior"""
target_tokens: int = Field(
default=512,
description="Target chunk size in tokens",
gt=0, # Must be positive
)
max_tokens: int = Field(
default=1000,
description="Maximum allowed tokens per chunk",
gt=0,
)
overlap_tokens: int = Field(
default=50,
description="Number of tokens to overlap",
ge=0,
)
tokenizer_model: str = Field(
default="cl100k_base", description="Tokenizer model to use"
)
# Separators in priority order
separators: List[str] = Field(
default=[
"\n\n", # Paragraphs
"\n", # Lines
". ", # Sentences
"? ", # Questions
"! ", # Exclamations
", ", # Clauses
" ", # Words
"", # Characters
],
description="Separators in order of preference",
)
@model_validator(mode="after")
def validate_tokens(self):
"""Ensure overlap tokens are less than target tokens"""
if self.overlap_tokens >= self.target_tokens:
raise ValueError("overlap_tokens must be less than target_tokens")
if self.max_tokens < self.target_tokens:
raise ValueError(
"max_tokens must be greater than or equal to target_tokens"
)
return self
class BaseChunker(ABC):
"""
Abstract base for all chunking strategies.
We can add more chunking strategies here as we learn more approaches for certain document types.
"""
@abstractmethod
def chunk_document(
self, document_id: str, content: str, metadata: Dict[str, Any]
) -> List[Chunk]:
pass
class SmartChunker(BaseChunker):
"""
Intelligent chunking implementation that combines:
- Token awareness
- Recursive splitting
- Smart overlap handling
- Configurable behavior
This is inspired by approaches highlighted in https://js.langchain.com/docs/concepts/text_splitters/
In order to keep dependencies minimal, we're not using LangChain here.
Just taking inspiration from their approaches.
"""
def __init__(self, config: Optional[ChunkingConfig] = None):
self.config = config or ChunkingConfig()
self.tokenizer = tiktoken.get_encoding(self.config.tokenizer_model)
def count_tokens(self, text: str) -> int:
"""
Get exact token count for text
"""
return len(self.tokenizer.encode(text))
def create_chunk(
self,
document_id: str,
content: str,
chunk_number: int,
total_chunks: int,
base_metadata: Dict[str, Any],
) -> Chunk:
"""Create a chunk with complete metadata"""
token_count = self.count_tokens(content)
metadata = {
"document_id": document_id,
"chunk_number": chunk_number,
"total_chunks": total_chunks,
"token_count": token_count,
"char_count": len(content),
"chunk_type": "smart",
**base_metadata,
}
return Chunk(
id=f"{document_id}#chunk{chunk_number}",
content=content.strip(),
metadata=metadata,
)
def chunk_document(
self, document_id: str, content: str, metadata: Dict[str, Any]
) -> List[Chunk]:
"""
Chunk document with intelligent boundary detection and token awareness
This works by recursively splitting the document into chunks with overlap
and then trying to find the best boundaries using progressively smaller separators
"""
if not content or not content.strip():
raise ChunkingError("Cannot chunk empty content")
if not document_id:
raise ChunkingError("Document ID is required")
try:
# Get initial splits
chunks = self._split_with_overlap(
content,
self.config.separators,
self.config.target_tokens,
self.config.overlap_tokens,
)
# Convert to chunk objects with metadata
processed_chunks = []
for i, text in enumerate(chunks, 1):
chunk = self.create_chunk(
document_id=document_id,
content=text,
chunk_number=i,
total_chunks=len(chunks),
base_metadata=metadata,
)
processed_chunks.append(chunk)
# Log stats
total_tokens = sum(c.metadata["token_count"] for c in processed_chunks)
avg_tokens = total_tokens / len(processed_chunks)
logger.info(
f"Split document {document_id} into {len(processed_chunks)} chunks. "
f"Average tokens per chunk: {avg_tokens:.0f}"
)
return processed_chunks
except Exception as e:
raise ChunkingError(f"Error chunking document {document_id}: {e}")
def _split_with_overlap(
self, text: str, separators: List[str], target_tokens: int, overlap_tokens: int
) -> List[str]:
"""
Split text recursively while handling overlap
Args:
text: The text to split
separators: List of separators to try, in order of preference
target_tokens: Target number of tokens per chunk
overlap_tokens: Number of tokens to overlap between chunks
Returns:
List of text chunks with overlap
Raises:
ChunkingError: If text cannot be split into chunks
"""
# Base case - text is small enough
text_tokens = self.count_tokens(text)
if text_tokens <= target_tokens:
return [text]
# Try each separator in order
for separator in separators:
splits = text.split(separator)
# Skip if separator doesn't help
if len(splits) == 1:
continue
# Process splits with overlap
chunks = []
current_chunk = []
current_tokens = 0
for split in splits:
split_tokens = self.count_tokens(split)
# Check if adding split would exceed target
if current_tokens + split_tokens > target_tokens and current_chunk:
# Add current chunk
chunks.append(separator.join(current_chunk))
# Start new chunk with overlap
overlap_tokens_remaining = overlap_tokens
current_chunk = []
# Add previous splits until we hit overlap target
prev_splits = current_chunk.copy()
current_chunk = []
for prev_split in reversed(prev_splits):
prev_tokens = self.count_tokens(prev_split)
if overlap_tokens_remaining - prev_tokens < 0:
break
current_chunk.insert(0, prev_split)
overlap_tokens_remaining -= prev_tokens
current_tokens = self.count_tokens(separator.join(current_chunk))
current_chunk.append(split)
current_tokens += split_tokens
# Add final chunk
if current_chunk:
chunks.append(separator.join(current_chunk))
# If we found valid splits, return them
if chunks:
return chunks
# If no good splits found, fall back to token boundary
return self._split_by_tokens(text, target_tokens, overlap_tokens)
def _split_by_tokens(
self, text: str, target_tokens: int, overlap_tokens: int
) -> List[str]:
"""
Split on token boundaries as a last resort
This is a simple approach that splits the document into chunks of the target size
with an overlap of the overlap size.
"""
tokens = self.tokenizer.encode(text)
chunks = []
for i in range(0, len(tokens), target_tokens - overlap_tokens):
chunk_tokens = tokens[i : i + target_tokens]
chunk_text = self.tokenizer.decode(chunk_tokens)
chunks.append(chunk_text)
return chunks
# Factory for creating chunkers
def create_chunker(
chunk_type: str = "smart", config: Optional[ChunkingConfig] = None
) -> BaseChunker:
"""Create appropriate chunker based on type"""
chunkers = {"smart": lambda: SmartChunker(config)}
if chunk_type not in chunkers:
raise ValueError(f"Unknown chunker type: {chunk_type}")
return chunkers[chunk_type]()
__all__ = [
"Chunk",
"ChunkingConfig",
"BaseChunker",
"SmartChunker",
"create_chunker",
]
```
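A short usage sketch for the chunker above. The configuration values and document content are arbitrary examples; `mcp_pinecone.chunking` can be imported without any Pinecone credentials.
```python
from mcp_pinecone.chunking import ChunkingConfig, create_chunker

# Smaller chunks with a little overlap; these values are arbitrary examples
config = ChunkingConfig(target_tokens=256, max_tokens=512, overlap_tokens=32)
chunker = create_chunker(chunk_type="smart", config=config)

chunks = chunker.chunk_document(
    document_id="doc-1",
    content="A long document...\n\nWith several paragraphs of text to split.",
    metadata={"source": "example"},
)

for chunk in chunks:
    print(chunk.id, chunk.metadata["token_count"])
```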
--------------------------------------------------------------------------------
/src/mcp_pinecone/pinecone.py:
--------------------------------------------------------------------------------
```python
from pinecone import Pinecone, ServerlessSpec, FetchResponse, UpsertResponse
from typing import List, Dict, Any, Optional, Union
from pydantic import BaseModel
from .constants import (
INFERENCE_DIMENSION,
PINECONE_INDEX_NAME,
PINECONE_API_KEY,
INFERENCE_MODEL,
)
from dotenv import load_dotenv
import logging
load_dotenv()
logger = logging.getLogger(__name__)
# Pydantic model for a Pinecone record
class PineconeRecord(BaseModel):
"""
Represents a record in Pinecone
"""
id: str
embedding: List[float]
text: str
metadata: Dict[str, Any]
def to_dict(self) -> dict:
"""
Convert to dictionary format for JSON serialization
"""
return {
"id": self.id,
"embedding": self.embedding,
"text": self.text,
"metadata": self.metadata,
}
class PineconeClient:
"""
A client for interacting with Pinecone.
"""
def __init__(self):
self.pc = Pinecone(api_key=PINECONE_API_KEY)
# Initialize index after checking/creating
self.ensure_index_exists()
desc = self.pc.describe_index(PINECONE_INDEX_NAME)
self.index = self.pc.Index(
name=PINECONE_INDEX_NAME,
host=desc.host, # Get the proper host from the index description
)
def ensure_index_exists(self):
"""
Check if index exists, create if it doesn't.
"""
try:
indexes = self.pc.list_indexes()
exists = any(index["name"] == PINECONE_INDEX_NAME for index in indexes)
if exists:
logger.warning(f"Index {PINECONE_INDEX_NAME} already exists")
return
self.create_index()
except Exception as e:
logger.error(f"Error checking/creating index: {e}")
raise
def create_index(self):
"""
Create a serverless index with integrated inference.
"""
try:
return self.pc.create_index(
name=PINECONE_INDEX_NAME,
dimension=INFERENCE_DIMENSION,
metric="cosine",
deletion_protection="disabled", # Consider enabling for production
spec=ServerlessSpec(cloud="aws", region="us-east-1"),
)
except Exception as e:
logger.error(f"Failed to create index: {e}")
raise
def generate_embeddings(self, text: str) -> List[float]:
"""
Generate embeddings for a given text using Pinecone Inference API.
Parameters:
text: The text to generate embeddings for.
Returns:
List[float]: The embeddings for the text.
"""
response = self.pc.inference.embed(
model=INFERENCE_MODEL,
inputs=[text],
parameters={"input_type": "passage", "truncate": "END"},
)
# if the response is empty, raise an error
if not response.data:
raise ValueError(f"Failed to generate embeddings for text: {text}")
return response.data[0].values
def upsert_records(
self,
records: List[PineconeRecord],
namespace: Optional[str] = None,
) -> UpsertResponse:
"""
Upsert records into the Pinecone index.
Parameters:
records: List of records to upsert.
namespace: Optional namespace to upsert into.
Returns:
Dict[str, Any]: The response from Pinecone.
"""
try:
vectors = []
for record in records:
# Don't continue if there's no vector values
if not record.embedding:
continue
vector_values = record.embedding
raw_text = record.text
record_id = record.id
metadata = record.metadata
logger.info(f"Record: {metadata}")
# Add raw text to metadata
metadata["text"] = raw_text
vectors.append((record_id, vector_values, metadata))
return self.index.upsert(vectors=vectors, namespace=namespace)
except Exception as e:
logger.error(f"Error upserting records: {e}")
raise
def search_records(
self,
query: Union[str, List[float]],
top_k: int = 10,
namespace: Optional[str] = None,
filter: Optional[Dict] = None,
include_metadata: bool = True,
) -> Dict[str, Any]:
"""
Search records using integrated inference.
Parameters:
query: The query to search for.
top_k: The number of results to return.
namespace: Optional namespace to search in.
filter: Optional filter to apply to the search.
include_metadata: Whether to include metadata in the search results.
Returns:
Dict[str, Any]: The search results from Pinecone.
"""
try:
# If query is text, use our custom function to get embeddings
if isinstance(query, str):
vector = self.generate_embeddings(query)
else:
vector = query
return self.index.query(
vector=vector,
top_k=top_k,
namespace=namespace,
include_metadata=include_metadata,
filter=filter,
)
except Exception as e:
logger.error(f"Error searching records: {e}")
raise
def stats(self) -> Dict[str, Any]:
"""
Get detailed statistics about the index including:
- Total vector count
- Index dimension
- Index fullness
- Namespace-specific statistics
Returns:
Dict[str, Any]: A dictionary containing:
- namespaces: Dict mapping namespace names to their statistics
- dimension: Dimension of the indexed vectors
- index_fullness: Fullness of the index (0-1 scale)
- total_vector_count: Total number of vectors across all namespaces
"""
try:
stats = self.index.describe_index_stats()
# Convert namespaces to dict - each NamespaceSummary needs to be converted to dict
namespaces_dict = {}
for ns_name, ns_summary in stats.namespaces.items():
namespaces_dict[ns_name] = {
"vector_count": ns_summary.vector_count,
}
return {
"namespaces": namespaces_dict,
"dimension": stats.dimension,
"index_fullness": stats.index_fullness,
"total_vector_count": stats.total_vector_count,
}
except Exception as e:
logger.error(f"Error getting stats: {e}")
raise
def delete_records(
self, ids: List[str], namespace: Optional[str] = None
) -> Dict[str, Any]:
"""
Delete records by ID
Parameters:
ids: List of record IDs to delete
namespace: Optional namespace to delete from
"""
try:
return self.index.delete(ids=ids, namespace=namespace)
except Exception as e:
logger.error(f"Error deleting records: {e}")
raise
def fetch_records(
self, ids: List[str], namespace: Optional[str] = None
) -> FetchResponse:
"""
Fetch specific records by ID
Parameters:
ids: List of record IDs to fetch
namespace: Optional namespace to fetch from
Returns:
FetchResponse: The response from Pinecone.
Raises:
Exception: If there is an error fetching the records.
"""
try:
return self.index.fetch(ids=ids, namespace=namespace)
except Exception as e:
logger.error(f"Error fetching records: {e}")
raise
def list_records(
self,
prefix: Optional[str] = None,
limit: int = 100,
namespace: Optional[str] = None,
) -> Dict[str, Any]:
"""
List records in the index using pagination.
Parameters:
prefix: Optional prefix to filter records by.
limit: The number of records to return per page.
namespace: Optional namespace to list records from.
"""
try:
# Using list_paginated for single-page results
response = self.index.list_paginated(
prefix=prefix, limit=limit, namespace=namespace
)
# Check if response is None
if response is None:
logger.error("Received None response from Pinecone list_paginated")
return {"vectors": [], "namespace": namespace, "pagination_token": None}
# Handle the case where vectors might be None
vectors = response.vectors if hasattr(response, "vectors") else []
return {
"vectors": [
{
"id": getattr(v, "id", None),
"metadata": getattr(v, "metadata", {}),
}
for v in vectors
],
"namespace": getattr(response, "namespace", namespace),
"pagination_token": getattr(response.pagination, "next", None)
if hasattr(response, "pagination")
else None,
}
except Exception as e:
logger.error(f"Error listing records: {e}")
# Return empty result instead of raising
return {"vectors": [], "namespace": namespace, "pagination_token": None}
```
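Finally, a minimal sketch of using `PineconeClient` directly. It assumes `PINECONE_API_KEY` (and optionally `PINECONE_INDEX_NAME`) are set in the environment; note that constructing the client will create the index if it does not already exist, and all names and values below are made up.
```python
from mcp_pinecone.pinecone import PineconeClient, PineconeRecord

client = PineconeClient()

# Embed and upsert a single record
record = PineconeRecord(
    id="doc-1#chunk1",
    embedding=client.generate_embeddings("hello pinecone"),
    text="hello pinecone",
    metadata={"source": "example"},
)
client.upsert_records([record], namespace="example-namespace")

# Semantic search over the same namespace
results = client.search_records(query="greeting", top_k=3, namespace="example-namespace")
for match in results.get("matches", []):
    print(match["id"], match["score"])
```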