# Directory Structure
```
├── .clinerules
├── .coverage
├── .editorconfig
├── .env.example
├── .github
│ └── workflows
│ ├── ci.yml
│ ├── pypi-publish.yml
│ └── test.yml
├── .gitignore
├── .pre-commit-config.yaml
├── .python-version
├── CHANGELOG.md
├── CONTRIBUTING.md
├── LICENSE
├── main.py
├── memory_mcp_server
│ ├── __init__.py
│ ├── backends
│ │ ├── __init__.py
│ │ ├── base.py
│ │ └── jsonl.py
│ ├── exceptions.py
│ ├── interfaces.py
│ ├── knowledge_graph_manager.py
│ └── validation.py
├── pyproject.toml
├── README.md
├── requirements.txt
├── scripts
│ └── README.md
├── tests
│ ├── conftest.py
│ ├── test_backends
│ │ ├── conftest.py
│ │ └── test_jsonl.py
│ ├── test_interfaces.py
│ ├── test_knowledge_graph_manager.py
│ ├── test_server.py
│ └── test_validation.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/.python-version:
--------------------------------------------------------------------------------
```
3.12
```
--------------------------------------------------------------------------------
/.gitignore:
--------------------------------------------------------------------------------
```
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# Virtual Environment
.env
.venv
venv/
ENV/
# IDE
.idea/
.vscode/
*.swp
*.swo
# OS
.DS_Store
Thumbs.db
# Project specific
*.db
*.sqlite3
*.log
.aider*
cline_docs
.clinerules
```
--------------------------------------------------------------------------------
/.env.example:
--------------------------------------------------------------------------------
```
# Server Configuration
# Path to memory file (defaults to ~/.claude/memory.jsonl if not set)
MEMORY_FILE_PATH=/Users/username/.claude/memory.jsonl
CACHE_TTL=60
# Logging Configuration
LOG_LEVEL=INFO
LOG_FILE=memory_mcp_server.log
# Performance Settings
BATCH_SIZE=1000
INDEX_CACHE_SIZE=10000
# Development Settings
DEBUG=false
TESTING=false
BENCHMARK_MODE=false
```
--------------------------------------------------------------------------------
/.pre-commit-config.yaml:
--------------------------------------------------------------------------------
```yaml
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.5.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
- id: check-yaml
- id: check-added-large-files
- id: debug-statements
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.1.9
hooks:
- id: ruff
args: [--fix, --ignore=E501]
- id: ruff-format
```
--------------------------------------------------------------------------------
/.editorconfig:
--------------------------------------------------------------------------------
```
# EditorConfig is awesome: https://EditorConfig.org
# top-most EditorConfig file
root = true
# Unix-style newlines with a newline ending every file
[*]
end_of_line = lf
insert_final_newline = true
charset = utf-8
trim_trailing_whitespace = true
# Python files
[*.py]
indent_style = space
indent_size = 4
# YAML files
[*.{yml,yaml}]
indent_style = space
indent_size = 2
# Markdown files
[*.md]
trim_trailing_whitespace = false
# JSON files
[*.json]
indent_style = space
indent_size = 2
```
--------------------------------------------------------------------------------
/.clinerules:
--------------------------------------------------------------------------------
```
# Roo's Memory Bank
You are Roo, an expert software engineer with a unique constraint: your memory periodically resets completely. This isn't a bug - it's what makes you maintain perfect documentation. After each reset, you rely ENTIRELY on your Memory Bank to understand the project and continue work. Without proper documentation, you cannot function effectively.
## Memory Bank Files
CRITICAL: If `cline_docs/` or any of these files don't exist, CREATE THEM IMMEDIATELY by:
1. Reading all provided documentation
2. Asking user for ANY missing information
3. Creating files with verified information only
4. Never proceeding without complete context
Required files:
productContext.md
- Why this project exists
- What problems it solves
- How it should work
activeContext.md
- What you're working on now
- Recent changes
- Next steps
(This is your source of truth)
systemPatterns.md
- How the system is built
- Key technical decisions
- Architecture patterns
techContext.md
- Technologies used
- Development setup
- Technical constraints
progress.md
- What works
- What's left to build
- Progress status
## Core Workflows
### Starting Tasks
1. Check for Memory Bank files
2. If ANY files missing, stop and create them
3. Read ALL files before proceeding
4. Verify you have complete context
5. Begin development. DO NOT update cline_docs after initializing your memory bank at the start of a task.
### During Development
1. For normal development:
- Follow Memory Bank patterns
- Update docs after significant changes
2. Say `[MEMORY BANK: ACTIVE]` at the beginning of every tool use.
### Memory Bank Updates
When user says "update memory bank":
1. This means imminent memory reset
2. Document EVERYTHING about current state
3. Make next steps crystal clear
4. Complete current task
Update the memory bank and start a new task when your context is more than 80% full.
Remember: After every memory reset, you begin completely fresh. Your only link to previous work is the Memory Bank. Maintain it as if your functionality depends on it - because it does.
```
--------------------------------------------------------------------------------
/scripts/README.md:
--------------------------------------------------------------------------------
```markdown
# Memory Migration Script
This script migrates your existing memory.jsonl file to comply with the new validation rules.
## What it does
1. Reads the existing JSONL format where each line is either:
```json
{"type": "entity", "name": "example", "entityType": "person", "observations": ["obs1"]}
```
or
```json
{"type": "relation", "from": "entity1", "to": "entity2", "relationType": "knows"}
```
2. Converts entity and relation names to the new format (see the sketch after this list):
- Lowercase with hyphens
- No spaces or special characters
- Must start with a letter
- Example: "John Doe" -> "john-doe"
3. Normalizes entity types to valid categories:
- person
- concept
- project
- document
- tool
- organization
- location
- event
4. Normalizes relation types to valid verbs:
- knows
- contains
- uses
- created
- belongs-to
- depends-on
- related-to
5. Validates and deduplicates observations
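The name conversion in step 2 is essentially a slugify pass. A minimal sketch, assuming the script does something equivalent (the `entity-` fallback prefix is hypothetical):
```python
import re

def normalize_name(name: str) -> str:
    """Lowercase the name and collapse runs of spaces/special characters into hyphens."""
    slug = re.sub(r"[^a-z0-9]+", "-", name.lower()).strip("-")
    if slug and not slug[0].isalpha():
        slug = "entity-" + slug  # hypothetical fallback: names must start with a letter
    return slug[:100]  # enforce the 100-character limit

assert normalize_name("John Doe") == "john-doe"
```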
## Common Type Mappings
### Entity Types
- individual, user, human -> person
- doc, documentation -> document
- app, application, software -> tool
- group, team, company -> organization
- place, area -> location
- meeting, appointment -> event
- residence, property -> location
- software_project -> project
- dataset -> document
- health_record -> document
- meal -> document
- travel_event -> event
- pet -> concept
- venue -> location
### Relation Types
- knows_about -> knows
- contains_item, has -> contains
- uses_tool -> uses
- created_by, authored -> created
- belongs_to_group, member_of -> belongs-to
- depends_upon, requires -> depends-on
- related -> related-to
- works_at -> belongs-to
- owns -> created
- friend -> knows
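Applying these tables is a dictionary lookup. A sketch of how the script might do it (contents abbreviated; the lowercase fallback is an assumption):
```python
ENTITY_TYPE_MAP = {
    "individual": "person", "user": "person", "human": "person",
    "doc": "document", "documentation": "document",
    "app": "tool", "application": "tool", "software": "tool",
    # ... remaining mappings from the tables above
}

def normalize_entity_type(raw: str) -> str:
    # Fall back to the lowercased raw value so validation can flag unknown types.
    return ENTITY_TYPE_MAP.get(raw.lower(), raw.lower())
```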
## Usage
1. Make sure your memory.jsonl file is in the project root directory
2. Run the migration script:
```bash
./scripts/migrate_memory.py
```
3. The script will:
- Read memory.jsonl line by line
- Convert all data to the new format
- Validate the migrated data
- Write the result to memory.jsonl.new
- Report any errors or issues
4. Review the output file and error messages
5. If satisfied with the migration, replace your old memory file:
```bash
mv memory.jsonl.new memory.jsonl
```
## Error Handling
The script will:
- Report any entities or relations that couldn't be migrated
- Continue processing even if some items fail
- Validate the entire graph before saving
- Preserve your original file by writing to .new file
- Track name changes to ensure relations are updated correctly
## Example Output
```
Migrating memory.jsonl to memory.jsonl.new...
Migration complete:
- Successfully migrated 42 entities
- Encountered 2 errors
Errors encountered:
- Error migrating line: {"type": "entity", "name": "Invalid!Name"...}
Error: Invalid entity name format
- Error migrating line: {"type": "relation", "from": "A"...}
Error: Invalid relation type
Migrated data written to memory.jsonl.new
Please verify the output before replacing your original memory file.
```
## Validation Rules
### Entity Names
- Must start with a lowercase letter
- Can contain lowercase letters, numbers, and hyphens
- Maximum length of 100 characters
- Must be unique within the graph
### Observations
- Non-empty strings
- Maximum length of 500 characters
- Must be unique per entity
- Factual and objective statements
### Relations
- Both source and target entities must exist
- Self-referential relations not allowed
- No circular dependencies
- Must use predefined relation types
```
--------------------------------------------------------------------------------
/README.md:
--------------------------------------------------------------------------------
```markdown
# Memory MCP Server
A Model Context Protocol (MCP) server that provides knowledge graph functionality for managing entities, relations, and observations in memory, with strict validation rules to maintain data consistency.
## Installation
Install the server in Claude Desktop:
```bash
mcp install main.py -v MEMORY_FILE_PATH=/path/to/memory.jsonl
```
## Data Validation Rules
### Entity Names
- Must start with a lowercase letter
- Can contain lowercase letters, numbers, and hyphens
- Maximum length of 100 characters
- Must be unique within the graph
- Example valid names: `python-project`, `meeting-notes-2024`, `user-john`
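These rules collapse to a single regular expression (uniqueness still requires a graph lookup). A sketch, assuming the validator uses something equivalent:
```python
import re

ENTITY_NAME_PATTERN = re.compile(r"^[a-z][a-z0-9-]{0,99}$")

def is_valid_entity_name(name: str) -> bool:
    return bool(ENTITY_NAME_PATTERN.match(name))

assert is_valid_entity_name("python-project")
assert not is_valid_entity_name("John Doe")  # uppercase letter and space
```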
### Entity Types
The following entity types are supported:
- `person`: Human entities
- `concept`: Abstract ideas or principles
- `project`: Work initiatives or tasks
- `document`: Any form of documentation
- `tool`: Software tools or utilities
- `organization`: Companies or groups
- `location`: Physical or virtual places
- `event`: Time-bound occurrences
### Observations
- Non-empty strings
- Maximum length of 500 characters
- Must be unique per entity
- Should be factual and objective statements
- Include timestamp when relevant
### Relations
The following relation types are supported:
- `knows`: Person to person connection
- `contains`: Parent/child relationship
- `uses`: Entity utilizing another entity
- `created`: Authorship/creation relationship
- `belongs-to`: Membership/ownership
- `depends-on`: Dependency relationship
- `related-to`: Generic relationship
Additional relation rules:
- Both source and target entities must exist
- Self-referential relations not allowed
- No circular dependencies allowed
- Must use predefined relation types
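The no-circular-dependencies rule implies a reachability check over the relation edges before a batch is accepted. A minimal depth-first-search sketch (not the server's actual implementation):
```python
from collections import defaultdict
from typing import Iterable, Tuple

def has_cycle(edges: Iterable[Tuple[str, str]]) -> bool:
    """Return True if the directed edges (from, to) contain a cycle."""
    graph = defaultdict(list)
    for src, dst in edges:
        graph[src].append(dst)
    WHITE, GRAY, BLACK = 0, 1, 2  # unvisited, in progress, done
    color = defaultdict(int)

    def visit(node: str) -> bool:
        color[node] = GRAY
        for nxt in graph[node]:
            if color[nxt] == GRAY:  # back edge: cycle found
                return True
            if color[nxt] == WHITE and visit(nxt):
                return True
        color[node] = BLACK
        return False

    return any(color[n] == WHITE and visit(n) for n in list(graph))

assert has_cycle([("a", "b"), ("b", "c"), ("c", "a")])
assert not has_cycle([("a", "b"), ("b", "c")])
```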
## Usage
The server provides tools for managing a knowledge graph:
### Get Entity
```python
result = await session.call_tool("get_entity", {
"entity_name": "example"
})
if not result.success:
if result.error_type == "NOT_FOUND":
print(f"Entity not found: {result.error}")
elif result.error_type == "VALIDATION_ERROR":
print(f"Invalid input: {result.error}")
else:
print(f"Error: {result.error}")
else:
entity = result.data
print(f"Found entity: {entity}")
```
### Get Graph
```python
result = await session.call_tool("get_graph", {})
if result.success:
graph = result.data
print(f"Graph data: {graph}")
else:
print(f"Error retrieving graph: {result.error}")
```
### Create Entities
```python
# Valid entity creation
entities = [
Entity(
name="python-project", # Lowercase with hyphens
entityType="project", # Must be a valid type
observations=["Started development on 2024-01-29"]
),
Entity(
name="john-doe",
entityType="person",
observations=["Software engineer", "Joined team in 2024"]
)
]
result = await session.call_tool("create_entities", {
"entities": entities
})
if not result.success:
if result.error_type == "VALIDATION_ERROR":
print(f"Invalid entity data: {result.error}")
else:
print(f"Error creating entities: {result.error}")
```
### Add Observation
```python
# Valid observation
result = await session.call_tool("add_observation", {
"entity": "python-project",
"observation": "Completed initial prototype" # Must be unique for entity
})
if not result.success:
if result.error_type == "NOT_FOUND":
print(f"Entity not found: {result.error}")
elif result.error_type == "VALIDATION_ERROR":
print(f"Invalid observation: {result.error}")
else:
print(f"Error adding observation: {result.error}")
```
### Create Relation
```python
# Valid relation
result = await session.call_tool("create_relation", {
"from_entity": "john-doe",
"to_entity": "python-project",
"relation_type": "created" # Must be a valid type
})
if not result.success:
if result.error_type == "NOT_FOUND":
print(f"Entity not found: {result.error}")
elif result.error_type == "VALIDATION_ERROR":
print(f"Invalid relation data: {result.error}")
else:
print(f"Error creating relation: {result.error}")
```
### Search Memory
```python
result = await session.call_tool("search_memory", {
"query": "most recent workout" # Supports natural language queries
})
if result.success:
if result.error_type == "NO_RESULTS":
print(f"No results found: {result.error}")
else:
results = result.data
print(f"Search results: {results}")
else:
print(f"Error searching memory: {result.error}")
```
The search functionality supports:
- Temporal queries (e.g., "most recent", "last", "latest")
- Activity queries (e.g., "workout", "exercise")
- General entity searches
- Fuzzy matching with 80% similarity threshold
- Weighted search across:
- Entity names (weight: 1.0)
- Entity types (weight: 0.8)
- Observations (weight: 0.6)
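Programmatically (via `KnowledgeGraphManager.search_nodes` rather than the MCP tool), fuzzy matching is configured with `SearchOptions`. A sketch, assuming a manager instance named `manager`; the weight keys shown are assumptions, not confirmed field names:
```python
from memory_mcp_server.interfaces import SearchOptions

options = SearchOptions(
    fuzzy=True,
    threshold=80.0,  # minimum similarity score, 0-100
    weights={"name": 1.0, "type": 0.8, "observations": 0.6},  # assumed keys
)
results = await manager.search_nodes("workout", options)
```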
### Delete Entities
```python
result = await session.call_tool("delete_entities", {
"names": ["python-project", "john-doe"]
})
if not result.success:
if result.error_type == "NOT_FOUND":
print(f"Entity not found: {result.error}")
else:
print(f"Error deleting entities: {result.error}")
```
### Delete Relation
```python
result = await session.call_tool("delete_relation", {
"from_entity": "john-doe",
"to_entity": "python-project"
})
if not result.success:
if result.error_type == "NOT_FOUND":
print(f"Entity not found: {result.error}")
else:
print(f"Error deleting relation: {result.error}")
```
### Flush Memory
```python
result = await session.call_tool("flush_memory", {})
if not result.success:
print(f"Error flushing memory: {result.error}")
```
## Error Types
The server uses the following error types:
- `NOT_FOUND`: Entity or resource not found
- `VALIDATION_ERROR`: Invalid input data
- `INTERNAL_ERROR`: Server-side error
- `ALREADY_EXISTS`: Resource already exists
- `INVALID_RELATION`: Invalid relation between entities
- `NO_RESULTS`: Search completed but found no matches (used by `search_memory`)
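Because every tool reports failures the same way, client code can centralize the handling. A small illustrative helper (not part of the server API):
```python
def describe_error(result) -> str:
    """Map a failed tool result to a readable message."""
    messages = {
        "NOT_FOUND": "Entity or resource not found",
        "VALIDATION_ERROR": "Invalid input data",
        "ALREADY_EXISTS": "Resource already exists",
        "INVALID_RELATION": "Invalid relation between entities",
    }
    prefix = messages.get(result.error_type, "Internal error")
    return f"{prefix}: {result.error}"
```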
## Response Models
All tools return typed responses using these models:
### EntityResponse
```python
from typing import Any, Dict, Optional

from pydantic import BaseModel


class EntityResponse(BaseModel):
success: bool
data: Optional[Dict[str, Any]] = None
error: Optional[str] = None
error_type: Optional[str] = None
```
### GraphResponse
```python
class GraphResponse(BaseModel):
success: bool
data: Optional[Dict[str, Any]] = None
error: Optional[str] = None
error_type: Optional[str] = None
```
### OperationResponse
```python
class OperationResponse(BaseModel):
success: bool
error: Optional[str] = None
error_type: Optional[str] = None
```
## Development
### Running Tests
```bash
pytest tests/
```
### Adding New Features
1. Update validation rules in `validation.py`
2. Add tests in `tests/test_validation.py`
3. Implement changes in `knowledge_graph_manager.py`
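For example, adding a new entity type starts in the validator. A sketch, assuming the valid types live in a set (the actual constant name in `validation.py` may differ):
```python
# validation.py (hypothetical constant name)
VALID_ENTITY_TYPES = {
    "person", "concept", "project", "document",
    "tool", "organization", "location", "event",
    "dataset",  # newly added type
}
```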
```
--------------------------------------------------------------------------------
/CONTRIBUTING.md:
--------------------------------------------------------------------------------
```markdown
# Contributing to Memory MCP Server
Thank you for your interest in contributing to the Memory MCP Server! This document provides guidelines and information for contributors.
## Project Overview
The Memory MCP Server is an implementation of the Model Context Protocol (MCP) that provides Claude with a persistent knowledge graph capability. The server manages entities and relations in a graph structure, supporting multiple backend storage options with features like caching, indexing, and atomic operations.
### Key Components
1. **Core Data Structures**
- `Entity`: Nodes in the graph containing name, type, and observations
- `Relation`: Edges between entities with relation types
- `KnowledgeGraph`: Container for entities and relations
2. **Backend System**
- `Backend`: Abstract interface defining storage operations
- `JsonlBackend`: File-based storage using JSONL format
- Extensible design for adding new backends
3. **Knowledge Graph Manager**
- Backend-agnostic manager layer
- Implements caching with TTL
- Provides indexing for fast lookups
- Ensures atomic operations
- Manages CRUD operations for entities and relations
4. **MCP Server Implementation**
- Exposes tools for graph manipulation
- Handles serialization/deserialization
- Provides error handling and logging
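The caching layer in the Knowledge Graph Manager (component 3) follows a simple pattern: cached reads expire after a fixed TTL window and writes mark the cache dirty. A minimal sketch (illustrative only, not the actual implementation):
```python
import time
from typing import Optional

class TTLCache:
    """Hold one cached value; invalidate after ttl seconds or when marked dirty."""

    def __init__(self, ttl: float) -> None:
        self.ttl = ttl
        self._value: Optional[object] = None
        self._loaded_at = 0.0
        self._dirty = False

    def get(self) -> Optional[object]:
        expired = time.monotonic() - self._loaded_at > self.ttl
        if self._value is None or self._dirty or expired:
            return None  # caller must reload from the backend
        return self._value

    def put(self, value: object) -> None:
        self._value = value
        self._loaded_at = time.monotonic()
        self._dirty = False

    def mark_dirty(self) -> None:
        self._dirty = True
```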
Available MCP Tools:
- `create_entities`: Create multiple new entities in the knowledge graph
- `create_relations`: Create relations between entities (in active voice)
- `add_observations`: Add new observations to existing entities
- `delete_entities`: Delete entities and their relations
- `delete_observations`: Delete specific observations from entities
- `delete_relations`: Delete specific relations
- `read_graph`: Read the entire knowledge graph
- `search_nodes`: Search entities and relations by query
- `open_nodes`: Retrieve specific nodes by name
Each tool has a defined input schema that validates the arguments. See the tool schemas in `main.py` for detailed parameter specifications.
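A tool schema is a JSON Schema object. An abbreviated sketch of what `create_entities` might declare (see `main.py` for the authoritative version):
```python
CREATE_ENTITIES_SCHEMA = {
    "type": "object",
    "properties": {
        "entities": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "name": {"type": "string"},
                    "entityType": {"type": "string"},
                    "observations": {"type": "array", "items": {"type": "string"}},
                },
                "required": ["name", "entityType", "observations"],
            },
        }
    },
    "required": ["entities"],
}
```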
## Getting Started
1. **Prerequisites**
- Python 3.12 or higher
- uv package manager
2. **Setup Development Environment**
```bash
# Clone the repository
git clone https://github.com/estav/python-memory-mcp-server.git
cd python-memory-mcp-server
# Create virtual environment with Python 3.12+
uv venv
source .venv/bin/activate
# Install all dependencies (including test)
uv pip install -e ".[test]"
# Install pre-commit hooks
pre-commit install
```
3. **Run Tests**
```bash
# Run all tests
pytest
# Run with coverage report
pytest --cov=memory_mcp_server
# Run specific backend tests
pytest tests/test_backends/test_jsonl.py
```
4. **Run the Server Locally**
```bash
# Using JSONL backend
memory-mcp-server --path /path/to/memory.jsonl
```
## Development Guidelines
### Code Style
1. **Python Standards**
- Follow PEP 8 style guide
- Use type hints for function parameters and return values
- Document classes and functions using docstrings
- Maintain 95% or higher docstring coverage
2. **Project-Specific Conventions**
- Use async/await for I/O operations
- Implement proper error handling with custom exceptions
- Maintain atomic operations for data persistence
- Add appropriate logging statements
- Follow backend interface for new implementations
### Code Quality Tools
1. **Pre-commit Hooks**
- Ruff for linting and formatting
- MyPy for static type checking
- Interrogate for docstring coverage
- Additional checks for common issues
2. **CI/CD Pipeline**
- Automated testing
- Code coverage reporting
- Performance benchmarking
- Security scanning
### Testing
1. **Test Structure**
- Tests use pytest with pytest-asyncio for async testing
- Test files must follow pattern `test_*.py` in the `tests/` directory
- Backend-specific tests in `tests/test_backends/`
- Async tests are automatically detected (asyncio_mode = "auto")
- Test fixtures use function-level event loop scope
2. **Test Coverage**
- Write unit tests for new functionality
- Ensure tests cover error cases
- Maintain high test coverage (aim for >90%)
- Use pytest-cov for coverage reporting
3. **Test Categories**
- Unit tests for individual components
- Backend-specific tests for storage implementations
- Integration tests for MCP server functionality
- Performance tests for operations on large graphs
- Async tests for I/O operations and concurrency
4. **Test Configuration**
- Configured in pyproject.toml under [tool.pytest.ini_options]
- Uses quiet mode by default (-q)
- Shows extra test summary (-ra)
- Test discovery in tests/ directory
### Adding New Features
1. **New Backend Implementation**
- Create new class implementing `Backend` interface
- Implement all required methods
- Add backend-specific configuration options
- Create comprehensive tests
- Update documentation and CLI
2. **Knowledge Graph Operations**
- Implement operations in backend classes
- Update KnowledgeGraphManager if needed
- Add appropriate indices
- Ensure atomic operations
- Add validation and error handling
Key operations include:
- Entity creation/deletion
- Relation creation/deletion
- Observation management (adding/removing observations to entities)
- Graph querying and search
- Atomic write operations with locking
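The atomic write operations mentioned above follow the write-to-temp-then-rename pattern. A simplified synchronous sketch (the real backend is async and holds a lock):
```python
import json
import os
import tempfile

def atomic_write_jsonl(path: str, records: list[dict]) -> None:
    """Write records to a temp file in the same directory, then atomically swap it in."""
    directory = os.path.dirname(path) or "."
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            for record in records:
                f.write(json.dumps(record) + "\n")
        os.replace(tmp_path, path)  # atomic on POSIX and Windows
    except BaseException:
        os.unlink(tmp_path)  # clean up the temp file; original stays intact
        raise
```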
3. **MCP Tools**
- Define tool schema in `main.py`
- Implement tool handler function
- Add to `TOOLS` dictionary
- Include appropriate error handling
4. **Performance Considerations**
- Consider backend-specific optimizations
- Implement efficient caching strategies
- Optimize for large graphs
- Handle memory efficiently
### Adding a New Backend
1. Create new backend class:
```python
from typing import List

from ..interfaces import Entity
from .base import Backend


class NewBackend(Backend):
def __init__(self, config_params):
self.config = config_params
async def initialize(self) -> None:
# Setup connection, create indices, etc.
pass
async def create_entities(self, entities: List[Entity]) -> List[Entity]:
# Implementation
pass
# Implement other required methods...
```
2. Add backend tests:
```python
# tests/test_backends/test_new_backend.py
import pytest


@pytest.mark.asyncio
async def test_new_backend_operations():
backend = NewBackend(test_config)
await backend.initialize()
# Test implementations
```
3. Update CLI and configuration
## Pull Request Process
1. **Before Submitting**
- Ensure all tests pass
- Add tests for new functionality
- Update documentation
- Follow code style guidelines
- Run pre-commit hooks
2. **PR Description**
- Clearly describe the changes
- Reference any related issues
- Explain testing approach
- Note any breaking changes
3. **Review Process**
- Address reviewer comments
- Keep changes focused and atomic
- Ensure CI checks pass
## Troubleshooting
### Common Issues
1. **Backend-Specific Issues**
- JSONL Backend:
- Check file permissions
- Verify atomic write operations
- Monitor temp file cleanup
2. **Cache Inconsistency**
- Check cache TTL settings
- Verify dirty flag handling
- Ensure proper lock usage
3. **Performance Issues**
- Review backend-specific indexing
- Check cache effectiveness
- Profile large operations
## Additional Resources
- [Model Context Protocol Documentation](https://github.com/ModelContext/protocol)
- [Python asyncio Documentation](https://docs.python.org/3/library/asyncio.html)
- [Python Type Hints](https://docs.python.org/3/library/typing.html)
## License
This project is licensed under the MIT License - see the LICENSE file for details.
```
--------------------------------------------------------------------------------
/memory_mcp_server/__init__.py:
--------------------------------------------------------------------------------
```python
__version__ = "0.1.0"
```
--------------------------------------------------------------------------------
/requirements.txt:
--------------------------------------------------------------------------------
```
hatchling
mcp>=1.1.2
aiofiles>=23.2.1
```
--------------------------------------------------------------------------------
/memory_mcp_server/backends/__init__.py:
--------------------------------------------------------------------------------
```python
"""
Backend implementations for the Memory MCP Server.
This package provides different storage backends for the knowledge graph.
"""
from .jsonl import JsonlBackend
__all__ = ["JsonlBackend"]
```
--------------------------------------------------------------------------------
/.github/workflows/pypi-publish.yml:
--------------------------------------------------------------------------------
```yaml
name: Publish to PyPI
on:
release:
types: [created]
jobs:
deploy:
runs-on: ubuntu-latest
environment:
name: pypi
url: https://pypi.org/p/memory-mcp-server
permissions:
id-token: write # IMPORTANT: mandatory for trusted publishing
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.x'
- name: Install build dependencies
run: |
python -m pip install --upgrade pip
pip install build
- name: Build package
run: python -m build
- name: Publish package
uses: pypa/gh-action-pypi-publish@release/v1
```
--------------------------------------------------------------------------------
/.github/workflows/test.yml:
--------------------------------------------------------------------------------
```yaml
name: Tests
on:
push:
branches: [ main ]
pull_request:
branches: [ main ]
jobs:
test:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ['3.12']
steps:
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
- name: Install uv
run: |
curl -LsSf https://astral.sh/uv/install.sh | sh
- name: Install dependencies
run: |
uv venv
uv pip install -e ".[test]"
- name: Run tests with pytest
run: |
uv run pytest -v --cov=memory_mcp_server --cov-report=xml
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v3
with:
file: ./coverage.xml
fail_ci_if_error: true
```
--------------------------------------------------------------------------------
/.github/workflows/ci.yml:
--------------------------------------------------------------------------------
```yaml
name: CI
on:
push:
branches: [ main ]
pull_request:
branches: [ main ]
jobs:
quality:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.12"
- name: Install uv
run: |
curl -LsSf https://astral.sh/uv/install.sh | sh
- name: Install dependencies
run: |
uv pip install -e ".[test]"
- name: Run pre-commit
        uses: pre-commit/action@v3.0.1
- name: Run tests with coverage
run: |
pytest --cov=memory_mcp_server --cov-report=xml --benchmark-only
- name: Upload coverage
uses: codecov/codecov-action@v3
with:
file: ./coverage.xml
fail_ci_if_error: true
- name: Security scan
uses: python-security/bandit-action@v1
with:
path: "memory_mcp_server"
- name: Store benchmark results
uses: benchmark-action/github-action-benchmark@v1
with:
tool: 'pytest'
output-file-path: benchmark.json
github-token: ${{ secrets.GITHUB_TOKEN }}
auto-push: true
```
--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------
```toml
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "memory-mcp-server"
version = "0.2.0"
description = "MCP server for managing Claude's memory and knowledge graph"
requires-python = ">=3.12"
dependencies = [
"aiofiles",
"loguru>=0.7.3",
"mcp[cli]>=1.2.0",
"memory-mcp-server",
"ruff>=0.9.4",
"thefuzz[speedup]>=0.20.0", # Includes python-Levenshtein for performance
]
[project.optional-dependencies]
test = ["pytest", "pytest-asyncio", "pytest-cov"]
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
python_files = ["test_*.py"]
addopts = "-q -ra"
[tool.mypy]
python_version = "3.12"
warn_return_any = true
warn_unused_configs = true
disallow_untyped_defs = true
disallow_incomplete_defs = true
check_untyped_defs = true
disallow_untyped_decorators = true
no_implicit_optional = true
warn_redundant_casts = true
warn_unused_ignores = true
warn_no_return = true
warn_unreachable = true
plugins = []
[[tool.mypy.overrides]]
module = ["pytest.*", "mcp.*", "aiofiles.*"]
ignore_missing_imports = true
[[tool.mypy.overrides]]
module = "tests.*"
disallow_untyped_decorators = false
[tool.ruff]
line-length = 88
target-version = "py312"
[tool.ruff.lint]
select = ["E", "F", "B", "I"]
ignore = []
[tool.ruff.lint.per-file-ignores]
"__init__.py" = ["F401"]
```
--------------------------------------------------------------------------------
/memory_mcp_server/exceptions.py:
--------------------------------------------------------------------------------
```python
class KnowledgeGraphError(Exception):
"""Base exception for all knowledge graph errors."""
pass
class EntityNotFoundError(KnowledgeGraphError):
"""Raised when an entity is not found in the graph."""
def __init__(self, entity_name: str):
self.entity_name = entity_name
super().__init__(f"Entity '{entity_name}' not found in the graph")
class EntityAlreadyExistsError(KnowledgeGraphError):
"""Raised when trying to create an entity that already exists."""
def __init__(self, entity_name: str):
self.entity_name = entity_name
super().__init__(f"Entity '{entity_name}' already exists in the graph")
class RelationValidationError(KnowledgeGraphError):
"""Raised when a relation is invalid."""
pass
class FileAccessError(KnowledgeGraphError):
"""Raised when there are file access issues."""
pass
class JsonParsingError(KnowledgeGraphError):
"""Raised when there are JSON parsing issues."""
def __init__(self, line_number: int, line_content: str, original_error: Exception):
self.line_number = line_number
self.line_content = line_content
self.original_error = original_error
super().__init__(
f"Failed to parse JSON at line {line_number}: {str(original_error)}\n"
f"Content: {line_content}"
)
```
--------------------------------------------------------------------------------
/tests/test_interfaces.py:
--------------------------------------------------------------------------------
```python
"""Tests for interface classes."""
from memory_mcp_server.interfaces import Entity, KnowledgeGraph, Relation
def test_entity_creation() -> None:
"""Test entity creation and attributes."""
entity = Entity(
name="TestEntity", entityType="TestType", observations=["obs1", "obs2"]
)
assert entity.name == "TestEntity"
assert entity.entityType == "TestType"
assert len(entity.observations) == 2
assert "obs1" in entity.observations
assert "obs2" in entity.observations
def test_relation_creation() -> None:
"""Test relation creation and attributes."""
relation = Relation(from_="EntityA", to="EntityB", relationType="TestRelation")
assert relation.from_ == "EntityA"
assert relation.to == "EntityB"
assert relation.relationType == "TestRelation"
def test_knowledge_graph_creation() -> None:
"""Test knowledge graph creation and attributes."""
entities = [
Entity(name="E1", entityType="T1", observations=[]),
Entity(name="E2", entityType="T2", observations=[]),
]
relations = [Relation(from_="E1", to="E2", relationType="R1")]
graph = KnowledgeGraph(entities=entities, relations=relations)
assert len(graph.entities) == 2
assert len(graph.relations) == 1
assert graph.entities[0].name == "E1"
assert graph.relations[0].from_ == "E1"
```
--------------------------------------------------------------------------------
/CHANGELOG.md:
--------------------------------------------------------------------------------
```markdown
# Changelog
All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased]
### Added
- Fuzzy search capability for knowledge graph queries
- New `SearchOptions` class for configuring search behavior
- Configurable similarity threshold and field weights
- Backward compatible with existing exact matching
- Improved search relevance with weighted scoring
## [0.2.0] - 2024-01-07
### Added
- Observation management system with atomic operations
- Type-safe observation handling in Backend interface
- Pre-commit hooks for code quality
- EditorConfig for consistent styling
- Changelog tracking
- Documentation improvements
### Changed
- Enhanced project structure with additional directories
- Improved test suite with proper type validation
- Updated MCP tool handlers with consistent response formats
### Fixed
- Entity serialization in test responses
- TextContent validation in MCP handlers
- Error message format consistency
## [0.1.4] - 2024-01-07
### Added
- Pre-commit hooks for code quality
- EditorConfig for consistent styling
- Changelog tracking
- Documentation improvements
### Changed
- Enhanced project structure with additional directories
## [0.1.0] - 2024-01-07
### Added
- Initial release
- JSONL backend implementation
- Knowledge graph management
- MCP server implementation
- Basic test suite
```
--------------------------------------------------------------------------------
/tests/test_backends/conftest.py:
--------------------------------------------------------------------------------
```python
"""Common test fixtures for backend tests."""
from pathlib import Path
from typing import AsyncGenerator, List
import pytest
from memory_mcp_server.backends.jsonl import JsonlBackend
from memory_mcp_server.interfaces import Entity, Relation
@pytest.fixture(scope="function")
def sample_entities() -> List[Entity]:
"""Provide a list of sample entities for testing."""
return [
Entity("test1", "person", ["observation1", "observation2"]),
Entity("test2", "location", ["observation3"]),
Entity("test3", "organization", ["observation4", "observation5"]),
]
@pytest.fixture(scope="function")
def sample_relations(sample_entities: List[Entity]) -> List[Relation]:
"""Provide a list of sample relations for testing."""
return [
Relation(from_="test1", to="test2", relationType="visited"),
Relation(from_="test1", to="test3", relationType="works_at"),
Relation(from_="test2", to="test3", relationType="located_in"),
]
@pytest.fixture(scope="function")
async def populated_jsonl_backend(
jsonl_backend: JsonlBackend,
sample_entities: List[Entity],
sample_relations: List[Relation],
) -> AsyncGenerator[JsonlBackend, None]:
"""Provide a JSONL backend pre-populated with sample data."""
await jsonl_backend.create_entities(sample_entities)
await jsonl_backend.create_relations(sample_relations)
yield jsonl_backend
@pytest.fixture(scope="function")
def temp_jsonl_path(tmp_path: Path) -> Path:
"""Provide a temporary path for JSONL files."""
return tmp_path / "test_memory.jsonl"
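@pytest.fixture(scope="function")
async def jsonl_backend(
    temp_jsonl_path: Path,
) -> AsyncGenerator[JsonlBackend, None]:
    """Provide an initialized JSONL backend.

    Note: this fixture is referenced by populated_jsonl_backend above but was
    missing from this file; reconstructed assuming JsonlBackend(path).
    """
    backend = JsonlBackend(temp_jsonl_path)
    await backend.initialize()
    yield backend
    await backend.close()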
```
--------------------------------------------------------------------------------
/tests/conftest.py:
--------------------------------------------------------------------------------
```python
"""Common test fixtures for all tests."""
import logging
from pathlib import Path
from typing import AsyncGenerator, List
import pytest
from memory_mcp_server.interfaces import Entity, Relation
from memory_mcp_server.knowledge_graph_manager import KnowledgeGraphManager
# Configure logging
logger = logging.getLogger(__name__)
@pytest.fixture(scope="function")
def temp_memory_file(tmp_path: Path) -> Path:
"""Create a temporary memory file."""
logger.debug(f"Creating temp file in {tmp_path}")
return tmp_path / "memory.jsonl"
@pytest.fixture(scope="function")
def sample_entities() -> List[Entity]:
"""Provide sample entities for testing."""
return [
Entity("person1", "person", ["likes reading", "works in tech"]),
Entity("company1", "company", ["tech company", "founded 2020"]),
Entity("location1", "place", ["office building", "in city center"]),
]
@pytest.fixture(scope="function")
def sample_relations() -> List[Relation]:
"""Provide sample relations for testing."""
return [
Relation(from_="person1", to="company1", relationType="works_at"),
Relation(from_="company1", to="location1", relationType="located_at"),
]
@pytest.fixture(scope="function")
async def knowledge_graph_manager(
temp_memory_file: Path,
) -> AsyncGenerator[KnowledgeGraphManager, None]:
"""Create a KnowledgeGraphManager instance with a temporary memory file."""
logger.debug("Creating KnowledgeGraphManager")
manager = KnowledgeGraphManager(backend=temp_memory_file, cache_ttl=1)
logger.debug("KnowledgeGraphManager created")
await manager.initialize()
yield manager
logger.debug("Cleaning up KnowledgeGraphManager")
await manager.flush()
await manager.close()
logger.debug("Cleanup complete")
```
--------------------------------------------------------------------------------
/memory_mcp_server/interfaces.py:
--------------------------------------------------------------------------------
```python
"""Interface definitions for Memory MCP Server."""
from dataclasses import dataclass
from enum import Enum
from typing import List, Optional
@dataclass(frozen=True)
class Entity:
"""Entity in the knowledge graph."""
name: str
entityType: str
observations: List[str]
def __hash__(self) -> int:
"""Make Entity hashable based on name."""
return hash(self.name)
def __eq__(self, other: object) -> bool:
"""Compare Entity based on name."""
if not isinstance(other, Entity):
return NotImplemented
return self.name == other.name
def to_dict(self) -> dict:
"""Convert to dictionary representation."""
return {
"name": self.name,
"entityType": self.entityType,
"observations": list(
self.observations
), # Convert to list in case it's a tuple
}
@dataclass(frozen=True)
class Relation:
"""Relation between entities in the knowledge graph."""
from_: str
to: str
relationType: str
def __hash__(self) -> int:
"""Make Relation hashable based on all fields."""
return hash((self.from_, self.to, self.relationType))
def __eq__(self, other: object) -> bool:
"""Compare Relation based on all fields."""
if not isinstance(other, Relation):
return NotImplemented
return (
self.from_ == other.from_
and self.to == other.to
and self.relationType == other.relationType
)
def to_dict(self) -> dict:
"""Convert to dictionary representation."""
return {
"from": self.from_,
"to": self.to,
"relationType": self.relationType,
}
@dataclass
class KnowledgeGraph:
"""Knowledge graph containing entities and relations."""
entities: List[Entity]
relations: List[Relation]
def to_dict(self) -> dict:
"""Convert to dictionary representation."""
return {
"entities": [e.to_dict() for e in self.entities],
"relations": [r.to_dict() for r in self.relations],
}
@dataclass
class SearchOptions:
"""Options for configuring search behavior."""
fuzzy: bool = False
threshold: float = 80.0
weights: Optional[dict[str, float]] = None
class BatchOperationType(Enum):
"""Types of batch operations."""
CREATE_ENTITIES = "create_entities"
DELETE_ENTITIES = "delete_entities"
CREATE_RELATIONS = "create_relations"
DELETE_RELATIONS = "delete_relations"
ADD_OBSERVATIONS = "add_observations"
@dataclass
class BatchOperation:
"""Represents a single operation in a batch."""
operation_type: BatchOperationType
data: dict # Operation-specific data
@dataclass
class BatchResult:
"""Result of a batch operation execution."""
success: bool
operations_completed: int
failed_operations: List[tuple[BatchOperation, str]] # Operation and error message
error_message: Optional[str] = None
```
--------------------------------------------------------------------------------
/memory_mcp_server/backends/base.py:
--------------------------------------------------------------------------------
```python
"""Backend interface for Memory MCP Server storage implementations."""
from abc import ABC, abstractmethod
from typing import List, Optional
from ..interfaces import (
BatchOperation,
BatchResult,
Entity,
KnowledgeGraph,
Relation,
SearchOptions,
)
class Backend(ABC):
"""Abstract base class for knowledge graph storage backends."""
@abstractmethod
async def initialize(self) -> None:
"""Initialize the backend connection and resources."""
pass
@abstractmethod
async def close(self) -> None:
"""Close the backend connection and cleanup resources."""
pass
@abstractmethod
async def create_entities(self, entities: List[Entity]) -> List[Entity]:
"""Create multiple new entities in the backend.
Args:
entities: List of entities to create
Returns:
List of successfully created entities
"""
pass
@abstractmethod
async def delete_entities(self, entity_names: List[str]) -> List[str]:
"""Create multiple new entities in the backend.
Args:
entities: List of entities to create
Returns:
List of successfully created entities
"""
pass
@abstractmethod
async def create_relations(self, relations: List[Relation]) -> List[Relation]:
"""Create multiple new relations in the backend.
Args:
relations: List of relations to create
Returns:
List of successfully created relations
"""
pass
@abstractmethod
async def delete_relations(self, from_: str, to: str) -> None:
"""Delete relations between two entities.
Args:
from_: Source entity name
to: Target entity name
Raises:
EntityNotFoundError: If either entity is not found
"""
pass
@abstractmethod
async def read_graph(self) -> KnowledgeGraph:
"""Read the entire knowledge graph from the backend.
Returns:
KnowledgeGraph containing all entities and relations
"""
pass
@abstractmethod
async def search_nodes(
        self, query: str, options: Optional[SearchOptions] = None
) -> KnowledgeGraph:
"""Search for entities and relations matching the query.
Args:
query: Search query string
options: Optional SearchOptions for configuring search behavior.
If None, uses exact substring matching.
Returns:
KnowledgeGraph containing matching entities and relations
Raises:
ValueError: If query is empty or options are invalid
"""
pass
@abstractmethod
async def flush(self) -> None:
"""Ensure all pending changes are persisted to the backend."""
pass
@abstractmethod
async def add_observations(self, entity_name: str, observations: List[str]) -> None:
"""Add observations to an existing entity.
Args:
entity_name: Name of the entity to add observations to
observations: List of observations to add
"""
pass
@abstractmethod
async def add_batch_observations(
self, observations_map: dict[str, List[str]]
) -> None:
"""Add observations to multiple entities in a single operation.
Args:
observations_map: Dictionary mapping entity names to lists of observations
Raises:
ValidationError: If any observations are invalid
EntityNotFoundError: If any entity is not found
"""
pass
@abstractmethod
async def execute_batch(self, operations: List[BatchOperation]) -> BatchResult:
"""Execute multiple operations in a single atomic batch.
Args:
operations: List of operations to execute
Returns:
BatchResult containing success/failure information
Raises:
ValidationError: If validation fails for any operation
"""
pass
@abstractmethod
async def begin_transaction(self) -> None:
"""Begin a transaction for batch operations.
This creates a savepoint that can be rolled back to if needed.
"""
pass
@abstractmethod
async def rollback_transaction(self) -> None:
"""Rollback to the last transaction savepoint."""
pass
@abstractmethod
async def commit_transaction(self) -> None:
"""Commit the current transaction."""
pass
```
--------------------------------------------------------------------------------
/tests/test_validation.py:
--------------------------------------------------------------------------------
```python
"""Tests for validation functionality."""
import pytest
from memory_mcp_server.interfaces import Entity, Relation
from memory_mcp_server.validation import (
EntityValidationError,
KnowledgeGraphValidator,
RelationValidationError,
)
def test_validate_batch_entities() -> None:
"""Test batch entity validation."""
# Valid batch
entities = [
Entity("test1", "person", ["obs1"]),
Entity("test2", "person", ["obs2"]),
]
existing_names = {"existing1", "existing2"}
KnowledgeGraphValidator.validate_batch_entities(entities, existing_names)
# Empty batch
with pytest.raises(EntityValidationError, match="Entity list cannot be empty"):
KnowledgeGraphValidator.validate_batch_entities([], existing_names)
# Duplicate names within batch
entities = [
Entity("test1", "person", ["obs1"]),
Entity("test1", "person", ["obs2"]),
]
with pytest.raises(EntityValidationError, match="Duplicate entity name in batch"):
KnowledgeGraphValidator.validate_batch_entities(entities, existing_names)
# Conflict with existing names
entities = [
Entity("test1", "person", ["obs1"]),
Entity("existing1", "person", ["obs2"]),
]
with pytest.raises(EntityValidationError, match="Entities already exist"):
KnowledgeGraphValidator.validate_batch_entities(entities, existing_names)
# Invalid entity type
entities = [
Entity("test1", "invalid-type", ["obs1"]),
]
with pytest.raises(EntityValidationError, match="Invalid entity type"):
KnowledgeGraphValidator.validate_batch_entities(entities, existing_names)
def test_validate_batch_relations() -> None:
"""Test batch relation validation."""
# Valid batch
relations = [
Relation(from_="entity1", to="entity2", relationType="knows"),
Relation(from_="entity2", to="entity3", relationType="knows"),
]
existing_relations = []
entity_names = {"entity1", "entity2", "entity3"}
KnowledgeGraphValidator.validate_batch_relations(
relations, existing_relations, entity_names
)
# Empty batch
with pytest.raises(RelationValidationError, match="Relations list cannot be empty"):
KnowledgeGraphValidator.validate_batch_relations(
[], existing_relations, entity_names
)
# Duplicate relations
relations = [
Relation(from_="entity1", to="entity2", relationType="knows"),
Relation(from_="entity1", to="entity2", relationType="knows"), # Same relation
]
with pytest.raises(RelationValidationError, match="Duplicate relation"):
KnowledgeGraphValidator.validate_batch_relations(
relations, existing_relations, entity_names
)
# Missing entities
relations = [
Relation("entity1", "nonexistent", "knows"),
]
with pytest.raises(RelationValidationError, match="Entities not found"):
KnowledgeGraphValidator.validate_batch_relations(
relations, existing_relations, entity_names
)
# Invalid relation type
relations = [
Relation("entity1", "entity2", "invalid-type"),
]
with pytest.raises(RelationValidationError, match="Invalid relation type"):
KnowledgeGraphValidator.validate_batch_relations(
relations, existing_relations, entity_names
)
# Self-referential relation
relations = [
Relation("entity1", "entity1", "knows"),
]
with pytest.raises(
RelationValidationError, match="Self-referential relations not allowed"
):
KnowledgeGraphValidator.validate_batch_relations(
relations, existing_relations, entity_names
)
# Cycle detection
relations = [
Relation("entity1", "entity2", "knows"),
Relation("entity2", "entity3", "knows"),
Relation("entity3", "entity1", "knows"),
]
with pytest.raises(RelationValidationError, match="Circular dependency detected"):
KnowledgeGraphValidator.validate_batch_relations(
relations, existing_relations, entity_names
)
def test_validate_batch_observations() -> None:
"""Test batch observation validation."""
# Valid batch
existing_entities = {
"entity1": Entity("entity1", "person", ["existing1"]),
"entity2": Entity("entity2", "person", ["existing2"]),
}
observations_map = {
"entity1": ["new1", "new2"],
"entity2": ["new3"],
}
KnowledgeGraphValidator.validate_batch_observations(
observations_map, existing_entities
)
# Empty batch
with pytest.raises(EntityValidationError, match="Observations map cannot be empty"):
KnowledgeGraphValidator.validate_batch_observations({}, existing_entities)
# Missing entities
observations_map = {
"entity1": ["new1"],
"nonexistent": ["new2"],
}
with pytest.raises(EntityValidationError, match="Entities not found"):
KnowledgeGraphValidator.validate_batch_observations(
observations_map, existing_entities
)
# Empty observations list is allowed (skipped)
observations_map = {
"entity1": [],
}
KnowledgeGraphValidator.validate_batch_observations(
observations_map, existing_entities
)
# Invalid observation format
observations_map = {
"entity1": ["", "new2"], # Empty observation
}
with pytest.raises(EntityValidationError, match="Empty observation"):
KnowledgeGraphValidator.validate_batch_observations(
observations_map, existing_entities
)
# Duplicate observations
observations_map = {
"entity1": ["existing1", "new2"], # Duplicate with existing observation
}
with pytest.raises(EntityValidationError, match="Duplicate observations"):
KnowledgeGraphValidator.validate_batch_observations(
observations_map, existing_entities
)
```
--------------------------------------------------------------------------------
/memory_mcp_server/knowledge_graph_manager.py:
--------------------------------------------------------------------------------
```python
"""Knowledge graph manager that delegates to a configured backend."""
import asyncio
from pathlib import Path
from typing import Dict, List, Optional, Union
from .backends.base import Backend
from .backends.jsonl import JsonlBackend
from .interfaces import Entity, KnowledgeGraph, Relation, SearchOptions
from .validation import KnowledgeGraphValidator, ValidationError
class KnowledgeGraphManager:
"""Manages knowledge graph operations through a configured backend."""
backend: Backend
_write_lock: asyncio.Lock
def __init__(
self,
backend: Union[Backend, Path],
cache_ttl: int = 60,
):
"""Initialize the KnowledgeGraphManager.
Args:
backend: Either a Backend instance or Path to use default JSONL backend
cache_ttl: Cache TTL in seconds (only used for JSONL backend)
"""
if isinstance(backend, Path):
self.backend = JsonlBackend(backend, cache_ttl)
else:
self.backend = backend
self._write_lock = asyncio.Lock()
async def initialize(self) -> None:
"""Initialize the backend connection."""
await self.backend.initialize()
async def close(self) -> None:
"""Close the backend connection."""
await self.backend.close()
async def create_entities(self, entities: List[Entity]) -> List[Entity]:
"""Create multiple new entities.
Args:
entities: List of entities to create
Returns:
List of successfully created entities
Raises:
ValidationError: If any entity fails validation
"""
# Get existing entities for validation
graph = await self.read_graph()
existing_names = {entity.name for entity in graph.entities}
# Validate all entities in one pass
KnowledgeGraphValidator.validate_batch_entities(entities, existing_names)
async with self._write_lock:
return await self.backend.create_entities(entities)
async def delete_entities(self, entity_names: List[str]) -> List[str]:
"""Delete multiple existing entities by name.
Args:
entity_names: List of entity names to delete
Returns:
            List of successfully deleted entity names (names not present in the graph are skipped)
        Raises:
            ValueError: If entity_names list is empty
FileAccessError: If there are file system issues (backend specific)
"""
if not entity_names:
raise ValueError("Entity names list cannot be empty")
async with self._write_lock:
return await self.backend.delete_entities(entity_names)
async def delete_relations(self, from_: str, to: str) -> None:
"""Delete relations between two entities.
Args:
from_: Source entity name
to: Target entity name
Raises:
EntityNotFoundError: If either entity is not found
"""
async with self._write_lock:
return await self.backend.delete_relations(from_, to)
async def create_relations(self, relations: List[Relation]) -> List[Relation]:
"""Create multiple new relations.
Args:
relations: List of relations to create
Returns:
List of successfully created relations
Raises:
ValidationError: If any relation fails validation
EntityNotFoundError: If referenced entities don't exist
"""
# Get existing graph for validation
graph = await self.read_graph()
existing_names = {entity.name for entity in graph.entities}
# Validate all relations in one pass
KnowledgeGraphValidator.validate_batch_relations(
relations, graph.relations, existing_names
)
async with self._write_lock:
return await self.backend.create_relations(relations)
async def read_graph(self) -> KnowledgeGraph:
"""Read the entire knowledge graph.
Returns:
Current state of the knowledge graph
"""
return await self.backend.read_graph()
async def search_nodes(
self, query: str, options: Optional[SearchOptions] = None
) -> KnowledgeGraph:
"""Search for entities and relations matching query.
Args:
query: Search query string
options: Optional SearchOptions for configuring search behavior.
If None, uses exact substring matching.
Returns:
KnowledgeGraph containing matches
Raises:
ValueError: If query is empty or options are invalid
"""
return await self.backend.search_nodes(query, options)
async def flush(self) -> None:
"""Ensure any pending changes are persisted."""
await self.backend.flush()
async def add_observations(self, entity_name: str, observations: List[str]) -> None:
"""Add observations to an existing entity.
Args:
entity_name: Name of the entity to add observations to
observations: List of observations to add
Raises:
            ValidationError: If the entity is not found or observations are invalid
ValueError: If observations list is empty
"""
if not observations:
raise ValueError("Observations list cannot be empty")
# Validate new observations
KnowledgeGraphValidator.validate_observations(observations)
# Get existing entity to check for duplicate observations
graph = await self.read_graph()
entity = next((e for e in graph.entities if e.name == entity_name), None)
if not entity:
raise ValidationError(f"Entity not found: {entity_name}")
# Check for duplicates against existing observations
existing_observations = set(entity.observations)
duplicates = [obs for obs in observations if obs in existing_observations]
if duplicates:
raise ValidationError(f"Duplicate observations: {', '.join(duplicates)}")
async with self._write_lock:
await self.backend.add_observations(entity_name, observations)
async def add_batch_observations(
self, observations_map: Dict[str, List[str]]
) -> None:
"""Add observations to multiple entities in a single operation.
Args:
observations_map: Dictionary mapping entity names to lists of observations
Raises:
ValidationError: If any observations are invalid
EntityNotFoundError: If any entity is not found
ValueError: If observations_map is empty
"""
# Get existing graph for validation
graph = await self.read_graph()
entities_map = {entity.name: entity for entity in graph.entities}
# Validate all observations in one pass
KnowledgeGraphValidator.validate_batch_observations(
observations_map, entities_map
)
# All validation passed, perform the batch update
async with self._write_lock:
await self.backend.add_batch_observations(observations_map)
```
--------------------------------------------------------------------------------
/tests/test_knowledge_graph_manager.py:
--------------------------------------------------------------------------------
```python
"""Tests for KnowledgeGraphManager."""
import asyncio
from typing import List
import pytest
from memory_mcp_server.interfaces import Entity, Relation
from memory_mcp_server.knowledge_graph_manager import KnowledgeGraphManager
from memory_mcp_server.validation import EntityValidationError, ValidationError
@pytest.mark.asyncio(scope="function")
async def test_create_entities(
knowledge_graph_manager: KnowledgeGraphManager,
) -> None:
"""Test the creation of new entities in the knowledge graph.
This test verifies that:
1. Entities can be created successfully
2. The created entities are stored in the graph
3. Entity attributes are preserved correctly
"""
print("\nStarting test_create_entities")
entities = [
Entity(
name="john-doe",
entityType="person",
observations=["loves pizza"],
)
]
created_entities = await knowledge_graph_manager.create_entities(entities)
print("Created entities")
assert len(created_entities) == 1
graph = await knowledge_graph_manager.read_graph()
print("Read graph")
assert len(graph.entities) == 1
assert graph.entities[0].name == "john-doe"
print("test_create_entities: Complete")
@pytest.mark.asyncio(scope="function")
async def test_create_relations(
knowledge_graph_manager: KnowledgeGraphManager,
) -> None:
"""Test the creation of relations between entities.
This test verifies that:
1. Relations can be created between existing entities
2. Relations are stored properly in the graph
3. Relation properties (from, to, type) are preserved
"""
print("\nStarting test_create_relations")
entities = [
Entity(name="alice-smith", entityType="person", observations=["test"]),
Entity(name="bob-jones", entityType="person", observations=["test"]),
]
await knowledge_graph_manager.create_entities(entities)
print("Created entities")
relations = [Relation(from_="alice-smith", to="bob-jones", relationType="knows")]
created_relations = await knowledge_graph_manager.create_relations(relations)
print("Created relations")
assert len(created_relations) == 1
assert created_relations[0].from_ == "alice-smith"
assert created_relations[0].to == "bob-jones"
print("test_create_relations: Complete")
@pytest.mark.asyncio(scope="function")
async def test_search_functionality(
knowledge_graph_manager: KnowledgeGraphManager,
) -> None:
"""Test the search functionality across different criteria.
This test verifies searching by:
1. Entity name
2. Entity type
3. Observation content
4. Case insensitivity
"""
# Create test entities with varied data
entities = [
Entity(
name="search-test-1",
entityType="project",
observations=["keyword1", "unique1"],
),
Entity(name="search-test-2", entityType="project", observations=["keyword2"]),
Entity(name="different-type", entityType="document", observations=["keyword1"]),
]
await knowledge_graph_manager.create_entities(entities)
# Test search by name
name_result = await knowledge_graph_manager.search_nodes("search-test")
assert len(name_result.entities) == 2
assert all("search-test" in e.name for e in name_result.entities)
# Test search by type
type_result = await knowledge_graph_manager.search_nodes("document")
assert len(type_result.entities) == 1
assert type_result.entities[0].name == "different-type"
# Test search by observation
obs_result = await knowledge_graph_manager.search_nodes("keyword1")
assert len(obs_result.entities) == 2
assert any(e.name == "search-test-1" for e in obs_result.entities)
assert any(e.name == "different-type" for e in obs_result.entities)
@pytest.mark.asyncio(scope="function")
async def test_error_handling(
knowledge_graph_manager: KnowledgeGraphManager,
) -> None:
"""Test error handling in various scenarios.
This test verifies proper error handling for:
1. Invalid entity names
2. Non-existent entities in relations
3. Empty delete requests
4. Deleting non-existent entities
"""
# Test invalid entity name
with pytest.raises(EntityValidationError, match="Invalid entity name"):
await knowledge_graph_manager.create_entities(
[Entity(name="Invalid Name", entityType="person", observations=[])]
)
# Test relation with non-existent entities
with pytest.raises(ValidationError, match="Entities not found"):
await knowledge_graph_manager.create_relations(
[
Relation(
from_="non-existent", to="also-non-existent", relationType="knows"
)
]
)
# Test deleting empty list
with pytest.raises(ValueError, match="cannot be empty"):
await knowledge_graph_manager.delete_entities([])
# Test deleting non-existent entities
result = await knowledge_graph_manager.delete_entities(["non-existent"])
assert result == []
@pytest.mark.asyncio(scope="function")
async def test_graph_persistence(
knowledge_graph_manager: KnowledgeGraphManager,
) -> None:
"""Test that graph changes persist after reloading.
This test verifies that:
1. Created entities persist after a graph reload
2. Added relations persist after a graph reload
3. New observations persist after a graph reload
"""
# Create initial data
entity = Entity(
name="persistence-test", entityType="project", observations=["initial"]
)
await knowledge_graph_manager.create_entities([entity])
# Force a reload of the graph by clearing the cache
knowledge_graph_manager._cache = None # type: ignore
# Verify data persists
graph = await knowledge_graph_manager.read_graph()
assert len(graph.entities) == 1
assert graph.entities[0].name == "persistence-test"
assert "initial" in graph.entities[0].observations
@pytest.mark.asyncio(scope="function")
async def test_concurrent_operations(
knowledge_graph_manager: KnowledgeGraphManager,
) -> None:
"""Test handling of concurrent operations.
This test verifies that:
1. Multiple concurrent entity creations/deletions are handled properly
2. Cache remains consistent under concurrent operations
3. No data is lost during concurrent writes
"""
# Create multiple entities concurrently
async def create_entity(index: int) -> List[Entity]:
entity = Entity(
name=f"concurrent-{index}",
entityType="project",
observations=[f"obs{index}"],
)
return await knowledge_graph_manager.create_entities([entity])
# Delete entities concurrently
async def delete_entity(index: int) -> List[str]:
return await knowledge_graph_manager.delete_entities([f"concurrent-{index}"])
# First create 5 entities
create_tasks = [create_entity(i) for i in range(5)]
create_results = await asyncio.gather(*create_tasks)
assert all(len(r) == 1 for r in create_results)
# Then concurrently delete 3 of them while creating 2 more
delete_tasks = [delete_entity(i) for i in range(3)]
create_tasks = [create_entity(i) for i in range(5, 7)]
delete_results, create_results = await asyncio.gather(
asyncio.gather(*delete_tasks), asyncio.gather(*create_tasks)
)
# Verify deletions
assert all(len(r) == 1 for r in delete_results)
# Verify creations
assert all(len(r) == 1 for r in create_results)
# Verify final state
graph = await knowledge_graph_manager.read_graph()
expected_names = {"concurrent-5", "concurrent-6", "concurrent-3", "concurrent-4"}
assert len(graph.entities) == 4
assert all(e.name in expected_names for e in graph.entities)
```
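These tests depend on a `knowledge_graph_manager` fixture supplied by tests/conftest.py, which is not reproduced in this section. A minimal sketch of what such a fixture can look like, assuming the manager only needs an isolated temporary JSONL path and a cache TTL (the real conftest.py may differ):
```python
from pathlib import Path
from typing import AsyncIterator
import pytest
from memory_mcp_server.knowledge_graph_manager import KnowledgeGraphManager

@pytest.fixture
async def knowledge_graph_manager(
    tmp_path: Path,
) -> AsyncIterator[KnowledgeGraphManager]:
    # One manager per test, backed by a throwaway file under tmp_path.
    manager = KnowledgeGraphManager(tmp_path / "memory.jsonl", 60)
    yield manager
    await manager.flush()
```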
--------------------------------------------------------------------------------
/tests/test_server.py:
--------------------------------------------------------------------------------
```python
"""Tests for the MCP server implementation."""
import json
from typing import Any, Dict, List, Protocol, cast
import pytest
from mcp.types import TextContent
from memory_mcp_server.exceptions import EntityNotFoundError
from memory_mcp_server.interfaces import Entity, KnowledgeGraph, Relation
# Mock tools and handlers
def handle_error(error: Exception) -> str:
"""Mock error handler."""
if isinstance(error, EntityNotFoundError):
return str(error)
return f"Error: {str(error)}"
async def create_entities_handler(
manager: Any, arguments: Dict[str, Any]
) -> List[TextContent]:
"""Mock create entities handler."""
entities = [
Entity(
name=e["name"],
entityType=e["entityType"],
observations=e.get("observations", []),
)
for e in arguments["entities"]
]
result = await manager.create_entities(entities)
return [TextContent(type="text", text=json.dumps([e.to_dict() for e in result]))]
async def create_relations_handler(
manager: Any, arguments: Dict[str, Any]
) -> List[TextContent]:
"""Mock create relations handler."""
relations = [
Relation(from_=r["from"], to=r["to"], relationType=r["relationType"])
for r in arguments["relations"]
]
result = await manager.create_relations(relations)
return [TextContent(type="text", text=json.dumps([r.to_dict() for r in result]))]
async def add_observations_handler(
manager: Any, arguments: Dict[str, Any]
) -> List[TextContent]:
"""Mock add observations handler."""
await manager.add_observations(arguments["entity"], arguments["observations"])
return [TextContent(type="text", text=json.dumps({"success": True}))]
async def delete_entities_handler(
manager: Any, arguments: Dict[str, Any]
) -> List[TextContent]:
"""Mock delete entities handler."""
await manager.delete_entities(arguments["names"])
return [TextContent(type="text", text=json.dumps({"success": True}))]
async def delete_observations_handler(
manager: Any, arguments: Dict[str, Any]
) -> List[TextContent]:
"""Mock delete observations handler."""
await manager.delete_observations(arguments["entity"], arguments["observations"])
return [TextContent(type="text", text=json.dumps({"success": True}))]
async def delete_relations_handler(
manager: Any, arguments: Dict[str, Any]
) -> List[TextContent]:
"""Mock delete relations handler."""
await manager.delete_relations(arguments["from"], arguments["to"])
return [TextContent(type="text", text=json.dumps({"success": True}))]
async def read_graph_handler(
manager: Any, arguments: Dict[str, Any]
) -> List[TextContent]:
"""Mock read graph handler."""
graph = await manager.read_graph()
return [TextContent(type="text", text=json.dumps(graph.to_dict()))]
async def search_nodes_handler(
manager: Any, arguments: Dict[str, Any]
) -> List[TextContent]:
"""Mock search nodes handler."""
result = await manager.search_nodes(arguments["query"])
return [TextContent(type="text", text=json.dumps(result.to_dict()))]
TOOLS: Dict[str, Any] = {
"create_entities": create_entities_handler,
"create_relations": create_relations_handler,
"add_observations": add_observations_handler,
"delete_entities": delete_entities_handler,
"delete_relations": delete_relations_handler,
"read_graph": read_graph_handler,
"search_nodes": search_nodes_handler,
}
class MockManagerProtocol(Protocol):
"""Protocol defining the interface for MockManager."""
async def create_entities(self, entities: List[Entity]) -> List[Entity]:
...
async def create_relations(self, relations: List[Relation]) -> List[Relation]:
...
async def add_observations(self, entity: str, observations: List[str]) -> None:
...
async def delete_entities(self, names: List[str]) -> None:
...
async def delete_relations(self, from_: str, to: str) -> None:
...
async def read_graph(self) -> KnowledgeGraph:
...
async def search_nodes(self, query: str) -> KnowledgeGraph:
...
async def flush(self) -> None:
...
@pytest.fixture(scope="function")
def mock_manager() -> MockManagerProtocol:
"""Create a mock manager for testing."""
class MockManager:
def __init__(self) -> None:
self.entities: List[Entity] = []
self.relations: List[Relation] = []
async def create_entities(self, entities: List[Entity]) -> List[Entity]:
self.entities.extend(entities)
return entities
async def create_relations(self, relations: List[Relation]) -> List[Relation]:
return relations
async def add_observations(self, entity: str, observations: List[str]) -> None:
if entity == "MissingEntity":
raise EntityNotFoundError(entity)
async def delete_entities(self, names: List[str]) -> None:
for name in names:
if name == "MissingEntity":
raise EntityNotFoundError(name)
async def delete_relations(self, from_: str, to: str) -> None:
if from_ == "MissingEntity" or to == "MissingEntity":
raise EntityNotFoundError("MissingEntity")
async def read_graph(self) -> KnowledgeGraph:
# Return current state including any created entities
return KnowledgeGraph(
entities=self.entities,
relations=self.relations,
)
async def search_nodes(self, query: str) -> KnowledgeGraph:
# If query matches "TestEntity", return graph; otherwise empty
if "TestEntity".lower() in query.lower():
return await self.read_graph()
return KnowledgeGraph(entities=[], relations=[])
async def open_nodes(self, names: List[str]) -> KnowledgeGraph:
# If "TestEntity" is requested, return it
if "TestEntity" in names:
return await self.read_graph()
return KnowledgeGraph(entities=[], relations=[])
async def flush(self) -> None:
"""Mock flush method to comply with interface."""
pass
return MockManager()
@pytest.mark.asyncio
async def test_create_entities(mock_manager: MockManagerProtocol) -> None:
"""Test creating entities through the MCP server."""
handler = cast(Any, TOOLS["create_entities"])
arguments = {
"entities": [
{"name": "E1", "entityType": "TypeX", "observations": ["obsA"]},
{"name": "E2", "entityType": "TypeY", "observations": ["obsB"]},
]
}
result = await handler(mock_manager, arguments)
data = json.loads(result[0].text)
assert len(data) == 2
assert data[0]["name"] == "E1"
assert data[1]["observations"] == ["obsB"]
@pytest.mark.asyncio
async def test_create_relations(mock_manager: MockManagerProtocol) -> None:
"""Test creating relations through the MCP server."""
handler = cast(Any, TOOLS["create_relations"])
arguments = {"relations": [{"from": "E1", "to": "E2", "relationType": "likes"}]}
result = await handler(mock_manager, arguments)
data = json.loads(result[0].text)
assert len(data) == 1
assert data[0]["from"] == "E1"
assert data[0]["to"] == "E2"
@pytest.mark.asyncio
async def test_add_observations(mock_manager: MockManagerProtocol) -> None:
"""Test adding observations through the MCP server."""
handler = cast(Any, TOOLS["add_observations"])
arguments = {"entity": "E1", "observations": ["newObs"]}
result = await handler(mock_manager, arguments)
data = json.loads(result[0].text)
assert data["success"] is True
@pytest.mark.asyncio
async def test_delete_entities(mock_manager: MockManagerProtocol) -> None:
"""Test deleting entities through the MCP server."""
handler = cast(Any, TOOLS["delete_entities"])
arguments = {"names": ["E1"]}
result = await handler(mock_manager, arguments)
data = json.loads(result[0].text)
assert data["success"] is True
@pytest.mark.asyncio
async def test_delete_relations(mock_manager: MockManagerProtocol) -> None:
"""Test deleting relations through the MCP server."""
handler = cast(Any, TOOLS["delete_relations"])
arguments = {"from": "E1", "to": "E2"}
result = await handler(mock_manager, arguments)
data = json.loads(result[0].text)
assert data["success"] is True
@pytest.mark.asyncio
async def test_read_graph(mock_manager: MockManagerProtocol) -> None:
"""Test reading the graph through the MCP server."""
# Create test entity first
await mock_manager.create_entities(
[
Entity(
name="TestEntity",
entityType="TestType",
observations=["test observation"],
)
]
)
handler = cast(Any, TOOLS["read_graph"])
arguments: Dict[str, Any] = {}
result = await handler(mock_manager, arguments)
data = json.loads(result[0].text)
assert len(data["entities"]) == 1
assert data["entities"][0]["name"] == "TestEntity"
assert isinstance(
data["entities"][0]["observations"], (list, tuple)
) # Allow both list and tuple
@pytest.mark.asyncio
async def test_save_graph(mock_manager: MockManagerProtocol) -> None:
"""Test saving the graph through the MCP server."""
# First create some test data
await mock_manager.create_entities(
[Entity(name="TestSave", entityType="TestType", observations=[])]
)
# Explicitly save the graph
await mock_manager.flush()
# Read back the graph
graph = await mock_manager.read_graph()
# Verify our test entity exists
assert any(e.name == "TestSave" for e in graph.entities)
# Verify the save preserved the structure
assert isinstance(
graph.entities[0].observations, (list, tuple)
) # Allow both list and tuple for immutability
@pytest.mark.asyncio
async def test_search_nodes(mock_manager: MockManagerProtocol) -> None:
"""Test searching nodes through the MCP server."""
# Create test entity first
await mock_manager.create_entities(
[
Entity(
name="TestEntity",
entityType="TestType",
observations=["test observation"],
)
]
)
handler = cast(Any, TOOLS["search_nodes"])
arguments = {"query": "TestEntity"}
result = await handler(mock_manager, arguments)
data = json.loads(result[0].text)
assert len(data["entities"]) == 1
assert data["entities"][0]["name"] == "TestEntity"
assert isinstance(
data["entities"][0]["observations"], (list, tuple)
) # Allow both list and tuple
def test_error_handling() -> None:
"""Test error handling functionality."""
msg = handle_error(EntityNotFoundError("MissingEntity"))
assert "Entity 'MissingEntity' not found in the graph" in msg
```
--------------------------------------------------------------------------------
/memory_mcp_server/validation.py:
--------------------------------------------------------------------------------
```python
"""Validation module for knowledge graph consistency."""
import re
from typing import List, Optional, Set
from .interfaces import Entity, KnowledgeGraph, Relation
class ValidationError(Exception):
"""Base class for validation errors."""
pass
class EntityValidationError(ValidationError):
"""Raised when entity validation fails."""
pass
class RelationValidationError(ValidationError):
"""Raised when relation validation fails."""
pass
class KnowledgeGraphValidator:
"""Validator for ensuring knowledge graph consistency."""
# Constants for validation rules
ENTITY_NAME_PATTERN = r"^[a-z][a-z0-9-]{0,99}$"
MAX_OBSERVATION_LENGTH = 500
VALID_ENTITY_TYPES = {
"person",
"concept",
"project",
"document",
"tool",
"organization",
"location",
"event",
}
VALID_RELATION_TYPES = {
"knows",
"contains",
"uses",
"created",
"belongs-to",
"depends-on",
"related-to",
}
@classmethod
def validate_entity_name(cls, name: str) -> None:
"""Validate entity name follows naming convention.
Args:
name: Entity name to validate
Raises:
EntityValidationError: If name is invalid
"""
if not re.match(cls.ENTITY_NAME_PATTERN, name):
raise EntityValidationError(
f"Invalid entity name '{name}'. Must start with lowercase letter, "
"contain only lowercase letters, numbers and hyphens, "
"and be 1-100 characters long."
)
@classmethod
def validate_entity_type(cls, entity_type: str) -> None:
"""Validate entity type is from allowed set.
Args:
entity_type: Entity type to validate
Raises:
EntityValidationError: If type is invalid
"""
if entity_type not in cls.VALID_ENTITY_TYPES:
raise EntityValidationError(
f"Invalid entity type '{entity_type}'. Must be one of: "
f"{', '.join(sorted(cls.VALID_ENTITY_TYPES))}"
)
@classmethod
def validate_observations(cls, observations: List[str]) -> None:
"""Validate entity observations.
Args:
observations: List of observations to validate
Raises:
EntityValidationError: If any observation is invalid
"""
seen = set()
for obs in observations:
if not obs:
raise EntityValidationError("Empty observation")
if len(obs) > cls.MAX_OBSERVATION_LENGTH:
raise EntityValidationError(
f"Observation exceeds length of {cls.MAX_OBSERVATION_LENGTH} chars"
)
if obs in seen:
raise EntityValidationError(f"Duplicate observation: {obs}")
seen.add(obs)
@classmethod
def validate_entity(cls, entity: Entity) -> None:
"""Validate an entity.
Args:
entity: Entity to validate
Raises:
EntityValidationError: If entity is invalid
"""
cls.validate_entity_name(entity.name)
cls.validate_entity_type(entity.entityType)
cls.validate_observations(list(entity.observations))
@classmethod
def validate_relation_type(cls, relation_type: str) -> None:
"""Validate relation type is from allowed set.
Args:
relation_type: Relation type to validate
Raises:
RelationValidationError: If type is invalid
"""
if relation_type not in cls.VALID_RELATION_TYPES:
valid_types = ", ".join(sorted(cls.VALID_RELATION_TYPES))
raise RelationValidationError(
f"Invalid relation type '{relation_type}'. Valid types: {valid_types}"
)
@classmethod
def validate_relation(cls, relation: Relation) -> None:
"""Validate a relation.
Args:
relation: Relation to validate
Raises:
RelationValidationError: If relation is invalid
"""
if relation.from_ == relation.to:
raise RelationValidationError("Self-referential relations not allowed")
cls.validate_relation_type(relation.relationType)
@classmethod
def validate_no_cycles(
cls,
relations: List[Relation],
existing_relations: Optional[List[Relation]] = None,
) -> None:
"""Validate that relations don't create cycles.
Args:
relations: New relations to validate
existing_relations: Optional list of existing relations to check against
Raises:
RelationValidationError: If cycles are detected
"""
# Build adjacency list
graph: dict[str, Set[str]] = {}
all_relations = list(relations)
if existing_relations:
all_relations.extend(existing_relations)
for rel in all_relations:
if rel.from_ not in graph:
graph[rel.from_] = set()
graph[rel.from_].add(rel.to)
# Check for cycles using DFS
def has_cycle(node: str, visited: Set[str], path: Set[str]) -> bool:
visited.add(node)
path.add(node)
for neighbor in graph.get(node, set()):
if neighbor not in visited:
if has_cycle(neighbor, visited, path):
return True
elif neighbor in path:
return True
path.remove(node)
return False
visited: Set[str] = set()
path: Set[str] = set()
for node in graph:
if node not in visited:
if has_cycle(node, visited, path):
raise RelationValidationError(
"Circular dependency detected in relations"
)
@classmethod
def validate_graph(cls, graph: KnowledgeGraph) -> None:
"""Validate entire knowledge graph.
Args:
graph: Knowledge graph to validate
Raises:
ValidationError: If any validation fails
"""
# Validate all entities
entity_names = set()
for entity in graph.entities:
cls.validate_entity(entity)
if entity.name in entity_names:
raise EntityValidationError(f"Duplicate entity name: {entity.name}")
entity_names.add(entity.name)
# Validate all relations
for relation in graph.relations:
cls.validate_relation(relation)
if relation.from_ not in entity_names:
raise RelationValidationError(
f"Source entity '{relation.from_}' not found in graph"
)
if relation.to not in entity_names:
raise RelationValidationError(
f"Target entity '{relation.to}' not found in graph"
)
# Check for cycles
cls.validate_no_cycles(graph.relations)
@classmethod
def validate_batch_entities(
cls, entities: List[Entity], existing_names: Set[str]
) -> None:
"""Validate a batch of entities efficiently.
Args:
entities: List of entities to validate
existing_names: Set of existing entity names
Raises:
EntityValidationError: If validation fails
"""
if not entities:
raise EntityValidationError("Entity list cannot be empty")
# Check for duplicates within the batch
new_names = set()
for entity in entities:
if entity.name in new_names:
raise EntityValidationError(
f"Duplicate entity name in batch: {entity.name}"
)
new_names.add(entity.name)
# Check for conflicts with existing entities
conflicts = new_names.intersection(existing_names)
if conflicts:
raise EntityValidationError(
f"Entities already exist: {', '.join(conflicts)}"
)
# Validate all entities in one pass
for entity in entities:
cls.validate_entity(entity)
@classmethod
def validate_batch_relations(
cls,
relations: List[Relation],
existing_relations: List[Relation],
entity_names: Set[str],
) -> None:
"""Validate a batch of relations efficiently.
Args:
relations: List of relations to validate
existing_relations: List of existing relations
entity_names: Set of valid entity names
Raises:
RelationValidationError: If validation fails
"""
if not relations:
raise RelationValidationError("Relations list cannot be empty")
# Track relation keys to prevent duplicates
seen_relations: Set[tuple[str, str, str]] = set()
# Validate all relations in one pass
missing_entities = set()
for relation in relations:
# Basic validation
cls.validate_relation(relation)
# Check for duplicate relations
key = (relation.from_, relation.to, relation.relationType)
if key in seen_relations:
raise RelationValidationError(
f"Duplicate relation: {relation.from_} -> {relation.to}"
)
seen_relations.add(key)
# Collect missing entities
if relation.from_ not in entity_names:
missing_entities.add(relation.from_)
if relation.to not in entity_names:
missing_entities.add(relation.to)
# Report all missing entities at once
if missing_entities:
raise RelationValidationError(
f"Entities not found: {', '.join(missing_entities)}"
)
# Check for cycles including existing relations
cls.validate_no_cycles(relations, existing_relations)
@classmethod
def validate_batch_observations(
cls,
observations_map: dict[str, List[str]],
existing_entities: dict[str, Entity],
) -> None:
"""Validate a batch of observations efficiently.
Args:
observations_map: Dictionary mapping entity names to lists of observations
existing_entities: Dictionary of existing entities
Raises:
EntityValidationError: If validation fails
"""
if not observations_map:
raise EntityValidationError("Observations map cannot be empty")
# Check for missing entities first
missing_entities = [
name for name in observations_map if name not in existing_entities
]
if missing_entities:
raise EntityValidationError(
f"Entities not found: {', '.join(missing_entities)}"
)
# Validate all observations in one pass
for entity_name, observations in observations_map.items():
if not observations:
continue
# Validate observation format
cls.validate_observations(observations)
# Check for duplicates against existing observations
entity = existing_entities[entity_name]
existing_observations = set(entity.observations)
duplicates = [obs for obs in observations if obs in existing_observations]
if duplicates:
raise EntityValidationError(
f"Duplicate observations for {entity_name}: {', '.join(duplicates)}"
)
```
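The validator is deliberately strict: names must be lowercase slugs, entity and relation types come from closed sets, and relation batches are rejected if they would introduce a cycle. A short illustration of two failure modes (the entity names `a` and `b` are invented for the example):
```python
from memory_mcp_server.interfaces import Relation
from memory_mcp_server.validation import (
    EntityValidationError,
    KnowledgeGraphValidator,
    RelationValidationError,
)

try:
    KnowledgeGraphValidator.validate_entity_name("Not A Slug")
except EntityValidationError as e:
    print(e)  # Invalid entity name 'Not A Slug'. Must start with ...

# a -> b plus b -> a forms a cycle, so validation raises.
cycle = [
    Relation(from_="a", to="b", relationType="depends-on"),
    Relation(from_="b", to="a", relationType="depends-on"),
]
try:
    KnowledgeGraphValidator.validate_no_cycles(cycle)
except RelationValidationError as e:
    print(e)  # Circular dependency detected in relations
```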
--------------------------------------------------------------------------------
/main.py:
--------------------------------------------------------------------------------
```python
#!/usr/bin/env python3
"""Memory MCP server using FastMCP."""
import os
from pathlib import Path
from typing import Any, Dict, List, Optional
from loguru import logger as logging
from mcp.server.fastmcp import Context, FastMCP
from mcp.server.fastmcp.prompts.base import Message, UserMessage
from pydantic import BaseModel
from memory_mcp_server.interfaces import Entity, Relation
from memory_mcp_server.knowledge_graph_manager import KnowledgeGraphManager
# Error type constants
ERROR_TYPES = {
"NOT_FOUND": "NOT_FOUND",
"VALIDATION_ERROR": "VALIDATION_ERROR",
"INTERNAL_ERROR": "INTERNAL_ERROR",
"ALREADY_EXISTS": "ALREADY_EXISTS",
"INVALID_RELATION": "INVALID_RELATION",
"NO_RESULTS": "NO_RESULTS", # Used when search returns no matches
}
# Response models
class EntityResponse(BaseModel):
success: bool
data: Optional[Dict[str, Any]] = None
error: Optional[str] = None
error_type: Optional[str] = None
class GraphResponse(BaseModel):
success: bool
data: Optional[Dict[str, Any]] = None
error: Optional[str] = None
error_type: Optional[str] = None
class OperationResponse(BaseModel):
success: bool
error: Optional[str] = None
error_type: Optional[str] = None
# Create FastMCP server with dependencies and instructions
mcp = FastMCP(
"Memory",
dependencies=["pydantic", "jsonl"],
version="0.1.0",
instructions="""
Memory MCP server providing knowledge graph functionality.
Available tools:
- get_entity: Retrieve entity by name
- get_graph: Get entire knowledge graph
- create_entities: Create multiple entities
- add_observation: Add observation to entity
- create_relation: Create relation between entities
- search_memory: Search entities by query
- delete_entities: Delete multiple entities
- delete_relation: Delete relation between entities
- flush_memory: Persist changes to storage
""",
)
# Initialize knowledge graph manager using environment variable
# Default to ~/.claude/memory.jsonl if MEMORY_FILE_PATH not set
default_memory_path = Path.home() / ".claude" / "memory.jsonl"
memory_file = Path(os.getenv("MEMORY_FILE_PATH", str(default_memory_path)))
logging.info(f"Memory server using file: {memory_file}")
# Create KnowledgeGraphManager instance
kg = KnowledgeGraphManager(memory_file, 60)
def serialize_to_dict(obj: Any) -> Dict[str, Any]:
    """Helper to serialize objects to dictionaries."""
    if hasattr(obj, "to_dict"):
        return obj.to_dict()
    elif hasattr(obj, "__dict__"):
        return obj.__dict__
    else:
        # Wrap plain values so the declared Dict return type holds.
        return {"value": str(obj)}
@mcp.tool()
async def get_entity(entity_name: str) -> EntityResponse:
"""Get entity by name from memory."""
try:
        result = await kg.search_nodes(entity_name)
        # search_nodes always returns a KnowledgeGraph object, which is
        # always truthy, so check its entities list instead.
        if result.entities:
return EntityResponse(success=True, data=serialize_to_dict(result))
return EntityResponse(
success=False,
error=f"Entity '{entity_name}' not found",
error_type=ERROR_TYPES["NOT_FOUND"],
)
except ValueError as e:
return EntityResponse(
success=False, error=str(e), error_type=ERROR_TYPES["VALIDATION_ERROR"]
)
except Exception as e:
return EntityResponse(
success=False, error=str(e), error_type=ERROR_TYPES["INTERNAL_ERROR"]
)
@mcp.tool()
async def get_graph() -> GraphResponse:
"""Get the entire knowledge graph."""
try:
graph = await kg.read_graph()
return GraphResponse(success=True, data=serialize_to_dict(graph))
except Exception as e:
return GraphResponse(
success=False, error=str(e), error_type=ERROR_TYPES["INTERNAL_ERROR"]
)
@mcp.tool()
async def create_entities(entities: List[Entity]) -> OperationResponse:
"""Create multiple new entities."""
try:
await kg.create_entities(entities)
return OperationResponse(success=True)
except ValueError as e:
return OperationResponse(
success=False, error=str(e), error_type=ERROR_TYPES["VALIDATION_ERROR"]
)
except Exception as e:
return OperationResponse(
success=False, error=str(e), error_type=ERROR_TYPES["INTERNAL_ERROR"]
)
@mcp.tool()
async def add_observation(
    entity: str, observation: str, ctx: Optional[Context] = None
) -> OperationResponse:
"""Add an observation to an existing entity."""
try:
        if ctx:
            await ctx.info(f"Adding observation to {entity}")
        # Check if entity exists (test the graph's entity list, not the
        # always-truthy KnowledgeGraph object)
        exists = await kg.search_nodes(entity)
        if not exists.entities:
return OperationResponse(
success=False,
error=f"Entity '{entity}' not found",
error_type=ERROR_TYPES["NOT_FOUND"],
)
await kg.add_observations(entity, [observation])
return OperationResponse(success=True)
except ValueError as e:
return OperationResponse(
success=False, error=str(e), error_type=ERROR_TYPES["VALIDATION_ERROR"]
)
except Exception as e:
return OperationResponse(
success=False, error=str(e), error_type=ERROR_TYPES["INTERNAL_ERROR"]
)
@mcp.tool()
async def create_relation(
    from_entity: str, to_entity: str, relation_type: str, ctx: Optional[Context] = None
) -> OperationResponse:
"""Create a relation between entities."""
try:
        if ctx:
            await ctx.info(
                f"Creating relation: {from_entity} -{relation_type}-> {to_entity}"
            )
        # Check that both endpoints exist before creating the relation.
        from_exists = await kg.search_nodes(from_entity)
        to_exists = await kg.search_nodes(to_entity)
        if not from_exists.entities:
return OperationResponse(
success=False,
error=f"Source entity '{from_entity}' not found",
error_type=ERROR_TYPES["NOT_FOUND"],
)
        if not to_exists.entities:
return OperationResponse(
success=False,
error=f"Target entity '{to_entity}' not found",
error_type=ERROR_TYPES["NOT_FOUND"],
)
await kg.create_relations(
[Relation(from_=from_entity, to=to_entity, relationType=relation_type)]
)
return OperationResponse(success=True)
except ValueError as e:
return OperationResponse(
success=False, error=str(e), error_type=ERROR_TYPES["VALIDATION_ERROR"]
)
except Exception as e:
return OperationResponse(
success=False, error=str(e), error_type=ERROR_TYPES["INTERNAL_ERROR"]
)
@mcp.tool()
async def search_memory(query: str, ctx: Optional[Context] = None) -> EntityResponse:
"""Search memory using natural language queries.
Handles:
- Temporal queries (e.g., "most recent", "last", "latest")
- Activity queries (e.g., "workout", "exercise")
- General entity searches
"""
try:
        if ctx:
            await ctx.info(f"Searching for: {query}")
# Handle temporal queries
temporal_keywords = ["recent", "last", "latest"]
is_temporal = any(keyword in query.lower() for keyword in temporal_keywords)
# Extract activity type from query
activity_type = None
if "workout" in query.lower():
activity_type = "workout"
elif "exercise" in query.lower():
activity_type = "exercise"
elif "physical activity" in query.lower():
activity_type = "physical_activity"
        # Search for entities
        results = await kg.search_nodes(activity_type if activity_type else query)
        if not results.entities:
            return EntityResponse(
                success=True,
                data={"entities": [], "relations": []},
                error="No matching activities found in memory",
                error_type=ERROR_TYPES["NO_RESULTS"],
            )
        # For temporal queries, return only the most recent match. Entities
        # carry no timestamps, so insertion order is the closest proxy.
        if is_temporal:
            results = results.entities[-1]  # Most recently added entity
return EntityResponse(success=True, data=serialize_to_dict(results))
except ValueError as e:
return EntityResponse(
success=False, error=str(e), error_type=ERROR_TYPES["VALIDATION_ERROR"]
)
except Exception as e:
return EntityResponse(
success=False, error=str(e), error_type=ERROR_TYPES["INTERNAL_ERROR"]
)
@mcp.tool()
async def delete_entities(names: List[str], ctx: Optional[Context] = None) -> OperationResponse:
"""Delete multiple entities and their relations."""
try:
        if ctx:
            await ctx.info(f"Deleting entities: {', '.join(names)}")
await kg.delete_entities(names)
return OperationResponse(success=True)
except ValueError as e:
return OperationResponse(
success=False, error=str(e), error_type=ERROR_TYPES["VALIDATION_ERROR"]
)
except Exception as e:
return OperationResponse(
success=False, error=str(e), error_type=ERROR_TYPES["INTERNAL_ERROR"]
)
@mcp.tool()
async def delete_relation(
    from_entity: str, to_entity: str, ctx: Optional[Context] = None
) -> OperationResponse:
"""Delete relations between two entities."""
try:
        if ctx:
            await ctx.info(f"Deleting relations between {from_entity} and {to_entity}")
        # Check that both entities exist.
        from_exists = await kg.search_nodes(from_entity)
        to_exists = await kg.search_nodes(to_entity)
        if not from_exists.entities:
return OperationResponse(
success=False,
error=f"Source entity '{from_entity}' not found",
error_type=ERROR_TYPES["NOT_FOUND"],
)
        if not to_exists.entities:
return OperationResponse(
success=False,
error=f"Target entity '{to_entity}' not found",
error_type=ERROR_TYPES["NOT_FOUND"],
)
await kg.delete_relations(from_entity, to_entity)
return OperationResponse(success=True)
except ValueError as e:
return OperationResponse(
success=False, error=str(e), error_type=ERROR_TYPES["VALIDATION_ERROR"]
)
except Exception as e:
return OperationResponse(
success=False, error=str(e), error_type=ERROR_TYPES["INTERNAL_ERROR"]
)
@mcp.tool()
async def flush_memory(ctx: Optional[Context] = None) -> OperationResponse:
"""Ensure all changes are persisted to storage."""
try:
        if ctx:
            await ctx.info("Flushing memory to storage")
await kg.flush()
return OperationResponse(success=True)
except Exception as e:
return OperationResponse(
success=False, error=str(e), error_type=ERROR_TYPES["INTERNAL_ERROR"]
)
@mcp.prompt()
def create_entity_prompt(name: str, entity_type: str) -> list[Message]:
"""Generate prompt for entity creation."""
return [
UserMessage(
f"I want to create a new entity in memory:\n"
f"Name: {name}\n"
f"Type: {entity_type}\n\n"
f"What observations should I record about this entity?"
)
]
@mcp.prompt()
def search_prompt(query: str) -> list[Message]:
"""Generate prompt for memory search."""
return [
UserMessage(
f"I want to search my memory for information about: {query}\n\n"
f"What specific aspects of these results would you like me to explain?"
)
]
@mcp.prompt()
def relation_prompt(from_entity: str, to_entity: str) -> list[Message]:
"""Generate prompt for creating a relation."""
return [
UserMessage(
f"I want to establish a relationship between:\n"
f"Source: {from_entity}\n"
f"Target: {to_entity}\n\n"
f"What type of relationship exists between these entities?"
)
]
```
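Because each tool is an ordinary async function (FastMCP registers it and, in current SDK versions, returns it unchanged), the handlers can be smoke-tested without an MCP client. A sketch under that assumption; MEMORY_FILE_PATH must point at a scratch file before `main` is imported, and the entity values are illustrative:
```python
import asyncio
import os
os.environ["MEMORY_FILE_PATH"] = "/tmp/scratch-memory.jsonl"  # set before import
import main
from memory_mcp_server.interfaces import Entity

async def smoke() -> None:
    created = await main.create_entities(
        [Entity(name="demo-entity", entityType="concept", observations=["hi"])]
    )
    assert created.success
    found = await main.get_entity("demo-entity")
    print(found.success, found.data)

asyncio.run(smoke())
```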
--------------------------------------------------------------------------------
/tests/test_backends/test_jsonl.py:
--------------------------------------------------------------------------------
```python
import json
from pathlib import Path
from typing import AsyncIterator
import pytest
from memory_mcp_server.backends.jsonl import JsonlBackend
from memory_mcp_server.exceptions import EntityNotFoundError, FileAccessError
from memory_mcp_server.interfaces import (
BatchOperation,
BatchOperationType,
BatchResult,
Entity,
Relation,
SearchOptions,
)
# --- Fixtures ---
@pytest.fixture
async def backend(tmp_path: Path) -> AsyncIterator[JsonlBackend]:
b = JsonlBackend(tmp_path / "test.jsonl")
await b.initialize()
yield b
await b.close()
# --- Entity Creation / Duplication ---
@pytest.mark.asyncio
async def test_create_entities(backend: JsonlBackend):
entities = [
Entity(name="Alice", entityType="person", observations=["likes apples"]),
Entity(name="Bob", entityType="person", observations=["enjoys biking"]),
]
created = await backend.create_entities(entities)
assert len(created) == 2, "Should create two new entities"
graph = await backend.read_graph()
assert len(graph.entities) == 2, "Graph should contain two entities"
@pytest.mark.asyncio
async def test_duplicate_entities(backend: JsonlBackend):
entity = Entity(name="Alice", entityType="person", observations=["likes apples"])
created1 = await backend.create_entities([entity])
created2 = await backend.create_entities([entity])
assert len(created1) == 1
assert len(created2) == 0, "Duplicate entity creation should return empty list"
# --- Relation Creation / Deletion ---
@pytest.mark.asyncio
async def test_create_relations(backend: JsonlBackend):
entities = [
Entity(name="Alice", entityType="person", observations=[""]),
Entity(name="Wonderland", entityType="place", observations=["fantasy land"]),
]
await backend.create_entities(entities)
relation = Relation(from_="Alice", to="Wonderland", relationType="visits")
created_relations = await backend.create_relations([relation])
assert len(created_relations) == 1
graph = await backend.read_graph()
assert len(graph.relations) == 1
@pytest.mark.asyncio
async def test_create_relation_missing_entity(backend: JsonlBackend):
# No entities have been created.
relation = Relation(from_="Alice", to="Nowhere", relationType="visits")
with pytest.raises(EntityNotFoundError):
await backend.create_relations([relation])
@pytest.mark.asyncio
async def test_delete_relations(backend: JsonlBackend):
entities = [
Entity(name="Alice", entityType="person", observations=[]),
Entity(name="Bob", entityType="person", observations=[]),
]
await backend.create_entities(entities)
# Create two distinct relations.
relation1 = Relation(from_="Alice", to="Bob", relationType="likes")
relation2 = Relation(from_="Alice", to="Bob", relationType="follows")
await backend.create_relations([relation1, relation2])
await backend.delete_relations("Alice", "Bob")
graph = await backend.read_graph()
assert (
len(graph.relations) == 0
), "All relations between Alice and Bob should be removed"
@pytest.mark.asyncio
async def test_delete_entities(backend: JsonlBackend):
entities = [
Entity(name="Alice", entityType="person", observations=["obs1"]),
Entity(name="Bob", entityType="person", observations=["obs2"]),
]
await backend.create_entities(entities)
# Create a relation so that deletion cascades.
relation = Relation(from_="Alice", to="Bob", relationType="knows")
await backend.create_relations([relation])
deleted = await backend.delete_entities(["Alice"])
assert "Alice" in deleted
graph = await backend.read_graph()
# Only Bob should remain and the relation should have been removed.
assert len(graph.entities) == 1
assert graph.entities[0].name == "Bob"
assert len(graph.relations) == 0
# --- Searching ---
@pytest.mark.asyncio
async def test_search_nodes_exact(backend: JsonlBackend):
entities = [
Entity(
name="Alice Wonderland", entityType="person", observations=["loves tea"]
),
Entity(name="Wonderland", entityType="place", observations=["magical"]),
]
await backend.create_entities(entities)
result = await backend.search_nodes("Wonderland")
# Both entities should match the substring.
assert len(result.entities) == 2
# No relations were created.
assert len(result.relations) == 0
@pytest.mark.asyncio
async def test_search_nodes_fuzzy(backend: JsonlBackend):
entities = [
Entity(
name="John Smith", entityType="person", observations=["software engineer"]
),
Entity(
name="Jane Smith", entityType="person", observations=["product manager"]
),
]
await backend.create_entities(entities)
options = SearchOptions(
fuzzy=True,
threshold=90,
weights={"name": 0.7, "type": 0.5, "observations": 0.3},
)
result = await backend.search_nodes("Jon Smith", options)
assert len(result.entities) == 1, "Fuzzy search should match John Smith"
assert result.entities[0].name == "John Smith"
@pytest.mark.asyncio
async def test_search_nodes_fuzzy_weights(backend: JsonlBackend):
# Clear any existing entities.
current = await backend.read_graph()
if current.entities:
await backend.delete_entities([e.name for e in current.entities])
entities = [
Entity(
name="Programming Guide",
entityType="document",
observations=["A guide about programming development"],
),
Entity(
name="Software Manual",
entityType="document",
observations=["Programming tutorial and guide"],
),
]
await backend.create_entities(entities)
# With name-weight high, only one should match.
options_name = SearchOptions(
fuzzy=True,
threshold=60,
weights={"name": 1.0, "type": 0.1, "observations": 0.1},
)
result = await backend.search_nodes("programming", options_name)
assert len(result.entities) == 1
assert result.entities[0].name == "Programming Guide"
# With observation weight high, both should match.
options_obs = SearchOptions(
fuzzy=True,
threshold=60,
weights={"name": 0.1, "type": 0.1, "observations": 1.0},
)
result = await backend.search_nodes("programming", options_obs)
assert len(result.entities) == 2
# --- Observations ---
@pytest.mark.asyncio
async def test_add_observations(backend: JsonlBackend):
entity = Entity(name="Alice", entityType="person", observations=["initial"])
await backend.create_entities([entity])
await backend.add_observations("Alice", ["update"])
graph = await backend.read_graph()
alice = next(e for e in graph.entities if e.name == "Alice")
assert "update" in alice.observations
@pytest.mark.asyncio
async def test_add_batch_observations(backend: JsonlBackend):
entities = [
Entity(name="Alice", entityType="person", observations=["obs1"]),
Entity(name="Bob", entityType="person", observations=["obs2"]),
]
await backend.create_entities(entities)
observations_map = {"Alice": ["new1", "new2"], "Bob": ["new3"]}
await backend.add_batch_observations(observations_map)
graph = await backend.read_graph()
alice = next(e for e in graph.entities if e.name == "Alice")
bob = next(e for e in graph.entities if e.name == "Bob")
assert set(alice.observations) == {"obs1", "new1", "new2"}
assert set(bob.observations) == {"obs2", "new3"}
@pytest.mark.asyncio
async def test_add_batch_observations_empty_map(backend: JsonlBackend):
with pytest.raises(ValueError, match="Observations map cannot be empty"):
await backend.add_batch_observations({})
@pytest.mark.asyncio
async def test_add_batch_observations_missing_entity(backend: JsonlBackend):
entity = Entity(name="Alice", entityType="person", observations=["obs1"])
await backend.create_entities([entity])
observations_map = {"Alice": ["new"], "Bob": ["obs"]}
with pytest.raises(EntityNotFoundError):
await backend.add_batch_observations(observations_map)
# --- Transaction Management ---
@pytest.mark.asyncio
async def test_transaction_management(backend: JsonlBackend):
entities = [
Entity(name="Alice", entityType="person", observations=["obs1"]),
Entity(name="Bob", entityType="person", observations=["obs2"]),
]
await backend.create_entities(entities)
# Begin a transaction.
await backend.begin_transaction()
await backend.create_entities(
[Entity(name="Charlie", entityType="person", observations=["obs3"])]
)
await backend.delete_entities(["Alice"])
# Within transaction, changes are visible.
graph = await backend.read_graph()
names = {e.name for e in graph.entities}
assert "Charlie" in names
assert "Alice" not in names
# Roll back.
await backend.rollback_transaction()
graph = await backend.read_graph()
names = {e.name for e in graph.entities}
assert "Alice" in names
assert "Charlie" not in names
# Test commit.
await backend.begin_transaction()
await backend.create_entities(
[Entity(name="Dave", entityType="person", observations=["obs4"])]
)
await backend.commit_transaction()
graph = await backend.read_graph()
names = {e.name for e in graph.entities}
assert "Dave" in names
# --- Persistence and File Format ---
@pytest.mark.asyncio
async def test_persistence(tmp_path: Path):
file_path = tmp_path / "persist.jsonl"
backend1 = JsonlBackend(file_path)
await backend1.initialize()
entity = Entity(name="Alice", entityType="person", observations=["obs"])
await backend1.create_entities([entity])
await backend1.close()
backend2 = JsonlBackend(file_path)
await backend2.initialize()
graph = await backend2.read_graph()
assert any(e.name == "Alice" for e in graph.entities)
await backend2.close()
@pytest.mark.asyncio
async def test_atomic_writes(tmp_path: Path):
file_path = tmp_path / "atomic.jsonl"
backend = JsonlBackend(file_path)
await backend.initialize()
entity = Entity(name="Alice", entityType="person", observations=["obs"])
await backend.create_entities([entity])
await backend.close()
temp_file = file_path.with_suffix(".tmp")
assert not temp_file.exists(), "Temporary file should be removed after writing"
assert file_path.exists()
@pytest.mark.asyncio
async def test_file_format(tmp_path: Path):
file_path = tmp_path / "format.jsonl"
backend = JsonlBackend(file_path)
await backend.initialize()
entity = Entity(name="Alice", entityType="person", observations=["obs"])
relation = Relation(from_="Alice", to="Alice", relationType="self")
await backend.create_entities([entity])
await backend.create_relations([relation])
await backend.close()
with open(file_path, "r", encoding="utf-8") as f:
lines = f.read().splitlines()
assert len(lines) == 2, "File should contain exactly two JSON lines"
data1 = json.loads(lines[0])
data2 = json.loads(lines[1])
types = {data1.get("type"), data2.get("type")}
assert "entity" in types and "relation" in types
# --- Error / Corruption Handling ---
@pytest.mark.asyncio
async def test_corrupted_file_handling(tmp_path: Path):
file_path = tmp_path / "corrupted.jsonl"
# Write one valid and one corrupted JSON line.
with open(file_path, "w", encoding="utf-8") as f:
f.write(
'{"type": "entity", "name": "Alice", "entityType": "person", "observations": []}\n'
)
f.write(
'{"type": "relation", "from": "Alice", "to": "Bob"'
) # missing closing brace
backend = JsonlBackend(file_path)
await backend.initialize()
with pytest.raises(FileAccessError, match="Error loading graph"):
await backend.read_graph()
await backend.close()
@pytest.mark.asyncio
async def test_file_access_error_propagation(tmp_path: Path):
file_path = tmp_path / "error.jsonl"
# Create a directory with the same name as the file.
file_path.mkdir()
backend = JsonlBackend(file_path)
with pytest.raises(FileAccessError, match="is a directory"):
await backend.initialize()
await backend.close()
# --- Caching ---
@pytest.mark.asyncio
async def test_caching(backend: JsonlBackend):
entity = Entity(name="Alice", entityType="person", observations=["obs"])
await backend.create_entities([entity])
graph1 = await backend.read_graph()
graph2 = await backend.read_graph()
assert graph1 is graph2, "Repeated reads should return the cached graph"
# --- Batch Operations ---
@pytest.mark.asyncio
async def test_execute_batch(backend: JsonlBackend):
# Create an initial entity.
await backend.create_entities(
[Entity(name="Alice", entityType="person", observations=["obs"])]
)
operations = [
BatchOperation(
operation_type=BatchOperationType.CREATE_ENTITIES,
data={
"entities": [
Entity(name="Bob", entityType="person", observations=["obs2"])
]
},
),
BatchOperation(
operation_type=BatchOperationType.CREATE_RELATIONS,
data={
"relations": [Relation(from_="Alice", to="Bob", relationType="knows")]
},
),
BatchOperation(
operation_type=BatchOperationType.ADD_OBSERVATIONS,
data={"observations_map": {"Alice": ["new_obs"]}},
),
]
result: BatchResult = await backend.execute_batch(operations)
print(result)
assert result.success, "Batch operations should succeed"
graph = await backend.read_graph()
assert any(e.name == "Bob" for e in graph.entities)
assert len(graph.relations) == 1
alice = next(e for e in graph.entities if e.name == "Alice")
assert "new_obs" in alice.observations
@pytest.mark.asyncio
async def test_execute_batch_failure(backend: JsonlBackend):
# Create an initial entity.
await backend.create_entities(
[Entity(name="Alice", entityType="person", observations=["obs"])]
)
operations = [
BatchOperation(
operation_type=BatchOperationType.CREATE_RELATIONS,
data={
"relations": [
Relation(from_="Alice", to="NonExistent", relationType="knows")
]
},
),
]
result: BatchResult = await backend.execute_batch(operations)
assert (
not result.success
), "Batch operation should fail if a relation refers to a non-existent entity"
# Verify that rollback occurred (no partial changes).
graph = await backend.read_graph()
assert len(graph.entities) == 1
assert len(graph.relations) == 0
```
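The fuzzy-search tests above lean on `SearchOptions`: the weights scale each field's fuzz score, so raising one field's weight while lowering the others effectively restricts matching to that field. A compact sketch of the same idea outside pytest (the file path and entity data are illustrative):
```python
import asyncio
from pathlib import Path
from memory_mcp_server.backends.jsonl import JsonlBackend
from memory_mcp_server.interfaces import Entity, SearchOptions

async def demo() -> None:
    backend = JsonlBackend(Path("/tmp/fuzzy-demo.jsonl"))
    await backend.initialize()
    await backend.create_entities(
        [Entity(name="john-smith", entityType="person", observations=["engineer"])]
    )
    # Name-heavy weighting: a typo in the query still matches on the name field.
    options = SearchOptions(
        fuzzy=True,
        threshold=80,
        weights={"name": 1.0, "type": 0.1, "observations": 0.1},
    )
    result = await backend.search_nodes("jon smith", options)
    print([e.name for e in result.entities])
    await backend.close()

asyncio.run(demo())
```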
--------------------------------------------------------------------------------
/memory_mcp_server/backends/jsonl.py:
--------------------------------------------------------------------------------
```python
import asyncio
import json
import time
from collections import defaultdict
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple, cast
import aiofiles
from thefuzz import fuzz
from ..exceptions import EntityNotFoundError, FileAccessError
from ..interfaces import (
BatchOperation,
BatchOperationType,
BatchResult,
Entity,
KnowledgeGraph,
Relation,
SearchOptions,
)
from .base import Backend
@dataclass
class SearchResult:
entity: Entity
score: float
class ReentrantLock:
def __init__(self):
self._lock = asyncio.Lock()
self._owner = None
self._count = 0
async def acquire(self):
current = asyncio.current_task()
if self._owner == current:
self._count += 1
return
await self._lock.acquire()
self._owner = current
self._count = 1
def release(self):
current = asyncio.current_task()
if self._owner != current:
raise RuntimeError("Lock not owned by current task")
self._count -= 1
if self._count == 0:
self._owner = None
self._lock.release()
async def __aenter__(self):
await self.acquire()
return self
async def __aexit__(self, exc_type, exc_val, tb):
self.release()
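# Why a reentrant lock: some code paths re-acquire _write_lock from the task
# that already holds it (presumably the transaction and batch paths, which
# invoke operations like create_entities while the lock is held). A plain
# asyncio.Lock would deadlock there; this wrapper instead tracks the owning
# task and a hold count, releasing the inner lock only at count zero.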
class JsonlBackend(Backend):
def __init__(self, memory_path: Path, cache_ttl: int = 60):
self.memory_path = memory_path
self.cache_ttl = cache_ttl
self._cache: Optional[KnowledgeGraph] = None
self._cache_timestamp: float = 0.0
self._cache_file_mtime: float = 0.0
self._dirty = False
self._write_lock = ReentrantLock()
self._lock = asyncio.Lock()
# Transaction support: when a transaction is active, we work on separate copies.
self._transaction_cache: Optional[KnowledgeGraph] = None
self._transaction_indices: Optional[Dict[str, Any]] = None
self._in_transaction = False
self._indices: Dict[str, Any] = {
"entity_names": {},
"entity_types": defaultdict(list),
"relations_from": defaultdict(list),
"relations_to": defaultdict(list),
"relation_keys": set(),
"observation_index": defaultdict(set),
}
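        # Index layout (rebuilt on every cache refresh):
        #   entity_names:      name -> Entity, the primary lookup
        #   entity_types:      entityType -> [Entity]
        #   relations_from/to: entity name -> [Relation] leaving/entering it
        #   relation_keys:     (from, to, type) triples for duplicate checks
        #   observation_index: lowercased word -> {entity names}, for search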
async def initialize(self) -> None:
self.memory_path.parent.mkdir(parents=True, exist_ok=True)
if self.memory_path.exists() and self.memory_path.is_dir():
raise FileAccessError(f"Path {self.memory_path} is a directory")
async def close(self) -> None:
await self.flush()
def _build_indices(self, graph: KnowledgeGraph) -> None:
# Build indices for faster lookups.
entity_names: Dict[str, Entity] = {}
entity_types: Dict[str, List[Entity]] = defaultdict(list)
relations_from: Dict[str, List[Relation]] = defaultdict(list)
relations_to: Dict[str, List[Relation]] = defaultdict(list)
relation_keys: Set[Tuple[str, str, str]] = set()
for entity in graph.entities:
entity_names[entity.name] = entity
entity_types[entity.entityType].append(entity)
for relation in graph.relations:
relations_from[relation.from_].append(relation)
relations_to[relation.to].append(relation)
relation_keys.add((relation.from_, relation.to, relation.relationType))
self._indices["entity_names"] = entity_names
self._indices["entity_types"] = entity_types
self._indices["relations_from"] = relations_from
self._indices["relations_to"] = relations_to
self._indices["relation_keys"] = relation_keys
# Build the observation index.
observation_index = cast(
Dict[str, Set[str]], self._indices["observation_index"]
)
observation_index.clear()
for entity in graph.entities:
for obs in entity.observations:
for word in obs.lower().split():
observation_index[word].add(entity.name)
async def _check_cache(self) -> KnowledgeGraph:
# During a transaction, always use the transaction snapshot.
if self._in_transaction:
return self._transaction_cache # type: ignore
current_time = time.monotonic()
file_mtime = (
self.memory_path.stat().st_mtime if self.memory_path.exists() else 0
)
needs_refresh = (
self._cache is None
or (current_time - self._cache_timestamp > self.cache_ttl)
or self._dirty
or (file_mtime > self._cache_file_mtime)
)
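        # Double-checked refresh: the staleness test is recomputed after
        # taking the lock so concurrent readers do not reload and reindex
        # the file more than once.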
if needs_refresh:
async with self._lock:
current_time = time.monotonic()
file_mtime = (
self.memory_path.stat().st_mtime if self.memory_path.exists() else 0
)
needs_refresh = (
self._cache is None
or (current_time - self._cache_timestamp > self.cache_ttl)
or self._dirty
or (file_mtime > self._cache_file_mtime)
)
if needs_refresh:
try:
graph = await self._load_graph_from_file()
self._cache = graph
self._cache_timestamp = current_time
self._cache_file_mtime = file_mtime
self._build_indices(graph)
self._dirty = False
except FileAccessError:
raise
except Exception as e:
raise FileAccessError(f"Error loading graph: {str(e)}") from e
return cast(KnowledgeGraph, self._cache)
async def _load_graph_from_file(self) -> KnowledgeGraph:
if not self.memory_path.exists():
return KnowledgeGraph(entities=[], relations=[])
graph = KnowledgeGraph(entities=[], relations=[])
try:
async with aiofiles.open(self.memory_path, mode="r", encoding="utf-8") as f:
async for line in f:
line = line.strip()
if not line:
continue
try:
item = json.loads(line)
if item["type"] == "entity":
graph.entities.append(
Entity(
name=item["name"],
entityType=item["entityType"],
observations=item["observations"],
)
)
elif item["type"] == "relation":
graph.relations.append(
Relation(
from_=item["from"],
to=item["to"],
relationType=item["relationType"],
)
)
except json.JSONDecodeError as e:
raise FileAccessError(f"Error loading graph: {str(e)}") from e
except KeyError as e:
raise FileAccessError(
f"Error loading graph: Missing required key {str(e)}"
) from e
return graph
except Exception as err:
raise FileAccessError(f"Error reading file: {str(err)}") from err
async def _save_graph(self, graph: KnowledgeGraph) -> None:
# This function writes to disk. Note that during a transaction, it is only called on commit.
temp_path = self.memory_path.with_suffix(".tmp")
buffer_size = 1000 # Buffer size (number of lines)
try:
async with aiofiles.open(temp_path, mode="w", encoding="utf-8") as f:
buffer = []
# Write entities.
for entity in graph.entities:
line = json.dumps(
{
"type": "entity",
"name": entity.name,
"entityType": entity.entityType,
"observations": entity.observations,
}
)
buffer.append(line)
if len(buffer) >= buffer_size:
await f.write("\n".join(buffer) + "\n")
buffer = []
if buffer:
await f.write("\n".join(buffer) + "\n")
buffer = []
# Write relations.
for relation in graph.relations:
line = json.dumps(
{
"type": "relation",
"from": relation.from_,
"to": relation.to,
"relationType": relation.relationType,
}
)
buffer.append(line)
if len(buffer) >= buffer_size:
await f.write("\n".join(buffer) + "\n")
buffer = []
if buffer:
await f.write("\n".join(buffer) + "\n")
temp_path.replace(self.memory_path)
except Exception as err:
raise FileAccessError(f"Error saving file: {str(err)}") from err
finally:
if temp_path.exists():
try:
temp_path.unlink()
except Exception:
pass
async def _get_current_state(self) -> Tuple[KnowledgeGraph, Dict[str, Any]]:
# Returns the active graph and indices. If a transaction is in progress,
# return the transaction copies; otherwise, return the persistent ones.
if self._in_transaction:
return self._transaction_cache, self._transaction_indices # type: ignore
else:
graph = await self._check_cache()
return graph, self._indices
async def create_entities(self, entities: List[Entity]) -> List[Entity]:
async with self._write_lock:
graph, indices = await self._get_current_state()
existing_entities = cast(Dict[str, Entity], indices["entity_names"])
new_entities = []
for entity in entities:
if not entity.name or not entity.entityType:
raise ValueError(f"Invalid entity: {entity}")
if entity.name not in existing_entities:
new_entities.append(entity)
existing_entities[entity.name] = entity
cast(Dict[str, List[Entity]], indices["entity_types"]).setdefault(
entity.entityType, []
).append(entity)
if new_entities:
graph.entities.extend(new_entities)
# If not in a transaction, immediately persist the change.
if not self._in_transaction:
self._dirty = True
await self._save_graph(graph)
self._dirty = False
self._cache_timestamp = time.monotonic()
return new_entities
async def delete_entities(self, entity_names: List[str]) -> List[str]:
if not entity_names:
return []
async with self._write_lock:
graph, indices = await self._get_current_state()
existing_entities = cast(Dict[str, Entity], indices["entity_names"])
deleted_names = []
relation_keys = cast(Set[Tuple[str, str, str]], indices["relation_keys"])
for name in entity_names:
if name in existing_entities:
entity = existing_entities.pop(name)
entity_type_list = cast(
Dict[str, List[Entity]], indices["entity_types"]
).get(entity.entityType, [])
if entity in entity_type_list:
entity_type_list.remove(entity)
# Remove associated relations.
relations_from = cast(
Dict[str, List[Relation]], indices["relations_from"]
).get(name, [])
relations_to = cast(
Dict[str, List[Relation]], indices["relations_to"]
).get(name, [])
relations_to_remove = relations_from + relations_to
for relation in relations_to_remove:
if relation in graph.relations:
graph.relations.remove(relation)
relation_keys.discard(
(relation.from_, relation.to, relation.relationType)
)
if relation in cast(
Dict[str, List[Relation]], indices["relations_from"]
).get(relation.from_, []):
cast(Dict[str, List[Relation]], indices["relations_from"])[
relation.from_
].remove(relation)
if relation in cast(
Dict[str, List[Relation]], indices["relations_to"]
).get(relation.to, []):
cast(Dict[str, List[Relation]], indices["relations_to"])[
relation.to
].remove(relation)
deleted_names.append(name)
if deleted_names:
graph.entities = [
e for e in graph.entities if e.name not in deleted_names
]
if not self._in_transaction:
self._dirty = True
await self._save_graph(graph)
self._dirty = False
self._cache_timestamp = time.monotonic()
return deleted_names
async def create_relations(self, relations: List[Relation]) -> List[Relation]:
async with self._write_lock:
graph, indices = await self._get_current_state()
existing_entities = cast(Dict[str, Entity], indices["entity_names"])
relation_keys = cast(Set[Tuple[str, str, str]], indices["relation_keys"])
            # Validate every relation up front so a failure cannot leave the
            # indices out of sync with the graph.
            for relation in relations:
                if not relation.from_ or not relation.to or not relation.relationType:
                    raise ValueError(f"Invalid relation: {relation}")
                if relation.from_ not in existing_entities:
                    raise EntityNotFoundError(f"Entity not found: {relation.from_}")
                if relation.to not in existing_entities:
                    raise EntityNotFoundError(f"Entity not found: {relation.to}")
            new_relations = []
            for relation in relations:
                key = (relation.from_, relation.to, relation.relationType)
                if key not in relation_keys:
                    new_relations.append(relation)
                    relation_keys.add(key)
                    cast(
                        Dict[str, List[Relation]], indices["relations_from"]
                    ).setdefault(relation.from_, []).append(relation)
                    cast(Dict[str, List[Relation]], indices["relations_to"]).setdefault(
                        relation.to, []
                    ).append(relation)
if new_relations:
graph.relations.extend(new_relations)
if not self._in_transaction:
self._dirty = True
await self._save_graph(graph)
self._dirty = False
self._cache_timestamp = time.monotonic()
return new_relations
async def delete_relations(self, from_: str, to: str) -> None:
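        """Delete every relation from `from_` to `to`, regardless of type."""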
async with self._write_lock:
graph, indices = await self._get_current_state()
existing_entities = cast(Dict[str, Entity], indices["entity_names"])
if from_ not in existing_entities:
raise EntityNotFoundError(f"Entity not found: {from_}")
if to not in existing_entities:
raise EntityNotFoundError(f"Entity not found: {to}")
relations_from = cast(
Dict[str, List[Relation]], indices["relations_from"]
).get(from_, [])
relations_to_remove = [rel for rel in relations_from if rel.to == to]
if relations_to_remove:
graph.relations = [
rel for rel in graph.relations if rel not in relations_to_remove
]
                relation_keys = cast(
                    Set[Tuple[str, str, str]], indices["relation_keys"]
                )
                relations_to_index = cast(
                    Dict[str, List[Relation]], indices["relations_to"]
                )
                for rel in relations_to_remove:
                    relation_keys.discard((rel.from_, rel.to, rel.relationType))
                    if rel in relations_from:
                        relations_from.remove(rel)
                    if rel in relations_to_index.get(to, []):
                        relations_to_index[to].remove(rel)
if not self._in_transaction:
self._dirty = True
await self._save_graph(graph)
self._dirty = False
self._cache_timestamp = time.monotonic()
async def read_graph(self) -> KnowledgeGraph:
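        """Return the current knowledge graph, refreshing the cache if stale."""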
return await self._check_cache()
async def flush(self) -> None:
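        """Persist unsaved changes to disk, unless a transaction is open."""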
async with self._write_lock:
# During a transaction, disk is not touched until commit.
if self._dirty and not self._in_transaction:
graph = await self._check_cache()
await self._save_graph(graph)
self._dirty = False
self._cache_timestamp = time.monotonic()
async def search_nodes(
self, query: str, options: Optional[SearchOptions] = None
) -> KnowledgeGraph:
"""
Search for entities and relations matching the query.
If options is provided and options.fuzzy is True, fuzzy matching is used with weights and threshold.
Otherwise, a simple case‐insensitive substring search is performed.
Relations are returned only if both endpoints are in the set of matched entities.
"""
graph = await self._check_cache()
matched_entities = []
if options is not None and options.fuzzy:
# Use provided weights or default to 1.0 if not provided.
weights = (
options.weights
if options.weights is not None
else {"name": 1.0, "type": 1.0, "observations": 1.0}
)
q = query.strip()
for entity in graph.entities:
# Compute robust scores for each field.
name_score = fuzz.WRatio(q, entity.name)
type_score = fuzz.WRatio(q, entity.entityType)
obs_score = 0
if entity.observations:
# For each observation, take the best between WRatio and partial_ratio.
scores = [
max(fuzz.WRatio(q, obs), fuzz.partial_ratio(q, obs))
for obs in entity.observations
]
obs_score = max(scores) if scores else 0
total_score = (
name_score * weights.get("name", 1.0)
+ type_score * weights.get("type", 1.0)
+ obs_score * weights.get("observations", 1.0)
)
if total_score >= options.threshold:
matched_entities.append(entity)
else:
q = query.lower()
for entity in graph.entities:
if (
q in entity.name.lower()
or q in entity.entityType.lower()
or any(q in obs.lower() for obs in entity.observations)
):
matched_entities.append(entity)
matched_names = {entity.name for entity in matched_entities}
matched_relations = [
rel
for rel in graph.relations
if rel.from_ in matched_names and rel.to in matched_names
]
return KnowledgeGraph(entities=matched_entities, relations=matched_relations)
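    # Worked example of the fuzzy scoring above (hypothetical numbers): with
    # the default weights {"name": 1.0, "type": 1.0, "observations": 1.0}, an
    # entity scoring 90 on name, 10 on type, and 40 on its best observation
    # totals 90 + 10 + 40 = 140, so it matches a SearchOptions(fuzzy=True,
    # threshold=120) query but not one with threshold=150.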
async def add_observations(self, entity_name: str, observations: List[str]) -> None:
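        """Append observations to an existing entity.

        Entity objects are treated as immutable, so the entity is replaced
        with an updated copy in both the graph and the indices.
        """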
if not observations:
raise ValueError("Observations list cannot be empty")
async with self._write_lock:
graph, indices = await self._get_current_state()
existing_entities = cast(Dict[str, Entity], indices["entity_names"])
if entity_name not in existing_entities:
raise EntityNotFoundError(f"Entity not found: {entity_name}")
entity = existing_entities[entity_name]
updated_entity = Entity(
name=entity.name,
entityType=entity.entityType,
observations=list(entity.observations) + observations,
)
graph.entities = [
updated_entity if e.name == entity_name else e for e in graph.entities
]
existing_entities[entity_name] = updated_entity
entity_types = cast(Dict[str, List[Entity]], indices["entity_types"])
            et_list = entity_types.get(updated_entity.entityType, [])
            for i, e in enumerate(et_list):
                if e.name == entity_name:
                    et_list[i] = updated_entity
                    break
if not self._in_transaction:
self._dirty = True
await self._save_graph(graph)
self._dirty = False
self._cache_timestamp = time.monotonic()
async def add_batch_observations(
self, observations_map: Dict[str, List[str]]
) -> None:
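        """Append observations to several entities in one locked pass.

        Every entity name is validated before any update is applied, so a
        missing entity leaves the graph untouched.
        """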
if not observations_map:
raise ValueError("Observations map cannot be empty")
async with self._write_lock:
graph, indices = await self._get_current_state()
existing_entities = cast(Dict[str, Entity], indices["entity_names"])
entity_types = cast(Dict[str, List[Entity]], indices["entity_types"])
missing_entities = [
name for name in observations_map if name not in existing_entities
]
if missing_entities:
raise EntityNotFoundError(
f"Entities not found: {', '.join(missing_entities)}"
)
updated_entities = {}
for entity_name, observations in observations_map.items():
if not observations:
continue
entity = existing_entities[entity_name]
updated_entity = Entity(
name=entity.name,
entityType=entity.entityType,
observations=list(entity.observations) + observations,
)
updated_entities[entity_name] = updated_entity
if updated_entities:
graph.entities = [
updated_entities.get(e.name, e) for e in graph.entities
]
for updated_entity in updated_entities.values():
existing_entities[updated_entity.name] = updated_entity
et_list = entity_types.get(updated_entity.entityType, [])
for i, e in enumerate(et_list):
if e.name == updated_entity.name:
et_list[i] = updated_entity
break
if not self._in_transaction:
self._dirty = True
await self._save_graph(graph)
self._dirty = False
self._cache_timestamp = time.monotonic()
#
# Transaction Methods
#
async def begin_transaction(self) -> None:
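        """Snapshot the graph and indices so later writes can be rolled back.

        While a transaction is open, mutations target the snapshot and
        nothing is written to disk until commit_transaction().
        """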
async with self._write_lock:
if self._in_transaction:
raise ValueError("Transaction already in progress")
graph = await self._check_cache()
# Make deep (shallow for immutable entities) copies of state.
self._transaction_cache = KnowledgeGraph(
entities=list(graph.entities), relations=list(graph.relations)
)
self._transaction_indices = {
"entity_names": dict(self._indices["entity_names"]),
"entity_types": defaultdict(
list, {k: list(v) for k, v in self._indices["entity_types"].items()}
),
"relations_from": defaultdict(
list,
{k: list(v) for k, v in self._indices["relations_from"].items()},
),
"relations_to": defaultdict(
list, {k: list(v) for k, v in self._indices["relations_to"].items()}
),
"relation_keys": set(self._indices["relation_keys"]),
"observation_index": defaultdict(
set,
{k: set(v) for k, v in self._indices["observation_index"].items()},
),
}
self._in_transaction = True
async def rollback_transaction(self) -> None:
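        """Discard the transaction snapshot; memory and disk stay unchanged."""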
async with self._write_lock:
if not self._in_transaction:
raise ValueError("No transaction in progress")
# Discard the transaction state; since disk writes were deferred, the file remains unchanged.
self._transaction_cache = None
self._transaction_indices = None
self._in_transaction = False
async def commit_transaction(self) -> None:
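        """Write the transaction snapshot to disk and adopt it as current state."""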
async with self._write_lock:
if not self._in_transaction:
raise ValueError("No transaction in progress")
# Persist the transaction state to disk.
await self._save_graph(cast(KnowledgeGraph, self._transaction_cache))
# Update the persistent state with the transaction snapshot.
self._cache = self._transaction_cache
self._indices = self._transaction_indices # type: ignore
self._transaction_cache = None
self._transaction_indices = None
self._in_transaction = False
self._dirty = False
self._cache_timestamp = time.monotonic()
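    # Usage sketch for the transaction API (hypothetical caller code, not
    # part of this module):
    #
    #     await manager.begin_transaction()
    #     try:
    #         await manager.create_entities(
    #             [Entity(name="a", entityType="person", observations=[])]
    #         )
    #         await manager.commit_transaction()
    #     except Exception:
    #         await manager.rollback_transaction()
    #         raise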
async def execute_batch(self, operations: List[BatchOperation]) -> BatchResult:
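        """Run a list of batch operations inside a single transaction.

        An operation whose data includes allow_partial=True may fail without
        aborting the batch; any other failure rolls everything back.
        """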
if not operations:
return BatchResult(
success=True,
operations_completed=0,
failed_operations=[],
)
        # NOTE: begin_transaction(), the per-operation methods, and
        # commit/rollback each acquire self._write_lock themselves. Async
        # locks are not re-entrant, so holding the lock across the whole
        # batch here would deadlock on the first nested acquisition;
        # isolation is provided by the transaction state instead.
        started_transaction = False
        try:
            # Start a transaction so that no disk writes occur until commit.
            await self.begin_transaction()
            started_transaction = True
            completed = 0
            failed_ops: List[Tuple[BatchOperation, str]] = []
            # Execute each operation, recording failures as they occur.
            for operation in operations:
                try:
                    if (
                        operation.operation_type
                        == BatchOperationType.CREATE_ENTITIES
                    ):
                        await self.create_entities(operation.data["entities"])
                    elif (
                        operation.operation_type
                        == BatchOperationType.DELETE_ENTITIES
                    ):
                        await self.delete_entities(operation.data["entity_names"])
                    elif (
                        operation.operation_type
                        == BatchOperationType.CREATE_RELATIONS
                    ):
                        await self.create_relations(operation.data["relations"])
                    elif (
                        operation.operation_type
                        == BatchOperationType.DELETE_RELATIONS
                    ):
                        await self.delete_relations(
                            operation.data["from_"], operation.data["to"]
                        )
                    elif (
                        operation.operation_type
                        == BatchOperationType.ADD_OBSERVATIONS
                    ):
                        await self.add_batch_observations(
                            operation.data["observations_map"]
                        )
                    else:
                        raise ValueError(
                            f"Unknown operation type: {operation.operation_type}"
                        )
                    completed += 1
                except Exception as e:
                    failed_ops.append((operation, str(e)))
                    if not operation.data.get("allow_partial", False):
                        # On failure, roll back and return.
                        await self.rollback_transaction()
                        return BatchResult(
                            success=False,
                            operations_completed=completed,
                            failed_operations=failed_ops,
                            error_message=f"Operation failed: {str(e)}",
                        )
            # Commit the transaction (persisting all changes) or report
            # partial success.
            await self.commit_transaction()
            if failed_ops:
                return BatchResult(
                    success=True,
                    operations_completed=completed,
                    failed_operations=failed_ops,
                    error_message="Some operations failed",
                )
            return BatchResult(
                success=True,
                operations_completed=completed,
                failed_operations=[],
            )
        except Exception as e:
            # Only roll back a transaction this call actually started.
            if started_transaction and self._in_transaction:
                await self.rollback_transaction()
            return BatchResult(
                success=False,
                operations_completed=0,
                failed_operations=[],
                error_message=f"Batch execution failed: {str(e)}",
            )
```