This is page 14 of 17. Use http://codebase.md/basicmachines-co/basic-memory?page={x} to view the full context.
# Directory Structure
```
├── .claude
│ ├── agents
│ │ ├── python-developer.md
│ │ └── system-architect.md
│ └── commands
│ ├── release
│ │ ├── beta.md
│ │ ├── changelog.md
│ │ ├── release-check.md
│ │ └── release.md
│ ├── spec.md
│ └── test-live.md
├── .dockerignore
├── .github
│ ├── dependabot.yml
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.md
│ │ ├── config.yml
│ │ ├── documentation.md
│ │ └── feature_request.md
│ └── workflows
│ ├── claude-code-review.yml
│ ├── claude-issue-triage.yml
│ ├── claude.yml
│ ├── dev-release.yml
│ ├── docker.yml
│ ├── pr-title.yml
│ ├── release.yml
│ └── test.yml
├── .gitignore
├── .python-version
├── CHANGELOG.md
├── CITATION.cff
├── CLA.md
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── docker-compose.yml
├── Dockerfile
├── docs
│ ├── ai-assistant-guide-extended.md
│ ├── character-handling.md
│ ├── cloud-cli.md
│ └── Docker.md
├── justfile
├── LICENSE
├── llms-install.md
├── pyproject.toml
├── README.md
├── SECURITY.md
├── smithery.yaml
├── specs
│ ├── SPEC-1 Specification-Driven Development Process.md
│ ├── SPEC-10 Unified Deployment Workflow and Event Tracking.md
│ ├── SPEC-11 Basic Memory API Performance Optimization.md
│ ├── SPEC-12 OpenTelemetry Observability.md
│ ├── SPEC-13 CLI Authentication with Subscription Validation.md
│ ├── SPEC-14 Cloud Git Versioning & GitHub Backup.md
│ ├── SPEC-14- Cloud Git Versioning & GitHub Backup.md
│ ├── SPEC-15 Configuration Persistence via Tigris for Cloud Tenants.md
│ ├── SPEC-16 MCP Cloud Service Consolidation.md
│ ├── SPEC-17 Semantic Search with ChromaDB.md
│ ├── SPEC-18 AI Memory Management Tool.md
│ ├── SPEC-19 Sync Performance and Memory Optimization.md
│ ├── SPEC-2 Slash Commands Reference.md
│ ├── SPEC-3 Agent Definitions.md
│ ├── SPEC-4 Notes Web UI Component Architecture.md
│ ├── SPEC-5 CLI Cloud Upload via WebDAV.md
│ ├── SPEC-6 Explicit Project Parameter Architecture.md
│ ├── SPEC-7 POC to spike Tigris Turso for local access to cloud data.md
│ ├── SPEC-8 TigrisFS Integration.md
│ ├── SPEC-9 Multi-Project Bidirectional Sync Architecture.md
│ ├── SPEC-9 Signed Header Tenant Information.md
│ └── SPEC-9-1 Follow-Ups- Conflict, Sync, and Observability.md
├── src
│ └── basic_memory
│ ├── __init__.py
│ ├── alembic
│ │ ├── alembic.ini
│ │ ├── env.py
│ │ ├── migrations.py
│ │ ├── script.py.mako
│ │ └── versions
│ │ ├── 3dae7c7b1564_initial_schema.py
│ │ ├── 502b60eaa905_remove_required_from_entity_permalink.py
│ │ ├── 5fe1ab1ccebe_add_projects_table.py
│ │ ├── 647e7a75e2cd_project_constraint_fix.py
│ │ ├── 9d9c1cb7d8f5_add_mtime_and_size_columns_to_entity_.py
│ │ ├── a1b2c3d4e5f6_fix_project_foreign_keys.py
│ │ ├── b3c3938bacdb_relation_to_name_unique_index.py
│ │ ├── cc7172b46608_update_search_index_schema.py
│ │ └── e7e1f4367280_add_scan_watermark_tracking_to_project.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── app.py
│ │ ├── routers
│ │ │ ├── __init__.py
│ │ │ ├── directory_router.py
│ │ │ ├── importer_router.py
│ │ │ ├── knowledge_router.py
│ │ │ ├── management_router.py
│ │ │ ├── memory_router.py
│ │ │ ├── project_router.py
│ │ │ ├── prompt_router.py
│ │ │ ├── resource_router.py
│ │ │ ├── search_router.py
│ │ │ └── utils.py
│ │ └── template_loader.py
│ ├── cli
│ │ ├── __init__.py
│ │ ├── app.py
│ │ ├── auth.py
│ │ ├── commands
│ │ │ ├── __init__.py
│ │ │ ├── cloud
│ │ │ │ ├── __init__.py
│ │ │ │ ├── api_client.py
│ │ │ │ ├── bisync_commands.py
│ │ │ │ ├── cloud_utils.py
│ │ │ │ ├── core_commands.py
│ │ │ │ ├── mount_commands.py
│ │ │ │ ├── rclone_config.py
│ │ │ │ ├── rclone_installer.py
│ │ │ │ ├── upload_command.py
│ │ │ │ └── upload.py
│ │ │ ├── command_utils.py
│ │ │ ├── db.py
│ │ │ ├── import_chatgpt.py
│ │ │ ├── import_claude_conversations.py
│ │ │ ├── import_claude_projects.py
│ │ │ ├── import_memory_json.py
│ │ │ ├── mcp.py
│ │ │ ├── project.py
│ │ │ ├── status.py
│ │ │ ├── sync.py
│ │ │ └── tool.py
│ │ └── main.py
│ ├── config.py
│ ├── db.py
│ ├── deps.py
│ ├── file_utils.py
│ ├── ignore_utils.py
│ ├── importers
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── chatgpt_importer.py
│ │ ├── claude_conversations_importer.py
│ │ ├── claude_projects_importer.py
│ │ ├── memory_json_importer.py
│ │ └── utils.py
│ ├── markdown
│ │ ├── __init__.py
│ │ ├── entity_parser.py
│ │ ├── markdown_processor.py
│ │ ├── plugins.py
│ │ ├── schemas.py
│ │ └── utils.py
│ ├── mcp
│ │ ├── __init__.py
│ │ ├── async_client.py
│ │ ├── project_context.py
│ │ ├── prompts
│ │ │ ├── __init__.py
│ │ │ ├── ai_assistant_guide.py
│ │ │ ├── continue_conversation.py
│ │ │ ├── recent_activity.py
│ │ │ ├── search.py
│ │ │ └── utils.py
│ │ ├── resources
│ │ │ ├── ai_assistant_guide.md
│ │ │ └── project_info.py
│ │ ├── server.py
│ │ └── tools
│ │ ├── __init__.py
│ │ ├── build_context.py
│ │ ├── canvas.py
│ │ ├── chatgpt_tools.py
│ │ ├── delete_note.py
│ │ ├── edit_note.py
│ │ ├── list_directory.py
│ │ ├── move_note.py
│ │ ├── project_management.py
│ │ ├── read_content.py
│ │ ├── read_note.py
│ │ ├── recent_activity.py
│ │ ├── search.py
│ │ ├── utils.py
│ │ ├── view_note.py
│ │ └── write_note.py
│ ├── models
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── knowledge.py
│ │ ├── project.py
│ │ └── search.py
│ ├── repository
│ │ ├── __init__.py
│ │ ├── entity_repository.py
│ │ ├── observation_repository.py
│ │ ├── project_info_repository.py
│ │ ├── project_repository.py
│ │ ├── relation_repository.py
│ │ ├── repository.py
│ │ └── search_repository.py
│ ├── schemas
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── cloud.py
│ │ ├── delete.py
│ │ ├── directory.py
│ │ ├── importer.py
│ │ ├── memory.py
│ │ ├── project_info.py
│ │ ├── prompt.py
│ │ ├── request.py
│ │ ├── response.py
│ │ ├── search.py
│ │ └── sync_report.py
│ ├── services
│ │ ├── __init__.py
│ │ ├── context_service.py
│ │ ├── directory_service.py
│ │ ├── entity_service.py
│ │ ├── exceptions.py
│ │ ├── file_service.py
│ │ ├── initialization.py
│ │ ├── link_resolver.py
│ │ ├── project_service.py
│ │ ├── search_service.py
│ │ └── service.py
│ ├── sync
│ │ ├── __init__.py
│ │ ├── background_sync.py
│ │ ├── sync_service.py
│ │ └── watch_service.py
│ ├── templates
│ │ └── prompts
│ │ ├── continue_conversation.hbs
│ │ └── search.hbs
│ └── utils.py
├── test-int
│ ├── BENCHMARKS.md
│ ├── cli
│ │ ├── test_project_commands_integration.py
│ │ ├── test_sync_commands_integration.py
│ │ └── test_version_integration.py
│ ├── conftest.py
│ ├── mcp
│ │ ├── test_build_context_underscore.py
│ │ ├── test_build_context_validation.py
│ │ ├── test_chatgpt_tools_integration.py
│ │ ├── test_default_project_mode_integration.py
│ │ ├── test_delete_note_integration.py
│ │ ├── test_edit_note_integration.py
│ │ ├── test_list_directory_integration.py
│ │ ├── test_move_note_integration.py
│ │ ├── test_project_management_integration.py
│ │ ├── test_project_state_sync_integration.py
│ │ ├── test_read_content_integration.py
│ │ ├── test_read_note_integration.py
│ │ ├── test_search_integration.py
│ │ ├── test_single_project_mcp_integration.py
│ │ └── test_write_note_integration.py
│ ├── test_db_wal_mode.py
│ ├── test_disable_permalinks_integration.py
│ └── test_sync_performance_benchmark.py
├── tests
│ ├── __init__.py
│ ├── api
│ │ ├── conftest.py
│ │ ├── test_async_client.py
│ │ ├── test_continue_conversation_template.py
│ │ ├── test_directory_router.py
│ │ ├── test_importer_router.py
│ │ ├── test_knowledge_router.py
│ │ ├── test_management_router.py
│ │ ├── test_memory_router.py
│ │ ├── test_project_router_operations.py
│ │ ├── test_project_router.py
│ │ ├── test_prompt_router.py
│ │ ├── test_relation_background_resolution.py
│ │ ├── test_resource_router.py
│ │ ├── test_search_router.py
│ │ ├── test_search_template.py
│ │ ├── test_template_loader_helpers.py
│ │ └── test_template_loader.py
│ ├── cli
│ │ ├── conftest.py
│ │ ├── test_bisync_commands.py
│ │ ├── test_cli_tools.py
│ │ ├── test_cloud_authentication.py
│ │ ├── test_cloud_utils.py
│ │ ├── test_ignore_utils.py
│ │ ├── test_import_chatgpt.py
│ │ ├── test_import_claude_conversations.py
│ │ ├── test_import_claude_projects.py
│ │ ├── test_import_memory_json.py
│ │ └── test_upload.py
│ ├── conftest.py
│ ├── db
│ │ └── test_issue_254_foreign_key_constraints.py
│ ├── importers
│ │ ├── test_importer_base.py
│ │ └── test_importer_utils.py
│ ├── markdown
│ │ ├── __init__.py
│ │ ├── test_date_frontmatter_parsing.py
│ │ ├── test_entity_parser_error_handling.py
│ │ ├── test_entity_parser.py
│ │ ├── test_markdown_plugins.py
│ │ ├── test_markdown_processor.py
│ │ ├── test_observation_edge_cases.py
│ │ ├── test_parser_edge_cases.py
│ │ ├── test_relation_edge_cases.py
│ │ └── test_task_detection.py
│ ├── mcp
│ │ ├── conftest.py
│ │ ├── test_obsidian_yaml_formatting.py
│ │ ├── test_permalink_collision_file_overwrite.py
│ │ ├── test_prompts.py
│ │ ├── test_resources.py
│ │ ├── test_tool_build_context.py
│ │ ├── test_tool_canvas.py
│ │ ├── test_tool_delete_note.py
│ │ ├── test_tool_edit_note.py
│ │ ├── test_tool_list_directory.py
│ │ ├── test_tool_move_note.py
│ │ ├── test_tool_read_content.py
│ │ ├── test_tool_read_note.py
│ │ ├── test_tool_recent_activity.py
│ │ ├── test_tool_resource.py
│ │ ├── test_tool_search.py
│ │ ├── test_tool_utils.py
│ │ ├── test_tool_view_note.py
│ │ ├── test_tool_write_note.py
│ │ └── tools
│ │ └── test_chatgpt_tools.py
│ ├── Non-MarkdownFileSupport.pdf
│ ├── repository
│ │ ├── test_entity_repository_upsert.py
│ │ ├── test_entity_repository.py
│ │ ├── test_entity_upsert_issue_187.py
│ │ ├── test_observation_repository.py
│ │ ├── test_project_info_repository.py
│ │ ├── test_project_repository.py
│ │ ├── test_relation_repository.py
│ │ ├── test_repository.py
│ │ ├── test_search_repository_edit_bug_fix.py
│ │ └── test_search_repository.py
│ ├── schemas
│ │ ├── test_base_timeframe_minimum.py
│ │ ├── test_memory_serialization.py
│ │ ├── test_memory_url_validation.py
│ │ ├── test_memory_url.py
│ │ ├── test_schemas.py
│ │ └── test_search.py
│ ├── Screenshot.png
│ ├── services
│ │ ├── test_context_service.py
│ │ ├── test_directory_service.py
│ │ ├── test_entity_service_disable_permalinks.py
│ │ ├── test_entity_service.py
│ │ ├── test_file_service.py
│ │ ├── test_initialization.py
│ │ ├── test_link_resolver.py
│ │ ├── test_project_removal_bug.py
│ │ ├── test_project_service_operations.py
│ │ ├── test_project_service.py
│ │ └── test_search_service.py
│ ├── sync
│ │ ├── test_character_conflicts.py
│ │ ├── test_sync_service_incremental.py
│ │ ├── test_sync_service.py
│ │ ├── test_sync_wikilink_issue.py
│ │ ├── test_tmp_files.py
│ │ ├── test_watch_service_edge_cases.py
│ │ ├── test_watch_service_reload.py
│ │ └── test_watch_service.py
│ ├── test_config.py
│ ├── test_db_migration_deduplication.py
│ ├── test_deps.py
│ ├── test_production_cascade_delete.py
│ └── utils
│ ├── test_file_utils.py
│ ├── test_frontmatter_obsidian_compatible.py
│ ├── test_parse_tags.py
│ ├── test_permalink_formatting.py
│ ├── test_utf8_handling.py
│ └── test_validate_project_path.py
├── uv.lock
├── v0.15.0-RELEASE-DOCS.md
└── v15-docs
├── api-performance.md
├── background-relations.md
├── basic-memory-home.md
├── bug-fixes.md
├── chatgpt-integration.md
├── cloud-authentication.md
├── cloud-bisync.md
├── cloud-mode-usage.md
├── cloud-mount.md
├── default-project-mode.md
├── env-file-removal.md
├── env-var-overrides.md
├── explicit-project-parameter.md
├── gitignore-integration.md
├── project-root-env-var.md
├── README.md
└── sqlite-performance.md
```
# Files
--------------------------------------------------------------------------------
/specs/SPEC-9 Multi-Project Bidirectional Sync Architecture.md:
--------------------------------------------------------------------------------
```markdown
---
title: 'SPEC-9: Multi-Project Bidirectional Sync Architecture'
type: spec
permalink: specs/spec-9-multi-project-bisync
tags:
- cloud
- bisync
- architecture
- multi-project
---
# SPEC-9: Multi-Project Bidirectional Sync Architecture
## Status: ✅ Implementation Complete
**Completed Phases:**
- ✅ Phase 1: Cloud Mode Toggle & Config
- ✅ Phase 2: Bisync Updates (Multi-Project)
- ✅ Phase 3: Sync Command Dual Mode
- ✅ Phase 4: Remove Duplicate Commands & Cloud Mode Auth
- ✅ Phase 5: Mount Updates
- ✅ Phase 6: Safety & Validation
- ⏸️ Phase 7: Cloud-Side Implementation (Deferred to cloud repo)
- ✅ Phase 8.1: Testing (All test scenarios validated)
- ✅ Phase 8.2: Documentation (Core docs complete, demos pending)
**Key Achievements:**
- Unified CLI: `bm sync`, `bm project`, `bm tool` work transparently in both local and cloud modes
- Multi-project sync: Single `bm sync` operation handles all projects bidirectionally
- Cloud mode toggle: `bm cloud login` / `bm cloud logout` switches modes seamlessly
- Integrity checking: `bm cloud check` verifies file matching without data transfer
- Directory isolation: Mount and bisync use separate directories with conflict prevention
- Clean UX: No RCLONE_TEST files, clear error messages, transparent implementation
## Why
**Current State:**
SPEC-8 implemented rclone bisync for cloud file synchronization, but has several architectural limitations:
1. Syncs only a single project subdirectory (`bucket:/basic-memory`)
2. Requires separate `bm cloud` command namespace, duplicating existing CLI commands
3. Users must learn different commands for local vs cloud operations
4. RCLONE_TEST marker files clutter user directories
**Problems:**
1. **Duplicate Commands**: `bm project` is duplicated by `bm cloud project`, while `bm tool` has no cloud equivalent at all
2. **Inconsistent UX**: Same operations require different command syntax depending on mode
3. **Single Project Sync**: Users can only sync one project at a time
4. **Manual Coordination**: Creating new projects requires manual coordination between local and cloud
5. **Confusing Artifacts**: RCLONE_TEST marker files confuse users
**Goals:**
- **Unified CLI**: All existing `bm` commands work in both local and cloud mode via toggle
- **Multi-Project Sync**: Single sync operation handles all projects bidirectionally
- **Simple Mode Switch**: `bm cloud login` enables cloud mode, `logout` returns to local
- **Automatic Registration**: Projects auto-register on both local and cloud sides
- **Clean UX**: Remove unnecessary safety checks and confusing artifacts
## Cloud Access Paradigm: The Dropbox Model
**Mental Model Shift:**
Basic Memory cloud access follows the **Dropbox/iCloud paradigm** - not a per-project cloud connection model.
**What This Means:**
```
Traditional Project-Based Model (❌ Not This):
  bm cloud mount --project work      # Mount individual project
  bm cloud mount --project personal  # Mount another project
  bm cloud sync --project research   # Sync specific project
  → Multiple connections, multiple credentials, complex management

Dropbox Model (✅ This):
  bm cloud mount                     # One mount, all projects
  bm sync                            # One sync, all projects
  ~/basic-memory-cloud/              # One folder, all content
  → Single connection, organized by folders (projects)
```
**Key Principles:**
1. **Mount/Bisync = Access Methods, Not Project Tools**
- Mount: Read-through cache to cloud (like Dropbox folder)
- Bisync: Bidirectional sync with cloud (like Dropbox sync)
- Both operate at **bucket level** (all projects)
2. **Projects = Organization Within Cloud Space**
- Projects are folders within your cloud storage
- Creating a folder creates a project (auto-discovered)
- Projects are managed via `bm project` commands
3. **One Cloud Space Per Machine**
- One set of IAM credentials per tenant
- One mount point: `~/basic-memory-cloud/`
- One bisync directory: `~/basic-memory-cloud-sync/` (default)
- All projects accessible through this single entry point
4. **Why This Works Better**
- **Credential Management**: One credential set, not N sets per project
- **Resource Efficiency**: One rclone process, not N processes
- **Familiar Pattern**: Users already understand Dropbox/iCloud
- **Operational Simplicity**: `mount` once, `unmount` once
- **Scales Naturally**: Add projects by creating folders, not reconfiguring cloud access
**User Journey:**
```bash
# Setup cloud access (once)
bm cloud login
bm cloud mount # or: bm cloud setup for bisync
# Work with projects (create folders as needed)
cd ~/basic-memory-cloud/
mkdir my-new-project
echo "# Notes" > my-new-project/readme.md
# Cloud auto-discovers and registers project
# No additional cloud configuration needed
```
This paradigm shift means **mount and bisync are infrastructure concerns**, while **projects are content organization**. Users think about their knowledge, not about cloud plumbing.
## What
This spec affects:
1. **Cloud Mode Toggle** (`config.py`, `async_client.py`):
- Add `cloud_mode` flag to `~/.basic-memory/config.json`
- Set/unset `BASIC_MEMORY_PROXY_URL` based on cloud mode
- `bm cloud login` enables cloud mode, `logout` disables it
- All CLI commands respect cloud mode via existing async_client
2. **Unified CLI Commands**:
- **Remove**: `bm cloud project` commands (duplicate of `bm project`)
- **Enhance**: `bm sync` co-opted for bisync in cloud mode
- **Keep**: `bm cloud login/logout/status/setup` for mode management
- **Result**: `bm project`, `bm tool`, `bm sync` work in both modes
3. **Bisync Integration** (`bisync_commands.py`):
- Remove `--check-access` (no RCLONE_TEST files)
- Sync bucket root (all projects), not single subdirectory
- Project auto-registration before sync
- `bm sync` triggers bisync in cloud mode
- `bm sync --watch` for continuous sync
4. **Config Structure**:
```json
{
  "cloud_mode": true,
  "cloud_host": "https://cloud.basicmemory.com",
  "auth_tokens": {...},
  "bisync_config": {
    "profile": "balanced",
    "sync_dir": "~/basic-memory-cloud-sync"
  }
}
```
5. **User Workflows**:
- **Enable cloud**: `bm cloud login` → all commands work remotely
- **Create projects**: `bm project add "name"` creates on cloud
- **Sync files**: `bm sync` runs bisync (all projects)
- **Use tools**: `bm tool write-note` creates notes on cloud
- **Disable cloud**: `bm cloud logout` → back to local mode
## Implementation Tasks
### Phase 1: Cloud Mode Toggle & Config (Foundation) ✅
**1.1 Update Config Schema**
- [x] Add `cloud_mode: bool = False` to Config model
- [x] Add `bisync_config: dict` with `profile` and `sync_dir` fields
- [x] Ensure `cloud_host` field exists
- [x] Add config migration for existing users (defaults handle this)
**1.2 Update async_client.py**
- [x] Read `cloud_mode` from config (not just environment)
- [x] Set `BASIC_MEMORY_PROXY_URL` from config when `cloud_mode=true`
- [x] Priority: env var > config.cloud_host (if cloud_mode) > None (local ASGI)
- [ ] Test both local and cloud mode routing
**1.3 Update Login/Logout Commands**
- [x] `bm cloud login`: Set `cloud_mode=true` and save config
- [x] `bm cloud login`: Set `BASIC_MEMORY_PROXY_URL` environment variable
- [x] `bm cloud logout`: Set `cloud_mode=false` and save config
- [x] `bm cloud logout`: Clear `BASIC_MEMORY_PROXY_URL` environment variable
- [x] `bm cloud status`: Show current mode (local/cloud), connection status
**1.4 Skip Initialization in Cloud Mode** ✅
- [x] Update `ensure_initialization()` to check `cloud_mode` and return early
- [x] Document that `config.projects` is only used in local mode
- [x] Cloud manages its own projects via API, no local reconciliation needed
### Phase 2: Bisync Updates (Multi-Project)
**2.1 Remove RCLONE_TEST Files** ✅
- [x] Update all bisync profiles: `check_access=False`
- [x] Remove RCLONE_TEST creation from `setup_cloud_bisync()`
- [x] Remove RCLONE_TEST upload logic
- [ ] Update documentation
**2.2 Sync Bucket Root (All Projects)** ✅
- [x] Change remote path from `bucket:/basic-memory` to `bucket:/` in `build_bisync_command()`
- [x] Update `setup_cloud_bisync()` to use bucket root
- [ ] Test with multiple projects
**2.3 Project Auto-Registration (Bisync)** ✅
- [x] Add `fetch_cloud_projects()` function (GET /proxy/projects/projects)
- [x] Add `scan_local_directories()` function
- [x] Add `create_cloud_project()` function (POST /proxy/projects/projects)
- [x] Integrate into `run_bisync()`: fetch → scan → create missing → sync
- [x] Wait for API 201 response before syncing
**2.4 Bisync Directory Configuration** ✅
- [x] Add `--dir` parameter to `bm cloud bisync-setup`
- [x] Store bisync directory in config
- [x] Default to `~/basic-memory-cloud-sync/`
- [x] Add `validate_bisync_directory()` safety check
- [x] Update `get_default_mount_path()` to return fixed `~/basic-memory-cloud/`
**2.5 Sync/Status API Infrastructure** ✅ (commit d48b1dc)
- [x] Create `POST /{project}/project/sync` endpoint for background sync
- [x] Create `POST /{project}/project/status` endpoint for scan-only status
- [x] Create `SyncReportResponse` Pydantic schema
- [x] Refactor CLI `sync` command to use API endpoint
- [x] Refactor CLI `status` command to use API endpoint
- [x] Create `command_utils.py` with shared `run_sync()` function
- [x] Update `notify_container_sync()` to call `run_sync()` for each project
- [x] Update all tests to match new API-based implementation
### Phase 3: Sync Command Dual Mode ✅
**3.1 Update `bm sync` Command** ✅
- [x] Check `config.cloud_mode` at start
- [x] If `cloud_mode=false`: Run existing local sync
- [x] If `cloud_mode=true`: Run bisync
- [x] Add `--watch` parameter for continuous sync
- [x] Add `--interval` parameter (default 60 seconds)
- [x] Error if `--watch` used in local mode with helpful message
**3.2 Watch Mode for Bisync** ✅
- [x] Implement `run_bisync_watch()` with interval loop
- [x] Add `--interval` parameter (default 60 seconds)
- [x] Handle errors gracefully, continue on failure
- [x] Show sync progress and status
**3.3 Integrity Check Command** ✅
- [x] Implement `bm cloud check` command using `rclone check`
- [x] Read-only operation that verifies file matching
- [x] Error with helpful messages if rclone/bisync not set up
- [x] Support `--one-way` flag for faster checks
- [x] Transparent about rclone implementation
- [x] Suggest `bm sync` to resolve differences
**Implementation Notes:**
- `bm sync` adapts to cloud mode automatically - users don't need separate commands
- `bm cloud bisync` kept for power users with full options (--dry-run, --resync, --profile, --verbose)
- `bm cloud check` provides integrity verification without transferring data
- Design philosophy: Simplicity for everyday use, transparency about implementation
### Phase 4: Remove Duplicate Commands & Cloud Mode Auth ✅
**4.0 Cloud Mode Authentication** ✅
- [x] Update `async_client.py` to support dual auth sources
- [x] FastMCP context auth (cloud service mode) via `inject_auth_header()`
- [x] JWT token file auth (CLI cloud mode) via `CLIAuth.get_valid_token()`
- [x] Automatic token refresh for CLI cloud mode
- [x] Remove `BASIC_MEMORY_PROXY_URL` environment variable dependency
- [x] Simplify to use only `config.cloud_mode` + `config.cloud_host`
**4.1 Delete `bm cloud project` Commands** ✅
- [x] Remove `bm cloud project list` (use `bm project list`)
- [x] Remove `bm cloud project add` (use `bm project add`)
- [x] Update `core_commands.py` to remove project_app subcommands
- [x] Keep only: `login`, `logout`, `status`, `setup`, `mount`, `unmount`, bisync commands
- [x] Remove unused imports (Table, generate_permalink, os)
- [x] Clean up environment variable references in login/logout
**4.2 CLI Command Cloud Mode Integration** ✅
- [x] Add runtime `cloud_mode_enabled` checks to all CLI commands
- [x] Update `list_projects()` to conditionally authenticate based on cloud mode
- [x] Update `remove_project()` to conditionally authenticate based on cloud mode
- [x] Update `run_sync()` to conditionally authenticate based on cloud mode
- [x] Update `get_project_info()` to conditionally authenticate based on cloud mode
- [x] Update `run_status()` to conditionally authenticate based on cloud mode
- [x] Remove auth from `set_default_project()` (local-only command, no cloud version)
- [x] Create CLI integration tests (`test-int/cli/`) to validate both local and cloud modes
- [x] Replace mock-heavy CLI tests with integration tests (deleted 5 mock test files)
**4.3 OAuth Authentication Fixes** ✅
- [x] Restore missing `SettingsConfigDict` in `BasicMemoryConfig`
- [x] Fix environment variable reading with `BASIC_MEMORY_` prefix
- [x] Fix `.env` file loading
- [x] Fix extra field handling for config files
- [x] Resolve `bm cloud login` OAuth failure ("Something went wrong" error)
- [x] Implement PKCE (Proof Key for Code Exchange) for device flow
- [x] Generate code verifier and SHA256 challenge for device authorization
- [x] Send code_verifier with token polling requests
- [x] Support both PKCE-required and PKCE-optional OAuth clients
- [x] Verify authentication flow works end-to-end with staging and production
- [x] Document WorkOS requirement: redirect URI must be configured even for device flow
**4.4 Update Documentation**
- [ ] Update `cloud-cli.md` with cloud mode toggle workflow
- [ ] Document `bm cloud login` → use normal commands
- [ ] Add examples of cloud mode usage
- [ ] Document mount vs bisync directory isolation
- [ ] Add troubleshooting section
### Phase 5: Mount Updates ✅
**5.1 Fixed Mount Directory** ✅
- [x] Change mount path to `~/basic-memory-cloud/` (fixed, no tenant ID)
- [x] Update `get_default_mount_path()` function
- [x] Remove configurability (fixed location)
- [x] Update mount commands to use new path
**5.2 Mount at Bucket Root** ✅
- [x] Ensure mount uses bucket root (not subdirectory)
- [x] Test with multiple projects
- [x] Verify all projects visible in mount
**Implementation:** Mount uses the fixed `~/basic-memory-cloud/` directory and mounts the entire bucket root `basic-memory-{tenant_id}:{bucket_name}`, so all projects are visible through a single mount point.
### Phase 6: Safety & Validation ✅
**6.1 Directory Conflict Prevention** ✅
- [x] Implement `validate_bisync_directory()` check
- [x] Detect if bisync dir == mount dir
- [x] Detect if bisync dir is currently mounted
- [x] Show clear error messages with solutions
**6.2 State Management** ✅
- [x] Use `--workdir` for bisync state
- [x] Store state in `~/.basic-memory/bisync-state/{tenant-id}/`
- [x] Ensure state directory created before bisync
**Implementation:** `validate_bisync_directory()` prevents conflicts by checking directory equality and mount status. State is managed in an isolated `~/.basic-memory/bisync-state/{tenant-id}/` directory using the `--workdir` flag.
### Phase 7: Cloud-Side Implementation (Deferred to Cloud Repo)
**7.1 Project Discovery Service (Cloud)** - Deferred
- [ ] Create `ProjectDiscoveryService` background job
- [ ] Scan `/app/data/` every 2 minutes
- [ ] Auto-register new directories as projects
- [ ] Log discovery events
- [ ] Handle errors gracefully
**7.2 Project API Updates (Cloud)** - Deferred
- [ ] Ensure `POST /proxy/projects/projects` creates directory synchronously
- [ ] Return 201 with project details
- [ ] Ensure directory ready immediately after creation
**Note:** Phase 7 is cloud-side work that belongs in the basic-memory-cloud repository. The CLI-side implementation (Phase 2.3 auto-registration) is complete and working - it calls the existing cloud API endpoints.
### Phase 8: Testing & Documentation
**8.1 Test Scenarios**
- [x] Test: Cloud mode toggle (login/logout)
- [x] Test: Local-first project creation (bisync)
- [x] Test: Cloud-first project creation (API)
- [x] Test: Multi-project bidirectional sync
- [x] Test: MCP tools in cloud mode
- [x] Test: Watch mode continuous sync
- [x] Test: Safety profile protection (max_delete implemented)
- [x] Test: No RCLONE_TEST files (check_access=False in all profiles)
- [x] Test: Mount/bisync directory isolation (validate_bisync_directory)
- [x] Test: Integrity check command (bm cloud check)
**8.2 Documentation**
- [x] Update cloud-cli.md with cloud mode instructions
- [x] Document Dropbox model paradigm
- [x] Update command reference with new commands
- [x] Document `bm sync` dual mode behavior
- [x] Document `bm cloud check` command
- [x] Document directory structure and fixed paths
- [ ] Update README with quick start
- [ ] Create migration guide for existing users
- [ ] Create video/GIF demos
### Success Criteria Checklist
- [x] `bm cloud login` enables cloud mode for all commands
- [x] `bm cloud logout` reverts to local mode
- [x] `bm project`, `bm tool`, `bm sync` work transparently in both modes
- [x] `bm sync` runs bisync in cloud mode, local sync in local mode
- [x] Single sync operation handles all projects bidirectionally
- [x] Local directories auto-create cloud projects via API
- [x] Cloud projects auto-sync to local directories
- [x] No RCLONE_TEST files in user directories
- [x] Bisync profiles provide safety via `max_delete` limits
- [x] `bm sync --watch` enables continuous sync
- [x] No duplicate `bm cloud project` commands (removed)
- [x] `bm cloud check` command for integrity verification
- [ ] Documentation covers cloud mode toggle and workflows
- [ ] Edge cases handled gracefully with clear errors
## How (High Level)
### Architecture Overview
**Cloud Mode Toggle:**
```
┌─────────────────────────────────────┐
│  bm cloud login                     │
│  ├─ Authenticate via OAuth          │
│  ├─ Set cloud_mode: true in config  │
│  └─ Set BASIC_MEMORY_PROXY_URL      │
└─────────────────────────────────────┘
                   ↓
┌─────────────────────────────────────┐
│  All CLI commands use async_client  │
│  ├─ async_client checks proxy URL   │
│  ├─ If set: HTTP to cloud           │
│  └─ If not: Local ASGI              │
└─────────────────────────────────────┘
                   ↓
┌─────────────────────────────────────┐
│  bm project add "work"              │
│  bm tool write-note ...             │
│  bm sync (triggers bisync)          │
│  → All work against cloud           │
└─────────────────────────────────────┘
```
**Storage Hierarchy:**
```
Cloud Container:          Bucket:                      Local Sync Dir:
/app/data/ (mounted) ←→   production-tenant-{id}/ ←→   ~/basic-memory-cloud-sync/
├── basic-memory/         ├── basic-memory/            ├── basic-memory/
│   ├── notes/            │   ├── notes/               │   ├── notes/
│   └── concepts/         │   └── concepts/            │   └── concepts/
├── work-project/         ├── work-project/            ├── work-project/
│   └── tasks/            │   └── tasks/               │   └── tasks/
└── personal/             └── personal/                └── personal/
    └── journal/              └── journal/                 └── journal/

                  Bidirectional sync via rclone bisync
```
### Sync Flow
**`bm sync` execution (in cloud mode):**
1. **Check cloud mode**
```python
if not config.cloud_mode:
    # Run normal local file sync
    run_local_sync()
    return
# Cloud mode: Run bisync
```
2. **Fetch cloud projects**
```python
# GET /proxy/projects/projects (via async_client)
cloud_projects = fetch_cloud_projects()
cloud_project_names = {p["name"] for p in cloud_projects["projects"]}
```
3. **Scan local sync directory**
```python
sync_dir = config.bisync_config["sync_dir"] # ~/basic-memory-cloud-sync
local_dirs = [d.name for d in sync_dir.iterdir()
if d.is_dir() and not d.name.startswith('.')]
```
4. **Create missing cloud projects**
```python
for dir_name in local_dirs:
    if dir_name not in cloud_project_names:
        # POST /proxy/projects/projects (via async_client)
        create_cloud_project(name=dir_name)
        # Blocks until 201 response
```
5. **Run bisync on bucket root**
```bash
rclone bisync \
~/basic-memory-cloud-sync \
basic-memory-{tenant}:{bucket} \
--filters-file ~/.basic-memory/.bmignore.rclone \
--conflict-resolve=newer \
--max-delete=25
# Syncs ALL project subdirectories bidirectionally
```
6. **Notify cloud to refresh** (commit d48b1dc)
```python
# After rclone bisync completes, sync each project's database
for project in cloud_projects["projects"]:
    # POST /{project}/project/sync (via async_client)
    # Triggers background sync for this project
    await run_sync(project=project["name"])
```
### Key Changes
**1. Cloud Mode via Config**
**Config changes:**
```python
class Config:
    cloud_mode: bool = False
    cloud_host: str = "https://cloud.basicmemory.com"
    bisync_config: dict = {
        "profile": "balanced",
        "sync_dir": "~/basic-memory-cloud-sync"
    }
```
**async_client.py behavior:**
```python
def create_client() -> AsyncClient:
# Check config first, then environment
config = ConfigManager().config
proxy_url = os.getenv("BASIC_MEMORY_PROXY_URL") or \
(config.cloud_host if config.cloud_mode else None)
if proxy_url:
return AsyncClient(base_url=proxy_url) # HTTP to cloud
else:
return AsyncClient(transport=ASGITransport(...)) # Local ASGI
```
**2. Login/Logout Sets Cloud Mode**
```python
# bm cloud login
async def login():
# Existing OAuth flow...
success = await auth.login()
if success:
config.cloud_mode = True
config.save()
os.environ["BASIC_MEMORY_PROXY_URL"] = config.cloud_host
```
```python
# bm cloud logout
def logout():
config.cloud_mode = False
config.save()
os.environ.pop("BASIC_MEMORY_PROXY_URL", None)
```
**3. Remove Duplicate Commands**
**Delete:**
- `bm cloud project list` → use `bm project list`
- `bm cloud project add` → use `bm project add`
**Keep:**
- `bm cloud login` - Enable cloud mode
- `bm cloud logout` - Disable cloud mode
- `bm cloud status` - Show current mode & connection
- `bm cloud setup` - Initial bisync setup
- `bm cloud bisync` - Power-user command with full options
- `bm cloud check` - Verify file integrity between local and cloud
**4. Sync Command Dual Mode**
```python
# bm sync
def sync_command(watch: bool = False, profile: str = "balanced"):
config = ConfigManager().config
if config.cloud_mode:
# Run bisync for cloud sync
run_bisync(profile=profile, watch=watch)
else:
# Run local file sync
run_local_sync()
```
**5. Remove RCLONE_TEST Files**
```python
# All profiles: check_access=False
BISYNC_PROFILES = {
"safe": RcloneBisyncProfile(check_access=False, max_delete=10),
"balanced": RcloneBisyncProfile(check_access=False, max_delete=25),
"fast": RcloneBisyncProfile(check_access=False, max_delete=50),
}
```
**6. Sync Bucket Root (All Projects)**
```python
# Sync entire bucket, not subdirectory
rclone_remote = f"basic-memory-{tenant_id}:{bucket_name}"
```
## How to Evaluate
### Test Scenarios
**1. Cloud Mode Toggle**
```bash
# Start in local mode
bm project list
# → Shows local projects
# Enable cloud mode
bm cloud login
# → Authenticates, sets cloud_mode=true
bm project list
# → Now shows cloud projects (same command!)
# Disable cloud mode
bm cloud logout
bm project list
# → Back to local projects
```
**Expected:** ✅ Single command works in both modes
**2. Local-First Project Creation (Cloud Mode)**
```bash
# Enable cloud mode
bm cloud login
# Create new project locally in sync dir
mkdir ~/basic-memory-cloud-sync/my-research
echo "# Research Notes" > ~/basic-memory-cloud-sync/my-research/index.md
# Run sync (triggers bisync in cloud mode)
bm sync
# Verify:
# - Cloud project created automatically via API
# - Files synced to bucket:/my-research/
# - Cloud database updated
# - `bm project list` shows new project
```
**Expected:** ✅ Project visible in cloud project list
**3. Cloud-First Project Creation**
```bash
# In cloud mode
bm project add "work-notes"
# → Creates project on cloud (via async_client HTTP)
# Run sync
bm sync
# Verify:
# - Local directory ~/basic-memory-cloud-sync/work-notes/ created
# - Files sync bidirectionally
# - Can use `bm tool write-note` to add content remotely
```
**Expected:** ✅ Project accessible via all CLI commands
**4. Multi-Project Bidirectional Sync**
```bash
# Setup: 3 projects in cloud mode
# Modify files in all 3 locally and remotely
bm sync
# Verify:
# - All 3 projects sync simultaneously
# - Changes propagate correctly
# - No cross-project interference
```
**Expected:** ✅ All projects in sync state
**5. MCP Tools Work in Cloud Mode**
```bash
# In cloud mode
bm tool write-note \
--title "Meeting Notes" \
--folder "work-notes" \
--content "Discussion points..."
# Verify:
# - Note created on cloud (via async_client HTTP)
# - Next `bm sync` pulls note to local
# - Note appears in ~/basic-memory-cloud-sync/work-notes/
```
**Expected:** ✅ Tools work transparently in cloud mode
**6. Watch Mode Continuous Sync**
```bash
# In cloud mode
bm sync --watch
# While running:
# - Create local folder → auto-creates cloud project
# - Edit files locally → syncs to cloud
# - Edit files remotely → syncs to local
# - Create project via API → appears locally
# Verify:
# - Continuous bidirectional sync
# - New projects handled automatically
# - No manual intervention needed
```
**Expected:** ✅ Seamless continuous sync
**7. Safety Profile Protection**
```bash
# Create project with 15 files locally
# Delete project from cloud (simulate error)
bm sync --profile safe
# Verify:
# - Bisync detects 15 pending deletions
# - Exceeds max_delete=10 limit
# - Aborts with clear error
# - No files deleted locally
```
**Expected:** ✅ Safety limit prevents data loss
**8. No RCLONE_TEST Files**
```bash
# After setup and multiple syncs
ls -la ~/basic-memory-cloud-sync/
# Verify:
# - No RCLONE_TEST files
# - No .rclone state files (in ~/.basic-memory/bisync-state/)
# - Clean directory structure
```
**Expected:** ✅ User directory stays clean
### Success Criteria
- [x] `bm cloud login` enables cloud mode for all commands
- [x] `bm cloud logout` reverts to local mode
- [x] `bm project`, `bm tool`, `bm sync` work in both modes transparently
- [x] `bm sync` runs bisync in cloud mode, local sync in local mode
- [x] Single sync operation handles all projects bidirectionally
- [x] Local directories auto-create cloud projects via API
- [x] Cloud projects auto-sync to local directories
- [x] No RCLONE_TEST files in user directories
- [x] Bisync profiles provide safety via `max_delete` limits
- [x] `bm sync --watch` enables continuous sync
- [x] No duplicate `bm cloud project` commands (removed)
- [x] `bm cloud check` command for integrity verification
- [ ] Documentation covers cloud mode toggle and workflows
- [ ] Edge cases handled gracefully with clear errors
## Notes
### API Contract
**Cloud must provide:**
1. **Project Management APIs:**
- `GET /proxy/projects/projects` - List all projects
- `POST /proxy/projects/projects` - Create project synchronously
- `POST /proxy/sync` - Trigger cache refresh
2. **Project Discovery Service (Background):**
- **Purpose**: Auto-register projects created via mount, direct bucket uploads, or any non-API method
- **Interval**: Every 2 minutes
- **Behavior**:
- Scan `/app/data/` for directories
- Register any directory not already in project database
- Log discovery events
- **Implementation**:
```python
class ProjectDiscoveryService:
"""Background service to auto-discover projects from filesystem."""
async def run(self):
"""Scan /app/data/ and register new project directories."""
data_path = Path("/app/data")
for dir_path in data_path.iterdir():
# Skip hidden and special directories
if not dir_path.is_dir() or dir_path.name.startswith('.'):
continue
project_name = dir_path.name
# Check if project already registered
project = await self.project_repo.get_by_name(project_name)
if not project:
# Auto-register new project
await self.project_repo.create(
name=project_name,
path=str(dir_path)
)
logger.info(f"Auto-discovered project: {project_name}")
```
**Project Creation (API-based):**
- API creates `/app/data/{project-name}/` directory
- Registers project in database
- Returns 201 with project details
- Directory ready for bisync immediately
**Project Creation (Discovery-based):**
- User creates folder via mount: `~/basic-memory-cloud/new-project/`
- Files appear in `/app/data/new-project/` (mounted bucket)
- Discovery service finds directory on next scan (within 2 minutes)
- Auto-registers as project
- User sees project in `bm project list` after discovery
**Why Both Methods:**
- **API**: Immediate registration when using bisync (client-side scan + API call)
- **Discovery**: Delayed registration when using mount (no API call hook)
- **Result**: Projects created ANY way (API, mount, bisync, WebDAV) eventually registered
- **Trade-off**: 2-minute delay for mount-created projects is acceptable
### Mount vs Bisync Directory Isolation
**Critical Safety Requirement**: Mount and bisync MUST use different directories to prevent conflicts.
**The Dropbox Model Applied:**
Both mount and bisync operate at **bucket level** (all projects), following the Dropbox/iCloud paradigm:
```
~/basic-memory-cloud/ # Mount: Read-through cache (like Dropbox folder)
├── work-notes/
├── personal/
└── research/
~/basic-memory-cloud-sync/ # Bisync: Bidirectional sync (like Dropbox sync folder)
├── work-notes/
├── personal/
└── research/
```
**Mount Directory (Fixed):**
```bash
# Fixed location, not configurable
~/basic-memory-cloud/
```
- **Scope**: Entire bucket (all projects)
- **Method**: NFS mount via `rclone nfsmount`
- **Behavior**: Read-through cache to cloud bucket
- **Credentials**: One IAM credential set per tenant
- **Process**: One rclone mount process
- **Use Case**: Quick access, browsing, light editing
- **Known Issue**: Obsidian compatibility problems with NFS
- **Not Configurable**: Fixed location prevents user error
**Why Fixed Location:**
- One mount point per machine (like `/Users/you/Dropbox`)
- Prevents credential proliferation (one credential set, not N)
- Prevents multiple mount processes (resource efficiency)
- Familiar pattern users already understand
- Simple operations: `mount` once, `unmount` once
**Bisync Directory (User Configurable):**
```bash
# Default location
~/basic-memory-cloud-sync/
# User can override
bm cloud setup --dir ~/my-knowledge-base
```
- **Scope**: Entire bucket (all projects)
- **Method**: Bidirectional sync via `rclone bisync`
- **Behavior**: Full local copy with periodic sync
- **Credentials**: Same IAM credential set as mount
- **Use Case**: Full offline access, reliable editing, Obsidian support
- **Configurable**: Users may want specific locations (external drive, existing folder structure)
**Why User Configurable:**
- Users have preferences for where local copies live
- May want sync folder on external drive
- May want to integrate with existing folder structure
- Default works for most, option available for power users
**Conflict Prevention:**
```python
def validate_bisync_directory(bisync_dir: Path):
"""Ensure bisync directory doesn't conflict with mount."""
mount_dir = Path.home() / "basic-memory-cloud"
if bisync_dir.resolve() == mount_dir.resolve():
raise BisyncError(
f"Cannot use {bisync_dir} for bisync - it's the mount directory!\n"
f"Mount and bisync must use different directories.\n\n"
f"Options:\n"
f" 1. Use default: ~/basic-memory-cloud-sync/\n"
f" 2. Specify different directory: --dir ~/my-sync-folder"
)
# Check if mount is active at this location
result = subprocess.run(["mount"], capture_output=True, text=True)
if str(bisync_dir) in result.stdout and "rclone" in result.stdout:
raise BisyncError(
f"{bisync_dir} is currently mounted via 'bm cloud mount'\n"
f"Cannot use mounted directory for bisync.\n\n"
f"Either:\n"
f" 1. Unmount first: bm cloud unmount\n"
f" 2. Use different directory for bisync"
)
```
**Why This Matters:**
- Mounting and syncing the SAME directory would create infinite loops
- rclone mount → bisync detects changes → syncs to bucket → mount sees changes → triggers bisync → ∞
- Separate directories = clean separation of concerns
- Mount is read-heavy caching layer, bisync is write-heavy bidirectional sync
### Future Enhancements
**Phase 2 (Not in this spec):**
- **Near Real-Time Sync**: Integrate `watch_service.py` with cloud mode
- Watch service detects local changes (already battle-tested)
- Queue changes in memory
- Use `rclone copyto` for individual file sync (near instant)
- Example: `rclone copyto ~/sync/project/file.md tenant:{bucket}/project/file.md`
- Fallback to full `rclone bisync` every N seconds for bidirectional changes
- Provides near real-time sync without polling overhead
- Per-project bisync profiles (different safety levels per project)
- Selective project sync (exclude specific projects from sync)
- Project deletion workflow (cascade to cloud/local)
- Conflict resolution UI/CLI
**Phase 3:**
- Project sharing between tenants
- Incremental backup/restore
- Sync statistics and bandwidth monitoring
- Mobile app integration with cloud mode
### Related Specs
- **SPEC-8**: TigrisFS Integration - Original bisync implementation
- **SPEC-6**: Explicit Project Parameter Architecture - Multi-project foundations
- **SPEC-5**: CLI Cloud Upload via WebDAV - Cloud file operations
### Implementation Notes
**Architectural Simplifications:**
- **Unified CLI**: Eliminated duplicate commands by using mode toggle
- **Single Entry Point**: All commands route through `async_client` which handles mode
- **Config-Driven**: Cloud mode stored in persistent config, not just environment
- **Transparent Routing**: Existing commands work without modification in cloud mode
**Complexity Trade-offs:**
- Removed: Separate `bm cloud project` command namespace
- Removed: Complex state detection for new projects
- Removed: RCLONE_TEST marker file management
- Added: Simple cloud_mode flag and config integration
- Added: Simple project list comparison before sync
- Relied on: Existing bisync profile safety mechanisms
- Result: Significantly simpler, more maintainable code
**User Experience:**
- **Mental Model**: "Toggle cloud mode, use normal commands"
- **No Learning Curve**: Same commands work locally and in cloud
- **Minimal Config**: Just login/logout to switch modes
- **Safety**: Profile system gives users control over safety/speed trade-offs
- **"Just Works"**: Create folders anywhere, they sync automatically
**Migration Path:**
- Existing `bm cloud project` users: Use `bm project` instead
- Existing `bm cloud bisync` becomes `bm sync` in cloud mode
- Config automatically migrates on first `bm cloud login`
## Testing
Initial Setup (One Time)
1. Login to cloud and enable cloud mode:
bm cloud login
# → Authenticates via OAuth
# → Sets cloud_mode=true in config
# → Sets BASIC_MEMORY_PROXY_URL environment variable
# → All CLI commands now route to cloud
2. Check cloud mode status:
bm cloud status
# → Shows: Mode: Cloud (enabled)
# → Shows: Host: https://cloud.basicmemory.com
# → Checks cloud health
3. Set up bidirectional sync:
bm cloud bisync-setup
# Or with custom directory:
bm cloud bisync-setup --dir ~/my-sync-folder
# This will:
# → Install rclone (if not already installed)
# → Get tenant info (tenant_id, bucket_name)
# → Generate scoped IAM credentials
# → Configure rclone with credentials
# → Create sync directory (default: ~/basic-memory-cloud-sync/)
# → Validate no conflict with mount directory
# → Run initial --resync to establish baseline
Normal Usage
4. Create local project and sync:
# Create a local project directory
mkdir ~/basic-memory-cloud-sync/my-research
echo "# Research Notes" > ~/basic-memory-cloud-sync/my-research/readme.md
# Run sync
bm cloud bisync
# Auto-magic happens:
# → Checks for new local directories
# → Finds "my-research" not in cloud
# → Creates project on cloud via POST /proxy/projects/projects
# → Runs bidirectional sync (all projects)
# → Syncs to bucket root (all projects synced together)
5. Watch mode for continuous sync:
bm cloud bisync --watch
# Or with custom interval:
bm cloud bisync --watch --interval 30
# → Syncs every 60 seconds by default (or at the custom interval, e.g. every 30 seconds above)
# → Auto-registers new projects on each run
# → Press Ctrl+C to stop
6. Check bisync status:
bm cloud bisync-status
# → Shows tenant ID
# → Shows sync directory path
# → Shows initialization status
# → Shows last sync time
# → Lists available profiles (safe/balanced/fast)
7. Manual sync with different profiles:
# Safe mode (max 10 deletes, preserves conflicts)
bm cloud bisync --profile safe
# Balanced mode (max 25 deletes, auto-resolve to newer) - default
bm cloud bisync --profile balanced
# Fast mode (max 50 deletes, skip verification)
bm cloud bisync --profile fast
8. Dry run to preview changes:
bm cloud bisync --dry-run
# → Shows what would be synced without making changes
9. Force resync (if needed):
bm cloud bisync --resync
# → Establishes new baseline
# → Use if sync state is corrupted
10. Check file integrity:
bm cloud check
# → Verifies all files match between local and cloud
# → Read-only operation (no data transfer)
# → Shows differences if any found
# Faster one-way check
bm cloud check --one-way
# → Only checks for missing files on destination
Verify Cloud Mode Integration
11. Test that all commands work in cloud mode:
# List cloud projects (not local)
bm project list
# Create project on cloud
bm project add "work-notes"
# Use MCP tools against cloud
bm tool write-note --title "Test" --folder "my-research" --content "Hello"
# All of these work against cloud because cloud_mode=true
12. Switch back to local mode:
bm cloud logout
# → Sets cloud_mode=false
# → Clears BASIC_MEMORY_PROXY_URL
# → All commands now work locally again
Expected Directory Structure
~/basic-memory-cloud-sync/ # Your local sync directory
├── my-research/ # Auto-created cloud project
│ ├── readme.md
│ └── notes.md
├── work-notes/ # Another project
│ └── tasks.md
└── personal/ # Another project
└── journal.md
# All sync bidirectionally with:
bucket:/ # Cloud bucket root
├── my-research/
├── work-notes/
└── personal/
Key Points to Test
1. ✅ Cloud mode toggle works (login/logout)
2. ✅ Bisync setup validates directory (no conflict with mount)
3. ✅ Local directories auto-create cloud projects
4. ✅ All projects sync together (bucket root)
5. ✅ No RCLONE_TEST files created
6. ✅ Changes sync bidirectionally
7. ✅ Watch mode continuous sync works
8. ✅ Profile safety limits work (max_delete)
9. ✅ `bm sync` adapts to cloud mode automatically
10. ✅ `bm cloud check` verifies file integrity without side effects
```
--------------------------------------------------------------------------------
/tests/mcp/test_tool_write_note.py:
--------------------------------------------------------------------------------
```python
"""Tests for note tools that exercise the full stack with SQLite."""
from textwrap import dedent
import pytest
from basic_memory.mcp.tools import write_note, read_note, delete_note
from basic_memory.utils import normalize_newlines
@pytest.mark.asyncio
async def test_write_note(app, test_project):
    """Create a tagged note and check both the tool summary and the stored file.

    Verifies that:
    - the summary reports creation, project, file path and permalink
    - the tags are listed in the summary
    - reading the note back by permalink contains the expected markdown
    """
    summary = await write_note.fn(
        project=test_project.name,
        title="Test Note",
        folder="test",
        content="# Test\nThis is a test note",
        tags=["test", "documentation"],
    )

    assert summary
    expected_fragments = (
        "# Created note",
        f"project: {test_project.name}",
        "file_path: test/Test Note.md",
        "permalink: test/test-note",
        "## Tags",
        "- test, documentation",
        f"[Session: Using project '{test_project.name}']",
    )
    for fragment in expected_fragments:
        assert fragment in summary

    # Round-trip: fetch the note through its permalink and compare the markdown.
    stored = await read_note.fn("test/test-note", project=test_project.name)
    expected_markdown = dedent("""
        ---
        title: Test Note
        type: note
        permalink: test/test-note
        tags:
        - test
        - documentation
        ---
        # Test
        This is a test note
        """).strip()
    assert normalize_newlines(expected_markdown) in stored
@pytest.mark.asyncio
async def test_write_note_no_tags(app, test_project):
    """Write a note without passing tags and confirm it can be read back."""
    summary = await write_note.fn(
        project=test_project.name,
        title="Simple Note",
        folder="test",
        content="Just some text",
    )

    assert summary
    for fragment in (
        "# Created note",
        f"project: {test_project.name}",
        "file_path: test/Simple Note.md",
        "permalink: test/simple-note",
        f"[Session: Using project '{test_project.name}']",
    ):
        assert fragment in summary

    # The stored markdown should contain the generated frontmatter and the body.
    stored = await read_note.fn("test/simple-note", project=test_project.name)
    expected_markdown = dedent("""
        ---
        title: Simple Note
        type: note
        permalink: test/simple-note
        ---
        Just some text
        """).strip()
    assert normalize_newlines(expected_markdown) in stored
@pytest.mark.asyncio
async def test_write_note_update_existing(app, test_project):
    """Write the same title/folder twice and confirm the second call updates.

    Verifies that:
    - the first call reports "# Created note"
    - the second call reports "# Updated note" with the same path and permalink
    - reading the note back returns exactly the updated markdown
    """
    note_args = dict(
        project=test_project.name,
        title="Test Note",
        folder="test",
        tags=["test", "documentation"],
    )
    # Fragments that both the create and the update summaries must contain.
    shared_fragments = (
        f"project: {test_project.name}",
        "file_path: test/Test Note.md",
        "permalink: test/test-note",
        "## Tags",
        "- test, documentation",
        f"[Session: Using project '{test_project.name}']",
    )

    created = await write_note.fn(content="# Test\nThis is a test note", **note_args)
    assert created  # Got a non-empty summary
    assert "# Created note" in created
    for fragment in shared_fragments:
        assert fragment in created

    updated = await write_note.fn(content="# Test\nThis is an updated note", **note_args)
    assert "# Updated note" in updated
    for fragment in shared_fragments:
        assert fragment in updated

    # The stored note must match the updated markdown exactly (equality, not containment).
    stored = await read_note.fn("test/test-note", project=test_project.name)
    expected_markdown = dedent(
        """
        ---
        title: Test Note
        type: note
        permalink: test/test-note
        tags:
        - test
        - documentation
        ---
        # Test
        This is an updated note
        """
    ).strip()
    assert normalize_newlines(expected_markdown) == stored
@pytest.mark.asyncio
async def test_issue_93_write_note_respects_custom_permalink_new_note(app, test_project):
    """Issue #93: a permalink given in frontmatter is used when creating a note."""
    # Body whose frontmatter asks for a specific permalink.
    body = dedent("""
        ---
        permalink: custom/my-desired-permalink
        ---
        # My New Note
        This note has a custom permalink specified in frontmatter.
        - [note] Testing if custom permalink is respected
        """).strip()

    summary = await write_note.fn(
        project=test_project.name,
        title="My New Note",
        folder="notes",
        content=body,
    )

    # The summary must report the requested permalink.
    for fragment in (
        "# Created note",
        f"project: {test_project.name}",
        "file_path: notes/My New Note.md",
        "permalink: custom/my-desired-permalink",
        f"[Session: Using project '{test_project.name}']",
    ):
        assert fragment in summary
@pytest.mark.asyncio
async def test_issue_93_write_note_respects_custom_permalink_existing_note(app, test_project):
    """Issue #93: a permalink given in frontmatter replaces the old one on update."""
    # Step 1: create the note and let the permalink be generated.
    created = await write_note.fn(
        project=test_project.name,
        title="Existing Note",
        folder="test",
        content="Initial content without custom permalink",
    )
    assert "# Created note" in created
    assert f"project: {test_project.name}" in created

    # Pull the generated permalink out of the first "permalink:" summary line.
    initial_permalink = next(
        (
            line.split(":", 1)[1].strip()
            for line in created.split("\n")
            if line.startswith("permalink:")
        ),
        None,
    )
    assert initial_permalink is not None

    # Step 2: rewrite the note with frontmatter that requests a new permalink.
    replacement_body = dedent("""
        ---
        permalink: custom/new-permalink
        ---
        # Existing Note
        Updated content with custom permalink in frontmatter.
        - [note] Custom permalink should be respected on update
        """).strip()
    updated = await write_note.fn(
        project=test_project.name,
        title="Existing Note",
        folder="test",
        content=replacement_body,
    )

    # The update summary reports the new permalink and no longer the old one.
    assert "# Updated note" in updated
    assert f"project: {test_project.name}" in updated
    assert "permalink: custom/new-permalink" in updated
    assert f"permalink: {initial_permalink}" not in updated
    assert f"[Session: Using project '{test_project.name}']" in updated
@pytest.mark.asyncio
async def test_delete_note_existing(app, test_project):
    """Create a note, then delete it by permalink and expect ``True`` back."""
    summary = await write_note.fn(
        project=test_project.name,
        title="Test Note",
        folder="test",
        content="# Test\nThis is a test note",
        tags=["test", "documentation"],
    )
    assert summary
    assert f"project: {test_project.name}" in summary

    # Deleting the note that was just written must report success.
    was_deleted = await delete_note.fn("test/test-note", project=test_project.name)
    assert was_deleted is True
@pytest.mark.asyncio
async def test_delete_note_doesnt_exist(app, test_project):
    """Deleting an identifier that was never written should return ``False``."""
    outcome = await delete_note.fn("doesnt-exist", project=test_project.name)
    assert outcome is False
@pytest.mark.asyncio
async def test_write_note_with_tag_array_from_bug_report(app, test_project):
    """Issue #38: write_note must accept tags supplied as a list.

    Replays the payload from the bug report, in which Cursor passed an array
    of tags and received a type mismatch error.
    """
    # Payload taken from the bug report (project name substituted for the fixture).
    payload = {
        "project": test_project.name,
        "title": "Title",
        "folder": "folder",
        "content": "CONTENT",
        "tags": ["hipporag", "search", "fallback", "symfony", "error-handling"],
    }

    # Call the tool function directly with the payload as keyword arguments.
    summary = await write_note.fn(**payload)

    assert summary
    for fragment in (
        f"project: {test_project.name}",
        "permalink: folder/title",
        "Tags",
        "hipporag",
        f"[Session: Using project '{test_project.name}']",
    ):
        assert fragment in summary
@pytest.mark.asyncio
async def test_write_note_verbose(app, test_project):
"""Test that the write_note summary reports observations and relations.
Writes a note whose content contains one ``[note]`` observation line and one
``relates to [[Knowledge]]`` line, then checks that the summary includes:
- the created-note header, project, file path and permalink
- an "## Observations" section with "- note: 1"
- a "## Relations" section
- a "## Tags" section listing the supplied tags
"""
# NOTE: the content literal below is not passed through dedent, so its
# whitespace is part of the note body; do not re-indent it.
result = await write_note.fn(
project=test_project.name,
title="Test Note",
folder="test",
content="""
# Test\nThis is a test note
- [note] First observation
- relates to [[Knowledge]]
""",
tags=["test", "documentation"],
)
# Header and location details of the newly created note.
assert "# Created note" in result
assert f"project: {test_project.name}" in result
assert "file_path: test/Test Note.md" in result
assert "permalink: test/test-note" in result
# Sections summarising what was parsed from the content.
assert "## Observations" in result
assert "- note: 1" in result
assert "## Relations" in result
assert "## Tags" in result
assert "- test, documentation" in result
assert f"[Session: Using project '{test_project.name}']" in result
@pytest.mark.asyncio
async def test_write_note_preserves_custom_metadata(app, project_config, test_project):
    """Test that updating a note preserves custom metadata fields.

    Reproduces issue #36 where custom frontmatter fields like Status
    were being lost when updating notes with the write_note tool.

    Should:
    - Create a note with write_note
    - Add custom frontmatter fields directly to the file on disk
    - Update the note with new content through write_note
    - Verify the custom frontmatter is preserved
    """
    # First, create the note using write_note
    await write_note.fn(
        project=test_project.name,
        title="Custom Metadata Note",
        folder="test",
        content="# Initial content",
        tags=["test"],
    )

    # Confirm the note is readable by permalink before editing the file on disk.
    await read_note.fn("test/custom-metadata-note", project=test_project.name)

    # Custom fields are added by editing the file directly on disk, so the
    # later write_note call is what has to preserve them.
    import frontmatter

    file_path = project_config.home / "test" / "Custom Metadata Note.md"
    post = frontmatter.load(file_path)

    # Add custom frontmatter
    post["Status"] = "In Progress"
    post["Priority"] = "High"
    post["Version"] = "1.0"

    # Write the file back with an explicit encoding so the result does not
    # depend on the platform's default locale.
    with open(file_path, "w", encoding="utf-8") as f:
        f.write(frontmatter.dumps(post))

    # Now update the note using write_note
    result = await write_note.fn(
        project=test_project.name,
        title="Custom Metadata Note",
        folder="test",
        content="# Updated content",
        tags=["test", "updated"],
    )

    # Verify the update was successful. The project name comes from the
    # fixture rather than a hard-coded literal, matching the other tests.
    assert (
        f"Updated note\nproject: {test_project.name}\nfile_path: test/Custom Metadata Note.md"
    ) in result
    assert f"project: {test_project.name}" in result

    # Read the note back and check if custom frontmatter is preserved
    content = await read_note.fn("test/custom-metadata-note", project=test_project.name)

    # Custom frontmatter should be preserved
    assert "Status: In Progress" in content
    assert "Priority: High" in content
    # Version might be quoted as '1.0' due to YAML serialization
    assert "Version:" in content  # Just check that the field exists
    assert "1.0" in content  # And that the value exists somewhere

    # And new content should be there
    assert "# Updated content" in content

    # And tags should be updated (without # prefix)
    assert "- test" in content
    assert "- updated" in content
@pytest.mark.asyncio
async def test_write_note_preserves_content_frontmatter(app, test_project):
    """Frontmatter fields embedded in the content are kept in the stored note.

    The content carries its own ``version`` and ``author`` fields; the note
    read back should contain them alongside the permalink and the tags.
    """
    source_markdown = dedent(
        """
        ---
        title: Test Note
        type: note
        version: 1.0
        author: name
        ---
        # Test
        This is a test note
        """
    )
    await write_note.fn(
        project=test_project.name,
        title="Test Note",
        folder="test",
        content=source_markdown,
        tags=["test", "documentation"],
    )

    # Read the note back through its permalink and compare the markdown.
    stored = await read_note.fn("test/test-note", project=test_project.name)
    expected_markdown = dedent(
        """
        ---
        title: Test Note
        type: note
        permalink: test/test-note
        version: 1.0
        author: name
        tags:
        - test
        - documentation
        ---
        # Test
        This is a test note
        """
    ).strip()
    assert normalize_newlines(expected_markdown) in stored
@pytest.mark.asyncio
async def test_write_note_permalink_collision_fix_issue_139(app, test_project):
    """Test fix for GitHub Issue #139: UNIQUE constraint failed: entity.permalink.

    This reproduces the exact scenario described in the issue:
    1. Create a note with title "Note 1"
    2. Create another note with title "Note 2"
    3. Try to create/replace first note again with same title "Note 1"

    Before the fix, step 3 would fail with UNIQUE constraint error.
    After the fix, it should either update the existing note or create with unique permalink.
    """
    # Step 1: Create first note
    result1 = await write_note.fn(
        project=test_project.name,
        title="Note 1",
        folder="test",
        content="Original content for note 1",
    )
    assert "# Created note" in result1
    assert f"project: {test_project.name}" in result1
    assert "permalink: test/note-1" in result1

    # Step 2: Create second note with different title
    result2 = await write_note.fn(
        project=test_project.name, title="Note 2", folder="test", content="Content for note 2"
    )
    assert "# Created note" in result2
    assert f"project: {test_project.name}" in result2
    assert "permalink: test/note-2" in result2

    # Step 3: Try to create/replace first note again
    # This scenario would trigger the UNIQUE constraint failure before the fix
    result3 = await write_note.fn(
        project=test_project.name,
        title="Note 1",  # Same title as first note
        folder="test",  # Same folder as first note
        content="Replacement content for note 1",  # Different content
    )

    # This should not raise a UNIQUE constraint failure error
    # It should succeed and either:
    # 1. Update the existing note (preferred behavior)
    # 2. Create a new note with unique permalink (fallback behavior)
    assert result3 is not None
    assert f"project: {test_project.name}" in result3
    assert "Updated note" in result3 or "Created note" in result3

    # The result should contain either the original permalink or a unique one
    assert "permalink: test/note-1" in result3 or "permalink: test/note-1-1" in result3

    # "permalink: test/note-1" is a substring of "permalink: test/note-1-1",
    # so the more specific (suffixed) permalink has to be tested first;
    # otherwise the fallback branch can never be selected.
    if "permalink: test/note-1-1" in result3:
        # Created new note with unique permalink case
        content = await read_note.fn("test/note-1-1", project=test_project.name)
        assert "Replacement content for note 1" in content

        # Original note should still exist
        original_content = await read_note.fn("test/note-1", project=test_project.name)
        assert "Original content for note 1" in original_content
    else:
        # Updated existing note case
        content = await read_note.fn("test/note-1", project=test_project.name)
        assert "Replacement content for note 1" in content
@pytest.mark.asyncio
async def test_write_note_with_custom_entity_type(app, test_project):
    """Issue #144: the entity_type argument must reach the stored frontmatter.

    Previously the type was hardcoded to "note"; here a note is written with
    entity_type="guide" and the stored ``type`` field is checked.
    """
    summary = await write_note.fn(
        project=test_project.name,
        title="Test Guide",
        folder="guides",
        content="# Guide Content\nThis is a guide",
        tags=["guide", "documentation"],
        entity_type="guide",
    )

    assert summary
    for fragment in (
        "# Created note",
        f"project: {test_project.name}",
        "file_path: guides/Test Guide.md",
        "permalink: guides/test-guide",
        "## Tags",
        "- guide, documentation",
        f"[Session: Using project '{test_project.name}']",
    ):
        assert fragment in summary

    # The frontmatter of the stored note must carry "type: guide".
    stored = await read_note.fn("guides/test-guide", project=test_project.name)
    expected_markdown = dedent("""
        ---
        title: Test Guide
        type: guide
        permalink: guides/test-guide
        tags:
        - guide
        - documentation
        ---
        # Guide Content
        This is a guide
        """).strip()
    assert normalize_newlines(expected_markdown) in stored
@pytest.mark.asyncio
async def test_write_note_with_report_entity_type(app, test_project):
    """Write a note with ``entity_type="report"`` and verify it round-trips."""
    project_name = test_project.name

    summary = await write_note.fn(
        project=project_name,
        title="Monthly Report",
        folder="reports",
        content="# Monthly Report\nThis is a monthly report",
        tags=["report", "monthly"],
        entity_type="report",
    )

    assert summary
    for fragment in (
        "# Created note",
        f"project: {project_name}",
        "file_path: reports/Monthly Report.md",
        "permalink: reports/monthly-report",
        f"[Session: Using project '{project_name}']",
    ):
        assert fragment in summary

    # The stored note must expose the requested type in its frontmatter.
    stored = await read_note.fn("reports/monthly-report", project=project_name)
    for fragment in ("type: report", "# Monthly Report"):
        assert fragment in stored
@pytest.mark.asyncio
async def test_write_note_with_config_entity_type(app, test_project):
    """Write a note with ``entity_type="config"`` (no tags) and verify it round-trips."""
    project_name = test_project.name

    summary = await write_note.fn(
        project=project_name,
        title="System Config",
        folder="config",
        content="# System Configuration\nThis is a config file",
        entity_type="config",
    )

    assert summary
    for fragment in (
        "# Created note",
        f"project: {project_name}",
        "file_path: config/System Config.md",
        "permalink: config/system-config",
        f"[Session: Using project '{project_name}']",
    ):
        assert fragment in summary

    # The stored note must expose the requested type in its frontmatter.
    stored = await read_note.fn("config/system-config", project=project_name)
    for fragment in ("type: config", "# System Configuration"):
        assert fragment in stored
@pytest.mark.asyncio
async def test_write_note_entity_type_default_behavior(app, test_project):
    """Omit ``entity_type`` and confirm the stored note is typed "note".

    Guards backward compatibility: callers that never pass entity_type
    should keep getting the same result as before.
    """
    project_name = test_project.name

    summary = await write_note.fn(
        project=project_name,
        title="Default Type Test",
        folder="test",
        content="# Default Type Test\nThis should be type 'note'",
        tags=["test"],
    )

    assert summary
    for fragment in (
        "# Created note",
        f"project: {project_name}",
        "file_path: test/Default Type Test.md",
        "permalink: test/default-type-test",
        f"[Session: Using project '{project_name}']",
    ):
        assert fragment in summary

    # With no explicit type, the frontmatter should fall back to "note".
    stored = await read_note.fn("test/default-type-test", project=project_name)
    for fragment in ("type: note", "# Default Type Test"):
        assert fragment in stored
@pytest.mark.asyncio
async def test_write_note_update_existing_with_different_entity_type(app, test_project):
    """Rewrite an existing note with a new ``entity_type`` and confirm the change sticks."""
    project_name = test_project.name
    project_line = f"project: {project_name}"

    # First write: the note starts out as a plain "note".
    created = await write_note.fn(
        project=project_name,
        title="Changeable Type",
        folder="test",
        content="# Initial Content\nThis starts as a note",
        tags=["test"],
        entity_type="note",
    )
    assert created
    assert "# Created note" in created
    assert project_line in created

    # Second write: same title and folder, but now typed as a "guide".
    updated = await write_note.fn(
        project=project_name,
        title="Changeable Type",
        folder="test",
        content="# Updated Content\nThis is now a guide",
        tags=["guide"],
        entity_type="guide",
    )
    assert updated
    assert "# Updated note" in updated
    assert project_line in updated

    # The stored note must reflect the new type, body and tag.
    stored = await read_note.fn("test/changeable-type", project=project_name)
    for fragment in ("type: guide", "# Updated Content", "- guide"):
        assert fragment in stored
@pytest.mark.asyncio
async def test_write_note_respects_frontmatter_entity_type(app, test_project):
    """Check that a frontmatter ``type`` is honoured when no parameter is given.

    write_note is called without entity_type while the content itself carries
    frontmatter with a 'type' field; that type should be kept rather than
    replaced by the default 'note'.
    """
    project_name = test_project.name

    # Note body including its own frontmatter block.
    note_text = "\n".join(
        [
            "---",
            "title: Test Guide",
            "type: guide",
            "permalink: guides/test-guide",
            "tags:",
            "- guide",
            "- documentation",
            "---",
            "# Guide Content",
            "This is a guide",
        ]
    )

    # No entity_type argument here - the frontmatter is the only source of the type.
    summary = await write_note.fn(
        project=project_name, title="Test Guide", folder="guides", content=note_text
    )

    assert summary
    for fragment in (
        "# Created note",
        f"project: {project_name}",
        "file_path: guides/Test Guide.md",
        "permalink: guides/test-guide",
        f"[Session: Using project '{project_name}']",
    ):
        assert fragment in summary

    # The stored type should be "guide", not "note".
    stored = await read_note.fn("guides/test-guide", project=project_name)
    for fragment in ("type: guide", "# Guide Content", "- guide", "- documentation"):
        assert fragment in stored
class TestWriteNoteSecurityValidation:
    """Exercise the folder-path security validation performed by write_note."""

    @staticmethod
    async def _write_probe(test_project, folder):
        """Call write_note with the standard probe note placed in ``folder``."""
        return await write_note.fn(
            project=test_project.name,
            title="Test Note",
            folder=folder,
            content="# Test Content\nThis should be blocked by security validation.",
        )

    @staticmethod
    def _assert_rejected(result, echoed_folder=None):
        """Assert ``result`` is the security error text, optionally echoing the folder."""
        assert isinstance(result, str)
        assert "# Error" in result
        assert "paths must stay within project boundaries" in result
        if echoed_folder is not None:
            assert echoed_folder in result

    @staticmethod
    def _assert_accepted(result):
        """Assert ``result`` is a normal create/update response, not a security error."""
        assert isinstance(result, str)
        assert "# Error" not in result
        assert "paths must stay within project boundaries" not in result
        assert ("# Created note" in result) or ("# Updated note" in result)

    @pytest.mark.asyncio
    async def test_write_note_blocks_path_traversal_unix(self, app, test_project):
        """Unix-style ``../`` traversal in the folder parameter must be rejected."""
        traversal_folders = (
            "../",
            "../../",
            "../../../",
            "../secrets",
            "../../etc",
            "../../../etc/passwd_folder",
            "notes/../../../etc",
            "folder/../../outside",
            "../../../../malicious",
        )
        for folder in traversal_folders:
            outcome = await self._write_probe(test_project, folder)
            self._assert_rejected(outcome, echoed_folder=folder)

    @pytest.mark.asyncio
    async def test_write_note_blocks_path_traversal_windows(self, app, test_project):
        """Windows-style ``..\\`` traversal and UNC paths in the folder must be rejected."""
        traversal_folders = (
            "..\\",
            "..\\..\\",
            "..\\..\\..\\",
            "..\\secrets",
            "..\\..\\Windows",
            "..\\..\\..\\Windows\\System32",
            "notes\\..\\..\\..\\Windows",
            "\\\\server\\share",
            "\\\\..\\..\\Windows",
        )
        for folder in traversal_folders:
            outcome = await self._write_probe(test_project, folder)
            self._assert_rejected(outcome, echoed_folder=folder)

    @pytest.mark.asyncio
    async def test_write_note_blocks_absolute_paths(self, app, test_project):
        """Absolute POSIX and drive-letter paths in the folder must be rejected."""
        absolute_folders = (
            "/etc",
            "/home/user",
            "/var/log",
            "/root",
            "C:\\Windows",
            "C:\\Users\\user",
            "D:\\secrets",
            "/tmp/malicious",
            "/usr/local/evil",
        )
        for folder in absolute_folders:
            outcome = await self._write_probe(test_project, folder)
            self._assert_rejected(outcome, echoed_folder=folder)

    @pytest.mark.asyncio
    async def test_write_note_blocks_home_directory_access(self, app, test_project):
        """Folders that start from ``~`` (home directory) must be rejected."""
        home_folders = (
            "~",
            "~/",
            "~/secrets",
            "~/.ssh",
            "~/Documents",
            "~\\AppData",
            "~\\Desktop",
            "~/.env_folder",
        )
        for folder in home_folders:
            outcome = await self._write_probe(test_project, folder)
            self._assert_rejected(outcome, echoed_folder=folder)

    @pytest.mark.asyncio
    async def test_write_note_blocks_mixed_attack_patterns(self, app, test_project):
        """Paths that start legitimately but then escape upward must be rejected."""
        mixed_folders = (
            "notes/../../../etc",
            "docs/../../.env_folder",
            "legitimate/path/../../.ssh",
            "project/folder/../../../Windows",
            "valid/folder/../../home/user",
            "assets/../../../tmp/evil",
        )
        for folder in mixed_folders:
            outcome = await self._write_probe(test_project, folder)
            # Unlike the other attack tests, the folder echo is not checked here.
            self._assert_rejected(outcome)

    @pytest.mark.asyncio
    async def test_write_note_allows_safe_folder_paths(self, app, test_project):
        """Ordinary relative folders must keep working."""
        allowed_folders = (
            "notes",
            "docs",
            "projects/2025",
            "archive/old-notes",
            "deep/nested/directory/structure",
            "folder/subfolder",
            "research/ml",
            "meeting-notes",
        )
        for folder in allowed_folders:
            outcome = await write_note.fn(
                project=test_project.name,
                title=f"Test Note in {folder.replace('/', '-')}",
                folder=folder,
                content="# Test Content\nThis should work normally with security validation.",
                tags=["test", "security"],
            )
            # Must be a regular success response, never the security error.
            self._assert_accepted(outcome)
            assert folder in outcome  # Should show in file_path

    @pytest.mark.asyncio
    async def test_write_note_empty_folder_security(self, app, test_project):
        """An empty folder (project root) must not trip the security check."""
        outcome = await write_note.fn(
            project=test_project.name,
            title="Root Note",
            folder="",
            content="# Root Note\nThis note should be created in the project root.",
        )
        self._assert_accepted(outcome)

    @pytest.mark.asyncio
    async def test_write_note_none_folder_security(self, app, test_project):
        """Default folder behaviour must work securely when folder is effectively omitted."""
        # folder is a required parameter, so the empty string stands in for None;
        # it places the note in the project root.
        outcome = await write_note.fn(
            project=test_project.name,
            title="Root Folder Note",
            folder="",  # Empty string instead of None since folder is required
            content="# Root Folder Note\nThis note should be created in the project root.",
        )
        self._assert_accepted(outcome)

    @pytest.mark.asyncio
    async def test_write_note_current_directory_references_security(self, app, test_project):
        """``./`` references stay inside the project and must be accepted."""
        dot_folders = (
            "./notes",
            "folder/./subfolder",
            "./folder/subfolder",
        )
        for folder in dot_folders:
            outcome = await write_note.fn(
                project=test_project.name,
                title=f"Current Dir Test {folder.replace('/', '-').replace('.', 'dot')}",
                folder=folder,
                content="# Current Directory Test\nThis should work with current directory references.",
            )
            self._assert_accepted(outcome)

    @pytest.mark.asyncio
    async def test_write_note_security_with_all_parameters(self, app, test_project):
        """Validation must still apply when every optional parameter is supplied."""
        hostile_folder = "../../../etc/malicious"
        outcome = await write_note.fn(
            project=test_project.name,
            title="Security Test with All Params",
            folder=hostile_folder,
            content="# Malicious Content\nThis should be blocked by security validation.",
            tags=["malicious", "test"],
            entity_type="guide",
        )
        self._assert_rejected(outcome, echoed_folder=hostile_folder)

    @pytest.mark.asyncio
    async def test_write_note_security_logging(self, app, test_project, caplog):
        """A traversal attempt must produce the security error response."""
        outcome = await write_note.fn(
            project=test_project.name,
            title="Security Logging Test",
            folder="../../../etc/passwd_folder",
            content="# Test Content\nThis should trigger security logging.",
        )
        assert "# Error" in outcome
        assert "paths must stay within project boundaries" in outcome
        # Check that security violation was logged
        # Note: This test may need adjustment based on the actual logging setup
        # The security validation should generate a warning log entry

    @pytest.mark.asyncio
    async def test_write_note_preserves_functionality_with_security(self, app, test_project):
        """A feature-rich note in a safe folder must be created exactly as usual."""
        # Observations, relations and tags all in one note, so any interference
        # from the security validation would show up in the response.
        body = dedent("""
            # Full Feature Security Test
            This note tests that security validation doesn't break normal functionality.
            ## Observations
            - [security] Path validation working correctly #security
            - [feature] All features still functional #test
            ## Relations
            - relates_to [[Security Implementation]]
            - depends_on [[Path Validation]]
            Additional content with various formatting.
            """).strip()

        outcome = await write_note.fn(
            project=test_project.name,
            title="Full Feature Security Test",
            folder="security-tests",
            content=body,
            tags=["security", "test", "full-feature"],
            entity_type="guide",
        )

        # Should succeed normally
        assert isinstance(outcome, str)
        assert "# Error" not in outcome
        assert "paths must stay within project boundaries" not in outcome
        for fragment in (
            "# Created note",
            "file_path: security-tests/Full Feature Security Test.md",
            "permalink: security-tests/full-feature-security-test",
            # Observations, relations and tags were processed
            "## Observations",
            "## Relations",
            "## Tags",
            # Per-category observation counts
            "security: 1",
            "feature: 1",
        ):
            assert fragment in outcome
class TestWriteNoteSecurityEdgeCases:
    """Test edge cases for write_note security validation."""

    @staticmethod
    def _assert_blocked(result):
        """Assert that ``result`` is the path-boundary security error text."""
        assert isinstance(result, str)
        assert "# Error" in result
        assert "paths must stay within project boundaries" in result

    @pytest.mark.asyncio
    async def test_write_note_unicode_folder_attacks(self, app, test_project):
        """Test that Unicode-based path traversal attempts are blocked."""
        unicode_attack_folders = [
            "notes/文档/../../../etc",  # Chinese characters
            "docs/café/../../secrets",  # Accented characters
            "files/αβγ/../../../malicious",  # Greek characters
        ]
        for attack_folder in unicode_attack_folders:
            result = await write_note.fn(
                project=test_project.name,
                title="Unicode Attack Test",
                folder=attack_folder,
                content="# Unicode Attack\nThis should be blocked.",
            )
            self._assert_blocked(result)

    @pytest.mark.asyncio
    async def test_write_note_very_long_attack_folder(self, app, test_project):
        """Test handling of very long attack folder paths."""
        # 1000 levels of "../" followed by a target directory.
        long_attack_folder = "../" * 1000 + "etc/malicious"
        result = await write_note.fn(
            project=test_project.name,
            title="Long Attack Test",
            folder=long_attack_folder,
            content="# Long Attack\nThis should be blocked.",
        )
        self._assert_blocked(result)

    @pytest.mark.asyncio
    async def test_write_note_case_variations_attacks(self, app, test_project):
        """Test that case variations don't bypass security."""
        # Case sensitivity depends on the filesystem; traversal must be
        # rejected regardless of the target's casing.
        case_attack_folders = [
            "../ETC",
            "../Etc/SECRETS",
            "..\\WINDOWS",
            "~/SECRETS",
        ]
        for attack_folder in case_attack_folders:
            result = await write_note.fn(
                project=test_project.name,
                title="Case Variation Attack Test",
                folder=attack_folder,
                content="# Case Attack\nThis should be blocked.",
            )
            self._assert_blocked(result)

    @pytest.mark.asyncio
    async def test_write_note_whitespace_in_attack_folders(self, app, test_project):
        """Test that whitespace doesn't help bypass security."""
        whitespace_attack_folders = [
            " ../../../etc ",
            "\t../../../secrets\t",
            " ..\\..\\Windows ",
            "notes/ ../../ malicious",
        ]
        for attack_folder in whitespace_attack_folders:
            result = await write_note.fn(
                project=test_project.name,
                title="Whitespace Attack Test",
                folder=attack_folder,
                content="# Whitespace Attack\nThis should be blocked.",
            )
            # Every folder above contains "..", so the error is asserted
            # unconditionally. The previous `if ".." in ... or "~" in ...`
            # guard was always true and only obscured that requirement.
            self._assert_blocked(result)
```
--------------------------------------------------------------------------------
/specs/SPEC-17 Semantic Search with ChromaDB.md:
--------------------------------------------------------------------------------
```markdown
---
title: 'SPEC-17: Semantic Search with ChromaDB'
type: spec
permalink: specs/spec-17-semantic-search-chromadb
tags:
- search
- chromadb
- semantic-search
- vector-database
- postgres-migration
---
# SPEC-17: Semantic Search with ChromaDB
## Why ChromaDB for Knowledge Management
Your users aren't just searching for keywords - they're trying to:
- "Find notes related to this concept"
- "Show me similar ideas"
- "What else did I write about this topic?"
Example:
```text
# User searches: "AI ethics"
# FTS5/MeiliSearch finds:
- "AI ethics guidelines" ✅
- "ethical AI development" ✅
- "artificial intelligence" ❌ No keyword match
# ChromaDB finds:
- "AI ethics guidelines" ✅
- "ethical AI development" ✅
- "artificial intelligence" ✅ Semantic match!
- "bias in ML models" ✅ Related concept
- "responsible technology" ✅ Similar theme
- "neural network fairness" ✅ Connected idea
```
### ChromaDB vs MeiliSearch vs Typesense
| Feature | ChromaDB | MeiliSearch | Typesense |
|------------------|--------------------|--------------------|--------------------|
| Semantic Search | ✅ Excellent | ❌ No | ❌ No |
| Keyword Search | ⚠️ Via metadata | ✅ Excellent | ✅ Excellent |
| Local Deployment | ✅ Embedded mode | ⚠️ Server required | ⚠️ Server required |
| No Server Needed | ✅ YES! | ❌ No | ❌ No |
| Embedding Cost | ~$0.13/1M tokens | None | None |
| Search Speed | 50-200ms | 10-50ms | 10-50ms |
| Best For | Semantic discovery | Exact terms | Exact terms |
### The Killer Feature: Embedded Mode
ChromaDB has an embedded client that runs in-process - NO SERVER NEEDED!
```python
# Local (FOSS) - ChromaDB embedded in Python process
import chromadb
client = chromadb.PersistentClient(path="/path/to/chroma_data")
collection = client.get_or_create_collection("knowledge_base")
# Add documents
collection.add(
    ids=["note1", "note2"],
    documents=["AI ethics", "Neural networks"],
    metadatas=[{"type": "note"}, {"type": "spec"}]
)
# Search - NO API calls, runs locally!
results = collection.query(
    query_texts=["machine learning"],
    n_results=10
)
```
## Why
### Current Problem: Database Persistence in Cloud
In cloud deployments, `memory.db` (SQLite) doesn't persist across Docker container restarts. This means:
- Database must be rebuilt on every container restart
- Initial sync takes ~49 seconds for 500 files (after optimization in #352)
- Users experience delays on each deployment
### Search Architecture Issues
Current SQLite FTS5 implementation creates a **dual-implementation problem** for PostgreSQL migration:
- FTS5 (SQLite) uses `VIRTUAL TABLE` with `MATCH` queries
- PostgreSQL full-text search uses `TSVECTOR` with `@@` operator
- These are fundamentally incompatible architectures
- Would require **2x search code** and **2x tests** to support both
**Example of incompatibility:**
```python
# SQLite FTS5
"content_stems MATCH :text"
# PostgreSQL
"content_vector @@ plainto_tsquery(:text)"
```
### Search Quality Limitations
Current keyword-based FTS5 has limitations:
- No semantic understanding (search "AI" doesn't find "machine learning")
- No word relationships (search "neural networks" doesn't find "deep learning")
- Limited typo tolerance
- No relevance ranking beyond keyword matching
### Strategic Goal: PostgreSQL Migration
Moving to PostgreSQL (Neon) for cloud deployments would:
- ✅ Solve persistence issues (database survives restarts)
- ✅ Enable multi-tenant architecture
- ✅ Better performance for large datasets
- ✅ Support for cloud-native scaling
**But requires solving the search compatibility problem.**
## What
Migrate from SQLite FTS5 to **ChromaDB** for semantic vector search across all deployments.
**Key insight:** ChromaDB is **database-agnostic** - it works with both SQLite and PostgreSQL, eliminating the dual-implementation problem.
### Affected Areas
- Search implementation (`src/basic_memory/repository/search_repository.py`)
- Search service (`src/basic_memory/services/search_service.py`)
- Search models (`src/basic_memory/models/search.py`)
- Database initialization (`src/basic_memory/db.py`)
- MCP search tools (`src/basic_memory/mcp/tools/search.py`)
- Dependencies (`pyproject.toml` - add ChromaDB)
- Alembic migrations (FTS5 table removal)
- Documentation
### What Changes
**Removed:**
- SQLite FTS5 virtual table
- `MATCH` query syntax
- FTS5-specific tokenization and prefix handling
- ~300 lines of FTS5 query preparation code
**Added:**
- ChromaDB persistent client (embedded mode)
- Vector embedding generation
- Semantic similarity search
- Local embedding model (`sentence-transformers`)
- Collection management for multi-project support
### What Stays the Same
- Search API interface (MCP tools, REST endpoints)
- Entity/Observation/Relation indexing workflow
- Multi-project isolation
- Search filtering by type, date, metadata
- Pagination and result formatting
- **All SQL queries for exact lookups and metadata filtering**
## Hybrid Architecture: SQL + ChromaDB
**Critical Design Decision:** ChromaDB **complements** SQL, it doesn't **replace** it.
### Why Hybrid?
ChromaDB is excellent for semantic text search but terrible for exact lookups. SQL is perfect for exact lookups and structured queries. We use both:
```
┌─────────────────────────────────────────────────┐
│                 Search Request                  │
└─────────────────────────────────────────────────┘
                         ▼
            ┌────────────────────────┐
            │    SearchRepository    │
            │     (Smart Router)     │
            └────────────────────────┘
            ▼                     ▼
      ┌───────────┐        ┌──────────────┐
      │    SQL    │        │   ChromaDB   │
      │  Queries  │        │   Semantic   │
      └───────────┘        └──────────────┘
            ▼                     ▼
      Exact lookups        Text search
      - Permalink          - Semantic similarity
      - Pattern match      - Related concepts
      - Title exact        - Typo tolerance
      - Metadata filter    - Fuzzy matching
      - Date ranges
```
### When to Use Each
#### Use SQL For (Fast & Exact)
**Exact Permalink Lookup:**
```python
# Find by exact permalink - SQL wins
"SELECT * FROM entities WHERE permalink = 'specs/search-feature'"
# ~1ms, perfect for exact matches
# ChromaDB would be: ~50ms, wasteful
```
**Pattern Matching:**
```python
# Find all specs - SQL wins
"SELECT * FROM entities WHERE permalink GLOB 'specs/*'"
# ~5ms, perfect for wildcards
# ChromaDB doesn't support glob patterns
```
**Pure Metadata Queries:**
```python
# Find all meetings tagged "important" - SQL wins
"""SELECT * FROM entities
WHERE json_extract(entity_metadata, '$.entity_type') = 'meeting'
AND json_extract(entity_metadata, '$.tags') LIKE '%important%'"""
# ~5ms, structured query
# No text search needed, SQL is faster and simpler
```
**Date Filtering:**
```python
# Find recent specs - SQL wins
"""SELECT * FROM entities
WHERE entity_type = 'spec'
AND created_at > '2024-01-01'
ORDER BY created_at DESC"""
# ~2ms, perfect for structured data
```
#### Use ChromaDB For (Semantic & Fuzzy)
**Semantic Content Search:**
```python
# Find notes about "neural networks" - ChromaDB wins
collection.query(query_texts=["neural networks"])
# Finds: "machine learning", "deep learning", "AI models"
# ~50-100ms, semantic understanding
# SQL FTS5 would only find exact keyword matches
```
**Text Search + Metadata:**
```python
# Find meeting notes about "project planning" tagged "important"
collection.query(
query_texts=["project planning"],
where={
"entity_type": "meeting",
"tags": {"$contains": "important"}
}
)
# ~100ms, semantic search with filters
# Finds: "roadmap discussion", "sprint planning", etc.
```
**Typo Tolerance:**
```python
# User types "serch feature" (typo) - ChromaDB wins
collection.query(query_texts=["serch feature"])
# Still finds: "search feature" documents
# ~50-100ms, fuzzy matching
# SQL would find nothing
```
### Performance Comparison
| Query Type | SQL | ChromaDB | Winner |
|-----------|-----|----------|--------|
| Exact permalink | 1-2ms | 50ms | ✅ SQL |
| Pattern match (specs/*) | 5-10ms | N/A | ✅ SQL |
| Pure metadata filter | 5ms | 50ms | ✅ SQL |
| Semantic text search | ❌ Can't | 50-100ms | ✅ ChromaDB |
| Text + metadata | ❌ Keywords only | 100ms | ✅ ChromaDB |
| Typo tolerance | ❌ Can't | 50ms | ✅ ChromaDB |
### Metadata/Frontmatter Handling
**Both systems support full frontmatter filtering!**
#### SQL Metadata Storage
```sql
-- Entities table stores frontmatter as JSON
CREATE TABLE entities (
    id INTEGER PRIMARY KEY,
    title TEXT,
    permalink TEXT,
    file_path TEXT,
    entity_type TEXT,
    entity_metadata JSON, -- All frontmatter here!
    created_at DATETIME,
    ...
);
-- Query frontmatter fields
SELECT * FROM entities
WHERE json_extract(entity_metadata, '$.entity_type') = 'meeting'
AND json_extract(entity_metadata, '$.tags') LIKE '%important%'
AND json_extract(entity_metadata, '$.status') = 'completed';
```
#### ChromaDB Metadata Storage
```python
# When indexing, store ALL frontmatter as metadata
class ChromaSearchBackend:
    async def index_entity(self, entity: "Entity"):
        """Index an entity together with its complete frontmatter metadata.

        Custom frontmatter fields are copied into the ChromaDB metadata, but
        the reserved index fields (entity_id, project_id, permalink,
        file_path, entity_type, type) always come from the entity itself, so
        a frontmatter key such as ``type`` or ``permalink`` cannot overwrite
        them.
        """
        # Tolerate entities without any frontmatter.
        frontmatter = entity.entity_metadata or {}

        metadata = {
            # Spread custom fields FIRST so the reserved keys below win on
            # collision (frontmatter commonly contains `type`/`permalink`).
            **{k: v for k, v in frontmatter.items()
               if k not in ["tags", "entity_type"]},
            # Custom frontmatter fields
            "status": frontmatter.get("status"),
            "priority": frontmatter.get("priority"),
            # ALL frontmatter tags
            # NOTE(review): confirm the targeted ChromaDB version accepts
            # list and None metadata values.
            "tags": frontmatter.get("tags", []),
            # Reserved index fields
            "entity_id": entity.id,
            "project_id": entity.project_id,
            "permalink": entity.permalink,
            "file_path": entity.file_path,
            "entity_type": entity.entity_type,
            "type": "entity",
        }

        self.collection.upsert(
            ids=[f"entity_{entity.id}_{entity.project_id}"],
            documents=[self._format_document(entity)],
            metadatas=[metadata]  # Full frontmatter!
        )
```
#### ChromaDB Metadata Queries
ChromaDB supports rich filtering:
```python
# Simple filter - single field
collection.query(
query_texts=["project planning"],
where={"entity_type": "meeting"}
)
# Multiple conditions (AND)
collection.query(
query_texts=["architecture decisions"],
where={
"entity_type": "spec",
"tags": {"$contains": "important"}
}
)
# Complex filters with operators
collection.query(
query_texts=["machine learning"],
where={
"$and": [
{"entity_type": {"$in": ["note", "spec"]}},
{"tags": {"$contains": "AI"}},
{"created_at": {"$gt": "2024-01-01"}},
{"status": "in-progress"}
]
}
)
# Multiple tags (all must match)
collection.query(
query_texts=["cloud architecture"],
where={
"$and": [
{"tags": {"$contains": "architecture"}},
{"tags": {"$contains": "cloud"}}
]
}
)
```
### Smart Routing Implementation
```python
class SearchRepository:
def __init__(
self,
session_maker: async_sessionmaker[AsyncSession],
project_id: int,
chroma_backend: ChromaSearchBackend
):
self.sql = session_maker # Keep SQL!
self.chroma = chroma_backend
self.project_id = project_id
async def search(
self,
search_text: Optional[str] = None,
permalink: Optional[str] = None,
permalink_match: Optional[str] = None,
title: Optional[str] = None,
types: Optional[List[str]] = None,
tags: Optional[List[str]] = None,
after_date: Optional[datetime] = None,
custom_metadata: Optional[dict] = None,
limit: int = 10,
offset: int = 0,
) -> List[SearchIndexRow]:
"""Smart routing between SQL and ChromaDB."""
# ==========================================
# Route 1: Exact Lookups → SQL (1-5ms)
# ==========================================
if permalink:
# Exact permalink: "specs/search-feature"
return await self._sql_permalink_lookup(permalink)
if permalink_match:
# Pattern match: "specs/*"
return await self._sql_pattern_match(permalink_match)
if title and not search_text:
# Exact title lookup (no semantic search needed)
return await self._sql_title_match(title)
# ==========================================
# Route 2: Pure Metadata → SQL (5-10ms)
# ==========================================
# No text search, just filtering by metadata
if not search_text and (types or tags or after_date or custom_metadata):
return await self._sql_metadata_filter(
types=types,
tags=tags,
after_date=after_date,
custom_metadata=custom_metadata,
limit=limit,
offset=offset
)
# ==========================================
# Route 3: Text Search → ChromaDB (50-100ms)
# ==========================================
if search_text:
# Build ChromaDB metadata filters
where_filters = self._build_chroma_filters(
types=types,
tags=tags,
after_date=after_date,
custom_metadata=custom_metadata
)
# Semantic search with metadata filtering
return await self.chroma.search(
query_text=search_text,
project_id=self.project_id,
where=where_filters,
limit=limit
)
# ==========================================
# Route 4: List All → SQL (2-5ms)
# ==========================================
return await self._sql_list_entities(
limit=limit,
offset=offset
)
def _build_chroma_filters(
self,
types: Optional[List[str]] = None,
tags: Optional[List[str]] = None,
after_date: Optional[datetime] = None,
custom_metadata: Optional[dict] = None
) -> dict:
"""Build ChromaDB where clause from filters."""
filters = {"project_id": self.project_id}
# Type filtering
if types:
if len(types) == 1:
filters["entity_type"] = types[0]
else:
filters["entity_type"] = {"$in": types}
# Tag filtering (array contains)
if tags:
if len(tags) == 1:
filters["tags"] = {"$contains": tags[0]}
else:
# Multiple tags - all must match
filters = {
"$and": [
filters,
*[{"tags": {"$contains": tag}} for tag in tags]
]
}
# Date filtering
if after_date:
filters["created_at"] = {"$gt": after_date.isoformat()}
# Custom frontmatter fields
if custom_metadata:
filters.update(custom_metadata)
return filters
async def _sql_metadata_filter(
    self,
    types: Optional[List[str]] = None,
    tags: Optional[List[str]] = None,
    after_date: Optional[datetime] = None,
    custom_metadata: Optional[dict] = None,
    limit: int = 10,
    offset: int = 0
) -> List[SearchIndexRow]:
    """Run a pure metadata query against the ``entities`` table.

    Every caller-supplied value (types, tags, date, metadata keys and values)
    is passed as a bound parameter; nothing from the arguments is spliced into
    the SQL text.

    Args:
        types: Entity types to match with ``IN``.
        tags: Each tag must appear as a substring of the JSON text stored
            under ``$.tags`` (``LIKE '%tag%'``).
        after_date: Only rows with ``created_at`` later than this value.
        custom_metadata: ``key -> value`` pairs compared against
            ``json_extract(entity_metadata, '$.<key>')``.
        limit: Maximum number of rows.
        offset: Number of rows to skip.

    Returns:
        Rows formatted by ``_format_sql_results``, newest first.
    """
    conditions = ["project_id = :project_id"]
    params = {"project_id": self.project_id}

    if types:
        placeholders = []
        for i, entity_type in enumerate(types):
            param_name = f"type_{i}"
            placeholders.append(f":{param_name}")
            params[param_name] = entity_type
        conditions.append(f"entity_type IN ({', '.join(placeholders)})")

    if tags:
        # Check each tag
        for i, tag in enumerate(tags):
            param_name = f"tag_{i}"
            conditions.append(
                f"json_extract(entity_metadata, '$.tags') LIKE :{param_name}"
            )
            params[param_name] = f"%{tag}%"

    if after_date:
        conditions.append("created_at > :after_date")
        params["after_date"] = after_date

    if custom_metadata:
        # Index-based bind names: metadata keys may contain characters that
        # are not legal in a bind-parameter name, and the JSON path itself is
        # bound so a crafted key cannot alter the statement.
        for i, (key, value) in enumerate(custom_metadata.items()):
            path_param = f"meta_path_{i}"
            value_param = f"meta_value_{i}"
            conditions.append(
                f"json_extract(entity_metadata, :{path_param}) = :{value_param}"
            )
            params[path_param] = f"$.{key}"
            params[value_param] = value

    where = " AND ".join(conditions)
    sql = f"""
        SELECT * FROM entities
        WHERE {where}
        ORDER BY created_at DESC
        LIMIT :limit OFFSET :offset
    """
    params["limit"] = limit
    params["offset"] = offset

    async with db.scoped_session(self.session_maker) as session:
        result = await session.execute(text(sql), params)
        return self._format_sql_results(result)
```
### Real-World Examples
#### Example 1: Pure Metadata Query (No Text)
```python
# "Find all meetings tagged 'important'"
results = await search_repo.search(
types=["meeting"],
tags=["important"]
)
# Routing: → SQL (~5ms)
# SQL: SELECT * FROM entities
# WHERE entity_type = 'meeting'
# AND json_extract(entity_metadata, '$.tags') LIKE '%important%'
```
#### Example 2: Semantic Search (No Metadata)
```python
# "Find notes about neural networks"
results = await search_repo.search(
search_text="neural networks"
)
# Routing: → ChromaDB (~80ms)
# Finds: "machine learning", "deep learning", "AI models", etc.
```
#### Example 3: Semantic + Metadata
```python
# "Find meeting notes about 'project planning' tagged 'important'"
results = await search_repo.search(
search_text="project planning",
types=["meeting"],
tags=["important"]
)
# Routing: → ChromaDB with filters (~100ms)
# ChromaDB: query_texts=["project planning"]
# where={"$and": [{"entity_type": "meeting"},
# {"tags": {"$contains": "important"}}]}
# Finds: "roadmap discussion", "sprint planning", etc.
```
#### Example 4: Complex Frontmatter Query
```python
# "Find in-progress specs with multiple tags, recent"
results = await search_repo.search(
types=["spec"],
tags=["architecture", "cloud"],
after_date=datetime(2024, 1, 1),
custom_metadata={"status": "in-progress"}
)
# Routing: → SQL (~10ms)
# No text search, pure structured query - SQL is faster
```
#### Example 5: Semantic + Complex Metadata
```python
# "Find notes about 'authentication' that are in-progress"
results = await search_repo.search(
search_text="authentication",
custom_metadata={"status": "in-progress", "priority": "high"}
)
# Routing: → ChromaDB with metadata filters (~100ms)
# Semantic search for "authentication" concept
# Filters by status and priority in metadata
```
#### Example 6: Exact Permalink
```python
# "Show me specs/search-feature"
results = await search_repo.search(
permalink="specs/search-feature"
)
# Routing: → SQL (~1ms)
# SQL: SELECT * FROM entities WHERE permalink = 'specs/search-feature'
```
#### Example 7: Pattern Match
```python
# "Show me all specs"
results = await search_repo.search(
permalink_match="specs/*"
)
# Routing: → SQL (~5ms)
# SQL: SELECT * FROM entities WHERE permalink GLOB 'specs/*'
```
### What We Remove vs Keep
**REMOVE (FTS5-specific):**
- ❌ `CREATE VIRTUAL TABLE search_index USING fts5(...)`
- ❌ `MATCH` operator queries
- ❌ FTS5 tokenization configuration
- ❌ ~300 lines of FTS5 query preparation code
- ❌ Trigram generation and prefix handling
**KEEP (Standard SQL):**
- ✅ `SELECT * FROM entities WHERE permalink = :permalink`
- ✅ `SELECT * FROM entities WHERE permalink GLOB :pattern`
- ✅ `SELECT * FROM entities WHERE title LIKE :title`
- ✅ `SELECT * FROM entities WHERE json_extract(entity_metadata, ...) = :value`
- ✅ All date filtering, pagination, sorting
- ✅ Entity table structure and indexes
**ADD (ChromaDB):**
- ✅ ChromaDB persistent client (embedded)
- ✅ Semantic vector search
- ✅ Metadata filtering in ChromaDB
- ✅ Smart routing logic
## How (High Level)
### Architecture Overview
```
┌─────────────────────────────────────────────────────────────┐
│ FOSS Deployment (Local) │
├─────────────────────────────────────────────────────────────┤
│ SQLite (data) + ChromaDB embedded (search) │
│ - No external services │
│ - Local embedding model (sentence-transformers) │
│ - Persists in ~/.basic-memory/chroma_data/ │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ Cloud Deployment (Multi-tenant) │
├─────────────────────────────────────────────────────────────┤
│ PostgreSQL/Neon (data) + ChromaDB server (search) │
│ - Neon serverless Postgres for persistence │
│ - ChromaDB server in Docker container │
│ - Optional: OpenAI embeddings for better quality │
└─────────────────────────────────────────────────────────────┘
```
### Phase 1: ChromaDB Integration (2-3 days)
#### 1. Add ChromaDB Dependency
```toml
# pyproject.toml
dependencies = [
"chromadb>=0.4.0",
"sentence-transformers>=2.2.0", # Local embeddings
]
```
#### 2. Create ChromaSearchBackend
```python
# src/basic_memory/search/chroma_backend.py
from chromadb import PersistentClient
from chromadb.utils import embedding_functions
class ChromaSearchBackend:
    """Semantic search backend on top of a persistent ChromaDB collection."""

    def __init__(
        self,
        persist_directory: Path,
        collection_name: str = "knowledge_base",
        embedding_model: str = "all-MiniLM-L6-v2"
    ):
        """Initialize ChromaDB with local embeddings.

        Args:
            persist_directory: Storage path handed to ``PersistentClient``.
            collection_name: Collection to fetch, or create when missing.
            embedding_model: Model name passed to
                ``SentenceTransformerEmbeddingFunction``.
        """
        self.client = PersistentClient(path=str(persist_directory))
        # Use local sentence-transformers model (no API costs)
        self.embed_fn = embedding_functions.SentenceTransformerEmbeddingFunction(
            model_name=embedding_model
        )
        self.collection = self.client.get_or_create_collection(
            name=collection_name,
            embedding_function=self.embed_fn,
            metadata={"hnsw:space": "cosine"}  # Similarity metric
        )

    async def index_entity(self, entity: Entity):
        """Index entity with automatic embeddings.

        The document id combines entity id and project id, so re-indexing the
        same entity overwrites its previous entry (``upsert``).
        """
        # Combine title and content for semantic search
        document = self._format_document(entity)
        self.collection.upsert(
            ids=[f"entity_{entity.id}_{entity.project_id}"],
            documents=[document],
            metadatas=[{
                "entity_id": entity.id,
                "project_id": entity.project_id,
                "permalink": entity.permalink,
                "file_path": entity.file_path,
                "entity_type": entity.entity_type,
                "type": "entity",
            }]
        )

    async def search(
        self,
        query_text: str,
        project_id: int,
        limit: int = 10,
        filters: Optional[dict] = None
    ) -> List[SearchResult]:
        """Semantic search with metadata filtering.

        Args:
            query_text: Text to embed and search for.
            project_id: Project scope; always part of the ``where`` clause.
            limit: Maximum number of results requested from ChromaDB.
            filters: Optional extra conditions. Either a flat
                ``field -> condition`` dict or an expression that already
                uses a ``$``-prefixed logical operator such as ``$and``.

        Returns:
            The query results as converted by ``_format_results``.
        """
        # ChromaDB allows exactly one key at the top level of `where`, so the
        # conditions are kept as separate clauses and joined with `$and`
        # rather than merged into one dict. Joining also means a caller
        # filter can narrow, but never replace, the project scope.
        clauses = [{"project_id": project_id}]
        if filters:
            if any(str(key).startswith("$") for key in filters):
                # Already a logical expression; keep it as one operand.
                clauses.append(filters)
            else:
                clauses.extend({field: cond} for field, cond in filters.items())
        where = clauses[0] if len(clauses) == 1 else {"$and": clauses}

        results = self.collection.query(
            query_texts=[query_text],
            n_results=limit,
            where=where
        )
        return self._format_results(results)
```
#### 3. Update SearchRepository
```python
# src/basic_memory/repository/search_repository.py
class SearchRepository:
def __init__(
self,
session_maker: async_sessionmaker[AsyncSession],
project_id: int,
chroma_backend: ChromaSearchBackend
):
self.session_maker = session_maker
self.project_id = project_id
self.chroma = chroma_backend
async def search(
self,
search_text: Optional[str] = None,
permalink: Optional[str] = None,
# ... other filters
) -> List[SearchIndexRow]:
"""Search using ChromaDB for text, SQL for exact lookups."""
# For exact permalink/pattern matches, use SQL
if permalink or permalink_match:
return await self._sql_exact_search(...)
# For text search, use ChromaDB semantic search
if search_text:
results = await self.chroma.search(
query_text=search_text,
project_id=self.project_id,
limit=limit,
filters=self._build_filters(types, after_date, ...)
)
return results
# Fallback to listing all
return await self._list_entities(...)
```
#### 4. Update SearchService
```python
# src/basic_memory/services/search_service.py
class SearchService:
def __init__(
self,
search_repository: SearchRepository,
entity_repository: EntityRepository,
file_service: FileService,
chroma_backend: ChromaSearchBackend,
):
self.repository = search_repository
self.entity_repository = entity_repository
self.file_service = file_service
self.chroma = chroma_backend
async def index_entity(self, entity: Entity):
"""Index entity in ChromaDB."""
if entity.is_markdown:
await self._index_entity_markdown(entity)
else:
await self._index_entity_file(entity)
async def _index_entity_markdown(self, entity: Entity):
"""Index markdown entity with full content."""
# Index entity
await self.chroma.index_entity(entity)
# Index observations (as separate documents)
for obs in entity.observations:
await self.chroma.index_observation(obs, entity)
# Index relations (metadata only)
for rel in entity.outgoing_relations:
await self.chroma.index_relation(rel, entity)
```
### Phase 2: PostgreSQL Support (1 day)
#### 1. Add PostgreSQL Database Type
```python
# src/basic_memory/db.py
class DatabaseType(Enum):
    """Database backends the application can connect to."""

    MEMORY = auto()
    FILESYSTEM = auto()
    POSTGRESQL = auto()  # NEW

    @classmethod
    def get_db_url(cls, db_path_or_url: str, db_type: "DatabaseType") -> str:
        """Return the SQLAlchemy async connection URL for ``db_type``.

        Args:
            db_path_or_url: File path for ``FILESYSTEM``, connection string
                for ``POSTGRESQL``; ignored for ``MEMORY``.
            db_type: Which backend to build the URL for.

        Returns:
            An ``sqlite+aiosqlite`` URL for the SQLite variants. For
            PostgreSQL, the given connection string with a plain
            ``postgresql://`` or ``postgres://`` scheme rewritten to
            ``postgresql+asyncpg://``; a URL that already names a driver is
            returned unchanged.
        """
        if db_type == cls.POSTGRESQL:
            # The engine is created with create_async_engine, which needs an
            # async driver; a bare scheme would select the default sync one.
            for plain_scheme in ("postgresql://", "postgres://"):
                if db_path_or_url.startswith(plain_scheme):
                    return "postgresql+asyncpg://" + db_path_or_url[len(plain_scheme):]
            return db_path_or_url  # Neon connection string
        if db_type == cls.MEMORY:
            return "sqlite+aiosqlite://"
        return f"sqlite+aiosqlite:///{db_path_or_url}"
```
#### 2. Update Connection Handling
```python
def _create_engine_and_session(...):
db_url = DatabaseType.get_db_url(db_path_or_url, db_type)
if db_type == DatabaseType.POSTGRESQL:
# Use asyncpg driver for Postgres
engine = create_async_engine(
db_url,
pool_size=10,
max_overflow=20,
pool_pre_ping=True, # Health checks
)
else:
# SQLite configuration
engine = create_async_engine(db_url, connect_args=connect_args)
# Only configure SQLite-specific settings for SQLite
if db_type != DatabaseType.MEMORY:
@event.listens_for(engine.sync_engine, "connect")
def enable_wal_mode(dbapi_conn, connection_record):
_configure_sqlite_connection(dbapi_conn, enable_wal=True)
return engine, async_sessionmaker(engine, expire_on_commit=False)
```
#### 3. Remove SQLite-Specific Code
```python
# Remove from scoped_session context manager:
# await session.execute(text("PRAGMA foreign_keys=ON")) # DELETE
# PostgreSQL handles foreign keys by default
```
### Phase 3: Migration & Testing (1-2 days)
#### 1. Create Migration Script
```python
# scripts/migrate_to_chromadb.py
async def migrate_fts5_to_chromadb():
"""One-time migration from FTS5 to ChromaDB."""
# 1. Read all entities from database
entities = await entity_repository.find_all()
# 2. Index in ChromaDB
for entity in entities:
await search_service.index_entity(entity)
# 3. Drop FTS5 table (Alembic migration)
await session.execute(text("DROP TABLE IF EXISTS search_index"))
```
#### 2. Update Tests
- Replace FTS5 test fixtures with ChromaDB fixtures
- Test semantic search quality
- Test multi-project isolation in ChromaDB
- Benchmark performance vs FTS5
#### 3. Documentation Updates
- Update search documentation
- Add ChromaDB configuration guide
- Document embedding model options
- PostgreSQL deployment guide
### Configuration
```python
# config.py
class BasicMemoryConfig:
# Database
database_type: DatabaseType = DatabaseType.FILESYSTEM
database_path: Path = Path.home() / ".basic-memory" / "memory.db"
database_url: Optional[str] = None # For Postgres: postgresql://...
# Search
chroma_persist_directory: Path = Path.home() / ".basic-memory" / "chroma_data"
embedding_model: str = "all-MiniLM-L6-v2" # Local model
embedding_provider: str = "local" # or "openai"
openai_api_key: Optional[str] = None # For cloud deployments
```
### Deployment Configurations
#### Local (FOSS)
```yaml
# Default configuration
database_type: FILESYSTEM
database_path: ~/.basic-memory/memory.db
chroma_persist_directory: ~/.basic-memory/chroma_data
embedding_model: all-MiniLM-L6-v2
embedding_provider: local
```
#### Cloud (Docker Compose)
```yaml
services:
postgres:
image: postgres:15
environment:
POSTGRES_DB: basic_memory
POSTGRES_PASSWORD: ${DB_PASSWORD}
chromadb:
image: chromadb/chroma:latest
volumes:
- chroma_data:/chroma/chroma
environment:
ALLOW_RESET: true
app:
environment:
DATABASE_TYPE: POSTGRESQL
DATABASE_URL: postgresql://postgres:${DB_PASSWORD}@postgres/basic_memory
CHROMA_HOST: chromadb
CHROMA_PORT: 8000
EMBEDDING_PROVIDER: local # or openai
```
## How to Evaluate
### Success Criteria
#### Functional Requirements
- ✅ Semantic search finds related concepts (e.g., "AI" finds "machine learning")
- ✅ Exact permalink/pattern matches work (e.g., `specs/*`)
- ✅ Multi-project isolation maintained
- ✅ All existing search filters work (type, date, metadata)
- ✅ MCP tools continue to work without changes
- ✅ Works with both SQLite and PostgreSQL
#### Performance Requirements
- ✅ Search latency < 200ms for 1000 documents (local embedding)
- ✅ Indexing time comparable to FTS5 (~10 files/sec)
- ✅ Initial sync time not significantly worse than current
- ✅ Memory footprint < 1GB for local deployments
#### Quality Requirements
- ✅ Better search relevance than FTS5 keyword matching
- ✅ Handles typos and word variations
- ✅ Finds semantically similar content
#### Deployment Requirements
- ✅ FOSS: Works out-of-box with no external services
- ✅ Cloud: Integrates with PostgreSQL (Neon)
- ✅ No breaking changes to MCP API
- ✅ Migration script for existing users
### Testing Procedure
#### 1. Unit Tests
```bash
# Test ChromaDB backend
pytest tests/test_chroma_backend.py
# Test search repository with ChromaDB
pytest tests/test_search_repository.py
# Test search service
pytest tests/test_search_service.py
```
#### 2. Integration Tests
```bash
# Test full search workflow
pytest test-int/test_search_integration.py
# Test with PostgreSQL
DATABASE_TYPE=POSTGRESQL pytest test-int/
```
#### 3. Semantic Search Quality Tests
```python
# Test semantic similarity
search("machine learning") should find:
- "neural networks"
- "deep learning"
- "AI algorithms"
search("software architecture") should find:
- "system design"
- "design patterns"
- "microservices"
```
#### 4. Performance Benchmarks
```bash
# Run search benchmarks
pytest test-int/test_search_performance.py -v
# Measure:
- Search latency (should be < 200ms)
- Indexing throughput (should be ~10 files/sec)
- Memory usage (should be < 1GB)
```
#### 5. Migration Testing
```bash
# Test migration from FTS5 to ChromaDB
python scripts/migrate_to_chromadb.py
# Verify all entities indexed
# Verify search results quality
# Verify no data loss
```
### Metrics
**Search Quality:**
- Semantic relevance score (manual evaluation)
- Precision/recall for common queries
- User satisfaction (qualitative)
**Performance:**
- Average search latency (ms)
- P95/P99 search latency
- Indexing throughput (files/sec)
- Memory usage (MB)
**Deployment:**
- Local deployment success rate
- Cloud deployment success rate
- Migration success rate
## Implementation Checklist
### Phase 1: ChromaDB Integration
- [ ] Add ChromaDB and sentence-transformers dependencies
- [ ] Create ChromaSearchBackend class
- [ ] Update SearchRepository to use ChromaDB
- [ ] Update SearchService indexing methods
- [ ] Remove FTS5 table creation code
- [ ] Update search query logic
- [ ] Add ChromaDB configuration to BasicMemoryConfig
### Phase 2: PostgreSQL Support
- [ ] Add DatabaseType.POSTGRESQL enum
- [ ] Update get_db_url() for Postgres connection strings
- [ ] Add asyncpg dependency
- [ ] Update engine creation for Postgres
- [ ] Remove SQLite-specific PRAGMA statements
- [ ] Test with Neon database
### Phase 3: Testing & Migration
- [ ] Write unit tests for ChromaSearchBackend
- [ ] Update search integration tests
- [ ] Add semantic search quality tests
- [ ] Create performance benchmarks
- [ ] Write migration script from FTS5
- [ ] Test migration with existing data
- [ ] Update documentation
### Phase 4: Deployment
- [ ] Update docker-compose.yml for cloud
- [ ] Document local FOSS deployment
- [ ] Document cloud PostgreSQL deployment
- [ ] Create migration guide for users
- [ ] Update MCP tool documentation
## Notes
### Embedding Model Trade-offs
**Local Model: `all-MiniLM-L6-v2`**
- Size: 80MB download
- Speed: ~50ms embedding time
- Dimensions: 384
- Cost: $0
- Quality: Good for general knowledge
- Best for: FOSS deployments
**OpenAI: `text-embedding-3-small`**
- Speed: ~100-200ms (API call)
- Dimensions: 1536
- Cost: ~$0.02 per 1M tokens (~$0.01 per 1000 notes)
- Quality: Excellent
- Best for: Cloud deployments with budget
### ChromaDB Storage
ChromaDB stores data in:
```
~/.basic-memory/chroma_data/
├── chroma.sqlite3 # Metadata
├── index/ # HNSW indexes
└── collections/ # Vector data
```
Typical sizes:
- 100 notes: ~5MB
- 1000 notes: ~50MB
- 10000 notes: ~500MB
### Why Not Keep FTS5?
**Considered:** Hybrid approach (FTS5 for SQLite + tsvector for Postgres)
**Rejected because:**
- 2x the code to maintain
- 2x the tests to write
- 2x the bugs to fix
- Inconsistent search behavior between deployments
- ChromaDB provides better search quality anyway
**ChromaDB wins:**
- One implementation for both databases
- Better search quality (semantic!)
- Database-agnostic architecture
- Embedded mode for FOSS (no servers needed)
## Implementation
Proposed Architecture
Option 1: ChromaDB Only (Simplest)
class ChromaSearchBackend:
def __init__(self, path: str, embedding_model: str = "all-MiniLM-L6-v2"):
# For local: embedded client (no server!)
self.client = chromadb.PersistentClient(path=path)
# Use local embedding model (no API costs!)
from chromadb.utils import embedding_functions
self.embed_fn = embedding_functions.SentenceTransformerEmbeddingFunction(
model_name=embedding_model
)
self.collection = self.client.get_or_create_collection(
name="knowledge_base",
embedding_function=self.embed_fn
)
async def index_entity(self, entity: Entity):
# ChromaDB handles embeddings automatically!
self.collection.upsert(
ids=[str(entity.id)],
documents=[f"{entity.title}\n{entity.content}"],
metadatas=[{
"permalink": entity.permalink,
"type": entity.entity_type,
"file_path": entity.file_path
}]
)
async def search(self, query: str, filters: dict = None):
# Semantic search with optional metadata filters
results = self.collection.query(
query_texts=[query],
n_results=10,
where=filters # e.g., {"type": "note"}
)
return results
Deployment:
- Local (FOSS): ChromaDB embedded, local embedding model, NO servers
- Cloud: ChromaDB server OR still embedded (it's just a Python lib!)
Option 2: Hybrid FTS + ChromaDB (Best UX)
class HybridSearchBackend:
def __init__(self):
self.fts = SQLiteFTS5Backend() # Fast keyword search
self.chroma = ChromaSearchBackend() # Semantic search
async def search(self, query: str, search_type: str = "auto"):
if search_type == "exact":
# User wants exact match: "specs/search-feature"
return await self.fts.search(query)
elif search_type == "semantic":
# User wants related concepts
return await self.chroma.search(query)
else: # "auto"
# Check if query looks like exact match
if "/" in query or query.startswith('"'):
return await self.fts.search(query)
# Otherwise use semantic search
return await self.chroma.search(query)
Embedding Options
Option A: Local Model (FREE, FOSS-friendly)
# Uses sentence-transformers (runs locally)
# Model: ~100MB download
# Speed: ~50-100ms for embedding
# Cost: $0
from chromadb.utils import embedding_functions
embed_fn = embedding_functions.SentenceTransformerEmbeddingFunction(
model_name="all-MiniLM-L6-v2" # Fast, accurate, free
)
Option B: OpenAI Embeddings (Cloud only)
# For cloud users who want best quality
# Model: text-embedding-3-small
# Speed: ~100-200ms via API
# Cost: ~$0.02 per 1M tokens (~$0.01 per 1000 notes)
embed_fn = embedding_functions.OpenAIEmbeddingFunction(
api_key="...",
model_name="text-embedding-3-small"
)
Performance Comparison
Local embedding model: all-MiniLM-L6-v2
Embedding time: ~50ms per note
Search time: ~100ms for 1000 notes
Memory: ~500MB (model + ChromaDB)
Cost: $0
Quality: Good (384 dimensions)
OpenAI embeddings: text-embedding-3-small
Embedding time: ~100-200ms per note (API call)
Search time: ~50ms for 1000 notes
Cost: ~$0.01 per 1000 notes
Quality: Excellent (1536 dimensions)
My Recommendation: ChromaDB with Local Embeddings
Here's the plan:
Phase 1: Local ChromaDB (1-2 days)
# FOSS version
- SQLite for data persistence
- ChromaDB embedded for semantic search
- Local embedding model (no API costs)
- NO external services required
Benefits:
- ✅ Same deployment as current (just Python package)
- ✅ Semantic search for better UX
- ✅ Free embeddings with local model
- ✅ No servers needed
Phase 2: Postgres + ChromaDB Cloud (1-2 days)
# Cloud version
- Postgres for data persistence
- ChromaDB server for semantic search
- OpenAI embeddings (higher quality)
- OR keep local embeddings (cheaper)
Phase 3: Hybrid Search (optional, 1 day)
# Add FTS for exact matches alongside ChromaDB
- Quick keyword search when needed
- Semantic search for exploration
- Best of both worlds
Code Estimate
Just ChromaDB (replacing FTS5):
- Remove FTS5 code: 2 hours
- Add ChromaDB backend: 4 hours
- Update search service: 2 hours
- Testing: 4 hours
- Total: 1.5 days
ChromaDB + Postgres migration:
- Add Postgres support: 4 hours
- Test with Neon: 2 hours
- Total: +0.75 days
Grand total: 2-3 days for complete migration
The Kicker
ChromaDB solves BOTH problems:
1. ✅ Works with SQLite AND Postgres (it's separate!)
2. ✅ No server needed for local (embedded mode)
3. ✅ Better search than FTS5 (semantic!)
4. ✅ One implementation for both deployments
Want me to prototype this? I can show you:
1. ChromaDB embedded with local embeddings
2. Example searches showing semantic matching
3. Performance benchmarks
4. Migration from FTS5
## Observations
- [problem] SQLite FTS5 and PostgreSQL tsvector are incompatible architectures requiring dual implementation #database-compatibility
- [problem] Cloud deployments lose database on container restart requiring full re-sync #persistence
- [solution] ChromaDB provides database-agnostic semantic search eliminating dual implementation #architecture
- [advantage] Semantic search finds related concepts beyond keyword matching improving UX #search-quality
- [deployment] Embedded ChromaDB requires no external services for FOSS #simplicity
- [migration] Moving to PostgreSQL solves cloud persistence issues #cloud-architecture
- [performance] Local embedding models provide good quality at zero cost #cost-optimization
- [trade-off] Embedding generation adds ~50ms latency vs instant FTS5 indexing #performance
- [benefit] Single search codebase reduces maintenance burden and test coverage needs #maintainability
## Prior Art / References
### Community Fork: manuelbliemel/basic-memory (feature/vector-search)
**Repository**: https://github.com/manuelbliemel/basic-memory/tree/feature/vector-search
**Key Implementation Details**:
**Vector Database**: ChromaDB (same as our approach!)
**Embedding Models**:
- Local: `all-MiniLM-L6-v2` (default, 384 dims) - same model we planned
- Also supports: `all-mpnet-base-v2`, `paraphrase-MiniLM-L6-v2`, `multi-qa-MiniLM-L6-cos-v1`
- OpenAI: `text-embedding-ada-002`, `text-embedding-3-small`, `text-embedding-3-large`
**Chunking Strategy** (interesting - we didn't consider this):
- Chunk Size: 500 characters
- Chunk Overlap: 50 characters
- Breaks documents into smaller pieces for better semantic search
**Search Strategies**:
1. `fuzzy_only` (default) - FTS5 only
2. `vector_only` - ChromaDB only
3. `hybrid` (recommended) - Both FTS5 + ChromaDB
4. `fuzzy_primary` - FTS5 first, ChromaDB fallback
5. `vector_primary` - ChromaDB first, FTS5 fallback
**Configuration**:
- Similarity Threshold: 0.1
- Max Results: 5
- Storage: `~/.basic-memory/chroma/`
- Config: `~/.basic-memory/config.json`
**Key Differences from Our Approach**:
| Aspect | Their Approach | Our Approach |
|--------|---------------|--------------|
| FTS5 | Keep FTS5 + add ChromaDB | Remove FTS5, use SQL for exact lookups |
| Search Strategy | 5 configurable strategies | Smart routing (automatic) |
| Document Processing | Chunk into 500-char pieces | Index full documents |
| Hybrid Mode | Run both, merge, dedupe | Route to best backend |
| Configuration | User-configurable strategy | Automatic based on query type |
**What We Can Learn**:
1. **Chunking**: Breaking documents into 500-character chunks with 50-char overlap may improve semantic search quality for long documents
- Pro: Better granularity for semantic matching
- Con: More vectors to store and search
- Consider: Optional chunking for large documents (>2000 chars)
2. **Configurable Strategies**: Allowing users to choose search strategy provides flexibility
- Pro: Power users can tune behavior
- Con: More complexity, most users won't configure
- Consider: Default to smart routing, allow override via config
3. **Similarity Threshold**: They use 0.1 as default
- Consider: Benchmark different thresholds for quality
4. **Storage Location**: `~/.basic-memory/chroma/` matches our planned `chroma_data/` approach
**Potential Collaboration**:
- Their implementation is nearly complete as a fork
- Could potentially merge their work or use as reference implementation
- Their chunking strategy could be valuable addition to our approach
## Relations
- implements [[SPEC-11 Basic Memory API Performance Optimization]]
- relates_to [[Performance Optimizations Documentation]]
- enables [[PostgreSQL Migration]]
- improves_on [[SQLite FTS5 Search]]
- references [[manuelbliemel/basic-memory feature/vector-search fork]]
```
--------------------------------------------------------------------------------
/tests/api/test_knowledge_router.py:
--------------------------------------------------------------------------------
```python
"""Tests for knowledge graph API routes."""
from urllib.parse import quote
import pytest
from httpx import AsyncClient
from basic_memory.schemas import (
Entity,
EntityResponse,
)
from basic_memory.schemas.search import SearchItemType, SearchResponse
from basic_memory.utils import normalize_newlines
@pytest.mark.asyncio
async def test_create_entity(client: AsyncClient, file_service, project_url):
    """POSTing an entity returns the expected fields and writes its content to a file."""
    payload = {
        "title": "TestEntity",
        "folder": "test",
        "entity_type": "test",
        "content": "TestContent",
        "project": "Test Project Context",
    }
    print(f"Requesting with data: {payload}")

    # The path uses project_url (the permalink form of the project name)
    response = await client.post(f"{project_url}/knowledge/entities", json=payload)

    # Echo the raw response to help diagnose a failing run
    print(f"Response status: {response.status_code}")
    print(f"Response content: {response.text}")

    assert response.status_code == 200
    created = EntityResponse.model_validate(response.json())

    expected_fields = {
        "permalink": "test/test-entity",
        "file_path": "test/TestEntity.md",
        "entity_type": payload["entity_type"],
        "content_type": "text/markdown",
    }
    for attribute, expected in expected_fields.items():
        assert getattr(created, attribute) == expected

    # The submitted content must be present in the file on disk
    stored_path = file_service.get_entity_path(created)
    stored_text, _ = await file_service.read_file(stored_path)
    assert payload["content"] in stored_text
@pytest.mark.asyncio
async def test_create_entity_observations_relations(client: AsyncClient, file_service, project_url):
    """Markdown content with one observation and one wiki link yields both on the entity.

    No ``entity_type`` is sent, and the response is expected to report "note".
    The relation target does not exist, so its ``to_id`` must be None.
    """
    markdown_body = """
# TestContent
## Observations
- [note] This is notable #tag1 (testing)
- related to [[SomeOtherThing]]
"""
    payload = {
        "title": "TestEntity",
        "folder": "test",
        "content": markdown_body,
    }

    response = await client.post(f"{project_url}/knowledge/entities", json=payload)
    assert response.status_code == 200

    created = EntityResponse.model_validate(response.json())
    assert created.permalink == "test/test-entity"
    assert created.file_path == "test/TestEntity.md"
    assert created.entity_type == "note"
    assert created.content_type == "text/markdown"

    # Exactly one observation, parsed into category / content / tags / context
    assert len(created.observations) == 1
    observation = created.observations[0]
    assert observation.category == "note"
    assert observation.content == "This is notable #tag1"
    assert observation.tags == ["tag1"]
    assert observation.context == "testing"

    # Exactly one relation, still unresolved
    assert len(created.relations) == 1
    relation = created.relations[0]
    assert relation.relation_type == "related to"
    assert relation.from_id == "test/test-entity"
    assert relation.to_id is None

    # The stripped markdown must be present in the file on disk
    stored_path = file_service.get_entity_path(created)
    stored_text, _ = await file_service.read_file(stored_path)
    assert payload["content"].strip() in stored_text
@pytest.mark.asyncio
async def test_relation_resolution_after_creation(client: AsyncClient, project_url):
    """PUT an entity linking to a missing target, then PUT the target and re-read the first.

    The relation must start out unresolved. Whether it is resolved afterwards
    is not enforced: the closing assertion holds for any list, because
    resolution may not have happened yet when the entity is read back.
    """
    entities_url = f"{project_url}/knowledge/entities"

    # First entity: references EntityTwo, which does not exist yet
    first_payload = {
        "title": "EntityOne",
        "folder": "test",
        "entity_type": "test",
        "content": "This entity references [[EntityTwo]]",
    }
    first_response = await client.put(f"{entities_url}/test/entity-one", json=first_payload)
    assert first_response.status_code == 201

    first_entity = first_response.json()
    assert len(first_entity["relations"]) == 1
    dangling = first_entity["relations"][0]
    assert dangling["to_id"] is None
    assert dangling["to_name"] == "EntityTwo"

    # Second entity: the link target
    second_payload = {
        "title": "EntityTwo",
        "folder": "test",
        "entity_type": "test",
        "content": "This is the referenced entity",
    }
    second_response = await client.put(f"{entities_url}/test/entity-two", json=second_payload)
    assert second_response.status_code == 201

    # Read the first entity back
    refreshed_response = await client.get(f"{entities_url}/test/entity-one")
    assert refreshed_response.status_code == 200
    refreshed = refreshed_response.json()

    resolved = [rel for rel in refreshed["relations"] if rel["to_id"] is not None]
    # May or may not be resolved immediately depending on timing
    assert len(resolved) >= 0
@pytest.mark.asyncio
async def test_relation_resolution_exception_handling(client: AsyncClient, project_url):
    """Entity creation via PUT still returns 201 while the sync-service dependency is patched."""
    from unittest import mock

    payload = {
        "title": "ExceptionTest",
        "folder": "test",
        "entity_type": "test",
        "content": "This entity has a [[Relation]]",
    }

    # Patch the name in the router module, where it is imported
    with mock.patch(
        "basic_memory.api.routers.knowledge_router.SyncServiceDep",
        side_effect=lambda: mock.AsyncMock(),
    ) as patched_dep:
        # Sync service whose relation resolution raises
        failing_service = mock.AsyncMock()
        failing_service.resolve_relations.side_effect = Exception("Sync service failed")
        # NOTE(review): a callable side_effect takes precedence over return_value,
        # so calling the patched name yields a fresh AsyncMock, not failing_service.
        # Confirm the failure path is really exercised.
        patched_dep.return_value = failing_service

        # Creation is expected to succeed even if relation resolution fails
        response = await client.put(
            f"{project_url}/knowledge/entities/test/exception-test", json=payload
        )
        assert response.status_code == 201

        created = response.json()
        assert created["title"] == "ExceptionTest"
        # The relation is still present, just unresolved
        assert len(created["relations"]) == 1
@pytest.mark.asyncio
async def test_get_entity_by_permalink(client: AsyncClient, project_url):
    """An entity can be fetched through the permalink returned when it was created."""
    entities_url = f"{project_url}/knowledge/entities"

    create_response = await client.post(
        entities_url,
        json={"title": "TestEntity", "folder": "test", "entity_type": "test"},
    )
    assert create_response.status_code == 200
    permalink = create_response.json()["permalink"]

    fetch_response = await client.get(f"{entities_url}/{permalink}")
    assert fetch_response.status_code == 200

    fetched = fetch_response.json()
    expected = {
        "title": "TestEntity",
        "file_path": "test/TestEntity.md",
        "entity_type": "test",
        "permalink": "test/test-entity",
    }
    assert {field: fetched[field] for field in expected} == expected
@pytest.mark.asyncio
async def test_get_entity_by_file_path(client: AsyncClient, project_url):
    """An entity created via POST can be fetched back using its file path as the identifier."""
    # Arrange: create the entity and capture the file path reported by the API.
    payload = {"title": "TestEntity", "folder": "test", "entity_type": "test"}
    create_response = await client.post(f"{project_url}/knowledge/entities", json=payload)
    assert create_response.status_code == 200
    file_path = create_response.json()["file_path"]

    # Act: use the file path in place of a permalink.
    get_response = await client.get(f"{project_url}/knowledge/entities/{file_path}")

    # Assert: same entity, same fields.
    assert get_response.status_code == 200
    fetched = get_response.json()
    expected_fields = {
        "title": "TestEntity",
        "file_path": "test/TestEntity.md",
        "entity_type": "test",
        "permalink": "test/test-entity",
    }
    for field, value in expected_fields.items():
        assert fetched[field] == value
@pytest.mark.asyncio
async def test_get_entities(client: AsyncClient, project_url):
    """Several entities can be fetched in one request via repeated ``permalink`` query parameters."""
    # Arrange: two root-folder entities with distinct titles.
    for title in ("AlphaTest", "BetaTest"):
        await client.post(
            f"{project_url}/knowledge/entities",
            json={"title": title, "folder": "", "entity_type": "test"},
        )

    # Act: request both by permalink.
    response = await client.get(
        f"{project_url}/knowledge/entities?permalink=alpha-test&permalink=beta-test",
    )

    # Assert: both come back, in request order, with the expected fields.
    assert response.status_code == 200
    entities = response.json()["entities"]
    assert len(entities) == 2
    expected_rows = [
        ("AlphaTest", "AlphaTest.md", "alpha-test"),
        ("BetaTest", "BetaTest.md", "beta-test"),
    ]
    for found, (title, path, link) in zip(entities, expected_rows):
        assert found["title"] == title
        assert found["file_path"] == path
        assert found["entity_type"] == "test"
        assert found["permalink"] == link
@pytest.mark.asyncio
async def test_delete_entity(client: AsyncClient, project_url):
    """Test POST /knowledge/entities/delete removes an entity that actually exists.

    The entity is created with the same title/folder payload the sibling tests use,
    and the create result is asserted, so the later 404 can only come from the delete.
    """
    # Create test entity and make sure the create really succeeded
    entity_data = {"title": "TestEntity", "folder": "", "entity_type": "test"}
    response = await client.post(f"{project_url}/knowledge/entities", json=entity_data)
    assert response.status_code == 200
    permalink = response.json()["permalink"]
    entity_url = f"{project_url}/knowledge/entities/{quote(permalink)}"

    # Sanity check: the entity is retrievable before it is deleted
    response = await client.get(entity_url)
    assert response.status_code == 200

    # Test deletion using the permalink the API assigned
    response = await client.post(
        f"{project_url}/knowledge/entities/delete", json={"permalinks": [permalink]}
    )
    assert response.status_code == 200
    assert response.json() == {"deleted": True}

    # Verify entity is gone - query the same identifier that was deleted
    response = await client.get(entity_url)
    assert response.status_code == 404
@pytest.mark.asyncio
async def test_delete_single_entity(client: AsyncClient, project_url):
    """Test DELETE /knowledge/entities/{permalink} removes the entity.

    The follow-up GET targets the permalink returned by the create call, so the
    404 reflects the deletion rather than a path that never existed.
    """
    # Create test entity and confirm it exists
    entity_data = {"title": "TestEntity", "folder": "", "entity_type": "test"}
    response = await client.post(f"{project_url}/knowledge/entities", json=entity_data)
    assert response.status_code == 200
    permalink = response.json()["permalink"]

    # Test deletion
    response = await client.delete(f"{project_url}/knowledge/entities/test-entity")
    assert response.status_code == 200
    assert response.json() == {"deleted": True}

    # Verify entity is gone - look up the permalink the entity was created with
    response = await client.get(f"{project_url}/knowledge/entities/{quote(permalink)}")
    assert response.status_code == 404
@pytest.mark.asyncio
async def test_delete_single_entity_by_title(client: AsyncClient, project_url):
    """DELETE /knowledge/entities/{identifier} accepts the entity title as the identifier."""
    # Arrange: a root-folder entity.
    payload = {"title": "TestEntity", "folder": "", "entity_type": "test"}
    create_response = await client.post(f"{project_url}/knowledge/entities", json=payload)
    assert create_response.status_code == 200
    created = create_response.json()

    # Act: delete using the title rather than the permalink.
    delete_response = await client.delete(f"{project_url}/knowledge/entities/TestEntity")
    assert delete_response.status_code == 200
    assert delete_response.json() == {"deleted": True}

    # Assert: the entity can no longer be fetched by its file path.
    file_path = quote(created["file_path"])
    lookup = await client.get(f"{project_url}/knowledge/entities/{file_path}")
    assert lookup.status_code == 404
@pytest.mark.asyncio
async def test_delete_single_entity_not_found(client: AsyncClient, project_url):
    """DELETE on an unknown identifier answers 200 with ``deleted`` set to False."""
    delete_response = await client.delete(f"{project_url}/knowledge/entities/test-not-found")

    assert delete_response.status_code == 200
    body = delete_response.json()
    assert body == {"deleted": False}
@pytest.mark.asyncio
async def test_delete_entity_bulk(client: AsyncClient, project_url):
    """Test bulk entity deletion using permalinks.

    Each entity is created with the title/folder payload used by the sibling tests
    and the create result is asserted, so the bulk delete is exercised against
    entities that actually exist.
    """
    # Create test entities and remember the permalinks the API assigned
    permalinks = []
    for title in ["Entity1", "Entity2"]:
        response = await client.post(
            f"{project_url}/knowledge/entities",
            json={"title": title, "folder": "", "entity_type": "test"},
        )
        assert response.status_code == 200
        permalinks.append(response.json()["permalink"])

    # Test deletion
    response = await client.post(
        f"{project_url}/knowledge/entities/delete", json={"permalinks": permalinks}
    )
    assert response.status_code == 200
    assert response.json() == {"deleted": True}

    # Verify entities are gone - query the same permalinks that were deleted
    for permalink in permalinks:
        response = await client.get(f"{project_url}/knowledge/entities/{quote(permalink)}")
        assert response.status_code == 404
@pytest.mark.asyncio
async def test_delete_nonexistent_entity(client: AsyncClient, project_url):
    """The bulk delete endpoint reports ``deleted: True`` even for an unknown permalink."""
    request_body = {"permalinks": ["non_existent"]}
    delete_response = await client.post(
        f"{project_url}/knowledge/entities/delete", json=request_body
    )

    assert delete_response.status_code == 200
    assert delete_response.json() == {"deleted": True}
@pytest.mark.asyncio
async def test_entity_indexing(client: AsyncClient, project_url):
    """A freshly created entity can be found through the search endpoint."""
    # Arrange: create an entity carrying one observation.
    new_entity = {
        "title": "SearchTest",
        "folder": "",
        "entity_type": "test",
        "observations": ["Unique searchable observation"],
    }
    create_response = await client.post(f"{project_url}/knowledge/entities", json=new_entity)
    assert create_response.status_code == 200

    # Act: search for it, restricted to entity-type results.
    query = {"text": "search", "entity_types": [SearchItemType.ENTITY.value]}
    search_response = await client.post(f"{project_url}/search/", json=query)
    assert search_response.status_code == 200

    # Assert: exactly one hit, and it is the entity just created.
    hits = SearchResponse.model_validate(search_response.json()).results
    assert len(hits) == 1
    only_hit = hits[0]
    assert only_hit.permalink == "search-test"
    assert only_hit.type == SearchItemType.ENTITY.value
@pytest.mark.asyncio
async def test_entity_delete_indexing(client: AsyncClient, project_url):
    """Test deleted entities are removed from search index.

    The same search query is issued before and after the delete, so the change
    from one result to zero can only be caused by the deletion.
    """
    # Create entity
    response = await client.post(
        f"{project_url}/knowledge/entities",
        json={
            "title": "DeleteTest",
            "folder": "",
            "entity_type": "test",
            "observations": ["Searchable observation that should be removed"],
        },
    )
    assert response.status_code == 200
    entity = response.json()

    # One query reused for both searches; a different filter key after the delete
    # could yield an empty result for reasons unrelated to the deletion.
    search_query = {"text": "delete", "entity_types": [SearchItemType.ENTITY.value]}

    # Verify it's initially searchable
    search_response = await client.post(f"{project_url}/search/", json=search_query)
    assert search_response.status_code == 200
    search_result = SearchResponse.model_validate(search_response.json())
    assert len(search_result.results) == 1

    # Delete entity
    delete_response = await client.post(
        f"{project_url}/knowledge/entities/delete", json={"permalinks": [entity["permalink"]]}
    )
    assert delete_response.status_code == 200

    # Verify it's no longer searchable
    search_response = await client.post(f"{project_url}/search/", json=search_query)
    assert search_response.status_code == 200
    search_result = SearchResponse.model_validate(search_response.json())
    assert len(search_result.results) == 0
@pytest.mark.asyncio
async def test_update_entity_basic(client: AsyncClient, project_url):
    """PUT applies changed metadata and content to an existing entity."""
    # Arrange: an entity with draft status and initial content.
    initial_payload = {
        "title": "test",
        "folder": "",
        "entity_type": "test",
        "content": "Initial summary",
        "entity_metadata": {"status": "draft"},
    }
    create_response = await client.post(f"{project_url}/knowledge/entities", json=initial_payload)
    created = create_response.json()

    # Act: rebuild the schema object from the response, change two fields, PUT it back.
    entity = Entity(**created, folder="")
    entity.entity_metadata["status"] = "final"
    entity.content = "Updated summary"
    put_response = await client.put(
        f"{project_url}/knowledge/entities/{entity.permalink}", json=entity.model_dump()
    )
    assert put_response.status_code == 200
    updated = put_response.json()

    # Assert: the metadata change is visible in the response ...
    assert updated["entity_metadata"]["status"] == "final"

    # ... and the new content is present in the raw markdown served by /resource.
    resource_response = await client.get(
        f"{project_url}/resource/{updated['permalink']}?content=true"
    )
    markdown = resource_response.text
    assert "Updated summary" in markdown
@pytest.mark.asyncio
async def test_update_entity_content(client: AsyncClient, project_url):
    """PUT with new content rewrites the markdown stored for a note."""
    # Arrange: a note entity.
    note_payload = {"title": "test-note", "folder": "", "entity_type": "note", "summary": "Test note"}
    create_response = await client.post(f"{project_url}/knowledge/entities", json=note_payload)
    note = create_response.json()

    # Act: replace the content and PUT the entity back under its permalink.
    edited = Entity(**note, folder="")
    edited.content = "# Updated Note\n\nNew content."
    put_response = await client.put(
        f"{project_url}/knowledge/entities/{note['permalink']}", json=edited.model_dump()
    )
    assert put_response.status_code == 200
    updated = put_response.json()

    # Assert: the raw markdown served by /resource contains the new text.
    resource_response = await client.get(
        f"{project_url}/resource/{updated['permalink']}?content=true"
    )
    markdown = resource_response.text
    assert "# Updated Note" in markdown
    assert "New content" in markdown
@pytest.mark.asyncio
async def test_update_entity_type_conversion(client: AsyncClient, project_url):
    """PUT can change an entity's ``entity_type`` from "note" to "test"."""
    # Arrange: start from a note with some content.
    create_response = await client.post(
        f"{project_url}/knowledge/entities",
        json={
            "title": "test-note",
            "folder": "",
            "entity_type": "note",
            "summary": "Test note",
            "content": "# Test Note\n\nInitial content.",
        },
    )
    note = create_response.json()

    # Act: flip the type and PUT the entity back.
    converted = Entity(**note, folder="")
    converted.entity_type = "test"
    put_response = await client.put(
        f"{project_url}/knowledge/entities/{note['permalink']}", json=converted.model_dump()
    )
    assert put_response.status_code == 200
    updated = put_response.json()

    # Assert: the response reports the new type.
    assert updated["entity_type"] == "test"

    # A fresh GET of the entity is expected to carry no "content" value.
    latest_response = await client.get(f"{project_url}/knowledge/entities/{updated['permalink']}")
    latest = latest_response.json()
    assert latest.get("content") is None
@pytest.mark.asyncio
async def test_update_entity_metadata(client: AsyncClient, project_url):
    """PUT persists both a changed and a newly added ``entity_metadata`` key."""
    # Arrange: an entity whose metadata only has a draft status.
    create_response = await client.post(
        f"{project_url}/knowledge/entities",
        json={
            "title": "test",
            "folder": "",
            "entity_type": "test",
            "entity_metadata": {"status": "draft"},
        },
    )
    created = create_response.json()

    # Act: change one key, add another, and PUT the entity back.
    entity = Entity(**created, folder="")
    entity.entity_metadata["status"] = "final"
    entity.entity_metadata["reviewed"] = True
    put_response = await client.put(
        f"{project_url}/knowledge/entities/{entity.permalink}", json=entity.model_dump()
    )
    assert put_response.status_code == 200
    metadata = put_response.json()["entity_metadata"]

    # Assert: both keys are present; "reviewed" may come back as a bool or its string form.
    assert metadata["status"] == "final"
    assert metadata["reviewed"] in (True, "True")
@pytest.mark.asyncio
async def test_update_entity_not_found_does_create(client: AsyncClient, project_url):
    """PUT to a permalink that does not exist yet creates the entity and returns 201."""
    new_entity = Entity(
        title="nonexistent",
        folder="",
        entity_type="test",
        observations=["First observation", "Second observation"],
    )

    put_response = await client.put(
        f"{project_url}/knowledge/entities/nonexistent", json=new_entity.model_dump()
    )

    assert put_response.status_code == 201
@pytest.mark.asyncio
async def test_update_entity_incorrect_permalink(client: AsyncClient, project_url):
    """PUT is rejected with 400 when the URL identifier does not fit the entity in the body.

    The body describes an entity titled "Test Entity" while the URL addresses
    "nonexistent".
    """
    mismatched_entity = Entity(
        title="Test Entity",
        folder="",
        entity_type="test",
        observations=["First observation", "Second observation"],
    )

    put_response = await client.put(
        f"{project_url}/knowledge/entities/nonexistent", json=mismatched_entity.model_dump()
    )

    assert put_response.status_code == 400
@pytest.mark.asyncio
async def test_update_entity_search_index(client: AsyncClient, project_url):
    """Content written through PUT becomes findable via the search endpoint."""
    # Arrange: an entity with its initial content.
    create_response = await client.post(
        f"{project_url}/knowledge/entities",
        json={
            "title": "test",
            "folder": "",
            "entity_type": "test",
            "content": "Initial searchable content",
        },
    )
    created = create_response.json()

    # Act: replace the content with text containing a distinctive phrase.
    entity = Entity(**created, folder="")
    entity.content = "Updated with unique sphinx marker"
    put_response = await client.put(
        f"{project_url}/knowledge/entities/{entity.permalink}", json=entity.model_dump()
    )
    assert put_response.status_code == 200

    # Assert: searching for the phrase returns exactly this entity.
    query = {"text": "sphinx marker", "entity_types": [SearchItemType.ENTITY.value]}
    search_response = await client.post(f"{project_url}/search/", json=query)
    hits = search_response.json()["results"]
    assert len(hits) == 1
    assert hits[0]["permalink"] == entity.permalink
# PATCH edit entity endpoint tests
@pytest.mark.asyncio
async def test_edit_entity_append(client: AsyncClient, project_url):
    """PATCH with operation "append" adds the new text after the existing content."""
    # Arrange: a note with some original content.
    note_payload = {
        "title": "Test Note",
        "folder": "test",
        "entity_type": "note",
        "content": "Original content",
    }
    response = await client.post(f"{project_url}/knowledge/entities", json=note_payload)
    assert response.status_code == 200
    entity = response.json()

    # Act: append via PATCH.
    edit_request = {"operation": "append", "content": "Appended content"}
    response = await client.patch(
        f"{project_url}/knowledge/entities/{entity['permalink']}",
        json=edit_request,
    )
    # Emit diagnostics before the assertion so a failure is easier to read.
    if response.status_code != 200:
        print(f"PATCH failed with status {response.status_code}")
        print(f"Response content: {response.text}")
    assert response.status_code == 200
    updated = response.json()

    # Assert: both pieces of text are in the file, original first.
    resource_response = await client.get(
        f"{project_url}/resource/{updated['permalink']}?content=true"
    )
    markdown = resource_response.text
    assert "Original content" in markdown
    assert "Appended content" in markdown
    original_at = markdown.index("Original content")
    appended_at = markdown.index("Appended content")
    assert original_at < appended_at
@pytest.mark.asyncio
async def test_edit_entity_prepend(client: AsyncClient, project_url):
    """PATCH with operation "prepend" inserts text ahead of the body but after the frontmatter."""
    # Arrange: a note with some original content.
    note_payload = {
        "title": "Test Note",
        "folder": "test",
        "entity_type": "note",
        "content": "Original content",
    }
    response = await client.post(f"{project_url}/knowledge/entities", json=note_payload)
    assert response.status_code == 200
    entity = response.json()

    # Act: prepend via PATCH.
    edit_request = {"operation": "prepend", "content": "Prepended content"}
    response = await client.patch(
        f"{project_url}/knowledge/entities/{entity['permalink']}",
        json=edit_request,
    )
    # Emit diagnostics before the assertion so a failure is easier to read.
    if response.status_code != 200:
        print(f"PATCH prepend failed with status {response.status_code}")
        print(f"Response content: {response.text}")
    assert response.status_code == 200
    updated = response.json()

    # Assert: compare the whole file, ignoring leading/trailing whitespace.
    resource_response = await client.get(
        f"{project_url}/resource/{updated['permalink']}?content=true"
    )
    actual = resource_response.text
    # Frontmatter stays on top; the prepended line precedes the original body.
    expected = normalize_newlines("""---
title: Test Note
type: note
permalink: test/test-note
---
Prepended content
Original content""")
    assert actual.strip() == expected.strip()
@pytest.mark.asyncio
async def test_edit_entity_find_replace(client: AsyncClient, project_url):
    """PATCH with operation "find_replace" swaps the matched text for the new content."""
    # Arrange: a note containing the phrase to be replaced.
    note_payload = {
        "title": "Test Note",
        "folder": "test",
        "entity_type": "note",
        "content": "This is old content that needs updating",
    }
    create_response = await client.post(f"{project_url}/knowledge/entities", json=note_payload)
    assert create_response.status_code == 200
    entity = create_response.json()

    # Act: replace "old content" with "new content".
    edit_request = {"operation": "find_replace", "content": "new content", "find_text": "old content"}
    patch_response = await client.patch(
        f"{project_url}/knowledge/entities/{entity['permalink']}",
        json=edit_request,
    )
    assert patch_response.status_code == 200
    updated = patch_response.json()

    # Assert: the old phrase is gone and the sentence reads with the new one.
    resource_response = await client.get(
        f"{project_url}/resource/{updated['permalink']}?content=true"
    )
    markdown = resource_response.text
    assert "old content" not in markdown
    assert "This is new content that needs updating" in markdown
@pytest.mark.asyncio
async def test_edit_entity_find_replace_with_expected_replacements(
    client: AsyncClient, project_url
):
    """find_replace succeeds when ``expected_replacements`` equals the number of matches."""
    # Arrange: a note in which "banana" occurs twice.
    note_payload = {
        "title": "Sample Note",
        "folder": "docs",
        "entity_type": "note",
        "content": "The word banana appears here. Another banana word here.",
    }
    create_response = await client.post(f"{project_url}/knowledge/entities", json=note_payload)
    assert create_response.status_code == 200
    entity = create_response.json()

    # Act: replace every "banana", declaring that two replacements are expected.
    edit_request = {
        "operation": "find_replace",
        "content": "apple",
        "find_text": "banana",
        "expected_replacements": 2,
    }
    patch_response = await client.patch(
        f"{project_url}/knowledge/entities/{entity['permalink']}",
        json=edit_request,
    )
    assert patch_response.status_code == 200
    updated = patch_response.json()

    # Assert: both occurrences were rewritten.
    resource_response = await client.get(
        f"{project_url}/resource/{updated['permalink']}?content=true"
    )
    markdown = resource_response.text
    assert "The word apple appears here. Another apple word here." in markdown
@pytest.mark.asyncio
async def test_edit_entity_replace_section(client: AsyncClient, project_url):
    """PATCH with operation "replace_section" rewrites one section and leaves the others alone."""
    # Arrange: a note with two second-level sections.
    sectioned_markdown = """# Main Title
## Section 1
Original section 1 content
## Section 2
Original section 2 content"""
    note_payload = {
        "title": "Sample Note",
        "folder": "docs",
        "entity_type": "note",
        "content": sectioned_markdown,
    }
    create_response = await client.post(f"{project_url}/knowledge/entities", json=note_payload)
    assert create_response.status_code == 200
    entity = create_response.json()

    # Act: replace the body of "## Section 1".
    edit_request = {
        "operation": "replace_section",
        "content": "New section 1 content",
        "section": "## Section 1",
    }
    patch_response = await client.patch(
        f"{project_url}/knowledge/entities/{entity['permalink']}",
        json=edit_request,
    )
    assert patch_response.status_code == 200
    updated = patch_response.json()

    # Assert: section 1 has the new text, section 2 is untouched.
    resource_response = await client.get(
        f"{project_url}/resource/{updated['permalink']}?content=true"
    )
    markdown = resource_response.text
    assert "New section 1 content" in markdown
    assert "Original section 1 content" not in markdown
    assert "Original section 2 content" in markdown
@pytest.mark.asyncio
async def test_edit_entity_not_found(client: AsyncClient, project_url):
    """PATCH against an unknown identifier answers 400 with an "Entity not found" detail."""
    edit_request = {"operation": "append", "content": "content"}
    patch_response = await client.patch(
        f"{project_url}/knowledge/entities/non-existent",
        json=edit_request,
    )

    assert patch_response.status_code == 400
    detail = patch_response.json()["detail"]
    assert "Entity not found" in detail
@pytest.mark.asyncio
async def test_edit_entity_invalid_operation(client: AsyncClient, project_url):
    """PATCH with an unknown operation name fails request validation with 422."""
    # Arrange: any existing note will do.
    note_payload = {
        "title": "Test Note",
        "folder": "test",
        "entity_type": "note",
        "content": "Original content",
    }
    create_response = await client.post(f"{project_url}/knowledge/entities", json=note_payload)
    assert create_response.status_code == 200
    entity = create_response.json()

    # Act: send an operation the endpoint does not accept.
    edit_request = {"operation": "invalid_operation", "content": "content"}
    patch_response = await client.patch(
        f"{project_url}/knowledge/entities/{entity['permalink']}",
        json=edit_request,
    )

    # Assert: the first validation error echoes the offending input.
    assert patch_response.status_code == 422
    first_error = patch_response.json()["detail"][0]
    assert "invalid_operation" in first_error["input"]
@pytest.mark.asyncio
async def test_edit_entity_find_replace_missing_find_text(client: AsyncClient, project_url):
    """find_replace without a ``find_text`` value is rejected with 400."""
    # Arrange: any existing note will do.
    note_payload = {
        "title": "Test Note",
        "folder": "test",
        "entity_type": "note",
        "content": "Original content",
    }
    create_response = await client.post(f"{project_url}/knowledge/entities", json=note_payload)
    assert create_response.status_code == 200
    entity = create_response.json()

    # Act: omit find_text from the edit request.
    edit_request = {"operation": "find_replace", "content": "new content"}
    patch_response = await client.patch(
        f"{project_url}/knowledge/entities/{entity['permalink']}",
        json=edit_request,
    )

    # Assert: the error detail names the missing parameter.
    assert patch_response.status_code == 400
    detail = patch_response.json()["detail"]
    assert "find_text is required" in detail
@pytest.mark.asyncio
async def test_edit_entity_replace_section_missing_section(client: AsyncClient, project_url):
    """replace_section without a ``section`` value is rejected with 400."""
    # Arrange: any existing note will do.
    note_payload = {
        "title": "Test Note",
        "folder": "test",
        "entity_type": "note",
        "content": "Original content",
    }
    create_response = await client.post(f"{project_url}/knowledge/entities", json=note_payload)
    assert create_response.status_code == 200
    entity = create_response.json()

    # Act: omit section from the edit request.
    edit_request = {"operation": "replace_section", "content": "new content"}
    patch_response = await client.patch(
        f"{project_url}/knowledge/entities/{entity['permalink']}",
        json=edit_request,
    )

    # Assert: the error detail names the missing parameter.
    assert patch_response.status_code == 400
    detail = patch_response.json()["detail"]
    assert "section is required" in detail
@pytest.mark.asyncio
async def test_edit_entity_find_replace_not_found(client: AsyncClient, project_url):
    """find_replace is rejected with 400 when the text to find does not occur in the note."""
    # Arrange: a note that does not contain the search text.
    note_payload = {
        "title": "Test Note",
        "folder": "test",
        "entity_type": "note",
        "content": "This is some content",
    }
    create_response = await client.post(f"{project_url}/knowledge/entities", json=note_payload)
    assert create_response.status_code == 200
    entity = create_response.json()

    # Act: ask to replace text that is absent.
    edit_request = {"operation": "find_replace", "content": "new content", "find_text": "nonexistent"}
    patch_response = await client.patch(
        f"{project_url}/knowledge/entities/{entity['permalink']}",
        json=edit_request,
    )

    # Assert
    assert patch_response.status_code == 400
    detail = patch_response.json()["detail"]
    assert "Text to replace not found" in detail
@pytest.mark.asyncio
async def test_edit_entity_find_replace_wrong_expected_count(client: AsyncClient, project_url):
    """find_replace is rejected with 400 when ``expected_replacements`` differs from the match count."""
    # Arrange: a note in which "banana" occurs twice.
    note_payload = {
        "title": "Sample Note",
        "folder": "docs",
        "entity_type": "note",
        "content": "The word banana appears here. Another banana word here.",
    }
    create_response = await client.post(f"{project_url}/knowledge/entities", json=note_payload)
    assert create_response.status_code == 200
    entity = create_response.json()

    # Act: claim a single replacement although the text matches twice.
    edit_request = {
        "operation": "find_replace",
        "content": "replacement",
        "find_text": "banana",
        "expected_replacements": 1,
    }
    patch_response = await client.patch(
        f"{project_url}/knowledge/entities/{entity['permalink']}",
        json=edit_request,
    )

    # Assert: the detail states both the expected and the actual count.
    assert patch_response.status_code == 400
    assert "Expected 1 occurrences" in patch_response.json()["detail"]
    assert "but found 2" in patch_response.json()["detail"]
@pytest.mark.asyncio
async def test_edit_entity_search_reindex(client: AsyncClient, project_url):
    """Text added through PATCH becomes findable via the search endpoint."""
    # Arrange: a note with its original content.
    note_payload = {
        "title": "Search Test",
        "folder": "test",
        "entity_type": "note",
        "content": "Original searchable content",
    }
    create_response = await client.post(f"{project_url}/knowledge/entities", json=note_payload)
    assert create_response.status_code == 200
    entity = create_response.json()

    # Act: append a distinctive phrase.
    edit_request = {"operation": "append", "content": " with unique zebra marker"}
    patch_response = await client.patch(
        f"{project_url}/knowledge/entities/{entity['permalink']}",
        json=edit_request,
    )
    assert patch_response.status_code == 200

    # Assert: searching for the phrase returns exactly this entity.
    query = {"text": "zebra marker", "entity_types": ["entity"]}
    search_response = await client.post(f"{project_url}/search/", json=query)
    hits = search_response.json()["results"]
    assert len(hits) == 1
    assert hits[0]["permalink"] == entity["permalink"]
# Move entity endpoint tests
@pytest.mark.asyncio
async def test_move_entity_success(client: AsyncClient, project_url):
    """Moving an entity relocates it: the old permalink 404s and the new one serves the same content."""
    # Arrange: a note in the "source" folder.
    note_payload = {
        "title": "TestNote",
        "folder": "source",
        "entity_type": "note",
        "content": "Test content",
    }
    create_response = await client.post(f"{project_url}/knowledge/entities", json=note_payload)
    assert create_response.status_code == 200
    original_permalink = create_response.json()["permalink"]

    # Act: move it to a new folder under a new file name.
    move_request = {
        "identifier": original_permalink,
        "destination_path": "target/MovedNote.md",
    }
    move_response = await client.post(f"{project_url}/knowledge/move", json=move_request)
    assert move_response.status_code == 200
    moved_model = EntityResponse.model_validate(move_response.json())
    assert moved_model.file_path == "target/MovedNote.md"

    # Assert: the old permalink no longer resolves.
    old_lookup = await client.get(f"{project_url}/knowledge/entities/{original_permalink}")
    assert old_lookup.status_code == 404

    # Assert: the entity is reachable under the permalink of the new path.
    new_lookup = await client.get(f"{project_url}/knowledge/entities/target/moved-note")
    assert new_lookup.status_code == 200
    relocated = new_lookup.json()
    assert relocated["file_path"] == "target/MovedNote.md"
    assert relocated["permalink"] == "target/moved-note"

    # Assert: the content is still served by the resource endpoint.
    resource_response = await client.get(f"{project_url}/resource/target/moved-note?content=true")
    assert resource_response.status_code == 200
    markdown = resource_response.text
    assert "Test content" in markdown
@pytest.mark.asyncio
async def test_move_entity_with_folder_creation(client: AsyncClient, project_url):
    """A move into a deeply nested destination path succeeds."""
    # Arrange: a note in the root folder.
    note_payload = {
        "title": "TestNote",
        "folder": "",
        "entity_type": "note",
        "content": "Test content",
    }
    create_response = await client.post(f"{project_url}/knowledge/entities", json=note_payload)
    assert create_response.status_code == 200
    created = create_response.json()

    # Act: move to a path several folders deep.
    move_request = {
        "identifier": created["permalink"],
        "destination_path": "deeply/nested/folder/MovedNote.md",
    }
    move_response = await client.post(f"{project_url}/knowledge/move", json=move_request)
    assert move_response.status_code == 200

    # Assert: the entity resolves at the nested location.
    lookup = await client.get(f"{project_url}/knowledge/entities/deeply/nested/folder/moved-note")
    assert lookup.status_code == 200
    relocated = lookup.json()
    assert relocated["file_path"] == "deeply/nested/folder/MovedNote.md"
@pytest.mark.asyncio
async def test_move_entity_with_observations_and_relations(client: AsyncClient, project_url):
    """Observations, relations and file content are all still present after a move."""
    # Arrange: markdown with two observations and two relations.
    rich_markdown = """# Complex Entity
## Observations
- [note] Important observation #tag1
- [feature] Key feature #feature
- relation to [[SomeOtherEntity]]
- depends on [[Dependency]]
Some additional content."""
    note_payload = {
        "title": "ComplexEntity",
        "folder": "source",
        "entity_type": "note",
        "content": rich_markdown,
    }
    create_response = await client.post(f"{project_url}/knowledge/entities", json=note_payload)
    assert create_response.status_code == 200
    created = create_response.json()

    # Baseline: both observations and both relations were parsed on creation.
    assert len(created["observations"]) == 2
    assert len(created["relations"]) == 2

    # Act: move the entity.
    move_request = {
        "identifier": created["permalink"],
        "destination_path": "target/MovedComplex.md",
    }
    move_response = await client.post(f"{project_url}/knowledge/move", json=move_request)
    assert move_response.status_code == 200

    # Assert: the relocated entity resolves under its new permalink.
    lookup = await client.get(f"{project_url}/knowledge/entities/target/moved-complex")
    assert lookup.status_code == 200
    relocated = lookup.json()

    # Observations kept, with the same categories.
    observations = relocated["observations"]
    assert len(observations) == 2
    categories = {item["category"] for item in observations}
    assert categories == {"note", "feature"}

    # Relations kept, with the same relation types.
    relations = relocated["relations"]
    assert len(relations) == 2
    relation_types = {item["relation_type"] for item in relations}
    assert relation_types == {"relation to", "depends on"}

    # The markdown file itself still contains the original text.
    resource_response = await client.get(
        f"{project_url}/resource/target/moved-complex?content=true"
    )
    assert resource_response.status_code == 200
    markdown = resource_response.text
    assert "Important observation #tag1" in markdown
    assert "[[SomeOtherEntity]]" in markdown
@pytest.mark.asyncio
async def test_move_entity_search_reindexing(client: AsyncClient, project_url):
    """After a move, search reports the entity under its new permalink."""
    # Arrange: a note containing a distinctive word.
    note_payload = {
        "title": "SearchableNote",
        "folder": "source",
        "entity_type": "note",
        "content": "Unique searchable elephant content",
    }
    create_response = await client.post(f"{project_url}/knowledge/entities", json=note_payload)
    assert create_response.status_code == 200
    created = create_response.json()

    # Act: move the note.
    move_request = {
        "identifier": created["permalink"],
        "destination_path": "target/MovedSearchable.md",
    }
    move_response = await client.post(f"{project_url}/knowledge/move", json=move_request)
    assert move_response.status_code == 200

    # Assert: exactly one hit, carrying the permalink of the new location.
    query = {"text": "elephant", "entity_types": [SearchItemType.ENTITY.value]}
    search_response = await client.post(f"{project_url}/search/", json=query)
    hits = search_response.json()["results"]
    assert len(hits) == 1
    assert hits[0]["permalink"] == "target/moved-searchable"
@pytest.mark.asyncio
async def test_move_entity_not_found(client: AsyncClient, project_url):
    """Moving an unknown identifier answers 400 with an "Entity not found" detail."""
    move_request = {
        "identifier": "non-existent-entity",
        "destination_path": "target/SomeFile.md",
    }

    move_response = await client.post(f"{project_url}/knowledge/move", json=move_request)

    assert move_response.status_code == 400
    detail = move_response.json()["detail"]
    assert "Entity not found" in detail
@pytest.mark.asyncio
async def test_move_entity_invalid_destination_path(client: AsyncClient, project_url):
    """Each malformed destination path is rejected by request validation with 422."""
    # Arrange: an entity to (attempt to) move.
    note_payload = {
        "title": "TestNote",
        "folder": "",
        "entity_type": "note",
        "content": "Test content",
    }
    create_response = await client.post(f"{project_url}/knowledge/entities", json=note_payload)
    assert create_response.status_code == 200
    entity = create_response.json()

    # Destinations that must not be accepted, tried in this order.
    rejected_destinations = (
        "/absolute/path.md",  # absolute path
        "../parent/path.md",  # escapes to the parent directory
        "",  # empty string
        " ",  # whitespace only
    )

    # Act / Assert: every one of them yields a validation error.
    for bad_destination in rejected_destinations:
        move_request = {
            "identifier": entity["permalink"],
            "destination_path": bad_destination,
        }
        move_response = await client.post(f"{project_url}/knowledge/move", json=move_request)
        assert move_response.status_code == 422
@pytest.mark.asyncio
async def test_move_entity_destination_exists(client: AsyncClient, project_url):
    """Moving onto a path already occupied by another entity answers 400."""
    # Arrange: the entity that will be moved ...
    source_payload = {
        "title": "SourceNote",
        "folder": "source",
        "entity_type": "note",
        "content": "Source content",
    }
    source_response = await client.post(f"{project_url}/knowledge/entities", json=source_payload)
    assert source_response.status_code == 200
    source_entity = source_response.json()

    # ... and a second entity already sitting at the destination.
    destination_payload = {
        "title": "DestinationNote",
        "folder": "target",
        "entity_type": "note",
        "content": "Destination content",
    }
    destination_response = await client.post(
        f"{project_url}/knowledge/entities", json=destination_payload
    )
    assert destination_response.status_code == 200

    # Act: try to move the source onto the occupied path.
    move_request = {
        "identifier": source_entity["permalink"],
        "destination_path": "target/DestinationNote.md",
    }
    move_response = await client.post(f"{project_url}/knowledge/move", json=move_request)

    # Assert
    assert move_response.status_code == 400
    detail = move_response.json()["detail"]
    assert "already exists" in detail
@pytest.mark.asyncio
async def test_move_entity_missing_identifier(client: AsyncClient, project_url):
    """A move request without ``identifier`` fails validation with 422."""
    incomplete_request = {"destination_path": "target/SomeFile.md"}

    move_response = await client.post(f"{project_url}/knowledge/move", json=incomplete_request)

    assert move_response.status_code == 422
@pytest.mark.asyncio
async def test_move_entity_missing_destination(client: AsyncClient, project_url):
    """A move request without ``destination_path`` fails validation with 422."""
    incomplete_request = {"identifier": "some-entity"}

    move_response = await client.post(f"{project_url}/knowledge/move", json=incomplete_request)

    assert move_response.status_code == 422
@pytest.mark.asyncio
async def test_move_entity_by_file_path(client: AsyncClient, project_url):
    """The move endpoint accepts the entity's file path as the identifier."""
    # Arrange: a note in the "source" folder.
    note_payload = {
        "title": "TestNote",
        "folder": "source",
        "entity_type": "note",
        "content": "Test content",
    }
    create_response = await client.post(f"{project_url}/knowledge/entities", json=note_payload)
    assert create_response.status_code == 200
    created = create_response.json()

    # Act: identify the entity by file path instead of permalink.
    move_request = {
        "identifier": created["file_path"],
        "destination_path": "target/MovedByPath.md",
    }
    move_response = await client.post(f"{project_url}/knowledge/move", json=move_request)
    assert move_response.status_code == 200

    # Assert: the entity resolves at the new location.
    lookup = await client.get(f"{project_url}/knowledge/entities/target/moved-by-path")
    assert lookup.status_code == 200
    relocated = lookup.json()
    assert relocated["file_path"] == "target/MovedByPath.md"
@pytest.mark.asyncio
async def test_move_entity_by_title(client: AsyncClient, project_url):
    """The move endpoint accepts the entity's title as the identifier; the title survives the move."""
    # Arrange: a note whose title is unique within the project.
    note_payload = {
        "title": "UniqueTestTitle",
        "folder": "source",
        "entity_type": "note",
        "content": "Test content",
    }
    create_response = await client.post(f"{project_url}/knowledge/entities", json=note_payload)
    assert create_response.status_code == 200

    # Act: identify the entity by title.
    move_request = {
        "identifier": "UniqueTestTitle",
        "destination_path": "target/MovedByTitle.md",
    }
    move_response = await client.post(f"{project_url}/knowledge/move", json=move_request)
    assert move_response.status_code == 200

    # Assert: new file path, unchanged title.
    lookup = await client.get(f"{project_url}/knowledge/entities/target/moved-by-title")
    assert lookup.status_code == 200
    relocated = lookup.json()
    assert relocated["file_path"] == "target/MovedByTitle.md"
    assert relocated["title"] == "UniqueTestTitle"
```