This is page 7 of 17. Use http://codebase.md/basicmachines-co/basic-memory?lines=false&page={x} to view the full context.
# Directory Structure
```
├── .claude
│ ├── agents
│ │ ├── python-developer.md
│ │ └── system-architect.md
│ └── commands
│ ├── release
│ │ ├── beta.md
│ │ ├── changelog.md
│ │ ├── release-check.md
│ │ └── release.md
│ ├── spec.md
│ └── test-live.md
├── .dockerignore
├── .github
│ ├── dependabot.yml
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.md
│ │ ├── config.yml
│ │ ├── documentation.md
│ │ └── feature_request.md
│ └── workflows
│ ├── claude-code-review.yml
│ ├── claude-issue-triage.yml
│ ├── claude.yml
│ ├── dev-release.yml
│ ├── docker.yml
│ ├── pr-title.yml
│ ├── release.yml
│ └── test.yml
├── .gitignore
├── .python-version
├── CHANGELOG.md
├── CITATION.cff
├── CLA.md
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── docker-compose.yml
├── Dockerfile
├── docs
│ ├── ai-assistant-guide-extended.md
│ ├── character-handling.md
│ ├── cloud-cli.md
│ └── Docker.md
├── justfile
├── LICENSE
├── llms-install.md
├── pyproject.toml
├── README.md
├── SECURITY.md
├── smithery.yaml
├── specs
│ ├── SPEC-1 Specification-Driven Development Process.md
│ ├── SPEC-10 Unified Deployment Workflow and Event Tracking.md
│ ├── SPEC-11 Basic Memory API Performance Optimization.md
│ ├── SPEC-12 OpenTelemetry Observability.md
│ ├── SPEC-13 CLI Authentication with Subscription Validation.md
│ ├── SPEC-14 Cloud Git Versioning & GitHub Backup.md
│ ├── SPEC-14- Cloud Git Versioning & GitHub Backup.md
│ ├── SPEC-15 Configuration Persistence via Tigris for Cloud Tenants.md
│ ├── SPEC-16 MCP Cloud Service Consolidation.md
│ ├── SPEC-17 Semantic Search with ChromaDB.md
│ ├── SPEC-18 AI Memory Management Tool.md
│ ├── SPEC-19 Sync Performance and Memory Optimization.md
│ ├── SPEC-2 Slash Commands Reference.md
│ ├── SPEC-3 Agent Definitions.md
│ ├── SPEC-4 Notes Web UI Component Architecture.md
│ ├── SPEC-5 CLI Cloud Upload via WebDAV.md
│ ├── SPEC-6 Explicit Project Parameter Architecture.md
│ ├── SPEC-7 POC to spike Tigris Turso for local access to cloud data.md
│ ├── SPEC-8 TigrisFS Integration.md
│ ├── SPEC-9 Multi-Project Bidirectional Sync Architecture.md
│ ├── SPEC-9 Signed Header Tenant Information.md
│ └── SPEC-9-1 Follow-Ups- Conflict, Sync, and Observability.md
├── src
│ └── basic_memory
│ ├── __init__.py
│ ├── alembic
│ │ ├── alembic.ini
│ │ ├── env.py
│ │ ├── migrations.py
│ │ ├── script.py.mako
│ │ └── versions
│ │ ├── 3dae7c7b1564_initial_schema.py
│ │ ├── 502b60eaa905_remove_required_from_entity_permalink.py
│ │ ├── 5fe1ab1ccebe_add_projects_table.py
│ │ ├── 647e7a75e2cd_project_constraint_fix.py
│ │ ├── 9d9c1cb7d8f5_add_mtime_and_size_columns_to_entity_.py
│ │ ├── a1b2c3d4e5f6_fix_project_foreign_keys.py
│ │ ├── b3c3938bacdb_relation_to_name_unique_index.py
│ │ ├── cc7172b46608_update_search_index_schema.py
│ │ └── e7e1f4367280_add_scan_watermark_tracking_to_project.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── app.py
│ │ ├── routers
│ │ │ ├── __init__.py
│ │ │ ├── directory_router.py
│ │ │ ├── importer_router.py
│ │ │ ├── knowledge_router.py
│ │ │ ├── management_router.py
│ │ │ ├── memory_router.py
│ │ │ ├── project_router.py
│ │ │ ├── prompt_router.py
│ │ │ ├── resource_router.py
│ │ │ ├── search_router.py
│ │ │ └── utils.py
│ │ └── template_loader.py
│ ├── cli
│ │ ├── __init__.py
│ │ ├── app.py
│ │ ├── auth.py
│ │ ├── commands
│ │ │ ├── __init__.py
│ │ │ ├── cloud
│ │ │ │ ├── __init__.py
│ │ │ │ ├── api_client.py
│ │ │ │ ├── bisync_commands.py
│ │ │ │ ├── cloud_utils.py
│ │ │ │ ├── core_commands.py
│ │ │ │ ├── mount_commands.py
│ │ │ │ ├── rclone_config.py
│ │ │ │ ├── rclone_installer.py
│ │ │ │ ├── upload_command.py
│ │ │ │ └── upload.py
│ │ │ ├── command_utils.py
│ │ │ ├── db.py
│ │ │ ├── import_chatgpt.py
│ │ │ ├── import_claude_conversations.py
│ │ │ ├── import_claude_projects.py
│ │ │ ├── import_memory_json.py
│ │ │ ├── mcp.py
│ │ │ ├── project.py
│ │ │ ├── status.py
│ │ │ ├── sync.py
│ │ │ └── tool.py
│ │ └── main.py
│ ├── config.py
│ ├── db.py
│ ├── deps.py
│ ├── file_utils.py
│ ├── ignore_utils.py
│ ├── importers
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── chatgpt_importer.py
│ │ ├── claude_conversations_importer.py
│ │ ├── claude_projects_importer.py
│ │ ├── memory_json_importer.py
│ │ └── utils.py
│ ├── markdown
│ │ ├── __init__.py
│ │ ├── entity_parser.py
│ │ ├── markdown_processor.py
│ │ ├── plugins.py
│ │ ├── schemas.py
│ │ └── utils.py
│ ├── mcp
│ │ ├── __init__.py
│ │ ├── async_client.py
│ │ ├── project_context.py
│ │ ├── prompts
│ │ │ ├── __init__.py
│ │ │ ├── ai_assistant_guide.py
│ │ │ ├── continue_conversation.py
│ │ │ ├── recent_activity.py
│ │ │ ├── search.py
│ │ │ └── utils.py
│ │ ├── resources
│ │ │ ├── ai_assistant_guide.md
│ │ │ └── project_info.py
│ │ ├── server.py
│ │ └── tools
│ │ ├── __init__.py
│ │ ├── build_context.py
│ │ ├── canvas.py
│ │ ├── chatgpt_tools.py
│ │ ├── delete_note.py
│ │ ├── edit_note.py
│ │ ├── list_directory.py
│ │ ├── move_note.py
│ │ ├── project_management.py
│ │ ├── read_content.py
│ │ ├── read_note.py
│ │ ├── recent_activity.py
│ │ ├── search.py
│ │ ├── utils.py
│ │ ├── view_note.py
│ │ └── write_note.py
│ ├── models
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── knowledge.py
│ │ ├── project.py
│ │ └── search.py
│ ├── repository
│ │ ├── __init__.py
│ │ ├── entity_repository.py
│ │ ├── observation_repository.py
│ │ ├── project_info_repository.py
│ │ ├── project_repository.py
│ │ ├── relation_repository.py
│ │ ├── repository.py
│ │ └── search_repository.py
│ ├── schemas
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── cloud.py
│ │ ├── delete.py
│ │ ├── directory.py
│ │ ├── importer.py
│ │ ├── memory.py
│ │ ├── project_info.py
│ │ ├── prompt.py
│ │ ├── request.py
│ │ ├── response.py
│ │ ├── search.py
│ │ └── sync_report.py
│ ├── services
│ │ ├── __init__.py
│ │ ├── context_service.py
│ │ ├── directory_service.py
│ │ ├── entity_service.py
│ │ ├── exceptions.py
│ │ ├── file_service.py
│ │ ├── initialization.py
│ │ ├── link_resolver.py
│ │ ├── project_service.py
│ │ ├── search_service.py
│ │ └── service.py
│ ├── sync
│ │ ├── __init__.py
│ │ ├── background_sync.py
│ │ ├── sync_service.py
│ │ └── watch_service.py
│ ├── templates
│ │ └── prompts
│ │ ├── continue_conversation.hbs
│ │ └── search.hbs
│ └── utils.py
├── test-int
│ ├── BENCHMARKS.md
│ ├── cli
│ │ ├── test_project_commands_integration.py
│ │ ├── test_sync_commands_integration.py
│ │ └── test_version_integration.py
│ ├── conftest.py
│ ├── mcp
│ │ ├── test_build_context_underscore.py
│ │ ├── test_build_context_validation.py
│ │ ├── test_chatgpt_tools_integration.py
│ │ ├── test_default_project_mode_integration.py
│ │ ├── test_delete_note_integration.py
│ │ ├── test_edit_note_integration.py
│ │ ├── test_list_directory_integration.py
│ │ ├── test_move_note_integration.py
│ │ ├── test_project_management_integration.py
│ │ ├── test_project_state_sync_integration.py
│ │ ├── test_read_content_integration.py
│ │ ├── test_read_note_integration.py
│ │ ├── test_search_integration.py
│ │ ├── test_single_project_mcp_integration.py
│ │ └── test_write_note_integration.py
│ ├── test_db_wal_mode.py
│ ├── test_disable_permalinks_integration.py
│ └── test_sync_performance_benchmark.py
├── tests
│ ├── __init__.py
│ ├── api
│ │ ├── conftest.py
│ │ ├── test_async_client.py
│ │ ├── test_continue_conversation_template.py
│ │ ├── test_directory_router.py
│ │ ├── test_importer_router.py
│ │ ├── test_knowledge_router.py
│ │ ├── test_management_router.py
│ │ ├── test_memory_router.py
│ │ ├── test_project_router_operations.py
│ │ ├── test_project_router.py
│ │ ├── test_prompt_router.py
│ │ ├── test_relation_background_resolution.py
│ │ ├── test_resource_router.py
│ │ ├── test_search_router.py
│ │ ├── test_search_template.py
│ │ ├── test_template_loader_helpers.py
│ │ └── test_template_loader.py
│ ├── cli
│ │ ├── conftest.py
│ │ ├── test_bisync_commands.py
│ │ ├── test_cli_tools.py
│ │ ├── test_cloud_authentication.py
│ │ ├── test_cloud_utils.py
│ │ ├── test_ignore_utils.py
│ │ ├── test_import_chatgpt.py
│ │ ├── test_import_claude_conversations.py
│ │ ├── test_import_claude_projects.py
│ │ ├── test_import_memory_json.py
│ │ └── test_upload.py
│ ├── conftest.py
│ ├── db
│ │ └── test_issue_254_foreign_key_constraints.py
│ ├── importers
│ │ ├── test_importer_base.py
│ │ └── test_importer_utils.py
│ ├── markdown
│ │ ├── __init__.py
│ │ ├── test_date_frontmatter_parsing.py
│ │ ├── test_entity_parser_error_handling.py
│ │ ├── test_entity_parser.py
│ │ ├── test_markdown_plugins.py
│ │ ├── test_markdown_processor.py
│ │ ├── test_observation_edge_cases.py
│ │ ├── test_parser_edge_cases.py
│ │ ├── test_relation_edge_cases.py
│ │ └── test_task_detection.py
│ ├── mcp
│ │ ├── conftest.py
│ │ ├── test_obsidian_yaml_formatting.py
│ │ ├── test_permalink_collision_file_overwrite.py
│ │ ├── test_prompts.py
│ │ ├── test_resources.py
│ │ ├── test_tool_build_context.py
│ │ ├── test_tool_canvas.py
│ │ ├── test_tool_delete_note.py
│ │ ├── test_tool_edit_note.py
│ │ ├── test_tool_list_directory.py
│ │ ├── test_tool_move_note.py
│ │ ├── test_tool_read_content.py
│ │ ├── test_tool_read_note.py
│ │ ├── test_tool_recent_activity.py
│ │ ├── test_tool_resource.py
│ │ ├── test_tool_search.py
│ │ ├── test_tool_utils.py
│ │ ├── test_tool_view_note.py
│ │ ├── test_tool_write_note.py
│ │ └── tools
│ │ └── test_chatgpt_tools.py
│ ├── Non-MarkdownFileSupport.pdf
│ ├── repository
│ │ ├── test_entity_repository_upsert.py
│ │ ├── test_entity_repository.py
│ │ ├── test_entity_upsert_issue_187.py
│ │ ├── test_observation_repository.py
│ │ ├── test_project_info_repository.py
│ │ ├── test_project_repository.py
│ │ ├── test_relation_repository.py
│ │ ├── test_repository.py
│ │ ├── test_search_repository_edit_bug_fix.py
│ │ └── test_search_repository.py
│ ├── schemas
│ │ ├── test_base_timeframe_minimum.py
│ │ ├── test_memory_serialization.py
│ │ ├── test_memory_url_validation.py
│ │ ├── test_memory_url.py
│ │ ├── test_schemas.py
│ │ └── test_search.py
│ ├── Screenshot.png
│ ├── services
│ │ ├── test_context_service.py
│ │ ├── test_directory_service.py
│ │ ├── test_entity_service_disable_permalinks.py
│ │ ├── test_entity_service.py
│ │ ├── test_file_service.py
│ │ ├── test_initialization.py
│ │ ├── test_link_resolver.py
│ │ ├── test_project_removal_bug.py
│ │ ├── test_project_service_operations.py
│ │ ├── test_project_service.py
│ │ └── test_search_service.py
│ ├── sync
│ │ ├── test_character_conflicts.py
│ │ ├── test_sync_service_incremental.py
│ │ ├── test_sync_service.py
│ │ ├── test_sync_wikilink_issue.py
│ │ ├── test_tmp_files.py
│ │ ├── test_watch_service_edge_cases.py
│ │ ├── test_watch_service_reload.py
│ │ └── test_watch_service.py
│ ├── test_config.py
│ ├── test_db_migration_deduplication.py
│ ├── test_deps.py
│ ├── test_production_cascade_delete.py
│ └── utils
│ ├── test_file_utils.py
│ ├── test_frontmatter_obsidian_compatible.py
│ ├── test_parse_tags.py
│ ├── test_permalink_formatting.py
│ ├── test_utf8_handling.py
│ └── test_validate_project_path.py
├── uv.lock
├── v0.15.0-RELEASE-DOCS.md
└── v15-docs
├── api-performance.md
├── background-relations.md
├── basic-memory-home.md
├── bug-fixes.md
├── chatgpt-integration.md
├── cloud-authentication.md
├── cloud-bisync.md
├── cloud-mode-usage.md
├── cloud-mount.md
├── default-project-mode.md
├── env-file-removal.md
├── env-var-overrides.md
├── explicit-project-parameter.md
├── gitignore-integration.md
├── project-root-env-var.md
├── README.md
└── sqlite-performance.md
```
# Files
--------------------------------------------------------------------------------
/src/basic_memory/services/directory_service.py:
--------------------------------------------------------------------------------
```python
"""Directory service for managing file directories and tree structure."""
import fnmatch
import logging
import os
from typing import Dict, List, Optional, Sequence
from basic_memory.models import Entity
from basic_memory.repository import EntityRepository
from basic_memory.schemas.directory import DirectoryNode
logger = logging.getLogger(__name__)
class DirectoryService:
"""Service for working with directory trees."""
def __init__(self, entity_repository: EntityRepository):
"""Initialize the directory service.
Args:
entity_repository: Directory repository for data access.
"""
self.entity_repository = entity_repository
async def get_directory_tree(self) -> DirectoryNode:
"""Build a hierarchical directory tree from indexed files."""
# Get all files from DB (flat list)
entity_rows = await self.entity_repository.find_all()
# Create a root directory node
root_node = DirectoryNode(name="Root", directory_path="/", type="directory")
# Map to store directory nodes by path for easy lookup
dir_map: Dict[str, DirectoryNode] = {root_node.directory_path: root_node}
# First pass: create all directory nodes
for file in entity_rows:
# Process directory path components
parts = [p for p in file.file_path.split("/") if p]
# Create directory structure
current_path = "/"
for i, part in enumerate(parts[:-1]): # Skip the filename
parent_path = current_path
# Build the directory path
current_path = (
f"{current_path}{part}" if current_path == "/" else f"{current_path}/{part}"
)
# Create directory node if it doesn't exist
if current_path not in dir_map:
dir_node = DirectoryNode(
name=part, directory_path=current_path, type="directory"
)
dir_map[current_path] = dir_node
# Add to parent's children
if parent_path in dir_map:
dir_map[parent_path].children.append(dir_node)
# Second pass: add file nodes to their parent directories
for file in entity_rows:
file_name = os.path.basename(file.file_path)
parent_dir = os.path.dirname(file.file_path)
directory_path = "/" if parent_dir == "" else f"/{parent_dir}"
# Create file node
file_node = DirectoryNode(
name=file_name,
file_path=file.file_path, # Original path from DB (no leading slash)
directory_path=f"/{file.file_path}", # Path with leading slash
type="file",
title=file.title,
permalink=file.permalink,
entity_id=file.id,
entity_type=file.entity_type,
content_type=file.content_type,
updated_at=file.updated_at,
)
# Add to parent directory's children
if directory_path in dir_map:
dir_map[directory_path].children.append(file_node)
else:
# If parent directory doesn't exist (should be rare), add to root
dir_map["/"].children.append(file_node) # pragma: no cover
# Return the root node with its children
return root_node
async def get_directory_structure(self) -> DirectoryNode:
"""Build a hierarchical directory structure without file details.
Optimized method for folder navigation that only returns directory nodes,
no file metadata. Much faster than get_directory_tree() for large knowledge bases.
Returns:
DirectoryNode tree containing only folders (type="directory")
"""
# Get unique directories without loading entities
directories = await self.entity_repository.get_distinct_directories()
# Create a root directory node
root_node = DirectoryNode(name="Root", directory_path="/", type="directory")
# Map to store directory nodes by path for easy lookup
dir_map: Dict[str, DirectoryNode] = {"/": root_node}
# Build tree with just folders
for dir_path in directories:
parts = [p for p in dir_path.split("/") if p]
current_path = "/"
for i, part in enumerate(parts):
parent_path = current_path
# Build the directory path
current_path = (
f"{current_path}{part}" if current_path == "/" else f"{current_path}/{part}"
)
# Create directory node if it doesn't exist
if current_path not in dir_map:
dir_node = DirectoryNode(
name=part, directory_path=current_path, type="directory"
)
dir_map[current_path] = dir_node
# Add to parent's children
if parent_path in dir_map:
dir_map[parent_path].children.append(dir_node)
return root_node
async def list_directory(
self,
dir_name: str = "/",
depth: int = 1,
file_name_glob: Optional[str] = None,
) -> List[DirectoryNode]:
"""List directory contents with filtering and depth control.
Args:
dir_name: Directory path to list (default: root "/")
depth: Recursion depth (1 = immediate children only)
file_name_glob: Glob pattern for filtering file names
Returns:
List of DirectoryNode objects matching the criteria
"""
# Normalize directory path
# Strip ./ prefix if present (handles relative path notation)
if dir_name.startswith("./"):
dir_name = dir_name[2:] # Remove "./" prefix
# Ensure path starts with "/"
if not dir_name.startswith("/"):
dir_name = f"/{dir_name}"
# Remove trailing slashes except for root
if dir_name != "/" and dir_name.endswith("/"):
dir_name = dir_name.rstrip("/")
# Optimize: Query only entities in the target directory
# instead of loading the entire tree
dir_prefix = dir_name.lstrip("/")
entity_rows = await self.entity_repository.find_by_directory_prefix(dir_prefix)
# Build a partial tree from only the relevant entities
root_tree = self._build_directory_tree_from_entities(entity_rows, dir_name)
# Find the target directory node
target_node = self._find_directory_node(root_tree, dir_name)
if not target_node:
return []
# Collect nodes with depth and glob filtering
result = []
self._collect_nodes_recursive(target_node, result, depth, file_name_glob, 0)
return result
def _build_directory_tree_from_entities(
self, entity_rows: Sequence[Entity], root_path: str
) -> DirectoryNode:
"""Build a directory tree from a subset of entities.
Args:
entity_rows: Sequence of entity objects to build tree from
root_path: Root directory path for the tree
Returns:
DirectoryNode representing the tree root
"""
# Create a root directory node
root_node = DirectoryNode(name="Root", directory_path=root_path, type="directory")
# Map to store directory nodes by path for easy lookup
dir_map: Dict[str, DirectoryNode] = {root_path: root_node}
# First pass: create all directory nodes
for file in entity_rows:
# Process directory path components
parts = [p for p in file.file_path.split("/") if p]
# Create directory structure
current_path = "/"
for i, part in enumerate(parts[:-1]): # Skip the filename
parent_path = current_path
# Build the directory path
current_path = (
f"{current_path}{part}" if current_path == "/" else f"{current_path}/{part}"
)
# Create directory node if it doesn't exist
if current_path not in dir_map:
dir_node = DirectoryNode(
name=part, directory_path=current_path, type="directory"
)
dir_map[current_path] = dir_node
# Add to parent's children
if parent_path in dir_map:
dir_map[parent_path].children.append(dir_node)
# Second pass: add file nodes to their parent directories
for file in entity_rows:
file_name = os.path.basename(file.file_path)
parent_dir = os.path.dirname(file.file_path)
directory_path = "/" if parent_dir == "" else f"/{parent_dir}"
# Create file node
file_node = DirectoryNode(
name=file_name,
file_path=file.file_path,
directory_path=f"/{file.file_path}",
type="file",
title=file.title,
permalink=file.permalink,
entity_id=file.id,
entity_type=file.entity_type,
content_type=file.content_type,
updated_at=file.updated_at,
)
# Add to parent directory's children
if directory_path in dir_map:
dir_map[directory_path].children.append(file_node)
elif root_path in dir_map:
# Fallback to root if parent not found
dir_map[root_path].children.append(file_node)
return root_node
def _find_directory_node(
self, root: DirectoryNode, target_path: str
) -> Optional[DirectoryNode]:
"""Find a directory node by path in the tree."""
if root.directory_path == target_path:
return root
for child in root.children:
if child.type == "directory":
found = self._find_directory_node(child, target_path)
if found:
return found
return None
def _collect_nodes_recursive(
self,
node: DirectoryNode,
result: List[DirectoryNode],
max_depth: int,
file_name_glob: Optional[str],
current_depth: int,
) -> None:
"""Recursively collect nodes with depth and glob filtering."""
if current_depth >= max_depth:
return
for child in node.children:
# Apply glob filtering
if file_name_glob and not fnmatch.fnmatch(child.name, file_name_glob):
continue
# Add the child to results
result.append(child)
# Recurse into subdirectories if we haven't reached max depth
if child.type == "directory" and current_depth < max_depth:
self._collect_nodes_recursive(
child, result, max_depth, file_name_glob, current_depth + 1
)
```
--------------------------------------------------------------------------------
/v15-docs/cloud-bisync.md:
--------------------------------------------------------------------------------
```markdown
# Cloud Bidirectional Sync (SPEC-9)
**Status**: New Feature
**PR**: #322
**Requires**: Active subscription, rclone installation
## What's New
v0.15.0 introduces **bidirectional cloud synchronization** using rclone bisync. Your local files sync automatically with the cloud, enabling multi-device workflows, backups, and collaboration.
## Quick Start
### One-Time Setup
```bash
# Install and configure cloud sync
bm cloud bisync-setup
# What it does:
# 1. Installs rclone
# 2. Gets tenant credentials
# 3. Configures rclone remote
# 4. Creates sync directory
# 5. Performs initial sync
```
### Regular Sync
```bash
# Recommended: Use standard sync command
bm sync # Syncs local → database
bm cloud bisync # Syncs local ↔ cloud
# Or: Use watch mode (auto-sync every 60 seconds)
bm sync --watch
```
## How Bidirectional Sync Works
### Sync Architecture
```
Local Files rclone bisync Cloud Storage
~/basic-memory- <─────────────> s3://bucket/
cloud-sync/ (bidirectional) tenant-id/
├── project-a/ ├── project-a/
├── project-b/ ├── project-b/
└── notes/ └── notes/
```
### Sync Profiles
Three profiles optimize for different use cases:
| Profile | Conflicts | Max Deletes | Speed | Use Case |
|---------|-----------|-------------|-------|----------|
| **safe** | Keep both versions | 10 | Slower | Preserve all changes, manual conflict resolution |
| **balanced** | Use newer file | 25 | Medium | **Default** - auto-resolve most conflicts |
| **fast** | Use newer file | 50 | Fastest | Rapid iteration, trust newer versions |
### Conflict Resolution
**safe profile** (--conflict-resolve=none):
- Conflicting files saved as `file.conflict1`, `file.conflict2`
- Manual resolution required
- No data loss
**balanced/fast profiles** (--conflict-resolve=newer):
- Automatically uses the newer file
- Faster syncs
- Good for single-user workflows
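To make the mapping concrete, here is a hedged sketch of how a profile could translate into rclone bisync flags. The values come from the table above and the flag names from rclone bisync; the actual implementation in `bisync_commands.py` may differ:
```python
from dataclasses import dataclass

@dataclass(frozen=True)
class BisyncProfile:
    conflict_resolve: str  # "none" keeps both copies, "newer" picks the newer file
    max_delete: int        # refuse to sync if more deletions than this are detected

# Values taken from the profile table above
PROFILES = {
    "safe": BisyncProfile("none", 10),
    "balanced": BisyncProfile("newer", 25),
    "fast": BisyncProfile("newer", 50),
}

def rclone_flags(profile_name: str) -> list[str]:
    profile = PROFILES[profile_name]
    return [
        f"--conflict-resolve={profile.conflict_resolve}",
        f"--max-delete={profile.max_delete}",
    ]
```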
## Commands
### bm cloud bisync-setup
One-time setup for cloud sync.
```bash
bm cloud bisync-setup
# Optional: Custom sync directory
bm cloud bisync-setup --dir ~/my-sync-folder
```
**What happens:**
1. Checks for/installs rclone
2. Generates scoped S3 credentials
3. Configures rclone remote
4. Creates local sync directory
5. Performs initial baseline sync (--resync)
**Configuration saved to:**
- `~/.basic-memory/config.json` - sync_dir path
- `~/.config/rclone/rclone.conf` - remote credentials
- `~/.basic-memory/bisync-state/{tenant_id}/` - sync state
### bm cloud bisync
Manual bidirectional sync.
```bash
# Basic sync (uses 'balanced' profile)
bm cloud bisync
# Choose sync profile
bm cloud bisync --profile safe
bm cloud bisync --profile balanced
bm cloud bisync --profile fast
# Dry run (preview changes)
bm cloud bisync --dry-run
# Force resync (rebuild baseline)
bm cloud bisync --resync
# Verbose output
bm cloud bisync --verbose
```
**Auto-registration:**
- Scans local directory for new projects
- Creates them on cloud before sync
- Ensures cloud knows about all local projects
### bm sync (Recommended)
The standard sync command now handles both local and cloud:
```bash
# One command for everything
bm sync # Local sync + cloud sync
bm sync --watch # Continuous sync every 60s
```
## Sync Directory Structure
### Default Layout
```bash
~/basic-memory-cloud-sync/ # Configurable via --dir
├── project-a/ # Auto-created local projects
│ ├── notes/
│ ├── ideas/
│ └── .bmignore # Respected during sync
├── project-b/
│ └── documents/
└── .basic-memory/ # Metadata (ignored in sync)
```
### Important Paths
| Path | Purpose |
|------|---------|
| `~/basic-memory-cloud-sync/` | Default local sync directory |
| `~/basic-memory-cloud/` | Mount point (DO NOT use for bisync) |
| `~/.basic-memory/bisync-state/{tenant_id}/` | Sync state and history |
| `~/.basic-memory/.bmignore` | Patterns to exclude from sync |
**Critical:** Bisync and mount must use **different directories**
## File Filtering with .bmignore
### Default Patterns
Basic Memory respects `.bmignore` patterns (gitignore format):
```bash
# ~/.basic-memory/.bmignore (default)
.git
.DS_Store
node_modules
*.tmp
.env
__pycache__
.pytest_cache
.ruff_cache
.vscode
.idea
```
### How It Works
1. `.bmignore` patterns are converted to rclone filter format
2. The converted filters are stored as `~/.basic-memory/.bmignore.rclone`
3. The filter file is regenerated automatically whenever `.bmignore` changes
4. The filters are applied to every bisync operation
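For illustration, a minimal sketch of this conversion, assuming a simple line-by-line translation (the real logic in the CLI's ignore handling may treat directory patterns and anchoring differently):
```python
from pathlib import Path

def bmignore_to_rclone_filters(bmignore: Path, output: Path) -> None:
    """Translate gitignore-style patterns into an rclone filter file."""
    filters = []
    for raw in bmignore.read_text().splitlines():
        pattern = raw.strip()
        if not pattern or pattern.startswith("#"):
            continue  # skip blank lines and comments
        # rclone filter files mark excludes with a leading "- "
        filters.append(f"- {pattern}")
    output.write_text("\n".join(filters) + "\n")

bmignore_to_rclone_filters(
    Path.home() / ".basic-memory/.bmignore",
    Path.home() / ".basic-memory/.bmignore.rclone",
)
```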
### Custom Patterns
Edit `~/.basic-memory/.bmignore`:
```bash
# Your custom patterns
.git
*.log
temp/
*.backup
```
Next sync will use updated filters.
## Project Management
### Auto-Registration
Bisync automatically registers new local projects:
```bash
# You create a new project locally
mkdir ~/basic-memory-cloud-sync/new-project
echo "# Hello" > ~/basic-memory-cloud-sync/new-project/README.md
# Next sync auto-creates on cloud
bm cloud bisync
# → "Found 1 new local project, creating on cloud..."
# → "✓ Created project: new-project"
```
### Project Discovery
```bash
# List cloud projects
bm cloud status
# Shows:
# - Total projects
# - Last sync time
# - Storage used
```
### Cloud Mode
To work with cloud projects via CLI:
```bash
# Set cloud API URL
export BASIC_MEMORY_API_URL=https://api.basicmemory.cloud
# Or in config.json:
{
"api_url": "https://api.basicmemory.cloud"
}
# Now CLI tools work against cloud
bm sync --project new-project # Syncs cloud project
bm tools continue-conversation --project new-project
```
## Sync Workflow Examples
### Daily Workflow
```bash
# Morning: Start watch mode
bm sync --watch &
# Work in your sync directory
cd ~/basic-memory-cloud-sync/work-notes
vim ideas.md
# Changes auto-sync every 60s
# Watch output shows sync progress
```
### Multi-Device Workflow
**Device A:**
```bash
# Make changes
echo "# New Idea" > ~/basic-memory-cloud-sync/ideas/innovation.md
# Sync to cloud
bm cloud bisync
# → "✓ Sync completed - 1 file uploaded"
```
**Device B:**
```bash
# Pull changes from cloud
bm cloud bisync
# → "✓ Sync completed - 1 file downloaded"
# See the new file
cat ~/basic-memory-cloud-sync/ideas/innovation.md
# → "# New Idea"
```
### Conflict Scenario
**Using balanced profile (auto-resolve):**
```bash
# Both devices edit same file
# Device A: Updated at 10:00 AM
# Device B: Updated at 10:05 AM
# Device A syncs
bm cloud bisync
# → "✓ Sync completed"
# Device B syncs
bm cloud bisync
# → "Resolving conflict: using newer version"
# → "✓ Sync completed"
# → Device B's version (10:05) wins
```
**Using safe profile (manual resolution):**
```bash
bm cloud bisync --profile safe
# → "Conflict detected: ideas.md"
# → "Saved as: ideas.md.conflict1 and ideas.md.conflict2"
# → "Please resolve manually"
# Review both versions
diff ideas.md.conflict1 ideas.md.conflict2
# Merge and cleanup
vim ideas.md # Merge manually
rm ideas.md.conflict*
```
## Monitoring and Status
### Check Sync Status
```bash
bm cloud status
```
**Shows:**
```
Cloud Bisync Status
┏━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Property ┃ Value ┃
┡━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ Status │ ✓ Initialized │
│ Local Directory │ ~/basic-memory-cloud-sync │
│ Remote │ s3://bucket/tenant-id │
│ Last Sync │ 2 minutes ago │
│ Total Projects │ 5 │
└─────────────────────┴────────────────────────────┘
```
### Verify Integrity
```bash
bm cloud check
```
Compares local and cloud file hashes to detect:
- Corrupted files
- Missing files
- Sync drift
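Conceptually this is a hash comparison between the two sides. A simplified sketch of the idea (the real command delegates the comparison to rclone rather than hashing files in Python):
```python
import hashlib
from pathlib import Path

def local_hashes(root: Path) -> dict[str, str]:
    """MD5 of every file under the sync directory, keyed by relative path."""
    return {
        str(path.relative_to(root)): hashlib.md5(path.read_bytes()).hexdigest()
        for path in root.rglob("*")
        if path.is_file()
    }

def drifted_paths(local: dict[str, str], remote: dict[str, str]) -> list[str]:
    """Paths missing on one side or whose content differs."""
    return sorted(
        path for path in local.keys() | remote.keys()
        if local.get(path) != remote.get(path)
    )
```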
## Troubleshooting
### "First bisync requires --resync"
**Problem:** Initial sync not established
```bash
$ bm cloud bisync
Error: First bisync requires --resync to establish baseline
```
**Solution:**
```bash
bm cloud bisync --resync
```
### "Cannot use mount directory for bisync"
**Problem:** Trying to use mounted directory for sync
```bash
$ bm cloud bisync --dir ~/basic-memory-cloud
Error: Cannot use ~/basic-memory-cloud for bisync - it's the mount directory!
```
**Solution:** Use different directory
```bash
bm cloud bisync --dir ~/basic-memory-cloud-sync
```
### Sync Conflicts
**Problem:** Files modified on both sides
**Safe profile (manual):**
```bash
# Find conflict files
find ~/basic-memory-cloud-sync -name "*.conflict*"
# Review and merge
vimdiff file.conflict1 file.conflict2
# Keep desired version
mv file.conflict1 file
rm file.conflict2
```
**Balanced profile (auto):**
```bash
# Already resolved to newer version
# Check git history if needed
cd ~/basic-memory-cloud-sync
git log file.md
```
### Deleted Too Many Files
**Problem:** Exceeds max_delete threshold
```bash
$ bm cloud bisync
Error: Deletion exceeds safety limit (26 > 25)
```
**Solution:** Review deletions, then force if intentional
```bash
# Preview what would be deleted
bm cloud bisync --dry-run
# If intentional, use higher threshold profile
bm cloud bisync --profile fast # max_delete=50
# Or resync to establish new baseline
bm cloud bisync --resync
```
### rclone Not Found
**Problem:** rclone not installed
```bash
$ bm cloud bisync
Error: rclone not found
```
**Solution:**
```bash
# Run setup again
bm cloud bisync-setup
# → Installs rclone automatically
```
## Configuration
### Bisync Config
Edit `~/.basic-memory/config.json`:
```json
{
"bisync_config": {
"sync_dir": "~/basic-memory-cloud-sync",
"default_profile": "balanced",
"auto_sync_interval": 60
}
}
```
### rclone Config
Located at `~/.config/rclone/rclone.conf`:
```ini
[basic-memory-{tenant_id}]
type = s3
provider = AWS
env_auth = false
access_key_id = AKIA...
secret_access_key = ***
region = us-east-1
endpoint = https://fly.storage.tigris.dev
```
**Security:** This file contains credentials - keep private (mode 600)
## Performance Tips
1. **Use balanced profile**: Best trade-off for most users
2. **Enable watch mode**: `bm sync --watch` for auto-sync
3. **Optimize .bmignore**: Exclude build artifacts and temp files
4. **Batch changes**: Group related edits before sync
5. **Use fast profile**: For rapid iteration on solo projects
## Migration from WebDAV
If upgrading from v0.14.x WebDAV:
1. **Backup existing setup**
```bash
cp -r ~/basic-memory ~/basic-memory.backup
```
2. **Run bisync setup**
```bash
bm cloud bisync-setup
```
3. **Copy projects to sync directory**
```bash
cp -r ~/basic-memory/* ~/basic-memory-cloud-sync/
```
4. **Initial sync**
```bash
bm cloud bisync --resync
```
5. **Remove old WebDAV config** (if applicable)
## Security
- **Scoped credentials**: S3 credentials only access your tenant
- **Encrypted transport**: All traffic over HTTPS/TLS
- **No plain text secrets**: Credentials stored securely in rclone config
- **File permissions**: Config files restricted to user (600)
- **.bmignore**: Prevents syncing sensitive files
## See Also
- SPEC-9: Multi-Project Bidirectional Sync Architecture
- `cloud-authentication.md` - Required for cloud access
- `cloud-mount.md` - Alternative: mount cloud storage
- `env-file-removal.md` - Why .env files aren't synced
- `gitignore-integration.md` - File filtering patterns
```
--------------------------------------------------------------------------------
/v15-docs/api-performance.md:
--------------------------------------------------------------------------------
```markdown
# API Performance Optimizations (SPEC-11)
**Status**: Performance Enhancement
**PR**: #315
**Specification**: SPEC-11
**Impact**: Faster API responses, reduced database queries
## What Changed
v0.15.0 implements comprehensive API performance optimizations from SPEC-11, including query optimizations, reduced database round trips, and improved relation traversal.
## Key Optimizations
### 1. Query Optimization
**Before:**
```python
# Multiple separate queries
entity = await get_entity(id) # Query 1
observations = await get_observations(id) # Query 2
relations = await get_relations(id) # Query 3
tags = await get_tags(id) # Query 4
```
**After:**
```python
# Single optimized query with joins
entity = await get_entity_with_details(id)
# → One query returns everything
```
**Result:** **75% fewer database queries**
### 2. Relation Traversal
**Before:**
```python
# Recursive queries for each relation
for relation in entity.relations:
target = await get_entity(relation.target_id) # N queries
```
**After:**
```python
# Batch load all related entities
related_ids = [r.target_id for r in entity.relations]
targets = await get_entities_batch(related_ids) # 1 query
```
**Result:** **N+1 query problem eliminated**
### 3. Eager Loading
**Before:**
```python
# Lazy loading (multiple queries)
entity = await get_entity(id)
if need_relations:
relations = await load_relations(id)
if need_observations:
observations = await load_observations(id)
```
**After:**
```python
# Eager loading (one query)
entity = await get_entity(
id,
load_relations=True,
load_observations=True
) # All data in one query
```
**Result:** Configurable loading strategy
## Performance Impact
### API Response Times
**read_note endpoint:**
```
Before: 250ms average
After: 75ms average (3.3x faster)
```
**search_notes endpoint:**
```
Before: 450ms average
After: 150ms average (3x faster)
```
**build_context endpoint (depth=2):**
```
Before: 1200ms average
After: 320ms average (3.8x faster)
```
### Database Queries
**Typical MCP tool call:**
```
Before: 15-20 queries
After: 3-5 queries (75% reduction)
```
**Context building (10 entities):**
```
Before: 150+ queries (N+1 problem)
After: 8 queries (batch loading)
```
## Optimization Techniques
### 1. SELECT Optimization
**Specific column selection:**
```python
# Before: SELECT *
query = select(Entity)
# After: SELECT only needed columns
query = select(
Entity.id,
Entity.title,
Entity.permalink,
Entity.content
)
```
**Benefit:** Reduced data transfer
### 2. JOIN Optimization
**Efficient joins:**
```python
# Join related tables in one query
query = (
select(Entity, Observation, Relation)
.join(Observation, Entity.id == Observation.entity_id)
.join(Relation, Entity.id == Relation.from_id)
)
```
**Benefit:** Single query vs multiple
### 3. Index Usage
**Optimized indexes:**
```sql
-- Ensure indexes on frequently queried columns
CREATE INDEX idx_entity_permalink ON entities(permalink);
CREATE INDEX idx_relation_from_id ON relations(from_id);
CREATE INDEX idx_relation_to_id ON relations(to_id);
CREATE INDEX idx_observation_entity_id ON observations(entity_id);
```
**Benefit:** Faster lookups
### 4. Query Caching
**Result caching:**
```python
# functools.lru_cache does not work with async functions (it would cache
# the coroutine object, not the result), so use a simple dict cache instead
_entity_cache: dict = {}

async def get_entity_cached(entity_id: str):
    if entity_id not in _entity_cache:
        _entity_cache[entity_id] = await get_entity(entity_id)
    return _entity_cache[entity_id]
```
**Benefit:** Avoid redundant queries
### 5. Batch Loading
**Load multiple entities:**
```python
# Before: Load one at a time
entities = []
for id in entity_ids:
entity = await get_entity(id) # N queries
entities.append(entity)
# After: Batch load
query = select(Entity).where(Entity.id.in_(entity_ids))
entities = await db.execute(query) # 1 query
```
**Benefit:** Eliminates N+1 problem
## API-Specific Optimizations
### read_note
**Optimizations:**
- Single query with joins
- Eager load observations and relations
- Efficient permalink lookup
```python
# Optimized query
query = (
select(Entity)
.options(
selectinload(Entity.observations),
selectinload(Entity.relations)
)
.where(Entity.permalink == permalink)
)
```
**Performance:**
- **Before:** 250ms (4 queries)
- **After:** 75ms (1 query)
### search_notes
**Optimizations:**
- Full-text search index
- Pagination optimization
- Result limiting
```python
# Optimized search
query = (
select(Entity)
.where(Entity.content.match(search_query))
.limit(page_size)
.offset(page * page_size)
)
```
**Performance:**
- **Before:** 450ms
- **After:** 150ms (3x faster)
### build_context
**Optimizations:**
- Batch relation traversal
- Depth-limited queries
- Circular reference detection
```python
# Optimized context building
async def build_context(url: str, depth: int = 2):
# Start entity
entity = await get_entity_by_url(url)
# Batch load all relations (depth levels)
related_ids = collect_related_ids(entity, depth)
related = await get_entities_batch(related_ids) # 1 query
return build_graph(entity, related)
```
**Performance:**
- **Before:** 1200ms (150+ queries)
- **After:** 320ms (8 queries)
### recent_activity
**Optimizations:**
- Time-indexed queries
- Limit early in query
- Efficient sorting
```python
# Optimized recent query
query = (
select(Entity)
.where(Entity.updated_at >= timeframe_start)
.order_by(Entity.updated_at.desc())
.limit(max_results)
)
```
**Performance:**
- **Before:** 600ms
- **After:** 180ms (3.3x faster)
## Configuration
### Query Optimization Settings
No configuration needed - optimizations are automatic.
### Monitoring Query Performance
**Enable query logging:**
```bash
export BASIC_MEMORY_LOG_LEVEL=DEBUG
```
**Log output:**
```
[DEBUG] Query took 15ms: SELECT entity WHERE permalink=...
[DEBUG] Query took 3ms: SELECT observations WHERE entity_id IN (...)
```
### Profiling
```python
import time
from loguru import logger
async def profile_query(query_name: str, query_func):
    start = time.time()
    result = await query_func()
elapsed = (time.time() - start) * 1000
logger.info(f"{query_name}: {elapsed:.2f}ms")
return result
```
## Benchmarks
### Single Entity Retrieval
```
Operation: get_entity_with_details(id)
Before:
- Queries: 4 (entity, observations, relations, tags)
- Time: 45ms total
After:
- Queries: 1 (joined query)
- Time: 12ms total (3.8x faster)
```
### Search Operations
```
Operation: search_notes(query, limit=10)
Before:
- Queries: 1 search + 10 detail queries
- Time: 450ms total
After:
- Queries: 1 optimized search with joins
- Time: 150ms total (3x faster)
```
### Context Building
```
Operation: build_context(url, depth=2)
Scenario: 10 entities, 20 relations
Before:
- Queries: 1 root + 20 relations + 10 targets = 31 queries
- Time: 620ms
After:
- Queries: 1 root + 1 batch relations + 1 batch targets = 3 queries
- Time: 165ms (3.8x faster)
```
### Bulk Operations
```
Operation: Import 100 notes
Before:
- Queries: 100 inserts + 300 relation queries = 400 queries
- Time: 8.5 seconds
After:
- Queries: 1 bulk insert + 1 bulk relations = 2 queries
- Time: 2.1 seconds (4x faster)
```
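The bulk numbers come from batching rows into a single statement rather than inserting one note at a time. A minimal SQLAlchemy 2.0-style sketch of the idea (illustrative, not the importer's actual code):
```python
from sqlalchemy import insert
from sqlalchemy.ext.asyncio import AsyncSession

from basic_memory.models import Entity

async def bulk_import(session: AsyncSession, notes: list[dict]) -> None:
    # One executemany-style INSERT for all rows instead of N single-row INSERTs
    await session.execute(insert(Entity), notes)
    await session.commit()
```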
## Best Practices
### 1. Use Batch Operations
```python
# ✓ Good: Batch load
entity_ids = [1, 2, 3, 4, 5]
entities = await get_entities_batch(entity_ids)
# ✗ Bad: Load one at a time
entities = []
for id in entity_ids:
entity = await get_entity(id)
entities.append(entity)
```
### 2. Specify Required Data
```python
# ✓ Good: Load what you need
entity = await get_entity(
id,
load_relations=True,
load_observations=False # Don't need these
)
# ✗ Bad: Load everything
entity = await get_entity_full(id) # Loads unnecessary data
```
### 3. Use Pagination
```python
# ✓ Good: Paginate results
results = await search_notes(
query="test",
page=1,
page_size=20
)
# ✗ Bad: Load all results
results = await search_notes(query="test") # Could be thousands
```
### 4. Index Foreign Keys
```sql
-- ✓ Good: Indexed joins
CREATE INDEX idx_relation_from_id ON relations(from_id);
-- ✗ Bad: No index
-- Joins will be slow
```
### 5. Limit Depth
```python
# ✓ Good: Reasonable depth
context = await build_context(url, depth=2)
# ✗ Bad: Excessive depth
context = await build_context(url, depth=10) # Exponential growth
```
## Troubleshooting
### Slow Queries
**Problem:** API responses still slow
**Debug:**
```bash
# Enable query logging
export BASIC_MEMORY_LOG_LEVEL=DEBUG
# Check for N+1 queries
# Look for repeated similar queries
```
**Solution:**
```python
# Use batch loading
ids = [1, 2, 3, 4, 5]
entities = await get_entities_batch(ids) # Not in loop
```
### High Memory Usage
**Problem:** Large result sets consume memory
**Solution:**
```python
# Use streaming/pagination
async for batch in stream_entities(batch_size=100):
process(batch)
```
### Database Locks
**Problem:** Concurrent queries blocking
**Solution:**
- Ensure WAL mode enabled (see `sqlite-performance.md`)
- Use read-only queries when possible
- Reduce transaction size
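These settings correspond to the per-connection PRAGMAs applied by `_configure_sqlite_connection` in `src/basic_memory/db.py`; a minimal sketch of the same idea:
```python
import sqlite3

def tune_connection(conn: sqlite3.Connection) -> None:
    # WAL lets readers proceed while a writer holds the database
    conn.execute("PRAGMA journal_mode=WAL")
    # Wait up to 10 seconds for a lock instead of failing immediately
    conn.execute("PRAGMA busy_timeout=10000")
    # NORMAL is safe under WAL and reduces fsync overhead
    conn.execute("PRAGMA synchronous=NORMAL")
```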
## Implementation Details
### Optimized Query Builder
```python
class OptimizedQueryBuilder:
def __init__(self):
self.query = select(Entity)
self.joins = []
self.options = []
def with_observations(self):
self.options.append(selectinload(Entity.observations))
return self
def with_relations(self):
self.options.append(selectinload(Entity.relations))
return self
def build(self):
if self.options:
self.query = self.query.options(*self.options)
return self.query
```
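A hypothetical call site for the builder above, shown only to illustrate the fluent interface:
```python
# Build one query that eager-loads observations and relations
query = (
    OptimizedQueryBuilder()
    .with_observations()
    .with_relations()
    .build()
)
result = await db.execute(query)
entities = result.scalars().all()
```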
### Batch Loader
```python
class BatchEntityLoader:
def __init__(self, batch_size: int = 100):
self.batch_size = batch_size
self.pending = []
async def load(self, entity_id: str):
self.pending.append(entity_id)
if len(self.pending) >= self.batch_size:
return await self._flush()
return None
async def _flush(self):
if not self.pending:
return []
ids = self.pending
self.pending = []
# Single batch query
query = select(Entity).where(Entity.id.in_(ids))
result = await db.execute(query)
return result.scalars().all()
```
### Query Cache
```python
from cachetools import TTLCache
class QueryCache:
def __init__(self, maxsize: int = 1000, ttl: int = 300):
self.cache = TTLCache(maxsize=maxsize, ttl=ttl)
async def get_or_query(self, key: str, query_func):
if key in self.cache:
return self.cache[key]
result = await query_func()
self.cache[key] = result
return result
```
## Migration from v0.14.x
### Automatic Optimization
**No action needed** - optimizations are automatic:
```bash
# Upgrade and restart
pip install --upgrade basic-memory
bm mcp
# Optimizations active immediately
```
### Verify Performance Improvement
**Before upgrade:**
```bash
time bm tools search --query "test"
# → 450ms
```
**After upgrade:**
```bash
time bm tools search --query "test"
# → 150ms (3x faster)
```
## See Also
- SPEC-11: API Performance Optimization specification
- `sqlite-performance.md` - Database-level optimizations
- `background-relations.md` - Background processing optimizations
- Database indexing guide
- Query optimization patterns
```
--------------------------------------------------------------------------------
/src/basic_memory/db.py:
--------------------------------------------------------------------------------
```python
import asyncio
import os
from contextlib import asynccontextmanager
from enum import Enum, auto
from pathlib import Path
from typing import AsyncGenerator, Optional
from basic_memory.config import BasicMemoryConfig, ConfigManager
from alembic import command
from alembic.config import Config
from loguru import logger
from sqlalchemy import text, event
from sqlalchemy.ext.asyncio import (
create_async_engine,
async_sessionmaker,
AsyncSession,
AsyncEngine,
async_scoped_session,
)
from sqlalchemy.pool import NullPool
from basic_memory.repository.search_repository import SearchRepository
# Module level state
_engine: Optional[AsyncEngine] = None
_session_maker: Optional[async_sessionmaker[AsyncSession]] = None
_migrations_completed: bool = False
class DatabaseType(Enum):
"""Types of supported databases."""
MEMORY = auto()
FILESYSTEM = auto()
@classmethod
def get_db_url(cls, db_path: Path, db_type: "DatabaseType") -> str:
"""Get SQLAlchemy URL for database path."""
if db_type == cls.MEMORY:
logger.info("Using in-memory SQLite database")
return "sqlite+aiosqlite://"
return f"sqlite+aiosqlite:///{db_path}" # pragma: no cover
def get_scoped_session_factory(
session_maker: async_sessionmaker[AsyncSession],
) -> async_scoped_session:
"""Create a scoped session factory scoped to current task."""
return async_scoped_session(session_maker, scopefunc=asyncio.current_task)
@asynccontextmanager
async def scoped_session(
session_maker: async_sessionmaker[AsyncSession],
) -> AsyncGenerator[AsyncSession, None]:
"""
Get a scoped session with proper lifecycle management.
Args:
session_maker: Session maker to create scoped sessions from
"""
factory = get_scoped_session_factory(session_maker)
session = factory()
try:
await session.execute(text("PRAGMA foreign_keys=ON"))
yield session
await session.commit()
except Exception:
await session.rollback()
raise
finally:
await session.close()
await factory.remove()
def _configure_sqlite_connection(dbapi_conn, enable_wal: bool = True) -> None:
"""Configure SQLite connection with WAL mode and optimizations.
Args:
dbapi_conn: Database API connection object
enable_wal: Whether to enable WAL mode (should be False for in-memory databases)
"""
cursor = dbapi_conn.cursor()
try:
# Enable WAL mode for better concurrency (not supported for in-memory databases)
if enable_wal:
cursor.execute("PRAGMA journal_mode=WAL")
# Set busy timeout to handle locked databases
cursor.execute("PRAGMA busy_timeout=10000") # 10 seconds
# Optimize for performance
cursor.execute("PRAGMA synchronous=NORMAL")
cursor.execute("PRAGMA cache_size=-64000") # 64MB cache
cursor.execute("PRAGMA temp_store=MEMORY")
# Windows-specific optimizations
if os.name == "nt":
cursor.execute("PRAGMA locking_mode=NORMAL") # Ensure normal locking on Windows
except Exception as e:
# Log but don't fail - some PRAGMAs may not be supported
logger.warning(f"Failed to configure SQLite connection: {e}")
finally:
cursor.close()
def _create_engine_and_session(
db_path: Path, db_type: DatabaseType = DatabaseType.FILESYSTEM
) -> tuple[AsyncEngine, async_sessionmaker[AsyncSession]]:
"""Internal helper to create engine and session maker."""
db_url = DatabaseType.get_db_url(db_path, db_type)
logger.debug(f"Creating engine for db_url: {db_url}")
# Configure connection args with Windows-specific settings
connect_args: dict[str, bool | float | None] = {"check_same_thread": False}
# Add Windows-specific parameters to improve reliability
if os.name == "nt": # Windows
connect_args.update(
{
"timeout": 30.0, # Increase timeout to 30 seconds for Windows
"isolation_level": None, # Use autocommit mode
}
)
# Use NullPool for Windows filesystem databases to avoid connection pooling issues
# Important: Do NOT use NullPool for in-memory databases as it will destroy the database
# between connections
if db_type == DatabaseType.FILESYSTEM:
engine = create_async_engine(
db_url,
connect_args=connect_args,
poolclass=NullPool, # Disable connection pooling on Windows
echo=False,
)
else:
# In-memory databases need connection pooling to maintain state
engine = create_async_engine(db_url, connect_args=connect_args)
else:
engine = create_async_engine(db_url, connect_args=connect_args)
# Enable WAL mode for better concurrency and reliability
# Note: WAL mode is not supported for in-memory databases
enable_wal = db_type != DatabaseType.MEMORY
@event.listens_for(engine.sync_engine, "connect")
def enable_wal_mode(dbapi_conn, connection_record):
"""Enable WAL mode on each connection."""
_configure_sqlite_connection(dbapi_conn, enable_wal=enable_wal)
session_maker = async_sessionmaker(engine, expire_on_commit=False)
return engine, session_maker
async def get_or_create_db(
db_path: Path,
db_type: DatabaseType = DatabaseType.FILESYSTEM,
ensure_migrations: bool = True,
) -> tuple[AsyncEngine, async_sessionmaker[AsyncSession]]: # pragma: no cover
"""Get or create database engine and session maker."""
global _engine, _session_maker
if _engine is None:
_engine, _session_maker = _create_engine_and_session(db_path, db_type)
# Run migrations automatically unless explicitly disabled
if ensure_migrations:
app_config = ConfigManager().config
await run_migrations(app_config, db_type)
# These checks should never fail since we just created the engine and session maker
# if they were None, but we'll check anyway for the type checker
if _engine is None:
logger.error("Failed to create database engine", db_path=str(db_path))
raise RuntimeError("Database engine initialization failed")
if _session_maker is None:
logger.error("Failed to create session maker", db_path=str(db_path))
raise RuntimeError("Session maker initialization failed")
return _engine, _session_maker
async def shutdown_db() -> None: # pragma: no cover
"""Clean up database connections."""
global _engine, _session_maker, _migrations_completed
if _engine:
await _engine.dispose()
_engine = None
_session_maker = None
_migrations_completed = False
@asynccontextmanager
async def engine_session_factory(
db_path: Path,
db_type: DatabaseType = DatabaseType.MEMORY,
) -> AsyncGenerator[tuple[AsyncEngine, async_sessionmaker[AsyncSession]], None]:
"""Create engine and session factory.
Note: This is primarily used for testing where we want a fresh database
for each test. For production use, use get_or_create_db() instead.
"""
global _engine, _session_maker, _migrations_completed
db_url = DatabaseType.get_db_url(db_path, db_type)
logger.debug(f"Creating engine for db_url: {db_url}")
# Configure connection args with Windows-specific settings
connect_args: dict[str, bool | float | None] = {"check_same_thread": False}
# Add Windows-specific parameters to improve reliability
if os.name == "nt": # Windows
connect_args.update(
{
"timeout": 30.0, # Increase timeout to 30 seconds for Windows
"isolation_level": None, # Use autocommit mode
}
)
# Use NullPool for Windows filesystem databases to avoid connection pooling issues
# Important: Do NOT use NullPool for in-memory databases as it will destroy the database
# between connections
if db_type == DatabaseType.FILESYSTEM:
_engine = create_async_engine(
db_url,
connect_args=connect_args,
poolclass=NullPool, # Disable connection pooling on Windows
echo=False,
)
else:
# In-memory databases need connection pooling to maintain state
_engine = create_async_engine(db_url, connect_args=connect_args)
else:
_engine = create_async_engine(db_url, connect_args=connect_args)
# Enable WAL mode for better concurrency and reliability
# Note: WAL mode is not supported for in-memory databases
enable_wal = db_type != DatabaseType.MEMORY
@event.listens_for(_engine.sync_engine, "connect")
def enable_wal_mode(dbapi_conn, connection_record):
"""Enable WAL mode on each connection."""
_configure_sqlite_connection(dbapi_conn, enable_wal=enable_wal)
try:
_session_maker = async_sessionmaker(_engine, expire_on_commit=False)
# Verify that engine and session maker are initialized
if _engine is None: # pragma: no cover
logger.error("Database engine is None in engine_session_factory")
raise RuntimeError("Database engine initialization failed")
if _session_maker is None: # pragma: no cover
logger.error("Session maker is None in engine_session_factory")
raise RuntimeError("Session maker initialization failed")
yield _engine, _session_maker
finally:
if _engine:
await _engine.dispose()
_engine = None
_session_maker = None
_migrations_completed = False
async def run_migrations(
app_config: BasicMemoryConfig, database_type=DatabaseType.FILESYSTEM, force: bool = False
): # pragma: no cover
"""Run any pending alembic migrations."""
global _migrations_completed
# Skip if migrations already completed unless forced
if _migrations_completed and not force:
logger.debug("Migrations already completed in this session, skipping")
return
logger.info("Running database migrations...")
try:
# Get the absolute path to the alembic directory relative to this file
alembic_dir = Path(__file__).parent / "alembic"
config = Config()
# Set required Alembic config options programmatically
config.set_main_option("script_location", str(alembic_dir))
config.set_main_option(
"file_template",
"%%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s",
)
config.set_main_option("timezone", "UTC")
config.set_main_option("revision_environment", "false")
config.set_main_option(
"sqlalchemy.url", DatabaseType.get_db_url(app_config.database_path, database_type)
)
command.upgrade(config, "head")
logger.info("Migrations completed successfully")
# Get session maker - ensure we don't trigger recursive migration calls
if _session_maker is None:
_, session_maker = _create_engine_and_session(app_config.database_path, database_type)
else:
session_maker = _session_maker
# initialize the search Index schema
# the project_id is not used for init_search_index, so we pass a dummy value
await SearchRepository(session_maker, 1).init_search_index()
# Mark migrations as completed
_migrations_completed = True
except Exception as e: # pragma: no cover
logger.error(f"Error running migrations: {e}")
raise
```
--------------------------------------------------------------------------------
/tests/repository/test_observation_repository.py:
--------------------------------------------------------------------------------
```python
"""Tests for the ObservationRepository."""
from datetime import datetime, timezone
import pytest
import pytest_asyncio
import sqlalchemy
from sqlalchemy.ext.asyncio import async_sessionmaker
from basic_memory import db
from basic_memory.models import Entity, Observation, Project
from basic_memory.repository.observation_repository import ObservationRepository
@pytest_asyncio.fixture(scope="function")
async def repo(observation_repository):
"""Create an ObservationRepository instance"""
return observation_repository
@pytest_asyncio.fixture(scope="function")
async def sample_observation(repo, sample_entity: Entity):
"""Create a sample observation for testing"""
observation_data = {
"entity_id": sample_entity.id,
"content": "Test observation",
"context": "test-context",
}
return await repo.create(observation_data)
@pytest.mark.asyncio
async def test_create_observation(
observation_repository: ObservationRepository, sample_entity: Entity
):
"""Test creating a new observation"""
observation_data = {
"entity_id": sample_entity.id,
"content": "Test content",
"context": "test-context",
}
observation = await observation_repository.create(observation_data)
assert observation.entity_id == sample_entity.id
assert observation.content == "Test content"
assert observation.id is not None # Should be auto-generated
@pytest.mark.asyncio
async def test_create_observation_entity_does_not_exist(
observation_repository: ObservationRepository, sample_entity: Entity
):
"""Test creating a new observation"""
observation_data = {
"entity_id": "does-not-exist",
"content": "Test content",
"context": "test-context",
}
with pytest.raises(sqlalchemy.exc.IntegrityError):
await observation_repository.create(observation_data)
@pytest.mark.asyncio
async def test_find_by_entity(
observation_repository: ObservationRepository,
sample_observation: Observation,
sample_entity: Entity,
):
"""Test finding observations by entity"""
observations = await observation_repository.find_by_entity(sample_entity.id)
assert len(observations) == 1
assert observations[0].id == sample_observation.id
assert observations[0].content == sample_observation.content
@pytest.mark.asyncio
async def test_find_by_context(
observation_repository: ObservationRepository, sample_observation: Observation
):
"""Test finding observations by context"""
observations = await observation_repository.find_by_context("test-context")
assert len(observations) == 1
assert observations[0].id == sample_observation.id
assert observations[0].content == sample_observation.content
@pytest.mark.asyncio
async def test_delete_observations(session_maker: async_sessionmaker, repo, test_project: Project):
"""Test deleting observations by entity_id."""
# Create test entity
async with db.scoped_session(session_maker) as session:
entity = Entity(
project_id=test_project.id,
title="test_entity",
entity_type="test",
permalink="test/test-entity",
file_path="test/test_entity.md",
content_type="text/markdown",
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
)
session.add(entity)
await session.flush()
# Create test observations
obs1 = Observation(
entity_id=entity.id,
content="Test observation 1",
)
obs2 = Observation(
entity_id=entity.id,
content="Test observation 2",
)
session.add_all([obs1, obs2])
# Test deletion by entity_id
deleted = await repo.delete_by_fields(entity_id=entity.id)
assert deleted is True
# Verify observations were deleted
remaining = await repo.find_by_entity(entity.id)
assert len(remaining) == 0
@pytest.mark.asyncio
async def test_delete_observation_by_id(
session_maker: async_sessionmaker, repo, test_project: Project
):
"""Test deleting a single observation by its ID."""
# Create test entity
async with db.scoped_session(session_maker) as session:
entity = Entity(
project_id=test_project.id,
title="test_entity",
entity_type="test",
permalink="test/test-entity",
file_path="test/test_entity.md",
content_type="text/markdown",
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
)
session.add(entity)
await session.flush()
# Create test observation
obs = Observation(
entity_id=entity.id,
content="Test observation",
)
session.add(obs)
# Test deletion by ID
deleted = await repo.delete(obs.id)
assert deleted is True
# Verify observation was deleted
remaining = await repo.find_by_id(obs.id)
assert remaining is None
@pytest.mark.asyncio
async def test_delete_observation_by_content(
session_maker: async_sessionmaker, repo, test_project: Project
):
"""Test deleting observations by content."""
# Create test entity
async with db.scoped_session(session_maker) as session:
entity = Entity(
project_id=test_project.id,
title="test_entity",
entity_type="test",
permalink="test/test-entity",
file_path="test/test_entity.md",
content_type="text/markdown",
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
)
session.add(entity)
await session.flush()
# Create test observations
obs1 = Observation(
entity_id=entity.id,
content="Delete this observation",
)
obs2 = Observation(
entity_id=entity.id,
content="Keep this observation",
)
session.add_all([obs1, obs2])
# Test deletion by content
deleted = await repo.delete_by_fields(content="Delete this observation")
assert deleted is True
# Verify only matching observation was deleted
remaining = await repo.find_by_entity(entity.id)
assert len(remaining) == 1
assert remaining[0].content == "Keep this observation"
@pytest.mark.asyncio
async def test_find_by_category(session_maker: async_sessionmaker, repo, test_project: Project):
"""Test finding observations by their category."""
# Create test entity
async with db.scoped_session(session_maker) as session:
entity = Entity(
project_id=test_project.id,
title="test_entity",
entity_type="test",
permalink="test/test-entity",
file_path="test/test_entity.md",
content_type="text/markdown",
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
)
session.add(entity)
await session.flush()
# Create test observations with different categories
observations = [
Observation(
entity_id=entity.id,
content="Tech observation",
category="tech",
),
Observation(
entity_id=entity.id,
content="Design observation",
category="design",
),
Observation(
entity_id=entity.id,
content="Another tech observation",
category="tech",
),
]
session.add_all(observations)
await session.commit()
# Find tech observations
tech_obs = await repo.find_by_category("tech")
assert len(tech_obs) == 2
assert all(obs.category == "tech" for obs in tech_obs)
assert set(obs.content for obs in tech_obs) == {"Tech observation", "Another tech observation"}
# Find design observations
design_obs = await repo.find_by_category("design")
assert len(design_obs) == 1
assert design_obs[0].category == "design"
assert design_obs[0].content == "Design observation"
# Search for non-existent category
missing_obs = await repo.find_by_category("missing")
assert len(missing_obs) == 0
@pytest.mark.asyncio
async def test_observation_categories(
session_maker: async_sessionmaker, repo, test_project: Project
):
"""Test retrieving distinct observation categories."""
# Create test entity
async with db.scoped_session(session_maker) as session:
entity = Entity(
project_id=test_project.id,
title="test_entity",
entity_type="test",
permalink="test/test-entity",
file_path="test/test_entity.md",
content_type="text/markdown",
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
)
session.add(entity)
await session.flush()
# Create observations with various categories
observations = [
Observation(
entity_id=entity.id,
content="First tech note",
category="tech",
),
Observation(
entity_id=entity.id,
content="Second tech note",
category="tech", # Duplicate category
),
Observation(
entity_id=entity.id,
content="Design note",
category="design",
),
Observation(
entity_id=entity.id,
content="Feature note",
category="feature",
),
]
session.add_all(observations)
await session.commit()
# Get distinct categories
categories = await repo.observation_categories()
        # Should contain each distinct category exactly once (order is not asserted)
assert set(categories) == {"tech", "design", "feature"}
@pytest.mark.asyncio
async def test_find_by_category_with_empty_db(repo):
"""Test category operations with an empty database."""
# Find by category should return empty list
obs = await repo.find_by_category("tech")
assert len(obs) == 0
# Get categories should return empty list
categories = await repo.observation_categories()
assert len(categories) == 0
@pytest.mark.asyncio
async def test_find_by_category_case_sensitivity(
session_maker: async_sessionmaker, repo, test_project: Project
):
"""Test how category search handles case sensitivity."""
async with db.scoped_session(session_maker) as session:
entity = Entity(
project_id=test_project.id,
title="test_entity",
entity_type="test",
permalink="test/test-entity",
file_path="test/test_entity.md",
content_type="text/markdown",
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
)
session.add(entity)
await session.flush()
# Create a test observation
obs = Observation(
entity_id=entity.id,
content="Tech note",
category="tech", # lowercase in database
)
session.add(obs)
await session.commit()
# Search should work regardless of case
# Note: If we want case-insensitive search, we'll need to update the query
# For now, this test documents the current behavior
exact_match = await repo.find_by_category("tech")
assert len(exact_match) == 1
upper_case = await repo.find_by_category("TECH")
assert len(upper_case) == 0 # Currently case-sensitive
```
--------------------------------------------------------------------------------
/tests/mcp/test_tool_search.py:
--------------------------------------------------------------------------------
```python
"""Tests for search MCP tools."""
import pytest
from datetime import datetime, timedelta
from unittest.mock import patch
from basic_memory.mcp.tools import write_note
from basic_memory.mcp.tools.search import search_notes, _format_search_error_response
from basic_memory.schemas.search import SearchResponse
@pytest.mark.asyncio
async def test_search_text(client, test_project):
"""Test basic search functionality."""
# Create a test note
result = await write_note.fn(
project=test_project.name,
title="Test Search Note",
folder="test",
content="# Test\nThis is a searchable test note",
tags=["test", "search"],
)
assert result
# Search for it
response = await search_notes.fn(project=test_project.name, query="searchable")
# Verify results - handle both success and error cases
if isinstance(response, SearchResponse):
# Success case - verify SearchResponse
assert len(response.results) > 0
assert any(r.permalink == "test/test-search-note" for r in response.results)
else:
# If search failed and returned error message, test should fail with informative message
pytest.fail(f"Search failed with error: {response}")
@pytest.mark.asyncio
async def test_search_title(client, test_project):
"""Test basic search functionality."""
# Create a test note
result = await write_note.fn(
project=test_project.name,
title="Test Search Note",
folder="test",
content="# Test\nThis is a searchable test note",
tags=["test", "search"],
)
assert result
# Search for it
response = await search_notes.fn(
project=test_project.name, query="Search Note", search_type="title"
)
# Verify results - handle both success and error cases
if isinstance(response, str):
# If search failed and returned error message, test should fail with informative message
pytest.fail(f"Search failed with error: {response}")
else:
# Success case - verify SearchResponse
assert len(response.results) > 0
assert any(r.permalink == "test/test-search-note" for r in response.results)
@pytest.mark.asyncio
async def test_search_permalink(client, test_project):
"""Test basic search functionality."""
# Create a test note
result = await write_note.fn(
project=test_project.name,
title="Test Search Note",
folder="test",
content="# Test\nThis is a searchable test note",
tags=["test", "search"],
)
assert result
# Search for it
response = await search_notes.fn(
project=test_project.name, query="test/test-search-note", search_type="permalink"
)
# Verify results - handle both success and error cases
if isinstance(response, SearchResponse):
# Success case - verify SearchResponse
assert len(response.results) > 0
assert any(r.permalink == "test/test-search-note" for r in response.results)
else:
# If search failed and returned error message, test should fail with informative message
pytest.fail(f"Search failed with error: {response}")
@pytest.mark.asyncio
async def test_search_permalink_match(client, test_project):
"""Test basic search functionality."""
# Create a test note
result = await write_note.fn(
project=test_project.name,
title="Test Search Note",
folder="test",
content="# Test\nThis is a searchable test note",
tags=["test", "search"],
)
assert result
# Search for it
response = await search_notes.fn(
project=test_project.name, query="test/test-search-*", search_type="permalink"
)
# Verify results - handle both success and error cases
if isinstance(response, SearchResponse):
# Success case - verify SearchResponse
assert len(response.results) > 0
assert any(r.permalink == "test/test-search-note" for r in response.results)
else:
# If search failed and returned error message, test should fail with informative message
pytest.fail(f"Search failed with error: {response}")
@pytest.mark.asyncio
async def test_search_pagination(client, test_project):
"""Test basic search functionality."""
# Create a test note
result = await write_note.fn(
project=test_project.name,
title="Test Search Note",
folder="test",
content="# Test\nThis is a searchable test note",
tags=["test", "search"],
)
assert result
# Search for it
response = await search_notes.fn(
project=test_project.name, query="searchable", page=1, page_size=1
)
# Verify results - handle both success and error cases
if isinstance(response, SearchResponse):
# Success case - verify SearchResponse
assert len(response.results) == 1
assert any(r.permalink == "test/test-search-note" for r in response.results)
else:
# If search failed and returned error message, test should fail with informative message
pytest.fail(f"Search failed with error: {response}")
@pytest.mark.asyncio
async def test_search_with_type_filter(client, test_project):
"""Test search with entity type filter."""
# Create test content
await write_note.fn(
project=test_project.name,
title="Entity Type Test",
folder="test",
content="# Test\nFiltered by type",
)
# Search with type filter
response = await search_notes.fn(project=test_project.name, query="type", types=["note"])
# Verify results - handle both success and error cases
if isinstance(response, SearchResponse):
# Success case - verify all results are entities
assert all(r.type == "entity" for r in response.results)
else:
# If search failed and returned error message, test should fail with informative message
pytest.fail(f"Search failed with error: {response}")
@pytest.mark.asyncio
async def test_search_with_entity_type_filter(client, test_project):
"""Test search with entity type filter."""
# Create test content
await write_note.fn(
project=test_project.name,
title="Entity Type Test",
folder="test",
content="# Test\nFiltered by type",
)
# Search with entity type filter
response = await search_notes.fn(
project=test_project.name, query="type", entity_types=["entity"]
)
# Verify results - handle both success and error cases
if isinstance(response, SearchResponse):
# Success case - verify all results are entities
assert all(r.type == "entity" for r in response.results)
else:
# If search failed and returned error message, test should fail with informative message
pytest.fail(f"Search failed with error: {response}")
@pytest.mark.asyncio
async def test_search_with_date_filter(client, test_project):
"""Test search with date filter."""
# Create test content
await write_note.fn(
project=test_project.name,
title="Recent Note",
folder="test",
content="# Test\nRecent content",
)
# Search with date filter
one_hour_ago = datetime.now() - timedelta(hours=1)
response = await search_notes.fn(
project=test_project.name, query="recent", after_date=one_hour_ago.isoformat()
)
# Verify results - handle both success and error cases
if isinstance(response, SearchResponse):
# Success case - verify we get results within timeframe
assert len(response.results) > 0
else:
# If search failed and returned error message, test should fail with informative message
pytest.fail(f"Search failed with error: {response}")
class TestSearchErrorFormatting:
"""Test search error formatting for better user experience."""
def test_format_search_error_fts5_syntax(self):
"""Test formatting for FTS5 syntax errors."""
result = _format_search_error_response(
"test-project", "syntax error in FTS5", "test query("
)
assert "# Search Failed - Invalid Syntax" in result
assert "The search query 'test query(' contains invalid syntax" in result
assert "Special characters" in result
assert "test query" in result # Clean query without special chars
def test_format_search_error_no_results(self):
"""Test formatting for no results found."""
result = _format_search_error_response(
"test-project", "no results found", "very specific query"
)
assert "# Search Complete - No Results Found" in result
assert "No content found matching 'very specific query'" in result
assert "Broaden your search" in result
assert "very" in result # Simplified query
def test_format_search_error_server_error(self):
"""Test formatting for server errors."""
result = _format_search_error_response(
"test-project", "internal server error", "test query"
)
assert "# Search Failed - Server Error" in result
assert "The search service encountered an error while processing 'test query'" in result
assert "Try again" in result
assert "Check project status" in result
def test_format_search_error_permission_denied(self):
"""Test formatting for permission errors."""
result = _format_search_error_response("test-project", "permission denied", "test query")
assert "# Search Failed - Access Error" in result
assert "You don't have permission to search" in result
assert "Check your project access" in result
def test_format_search_error_project_not_found(self):
"""Test formatting for project not found errors."""
result = _format_search_error_response(
"test-project", "current project not found", "test query"
)
assert "# Search Failed - Project Not Found" in result
assert "The current project is not accessible" in result
assert "Check available projects" in result
def test_format_search_error_generic(self):
"""Test formatting for generic errors."""
result = _format_search_error_response("test-project", "unknown error", "test query")
assert "# Search Failed" in result
assert "Error searching for 'test query': unknown error" in result
assert "## Troubleshooting steps:" in result
class TestSearchToolErrorHandling:
"""Test search tool exception handling."""
@pytest.mark.asyncio
async def test_search_notes_exception_handling(self):
"""Test exception handling in search_notes."""
with patch("basic_memory.mcp.tools.search.get_active_project") as mock_get_project:
mock_get_project.return_value.project_url = "http://test"
with patch(
"basic_memory.mcp.tools.search.call_post", side_effect=Exception("syntax error")
):
result = await search_notes.fn(project="test-project", query="test query")
assert isinstance(result, str)
assert "# Search Failed - Invalid Syntax" in result
@pytest.mark.asyncio
async def test_search_notes_permission_error(self):
"""Test search_notes with permission error."""
with patch("basic_memory.mcp.tools.search.get_active_project") as mock_get_project:
mock_get_project.return_value.project_url = "http://test"
with patch(
"basic_memory.mcp.tools.search.call_post",
side_effect=Exception("permission denied"),
):
result = await search_notes.fn(project="test-project", query="test query")
assert isinstance(result, str)
assert "# Search Failed - Access Error" in result
```
--------------------------------------------------------------------------------
/src/basic_memory/cli/commands/tool.py:
--------------------------------------------------------------------------------
```python
"""CLI tool commands for Basic Memory."""
import asyncio
import sys
from typing import Annotated, List, Optional
import typer
from loguru import logger
from rich import print as rprint
from basic_memory.cli.app import app
from basic_memory.config import ConfigManager
# Import prompts
from basic_memory.mcp.prompts.continue_conversation import (
continue_conversation as mcp_continue_conversation,
)
from basic_memory.mcp.prompts.recent_activity import (
recent_activity_prompt as recent_activity_prompt,
)
from basic_memory.mcp.tools import build_context as mcp_build_context
from basic_memory.mcp.tools import read_note as mcp_read_note
from basic_memory.mcp.tools import recent_activity as mcp_recent_activity
from basic_memory.mcp.tools import search_notes as mcp_search
from basic_memory.mcp.tools import write_note as mcp_write_note
from basic_memory.schemas.base import TimeFrame
from basic_memory.schemas.memory import MemoryUrl
from basic_memory.schemas.search import SearchItemType
tool_app = typer.Typer()
app.add_typer(tool_app, name="tool", help="Access to MCP tools via CLI")
@tool_app.command()
def write_note(
title: Annotated[str, typer.Option(help="The title of the note")],
folder: Annotated[str, typer.Option(help="The folder to create the note in")],
project: Annotated[
Optional[str],
typer.Option(
help="The project to write to. If not provided, the default project will be used."
),
] = None,
content: Annotated[
Optional[str],
typer.Option(
help="The content of the note. If not provided, content will be read from stdin. This allows piping content from other commands, e.g.: cat file.md | basic-memory tools write-note"
),
] = None,
tags: Annotated[
Optional[List[str]], typer.Option(help="A list of tags to apply to the note")
] = None,
):
"""Create or update a markdown note. Content can be provided as an argument or read from stdin.
Content can be provided in two ways:
1. Using the --content parameter
2. Piping content through stdin (if --content is not provided)
Examples:
# Using content parameter
basic-memory tools write-note --title "My Note" --folder "notes" --content "Note content"
# Using stdin pipe
echo "# My Note Content" | basic-memory tools write-note --title "My Note" --folder "notes"
# Using heredoc
cat << EOF | basic-memory tools write-note --title "My Note" --folder "notes"
# My Document
This is my document content.
- Point 1
- Point 2
EOF
# Reading from a file
cat document.md | basic-memory tools write-note --title "Document" --folder "docs"
"""
try:
# If content is not provided, read from stdin
if content is None:
# Check if we're getting data from a pipe or redirect
if not sys.stdin.isatty():
content = sys.stdin.read()
else: # pragma: no cover
# If stdin is a terminal (no pipe/redirect), inform the user
typer.echo(
"No content provided. Please provide content via --content or by piping to stdin.",
err=True,
)
raise typer.Exit(1)
# Also check for empty content
if content is not None and not content.strip():
typer.echo("Empty content provided. Please provide non-empty content.", err=True)
raise typer.Exit(1)
# look for the project in the config
config_manager = ConfigManager()
project_name = None
if project is not None:
project_name, _ = config_manager.get_project(project)
if not project_name:
typer.echo(f"No project found named: {project}", err=True)
raise typer.Exit(1)
# use the project name, or the default from the config
project_name = project_name or config_manager.default_project
note = asyncio.run(mcp_write_note.fn(title, content, folder, project_name, tags))
rprint(note)
except Exception as e: # pragma: no cover
if not isinstance(e, typer.Exit):
typer.echo(f"Error during write_note: {e}", err=True)
raise typer.Exit(1)
raise
@tool_app.command()
def read_note(
identifier: str,
project: Annotated[
Optional[str],
typer.Option(
help="The project to use for the note. If not provided, the default project will be used."
),
] = None,
page: int = 1,
page_size: int = 10,
):
"""Read a markdown note from the knowledge base."""
# look for the project in the config
config_manager = ConfigManager()
project_name = None
if project is not None:
project_name, _ = config_manager.get_project(project)
if not project_name:
typer.echo(f"No project found named: {project}", err=True)
raise typer.Exit(1)
# use the project name, or the default from the config
project_name = project_name or config_manager.default_project
try:
note = asyncio.run(mcp_read_note.fn(identifier, project_name, page, page_size))
rprint(note)
except Exception as e: # pragma: no cover
if not isinstance(e, typer.Exit):
typer.echo(f"Error during read_note: {e}", err=True)
raise typer.Exit(1)
raise
@tool_app.command()
def build_context(
url: MemoryUrl,
project: Annotated[
Optional[str],
typer.Option(help="The project to use. If not provided, the default project will be used."),
] = None,
depth: Optional[int] = 1,
timeframe: Optional[TimeFrame] = "7d",
page: int = 1,
page_size: int = 10,
max_related: int = 10,
):
"""Get context needed to continue a discussion."""
# look for the project in the config
config_manager = ConfigManager()
project_name = None
if project is not None:
project_name, _ = config_manager.get_project(project)
if not project_name:
typer.echo(f"No project found named: {project}", err=True)
raise typer.Exit(1)
# use the project name, or the default from the config
project_name = project_name or config_manager.default_project
try:
context = asyncio.run(
mcp_build_context.fn(
project=project_name,
url=url,
depth=depth,
timeframe=timeframe,
page=page,
page_size=page_size,
max_related=max_related,
)
)
# Use json module for more controlled serialization
import json
context_dict = context.model_dump(exclude_none=True)
print(json.dumps(context_dict, indent=2, ensure_ascii=True, default=str))
except Exception as e: # pragma: no cover
if not isinstance(e, typer.Exit):
typer.echo(f"Error during build_context: {e}", err=True)
raise typer.Exit(1)
raise
@tool_app.command()
def recent_activity(
type: Annotated[Optional[List[SearchItemType]], typer.Option()] = None,
depth: Optional[int] = 1,
timeframe: Optional[TimeFrame] = "7d",
):
"""Get recent activity across the knowledge base."""
try:
result = asyncio.run(
mcp_recent_activity.fn(
type=type, # pyright: ignore [reportArgumentType]
depth=depth,
timeframe=timeframe,
)
)
# The tool now returns a formatted string directly
print(result)
except Exception as e: # pragma: no cover
if not isinstance(e, typer.Exit):
typer.echo(f"Error during recent_activity: {e}", err=True)
raise typer.Exit(1)
raise
@tool_app.command("search-notes")
def search_notes(
query: str,
permalink: Annotated[bool, typer.Option("--permalink", help="Search permalink values")] = False,
title: Annotated[bool, typer.Option("--title", help="Search title values")] = False,
project: Annotated[
Optional[str],
typer.Option(
help="The project to use for the note. If not provided, the default project will be used."
),
] = None,
after_date: Annotated[
Optional[str],
typer.Option("--after_date", help="Search results after date, eg. '2d', '1 week'"),
] = None,
page: int = 1,
page_size: int = 10,
):
"""Search across all content in the knowledge base."""
# look for the project in the config
config_manager = ConfigManager()
project_name = None
if project is not None:
project_name, _ = config_manager.get_project(project)
if not project_name:
typer.echo(f"No project found named: {project}", err=True)
raise typer.Exit(1)
# use the project name, or the default from the config
project_name = project_name or config_manager.default_project
    if permalink and title:  # pragma: no cover
        typer.echo(
            "Use either --permalink or --title, not both. Exiting.",
            err=True,
        )
        raise typer.Exit(1)
    try:
        # set search type: wildcard permalinks use pattern matching,
        # exact permalinks and titles use their own types, otherwise full-text search
        if permalink:
            search_type = "permalink_match" if "*" in query else "permalink"
        elif title:
            search_type = "title"
        else:
            search_type = "text"
results = asyncio.run(
mcp_search.fn(
query,
project_name,
search_type=search_type,
page=page,
after_date=after_date,
page_size=page_size,
)
)
# Use json module for more controlled serialization
import json
results_dict = results.model_dump(exclude_none=True)
print(json.dumps(results_dict, indent=2, ensure_ascii=True, default=str))
except Exception as e: # pragma: no cover
if not isinstance(e, typer.Exit):
logger.exception("Error during search", e)
typer.echo(f"Error during search: {e}", err=True)
raise typer.Exit(1)
raise
@tool_app.command(name="continue-conversation")
def continue_conversation(
topic: Annotated[Optional[str], typer.Option(help="Topic or keyword to search for")] = None,
timeframe: Annotated[
Optional[str], typer.Option(help="How far back to look for activity")
] = None,
):
"""Prompt to continue a previous conversation or work session."""
try:
# Prompt functions return formatted strings directly
session = asyncio.run(mcp_continue_conversation.fn(topic=topic, timeframe=timeframe)) # type: ignore
rprint(session)
except Exception as e: # pragma: no cover
if not isinstance(e, typer.Exit):
logger.exception("Error continuing conversation", e)
typer.echo(f"Error continuing conversation: {e}", err=True)
raise typer.Exit(1)
raise
# @tool_app.command(name="show-recent-activity")
# def show_recent_activity(
# timeframe: Annotated[
# str, typer.Option(help="How far back to look for activity")
# ] = "7d",
# ):
# """Prompt to show recent activity."""
# try:
# # Prompt functions return formatted strings directly
# session = asyncio.run(recent_activity_prompt(timeframe=timeframe))
# rprint(session)
# except Exception as e: # pragma: no cover
# if not isinstance(e, typer.Exit):
# logger.exception("Error continuing conversation", e)
# typer.echo(f"Error continuing conversation: {e}", err=True)
# raise typer.Exit(1)
# raise
```
--------------------------------------------------------------------------------
/tests/repository/test_relation_repository.py:
--------------------------------------------------------------------------------
```python
"""Tests for the RelationRepository."""
from datetime import datetime, timezone
import pytest
import pytest_asyncio
import sqlalchemy
from basic_memory import db
from basic_memory.models import Entity, Relation, Project
from basic_memory.repository.relation_repository import RelationRepository
@pytest_asyncio.fixture
async def source_entity(session_maker, test_project: Project):
"""Create a source entity for testing relations."""
entity = Entity(
project_id=test_project.id,
title="test_source",
entity_type="test",
permalink="source/test-source",
file_path="source/test_source.md",
content_type="text/markdown",
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
)
async with db.scoped_session(session_maker) as session:
session.add(entity)
await session.flush()
return entity
@pytest_asyncio.fixture
async def target_entity(session_maker, test_project: Project):
"""Create a target entity for testing relations."""
entity = Entity(
project_id=test_project.id,
title="test_target",
entity_type="test",
permalink="target/test-target",
file_path="target/test_target.md",
content_type="text/markdown",
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
)
async with db.scoped_session(session_maker) as session:
session.add(entity)
await session.flush()
return entity
@pytest_asyncio.fixture
async def test_relations(session_maker, source_entity, target_entity):
"""Create test relations."""
relations = [
Relation(
from_id=source_entity.id,
to_id=target_entity.id,
to_name=target_entity.title,
relation_type="connects_to",
),
Relation(
from_id=source_entity.id,
to_id=target_entity.id,
to_name=target_entity.title,
relation_type="depends_on",
),
]
async with db.scoped_session(session_maker) as session:
session.add_all(relations)
await session.flush()
return relations
@pytest_asyncio.fixture(scope="function")
async def related_entity(entity_repository):
"""Create a second entity for testing relations"""
entity_data = {
"title": "Related Entity",
"entity_type": "test",
"permalink": "test/related-entity",
"file_path": "test/related_entity.md",
"summary": "A related test entity",
"content_type": "text/markdown",
"created_at": datetime.now(timezone.utc),
"updated_at": datetime.now(timezone.utc),
}
return await entity_repository.create(entity_data)
@pytest_asyncio.fixture(scope="function")
async def sample_relation(
relation_repository: RelationRepository, sample_entity: Entity, related_entity: Entity
):
"""Create a sample relation for testing"""
relation_data = {
"from_id": sample_entity.id,
"to_id": related_entity.id,
"to_name": related_entity.title,
"relation_type": "test_relation",
"context": "test-context",
}
return await relation_repository.create(relation_data)
@pytest_asyncio.fixture(scope="function")
async def multiple_relations(
relation_repository: RelationRepository, sample_entity: Entity, related_entity: Entity
):
"""Create multiple relations for testing"""
relations_data = [
{
"from_id": sample_entity.id,
"to_id": related_entity.id,
"to_name": related_entity.title,
"relation_type": "relation_one",
"context": "context_one",
},
{
"from_id": sample_entity.id,
"to_id": related_entity.id,
"to_name": related_entity.title,
"relation_type": "relation_two",
"context": "context_two",
},
{
"from_id": related_entity.id,
"to_id": sample_entity.id,
"to_name": related_entity.title,
"relation_type": "relation_one",
"context": "context_three",
},
]
return [await relation_repository.create(data) for data in relations_data]
@pytest.mark.asyncio
async def test_create_relation(
relation_repository: RelationRepository, sample_entity: Entity, related_entity: Entity
):
"""Test creating a new relation"""
relation_data = {
"from_id": sample_entity.id,
"to_id": related_entity.id,
"to_name": related_entity.title,
"relation_type": "test_relation",
"context": "test-context",
}
relation = await relation_repository.create(relation_data)
assert relation.from_id == sample_entity.id
assert relation.to_id == related_entity.id
assert relation.relation_type == "test_relation"
assert relation.id is not None # Should be auto-generated
@pytest.mark.asyncio
async def test_create_relation_entity_does_not_exist(
relation_repository: RelationRepository, sample_entity: Entity, related_entity: Entity
):
"""Test creating a new relation"""
relation_data = {
"from_id": "not_exist",
"to_id": related_entity.id,
"to_name": related_entity.title,
"relation_type": "test_relation",
"context": "test-context",
}
with pytest.raises(sqlalchemy.exc.IntegrityError):
await relation_repository.create(relation_data)
@pytest.mark.asyncio
async def test_find_by_entities(
relation_repository: RelationRepository,
sample_relation: Relation,
sample_entity: Entity,
related_entity: Entity,
):
"""Test finding relations between specific entities"""
relations = await relation_repository.find_by_entities(sample_entity.id, related_entity.id)
assert len(relations) == 1
assert relations[0].id == sample_relation.id
assert relations[0].relation_type == sample_relation.relation_type
@pytest.mark.asyncio
async def test_find_relation(relation_repository: RelationRepository, sample_relation: Relation):
"""Test finding relations by type"""
relation = await relation_repository.find_relation(
from_permalink=sample_relation.from_entity.permalink,
to_permalink=sample_relation.to_entity.permalink,
relation_type=sample_relation.relation_type,
)
assert relation.id == sample_relation.id
@pytest.mark.asyncio
async def test_find_by_type(relation_repository: RelationRepository, sample_relation: Relation):
"""Test finding relations by type"""
relations = await relation_repository.find_by_type("test_relation")
assert len(relations) == 1
assert relations[0].id == sample_relation.id
@pytest.mark.asyncio
async def test_find_unresolved_relations(
relation_repository: RelationRepository, sample_entity: Entity, related_entity: Entity
):
"""Test creating a new relation"""
relation_data = {
"from_id": sample_entity.id,
"to_id": None,
"to_name": related_entity.title,
"relation_type": "test_relation",
"context": "test-context",
}
relation = await relation_repository.create(relation_data)
assert relation.from_id == sample_entity.id
assert relation.to_id is None
unresolved = await relation_repository.find_unresolved_relations()
assert len(unresolved) == 1
assert unresolved[0].id == relation.id
@pytest.mark.asyncio
async def test_delete_by_fields_single_field(
relation_repository: RelationRepository, multiple_relations: list[Relation]
):
"""Test deleting relations by a single field."""
# Delete all relations of type 'relation_one'
result = await relation_repository.delete_by_fields(relation_type="relation_one") # pyright: ignore [reportArgumentType]
assert result is True
# Verify deletion
remaining = await relation_repository.find_by_type("relation_one")
assert len(remaining) == 0
# Other relations should still exist
others = await relation_repository.find_by_type("relation_two")
assert len(others) == 1
@pytest.mark.asyncio
async def test_delete_by_fields_multiple_fields(
relation_repository: RelationRepository,
multiple_relations: list[Relation],
sample_entity: Entity,
related_entity: Entity,
):
"""Test deleting relations by multiple fields."""
# Delete specific relation matching both from_id and relation_type
result = await relation_repository.delete_by_fields(
from_id=sample_entity.id, # pyright: ignore [reportArgumentType]
relation_type="relation_one", # pyright: ignore [reportArgumentType]
)
assert result is True
# Verify correct relation was deleted
remaining = await relation_repository.find_by_entities(sample_entity.id, related_entity.id)
assert len(remaining) == 1 # Only relation_two should remain
assert remaining[0].relation_type == "relation_two"
@pytest.mark.asyncio
async def test_delete_by_fields_no_match(
relation_repository: RelationRepository, multiple_relations: list[Relation]
):
"""Test delete_by_fields when no relations match."""
result = await relation_repository.delete_by_fields(
relation_type="nonexistent_type" # pyright: ignore [reportArgumentType]
)
assert result is False
@pytest.mark.asyncio
async def test_delete_by_fields_all_fields(
relation_repository: RelationRepository,
multiple_relations: list[Relation],
sample_entity: Entity,
related_entity: Entity,
):
"""Test deleting relation by matching all fields."""
# Get first relation's data
relation = multiple_relations[0]
# Delete using all fields
result = await relation_repository.delete_by_fields(
from_id=relation.from_id, # pyright: ignore [reportArgumentType]
to_id=relation.to_id, # pyright: ignore [reportArgumentType]
relation_type=relation.relation_type, # pyright: ignore [reportArgumentType]
)
assert result is True
# Verify only exact match was deleted
remaining = await relation_repository.find_by_type(relation.relation_type)
assert len(remaining) == 1 # One other relation_one should remain
@pytest.mark.asyncio
async def test_delete_relation_by_id(relation_repository, test_relations):
"""Test deleting a relation by ID."""
relation = test_relations[0]
result = await relation_repository.delete(relation.id)
assert result is True
# Verify deletion
remaining = await relation_repository.find_one(
relation_repository.select(Relation).filter(Relation.id == relation.id)
)
assert remaining is None
@pytest.mark.asyncio
async def test_delete_relations_by_type(relation_repository, test_relations):
"""Test deleting relations by type."""
result = await relation_repository.delete_by_fields(relation_type="connects_to")
assert result is True
# Verify specific type was deleted
remaining = await relation_repository.find_by_type("connects_to")
assert len(remaining) == 0
# Verify other type still exists
others = await relation_repository.find_by_type("depends_on")
assert len(others) == 1
@pytest.mark.asyncio
async def test_delete_relations_by_entities(
relation_repository, test_relations, source_entity, target_entity
):
"""Test deleting relations between specific entities."""
result = await relation_repository.delete_by_fields(
from_id=source_entity.id, to_id=target_entity.id
)
assert result is True
# Verify all relations between entities were deleted
remaining = await relation_repository.find_by_entities(source_entity.id, target_entity.id)
assert len(remaining) == 0
@pytest.mark.asyncio
async def test_delete_nonexistent_relation(relation_repository):
"""Test deleting a relation that doesn't exist."""
result = await relation_repository.delete_by_fields(relation_type="nonexistent")
assert result is False
```
--------------------------------------------------------------------------------
/test-int/mcp/test_read_content_integration.py:
--------------------------------------------------------------------------------
```python
"""
Integration tests for read_content MCP tool.
Comprehensive tests covering text files, binary files, images, error cases,
and memory:// URL handling via the complete MCP client-server flow.
"""
import json
import pytest
from fastmcp import Client
from fastmcp.exceptions import ToolError
def parse_read_content_response(mcp_result):
"""Helper function to parse read_content MCP response."""
assert len(mcp_result.content) == 1
assert mcp_result.content[0].type == "text"
return json.loads(mcp_result.content[0].text)
@pytest.mark.asyncio
async def test_read_content_markdown_file(mcp_server, app, test_project):
"""Test reading a markdown file created by write_note."""
async with Client(mcp_server) as client:
# First create a note
await client.call_tool(
"write_note",
{
"project": test_project.name,
"title": "Content Test",
"folder": "test",
"content": "# Content Test\n\nThis is test content with **markdown**.",
"tags": "test,content",
},
)
# Then read the raw file content
read_result = await client.call_tool(
"read_content",
{
"project": test_project.name,
"path": "test/Content Test.md",
},
)
# Parse the response
response_data = parse_read_content_response(read_result)
assert response_data["type"] == "text"
assert response_data["content_type"] == "text/markdown; charset=utf-8"
assert response_data["encoding"] == "utf-8"
content = response_data["text"]
# Should contain the raw markdown with frontmatter
assert "# Content Test" in content
assert "This is test content with **markdown**." in content
assert "tags:" in content # frontmatter
assert "- test" in content # tags are in YAML list format
assert "- content" in content
@pytest.mark.asyncio
async def test_read_content_by_permalink(mcp_server, app, test_project):
"""Test reading content using permalink instead of file path."""
async with Client(mcp_server) as client:
# Create a note
await client.call_tool(
"write_note",
{
"project": test_project.name,
"title": "Permalink Test",
"folder": "docs",
"content": "# Permalink Test\n\nTesting permalink-based content reading.",
},
)
# Read by permalink (without .md extension)
read_result = await client.call_tool(
"read_content",
{
"project": test_project.name,
"path": "docs/permalink-test",
},
)
# Parse the response
response_data = parse_read_content_response(read_result)
content = response_data["text"]
assert "# Permalink Test" in content
assert "Testing permalink-based content reading." in content
@pytest.mark.asyncio
async def test_read_content_memory_url(mcp_server, app, test_project):
"""Test reading content using memory:// URL format."""
async with Client(mcp_server) as client:
# Create a note
await client.call_tool(
"write_note",
{
"project": test_project.name,
"title": "Memory URL Test",
"folder": "test",
"content": "# Memory URL Test\n\nTesting memory:// URL handling.",
"tags": "memory,url",
},
)
# Read using memory:// URL
read_result = await client.call_tool(
"read_content",
{
"project": test_project.name,
"path": "memory://test/memory-url-test",
},
)
# Parse the response
response_data = parse_read_content_response(read_result)
content = response_data["text"]
assert "# Memory URL Test" in content
assert "Testing memory:// URL handling." in content
@pytest.mark.asyncio
async def test_read_content_unicode_file(mcp_server, app, test_project):
"""Test reading content with unicode characters and emojis."""
async with Client(mcp_server) as client:
# Create a note with unicode content
unicode_content = (
"# Unicode Test 🚀\n\nThis note has emoji 🎉 and unicode ♠♣♥♦\n\n测试中文内容"
)
await client.call_tool(
"write_note",
{
"project": test_project.name,
"title": "Unicode Content Test",
"folder": "test",
"content": unicode_content,
"tags": "unicode,emoji",
},
)
# Read the content back
read_result = await client.call_tool(
"read_content",
{
"project": test_project.name,
"path": "test/Unicode Content Test.md",
},
)
# Parse the response
response_data = parse_read_content_response(read_result)
content = response_data["text"]
# All unicode content should be preserved
assert "🚀" in content
assert "🎉" in content
assert "♠♣♥♦" in content
assert "测试中文内容" in content
@pytest.mark.asyncio
async def test_read_content_complex_frontmatter(mcp_server, app, test_project):
"""Test reading content with complex frontmatter and markdown."""
async with Client(mcp_server) as client:
# Create a note with complex content
complex_content = """---
title: Complex Note
type: document
version: 1.0
author: Test Author
metadata:
status: draft
priority: high
---
# Complex Note
This note has complex frontmatter and various markdown elements.
## Observations
- [tech] Uses YAML frontmatter
- [design] Structured content format
## Relations
- related_to [[Other Note]]
- depends_on [[Framework]]
Regular markdown content continues here."""
await client.call_tool(
"write_note",
{
"project": test_project.name,
"title": "Complex Note",
"folder": "docs",
"content": complex_content,
"tags": "complex,frontmatter",
},
)
# Read the content back
read_result = await client.call_tool(
"read_content",
{
"project": test_project.name,
"path": "docs/Complex Note.md",
},
)
# Parse the response
response_data = parse_read_content_response(read_result)
content = response_data["text"]
# Should preserve all frontmatter and content structure
assert "version: 1.0" in content
assert "author: Test Author" in content
assert "status: draft" in content
assert "[tech] Uses YAML frontmatter" in content
assert "[[Other Note]]" in content
@pytest.mark.asyncio
async def test_read_content_missing_file(mcp_server, app, test_project):
"""Test reading a file that doesn't exist."""
async with Client(mcp_server) as client:
try:
await client.call_tool(
"read_content",
{
"project": test_project.name,
"path": "nonexistent/file.md",
},
)
# Should not reach here - expecting an error
assert False, "Expected error for missing file"
except ToolError as e:
# Should get an appropriate error message
error_msg = str(e).lower()
assert "not found" in error_msg or "does not exist" in error_msg
@pytest.mark.asyncio
async def test_read_content_empty_file(mcp_server, app, test_project):
"""Test reading an empty file."""
async with Client(mcp_server) as client:
# Create a note with minimal content
await client.call_tool(
"write_note",
{
"project": test_project.name,
"title": "Empty Test",
"folder": "test",
"content": "", # Empty content
},
)
# Read the content back
read_result = await client.call_tool(
"read_content",
{
"project": test_project.name,
"path": "test/Empty Test.md",
},
)
# Parse the response
response_data = parse_read_content_response(read_result)
content = response_data["text"]
# Should still have frontmatter even with empty content
assert "title: Empty Test" in content
assert "permalink: test/empty-test" in content
@pytest.mark.asyncio
async def test_read_content_large_file(mcp_server, app, test_project):
"""Test reading a file with substantial content."""
async with Client(mcp_server) as client:
# Create a note with substantial content
large_content = "# Large Content Test\n\n"
# Add multiple sections with substantial text
for i in range(10):
large_content += f"""
## Section {i + 1}
This is section {i + 1} with substantial content. Lorem ipsum dolor sit amet,
consectetur adipiscing elit. Sed do eiusmod tempor incididunt ut labore et
dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation.
- [note] This is observation {i + 1}
- related_to [[Section {i}]]
Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore
eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident.
"""
await client.call_tool(
"write_note",
{
"project": test_project.name,
"title": "Large Content Note",
"folder": "test",
"content": large_content,
"tags": "large,content,test",
},
)
# Read the content back
read_result = await client.call_tool(
"read_content",
{
"project": test_project.name,
"path": "test/Large Content Note.md",
},
)
# Parse the response
response_data = parse_read_content_response(read_result)
content = response_data["text"]
# Should contain all sections
assert "Section 1" in content
assert "Section 10" in content
assert "Lorem ipsum" in content
assert len(content) > 1000 # Should be substantial
@pytest.mark.asyncio
async def test_read_content_special_characters_in_filename(mcp_server, app, test_project):
"""Test reading files with special characters in the filename."""
async with Client(mcp_server) as client:
# Create notes with special characters in titles
test_cases = [
("File with spaces", "test"),
("File-with-dashes", "test"),
("File_with_underscores", "test"),
("File (with parentheses)", "test"),
("File & Symbols!", "test"),
]
for title, folder in test_cases:
await client.call_tool(
"write_note",
{
"project": test_project.name,
"title": title,
"folder": folder,
"content": f"# {title}\n\nContent for {title}",
},
)
# Read the content back using the exact filename
read_result = await client.call_tool(
"read_content",
{
"project": test_project.name,
"path": f"{folder}/{title}.md",
},
)
assert len(read_result.content) == 1
assert read_result.content[0].type == "text"
content = read_result.content[0].text
assert f"# {title}" in content
assert f"Content for {title}" in content
```
--------------------------------------------------------------------------------
/tests/test_config.py:
--------------------------------------------------------------------------------
```python
"""Test configuration management."""
import tempfile
import pytest
from basic_memory.config import BasicMemoryConfig, ConfigManager
from pathlib import Path
class TestBasicMemoryConfig:
"""Test BasicMemoryConfig behavior with BASIC_MEMORY_HOME environment variable."""
def test_default_behavior_without_basic_memory_home(self, config_home, monkeypatch):
"""Test that config uses default path when BASIC_MEMORY_HOME is not set."""
# Ensure BASIC_MEMORY_HOME is not set
monkeypatch.delenv("BASIC_MEMORY_HOME", raising=False)
config = BasicMemoryConfig()
# Should use the default path (home/basic-memory)
expected_path = (config_home / "basic-memory").as_posix()
assert config.projects["main"] == Path(expected_path).as_posix()
def test_respects_basic_memory_home_environment_variable(self, config_home, monkeypatch):
"""Test that config respects BASIC_MEMORY_HOME environment variable."""
custom_path = (config_home / "app" / "data").as_posix()
monkeypatch.setenv("BASIC_MEMORY_HOME", custom_path)
config = BasicMemoryConfig()
# Should use the custom path from environment variable
assert config.projects["main"] == custom_path
def test_model_post_init_respects_basic_memory_home(self, config_home, monkeypatch):
"""Test that model_post_init creates main project with BASIC_MEMORY_HOME when missing."""
custom_path = str(config_home / "custom" / "memory" / "path")
monkeypatch.setenv("BASIC_MEMORY_HOME", custom_path)
# Create config without main project
other_path = str(config_home / "some" / "path")
config = BasicMemoryConfig(projects={"other": other_path})
# model_post_init should have added main project with BASIC_MEMORY_HOME
assert "main" in config.projects
assert config.projects["main"] == Path(custom_path).as_posix()
def test_model_post_init_fallback_without_basic_memory_home(self, config_home, monkeypatch):
"""Test that model_post_init falls back to default when BASIC_MEMORY_HOME is not set."""
# Ensure BASIC_MEMORY_HOME is not set
monkeypatch.delenv("BASIC_MEMORY_HOME", raising=False)
# Create config without main project
other_path = (config_home / "some" / "path").as_posix()
config = BasicMemoryConfig(projects={"other": other_path})
# model_post_init should have added main project with default path
expected_path = (config_home / "basic-memory").as_posix()
assert "main" in config.projects
assert config.projects["main"] == Path(expected_path).as_posix()
def test_basic_memory_home_with_relative_path(self, config_home, monkeypatch):
"""Test that BASIC_MEMORY_HOME works with relative paths."""
relative_path = "relative/memory/path"
monkeypatch.setenv("BASIC_MEMORY_HOME", relative_path)
config = BasicMemoryConfig()
# Should use the exact value from environment variable
assert config.projects["main"] == relative_path
def test_basic_memory_home_overrides_existing_main_project(self, config_home, monkeypatch):
"""Test that BASIC_MEMORY_HOME is not used when a map is passed in the constructor."""
custom_path = str(config_home / "override" / "memory" / "path")
monkeypatch.setenv("BASIC_MEMORY_HOME", custom_path)
# Try to create config with a different main project path
original_path = str(config_home / "original" / "path")
config = BasicMemoryConfig(projects={"main": original_path})
        # The explicitly passed projects map takes precedence over BASIC_MEMORY_HOME
        # Note: this documents the current behavior, where the constructor value wins
assert config.projects["main"] == original_path
class TestConfigManager:
"""Test ConfigManager functionality."""
@pytest.fixture
def temp_config_manager(self):
"""Create a ConfigManager with temporary config file."""
with tempfile.TemporaryDirectory() as temp_dir:
temp_path = Path(temp_dir)
# Create a test ConfigManager instance
config_manager = ConfigManager()
# Override config paths to use temp directory
config_manager.config_dir = temp_path / "basic-memory"
config_manager.config_file = config_manager.config_dir / "config.yaml"
config_manager.config_dir.mkdir(parents=True, exist_ok=True)
# Create initial config with test projects
test_config = BasicMemoryConfig(
default_project="main",
projects={
"main": str(temp_path / "main"),
"test-project": str(temp_path / "test"),
"special-chars": str(
temp_path / "special"
), # This will be the config key for "Special/Chars"
},
)
config_manager.save_config(test_config)
yield config_manager
def test_set_default_project_with_exact_name_match(self, temp_config_manager):
"""Test set_default_project when project name matches config key exactly."""
config_manager = temp_config_manager
# Set default to a project that exists with exact name match
config_manager.set_default_project("test-project")
# Verify the config was updated
config = config_manager.load_config()
assert config.default_project == "test-project"
def test_set_default_project_with_permalink_lookup(self, temp_config_manager):
"""Test set_default_project when input needs permalink normalization."""
config_manager = temp_config_manager
# Simulate a project that was created with special characters
# The config key would be the permalink, but user might type the original name
# First add a project with original name that gets normalized
config = config_manager.load_config()
config.projects["special-chars-project"] = str(Path("/tmp/special"))
config_manager.save_config(config)
# Now test setting default using a name that will normalize to the config key
config_manager.set_default_project(
"Special Chars Project"
) # This should normalize to "special-chars-project"
# Verify the config was updated with the correct config key
updated_config = config_manager.load_config()
assert updated_config.default_project == "special-chars-project"
def test_set_default_project_uses_canonical_name(self, temp_config_manager):
"""Test that set_default_project uses the canonical config key, not user input."""
config_manager = temp_config_manager
# Add a project with a config key that differs from user input
config = config_manager.load_config()
config.projects["my-test-project"] = str(Path("/tmp/mytest"))
config_manager.save_config(config)
# Set default using input that will match but is different from config key
config_manager.set_default_project("My Test Project") # Should find "my-test-project"
# Verify that the canonical config key is used, not the user input
updated_config = config_manager.load_config()
assert updated_config.default_project == "my-test-project"
# Should NOT be the user input
assert updated_config.default_project != "My Test Project"
def test_set_default_project_nonexistent_project(self, temp_config_manager):
"""Test set_default_project raises ValueError for nonexistent project."""
config_manager = temp_config_manager
with pytest.raises(ValueError, match="Project 'nonexistent' not found"):
config_manager.set_default_project("nonexistent")
def test_disable_permalinks_flag_default(self):
"""Test that disable_permalinks flag defaults to False."""
config = BasicMemoryConfig()
assert config.disable_permalinks is False
def test_disable_permalinks_flag_can_be_enabled(self):
"""Test that disable_permalinks flag can be set to True."""
config = BasicMemoryConfig(disable_permalinks=True)
assert config.disable_permalinks is True
def test_config_manager_respects_custom_config_dir(self, monkeypatch):
"""Test that ConfigManager respects BASIC_MEMORY_CONFIG_DIR environment variable."""
with tempfile.TemporaryDirectory() as temp_dir:
custom_config_dir = Path(temp_dir) / "custom" / "config"
monkeypatch.setenv("BASIC_MEMORY_CONFIG_DIR", str(custom_config_dir))
config_manager = ConfigManager()
# Verify config_dir is set to the custom path
assert config_manager.config_dir == custom_config_dir
# Verify config_file is in the custom directory
assert config_manager.config_file == custom_config_dir / "config.json"
# Verify the directory was created
assert config_manager.config_dir.exists()
def test_config_manager_default_without_custom_config_dir(self, config_home, monkeypatch):
"""Test that ConfigManager uses default location when BASIC_MEMORY_CONFIG_DIR is not set."""
monkeypatch.delenv("BASIC_MEMORY_CONFIG_DIR", raising=False)
config_manager = ConfigManager()
# Should use default location
assert config_manager.config_dir == config_home / ".basic-memory"
assert config_manager.config_file == config_home / ".basic-memory" / "config.json"
def test_remove_project_with_exact_name_match(self, temp_config_manager):
"""Test remove_project when project name matches config key exactly."""
config_manager = temp_config_manager
# Verify project exists
config = config_manager.load_config()
assert "test-project" in config.projects
# Remove the project with exact name match
config_manager.remove_project("test-project")
# Verify the project was removed
config = config_manager.load_config()
assert "test-project" not in config.projects
def test_remove_project_with_permalink_lookup(self, temp_config_manager):
"""Test remove_project when input needs permalink normalization."""
config_manager = temp_config_manager
# Add a project with normalized key
config = config_manager.load_config()
config.projects["special-chars-project"] = str(Path("/tmp/special"))
config_manager.save_config(config)
# Remove using a name that will normalize to the config key
config_manager.remove_project(
"Special Chars Project"
) # This should normalize to "special-chars-project"
# Verify the project was removed using the correct config key
updated_config = config_manager.load_config()
assert "special-chars-project" not in updated_config.projects
def test_remove_project_uses_canonical_name(self, temp_config_manager):
"""Test that remove_project uses the canonical config key, not user input."""
config_manager = temp_config_manager
# Add a project with a config key that differs from user input
config = config_manager.load_config()
config.projects["my-test-project"] = str(Path("/tmp/mytest"))
config_manager.save_config(config)
# Remove using input that will match but is different from config key
config_manager.remove_project("My Test Project") # Should find "my-test-project"
# Verify that the canonical config key was removed
updated_config = config_manager.load_config()
assert "my-test-project" not in updated_config.projects
def test_remove_project_nonexistent_project(self, temp_config_manager):
"""Test remove_project raises ValueError for nonexistent project."""
config_manager = temp_config_manager
with pytest.raises(ValueError, match="Project 'nonexistent' not found"):
config_manager.remove_project("nonexistent")
def test_remove_project_cannot_remove_default(self, temp_config_manager):
"""Test remove_project raises ValueError when trying to remove default project."""
config_manager = temp_config_manager
# Try to remove the default project
with pytest.raises(ValueError, match="Cannot remove the default project"):
config_manager.remove_project("main")
```
--------------------------------------------------------------------------------
/v15-docs/chatgpt-integration.md:
--------------------------------------------------------------------------------
```markdown
# ChatGPT MCP Integration
**Status**: New Feature
**PR**: #305
**File**: `mcp/tools/chatgpt_tools.py`
**Mode**: Remote MCP only
## What's New
v0.15.0 introduces ChatGPT-specific MCP tools that expose Basic Memory's search and fetch functionality using OpenAI's required tool schema and response format.
## Requirements
### ChatGPT Plus/Pro Subscription
**Required:** ChatGPT Plus or Pro subscription
- Free tier does NOT support MCP
- Plus and Pro tiers include MCP support
**Pricing:**
- ChatGPT Plus: $20/month
- ChatGPT Pro: $200/month (includes advanced features)
### Developer Mode
**Required:** ChatGPT Developer Mode
- Access to MCP server configuration
- Ability to add custom MCP servers
**Enable Developer Mode:**
1. Open ChatGPT settings
2. Navigate to "Advanced" or "Developer" settings
3. Enable "Developer Mode"
4. Restart ChatGPT
### Remote MCP Configuration
**Important:** ChatGPT only supports **remote MCP servers**
- Cannot use a local MCP server (unlike Claude Desktop)
- Requires a publicly accessible MCP server
- Basic Memory must be deployed and reachable over the internet
## How It Works
### ChatGPT-Specific Format
OpenAI requires MCP responses in a specific format:
**Standard MCP (Claude, etc.):**
```json
{
"results": [...],
"total": 10
}
```
**ChatGPT MCP:**
```json
[
{
"type": "text",
"text": "{\"results\": [...], \"total\": 10}"
}
]
```
**Key difference:** ChatGPT expects the content wrapped in a `[{"type": "text", "text": "..."}]` array, with the JSON payload serialized into the `text` string
### Adapter Architecture
```
ChatGPT Request
↓
ChatGPT MCP Tools (chatgpt_tools.py)
↓
Standard Basic Memory Tools (search_notes, read_note)
↓
Format for ChatGPT
↓
[{"type": "text", "text": "{...json...}"}]
↓
ChatGPT Response
```
## Available Tools
### 1. search
Search across the knowledge base.
**Tool Definition:**
```json
{
"name": "search",
"description": "Search for content across the knowledge base",
"inputSchema": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query"
}
},
"required": ["query"]
}
}
```
**Example Request:**
```json
{
"query": "authentication system"
}
```
**Example Response:**
```json
[
{
"type": "text",
"text": "{\"results\": [{\"id\": \"auth-design\", \"title\": \"Authentication Design\", \"url\": \"auth-design\"}], \"total_count\": 1, \"query\": \"authentication system\"}"
}
]
```
**Parsed JSON:**
```json
{
"results": [
{
"id": "auth-design",
"title": "Authentication Design",
"url": "auth-design"
}
],
"total_count": 1,
"query": "authentication system"
}
```
### 2. fetch
Fetch full contents of a document.
**Tool Definition:**
```json
{
"name": "fetch",
"description": "Fetch the full contents of a search result document",
"inputSchema": {
"type": "object",
"properties": {
"id": {
"type": "string",
"description": "Document identifier"
}
},
"required": ["id"]
}
}
```
**Example Request:**
```json
{
"id": "auth-design"
}
```
**Example Response:**
```json
[
{
"type": "text",
"text": "{\"id\": \"auth-design\", \"title\": \"Authentication Design\", \"text\": \"# Authentication Design\\n\\n...\", \"url\": \"auth-design\", \"metadata\": {\"format\": \"markdown\"}}"
}
]
```
**Parsed JSON:**
```json
{
"id": "auth-design",
"title": "Authentication Design",
"text": "# Authentication Design\n\n...",
"url": "auth-design",
"metadata": {
"format": "markdown"
}
}
```
## Configuration
### Remote MCP Server Setup
**Option 1: Deploy to Cloud**
```bash
# Deploy Basic Memory to cloud provider
# Ensure publicly accessible
# Example: Deploy to Fly.io
fly deploy
# Get URL
export MCP_SERVER_URL=https://your-app.fly.dev
```
**Option 2: Use ngrok for Testing**
```bash
# Start Basic Memory locally
bm mcp --port 8000
# Expose via ngrok
ngrok http 8000
# Get public URL
# → https://abc123.ngrok.io
```
### ChatGPT MCP Configuration
**In ChatGPT Developer Mode:**
```json
{
"mcpServers": {
"basic-memory": {
"url": "https://your-server.com/mcp",
"apiKey": "your-api-key-if-needed"
}
}
}
```
**Environment Variables (if using auth):**
```bash
export BASIC_MEMORY_API_KEY=your-secret-key
```
## Usage Examples
### Search Workflow
**User asks ChatGPT:**
> "Search my knowledge base for authentication notes"
**ChatGPT internally calls:**
```json
{
"tool": "search",
"arguments": {
"query": "authentication notes"
}
}
```
**Basic Memory responds:**
```json
[{
"type": "text",
"text": "{\"results\": [{\"id\": \"auth-design\", \"title\": \"Auth Design\", \"url\": \"auth-design\"}, {\"id\": \"oauth-setup\", \"title\": \"OAuth Setup\", \"url\": \"oauth-setup\"}], \"total_count\": 2, \"query\": \"authentication notes\"}"
}]
```
**ChatGPT displays:**
> I found 2 documents about authentication:
> 1. Auth Design
> 2. OAuth Setup
### Fetch Workflow
**User asks ChatGPT:**
> "Show me the Auth Design document"
**ChatGPT internally calls:**
```json
{
"tool": "fetch",
"arguments": {
"id": "auth-design"
}
}
```
**Basic Memory responds:**
```json
[{
"type": "text",
"text": "{\"id\": \"auth-design\", \"title\": \"Auth Design\", \"text\": \"# Auth Design\\n\\n## Overview\\n...full content...\", \"url\": \"auth-design\", \"metadata\": {\"format\": \"markdown\"}}"
}]
```
**ChatGPT displays:**
> Here's the Auth Design document:
>
> # Auth Design
>
> ## Overview
> ...
## Response Schema
### Search Response
```typescript
{
results: Array<{
id: string, // Document permalink
title: string, // Document title
url: string // Document URL/permalink
}>,
total_count: number, // Total results found
query: string // Original query echoed back
}
```
### Fetch Response
```typescript
{
id: string, // Document identifier
title: string, // Document title
text: string, // Full markdown content
url: string, // Document URL/permalink
metadata: {
format: string // "markdown"
}
}
```
### Error Response
```typescript
{
results: [], // Empty for search
error: string, // Error type
error_message: string // Error details
}
```
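For reference, the search and fetch response shapes sketched as Python `TypedDict`s (the naming here is ours; the server code may model these differently):
```python
from typing import TypedDict
class SearchResult(TypedDict):
    id: str      # document permalink
    title: str
    url: str
class SearchResponse(TypedDict):
    results: list[SearchResult]
    total_count: int
    query: str
class FetchResponse(TypedDict):
    id: str
    title: str
    text: str    # full markdown content
    url: str
    metadata: dict  # e.g. {"format": "markdown"}
```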
## Differences from Standard Tools
### ChatGPT Tools vs Standard MCP Tools
| Feature | ChatGPT Tools | Standard Tools |
|---------|---------------|----------------|
| **Tool Names** | `search`, `fetch` | `search_notes`, `read_note` |
| **Response Format** | `[{"type": "text", "text": "..."}]` | Direct JSON |
| **Parameters** | Minimal (query, id) | Rich (project, page, filters) |
| **Project Selection** | Automatic | Explicit or default_project_mode |
| **Pagination** | Fixed (10 results) | Configurable |
| **Error Handling** | JSON error objects | Direct error messages |
### Automatic Defaults
ChatGPT tools use sensible defaults:
```python
# search tool defaults
page = 1
page_size = 10
search_type = "text"
project = None # Auto-resolved
# fetch tool defaults
page = 1
page_size = 10
project = None # Auto-resolved
```
## Project Resolution
### Automatic Project Selection
ChatGPT tools use automatic project resolution:
1. **CLI constraint** (if `--project` flag used)
2. **default_project_mode** (if enabled in config)
3. **Error** if no project can be resolved
**Recommended Setup:**
```json
// ~/.basic-memory/config.json
{
"default_project": "main",
"default_project_mode": true
}
```
This ensures ChatGPT tools work without explicit project parameters.
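A rough sketch of that resolution order (our own pseudologic for illustration, not the actual resolver in the codebase):
```python
def resolve_project(cli_project: str | None, config: dict) -> str:
    """Illustrative project resolution order for ChatGPT tool calls."""
    # 1. A --project CLI constraint always wins.
    if cli_project:
        return cli_project
    # 2. Otherwise fall back to default_project_mode from ~/.basic-memory/config.json.
    if config.get("default_project_mode"):
        return config["default_project"]
    # 3. No way to resolve a project: surface an error to the caller.
    raise ValueError("No project specified and default_project_mode not enabled")
```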
## Error Handling
### Search Errors
```json
[{
"type": "text",
"text": "{\"results\": [], \"error\": \"Search failed\", \"error_details\": \"Project not found\"}"
}]
```
### Fetch Errors
```json
[{
"type": "text",
"text": "{\"id\": \"missing-doc\", \"title\": \"Fetch Error\", \"text\": \"Failed to fetch document: Not found\", \"url\": \"missing-doc\", \"metadata\": {\"error\": \"Fetch failed\"}}"
}]
```
### Common Errors
**No project found:**
```json
{
"error": "Project required",
"error_message": "No project specified and default_project_mode not enabled"
}
```
**Document not found:**
```json
{
"id": "doc-123",
"title": "Document Not Found",
"text": "# Note Not Found\n\nThe requested document 'doc-123' could not be found",
"metadata": {"error": "Document not found"}
}
```
## Deployment Patterns
### Production Deployment
**1. Deploy to Cloud:**
```bash
# Docker deployment
docker build -t basic-memory .
docker run -p 8000:8000 \
-e BASIC_MEMORY_API_URL=https://api.basicmemory.cloud \
basic-memory mcp --port 8000
# Or use managed hosting
fly deploy
```
**2. Configure ChatGPT:**
```json
{
"mcpServers": {
"basic-memory": {
"url": "https://your-app.fly.dev/mcp"
}
}
}
```
**3. Enable default_project_mode:**
```json
{
"default_project_mode": true,
"default_project": "main"
}
```
### Development/Testing
**1. Use ngrok:**
```bash
# Terminal 1: Start MCP server
bm mcp --port 8000
# Terminal 2: Expose with ngrok
ngrok http 8000
# → https://abc123.ngrok.io
```
**2. Configure ChatGPT:**
```json
{
"mcpServers": {
"basic-memory-dev": {
"url": "https://abc123.ngrok.io/mcp"
}
}
}
```
## Limitations
### ChatGPT-Specific Constraints
1. **Remote only** - Cannot use local MCP server
2. **No streaming** - Results returned all at once
3. **Fixed pagination** - 10 results per search
4. **Simplified parameters** - Cannot specify advanced filters
5. **No project selection** - Must use default_project_mode
6. **Subscription required** - ChatGPT Plus/Pro only
### Workarounds
**For more results:**
- Refine search query
- Use fetch to get full documents
- Run multiple searches with narrower queries
**For project selection:**
- Enable default_project_mode
- Or deploy separate instances per project
**For advanced features:**
- Use Claude Desktop with full MCP tools
- Or use Basic Memory CLI directly
## Troubleshooting
### ChatGPT Can't Connect
**Problem:** ChatGPT shows "MCP server unavailable"
**Solutions:**
1. Verify server is publicly accessible
```bash
curl https://your-server.com/mcp/health
```
2. Check firewall/security groups
3. Verify HTTPS (not HTTP)
4. Check API key if using auth
### No Results Returned
**Problem:** Search returns empty results
**Solutions:**
1. Check default_project_mode enabled
```json
{"default_project_mode": true}
```
2. Verify data is synced
```bash
bm sync --project main
```
3. Test search locally
```bash
bm tools search --query "test"
```
### Format Errors
**Problem:** ChatGPT shows parsing errors
**Check response format:**
```python
# Must be wrapped array
[{"type": "text", "text": "{...json...}"}]
# NOT direct JSON
{"results": [...]}
```
### Developer Mode Not Available
**Problem:** Can't find Developer Mode in ChatGPT
**Solution:**
- Ensure ChatGPT Plus/Pro subscription
- Check for feature rollout (may not be available in all regions)
- Contact OpenAI support
## Best Practices
### 1. Enable default_project_mode
```json
{
"default_project_mode": true,
"default_project": "main"
}
```
### 2. Use Cloud Deployment
Don't rely on ngrok for production:
```bash
# Production deployment
fly deploy
# or
railway up
# or
vercel deploy
```
### 3. Monitor Usage
```bash
# Enable logging
export BASIC_MEMORY_LOG_LEVEL=INFO
# Monitor requests
tail -f /var/log/basic-memory/mcp.log
```
### 4. Secure Your Server
```bash
# Use API key authentication
export BASIC_MEMORY_API_KEY=secret
# Restrict CORS
export BASIC_MEMORY_ALLOWED_ORIGINS=https://chatgpt.com
```
### 5. Test Locally First
```bash
# Test with curl
curl -X POST https://your-server.com/mcp/tools/search \
-H "Content-Type: application/json" \
-d '{"query": "test"}'
```
## Comparison with Claude Desktop
| Feature | ChatGPT | Claude Desktop |
|---------|---------|----------------|
| **MCP Mode** | Remote only | Local or Remote |
| **Tools** | 2 (search, fetch) | 17+ (full suite) |
| **Response Format** | OpenAI-specific | Standard MCP |
| **Project Support** | Default only | Full multi-project |
| **Subscription** | Plus/Pro required | Free (Claude) |
| **Configuration** | Developer mode | Config file |
| **Performance** | Network latency | Local (instant) |
**Recommendation:** Use Claude Desktop for full features, ChatGPT for convenience
## See Also
- ChatGPT MCP documentation: https://platform.openai.com/docs/mcp
- `default-project-mode.md` - Required for ChatGPT tools
- `cloud-mode-usage.md` - Deploying MCP to cloud
- Standard MCP tools documentation
```
--------------------------------------------------------------------------------
/tests/sync/test_character_conflicts.py:
--------------------------------------------------------------------------------
```python
"""Test character-related sync conflicts and permalink generation."""
from pathlib import Path
from textwrap import dedent
import pytest
from sqlalchemy.exc import IntegrityError
from basic_memory.config import ProjectConfig
from basic_memory.repository import EntityRepository
from basic_memory.sync.sync_service import SyncService
from basic_memory.utils import (
generate_permalink,
normalize_file_path_for_comparison,
detect_potential_file_conflicts,
)
async def create_test_file(path: Path, content: str = "test content") -> None:
"""Create a test file with given content."""
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(content)
class TestUtilityFunctions:
"""Test utility functions for file path normalization and conflict detection."""
def test_normalize_file_path_for_comparison(self):
"""Test file path normalization for conflict detection."""
# Case sensitivity normalization
assert (
normalize_file_path_for_comparison("Finance/Investment.md") == "finance/investment.md"
)
assert (
normalize_file_path_for_comparison("FINANCE/INVESTMENT.MD") == "finance/investment.md"
)
# Path separator normalization
assert (
normalize_file_path_for_comparison("Finance\\Investment.md") == "finance/investment.md"
)
# Multiple slash handling
assert (
normalize_file_path_for_comparison("Finance//Investment.md") == "finance/investment.md"
)
def test_detect_potential_file_conflicts(self):
"""Test the enhanced conflict detection function."""
existing_paths = [
"Finance/Investment.md",
"finance/Investment.md",
"docs/my-feature.md",
"docs/my feature.md",
]
# Case sensitivity conflict
conflicts = detect_potential_file_conflicts("FINANCE/INVESTMENT.md", existing_paths)
assert "Finance/Investment.md" in conflicts
assert "finance/Investment.md" in conflicts
# Permalink conflict (space vs hyphen)
conflicts = detect_potential_file_conflicts("docs/my_feature.md", existing_paths)
assert "docs/my-feature.md" in conflicts
assert "docs/my feature.md" in conflicts
class TestPermalinkGeneration:
"""Test permalink generation with various character scenarios."""
def test_hyphen_handling(self):
"""Test that hyphens in filenames are handled consistently."""
# File with existing hyphens
assert generate_permalink("docs/my-feature.md") == "docs/my-feature"
assert generate_permalink("docs/basic-memory bug.md") == "docs/basic-memory-bug"
# File with spaces that become hyphens
assert generate_permalink("docs/my feature.md") == "docs/my-feature"
# Mixed scenarios
assert generate_permalink("docs/my-old feature.md") == "docs/my-old-feature"
def test_forward_slash_handling(self):
"""Test that forward slashes are handled properly."""
# Normal directory structure
assert generate_permalink("Finance/Investment.md") == "finance/investment"
# Path with spaces in directory names
assert generate_permalink("My Finance/Investment.md") == "my-finance/investment"
def test_case_sensitivity_normalization(self):
"""Test that case differences are normalized consistently."""
# Same logical path with different cases
assert generate_permalink("Finance/Investment.md") == "finance/investment"
assert generate_permalink("finance/Investment.md") == "finance/investment"
assert generate_permalink("FINANCE/INVESTMENT.md") == "finance/investment"
def test_unicode_character_handling(self):
"""Test that international characters are handled properly."""
# Italian characters as mentioned in user feedback
assert (
generate_permalink("Finance/Punti Chiave di Peter Lynch.md")
== "finance/punti-chiave-di-peter-lynch"
)
# Chinese characters (should be preserved)
assert generate_permalink("中文/测试文档.md") == "中文/测试文档"
# Mixed international characters
assert generate_permalink("docs/Café München.md") == "docs/cafe-munchen"
def test_special_punctuation(self):
"""Test handling of special punctuation characters."""
# Apostrophes should be removed
assert generate_permalink("Peter's Guide.md") == "peters-guide"
# Other punctuation should become hyphens
assert generate_permalink("Q&A Session.md") == "q-a-session"
@pytest.mark.asyncio
class TestSyncConflictHandling:
"""Test sync service handling of file path and permalink conflicts."""
async def test_file_path_conflict_detection(
self,
sync_service: SyncService,
project_config: ProjectConfig,
entity_repository: EntityRepository,
):
"""Test that file path conflicts are detected during move operations."""
project_dir = project_config.home
# Create two files
content1 = dedent("""
---
type: knowledge
---
# Document One
This is the first document.
""")
content2 = dedent("""
---
type: knowledge
---
# Document Two
This is the second document.
""")
await create_test_file(project_dir / "doc1.md", content1)
await create_test_file(project_dir / "doc2.md", content2)
# Initial sync
await sync_service.sync(project_config.home)
# Verify both entities exist
entities = await entity_repository.find_all()
assert len(entities) == 2
# Now simulate a move where doc1.md tries to move to doc2.md's location
# This should be handled gracefully, not throw an IntegrityError
# First, get the entities
entity1 = await entity_repository.get_by_file_path("doc1.md")
entity2 = await entity_repository.get_by_file_path("doc2.md")
assert entity1 is not None
assert entity2 is not None
# Simulate the conflict scenario
with pytest.raises(Exception) as exc_info:
# This should detect the conflict and handle it gracefully
await sync_service.handle_move("doc1.md", "doc2.md")
# The exception should be a meaningful error, not an IntegrityError
assert not isinstance(exc_info.value, IntegrityError)
async def test_hyphen_filename_conflict(
self,
sync_service: SyncService,
project_config: ProjectConfig,
entity_repository: EntityRepository,
):
"""Test conflict when filename with hyphens conflicts with generated permalink."""
project_dir = project_config.home
# Create file with spaces (will generate permalink with hyphens)
content1 = dedent("""
---
type: knowledge
---
# Basic Memory Bug
This file has spaces in the name.
""")
# Create file with hyphens (already has hyphens in filename)
content2 = dedent("""
---
type: knowledge
---
# Basic Memory Bug Report
This file has hyphens in the name.
""")
await create_test_file(project_dir / "basic memory bug.md", content1)
await create_test_file(project_dir / "basic-memory-bug.md", content2)
# Sync should handle this without conflict
await sync_service.sync(project_config.home)
# Verify both entities were created with unique permalinks
entities = await entity_repository.find_all()
assert len(entities) == 2
# Check that permalinks are unique
permalinks = [entity.permalink for entity in entities if entity.permalink]
assert len(set(permalinks)) == len(permalinks), "Permalinks should be unique"
async def test_case_sensitivity_conflict(
self,
sync_service: SyncService,
project_config: ProjectConfig,
entity_repository: EntityRepository,
):
"""Test conflict handling when case differences cause issues."""
import platform
project_dir = project_config.home
# Create directory structure that might cause case conflicts
(project_dir / "Finance").mkdir(parents=True, exist_ok=True)
(project_dir / "finance").mkdir(parents=True, exist_ok=True)
content1 = dedent("""
---
type: knowledge
---
# Investment Guide
Upper case directory.
""")
content2 = dedent("""
---
type: knowledge
---
# Investment Tips
Lower case directory.
""")
await create_test_file(project_dir / "Finance" / "investment.md", content1)
await create_test_file(project_dir / "finance" / "investment.md", content2)
# Sync should handle case differences properly
await sync_service.sync(project_config.home)
# Verify entities were created
entities = await entity_repository.find_all()
# On case-insensitive file systems (macOS, Windows), only one entity will be created
# On case-sensitive file systems (Linux), two entities will be created
if platform.system() in ["Darwin", "Windows"]:
# Case-insensitive file systems
assert len(entities) >= 1
# Only one of the paths will exist
file_paths = [entity.file_path for entity in entities]
assert any(
path in ["Finance/investment.md", "finance/investment.md"] for path in file_paths
)
else:
# Case-sensitive file systems (Linux)
assert len(entities) >= 2
# Check that file paths are preserved correctly
file_paths = [entity.file_path for entity in entities]
assert "Finance/investment.md" in file_paths
assert "finance/investment.md" in file_paths
async def test_move_conflict_resolution(
self,
sync_service: SyncService,
project_config: ProjectConfig,
entity_repository: EntityRepository,
):
"""Test that move conflicts are resolved with proper error handling."""
project_dir = project_config.home
# Create three files in a scenario that could cause move conflicts
await create_test_file(project_dir / "file-a.md", "# File A")
await create_test_file(project_dir / "file-b.md", "# File B")
await create_test_file(project_dir / "temp.md", "# Temp File")
# Initial sync
await sync_service.sync(project_config.home)
# Simulate a complex move scenario where files swap locations
# This is the kind of scenario that caused the original bug
# Get the entities
entity_a = await entity_repository.get_by_file_path("file-a.md")
entity_b = await entity_repository.get_by_file_path("file-b.md")
entity_temp = await entity_repository.get_by_file_path("temp.md")
assert all([entity_a, entity_b, entity_temp])
# Try to move file-a to file-b's location (should detect conflict)
try:
await sync_service.handle_move("file-a.md", "file-b.md")
# If this doesn't raise an exception, the conflict was resolved
# Verify the state is consistent
updated_entities = await entity_repository.find_all()
file_paths = [entity.file_path for entity in updated_entities]
# Should not have duplicate file paths
assert len(file_paths) == len(set(file_paths)), "File paths should be unique"
except Exception as e:
# If an exception is raised, it should be a meaningful error
assert "conflict" in str(e).lower() or "already exists" in str(e).lower()
assert not isinstance(e, IntegrityError), "Should not be a raw IntegrityError"
@pytest.mark.asyncio
class TestEnhancedErrorMessages:
"""Test that error messages provide helpful guidance for character conflicts."""
async def test_helpful_error_for_hyphen_conflict(
self,
sync_service: SyncService,
project_config: ProjectConfig,
):
"""Test that hyphen conflicts generate helpful error messages."""
# This test will be implemented after we enhance the error handling
pass
async def test_helpful_error_for_case_conflict(
self,
sync_service: SyncService,
project_config: ProjectConfig,
):
"""Test that case sensitivity conflicts generate helpful error messages."""
# This test will be implemented after we enhance the error handling
pass
```
--------------------------------------------------------------------------------
/test-int/test_sync_performance_benchmark.py:
--------------------------------------------------------------------------------
```python
"""
Performance benchmark tests for sync operations.
These tests measure baseline performance for indexing operations to track
improvements from optimizations. Tests are marked with @pytest.mark.benchmark
and can be run separately.
Usage:
# Run all benchmarks
pytest test-int/test_sync_performance_benchmark.py -v
# Run specific benchmark
pytest test-int/test_sync_performance_benchmark.py::test_benchmark_sync_100_files -v
"""
import asyncio
import time
from pathlib import Path
from textwrap import dedent
import pytest
from basic_memory.config import BasicMemoryConfig, ProjectConfig
from basic_memory.sync.sync_service import get_sync_service
async def create_benchmark_file(path: Path, file_num: int, total_files: int) -> None:
"""Create a realistic test markdown file with observations and relations.
Args:
path: Path to create the file at
file_num: Current file number (for unique content)
total_files: Total number of files being created (for relation targets)
"""
# Create realistic content with varying complexity
has_relations = file_num < (total_files - 1) # Most files have relations
num_observations = min(3 + (file_num % 5), 10) # 3-10 observations per file
# Generate relation targets (some will be forward references)
relations = []
if has_relations:
# Reference 1-3 other files
num_relations = min(1 + (file_num % 3), 3)
for i in range(num_relations):
target_num = (file_num + i + 1) % total_files
relations.append(f"- relates_to [[test-file-{target_num:04d}]]")
content = dedent(f"""
---
type: note
tags: [benchmark, test, category-{file_num % 10}]
---
# Test File {file_num:04d}
This is benchmark test file {file_num} of {total_files}.
It contains realistic markdown content to simulate actual usage.
## Observations
{chr(10).join([f"- [category-{i % 5}] Observation {i} for file {file_num} with some content #tag{i}" for i in range(num_observations)])}
## Relations
{chr(10).join(relations) if relations else "- No relations for this file"}
## Additional Content
This section contains additional prose to simulate real documents.
Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed do eiusmod
tempor incididunt ut labore et dolore magna aliqua.
### Subsection
More content here to make the file realistic. This helps test the
full indexing pipeline including content extraction and search indexing.
""").strip()
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(content, encoding="utf-8")
async def generate_benchmark_files(project_dir: Path, num_files: int) -> None:
"""Generate benchmark test files.
Args:
project_dir: Directory to create files in
num_files: Number of files to generate
"""
print(f"\nGenerating {num_files} test files...")
start = time.time()
# Create files in batches for faster generation
batch_size = 100
for batch_start in range(0, num_files, batch_size):
batch_end = min(batch_start + batch_size, num_files)
tasks = [
create_benchmark_file(
project_dir / f"category-{i % 10}" / f"test-file-{i:04d}.md", i, num_files
)
for i in range(batch_start, batch_end)
]
await asyncio.gather(*tasks)
print(f" Created files {batch_start}-{batch_end} ({batch_end}/{num_files})")
duration = time.time() - start
print(f" File generation completed in {duration:.2f}s ({num_files / duration:.1f} files/sec)")
def get_db_size(db_path: Path) -> tuple[int, str]:
"""Get database file size.
Returns:
Tuple of (size_bytes, formatted_size)
"""
if not db_path.exists():
return 0, "0 B"
size_bytes = db_path.stat().st_size
# Format size
for unit in ["B", "KB", "MB", "GB"]:
if size_bytes < 1024.0:
return size_bytes, f"{size_bytes:.2f} {unit}"
size_bytes /= 1024.0
return int(size_bytes * 1024**4), f"{size_bytes:.2f} TB"
async def run_sync_benchmark(
project_config: ProjectConfig, app_config: BasicMemoryConfig, num_files: int, test_name: str
) -> dict:
"""Run a sync benchmark and collect metrics.
Args:
project_config: Project configuration
app_config: App configuration
num_files: Number of files to benchmark
test_name: Name of the test for reporting
Returns:
Dictionary with benchmark results
"""
project_dir = project_config.home
db_path = app_config.database_path
print(f"\n{'=' * 70}")
print(f"BENCHMARK: {test_name}")
print(f"{'=' * 70}")
# Generate test files
await generate_benchmark_files(project_dir, num_files)
# Get initial DB size
initial_db_size, initial_db_formatted = get_db_size(db_path)
print(f"\nInitial database size: {initial_db_formatted}")
# Create sync service
from basic_memory.repository import ProjectRepository
from basic_memory import db
_, session_maker = await db.get_or_create_db(
db_path=app_config.database_path,
db_type=db.DatabaseType.FILESYSTEM,
)
project_repository = ProjectRepository(session_maker)
# Get or create project
projects = await project_repository.find_all()
if projects:
project = projects[0]
else:
project = await project_repository.create(
{
"name": project_config.name,
"path": str(project_config.home),
"is_active": True,
"is_default": True,
}
)
sync_service = await get_sync_service(project)
# Initialize search index (required for FTS5 table)
await sync_service.search_service.init_search_index()
# Run sync and measure time
print(f"\nStarting sync of {num_files} files...")
sync_start = time.time()
report = await sync_service.sync(project_dir, project_name=project.name)
sync_duration = time.time() - sync_start
# Get final DB size
final_db_size, final_db_formatted = get_db_size(db_path)
db_growth = final_db_size - initial_db_size
db_growth_formatted = f"{db_growth / 1024 / 1024:.2f} MB"
# Calculate metrics
files_per_sec = num_files / sync_duration if sync_duration > 0 else 0
ms_per_file = (sync_duration * 1000) / num_files if num_files > 0 else 0
# Print results
print(f"\n{'-' * 70}")
print("RESULTS:")
print(f"{'-' * 70}")
print(f"Files processed: {num_files}")
print(f" New: {len(report.new)}")
print(f" Modified: {len(report.modified)}")
print(f" Deleted: {len(report.deleted)}")
print(f" Moved: {len(report.moves)}")
print("\nPerformance:")
print(f" Total time: {sync_duration:.2f}s")
print(f" Files/sec: {files_per_sec:.1f}")
print(f" ms/file: {ms_per_file:.1f}")
print("\nDatabase:")
print(f" Initial size: {initial_db_formatted}")
print(f" Final size: {final_db_formatted}")
print(f" Growth: {db_growth_formatted}")
print(f" Growth per file: {(db_growth / num_files / 1024):.2f} KB")
print(f"{'=' * 70}\n")
return {
"test_name": test_name,
"num_files": num_files,
"sync_duration_sec": sync_duration,
"files_per_sec": files_per_sec,
"ms_per_file": ms_per_file,
"new_files": len(report.new),
"modified_files": len(report.modified),
"deleted_files": len(report.deleted),
"moved_files": len(report.moves),
"initial_db_size": initial_db_size,
"final_db_size": final_db_size,
"db_growth_bytes": db_growth,
"db_growth_per_file_bytes": db_growth / num_files if num_files > 0 else 0,
}
@pytest.mark.benchmark
@pytest.mark.asyncio
async def test_benchmark_sync_100_files(app_config, project_config, config_manager):
"""Benchmark: Sync 100 files (small repository)."""
results = await run_sync_benchmark(
project_config, app_config, num_files=100, test_name="Sync 100 files (small repository)"
)
# Basic assertions to ensure sync worked
# Note: May be slightly more than 100 due to OS-generated files (.DS_Store, etc.)
assert results["new_files"] >= 100
assert results["sync_duration_sec"] > 0
assert results["files_per_sec"] > 0
@pytest.mark.benchmark
@pytest.mark.asyncio
async def test_benchmark_sync_500_files(app_config, project_config, config_manager):
"""Benchmark: Sync 500 files (medium repository)."""
results = await run_sync_benchmark(
project_config, app_config, num_files=500, test_name="Sync 500 files (medium repository)"
)
# Basic assertions
# Note: May be slightly more than 500 due to OS-generated files
assert results["new_files"] >= 500
assert results["sync_duration_sec"] > 0
assert results["files_per_sec"] > 0
@pytest.mark.benchmark
@pytest.mark.asyncio
@pytest.mark.slow
async def test_benchmark_sync_1000_files(app_config, project_config, config_manager):
"""Benchmark: Sync 1000 files (large repository).
This test is marked as 'slow' and can be skipped in regular test runs:
pytest -m "not slow"
"""
results = await run_sync_benchmark(
project_config, app_config, num_files=1000, test_name="Sync 1000 files (large repository)"
)
# Basic assertions
# Note: May be slightly more than 1000 due to OS-generated files
assert results["new_files"] >= 1000
assert results["sync_duration_sec"] > 0
assert results["files_per_sec"] > 0
@pytest.mark.benchmark
@pytest.mark.asyncio
async def test_benchmark_resync_no_changes(app_config, project_config, config_manager):
"""Benchmark: Re-sync with no changes (should be fast).
This tests the performance of scanning files when nothing has changed,
which is important for cloud restarts.
"""
project_dir = project_config.home
num_files = 100
# First sync
print(f"\nFirst sync of {num_files} files...")
await generate_benchmark_files(project_dir, num_files)
from basic_memory.repository import ProjectRepository
from basic_memory import db
_, session_maker = await db.get_or_create_db(
db_path=app_config.database_path,
db_type=db.DatabaseType.FILESYSTEM,
)
project_repository = ProjectRepository(session_maker)
projects = await project_repository.find_all()
if projects:
project = projects[0]
else:
project = await project_repository.create(
{
"name": project_config.name,
"path": str(project_config.home),
"is_active": True,
"is_default": True,
}
)
sync_service = await get_sync_service(project)
# Initialize search index
await sync_service.search_service.init_search_index()
await sync_service.sync(project_dir, project_name=project.name)
# Second sync (no changes)
print("\nRe-sync with no changes...")
resync_start = time.time()
report = await sync_service.sync(project_dir, project_name=project.name)
resync_duration = time.time() - resync_start
print(f"\n{'-' * 70}")
print("RE-SYNC RESULTS (no changes):")
print(f"{'-' * 70}")
print(f"Files scanned: {num_files}")
print(f"Changes detected: {report.total}")
print(f" New: {len(report.new)}")
print(f" Modified: {len(report.modified)}")
print(f" Deleted: {len(report.deleted)}")
print(f" Moved: {len(report.moves)}")
print(f"Duration: {resync_duration:.2f}s")
print(f"Files/sec: {num_files / resync_duration:.1f}")
# Debug: Show what changed
if report.total > 0:
print("\n⚠️ UNEXPECTED CHANGES DETECTED:")
if report.new:
print(f" New files ({len(report.new)}): {list(report.new)[:5]}")
if report.modified:
print(f" Modified files ({len(report.modified)}): {list(report.modified)[:5]}")
if report.deleted:
print(f" Deleted files ({len(report.deleted)}): {list(report.deleted)[:5]}")
if report.moves:
print(f" Moved files ({len(report.moves)}): {dict(list(report.moves.items())[:5])}")
print(f"{'=' * 70}\n")
# Should be no changes
assert report.total == 0, (
f"Expected no changes but got {report.total}: new={len(report.new)}, modified={len(report.modified)}, deleted={len(report.deleted)}, moves={len(report.moves)}"
)
assert len(report.new) == 0
assert len(report.modified) == 0
assert len(report.deleted) == 0
```
--------------------------------------------------------------------------------
/tests/services/test_context_service.py:
--------------------------------------------------------------------------------
```python
"""Tests for context service."""
from datetime import datetime, timedelta, UTC
import pytest
import pytest_asyncio
from basic_memory.repository.search_repository import SearchIndexRow
from basic_memory.schemas.memory import memory_url, memory_url_path
from basic_memory.schemas.search import SearchItemType
from basic_memory.services.context_service import ContextService
from basic_memory.models.knowledge import Entity, Relation
from basic_memory.models.project import Project
@pytest_asyncio.fixture
async def context_service(search_repository, entity_repository, observation_repository):
"""Create context service for testing."""
return ContextService(search_repository, entity_repository, observation_repository)
@pytest.mark.asyncio
async def test_find_connected_depth_limit(context_service, test_graph):
"""Test depth limiting works.
Our traversal path is:
- Depth 0: Root
- Depth 1: Relations + directly connected entities (Connected1, Connected2)
- Depth 2: Relations + next level entities (Deep)
"""
type_id_pairs = [("entity", test_graph["root"].id)]
# With depth=1, we get direct connections
# shallow_results = await context_service.find_related(type_id_pairs, max_depth=1)
# shallow_entities = {(r.id, r.type) for r in shallow_results if r.type == "entity"}
#
# assert (test_graph["deep"].id, "entity") not in shallow_entities
# search deeper
deep_results = await context_service.find_related(type_id_pairs, max_depth=3, max_results=100)
deep_entities = {(r.id, r.type) for r in deep_results if r.type == "entity"}
print(deep_entities)
# Should now include Deep entity
assert (test_graph["deep"].id, "entity") in deep_entities
@pytest.mark.asyncio
async def test_find_connected_timeframe(
context_service, test_graph, search_repository, entity_repository
):
"""Test timeframe filtering.
This tests how traversal is affected by the item dates.
When we filter by date, items are only included if:
1. They match the timeframe
2. There is a valid path to them through other items in the timeframe
"""
now = datetime.now(UTC)
old_date = now - timedelta(days=10)
recent_date = now - timedelta(days=1)
# Update entity table timestamps directly
# Root entity uses old date
root_entity = test_graph["root"]
await entity_repository.update(root_entity.id, {"created_at": old_date, "updated_at": old_date})
# Connected entity uses recent date
connected_entity = test_graph["connected1"]
await entity_repository.update(
connected_entity.id, {"created_at": recent_date, "updated_at": recent_date}
)
# Also update search_index for test consistency
await search_repository.index_item(
SearchIndexRow(
project_id=entity_repository.project_id,
id=test_graph["root"].id,
title=test_graph["root"].title,
content_snippet="Root content",
permalink=test_graph["root"].permalink,
file_path=test_graph["root"].file_path,
type=SearchItemType.ENTITY,
metadata={"created_at": old_date.isoformat()},
created_at=old_date.isoformat(),
updated_at=old_date.isoformat(),
)
)
await search_repository.index_item(
SearchIndexRow(
project_id=entity_repository.project_id,
id=test_graph["relations"][0].id,
title="Root Entity → Connected Entity 1",
content_snippet="",
permalink=f"{test_graph['root'].permalink}/connects_to/{test_graph['connected1'].permalink}",
file_path=test_graph["root"].file_path,
type=SearchItemType.RELATION,
from_id=test_graph["root"].id,
to_id=test_graph["connected1"].id,
relation_type="connects_to",
metadata={"created_at": old_date.isoformat()},
created_at=old_date.isoformat(),
updated_at=old_date.isoformat(),
)
)
await search_repository.index_item(
SearchIndexRow(
project_id=entity_repository.project_id,
id=test_graph["connected1"].id,
title=test_graph["connected1"].title,
content_snippet="Connected 1 content",
permalink=test_graph["connected1"].permalink,
file_path=test_graph["connected1"].file_path,
type=SearchItemType.ENTITY,
metadata={"created_at": recent_date.isoformat()},
created_at=recent_date.isoformat(),
updated_at=recent_date.isoformat(),
)
)
type_id_pairs = [("entity", test_graph["root"].id)]
# Search with a 7-day cutoff
since_date = now - timedelta(days=7)
results = await context_service.find_related(type_id_pairs, since=since_date)
# Only connected1 is recent, but we can't get to it
# because its connecting relation is too old and is filtered out
# (we can only reach connected1 through a relation starting from root)
entity_ids = {r.id for r in results if r.type == "entity"}
assert len(entity_ids) == 0 # No accessible entities within timeframe
@pytest.mark.asyncio
async def test_build_context(context_service, test_graph):
"""Test exact permalink lookup."""
url = memory_url.validate_strings("memory://test/root")
context_result = await context_service.build_context(url)
# Check metadata
assert context_result.metadata.uri == memory_url_path(url)
assert context_result.metadata.depth == 1
assert context_result.metadata.primary_count == 1
assert context_result.metadata.related_count > 0
assert context_result.metadata.generated_at is not None
# Check results
assert len(context_result.results) == 1
context_item = context_result.results[0]
# Check primary result
primary_result = context_item.primary_result
assert primary_result.id == test_graph["root"].id
assert primary_result.type == "entity"
assert primary_result.title == "Root"
assert primary_result.permalink == "test/root"
assert primary_result.file_path == "test/Root.md"
assert primary_result.created_at is not None
# Check related results
assert len(context_item.related_results) > 0
# Find related relation
relation = next((r for r in context_item.related_results if r.type == "relation"), None)
assert relation is not None
assert relation.relation_type == "connects_to"
assert relation.from_id == test_graph["root"].id
assert relation.to_id == test_graph["connected1"].id
# Find related entity
related_entity = next((r for r in context_item.related_results if r.type == "entity"), None)
assert related_entity is not None
assert related_entity.id == test_graph["connected1"].id
assert related_entity.title == test_graph["connected1"].title
assert related_entity.permalink == test_graph["connected1"].permalink
@pytest.mark.asyncio
async def test_build_context_with_observations(context_service, test_graph):
"""Test context building with observations."""
# The test_graph fixture already creates observations for root entity
# Let's use those existing observations
# Build context
url = memory_url.validate_strings("memory://test/root")
context_result = await context_service.build_context(url, include_observations=True)
# Check the metadata
assert context_result.metadata.total_observations > 0
assert len(context_result.results) == 1
# Check that observations were included
context_item = context_result.results[0]
assert len(context_item.observations) > 0
# Check observation properties
for observation in context_item.observations:
assert observation.type == "observation"
assert observation.category in ["note", "tech"] # Categories from test_graph fixture
assert observation.entity_id == test_graph["root"].id
# Verify at least one observation has the correct category and content
note_observation = next((o for o in context_item.observations if o.category == "note"), None)
assert note_observation is not None
assert "Root note" in note_observation.content
@pytest.mark.asyncio
async def test_build_context_not_found(context_service):
"""Test handling non-existent permalinks."""
context = await context_service.build_context("memory://does/not/exist")
assert len(context.results) == 0
assert context.metadata.primary_count == 0
assert context.metadata.related_count == 0
@pytest.mark.asyncio
async def test_context_metadata(context_service, test_graph):
"""Test metadata is correctly populated."""
context = await context_service.build_context("memory://test/root", depth=2)
metadata = context.metadata
assert metadata.uri == "test/root"
assert metadata.depth == 2
assert metadata.generated_at is not None
assert metadata.primary_count > 0
@pytest.mark.asyncio
async def test_project_isolation_in_find_related(session_maker):
"""Test that find_related respects project boundaries and doesn't leak data."""
from basic_memory.repository.entity_repository import EntityRepository
from basic_memory.repository.observation_repository import ObservationRepository
from basic_memory.repository.search_repository import SearchRepository
from basic_memory import db
# Create database session
async with db.scoped_session(session_maker) as db_session:
# Create two separate projects
project1 = Project(name="project1", path="/test1")
project2 = Project(name="project2", path="/test2")
db_session.add(project1)
db_session.add(project2)
await db_session.flush()
# Create entities in project1
entity1_p1 = Entity(
title="Entity1_P1",
entity_type="document",
content_type="text/markdown",
project_id=project1.id,
permalink="project1/entity1",
file_path="project1/entity1.md",
created_at=datetime.now(UTC),
updated_at=datetime.now(UTC),
)
entity2_p1 = Entity(
title="Entity2_P1",
entity_type="document",
content_type="text/markdown",
project_id=project1.id,
permalink="project1/entity2",
file_path="project1/entity2.md",
created_at=datetime.now(UTC),
updated_at=datetime.now(UTC),
)
# Create entities in project2
entity1_p2 = Entity(
title="Entity1_P2",
entity_type="document",
content_type="text/markdown",
project_id=project2.id,
permalink="project2/entity1",
file_path="project2/entity1.md",
created_at=datetime.now(UTC),
updated_at=datetime.now(UTC),
)
db_session.add_all([entity1_p1, entity2_p1, entity1_p2])
await db_session.flush()
# Create relation in project1 (between entities of project1)
relation_p1 = Relation(
from_id=entity1_p1.id,
to_id=entity2_p1.id,
to_name="Entity2_P1",
relation_type="connects_to",
)
db_session.add(relation_p1)
await db_session.commit()
# Create repositories for project1
search_repo_p1 = SearchRepository(session_maker, project1.id)
entity_repo_p1 = EntityRepository(session_maker, project1.id)
obs_repo_p1 = ObservationRepository(session_maker, project1.id)
context_service_p1 = ContextService(search_repo_p1, entity_repo_p1, obs_repo_p1)
# Create repositories for project2
search_repo_p2 = SearchRepository(session_maker, project2.id)
entity_repo_p2 = EntityRepository(session_maker, project2.id)
obs_repo_p2 = ObservationRepository(session_maker, project2.id)
context_service_p2 = ContextService(search_repo_p2, entity_repo_p2, obs_repo_p2)
# Test: find_related for project1 should only return project1 entities
type_id_pairs_p1 = [("entity", entity1_p1.id)]
related_p1 = await context_service_p1.find_related(type_id_pairs_p1, max_depth=2)
# Verify only project1 entities are returned
related_entity_ids = [r.id for r in related_p1 if r.type == "entity"]
assert entity2_p1.id in related_entity_ids # Should find connected entity2 in project1
assert entity1_p2.id not in related_entity_ids # Should NOT find entity from project2
# Test: find_related for project2 should return empty (no relations)
type_id_pairs_p2 = [("entity", entity1_p2.id)]
related_p2 = await context_service_p2.find_related(type_id_pairs_p2, max_depth=2)
# Project2 has no relations, so should return empty
assert len(related_p2) == 0
# Double-check: verify entities exist in their respective projects
assert entity1_p1.project_id == project1.id
assert entity2_p1.project_id == project1.id
assert entity1_p2.project_id == project2.id
```
--------------------------------------------------------------------------------
/src/basic_memory/repository/entity_repository.py:
--------------------------------------------------------------------------------
```python
"""Repository for managing entities in the knowledge graph."""
from pathlib import Path
from typing import List, Optional, Sequence, Union
from loguru import logger
from sqlalchemy import select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from sqlalchemy.orm import selectinload
from sqlalchemy.orm.interfaces import LoaderOption
from basic_memory import db
from basic_memory.models.knowledge import Entity, Observation, Relation
from basic_memory.repository.repository import Repository
class EntityRepository(Repository[Entity]):
"""Repository for Entity model.
Note: All file paths are stored as strings in the database. Convert Path objects
to strings before passing to repository methods.
"""
def __init__(self, session_maker: async_sessionmaker[AsyncSession], project_id: int):
"""Initialize with session maker and project_id filter.
Args:
session_maker: SQLAlchemy session maker
project_id: Project ID to filter all operations by
"""
super().__init__(session_maker, Entity, project_id=project_id)
async def get_by_permalink(self, permalink: str) -> Optional[Entity]:
"""Get entity by permalink.
Args:
permalink: Unique identifier for the entity
"""
query = self.select().where(Entity.permalink == permalink).options(*self.get_load_options())
return await self.find_one(query)
async def get_by_title(self, title: str) -> Sequence[Entity]:
"""Get entity by title.
Args:
title: Title of the entity to find
"""
query = self.select().where(Entity.title == title).options(*self.get_load_options())
result = await self.execute_query(query)
return list(result.scalars().all())
async def get_by_file_path(self, file_path: Union[Path, str]) -> Optional[Entity]:
"""Get entity by file_path.
Args:
file_path: Path to the entity file (will be converted to string internally)
"""
query = (
self.select()
.where(Entity.file_path == Path(file_path).as_posix())
.options(*self.get_load_options())
)
return await self.find_one(query)
async def find_by_checksum(self, checksum: str) -> Sequence[Entity]:
"""Find entities with the given checksum.
Used for move detection - finds entities that may have been moved to a new path.
Multiple entities may have the same checksum if files were copied.
Args:
checksum: File content checksum to search for
Returns:
Sequence of entities with matching checksum (may be empty)
"""
query = self.select().where(Entity.checksum == checksum)
# Don't load relationships for move detection - we only need file_path and checksum
result = await self.execute_query(query, use_query_options=False)
return list(result.scalars().all())
async def delete_by_file_path(self, file_path: Union[Path, str]) -> bool:
"""Delete entity with the provided file_path.
Args:
file_path: Path to the entity file (will be converted to string internally)
"""
return await self.delete_by_fields(file_path=Path(file_path).as_posix())
def get_load_options(self) -> List[LoaderOption]:
"""Get SQLAlchemy loader options for eager loading relationships."""
return [
selectinload(Entity.observations).selectinload(Observation.entity),
# Load from_relations and both entities for each relation
selectinload(Entity.outgoing_relations).selectinload(Relation.from_entity),
selectinload(Entity.outgoing_relations).selectinload(Relation.to_entity),
# Load to_relations and both entities for each relation
selectinload(Entity.incoming_relations).selectinload(Relation.from_entity),
selectinload(Entity.incoming_relations).selectinload(Relation.to_entity),
]
async def find_by_permalinks(self, permalinks: List[str]) -> Sequence[Entity]:
"""Find multiple entities by their permalink.
Args:
permalinks: List of permalink strings to find
"""
# Handle empty input explicitly
if not permalinks:
return []
# Use existing select pattern
query = (
self.select().options(*self.get_load_options()).where(Entity.permalink.in_(permalinks))
)
result = await self.execute_query(query)
return list(result.scalars().all())
async def upsert_entity(self, entity: Entity) -> Entity:
"""Insert or update entity using simple try/catch with database-level conflict resolution.
Handles file_path race conditions by checking for existing entity on IntegrityError.
For permalink conflicts, generates a unique permalink with numeric suffix.
Args:
entity: The entity to insert or update
Returns:
The inserted or updated entity
"""
async with db.scoped_session(self.session_maker) as session:
# Set project_id if applicable and not already set
self._set_project_id_if_needed(entity)
# Try simple insert first
try:
session.add(entity)
await session.flush()
# Return with relationships loaded
query = (
self.select()
.where(Entity.file_path == entity.file_path)
.options(*self.get_load_options())
)
result = await session.execute(query)
found = result.scalar_one_or_none()
if not found: # pragma: no cover
raise RuntimeError(
f"Failed to retrieve entity after insert: {entity.file_path}"
)
return found
except IntegrityError as e:
# Check if this is a FOREIGN KEY constraint failure
error_str = str(e)
if "FOREIGN KEY constraint failed" in error_str:
# Import locally to avoid circular dependency (repository -> services -> repository)
from basic_memory.services.exceptions import SyncFatalError
# Project doesn't exist in database - this is a fatal sync error
raise SyncFatalError(
f"Cannot sync file '{entity.file_path}': "
f"project_id={entity.project_id} does not exist in database. "
f"The project may have been deleted. This sync will be terminated."
) from e
await session.rollback()
# Re-query after rollback to get a fresh, attached entity
existing_result = await session.execute(
select(Entity)
.where(
Entity.file_path == entity.file_path, Entity.project_id == entity.project_id
)
.options(*self.get_load_options())
)
existing_entity = existing_result.scalar_one_or_none()
if existing_entity:
# File path conflict - update the existing entity
logger.debug(
f"Resolving file_path conflict for {entity.file_path}, "
f"entity_id={existing_entity.id}, observations={len(entity.observations)}"
)
# Use merge to avoid session state conflicts
# Set the ID to update existing entity
entity.id = existing_entity.id
# Ensure observations reference the correct entity_id
for obs in entity.observations:
obs.entity_id = existing_entity.id
# Clear any existing ID to force INSERT as new observation
obs.id = None
# Merge the entity which will update the existing one
merged_entity = await session.merge(entity)
await session.commit()
# Re-query to get proper relationships loaded
final_result = await session.execute(
select(Entity)
.where(Entity.id == merged_entity.id)
.options(*self.get_load_options())
)
return final_result.scalar_one()
else:
# No file_path conflict - must be permalink conflict
# Generate unique permalink and retry
entity = await self._handle_permalink_conflict(entity, session)
return entity
async def get_all_file_paths(self) -> List[str]:
"""Get all file paths for this project - optimized for deletion detection.
Returns only file_path strings without loading entities or relationships.
Used by streaming sync to detect deleted files efficiently.
Returns:
List of file_path strings for all entities in the project
"""
query = select(Entity.file_path)
query = self._add_project_filter(query)
result = await self.execute_query(query, use_query_options=False)
return list(result.scalars().all())
async def get_distinct_directories(self) -> List[str]:
"""Extract unique directory paths from file_path column.
Optimized method for getting directory structure without loading full entities
or relationships. Returns a sorted list of unique directory paths.
Returns:
List of unique directory paths (e.g., ["notes", "notes/meetings", "specs"])
"""
# Query only file_path column, no entity objects or relationships
query = select(Entity.file_path).distinct()
query = self._add_project_filter(query)
# Execute with use_query_options=False to skip eager loading
result = await self.execute_query(query, use_query_options=False)
file_paths = [row for row in result.scalars().all()]
# Parse file paths to extract unique directories
directories = set()
for file_path in file_paths:
parts = [p for p in file_path.split("/") if p]
# Add all parent directories (exclude filename which is the last part)
for i in range(len(parts) - 1):
dir_path = "/".join(parts[: i + 1])
directories.add(dir_path)
return sorted(directories)
async def find_by_directory_prefix(self, directory_prefix: str) -> Sequence[Entity]:
"""Find entities whose file_path starts with the given directory prefix.
Optimized method for listing directory contents without loading all entities.
Uses SQL LIKE pattern matching to filter entities by directory path.
Args:
directory_prefix: Directory path prefix (e.g., "docs", "docs/guides")
Empty string returns all entities (root directory)
Returns:
Sequence of entities in the specified directory and subdirectories
"""
# Build SQL LIKE pattern
if directory_prefix == "" or directory_prefix == "/":
# Root directory - return all entities
return await self.find_all()
# Remove leading/trailing slashes for consistency
directory_prefix = directory_prefix.strip("/")
# Query entities with file_path starting with prefix
# Pattern matches "prefix/" to ensure we get files IN the directory,
# not just files whose names start with the prefix
pattern = f"{directory_prefix}/%"
query = self.select().where(Entity.file_path.like(pattern))
# Skip eager loading - we only need basic entity fields for directory trees
result = await self.execute_query(query, use_query_options=False)
return list(result.scalars().all())
async def _handle_permalink_conflict(self, entity: Entity, session: AsyncSession) -> Entity:
"""Handle permalink conflicts by generating a unique permalink."""
base_permalink = entity.permalink
suffix = 1
# Find a unique permalink
while True:
test_permalink = f"{base_permalink}-{suffix}"
existing = await session.execute(
select(Entity).where(
Entity.permalink == test_permalink, Entity.project_id == entity.project_id
)
)
if existing.scalar_one_or_none() is None:
# Found unique permalink
entity.permalink = test_permalink
break
suffix += 1
# Insert with unique permalink
session.add(entity)
await session.flush()
return entity
```
--------------------------------------------------------------------------------
/src/basic_memory/deps.py:
--------------------------------------------------------------------------------
```python
"""Dependency injection functions for basic-memory services."""
from typing import Annotated
from loguru import logger
from fastapi import Depends, HTTPException, Path, status, Request
from sqlalchemy.ext.asyncio import (
AsyncSession,
AsyncEngine,
async_sessionmaker,
)
import pathlib
from basic_memory import db
from basic_memory.config import ProjectConfig, BasicMemoryConfig, ConfigManager
from basic_memory.importers import (
ChatGPTImporter,
ClaudeConversationsImporter,
ClaudeProjectsImporter,
MemoryJsonImporter,
)
from basic_memory.markdown import EntityParser
from basic_memory.markdown.markdown_processor import MarkdownProcessor
from basic_memory.repository.entity_repository import EntityRepository
from basic_memory.repository.observation_repository import ObservationRepository
from basic_memory.repository.project_repository import ProjectRepository
from basic_memory.repository.relation_repository import RelationRepository
from basic_memory.repository.search_repository import SearchRepository
from basic_memory.services import EntityService, ProjectService
from basic_memory.services.context_service import ContextService
from basic_memory.services.directory_service import DirectoryService
from basic_memory.services.file_service import FileService
from basic_memory.services.link_resolver import LinkResolver
from basic_memory.services.search_service import SearchService
from basic_memory.sync import SyncService
from basic_memory.utils import generate_permalink
def get_app_config() -> BasicMemoryConfig: # pragma: no cover
app_config = ConfigManager().config
return app_config
AppConfigDep = Annotated[BasicMemoryConfig, Depends(get_app_config)] # pragma: no cover
## project
async def get_project_config(
project: "ProjectPathDep", project_repository: "ProjectRepositoryDep"
) -> ProjectConfig: # pragma: no cover
"""Get the current project referenced from request state.
Args:
request: The current request object
project_repository: Repository for project operations
Returns:
The resolved project config
Raises:
HTTPException: If project is not found
"""
# Convert project name to permalink for lookup
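# (illustrative: a name such as "My Project" is normalized to a slug like "my-project")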
project_permalink = generate_permalink(str(project))
project_obj = await project_repository.get_by_permalink(project_permalink)
if project_obj:
return ProjectConfig(name=project_obj.name, home=pathlib.Path(project_obj.path))
# Not found
raise HTTPException( # pragma: no cover
status_code=status.HTTP_404_NOT_FOUND, detail=f"Project '{project}' not found."
)
ProjectConfigDep = Annotated[ProjectConfig, Depends(get_project_config)] # pragma: no cover
## sqlalchemy
async def get_engine_factory(
request: Request,
) -> tuple[AsyncEngine, async_sessionmaker[AsyncSession]]: # pragma: no cover
"""Get cached engine and session maker from app state.
For API requests, returns cached connections from app.state for optimal performance.
For non-API contexts (CLI), falls back to direct database connection.
"""
# Try to get cached connections from app state (API context)
if (
hasattr(request, "app")
and hasattr(request.app.state, "engine")
and hasattr(request.app.state, "session_maker")
):
return request.app.state.engine, request.app.state.session_maker
# Fallback for non-API contexts (CLI)
logger.debug("Using fallback database connection for non-API context")
app_config = get_app_config()
engine, session_maker = await db.get_or_create_db(app_config.database_path)
return engine, session_maker
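# Note: app.state.engine and app.state.session_maker are assumed to be populated
# once at application startup (e.g. in the app's lifespan/startup hook), so API
# requests share a single engine and its connection pool.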
EngineFactoryDep = Annotated[
tuple[AsyncEngine, async_sessionmaker[AsyncSession]], Depends(get_engine_factory)
]
async def get_session_maker(engine_factory: EngineFactoryDep) -> async_sessionmaker[AsyncSession]:
"""Get session maker."""
_, session_maker = engine_factory
return session_maker
SessionMakerDep = Annotated[async_sessionmaker, Depends(get_session_maker)]
## repositories
async def get_project_repository(
session_maker: SessionMakerDep,
) -> ProjectRepository:
"""Get the project repository."""
return ProjectRepository(session_maker)
ProjectRepositoryDep = Annotated[ProjectRepository, Depends(get_project_repository)]
ProjectPathDep = Annotated[str, Path()] # Use Path dependency to extract from URL
async def get_project_id(
project_repository: ProjectRepositoryDep,
project: ProjectPathDep,
) -> int:
"""Get the current project ID from request state.
When using sub-applications with /{project} mounting, the project value
is stored in request.state by middleware.
Args:
project: The project name from the URL path
project_repository: Repository for project operations
Returns:
The resolved project ID
Raises:
HTTPException: If project is not found
"""
# Convert project name to permalink for lookup
project_permalink = generate_permalink(str(project))
project_obj = await project_repository.get_by_permalink(project_permalink)
if project_obj:
return project_obj.id
# Try by name if permalink lookup fails
project_obj = await project_repository.get_by_name(str(project)) # pragma: no cover
if project_obj: # pragma: no cover
return project_obj.id
# Not found
raise HTTPException( # pragma: no cover
status_code=status.HTTP_404_NOT_FOUND, detail=f"Project '{project}' not found."
)
"""
The project_id dependency is used in the following:
- EntityRepository
- ObservationRepository
- RelationRepository
- SearchRepository
- ProjectInfoRepository
"""
ProjectIdDep = Annotated[int, Depends(get_project_id)]
async def get_entity_repository(
session_maker: SessionMakerDep,
project_id: ProjectIdDep,
) -> EntityRepository:
"""Create an EntityRepository instance for the current project."""
return EntityRepository(session_maker, project_id=project_id)
EntityRepositoryDep = Annotated[EntityRepository, Depends(get_entity_repository)]
async def get_observation_repository(
session_maker: SessionMakerDep,
project_id: ProjectIdDep,
) -> ObservationRepository:
"""Create an ObservationRepository instance for the current project."""
return ObservationRepository(session_maker, project_id=project_id)
ObservationRepositoryDep = Annotated[ObservationRepository, Depends(get_observation_repository)]
async def get_relation_repository(
session_maker: SessionMakerDep,
project_id: ProjectIdDep,
) -> RelationRepository:
"""Create a RelationRepository instance for the current project."""
return RelationRepository(session_maker, project_id=project_id)
RelationRepositoryDep = Annotated[RelationRepository, Depends(get_relation_repository)]
async def get_search_repository(
session_maker: SessionMakerDep,
project_id: ProjectIdDep,
) -> SearchRepository:
"""Create a SearchRepository instance for the current project."""
return SearchRepository(session_maker, project_id=project_id)
SearchRepositoryDep = Annotated[SearchRepository, Depends(get_search_repository)]
# ProjectInfoRepository is deprecated and will be removed in a future version.
# Use ProjectRepository instead, which has the same functionality plus more project-specific operations.
## services
async def get_entity_parser(project_config: ProjectConfigDep) -> EntityParser:
return EntityParser(project_config.home)
EntityParserDep = Annotated["EntityParser", Depends(get_entity_parser)]
async def get_markdown_processor(entity_parser: EntityParserDep) -> MarkdownProcessor:
return MarkdownProcessor(entity_parser)
MarkdownProcessorDep = Annotated[MarkdownProcessor, Depends(get_markdown_processor)]
async def get_file_service(
project_config: ProjectConfigDep, markdown_processor: MarkdownProcessorDep
) -> FileService:
logger.debug(
f"Creating FileService for project: {project_config.name}, base_path: {project_config.home}"
)
file_service = FileService(project_config.home, markdown_processor)
logger.debug(f"Created FileService for project: {file_service} ")
return file_service
FileServiceDep = Annotated[FileService, Depends(get_file_service)]
async def get_entity_service(
entity_repository: EntityRepositoryDep,
observation_repository: ObservationRepositoryDep,
relation_repository: RelationRepositoryDep,
entity_parser: EntityParserDep,
file_service: FileServiceDep,
link_resolver: "LinkResolverDep",
app_config: AppConfigDep,
) -> EntityService:
"""Create EntityService with repository."""
return EntityService(
entity_repository=entity_repository,
observation_repository=observation_repository,
relation_repository=relation_repository,
entity_parser=entity_parser,
file_service=file_service,
link_resolver=link_resolver,
app_config=app_config,
)
EntityServiceDep = Annotated[EntityService, Depends(get_entity_service)]
async def get_search_service(
search_repository: SearchRepositoryDep,
entity_repository: EntityRepositoryDep,
file_service: FileServiceDep,
) -> SearchService:
"""Create SearchService with dependencies."""
return SearchService(search_repository, entity_repository, file_service)
SearchServiceDep = Annotated[SearchService, Depends(get_search_service)]
async def get_link_resolver(
entity_repository: EntityRepositoryDep, search_service: SearchServiceDep
) -> LinkResolver:
return LinkResolver(entity_repository=entity_repository, search_service=search_service)
LinkResolverDep = Annotated[LinkResolver, Depends(get_link_resolver)]
async def get_context_service(
search_repository: SearchRepositoryDep,
entity_repository: EntityRepositoryDep,
observation_repository: ObservationRepositoryDep,
) -> ContextService:
return ContextService(
search_repository=search_repository,
entity_repository=entity_repository,
observation_repository=observation_repository,
)
ContextServiceDep = Annotated[ContextService, Depends(get_context_service)]
async def get_sync_service(
app_config: AppConfigDep,
entity_service: EntityServiceDep,
entity_parser: EntityParserDep,
entity_repository: EntityRepositoryDep,
relation_repository: RelationRepositoryDep,
project_repository: ProjectRepositoryDep,
search_service: SearchServiceDep,
file_service: FileServiceDep,
) -> SyncService: # pragma: no cover
"""
:rtype: object
"""
return SyncService(
app_config=app_config,
entity_service=entity_service,
entity_parser=entity_parser,
entity_repository=entity_repository,
relation_repository=relation_repository,
project_repository=project_repository,
search_service=search_service,
file_service=file_service,
)
SyncServiceDep = Annotated[SyncService, Depends(get_sync_service)]
async def get_project_service(
project_repository: ProjectRepositoryDep,
) -> ProjectService:
"""Create ProjectService with repository."""
return ProjectService(repository=project_repository)
ProjectServiceDep = Annotated[ProjectService, Depends(get_project_service)]
async def get_directory_service(
entity_repository: EntityRepositoryDep,
) -> DirectoryService:
"""Create DirectoryService with dependencies."""
return DirectoryService(
entity_repository=entity_repository,
)
DirectoryServiceDep = Annotated[DirectoryService, Depends(get_directory_service)]
## importers
async def get_chatgpt_importer(
project_config: ProjectConfigDep, markdown_processor: MarkdownProcessorDep
) -> ChatGPTImporter:
"""Create ChatGPTImporter with dependencies."""
return ChatGPTImporter(project_config.home, markdown_processor)
ChatGPTImporterDep = Annotated[ChatGPTImporter, Depends(get_chatgpt_importer)]
async def get_claude_conversations_importer(
project_config: ProjectConfigDep, markdown_processor: MarkdownProcessorDep
) -> ClaudeConversationsImporter:
"""Create ChatGPTImporter with dependencies."""
return ClaudeConversationsImporter(project_config.home, markdown_processor)
ClaudeConversationsImporterDep = Annotated[
ClaudeConversationsImporter, Depends(get_claude_conversations_importer)
]
async def get_claude_projects_importer(
project_config: ProjectConfigDep, markdown_processor: MarkdownProcessorDep
) -> ClaudeProjectsImporter:
"""Create ChatGPTImporter with dependencies."""
return ClaudeProjectsImporter(project_config.home, markdown_processor)
ClaudeProjectsImporterDep = Annotated[ClaudeProjectsImporter, Depends(get_claude_projects_importer)]
async def get_memory_json_importer(
project_config: ProjectConfigDep, markdown_processor: MarkdownProcessorDep
) -> MemoryJsonImporter:
"""Create ChatGPTImporter with dependencies."""
return MemoryJsonImporter(project_config.home, markdown_processor)
MemoryJsonImporterDep = Annotated[MemoryJsonImporter, Depends(get_memory_json_importer)]
```
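For orientation, here is a minimal sketch, not part of the repository, of how a router could consume the `Annotated` dependency aliases defined above; the route paths and handler names are hypothetical, and FastAPI resolves the full chain (project path parameter → project_id → repositories → services) per request.

```python
# Hypothetical router sketch: FastAPI resolves the Annotated *Dep aliases
# from deps.py on each request, scoped to the {project} path parameter.
from fastapi import APIRouter

from basic_memory.deps import EntityRepositoryDep, SearchServiceDep

router = APIRouter(prefix="/{project}")


@router.get("/directories")
async def list_directories(entity_repository: EntityRepositoryDep) -> list[str]:
    # EntityRepository is constructed with the project_id resolved from the URL
    return await entity_repository.get_distinct_directories()


@router.post("/reindex")
async def reindex(search_service: SearchServiceDep) -> dict:
    # SearchService arrives fully wired (search repository, entity repository, file service)
    await search_service.reindex_all()
    return {"status": "ok"}
```

Because `ProjectPathDep` is declared with `Path()`, the `{project}` segment is consumed by the dependency chain itself rather than by each handler.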
--------------------------------------------------------------------------------
/src/basic_memory/services/search_service.py:
--------------------------------------------------------------------------------
```python
"""Service for search operations."""
import ast
from datetime import datetime
from typing import List, Optional, Set
from dateparser import parse
from fastapi import BackgroundTasks
from loguru import logger
from sqlalchemy import text
from basic_memory.models import Entity
from basic_memory.repository import EntityRepository
from basic_memory.repository.search_repository import SearchRepository, SearchIndexRow
from basic_memory.schemas.search import SearchQuery, SearchItemType
from basic_memory.services import FileService
class SearchService:
"""Service for search operations.
Supports three primary search modes:
1. Exact permalink lookup
2. Pattern matching with * (e.g., 'specs/*')
3. Full-text search across title/content
"""
def __init__(
self,
search_repository: SearchRepository,
entity_repository: EntityRepository,
file_service: FileService,
):
self.repository = search_repository
self.entity_repository = entity_repository
self.file_service = file_service
async def init_search_index(self):
"""Create FTS5 virtual table if it doesn't exist."""
await self.repository.init_search_index()
async def reindex_all(self, background_tasks: Optional[BackgroundTasks] = None) -> None:
"""Reindex all content from database."""
logger.info("Starting full reindex")
# Clear and recreate search index
await self.repository.execute_query(text("DROP TABLE IF EXISTS search_index"), params={})
await self.init_search_index()
# Reindex all entities
logger.debug("Indexing entities")
entities = await self.entity_repository.find_all()
for entity in entities:
await self.index_entity(entity, background_tasks)
logger.info("Reindex complete")
async def search(self, query: SearchQuery, limit=10, offset=0) -> List[SearchIndexRow]:
"""Search across all indexed content.
Supports three modes:
1. Exact permalink: finds direct matches for a specific path
2. Pattern match: handles * wildcards in paths
3. Text search: full-text search across title/content
"""
if query.no_criteria():
logger.debug("no criteria passed to query")
return []
logger.trace(f"Searching with query: {query}")
after_date = (
(
query.after_date
if isinstance(query.after_date, datetime)
else parse(query.after_date)
)
if query.after_date
else None
)
# search
results = await self.repository.search(
search_text=query.text,
permalink=query.permalink,
permalink_match=query.permalink_match,
title=query.title,
types=query.types,
search_item_types=query.entity_types,
after_date=after_date,
limit=limit,
offset=offset,
)
return results
@staticmethod
def _generate_variants(text: str) -> Set[str]:
"""Generate text variants for better fuzzy matching.
Creates variations of the text to improve match chances:
- Original form
- Lowercase form
- Path segments (for permalinks)
- Common word boundaries
"""
variants = {text, text.lower()}
# Add path segments
if "/" in text:
variants.update(p.strip() for p in text.split("/") if p.strip())
# Add word boundaries
variants.update(w.strip() for w in text.lower().split() if w.strip())
# Trigrams disabled: They create massive search index bloat, increasing DB size significantly
# and slowing down indexing performance. FTS5 search works well without them.
# See: https://github.com/basicmachines-co/basic-memory/issues/351
# variants.update(text[i : i + 3].lower() for i in range(len(text) - 2))
return variants
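# Illustration: _generate_variants("specs/search-design") yields
# {"specs/search-design", "specs", "search-design"} -- the original text plus its
# path segments; a title like "Search Design" additionally contributes the
# lowercase form and the individual words "search" and "design".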
def _extract_entity_tags(self, entity: Entity) -> List[str]:
"""Extract tags from entity metadata for search indexing.
Handles multiple tag formats:
- List format: ["tag1", "tag2"]
- String format: "['tag1', 'tag2']" or "[tag1, tag2]"
- Empty: [] or "[]"
Returns a list of tag strings for search indexing.
"""
if not entity.entity_metadata or "tags" not in entity.entity_metadata:
return []
tags = entity.entity_metadata["tags"]
# Handle list format (preferred)
if isinstance(tags, list):
return [str(tag) for tag in tags if tag]
# Handle string format (legacy)
if isinstance(tags, str):
try:
# Parse string representation of list
parsed_tags = ast.literal_eval(tags)
if isinstance(parsed_tags, list):
return [str(tag) for tag in parsed_tags if tag]
except (ValueError, SyntaxError):
# If parsing fails, treat as single tag
return [tags] if tags.strip() else []
return [] # pragma: no cover
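# Illustration of the accepted tag shapes:
#   {"tags": ["python", "search"]}    -> ["python", "search"]
#   {"tags": "['python', 'search']"}  -> ["python", "search"]  (legacy string form)
#   {"tags": "python"}                -> ["python"]            (unparseable, treated as one tag)
#   {"tags": []} or missing key       -> []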
async def index_entity(
self,
entity: Entity,
background_tasks: Optional[BackgroundTasks] = None,
) -> None:
if background_tasks:
background_tasks.add_task(self.index_entity_data, entity)
else:
await self.index_entity_data(entity)
async def index_entity_data(
self,
entity: Entity,
) -> None:
# delete all search index data associated with entity
await self.repository.delete_by_entity_id(entity_id=entity.id)
# reindex
if entity.is_markdown:
    await self.index_entity_markdown(entity)
else:
    await self.index_entity_file(entity)
async def index_entity_file(
self,
entity: Entity,
) -> None:
# Index entity file with no content
await self.repository.index_item(
SearchIndexRow(
id=entity.id,
entity_id=entity.id,
type=SearchItemType.ENTITY.value,
title=entity.title,
file_path=entity.file_path,
metadata={
"entity_type": entity.entity_type,
},
created_at=entity.created_at,
updated_at=entity.updated_at,
project_id=entity.project_id,
)
)
async def index_entity_markdown(
self,
entity: Entity,
) -> None:
"""Index an entity and all its observations and relations.
Indexing structure:
1. Entities
- permalink: direct from entity (e.g., "specs/search")
- file_path: physical file location
- project_id: project context for isolation
2. Observations
- permalink: entity permalink + /observations/id (e.g., "specs/search/observations/123")
- file_path: parent entity's file (where observation is defined)
- project_id: inherited from parent entity
3. Relations (only index outgoing relations defined in this file)
- permalink: from_entity/relation_type/to_entity (e.g., "specs/search/implements/features/search-ui")
- file_path: source entity's file (where relation is defined)
- project_id: inherited from source entity
Each type gets its own row in the search index with appropriate metadata.
The project_id is automatically added by the repository when indexing.
"""
# Collect all search index rows to batch insert at the end
rows_to_index = []
content_stems = []
content_snippet = ""
title_variants = self._generate_variants(entity.title)
content_stems.extend(title_variants)
content = await self.file_service.read_entity_content(entity)
if content:
content_stems.append(content)
content_snippet = f"{content[:250]}"
if entity.permalink:
content_stems.extend(self._generate_variants(entity.permalink))
content_stems.extend(self._generate_variants(entity.file_path))
# Add entity tags from frontmatter to search content
entity_tags = self._extract_entity_tags(entity)
if entity_tags:
content_stems.extend(entity_tags)
entity_content_stems = "\n".join(p for p in content_stems if p and p.strip())
# Add entity row
rows_to_index.append(
SearchIndexRow(
id=entity.id,
type=SearchItemType.ENTITY.value,
title=entity.title,
content_stems=entity_content_stems,
content_snippet=content_snippet,
permalink=entity.permalink,
file_path=entity.file_path,
entity_id=entity.id,
metadata={
"entity_type": entity.entity_type,
},
created_at=entity.created_at,
updated_at=entity.updated_at,
project_id=entity.project_id,
)
)
# Add observation rows
for obs in entity.observations:
# Index with parent entity's file path since that's where it's defined
obs_content_stems = "\n".join(
p for p in self._generate_variants(obs.content) if p and p.strip()
)
rows_to_index.append(
SearchIndexRow(
id=obs.id,
type=SearchItemType.OBSERVATION.value,
title=f"{obs.category}: {obs.content[:100]}...",
content_stems=obs_content_stems,
content_snippet=obs.content,
permalink=obs.permalink,
file_path=entity.file_path,
category=obs.category,
entity_id=entity.id,
metadata={
"tags": obs.tags,
},
created_at=entity.created_at,
updated_at=entity.updated_at,
project_id=entity.project_id,
)
)
# Add relation rows (only outgoing relations defined in this file)
for rel in entity.outgoing_relations:
# Create descriptive title showing the relationship
relation_title = (
f"{rel.from_entity.title} → {rel.to_entity.title}"
if rel.to_entity
else f"{rel.from_entity.title}"
)
rel_content_stems = "\n".join(
p for p in self._generate_variants(relation_title) if p and p.strip()
)
rows_to_index.append(
SearchIndexRow(
id=rel.id,
title=relation_title,
permalink=rel.permalink,
content_stems=rel_content_stems,
file_path=entity.file_path,
type=SearchItemType.RELATION.value,
entity_id=entity.id,
from_id=rel.from_id,
to_id=rel.to_id,
relation_type=rel.relation_type,
created_at=entity.created_at,
updated_at=entity.updated_at,
project_id=entity.project_id,
)
)
# Batch insert all rows at once
await self.repository.bulk_index_items(rows_to_index)
async def delete_by_permalink(self, permalink: str):
"""Delete an item from the search index."""
await self.repository.delete_by_permalink(permalink)
async def delete_by_entity_id(self, entity_id: int):
"""Delete an item from the search index."""
await self.repository.delete_by_entity_id(entity_id)
async def handle_delete(self, entity: Entity):
"""Handle complete entity deletion from search index including observations and relations.
This replicates the logic from sync_service.handle_delete() to properly clean up
all search index entries for an entity and its related data.
"""
logger.debug(
f"Cleaning up search index for entity_id={entity.id}, file_path={entity.file_path}, "
f"observations={len(entity.observations)}, relations={len(entity.outgoing_relations)}"
)
# Clean up search index - same logic as sync_service.handle_delete()
permalinks = (
[entity.permalink]
+ [o.permalink for o in entity.observations]
+ [r.permalink for r in entity.outgoing_relations]
)
logger.debug(
f"Deleting search index entries for entity_id={entity.id}, "
f"index_entries={len(permalinks)}"
)
for permalink in permalinks:
if permalink:
await self.delete_by_permalink(permalink)
else:
await self.delete_by_entity_id(entity.id)
```
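As a usage illustration (not from the repository), the three modes in the class docstring map onto `SearchQuery` roughly as follows; the field names mirror the attributes read in `search()` above (`permalink`, `permalink_match`, `text`), while the rest of the schema is assumed.

```python
# Hypothetical sketch: exercising the three search modes of SearchService.
# Field names follow the attributes consumed in search(); schema defaults are assumed.
from basic_memory.schemas.search import SearchQuery
from basic_memory.services.search_service import SearchService


async def demo_search(search_service: SearchService) -> None:
    # 1. Exact permalink lookup
    exact = await search_service.search(SearchQuery(permalink="specs/search"))

    # 2. Pattern match with * wildcards (everything under specs/)
    matched = await search_service.search(SearchQuery(permalink_match="specs/*"))

    # 3. Full-text search across title/content, with explicit paging
    hits = await search_service.search(SearchQuery(text="search index"), limit=20, offset=0)

    for row in exact + matched + hits:
        print(row.type, row.permalink, row.title)
```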