This is page 14 of 17. Use http://codebase.md/basicmachines-co/basic-memory?page={x} to view the full context.

# Directory Structure

```
├── .claude
│   ├── agents
│   │   ├── python-developer.md
│   │   └── system-architect.md
│   └── commands
│       ├── release
│       │   ├── beta.md
│       │   ├── changelog.md
│       │   ├── release-check.md
│       │   └── release.md
│       ├── spec.md
│       └── test-live.md
├── .dockerignore
├── .github
│   ├── dependabot.yml
│   ├── ISSUE_TEMPLATE
│   │   ├── bug_report.md
│   │   ├── config.yml
│   │   ├── documentation.md
│   │   └── feature_request.md
│   └── workflows
│       ├── claude-code-review.yml
│       ├── claude-issue-triage.yml
│       ├── claude.yml
│       ├── dev-release.yml
│       ├── docker.yml
│       ├── pr-title.yml
│       ├── release.yml
│       └── test.yml
├── .gitignore
├── .python-version
├── CHANGELOG.md
├── CITATION.cff
├── CLA.md
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── docker-compose.yml
├── Dockerfile
├── docs
│   ├── ai-assistant-guide-extended.md
│   ├── character-handling.md
│   ├── cloud-cli.md
│   └── Docker.md
├── justfile
├── LICENSE
├── llms-install.md
├── pyproject.toml
├── README.md
├── SECURITY.md
├── smithery.yaml
├── specs
│   ├── SPEC-1 Specification-Driven Development Process.md
│   ├── SPEC-10 Unified Deployment Workflow and Event Tracking.md
│   ├── SPEC-11 Basic Memory API Performance Optimization.md
│   ├── SPEC-12 OpenTelemetry Observability.md
│   ├── SPEC-13 CLI Authentication with Subscription Validation.md
│   ├── SPEC-14 Cloud Git Versioning & GitHub Backup.md
│   ├── SPEC-14- Cloud Git Versioning & GitHub Backup.md
│   ├── SPEC-15 Configuration Persistence via Tigris for Cloud Tenants.md
│   ├── SPEC-16 MCP Cloud Service Consolidation.md
│   ├── SPEC-17 Semantic Search with ChromaDB.md
│   ├── SPEC-18 AI Memory Management Tool.md
│   ├── SPEC-19 Sync Performance and Memory Optimization.md
│   ├── SPEC-2 Slash Commands Reference.md
│   ├── SPEC-3 Agent Definitions.md
│   ├── SPEC-4 Notes Web UI Component Architecture.md
│   ├── SPEC-5 CLI Cloud Upload via WebDAV.md
│   ├── SPEC-6 Explicit Project Parameter Architecture.md
│   ├── SPEC-7 POC to spike Tigris Turso for local access to cloud data.md
│   ├── SPEC-8 TigrisFS Integration.md
│   ├── SPEC-9 Multi-Project Bidirectional Sync Architecture.md
│   ├── SPEC-9 Signed Header Tenant Information.md
│   └── SPEC-9-1 Follow-Ups- Conflict, Sync, and Observability.md
├── src
│   └── basic_memory
│       ├── __init__.py
│       ├── alembic
│       │   ├── alembic.ini
│       │   ├── env.py
│       │   ├── migrations.py
│       │   ├── script.py.mako
│       │   └── versions
│       │       ├── 3dae7c7b1564_initial_schema.py
│       │       ├── 502b60eaa905_remove_required_from_entity_permalink.py
│       │       ├── 5fe1ab1ccebe_add_projects_table.py
│       │       ├── 647e7a75e2cd_project_constraint_fix.py
│       │       ├── 9d9c1cb7d8f5_add_mtime_and_size_columns_to_entity_.py
│       │       ├── a1b2c3d4e5f6_fix_project_foreign_keys.py
│       │       ├── b3c3938bacdb_relation_to_name_unique_index.py
│       │       ├── cc7172b46608_update_search_index_schema.py
│       │       └── e7e1f4367280_add_scan_watermark_tracking_to_project.py
│       ├── api
│       │   ├── __init__.py
│       │   ├── app.py
│       │   ├── routers
│       │   │   ├── __init__.py
│       │   │   ├── directory_router.py
│       │   │   ├── importer_router.py
│       │   │   ├── knowledge_router.py
│       │   │   ├── management_router.py
│       │   │   ├── memory_router.py
│       │   │   ├── project_router.py
│       │   │   ├── prompt_router.py
│       │   │   ├── resource_router.py
│       │   │   ├── search_router.py
│       │   │   └── utils.py
│       │   └── template_loader.py
│       ├── cli
│       │   ├── __init__.py
│       │   ├── app.py
│       │   ├── auth.py
│       │   ├── commands
│       │   │   ├── __init__.py
│       │   │   ├── cloud
│       │   │   │   ├── __init__.py
│       │   │   │   ├── api_client.py
│       │   │   │   ├── bisync_commands.py
│       │   │   │   ├── cloud_utils.py
│       │   │   │   ├── core_commands.py
│       │   │   │   ├── mount_commands.py
│       │   │   │   ├── rclone_config.py
│       │   │   │   ├── rclone_installer.py
│       │   │   │   ├── upload_command.py
│       │   │   │   └── upload.py
│       │   │   ├── command_utils.py
│       │   │   ├── db.py
│       │   │   ├── import_chatgpt.py
│       │   │   ├── import_claude_conversations.py
│       │   │   ├── import_claude_projects.py
│       │   │   ├── import_memory_json.py
│       │   │   ├── mcp.py
│       │   │   ├── project.py
│       │   │   ├── status.py
│       │   │   ├── sync.py
│       │   │   └── tool.py
│       │   └── main.py
│       ├── config.py
│       ├── db.py
│       ├── deps.py
│       ├── file_utils.py
│       ├── ignore_utils.py
│       ├── importers
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── chatgpt_importer.py
│       │   ├── claude_conversations_importer.py
│       │   ├── claude_projects_importer.py
│       │   ├── memory_json_importer.py
│       │   └── utils.py
│       ├── markdown
│       │   ├── __init__.py
│       │   ├── entity_parser.py
│       │   ├── markdown_processor.py
│       │   ├── plugins.py
│       │   ├── schemas.py
│       │   └── utils.py
│       ├── mcp
│       │   ├── __init__.py
│       │   ├── async_client.py
│       │   ├── project_context.py
│       │   ├── prompts
│       │   │   ├── __init__.py
│       │   │   ├── ai_assistant_guide.py
│       │   │   ├── continue_conversation.py
│       │   │   ├── recent_activity.py
│       │   │   ├── search.py
│       │   │   └── utils.py
│       │   ├── resources
│       │   │   ├── ai_assistant_guide.md
│       │   │   └── project_info.py
│       │   ├── server.py
│       │   └── tools
│       │       ├── __init__.py
│       │       ├── build_context.py
│       │       ├── canvas.py
│       │       ├── chatgpt_tools.py
│       │       ├── delete_note.py
│       │       ├── edit_note.py
│       │       ├── list_directory.py
│       │       ├── move_note.py
│       │       ├── project_management.py
│       │       ├── read_content.py
│       │       ├── read_note.py
│       │       ├── recent_activity.py
│       │       ├── search.py
│       │       ├── utils.py
│       │       ├── view_note.py
│       │       └── write_note.py
│       ├── models
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── knowledge.py
│       │   ├── project.py
│       │   └── search.py
│       ├── repository
│       │   ├── __init__.py
│       │   ├── entity_repository.py
│       │   ├── observation_repository.py
│       │   ├── project_info_repository.py
│       │   ├── project_repository.py
│       │   ├── relation_repository.py
│       │   ├── repository.py
│       │   └── search_repository.py
│       ├── schemas
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── cloud.py
│       │   ├── delete.py
│       │   ├── directory.py
│       │   ├── importer.py
│       │   ├── memory.py
│       │   ├── project_info.py
│       │   ├── prompt.py
│       │   ├── request.py
│       │   ├── response.py
│       │   ├── search.py
│       │   └── sync_report.py
│       ├── services
│       │   ├── __init__.py
│       │   ├── context_service.py
│       │   ├── directory_service.py
│       │   ├── entity_service.py
│       │   ├── exceptions.py
│       │   ├── file_service.py
│       │   ├── initialization.py
│       │   ├── link_resolver.py
│       │   ├── project_service.py
│       │   ├── search_service.py
│       │   └── service.py
│       ├── sync
│       │   ├── __init__.py
│       │   ├── background_sync.py
│       │   ├── sync_service.py
│       │   └── watch_service.py
│       ├── templates
│       │   └── prompts
│       │       ├── continue_conversation.hbs
│       │       └── search.hbs
│       └── utils.py
├── test-int
│   ├── BENCHMARKS.md
│   ├── cli
│   │   ├── test_project_commands_integration.py
│   │   ├── test_sync_commands_integration.py
│   │   └── test_version_integration.py
│   ├── conftest.py
│   ├── mcp
│   │   ├── test_build_context_underscore.py
│   │   ├── test_build_context_validation.py
│   │   ├── test_chatgpt_tools_integration.py
│   │   ├── test_default_project_mode_integration.py
│   │   ├── test_delete_note_integration.py
│   │   ├── test_edit_note_integration.py
│   │   ├── test_list_directory_integration.py
│   │   ├── test_move_note_integration.py
│   │   ├── test_project_management_integration.py
│   │   ├── test_project_state_sync_integration.py
│   │   ├── test_read_content_integration.py
│   │   ├── test_read_note_integration.py
│   │   ├── test_search_integration.py
│   │   ├── test_single_project_mcp_integration.py
│   │   └── test_write_note_integration.py
│   ├── test_db_wal_mode.py
│   ├── test_disable_permalinks_integration.py
│   └── test_sync_performance_benchmark.py
├── tests
│   ├── __init__.py
│   ├── api
│   │   ├── conftest.py
│   │   ├── test_async_client.py
│   │   ├── test_continue_conversation_template.py
│   │   ├── test_directory_router.py
│   │   ├── test_importer_router.py
│   │   ├── test_knowledge_router.py
│   │   ├── test_management_router.py
│   │   ├── test_memory_router.py
│   │   ├── test_project_router_operations.py
│   │   ├── test_project_router.py
│   │   ├── test_prompt_router.py
│   │   ├── test_relation_background_resolution.py
│   │   ├── test_resource_router.py
│   │   ├── test_search_router.py
│   │   ├── test_search_template.py
│   │   ├── test_template_loader_helpers.py
│   │   └── test_template_loader.py
│   ├── cli
│   │   ├── conftest.py
│   │   ├── test_bisync_commands.py
│   │   ├── test_cli_tools.py
│   │   ├── test_cloud_authentication.py
│   │   ├── test_cloud_utils.py
│   │   ├── test_ignore_utils.py
│   │   ├── test_import_chatgpt.py
│   │   ├── test_import_claude_conversations.py
│   │   ├── test_import_claude_projects.py
│   │   ├── test_import_memory_json.py
│   │   └── test_upload.py
│   ├── conftest.py
│   ├── db
│   │   └── test_issue_254_foreign_key_constraints.py
│   ├── importers
│   │   ├── test_importer_base.py
│   │   └── test_importer_utils.py
│   ├── markdown
│   │   ├── __init__.py
│   │   ├── test_date_frontmatter_parsing.py
│   │   ├── test_entity_parser_error_handling.py
│   │   ├── test_entity_parser.py
│   │   ├── test_markdown_plugins.py
│   │   ├── test_markdown_processor.py
│   │   ├── test_observation_edge_cases.py
│   │   ├── test_parser_edge_cases.py
│   │   ├── test_relation_edge_cases.py
│   │   └── test_task_detection.py
│   ├── mcp
│   │   ├── conftest.py
│   │   ├── test_obsidian_yaml_formatting.py
│   │   ├── test_permalink_collision_file_overwrite.py
│   │   ├── test_prompts.py
│   │   ├── test_resources.py
│   │   ├── test_tool_build_context.py
│   │   ├── test_tool_canvas.py
│   │   ├── test_tool_delete_note.py
│   │   ├── test_tool_edit_note.py
│   │   ├── test_tool_list_directory.py
│   │   ├── test_tool_move_note.py
│   │   ├── test_tool_read_content.py
│   │   ├── test_tool_read_note.py
│   │   ├── test_tool_recent_activity.py
│   │   ├── test_tool_resource.py
│   │   ├── test_tool_search.py
│   │   ├── test_tool_utils.py
│   │   ├── test_tool_view_note.py
│   │   ├── test_tool_write_note.py
│   │   └── tools
│   │       └── test_chatgpt_tools.py
│   ├── Non-MarkdownFileSupport.pdf
│   ├── repository
│   │   ├── test_entity_repository_upsert.py
│   │   ├── test_entity_repository.py
│   │   ├── test_entity_upsert_issue_187.py
│   │   ├── test_observation_repository.py
│   │   ├── test_project_info_repository.py
│   │   ├── test_project_repository.py
│   │   ├── test_relation_repository.py
│   │   ├── test_repository.py
│   │   ├── test_search_repository_edit_bug_fix.py
│   │   └── test_search_repository.py
│   ├── schemas
│   │   ├── test_base_timeframe_minimum.py
│   │   ├── test_memory_serialization.py
│   │   ├── test_memory_url_validation.py
│   │   ├── test_memory_url.py
│   │   ├── test_schemas.py
│   │   └── test_search.py
│   ├── Screenshot.png
│   ├── services
│   │   ├── test_context_service.py
│   │   ├── test_directory_service.py
│   │   ├── test_entity_service_disable_permalinks.py
│   │   ├── test_entity_service.py
│   │   ├── test_file_service.py
│   │   ├── test_initialization.py
│   │   ├── test_link_resolver.py
│   │   ├── test_project_removal_bug.py
│   │   ├── test_project_service_operations.py
│   │   ├── test_project_service.py
│   │   └── test_search_service.py
│   ├── sync
│   │   ├── test_character_conflicts.py
│   │   ├── test_sync_service_incremental.py
│   │   ├── test_sync_service.py
│   │   ├── test_sync_wikilink_issue.py
│   │   ├── test_tmp_files.py
│   │   ├── test_watch_service_edge_cases.py
│   │   ├── test_watch_service_reload.py
│   │   └── test_watch_service.py
│   ├── test_config.py
│   ├── test_db_migration_deduplication.py
│   ├── test_deps.py
│   ├── test_production_cascade_delete.py
│   └── utils
│       ├── test_file_utils.py
│       ├── test_frontmatter_obsidian_compatible.py
│       ├── test_parse_tags.py
│       ├── test_permalink_formatting.py
│       ├── test_utf8_handling.py
│       └── test_validate_project_path.py
├── uv.lock
├── v0.15.0-RELEASE-DOCS.md
└── v15-docs
    ├── api-performance.md
    ├── background-relations.md
    ├── basic-memory-home.md
    ├── bug-fixes.md
    ├── chatgpt-integration.md
    ├── cloud-authentication.md
    ├── cloud-bisync.md
    ├── cloud-mode-usage.md
    ├── cloud-mount.md
    ├── default-project-mode.md
    ├── env-file-removal.md
    ├── env-var-overrides.md
    ├── explicit-project-parameter.md
    ├── gitignore-integration.md
    ├── project-root-env-var.md
    ├── README.md
    └── sqlite-performance.md
```

# Files

--------------------------------------------------------------------------------
/specs/SPEC-9 Multi-Project Bidirectional Sync Architecture.md:
--------------------------------------------------------------------------------

```markdown
---
title: 'SPEC-9: Multi-Project Bidirectional Sync Architecture'
type: spec
permalink: specs/spec-9-multi-project-bisync
tags:
- cloud
- bisync
- architecture
- multi-project
---

# SPEC-9: Multi-Project Bidirectional Sync Architecture

## Status: ✅ Implementation Complete

**Completed Phases:**
- ✅ Phase 1: Cloud Mode Toggle & Config
- ✅ Phase 2: Bisync Updates (Multi-Project)
- ✅ Phase 3: Sync Command Dual Mode
- ✅ Phase 4: Remove Duplicate Commands & Cloud Mode Auth
- ✅ Phase 5: Mount Updates
- ✅ Phase 6: Safety & Validation
- ⏸️ Phase 7: Cloud-Side Implementation (Deferred to cloud repo)
- ✅ Phase 8.1: Testing (All test scenarios validated)
- ✅ Phase 8.2: Documentation (Core docs complete, demos pending)

**Key Achievements:**
- Unified CLI: `bm sync`, `bm project`, `bm tool` work transparently in both local and cloud modes
- Multi-project sync: Single `bm sync` operation handles all projects bidirectionally
- Cloud mode toggle: `bm cloud login` / `bm cloud logout` switches modes seamlessly
- Integrity checking: `bm cloud check` verifies file matching without data transfer
- Directory isolation: Mount and bisync use separate directories with conflict prevention
- Clean UX: No RCLONE_TEST files, clear error messages, transparent implementation

## Why

**Current State:**
SPEC-8 implemented rclone bisync for cloud file synchronization, but the implementation has several architectural limitations:
1. Syncs only a single project subdirectory (`bucket:/basic-memory`)
2. Requires separate `bm cloud` command namespace, duplicating existing CLI commands
3. Users must learn different commands for local vs cloud operations
4. RCLONE_TEST marker files clutter user directories

**Problems:**
1. **Duplicate Commands**: `bm project` vs `bm cloud project`; `bm tool` has no cloud equivalent at all
2. **Inconsistent UX**: Same operations require different command syntax depending on mode
3. **Single Project Sync**: Users can only sync one project at a time
4. **Manual Coordination**: Creating new projects requires manual coordination between local and cloud
5. **Confusing Artifacts**: RCLONE_TEST marker files confuse users

**Goals:**
- **Unified CLI**: All existing `bm` commands work in both local and cloud mode via toggle
- **Multi-Project Sync**: Single sync operation handles all projects bidirectionally
- **Simple Mode Switch**: `bm cloud login` enables cloud mode, `logout` returns to local
- **Automatic Registration**: Projects auto-register on both local and cloud sides
- **Clean UX**: Remove unnecessary safety checks and confusing artifacts

## Cloud Access Paradigm: The Dropbox Model

**Mental Model Shift:**

Basic Memory cloud access follows the **Dropbox/iCloud paradigm** - not a per-project cloud connection model.

**What This Means:**

```
Traditional Project-Based Model (❌ Not This):
  bm cloud mount --project work      # Mount individual project
  bm cloud mount --project personal  # Mount another project
  bm cloud sync --project research   # Sync specific project
  → Multiple connections, multiple credentials, complex management

Dropbox Model (✅ This):
  bm cloud mount                     # One mount, all projects
  bm sync                            # One sync, all projects
  ~/basic-memory-cloud/              # One folder, all content
  → Single connection, organized by folders (projects)
```

**Key Principles:**

1. **Mount/Bisync = Access Methods, Not Project Tools**
   - Mount: Read-through cache to cloud (like Dropbox folder)
   - Bisync: Bidirectional sync with cloud (like Dropbox sync)
   - Both operate at **bucket level** (all projects)

2. **Projects = Organization Within Cloud Space**
   - Projects are folders within your cloud storage
   - Creating a folder creates a project (auto-discovered)
   - Projects are managed via `bm project` commands

3. **One Cloud Space Per Machine**
   - One set of IAM credentials per tenant
   - One mount point: `~/basic-memory-cloud/`
   - One bisync directory: `~/basic-memory-cloud-sync/` (default)
   - All projects accessible through this single entry point

4. **Why This Works Better**
   - **Credential Management**: One credential set, not N sets per project
   - **Resource Efficiency**: One rclone process, not N processes
   - **Familiar Pattern**: Users already understand Dropbox/iCloud
   - **Operational Simplicity**: `mount` once, `unmount` once
   - **Scales Naturally**: Add projects by creating folders, not reconfiguring cloud access

**User Journey:**

```bash
# Setup cloud access (once)
bm cloud login
bm cloud mount  # or: bm cloud setup for bisync

# Work with projects (create folders as needed)
cd ~/basic-memory-cloud/
mkdir my-new-project
echo "# Notes" > my-new-project/readme.md

# Cloud auto-discovers and registers project
# No additional cloud configuration needed
```

This paradigm shift means **mount and bisync are infrastructure concerns**, while **projects are content organization**. Users think about their knowledge, not about cloud plumbing.

## What

This spec affects:

1. **Cloud Mode Toggle** (`config.py`, `async_client.py`):
   - Add `cloud_mode` flag to `~/.basic-memory/config.json`
   - Set/unset `BASIC_MEMORY_PROXY_URL` based on cloud mode
   - `bm cloud login` enables cloud mode, `logout` disables it
   - All CLI commands respect cloud mode via existing async_client

2. **Unified CLI Commands**:
   - **Remove**: `bm cloud project` commands (duplicate of `bm project`)
   - **Enhance**: `bm sync` co-opted for bisync in cloud mode
   - **Keep**: `bm cloud login/logout/status/setup` for mode management
   - **Result**: `bm project`, `bm tool`, `bm sync` work in both modes

3. **Bisync Integration** (`bisync_commands.py`):
   - Remove `--check-access` (no RCLONE_TEST files)
   - Sync bucket root (all projects), not single subdirectory
   - Project auto-registration before sync
   - `bm sync` triggers bisync in cloud mode
   - `bm sync --watch` for continuous sync

4. **Config Structure**:
   ```json
   {
     "cloud_mode": true,
     "cloud_host": "https://cloud.basicmemory.com",
     "auth_tokens": {...},
     "bisync_config": {
       "profile": "balanced",
       "sync_dir": "~/basic-memory-cloud-sync"
     }
   }
   ```

5. **User Workflows**:
   - **Enable cloud**: `bm cloud login` → all commands work remotely
   - **Create projects**: `bm project add "name"` creates on cloud
   - **Sync files**: `bm sync` runs bisync (all projects)
   - **Use tools**: `bm tool write-note` creates notes on cloud
   - **Disable cloud**: `bm cloud logout` → back to local mode

## Implementation Tasks

### Phase 1: Cloud Mode Toggle & Config (Foundation) ✅

**1.1 Update Config Schema**
- [x] Add `cloud_mode: bool = False` to Config model
- [x] Add `bisync_config: dict` with `profile` and `sync_dir` fields
- [x] Ensure `cloud_host` field exists
- [x] Add config migration for existing users (defaults handle this)

**1.2 Update async_client.py**
- [x] Read `cloud_mode` from config (not just environment)
- [x] Set `BASIC_MEMORY_PROXY_URL` from config when `cloud_mode=true`
- [x] Priority: env var > config.cloud_host (if cloud_mode) > None (local ASGI)
- [ ] Test both local and cloud mode routing

**1.3 Update Login/Logout Commands**
- [x] `bm cloud login`: Set `cloud_mode=true` and save config
- [x] `bm cloud login`: Set `BASIC_MEMORY_PROXY_URL` environment variable
- [x] `bm cloud logout`: Set `cloud_mode=false` and save config
- [x] `bm cloud logout`: Clear `BASIC_MEMORY_PROXY_URL` environment variable
- [x] `bm cloud status`: Show current mode (local/cloud), connection status

**1.4 Skip Initialization in Cloud Mode** ✅
- [x] Update `ensure_initialization()` to check `cloud_mode` and return early (sketched below)
- [x] Document that `config.projects` is only used in local mode
- [x] Cloud manages its own projects via API, no local reconciliation needed
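
A minimal sketch of the early return, assuming the config object exposes `cloud_mode` (the exact signature of the real `ensure_initialization()` may differ):

```python
from basic_memory.config import BasicMemoryConfig  # config class per Phase 4.3

def ensure_initialization(config: BasicMemoryConfig) -> None:
    """Reconcile local project config with the database (local mode only)."""
    if config.cloud_mode:
        # Cloud manages its own projects via API; skip local reconciliation.
        return
    ...  # existing local-mode initialization
```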

### Phase 2: Bisync Updates (Multi-Project)

**2.1 Remove RCLONE_TEST Files** ✅
- [x] Update all bisync profiles: `check_access=False`
- [x] Remove RCLONE_TEST creation from `setup_cloud_bisync()`
- [x] Remove RCLONE_TEST upload logic
- [ ] Update documentation

**2.2 Sync Bucket Root (All Projects)** ✅
- [x] Change remote path from `bucket:/basic-memory` to `bucket:/` in `build_bisync_command()`
- [x] Update `setup_cloud_bisync()` to use bucket root
- [ ] Test with multiple projects

**2.3 Project Auto-Registration (Bisync)** ✅
- [x] Add `fetch_cloud_projects()` function (GET /proxy/projects/projects; sketched below)
- [x] Add `scan_local_directories()` function
- [x] Add `create_cloud_project()` function (POST /proxy/projects/projects)
- [x] Integrate into `run_bisync()`: fetch → scan → create missing → sync
- [x] Wait for API 201 response before syncing
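
A sketch of the two API helpers referenced above, assuming the shared httpx `AsyncClient` from `async_client.py` and the `/proxy/projects/projects` endpoints; the request payload shape is illustrative:

```python
from httpx import AsyncClient

async def fetch_cloud_projects(client: AsyncClient) -> dict:
    """GET /proxy/projects/projects - list all cloud projects."""
    response = await client.get("/proxy/projects/projects")
    response.raise_for_status()
    return response.json()

async def create_cloud_project(client: AsyncClient, name: str) -> dict:
    """POST /proxy/projects/projects - create a project, waiting for 201."""
    response = await client.post("/proxy/projects/projects", json={"name": name})
    response.raise_for_status()  # only proceed to bisync once creation succeeds
    return response.json()
```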

**2.4 Bisync Directory Configuration** ✅
- [x] Add `--dir` parameter to `bm cloud bisync-setup`
- [x] Store bisync directory in config
- [x] Default to `~/basic-memory-cloud-sync/`
- [x] Add `validate_bisync_directory()` safety check
- [x] Update `get_default_mount_path()` to return fixed `~/basic-memory-cloud/`

**2.5 Sync/Status API Infrastructure** ✅ (commit d48b1dc)
- [x] Create `POST /{project}/project/sync` endpoint for background sync (sketched below)
- [x] Create `POST /{project}/project/status` endpoint for scan-only status
- [x] Create `SyncReportResponse` Pydantic schema
- [x] Refactor CLI `sync` command to use API endpoint
- [x] Refactor CLI `status` command to use API endpoint
- [x] Create `command_utils.py` with shared `run_sync()` function
- [x] Update `notify_container_sync()` to call `run_sync()` for each project
- [x] Update all tests to match new API-based implementation
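
A rough shape for the background-sync endpoint, assuming the FastAPI conventions used elsewhere in `api/routers/` (dependency wiring omitted; illustrative only):

```python
from fastapi import APIRouter, BackgroundTasks

from basic_memory.schemas.sync_report import SyncReportResponse  # schema added in this phase

router = APIRouter()

@router.post("/{project}/project/sync", response_model=SyncReportResponse)
async def sync_project(project: str, background_tasks: BackgroundTasks) -> SyncReportResponse:
    """Scan the project, schedule sync work in the background, return the report."""
    ...  # resolve project, enqueue sync via background_tasks
```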

### Phase 3: Sync Command Dual Mode ✅

**3.1 Update `bm sync` Command** ✅
- [x] Check `config.cloud_mode` at start
- [x] If `cloud_mode=false`: Run existing local sync
- [x] If `cloud_mode=true`: Run bisync
- [x] Add `--watch` parameter for continuous sync
- [x] Add `--interval` parameter (default 60 seconds)
- [x] Error if `--watch` used in local mode with helpful message

**3.2 Watch Mode for Bisync** ✅
- [x] Implement `run_bisync_watch()` with interval loop (sketched below)
- [x] Add `--interval` parameter (default 60 seconds)
- [x] Handle errors gracefully, continue on failure
- [x] Show sync progress and status
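
A minimal sketch of the watch loop; `run_bisync()` is named elsewhere in this spec, and the import path follows the repo tree (loop details are illustrative):

```python
import time

from basic_memory.cli.commands.cloud.bisync_commands import run_bisync  # path per repo tree

def run_bisync_watch(profile: str = "balanced", interval: int = 60) -> None:
    """Re-run bisync on an interval, continuing past individual failures."""
    while True:
        try:
            run_bisync(profile=profile)
        except Exception as error:  # the spec narrows this to BisyncError
            # A single failed cycle shouldn't stop continuous sync.
            print(f"Sync failed: {error}; retrying in {interval}s")
        time.sleep(interval)
```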

**3.3 Integrity Check Command** ✅
- [x] Implement `bm cloud check` command using `rclone check`
- [x] Read-only operation that verifies file matching
- [x] Error with helpful messages if rclone/bisync not set up
- [x] Support `--one-way` flag for faster checks
- [x] Transparent about rclone implementation
- [x] Suggest `bm sync` to resolve differences

**Implementation Notes:**
- `bm sync` adapts to cloud mode automatically - users don't need separate commands
- `bm cloud bisync` kept for power users with full options (--dry-run, --resync, --profile, --verbose)
- `bm cloud check` provides integrity verification without transferring data (see sketch below)
- Design philosophy: Simplicity for everyday use, transparency about implementation
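
A sketch of how the check might shell out to `rclone check` (the remote naming follows the bisync examples in this spec; `--one-way` is a real rclone flag):

```python
import subprocess
from pathlib import Path

def run_check(tenant_id: str, bucket_name: str, one_way: bool = False) -> bool:
    """Verify local and cloud files match, without transferring data."""
    sync_dir = Path.home() / "basic-memory-cloud-sync"
    remote = f"basic-memory-{tenant_id}:{bucket_name}"
    cmd = ["rclone", "check", str(sync_dir), remote]
    if one_way:
        cmd.append("--one-way")  # only look for files missing on the destination
    return subprocess.run(cmd).returncode == 0
```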

### Phase 4: Remove Duplicate Commands & Cloud Mode Auth ✅

**4.0 Cloud Mode Authentication** ✅
- [x] Update `async_client.py` to support dual auth sources
- [x] FastMCP context auth (cloud service mode) via `inject_auth_header()`
- [x] JWT token file auth (CLI cloud mode) via `CLIAuth.get_valid_token()`
- [x] Automatic token refresh for CLI cloud mode
- [x] Remove `BASIC_MEMORY_PROXY_URL` environment variable dependency
- [x] Simplify to use only `config.cloud_mode` + `config.cloud_host`

**4.1 Delete `bm cloud project` Commands** ✅
- [x] Remove `bm cloud project list` (use `bm project list`)
- [x] Remove `bm cloud project add` (use `bm project add`)
- [x] Update `core_commands.py` to remove project_app subcommands
- [x] Keep only: `login`, `logout`, `status`, `setup`, `mount`, `unmount`, bisync commands
- [x] Remove unused imports (Table, generate_permalink, os)
- [x] Clean up environment variable references in login/logout

**4.2 CLI Command Cloud Mode Integration** ✅
- [x] Add runtime `cloud_mode_enabled` checks to all CLI commands
- [x] Update `list_projects()` to conditionally authenticate based on cloud mode
- [x] Update `remove_project()` to conditionally authenticate based on cloud mode
- [x] Update `run_sync()` to conditionally authenticate based on cloud mode
- [x] Update `get_project_info()` to conditionally authenticate based on cloud mode
- [x] Update `run_status()` to conditionally authenticate based on cloud mode
- [x] Remove auth from `set_default_project()` (local-only command, no cloud version)
- [x] Create CLI integration tests (`test-int/cli/`) to validate both local and cloud modes
- [x] Replace mock-heavy CLI tests with integration tests (deleted 5 mock test files)

**4.3 OAuth Authentication Fixes** ✅
- [x] Restore missing `SettingsConfigDict` in `BasicMemoryConfig`
- [x] Fix environment variable reading with `BASIC_MEMORY_` prefix
- [x] Fix `.env` file loading
- [x] Fix extra field handling for config files
- [x] Resolve `bm cloud login` OAuth failure ("Something went wrong" error)
- [x] Implement PKCE (Proof Key for Code Exchange) for device flow (sketched below)
- [x] Generate code verifier and SHA256 challenge for device authorization
- [x] Send code_verifier with token polling requests
- [x] Support both PKCE-required and PKCE-optional OAuth clients
- [x] Verify authentication flow works end-to-end with staging and production
- [x] Document WorkOS requirement: redirect URI must be configured even for device flow
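
The PKCE pieces are standard RFC 7636; a minimal sketch of verifier/challenge generation for the device flow (S256 method):

```python
import base64
import hashlib
import secrets

def generate_pkce_pair() -> tuple[str, str]:
    """Return (code_verifier, code_challenge) using the S256 method."""
    verifier = secrets.token_urlsafe(64)  # high-entropy, URL-safe string
    digest = hashlib.sha256(verifier.encode("ascii")).digest()
    challenge = base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii")
    return verifier, challenge
```

The verifier stays local and accompanies token polling; the challenge goes with the device authorization request.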

**4.4 Update Documentation**
- [ ] Update `cloud-cli.md` with cloud mode toggle workflow
- [ ] Document `bm cloud login` → use normal commands
- [ ] Add examples of cloud mode usage
- [ ] Document mount vs bisync directory isolation
- [ ] Add troubleshooting section

### Phase 5: Mount Updates ✅

**5.1 Fixed Mount Directory** ✅
- [x] Change mount path to `~/basic-memory-cloud/` (fixed, no tenant ID)
- [x] Update `get_default_mount_path()` function
- [x] Remove configurability (fixed location)
- [x] Update mount commands to use new path

**5.2 Mount at Bucket Root** ✅
- [x] Ensure mount uses bucket root (not subdirectory)
- [x] Test with multiple projects
- [x] Verify all projects visible in mount

**Implementation:** Mount uses fixed `~/basic-memory-cloud/` directory and syncs entire bucket root `basic-memory-{tenant_id}:{bucket_name}` for all projects.
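
With the location fixed, the helper reduces to a one-liner (sketch):

```python
from pathlib import Path

def get_default_mount_path() -> Path:
    """Fixed mount point for all projects; intentionally not configurable."""
    return Path.home() / "basic-memory-cloud"
```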

### Phase 6: Safety & Validation ✅

**6.1 Directory Conflict Prevention** ✅
- [x] Implement `validate_bisync_directory()` check
- [x] Detect if bisync dir == mount dir
- [x] Detect if bisync dir is currently mounted
- [x] Show clear error messages with solutions

**6.2 State Management** ✅
- [x] Use `--workdir` for bisync state
- [x] Store state in `~/.basic-memory/bisync-state/{tenant-id}/`
- [x] Ensure state directory created before bisync

**Implementation:** `validate_bisync_directory()` prevents conflicts by checking directory equality and mount status. State managed in isolated `~/.basic-memory/bisync-state/{tenant-id}/` directory using `--workdir` flag.
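
A sketch of the state-directory handling; `--workdir` is the real rclone bisync flag for relocating sync state:

```python
from pathlib import Path

def bisync_state_dir(tenant_id: str) -> Path:
    """Isolated bisync state, kept out of the user's sync tree."""
    state_dir = Path.home() / ".basic-memory" / "bisync-state" / tenant_id
    state_dir.mkdir(parents=True, exist_ok=True)  # must exist before bisync runs
    return state_dir

# Then passed through on every run:
#   rclone bisync <local> <remote> --workdir ~/.basic-memory/bisync-state/{tenant-id}
```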

### Phase 7: Cloud-Side Implementation (Deferred to Cloud Repo)

**7.1 Project Discovery Service (Cloud)** - Deferred
- [ ] Create `ProjectDiscoveryService` background job
- [ ] Scan `/app/data/` every 2 minutes
- [ ] Auto-register new directories as projects
- [ ] Log discovery events
- [ ] Handle errors gracefully

**7.2 Project API Updates (Cloud)** - Deferred
- [ ] Ensure `POST /proxy/projects/projects` creates directory synchronously
- [ ] Return 201 with project details
- [ ] Ensure directory ready immediately after creation

**Note:** Phase 7 is cloud-side work that belongs in the basic-memory-cloud repository. The CLI-side implementation (Phase 2.3 auto-registration) is complete and working - it calls the existing cloud API endpoints.

### Phase 8: Testing & Documentation

**8.1 Test Scenarios**
- [x] Test: Cloud mode toggle (login/logout)
- [x] Test: Local-first project creation (bisync)
- [x] Test: Cloud-first project creation (API)
- [x] Test: Multi-project bidirectional sync
- [x] Test: MCP tools in cloud mode
- [x] Test: Watch mode continuous sync
- [x] Test: Safety profile protection (max_delete implemented)
- [x] Test: No RCLONE_TEST files (check_access=False in all profiles)
- [x] Test: Mount/bisync directory isolation (validate_bisync_directory)
- [x] Test: Integrity check command (bm cloud check)

**8.2 Documentation**
- [x] Update cloud-cli.md with cloud mode instructions
- [x] Document Dropbox model paradigm
- [x] Update command reference with new commands
- [x] Document `bm sync` dual mode behavior
- [x] Document `bm cloud check` command
- [x] Document directory structure and fixed paths
- [ ] Update README with quick start
- [ ] Create migration guide for existing users
- [ ] Create video/GIF demos

### Success Criteria Checklist

- [x] `bm cloud login` enables cloud mode for all commands
- [x] `bm cloud logout` reverts to local mode
- [x] `bm project`, `bm tool`, `bm sync` work transparently in both modes
- [x] `bm sync` runs bisync in cloud mode, local sync in local mode
- [x] Single sync operation handles all projects bidirectionally
- [x] Local directories auto-create cloud projects via API
- [x] Cloud projects auto-sync to local directories
- [x] No RCLONE_TEST files in user directories
- [x] Bisync profiles provide safety via `max_delete` limits
- [x] `bm sync --watch` enables continuous sync
- [x] No duplicate `bm cloud project` commands (removed)
- [x] `bm cloud check` command for integrity verification
- [ ] Documentation covers cloud mode toggle and workflows
- [ ] Edge cases handled gracefully with clear errors

## How (High Level)

### Architecture Overview

**Cloud Mode Toggle:**
```
┌─────────────────────────────────────┐
│  bm cloud login                     │
│  ├─ Authenticate via OAuth          │
│  ├─ Set cloud_mode: true in config  │
│  └─ Set BASIC_MEMORY_PROXY_URL      │
└─────────────────────────────────────┘
           ↓
┌─────────────────────────────────────┐
│  All CLI commands use async_client  │
│  ├─ async_client checks proxy URL   │
│  ├─ If set: HTTP to cloud           │
│  └─ If not: Local ASGI              │
└─────────────────────────────────────┘
           ↓
┌─────────────────────────────────────┐
│  bm project add "work"              │
│  bm tool write-note ...             │
│  bm sync (triggers bisync)          │
│  → All work against cloud           │
└─────────────────────────────────────┘
```

**Storage Hierarchy:**
```
Cloud Container:                   Bucket:                      Local Sync Dir:
/app/data/ (mounted) ←→ production-tenant-{id}/ ←→ ~/basic-memory-cloud-sync/
├── basic-memory/               ├── basic-memory/               ├── basic-memory/
│   ├── notes/                  │   ├── notes/                  │   ├── notes/
│   └── concepts/               │   └── concepts/               │   └── concepts/
├── work-project/               ├── work-project/               ├── work-project/
│   └── tasks/                  │   └── tasks/                  │   └── tasks/
└── personal/                   └── personal/                   └── personal/
    └── journal/                    └── journal/                    └── journal/

Bidirectional sync via rclone bisync
```

### Sync Flow

**`bm sync` execution (in cloud mode):**

1. **Check cloud mode**
   ```python
   if not config.cloud_mode:
       # Run normal local file sync
       run_local_sync()
       return

   # Cloud mode: Run bisync
   ```

2. **Fetch cloud projects**
   ```python
   # GET /proxy/projects/projects (via async_client)
   cloud_projects = fetch_cloud_projects()
   cloud_project_names = {p["name"] for p in cloud_projects["projects"]}
   ```

3. **Scan local sync directory**
   ```python
   sync_dir = config.bisync_config["sync_dir"]  # ~/basic-memory-cloud-sync
   local_dirs = [d.name for d in sync_dir.iterdir()
                 if d.is_dir() and not d.name.startswith('.')]
   ```

4. **Create missing cloud projects**
   ```python
   for dir_name in local_dirs:
       if dir_name not in cloud_project_names:
           # POST /proxy/projects/projects (via async_client)
           create_cloud_project(name=dir_name)
           # Blocks until 201 response
   ```

5. **Run bisync on bucket root**
   ```bash
   rclone bisync \
     ~/basic-memory-cloud-sync \
     basic-memory-{tenant}:{bucket} \
     --filters-file ~/.basic-memory/.bmignore.rclone \
     --conflict-resolve=newer \
     --max-delete=25
   # Syncs ALL project subdirectories bidirectionally
   ```

6. **Notify cloud to refresh** (commit d48b1dc)
   ```python
   # After rclone bisync completes, sync each project's database
   for project in cloud_projects:
       # POST /{project}/project/sync (via async_client)
       # Triggers background sync for this project
       await run_sync(project=project_name)
   ```

### Key Changes

**1. Cloud Mode via Config**

**Config changes:**
```python
class Config:
    cloud_mode: bool = False
    cloud_host: str = "https://cloud.basicmemory.com"
    bisync_config: dict = {
        "profile": "balanced",
        "sync_dir": "~/basic-memory-cloud-sync"
    }
```

**async_client.py behavior:**
```python
def create_client() -> AsyncClient:
    # Check config first, then environment
    config = ConfigManager().config
    proxy_url = os.getenv("BASIC_MEMORY_PROXY_URL") or \
                (config.cloud_host if config.cloud_mode else None)

    if proxy_url:
        return AsyncClient(base_url=proxy_url)  # HTTP to cloud
    else:
        return AsyncClient(transport=ASGITransport(...))  # Local ASGI
```

**2. Login/Logout Sets Cloud Mode**

```python
# bm cloud login
async def login():
    # Existing OAuth flow...
    success = await auth.login()
    if success:
        config.cloud_mode = True
        config.save()
        os.environ["BASIC_MEMORY_PROXY_URL"] = config.cloud_host
```

```python
# bm cloud logout
def logout():
    config.cloud_mode = False
    config.save()
    os.environ.pop("BASIC_MEMORY_PROXY_URL", None)
```

**3. Remove Duplicate Commands**

**Delete:**
- `bm cloud project list` → use `bm project list`
- `bm cloud project add` → use `bm project add`

**Keep:**
- `bm cloud login` - Enable cloud mode
- `bm cloud logout` - Disable cloud mode
- `bm cloud status` - Show current mode & connection
- `bm cloud setup` - Initial bisync setup
- `bm cloud bisync` - Power-user command with full options
- `bm cloud check` - Verify file integrity between local and cloud

**4. Sync Command Dual Mode**

```python
# bm sync
def sync_command(watch: bool = False, profile: str = "balanced"):
    config = ConfigManager().config

    if config.cloud_mode:
        # Run bisync for cloud sync
        run_bisync(profile=profile, watch=watch)
    else:
        # Run local file sync
        run_local_sync()
```

**5. Remove RCLONE_TEST Files**

```python
# All profiles: check_access=False
BISYNC_PROFILES = {
    "safe": RcloneBisyncProfile(check_access=False, max_delete=10),
    "balanced": RcloneBisyncProfile(check_access=False, max_delete=25),
    "fast": RcloneBisyncProfile(check_access=False, max_delete=50),
}
```
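
The profile type itself is a small value object; a sketch based on the fields used above (the real class likely carries more rclone options, e.g. conflict resolution):

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class RcloneBisyncProfile:
    check_access: bool = False
    max_delete: int = 25
```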

**6. Sync Bucket Root (All Projects)**

```python
# Sync entire bucket, not subdirectory
rclone_remote = f"basic-memory-{tenant_id}:{bucket_name}"
```

## How to Evaluate

### Test Scenarios

**1. Cloud Mode Toggle**
```bash
# Start in local mode
bm project list
# → Shows local projects

# Enable cloud mode
bm cloud login
# → Authenticates, sets cloud_mode=true

bm project list
# → Now shows cloud projects (same command!)

# Disable cloud mode
bm cloud logout

bm project list
# → Back to local projects
```

**Expected:** ✅ Single command works in both modes

**2. Local-First Project Creation (Cloud Mode)**
```bash
# Enable cloud mode
bm cloud login

# Create new project locally in sync dir
mkdir ~/basic-memory-cloud-sync/my-research
echo "# Research Notes" > ~/basic-memory-cloud-sync/my-research/index.md

# Run sync (triggers bisync in cloud mode)
bm sync

# Verify:
# - Cloud project created automatically via API
# - Files synced to bucket:/my-research/
# - Cloud database updated
# - `bm project list` shows new project
```

**Expected:** ✅ Project visible in cloud project list

**3. Cloud-First Project Creation**
```bash
# In cloud mode
bm project add "work-notes"
# → Creates project on cloud (via async_client HTTP)

# Run sync
bm sync

# Verify:
# - Local directory ~/basic-memory-cloud-sync/work-notes/ created
# - Files sync bidirectionally
# - Can use `bm tool write-note` to add content remotely
```

**Expected:** ✅ Project accessible via all CLI commands

**4. Multi-Project Bidirectional Sync**
```bash
# Setup: 3 projects in cloud mode
# Modify files in all 3 locally and remotely

bm sync

# Verify:
# - All 3 projects sync simultaneously
# - Changes propagate correctly
# - No cross-project interference
```

**Expected:** ✅ All projects in sync state

**5. MCP Tools Work in Cloud Mode**
```bash
# In cloud mode
bm tool write-note \
  --title "Meeting Notes" \
  --folder "work-notes" \
  --content "Discussion points..."

# Verify:
# - Note created on cloud (via async_client HTTP)
# - Next `bm sync` pulls note to local
# - Note appears in ~/basic-memory-cloud-sync/work-notes/
```

**Expected:** ✅ Tools work transparently in cloud mode

**6. Watch Mode Continuous Sync**
```bash
# In cloud mode
bm sync --watch

# While running:
# - Create local folder → auto-creates cloud project
# - Edit files locally → syncs to cloud
# - Edit files remotely → syncs to local
# - Create project via API → appears locally

# Verify:
# - Continuous bidirectional sync
# - New projects handled automatically
# - No manual intervention needed
```

**Expected:** ✅ Seamless continuous sync

**7. Safety Profile Protection**
```bash
# Create project with 15 files locally
# Delete project from cloud (simulate error)

bm sync --profile safe

# Verify:
# - Bisync detects 15 pending deletions
# - Exceeds max_delete=10 limit
# - Aborts with clear error
# - No files deleted locally
```

**Expected:** ✅ Safety limit prevents data loss

**8. No RCLONE_TEST Files**
```bash
# After setup and multiple syncs
ls -la ~/basic-memory-cloud-sync/

# Verify:
# - No RCLONE_TEST files
# - No rclone state files (state lives in ~/.basic-memory/bisync-state/)
# - Clean directory structure
```

**Expected:** ✅ User directory stays clean

### Success Criteria

- [x] `bm cloud login` enables cloud mode for all commands
- [x] `bm cloud logout` reverts to local mode
- [x] `bm project`, `bm tool`, `bm sync` work in both modes transparently
- [x] `bm sync` runs bisync in cloud mode, local sync in local mode
- [x] Single sync operation handles all projects bidirectionally
- [x] Local directories auto-create cloud projects via API
- [x] Cloud projects auto-sync to local directories
- [x] No RCLONE_TEST files in user directories
- [x] Bisync profiles provide safety via `max_delete` limits
- [x] `bm sync --watch` enables continuous sync
- [x] No duplicate `bm cloud project` commands (removed)
- [x] `bm cloud check` command for integrity verification
- [ ] Documentation covers cloud mode toggle and workflows
- [ ] Edge cases handled gracefully with clear errors

## Notes

### API Contract

**Cloud must provide:**

1. **Project Management APIs:**
   - `GET /proxy/projects/projects` - List all projects
   - `POST /proxy/projects/projects` - Create project synchronously
   - `POST /proxy/sync` - Trigger cache refresh

2. **Project Discovery Service (Background):**
   - **Purpose**: Auto-register projects created via mount, direct bucket uploads, or any non-API method
   - **Interval**: Every 2 minutes
   - **Behavior**:
     - Scan `/app/data/` for directories
     - Register any directory not already in project database
     - Log discovery events
   - **Implementation**:
     ```python
     from pathlib import Path

     class ProjectDiscoveryService:
         """Background service to auto-discover projects from filesystem.

         Assumes a `project_repo` repository and a configured `logger`
         are provided by the surrounding service context.
         """

         async def run(self):
             """Scan /app/data/ and register new project directories."""
             data_path = Path("/app/data")

             for dir_path in data_path.iterdir():
                 # Skip hidden and special directories
                 if not dir_path.is_dir() or dir_path.name.startswith('.'):
                     continue

                 project_name = dir_path.name

                 # Check if project already registered
                 project = await self.project_repo.get_by_name(project_name)
                 if not project:
                     # Auto-register new project
                     await self.project_repo.create(
                         name=project_name,
                         path=str(dir_path)
                     )
                     logger.info(f"Auto-discovered project: {project_name}")
     ```

**Project Creation (API-based):**
- API creates `/app/data/{project-name}/` directory
- Registers project in database
- Returns 201 with project details
- Directory ready for bisync immediately

**Project Creation (Discovery-based):**
- User creates folder via mount: `~/basic-memory-cloud/new-project/`
- Files appear in `/app/data/new-project/` (mounted bucket)
- Discovery service finds directory on next scan (within 2 minutes)
- Auto-registers as project
- User sees project in `bm project list` after discovery

**Why Both Methods:**
- **API**: Immediate registration when using bisync (client-side scan + API call)
- **Discovery**: Delayed registration when using mount (no API call hook)
- **Result**: Projects created ANY way (API, mount, bisync, WebDAV) eventually registered
- **Trade-off**: 2-minute delay for mount-created projects is acceptable

### Mount vs Bisync Directory Isolation

**Critical Safety Requirement**: Mount and bisync MUST use different directories to prevent conflicts.

**The Dropbox Model Applied:**

Both mount and bisync operate at **bucket level** (all projects), following the Dropbox/iCloud paradigm:

```
~/basic-memory-cloud/               # Mount: Read-through cache (like Dropbox folder)
├── work-notes/
├── personal/
└── research/

~/basic-memory-cloud-sync/          # Bisync: Bidirectional sync (like Dropbox sync folder)
├── work-notes/
├── personal/
└── research/
```

**Mount Directory (Fixed):**
```bash
# Fixed location, not configurable
~/basic-memory-cloud/
```
- **Scope**: Entire bucket (all projects)
- **Method**: NFS mount via `rclone nfsmount`
- **Behavior**: Read-through cache to cloud bucket
- **Credentials**: One IAM credential set per tenant
- **Process**: One rclone mount process
- **Use Case**: Quick access, browsing, light editing
- **Known Issue**: Obsidian compatibility problems with NFS
- **Not Configurable**: Fixed location prevents user error

**Why Fixed Location:**
- One mount point per machine (like `/Users/you/Dropbox`)
- Prevents credential proliferation (one credential set, not N)
- Prevents multiple mount processes (resource efficiency)
- Familiar pattern users already understand
- Simple operations: `mount` once, `unmount` once

**Bisync Directory (User Configurable):**
```bash
# Default location
~/basic-memory-cloud-sync/

# User can override
bm cloud setup --dir ~/my-knowledge-base
```
- **Scope**: Entire bucket (all projects)
- **Method**: Bidirectional sync via `rclone bisync`
- **Behavior**: Full local copy with periodic sync
- **Credentials**: Same IAM credential set as mount
- **Use Case**: Full offline access, reliable editing, Obsidian support
- **Configurable**: Users may want specific locations (external drive, existing folder structure)

**Why User Configurable:**
- Users have preferences for where local copies live
- May want sync folder on external drive
- May want to integrate with existing folder structure
- Default works for most, option available for power users

**Conflict Prevention:**
```python
import subprocess
from pathlib import Path

def validate_bisync_directory(bisync_dir: Path):
    """Ensure bisync directory doesn't conflict with mount."""
    mount_dir = Path.home() / "basic-memory-cloud"

    if bisync_dir.resolve() == mount_dir.resolve():
        raise BisyncError(
            f"Cannot use {bisync_dir} for bisync - it's the mount directory!\n"
            f"Mount and bisync must use different directories.\n\n"
            f"Options:\n"
            f"  1. Use default: ~/basic-memory-cloud-sync/\n"
            f"  2. Specify different directory: --dir ~/my-sync-folder"
        )

    # Check if mount is active at this location
    result = subprocess.run(["mount"], capture_output=True, text=True)
    if str(bisync_dir) in result.stdout and "rclone" in result.stdout:
        raise BisyncError(
            f"{bisync_dir} is currently mounted via 'bm cloud mount'\n"
            f"Cannot use mounted directory for bisync.\n\n"
            f"Either:\n"
            f"  1. Unmount first: bm cloud unmount\n"
            f"  2. Use different directory for bisync"
        )
```

**Why This Matters:**
- Mounting and syncing the SAME directory would create infinite loops
- rclone mount → bisync detects changes → syncs to bucket → mount sees changes → triggers bisync → ∞
- Separate directories = clean separation of concerns
- Mount is read-heavy caching layer, bisync is write-heavy bidirectional sync

### Future Enhancements

**Phase 2 (Not in this spec):**
- **Near Real-Time Sync**: Integrate `watch_service.py` with cloud mode
  - Watch service detects local changes (already battle-tested)
  - Queue changes in memory
  - Use `rclone copy` for individual file sync (near instant)
  - Example: `rclone copyto ~/sync/project/file.md tenant:{bucket}/project/file.md`
  - Fallback to full `rclone bisync` every N seconds for bidirectional changes
  - Provides near real-time sync without polling overhead
- Per-project bisync profiles (different safety levels per project)
- Selective project sync (exclude specific projects from sync)
- Project deletion workflow (cascade to cloud/local)
- Conflict resolution UI/CLI

**Phase 3:**
- Project sharing between tenants
- Incremental backup/restore
- Sync statistics and bandwidth monitoring
- Mobile app integration with cloud mode

### Related Specs

- **SPEC-8**: TigrisFS Integration - Original bisync implementation
- **SPEC-6**: Explicit Project Parameter Architecture - Multi-project foundations
- **SPEC-5**: CLI Cloud Upload via WebDAV - Cloud file operations

### Implementation Notes

**Architectural Simplifications:**
- **Unified CLI**: Eliminated duplicate commands by using mode toggle
- **Single Entry Point**: All commands route through `async_client` which handles mode
- **Config-Driven**: Cloud mode stored in persistent config, not just environment
- **Transparent Routing**: Existing commands work without modification in cloud mode

**Complexity Trade-offs:**
- Removed: Separate `bm cloud project` command namespace
- Removed: Complex state detection for new projects
- Removed: RCLONE_TEST marker file management
- Added: Simple cloud_mode flag and config integration
- Added: Simple project list comparison before sync
- Relied on: Existing bisync profile safety mechanisms
- Result: Significantly simpler, more maintainable code

**User Experience:**
- **Mental Model**: "Toggle cloud mode, use normal commands"
- **No Learning Curve**: Same commands work locally and in cloud
- **Minimal Config**: Just login/logout to switch modes
- **Safety**: Profile system gives users control over safety/speed trade-offs
- **"Just Works"**: Create folders anywhere, they sync automatically

**Migration Path:**
- Existing `bm cloud project` users: Use `bm project` instead
- Existing `bm cloud bisync` becomes `bm sync` in cloud mode
- Config automatically migrates on first `bm cloud login`


## Testing


### Initial Setup (One Time)

**1. Login to cloud and enable cloud mode:**

```bash
bm cloud login
# → Authenticates via OAuth
# → Sets cloud_mode=true in config
# → Sets BASIC_MEMORY_PROXY_URL environment variable
# → All CLI commands now route to cloud
```

**2. Check cloud mode status:**

```bash
bm cloud status
# → Shows: Mode: Cloud (enabled)
# → Shows: Host: https://cloud.basicmemory.com
# → Checks cloud health
```

**3. Set up bidirectional sync:**

```bash
bm cloud bisync-setup
# Or with custom directory:
bm cloud bisync-setup --dir ~/my-sync-folder

# This will:
# → Install rclone (if not already installed)
# → Get tenant info (tenant_id, bucket_name)
# → Generate scoped IAM credentials
# → Configure rclone with credentials
# → Create sync directory (default: ~/basic-memory-cloud-sync/)
# → Validate no conflict with mount directory
# → Run initial --resync to establish baseline
```
### Normal Usage

**4. Create local project and sync:**

```bash
# Create a local project directory
mkdir ~/basic-memory-cloud-sync/my-research
echo "# Research Notes" > ~/basic-memory-cloud-sync/my-research/readme.md

# Run sync
bm cloud bisync

# Auto-magic happens:
# → Checks for new local directories
# → Finds "my-research" not in cloud
# → Creates project on cloud via POST /proxy/projects/projects
# → Runs bidirectional sync (all projects)
# → Syncs to bucket root (all projects synced together)
```

**5. Watch mode for continuous sync:**

```bash
bm cloud bisync --watch
# Or with custom interval:
bm cloud bisync --watch --interval 30

# → Syncs every 60 seconds (or custom interval)
# → Auto-registers new projects on each run
# → Press Ctrl+C to stop
```

**6. Check bisync status:**

```bash
bm cloud bisync-status
# → Shows tenant ID
# → Shows sync directory path
# → Shows initialization status
# → Shows last sync time
# → Lists available profiles (safe/balanced/fast)
```

**7. Manual sync with different profiles:**

```bash
# Safe mode (max 10 deletes, preserves conflicts)
bm cloud bisync --profile safe

# Balanced mode (max 25 deletes, auto-resolve to newer) - default
bm cloud bisync --profile balanced

# Fast mode (max 50 deletes, skip verification)
bm cloud bisync --profile fast
```

**8. Dry run to preview changes:**

```bash
bm cloud bisync --dry-run
# → Shows what would be synced without making changes
```

**9. Force resync (if needed):**

```bash
bm cloud bisync --resync
# → Establishes new baseline
# → Use if sync state is corrupted
```

**10. Check file integrity:**

```bash
bm cloud check
# → Verifies all files match between local and cloud
# → Read-only operation (no data transfer)
# → Shows differences if any found

# Faster one-way check
bm cloud check --one-way
# → Only checks for missing files on destination
```

### Verify Cloud Mode Integration

**11. Test that all commands work in cloud mode:**

```bash
# List cloud projects (not local)
bm project list

# Create project on cloud
bm project add "work-notes"

# Use MCP tools against cloud
bm tool write-note --title "Test" --folder "my-research" --content "Hello"

# All of these work against cloud because cloud_mode=true
```

**12. Switch back to local mode:**

```bash
bm cloud logout
# → Sets cloud_mode=false
# → Clears BASIC_MEMORY_PROXY_URL
# → All commands now work locally again
```

### Expected Directory Structure

```
~/basic-memory-cloud-sync/          # Your local sync directory
├── my-research/                    # Auto-created cloud project
│   ├── readme.md
│   └── notes.md
├── work-notes/                     # Another project
│   └── tasks.md
└── personal/                       # Another project
    └── journal.md

# All sync bidirectionally with:
bucket:/                            # Cloud bucket root
├── my-research/
├── work-notes/
└── personal/
```

### Key Points to Test

1. ✅ Cloud mode toggle works (login/logout)
2. ✅ Bisync setup validates directory (no conflict with mount)
3. ✅ Local directories auto-create cloud projects
4. ✅ All projects sync together (bucket root)
5. ✅ No RCLONE_TEST files created
6. ✅ Changes sync bidirectionally
7. ✅ Watch mode continuous sync works
8. ✅ Profile safety limits work (max_delete)
9. ✅ `bm sync` adapts to cloud mode automatically
10. ✅ `bm cloud check` verifies file integrity without side effects

```

--------------------------------------------------------------------------------
/tests/mcp/test_tool_write_note.py:
--------------------------------------------------------------------------------

```python
"""Tests for note tools that exercise the full stack with SQLite."""

from textwrap import dedent
import pytest

from basic_memory.mcp.tools import write_note, read_note, delete_note
from basic_memory.utils import normalize_newlines


@pytest.mark.asyncio
async def test_write_note(app, test_project):
    """Test creating a new note.

    Should:
    - Create entity with correct type and content
    - Save markdown content
    - Handle tags correctly
    - Return valid permalink
    """
    result = await write_note.fn(
        project=test_project.name,
        title="Test Note",
        folder="test",
        content="# Test\nThis is a test note",
        tags=["test", "documentation"],
    )

    assert result
    assert "# Created note" in result
    assert f"project: {test_project.name}" in result
    assert "file_path: test/Test Note.md" in result
    assert "permalink: test/test-note" in result
    assert "## Tags" in result
    assert "- test, documentation" in result
    assert f"[Session: Using project '{test_project.name}']" in result

    # Try reading it back via permalink
    content = await read_note.fn("test/test-note", project=test_project.name)
    assert (
        normalize_newlines(
            dedent("""
        ---
        title: Test Note
        type: note
        permalink: test/test-note
        tags:
        - test
        - documentation
        ---
        
        # Test
        This is a test note
        """).strip()
        )
        in content
    )


@pytest.mark.asyncio
async def test_write_note_no_tags(app, test_project):
    """Test creating a note without tags."""
    result = await write_note.fn(
        project=test_project.name, title="Simple Note", folder="test", content="Just some text"
    )

    assert result
    assert "# Created note" in result
    assert f"project: {test_project.name}" in result
    assert "file_path: test/Simple Note.md" in result
    assert "permalink: test/simple-note" in result
    assert f"[Session: Using project '{test_project.name}']" in result
    # Should be able to read it back
    content = await read_note.fn("test/simple-note", project=test_project.name)
    assert (
        normalize_newlines(
            dedent("""
        ---
        title: Simple Note
        type: note
        permalink: test/simple-note
        ---
        
        Just some text
        """).strip()
        )
        in content
    )


@pytest.mark.asyncio
async def test_write_note_update_existing(app, test_project):
    """Test creating a new note.

    Should:
    - Create entity with correct type and content
    - Save markdown content
    - Handle tags correctly
    - Return valid permalink
    """
    result = await write_note.fn(
        project=test_project.name,
        title="Test Note",
        folder="test",
        content="# Test\nThis is a test note",
        tags=["test", "documentation"],
    )

    assert result  # Got a valid permalink
    assert "# Created note" in result
    assert f"project: {test_project.name}" in result
    assert "file_path: test/Test Note.md" in result
    assert "permalink: test/test-note" in result
    assert "## Tags" in result
    assert "- test, documentation" in result
    assert f"[Session: Using project '{test_project.name}']" in result

    result = await write_note.fn(
        project=test_project.name,
        title="Test Note",
        folder="test",
        content="# Test\nThis is an updated note",
        tags=["test", "documentation"],
    )
    assert "# Updated note" in result
    assert f"project: {test_project.name}" in result
    assert "file_path: test/Test Note.md" in result
    assert "permalink: test/test-note" in result
    assert "## Tags" in result
    assert "- test, documentation" in result
    assert f"[Session: Using project '{test_project.name}']" in result

    # Try reading it back
    content = await read_note.fn("test/test-note", project=test_project.name)
    assert (
        normalize_newlines(
            dedent(
                """
        ---
        title: Test Note
        type: note
        permalink: test/test-note
        tags:
        - test
        - documentation
        ---
        
        # Test
        This is an updated note
        """
            ).strip()
        )
        == content
    )


@pytest.mark.asyncio
async def test_issue_93_write_note_respects_custom_permalink_new_note(app, test_project):
    """Test that write_note respects custom permalinks in frontmatter for new notes (Issue #93)"""

    # Create a note with custom permalink in frontmatter
    content_with_custom_permalink = dedent("""
        ---
        permalink: custom/my-desired-permalink
        ---

        # My New Note

        This note has a custom permalink specified in frontmatter.

        - [note] Testing if custom permalink is respected
    """).strip()

    result = await write_note.fn(
        project=test_project.name,
        title="My New Note",
        folder="notes",
        content=content_with_custom_permalink,
    )

    # Verify the custom permalink is respected
    assert "# Created note" in result
    assert f"project: {test_project.name}" in result
    assert "file_path: notes/My New Note.md" in result
    assert "permalink: custom/my-desired-permalink" in result
    assert f"[Session: Using project '{test_project.name}']" in result


@pytest.mark.asyncio
async def test_issue_93_write_note_respects_custom_permalink_existing_note(app, test_project):
    """Test that write_note respects custom permalinks when updating existing notes (Issue #93)"""

    # Step 1: Create initial note (auto-generated permalink)
    result1 = await write_note.fn(
        project=test_project.name,
        title="Existing Note",
        folder="test",
        content="Initial content without custom permalink",
    )

    assert "# Created note" in result1
    assert f"project: {test_project.name}" in result1

    # Extract the auto-generated permalink
    initial_permalink = None
    for line in result1.split("\n"):
        if line.startswith("permalink:"):
            initial_permalink = line.split(":", 1)[1].strip()
            break

    assert initial_permalink is not None

    # Step 2: Update with content that includes custom permalink in frontmatter
    updated_content = dedent("""
        ---
        permalink: custom/new-permalink
        ---

        # Existing Note

        Updated content with custom permalink in frontmatter.

        - [note] Custom permalink should be respected on update
    """).strip()

    result2 = await write_note.fn(
        project=test_project.name,
        title="Existing Note",
        folder="test",
        content=updated_content,
    )

    # Verify the custom permalink is respected
    assert "# Updated note" in result2
    assert f"project: {test_project.name}" in result2
    assert "permalink: custom/new-permalink" in result2
    assert f"permalink: {initial_permalink}" not in result2
    assert f"[Session: Using project '{test_project.name}']" in result2


@pytest.mark.asyncio
async def test_delete_note_existing(app, test_project):
    """Test deleting a new note.

    Should:
    - Create entity with correct type and content
    - Return valid permalink
    - Delete the note
    """
    result = await write_note.fn(
        project=test_project.name,
        title="Test Note",
        folder="test",
        content="# Test\nThis is a test note",
        tags=["test", "documentation"],
    )

    assert result
    assert f"project: {test_project.name}" in result

    deleted = await delete_note.fn("test/test-note", project=test_project.name)
    assert deleted is True


@pytest.mark.asyncio
async def test_delete_note_doesnt_exist(app, test_project):
    """Test deleting a new note.

    Should:
    - Delete the note
    - verify returns false
    """
    deleted = await delete_note.fn("doesnt-exist", project=test_project.name)
    assert deleted is False


@pytest.mark.asyncio
async def test_write_note_with_tag_array_from_bug_report(app, test_project):
    """Test creating a note with a tag array as reported in issue #38.

    This reproduces the exact payload from the bug report where Cursor
    was passing an array of tags and getting a type mismatch error.
    """
    # This is the exact payload from the bug report
    bug_payload = {
        "project": test_project.name,
        "title": "Title",
        "folder": "folder",
        "content": "CONTENT",
        "tags": ["hipporag", "search", "fallback", "symfony", "error-handling"],
    }

    # Try to call the function with this data directly
    result = await write_note.fn(**bug_payload)

    assert result
    assert f"project: {test_project.name}" in result
    assert "permalink: folder/title" in result
    assert "Tags" in result
    assert "hipporag" in result
    assert f"[Session: Using project '{test_project.name}']" in result


@pytest.mark.asyncio
async def test_write_note_verbose(app, test_project):
    """Test creating a new note.

    Should:
    - Create entity with correct type and content
    - Save markdown content
    - Handle tags correctly
    - Return valid permalink
    """
    result = await write_note.fn(
        project=test_project.name,
        title="Test Note",
        folder="test",
        content="""
# Test\nThis is a test note

- [note] First observation
- relates to [[Knowledge]]

""",
        tags=["test", "documentation"],
    )

    assert "# Created note" in result
    assert f"project: {test_project.name}" in result
    assert "file_path: test/Test Note.md" in result
    assert "permalink: test/test-note" in result
    assert "## Observations" in result
    assert "- note: 1" in result
    assert "## Relations" in result
    assert "## Tags" in result
    assert "- test, documentation" in result
    assert f"[Session: Using project '{test_project.name}']" in result


@pytest.mark.asyncio
async def test_write_note_preserves_custom_metadata(app, project_config, test_project):
    """Test that updating a note preserves custom metadata fields.

    Reproduces issue #36 where custom frontmatter fields like Status
    were being lost when updating notes with the write_note tool.

    Should:
    - Create a note with custom frontmatter
    - Update the note with new content
    - Verify custom frontmatter is preserved
    """
    # First, create a note with custom metadata using write_note
    await write_note.fn(
        project=test_project.name,
        title="Custom Metadata Note",
        folder="test",
        content="# Initial content",
        tags=["test"],
    )

    # Read the note to get its permalink
    content = await read_note.fn("test/custom-metadata-note", project=test_project.name)

    # Now directly update the file with custom frontmatter
    # We need to use a direct file update to add custom frontmatter
    import frontmatter

    file_path = project_config.home / "test" / "Custom Metadata Note.md"
    post = frontmatter.load(file_path)

    # Add custom frontmatter
    post["Status"] = "In Progress"
    post["Priority"] = "High"
    post["Version"] = "1.0"

    # Write the file back
    with open(file_path, "w") as f:
        f.write(frontmatter.dumps(post))

    # Now update the note using write_note
    result = await write_note.fn(
        project=test_project.name,
        title="Custom Metadata Note",
        folder="test",
        content="# Updated content",
        tags=["test", "updated"],
    )

    # Verify the update was successful
    assert (
        f"Updated note\nproject: {test_project.name}\nfile_path: test/Custom Metadata Note.md"
    ) in result
    assert f"project: {test_project.name}" in result

    # Read the note back and check if custom frontmatter is preserved
    content = await read_note.fn("test/custom-metadata-note", project=test_project.name)

    # Custom frontmatter should be preserved
    assert "Status: In Progress" in content
    assert "Priority: High" in content
    # Version might be quoted as '1.0' due to YAML serialization
    assert "Version:" in content  # Just check that the field exists
    assert "1.0" in content  # And that the value exists somewhere

    # And new content should be there
    assert "# Updated content" in content

    # And tags should be updated (without # prefix)
    assert "- test" in content
    assert "- updated" in content


@pytest.mark.asyncio
async def test_write_note_preserves_content_frontmatter(app, test_project):
    """Test creating a new note."""
    await write_note.fn(
        project=test_project.name,
        title="Test Note",
        folder="test",
        content=dedent(
            """
            ---
            title: Test Note
            type: note
            version: 1.0
            author: name
            ---
            # Test

            This is a test note
            """
        ),
        tags=["test", "documentation"],
    )

    # Try reading it back via permalink
    content = await read_note.fn("test/test-note", project=test_project.name)
    assert (
        normalize_newlines(
            dedent(
                """
            ---
            title: Test Note
            type: note
            permalink: test/test-note
            version: 1.0
            author: name
            tags:
            - test
            - documentation
            ---

            # Test

            This is a test note
            """
            ).strip()
        )
        in content
    )


@pytest.mark.asyncio
async def test_write_note_permalink_collision_fix_issue_139(app, test_project):
    """Test fix for GitHub Issue #139: UNIQUE constraint failed: entity.permalink.

    This reproduces the exact scenario described in the issue:
    1. Create a note with title "Note 1"
    2. Create another note with title "Note 2"
    3. Try to create/replace first note again with same title "Note 1"

    Before the fix, step 3 would fail with UNIQUE constraint error.
    After the fix, it should either update the existing note or create with unique permalink.
    """
    # Step 1: Create first note
    result1 = await write_note.fn(
        project=test_project.name,
        title="Note 1",
        folder="test",
        content="Original content for note 1",
    )
    assert "# Created note" in result1
    assert f"project: {test_project.name}" in result1
    assert "permalink: test/note-1" in result1

    # Step 2: Create second note with different title
    result2 = await write_note.fn(
        project=test_project.name, title="Note 2", folder="test", content="Content for note 2"
    )
    assert "# Created note" in result2
    assert f"project: {test_project.name}" in result2
    assert "permalink: test/note-2" in result2

    # Step 3: Try to create/replace first note again
    # This scenario would trigger the UNIQUE constraint failure before the fix
    result3 = await write_note.fn(
        project=test_project.name,
        title="Note 1",  # Same title as first note
        folder="test",  # Same folder as first note
        content="Replacement content for note 1",  # Different content
    )

    # This should not raise a UNIQUE constraint failure error
    # It should succeed and either:
    # 1. Update the existing note (preferred behavior)
    # 2. Create a new note with unique permalink (fallback behavior)

    assert result3 is not None
    assert f"project: {test_project.name}" in result3
    assert "Updated note" in result3 or "Created note" in result3

    # The result should contain either the original permalink or a unique one
    assert "permalink: test/note-1" in result3 or "permalink: test/note-1-1" in result3

    # Verify we can read back the content
    if "permalink: test/note-1" in result3:
        # Updated existing note case
        content = await read_note.fn("test/note-1", project=test_project.name)
        assert "Replacement content for note 1" in content
    else:
        # Created new note with unique permalink case
        content = await read_note.fn("test/note-1-1", project=test_project.name)
        assert "Replacement content for note 1" in content
        # Original note should still exist
        original_content = await read_note.fn("test/note-1", project=test_project.name)
        assert "Original content for note 1" in original_content


@pytest.mark.asyncio
async def test_write_note_with_custom_entity_type(app, test_project):
    """Test creating a note with custom entity_type parameter.

    This test verifies the fix for Issue #144 where entity_type parameter
    was hardcoded to "note" instead of allowing custom types.
    """
    result = await write_note.fn(
        project=test_project.name,
        title="Test Guide",
        folder="guides",
        content="# Guide Content\nThis is a guide",
        tags=["guide", "documentation"],
        entity_type="guide",
    )

    assert result
    assert "# Created note" in result
    assert f"project: {test_project.name}" in result
    assert "file_path: guides/Test Guide.md" in result
    assert "permalink: guides/test-guide" in result
    assert "## Tags" in result
    assert "- guide, documentation" in result
    assert f"[Session: Using project '{test_project.name}']" in result

    # Verify the entity type is correctly set in the frontmatter
    content = await read_note.fn("guides/test-guide", project=test_project.name)
    assert (
        normalize_newlines(
            dedent("""
        ---
        title: Test Guide
        type: guide
        permalink: guides/test-guide
        tags:
        - guide
        - documentation
        ---

        # Guide Content
        This is a guide
        """).strip()
        )
        in content
    )


@pytest.mark.asyncio
async def test_write_note_with_report_entity_type(app, test_project):
    """Test creating a note with entity_type="report"."""
    result = await write_note.fn(
        project=test_project.name,
        title="Monthly Report",
        folder="reports",
        content="# Monthly Report\nThis is a monthly report",
        tags=["report", "monthly"],
        entity_type="report",
    )

    assert result
    assert "# Created note" in result
    assert f"project: {test_project.name}" in result
    assert "file_path: reports/Monthly Report.md" in result
    assert "permalink: reports/monthly-report" in result
    assert f"[Session: Using project '{test_project.name}']" in result

    # Verify the entity type is correctly set in the frontmatter
    content = await read_note.fn("reports/monthly-report", project=test_project.name)
    assert "type: report" in content
    assert "# Monthly Report" in content


@pytest.mark.asyncio
async def test_write_note_with_config_entity_type(app, test_project):
    """Test creating a note with entity_type="config"."""
    result = await write_note.fn(
        project=test_project.name,
        title="System Config",
        folder="config",
        content="# System Configuration\nThis is a config file",
        entity_type="config",
    )

    assert result
    assert "# Created note" in result
    assert f"project: {test_project.name}" in result
    assert "file_path: config/System Config.md" in result
    assert "permalink: config/system-config" in result
    assert f"[Session: Using project '{test_project.name}']" in result

    # Verify the entity type is correctly set in the frontmatter
    content = await read_note.fn("config/system-config", project=test_project.name)
    assert "type: config" in content
    assert "# System Configuration" in content


@pytest.mark.asyncio
async def test_write_note_entity_type_default_behavior(app, test_project):
    """Test that the entity_type parameter defaults to "note" when not specified.

    This ensures backward compatibility - existing code that doesn't specify
    entity_type should continue to work as before.
    """
    result = await write_note.fn(
        project=test_project.name,
        title="Default Type Test",
        folder="test",
        content="# Default Type Test\nThis should be type 'note'",
        tags=["test"],
    )

    assert result
    assert "# Created note" in result
    assert f"project: {test_project.name}" in result
    assert "file_path: test/Default Type Test.md" in result
    assert "permalink: test/default-type-test" in result
    assert f"[Session: Using project '{test_project.name}']" in result

    # Verify the entity type defaults to "note"
    content = await read_note.fn("test/default-type-test", project=test_project.name)
    assert "type: note" in content
    assert "# Default Type Test" in content


@pytest.mark.asyncio
async def test_write_note_update_existing_with_different_entity_type(app, test_project):
    """Test updating an existing note with a different entity_type."""
    # Create initial note as "note" type
    result1 = await write_note.fn(
        project=test_project.name,
        title="Changeable Type",
        folder="test",
        content="# Initial Content\nThis starts as a note",
        tags=["test"],
        entity_type="note",
    )

    assert result1
    assert "# Created note" in result1
    assert f"project: {test_project.name}" in result1

    # Update the same note with a different entity_type
    result2 = await write_note.fn(
        project=test_project.name,
        title="Changeable Type",
        folder="test",
        content="# Updated Content\nThis is now a guide",
        tags=["guide"],
        entity_type="guide",
    )

    assert result2
    assert "# Updated note" in result2
    assert f"project: {test_project.name}" in result2

    # Verify the entity type was updated
    content = await read_note.fn("test/changeable-type", project=test_project.name)
    assert "type: guide" in content
    assert "# Updated Content" in content
    assert "- guide" in content


@pytest.mark.asyncio
async def test_write_note_respects_frontmatter_entity_type(app, test_project):
    """Test that entity_type in frontmatter is respected when parameter is not provided.

    This verifies that when write_note is called without entity_type parameter,
    but the content includes frontmatter with a 'type' field, that type is respected
    instead of defaulting to 'note'.
    """
    note = dedent("""
        ---
        title: Test Guide
        type: guide
        permalink: guides/test-guide
        tags:
        - guide
        - documentation
        ---

        # Guide Content
        This is a guide
        """).strip()

    # Call write_note without entity_type parameter - it should respect frontmatter type
    result = await write_note.fn(
        project=test_project.name, title="Test Guide", folder="guides", content=note
    )

    assert result
    assert "# Created note" in result
    assert f"project: {test_project.name}" in result
    assert "file_path: guides/Test Guide.md" in result
    assert "permalink: guides/test-guide" in result
    assert f"[Session: Using project '{test_project.name}']" in result

    # Verify the entity type from frontmatter is respected (should be "guide", not "note")
    content = await read_note.fn("guides/test-guide", project=test_project.name)
    assert "type: guide" in content
    assert "# Guide Content" in content
    assert "- guide" in content
    assert "- documentation" in content


class TestWriteNoteSecurityValidation:
    """Test write_note security validation features."""

    @pytest.mark.asyncio
    async def test_write_note_blocks_path_traversal_unix(self, app, test_project):
        """Test that Unix-style path traversal attacks are blocked in folder parameter."""
        # Test various Unix-style path traversal patterns
        attack_folders = [
            "../",
            "../../",
            "../../../",
            "../secrets",
            "../../etc",
            "../../../etc/passwd_folder",
            "notes/../../../etc",
            "folder/../../outside",
            "../../../../malicious",
        ]

        for attack_folder in attack_folders:
            result = await write_note.fn(
                project=test_project.name,
                title="Test Note",
                folder=attack_folder,
                content="# Test Content\nThis should be blocked by security validation.",
            )

            assert isinstance(result, str)
            assert "# Error" in result
            assert "paths must stay within project boundaries" in result
            assert attack_folder in result

    @pytest.mark.asyncio
    async def test_write_note_blocks_path_traversal_windows(self, app, test_project):
        """Test that Windows-style path traversal attacks are blocked in folder parameter."""
        # Test various Windows-style path traversal patterns
        attack_folders = [
            "..\\",
            "..\\..\\",
            "..\\..\\..\\",
            "..\\secrets",
            "..\\..\\Windows",
            "..\\..\\..\\Windows\\System32",
            "notes\\..\\..\\..\\Windows",
            "\\\\server\\share",
            "\\\\..\\..\\Windows",
        ]

        for attack_folder in attack_folders:
            result = await write_note.fn(
                project=test_project.name,
                title="Test Note",
                folder=attack_folder,
                content="# Test Content\nThis should be blocked by security validation.",
            )

            assert isinstance(result, str)
            assert "# Error" in result
            assert "paths must stay within project boundaries" in result
            assert attack_folder in result

    @pytest.mark.asyncio
    async def test_write_note_blocks_absolute_paths(self, app, test_project):
        """Test that absolute paths are blocked in folder parameter."""
        # Test various absolute path patterns
        attack_folders = [
            "/etc",
            "/home/user",
            "/var/log",
            "/root",
            "C:\\Windows",
            "C:\\Users\\user",
            "D:\\secrets",
            "/tmp/malicious",
            "/usr/local/evil",
        ]

        for attack_folder in attack_folders:
            result = await write_note.fn(
                project=test_project.name,
                title="Test Note",
                folder=attack_folder,
                content="# Test Content\nThis should be blocked by security validation.",
            )

            assert isinstance(result, str)
            assert "# Error" in result
            assert "paths must stay within project boundaries" in result
            assert attack_folder in result

    @pytest.mark.asyncio
    async def test_write_note_blocks_home_directory_access(self, app, test_project):
        """Test that home directory access patterns are blocked in folder parameter."""
        # Test various home directory access patterns
        attack_folders = [
            "~",
            "~/",
            "~/secrets",
            "~/.ssh",
            "~/Documents",
            "~\\AppData",
            "~\\Desktop",
            "~/.env_folder",
        ]

        for attack_folder in attack_folders:
            result = await write_note.fn(
                project=test_project.name,
                title="Test Note",
                folder=attack_folder,
                content="# Test Content\nThis should be blocked by security validation.",
            )

            assert isinstance(result, str)
            assert "# Error" in result
            assert "paths must stay within project boundaries" in result
            assert attack_folder in result

    @pytest.mark.asyncio
    async def test_write_note_blocks_mixed_attack_patterns(self, app, test_project):
        """Test that mixed legitimate/attack patterns are blocked in folder parameter."""
        # Test mixed patterns that start legitimate but contain attacks
        attack_folders = [
            "notes/../../../etc",
            "docs/../../.env_folder",
            "legitimate/path/../../.ssh",
            "project/folder/../../../Windows",
            "valid/folder/../../home/user",
            "assets/../../../tmp/evil",
        ]

        for attack_folder in attack_folders:
            result = await write_note.fn(
                project=test_project.name,
                title="Test Note",
                folder=attack_folder,
                content="# Test Content\nThis should be blocked by security validation.",
            )

            assert isinstance(result, str)
            assert "# Error" in result
            assert "paths must stay within project boundaries" in result

    @pytest.mark.asyncio
    async def test_write_note_allows_safe_folder_paths(self, app, test_project):
        """Test that legitimate folder paths are still allowed."""
        # Test various safe folder patterns
        safe_folders = [
            "notes",
            "docs",
            "projects/2025",
            "archive/old-notes",
            "deep/nested/directory/structure",
            "folder/subfolder",
            "research/ml",
            "meeting-notes",
        ]

        for safe_folder in safe_folders:
            result = await write_note.fn(
                project=test_project.name,
                title=f"Test Note in {safe_folder.replace('/', '-')}",
                folder=safe_folder,
                content="# Test Content\nThis should work normally with security validation.",
                tags=["test", "security"],
            )

            # Should succeed (not a security error)
            assert isinstance(result, str)
            assert "# Error" not in result
            assert "paths must stay within project boundaries" not in result
            # Should be normal successful creation/update
            assert ("# Created note" in result) or ("# Updated note" in result)
            assert safe_folder in result  # Should show in file_path

    @pytest.mark.asyncio
    async def test_write_note_empty_folder_security(self, app, test_project):
        """Test that empty folder parameter is handled securely."""
        # Empty folder should be allowed (creates in root)
        result = await write_note.fn(
            project=test_project.name,
            title="Root Note",
            folder="",
            content="# Root Note\nThis note should be created in the project root.",
        )

        assert isinstance(result, str)
        # Empty folder should not trigger security error
        assert "# Error" not in result
        assert "paths must stay within project boundaries" not in result
        # Should succeed normally
        assert ("# Created note" in result) or ("# Updated note" in result)

    @pytest.mark.asyncio
    async def test_write_note_none_folder_security(self, app, test_project):
        """Test that default folder behavior works securely when folder is omitted."""
        # The write_note function requires folder parameter, but we can test with empty string
        # which effectively creates in project root
        result = await write_note.fn(
            project=test_project.name,
            title="Root Folder Note",
            folder="",  # Empty string instead of None since folder is required
            content="# Root Folder Note\nThis note should be created in the project root.",
        )

        assert isinstance(result, str)
        # Empty folder should not trigger security error
        assert "# Error" not in result
        assert "paths must stay within project boundaries" not in result
        # Should succeed normally
        assert ("# Created note" in result) or ("# Updated note" in result)

    @pytest.mark.asyncio
    async def test_write_note_current_directory_references_security(self, app, test_project):
        """Test that current directory references are handled securely."""
        # Test current directory references (should be safe)
        safe_folders = [
            "./notes",
            "folder/./subfolder",
            "./folder/subfolder",
        ]

        for safe_folder in safe_folders:
            result = await write_note.fn(
                project=test_project.name,
                title=f"Current Dir Test {safe_folder.replace('/', '-').replace('.', 'dot')}",
                folder=safe_folder,
                content="# Current Directory Test\nThis should work with current directory references.",
            )

            assert isinstance(result, str)
            # Should NOT contain security error message
            assert "# Error" not in result
            assert "paths must stay within project boundaries" not in result
            # Should succeed normally
            assert ("# Created note" in result) or ("# Updated note" in result)

    @pytest.mark.asyncio
    async def test_write_note_security_with_all_parameters(self, app, test_project):
        """Test security validation works with all write_note parameters."""
        # Test that security validation is applied even when all other parameters are provided
        result = await write_note.fn(
            project=test_project.name,
            title="Security Test with All Params",
            folder="../../../etc/malicious",
            content="# Malicious Content\nThis should be blocked by security validation.",
            tags=["malicious", "test"],
            entity_type="guide",
        )

        assert isinstance(result, str)
        assert "# Error" in result
        assert "paths must stay within project boundaries" in result
        assert "../../../etc/malicious" in result

    @pytest.mark.asyncio
    async def test_write_note_security_logging(self, app, test_project, caplog):
        """Test that security violations are properly logged."""
        # Attempt path traversal attack
        result = await write_note.fn(
            project=test_project.name,
            title="Security Logging Test",
            folder="../../../etc/passwd_folder",
            content="# Test Content\nThis should trigger security logging.",
        )

        assert "# Error" in result
        assert "paths must stay within project boundaries" in result

        # Check that security violation was logged
        # Note: This test may need adjustment based on the actual logging setup
        # The security validation should generate a warning log entry

    @pytest.mark.asyncio
    async def test_write_note_preserves_functionality_with_security(self, app, test_project):
        """Test that security validation doesn't break normal note creation functionality."""
        # Create a note with all features to ensure security validation doesn't interfere
        result = await write_note.fn(
            project=test_project.name,
            title="Full Feature Security Test",
            folder="security-tests",
            content=dedent("""
                # Full Feature Security Test

                This note tests that security validation doesn't break normal functionality.

                ## Observations
                - [security] Path validation working correctly #security
                - [feature] All features still functional #test

                ## Relations
                - relates_to [[Security Implementation]]
                - depends_on [[Path Validation]]

                Additional content with various formatting.
            """).strip(),
            tags=["security", "test", "full-feature"],
            entity_type="guide",
        )

        # Should succeed normally
        assert isinstance(result, str)
        assert "# Error" not in result
        assert "paths must stay within project boundaries" not in result
        assert "# Created note" in result
        assert "file_path: security-tests/Full Feature Security Test.md" in result
        assert "permalink: security-tests/full-feature-security-test" in result

        # Should process observations and relations
        assert "## Observations" in result
        assert "## Relations" in result
        assert "## Tags" in result

        # Should show proper counts
        assert "security: 1" in result
        assert "feature: 1" in result


class TestWriteNoteSecurityEdgeCases:
    """Test edge cases for write_note security validation."""

    @pytest.mark.asyncio
    async def test_write_note_unicode_folder_attacks(self, app, test_project):
        """Test that Unicode-based path traversal attempts are blocked."""
        # Test Unicode path traversal attempts
        unicode_attack_folders = [
            "notes/文档/../../../etc",  # Chinese characters
            "docs/café/../../secrets",  # Accented characters
            "files/αβγ/../../../malicious",  # Greek characters
        ]

        for attack_folder in unicode_attack_folders:
            result = await write_note.fn(
                project=test_project.name,
                title="Unicode Attack Test",
                folder=attack_folder,
                content="# Unicode Attack\nThis should be blocked.",
            )

            assert isinstance(result, str)
            assert "# Error" in result
            assert "paths must stay within project boundaries" in result

    @pytest.mark.asyncio
    async def test_write_note_very_long_attack_folder(self, app, test_project):
        """Test handling of very long attack folder paths."""
        # Create a very long path traversal attack
        long_attack_folder = "../" * 1000 + "etc/malicious"

        result = await write_note.fn(
            project=test_project.name,
            title="Long Attack Test",
            folder=long_attack_folder,
            content="# Long Attack\nThis should be blocked.",
        )

        assert isinstance(result, str)
        assert "# Error" in result
        assert "paths must stay within project boundaries" in result

    @pytest.mark.asyncio
    async def test_write_note_case_variations_attacks(self, app, test_project):
        """Test that case variations don't bypass security."""
        # Test case variations (though case sensitivity depends on filesystem)
        case_attack_folders = [
            "../ETC",
            "../Etc/SECRETS",
            "..\\WINDOWS",
            "~/SECRETS",
        ]

        for attack_folder in case_attack_folders:
            result = await write_note.fn(
                project=test_project.name,
                title="Case Variation Attack Test",
                folder=attack_folder,
                content="# Case Attack\nThis should be blocked.",
            )

            assert isinstance(result, str)
            assert "# Error" in result
            assert "paths must stay within project boundaries" in result

    @pytest.mark.asyncio
    async def test_write_note_whitespace_in_attack_folders(self, app, test_project):
        """Test that whitespace doesn't help bypass security."""
        # Test attack folders with various whitespace
        whitespace_attack_folders = [
            " ../../../etc ",
            "\t../../../secrets\t",
            " ..\\..\\Windows ",
            "notes/ ../../ malicious",
        ]

        for attack_folder in whitespace_attack_folders:
            result = await write_note.fn(
                project=test_project.name,
                title="Whitespace Attack Test",
                folder=attack_folder,
                content="# Whitespace Attack\nThis should be blocked.",
            )

            assert isinstance(result, str)
            # The attack should still be blocked even with whitespace
            if ".." in attack_folder.strip() or "~" in attack_folder.strip():
                assert "# Error" in result
                assert "paths must stay within project boundaries" in result

```

--------------------------------------------------------------------------------
/specs/SPEC-17 Semantic Search with ChromaDB.md:
--------------------------------------------------------------------------------

```markdown
---
title: 'SPEC-17: Semantic Search with ChromaDB'
type: spec
permalink: specs/spec-17-semantic-search-chromadb
tags:
- search
- chromadb
- semantic-search
- vector-database
- postgres-migration
---

# SPEC-17: Semantic Search with ChromaDB

## Why ChromaDB for Knowledge Management

Your users aren't just searching for keywords - they're trying to:
- "Find notes related to this concept"
- "Show me similar ideas"
- "What else did I write about this topic?"

Example:
    # User searches: "AI ethics"

    # FTS5/MeiliSearch finds:
    - "AI ethics guidelines"     ✅
    - "ethical AI development"   ✅
    - "artificial intelligence"  ❌ No keyword match

    # ChromaDB finds:
    - "AI ethics guidelines"     ✅
    - "ethical AI development"   ✅
    - "artificial intelligence"  ✅ Semantic match!
    - "bias in ML models"        ✅ Related concept
    - "responsible technology"   ✅ Similar theme
    - "neural network fairness"  ✅ Connected idea

## ChromaDB vs MeiliSearch vs Typesense

| Feature          | ChromaDB           | MeiliSearch        | Typesense          |
|------------------|--------------------|--------------------|--------------------|
| Semantic Search  | ✅ Excellent        | ❌ No               | ❌ No               |
| Keyword Search   | ⚠️ Via metadata    | ✅ Excellent        | ✅ Excellent        |
| Local Deployment | ✅ Embedded mode    | ⚠️ Server required | ⚠️ Server required |
| No Server Needed | ✅ YES!             | ❌ No               | ❌ No               |
| Embedding Cost   | $0 local / ~$0.13 per 1M tokens (OpenAI) | None  | None               |
| Search Speed     | 50-200ms           | 10-50ms            | 10-50ms            |
| Best For         | Semantic discovery | Exact terms        | Exact terms        |

## The Killer Feature: Embedded Mode

ChromaDB has an embedded client that runs in-process - NO SERVER NEEDED!

```python
# Local (FOSS) - ChromaDB embedded in Python process
import chromadb

client = chromadb.PersistentClient(path="/path/to/chroma_data")
collection = client.get_or_create_collection("knowledge_base")

# Add documents
collection.add(
  ids=["note1", "note2"],
  documents=["AI ethics", "Neural networks"],
  metadatas=[{"type": "note"}, {"type": "spec"}]
)

# Search - NO API calls, runs locally!
results = collection.query(
  query_texts=["machine learning"],
  n_results=10
)
```


## Why

### Current Problem: Database Persistence in Cloud
In cloud deployments, `memory.db` (SQLite) doesn't persist across Docker container restarts. This means:
- Database must be rebuilt on every container restart
- Initial sync takes ~49 seconds for 500 files (after optimization in #352)
- Users experience delays on each deployment

### Search Architecture Issues
Current SQLite FTS5 implementation creates a **dual-implementation problem** for PostgreSQL migration:
- FTS5 (SQLite) uses `VIRTUAL TABLE` with `MATCH` queries
- PostgreSQL full-text search uses `TSVECTOR` with `@@` operator
- These are fundamentally incompatible architectures
- Would require **2x search code** and **2x tests** to support both

**Example of incompatibility:**
```python
# SQLite FTS5
"content_stems MATCH :text"

# PostgreSQL
"content_vector @@ plainto_tsquery(:text)"
```

### Search Quality Limitations
Current keyword-based FTS5 has limitations:
- No semantic understanding (search "AI" doesn't find "machine learning")
- No word relationships (search "neural networks" doesn't find "deep learning")
- Limited typo tolerance
- No relevance ranking beyond keyword matching

### Strategic Goal: PostgreSQL Migration
Moving to PostgreSQL (Neon) for cloud deployments would:
- ✅ Solve persistence issues (database survives restarts)
- ✅ Enable multi-tenant architecture
- ✅ Better performance for large datasets
- ✅ Support for cloud-native scaling

**But requires solving the search compatibility problem.**

## What

Migrate from SQLite FTS5 to **ChromaDB** for semantic vector search across all deployments.

**Key insight:** ChromaDB is **database-agnostic** - it works with both SQLite and PostgreSQL, eliminating the dual-implementation problem.

### Affected Areas
- Search implementation (`src/basic_memory/repository/search_repository.py`)
- Search service (`src/basic_memory/services/search_service.py`)
- Search models (`src/basic_memory/models/search.py`)
- Database initialization (`src/basic_memory/db.py`)
- MCP search tools (`src/basic_memory/mcp/tools/search.py`)
- Dependencies (`pyproject.toml` - add ChromaDB)
- Alembic migrations (FTS5 table removal)
- Documentation

### What Changes
**Removed:**
- SQLite FTS5 virtual table
- `MATCH` query syntax
- FTS5-specific tokenization and prefix handling
- ~300 lines of FTS5 query preparation code

**Added:**
- ChromaDB persistent client (embedded mode)
- Vector embedding generation
- Semantic similarity search
- Local embedding model (`sentence-transformers`)
- Collection management for multi-project support

### What Stays the Same
- Search API interface (MCP tools, REST endpoints)
- Entity/Observation/Relation indexing workflow
- Multi-project isolation
- Search filtering by type, date, metadata
- Pagination and result formatting
- **All SQL queries for exact lookups and metadata filtering**

## Hybrid Architecture: SQL + ChromaDB

**Critical Design Decision:** ChromaDB **complements** SQL, it doesn't **replace** it.

### Why Hybrid?

ChromaDB is excellent for semantic text search but terrible for exact lookups. SQL is perfect for exact lookups and structured queries. We use both:

```
┌─────────────────────────────────────────────────┐
│ Search Request                                   │
└─────────────────────────────────────────────────┘
                    ▼
       ┌────────────────────────┐
       │ SearchRepository       │
       │  (Smart Router)        │
       └────────────────────────┘
              ▼           ▼
  ┌───────────┐      ┌──────────────┐
  │ SQL       │      │ ChromaDB     │
  │ Queries   │      │ Semantic     │
  └───────────┘      └──────────────┘
       ▼                    ▼
  Exact lookups      Text search
  - Permalink        - Semantic similarity
  - Pattern match    - Related concepts
  - Title exact      - Typo tolerance
  - Metadata filter  - Fuzzy matching
  - Date ranges
```

### When to Use Each

#### Use SQL For (Fast & Exact)

**Exact Permalink Lookup:**
```python
# Find by exact permalink - SQL wins
"SELECT * FROM entities WHERE permalink = 'specs/search-feature'"
# ~1ms, perfect for exact matches

# ChromaDB would be: ~50ms, wasteful
```

**Pattern Matching:**
```python
# Find all specs - SQL wins
"SELECT * FROM entities WHERE permalink GLOB 'specs/*'"
# ~5ms, perfect for wildcards

# ChromaDB doesn't support glob patterns
```

**Pure Metadata Queries:**
```python
# Find all meetings tagged "important" - SQL wins
"SELECT * FROM entities
 WHERE json_extract(entity_metadata, '$.entity_type') = 'meeting'
 AND json_extract(entity_metadata, '$.tags') LIKE '%important%'"
# ~5ms, structured query

# No text search needed, SQL is faster and simpler
```

**Date Filtering:**
```python
# Find recent specs - SQL wins
"SELECT * FROM entities
 WHERE entity_type = 'spec'
 AND created_at > '2024-01-01'
 ORDER BY created_at DESC"
# ~2ms, perfect for structured data
```

#### Use ChromaDB For (Semantic & Fuzzy)

**Semantic Content Search:**
```python
# Find notes about "neural networks" - ChromaDB wins
collection.query(query_texts=["neural networks"])
# Finds: "machine learning", "deep learning", "AI models"
# ~50-100ms, semantic understanding

# SQL FTS5 would only find exact keyword matches
```

**Text Search + Metadata:**
```python
# Find meeting notes about "project planning" tagged "important"
collection.query(
    query_texts=["project planning"],
    where={
        "entity_type": "meeting",
        "tags": {"$contains": "important"}
    }
)
# ~100ms, semantic search with filters
# Finds: "roadmap discussion", "sprint planning", etc.
```

**Typo Tolerance:**
```python
# User types "serch feature" (typo) - ChromaDB wins
collection.query(query_texts=["serch feature"])
# Still finds: "search feature" documents
# ~50-100ms, fuzzy matching

# SQL would find nothing
```

### Performance Comparison

| Query Type | SQL | ChromaDB | Winner |
|-----------|-----|----------|--------|
| Exact permalink | 1-2ms | 50ms | ✅ SQL |
| Pattern match (specs/*) | 5-10ms | N/A | ✅ SQL |
| Pure metadata filter | 5ms | 50ms | ✅ SQL |
| Semantic text search | ❌ Can't | 50-100ms | ✅ ChromaDB |
| Text + metadata | ❌ Keywords only | 100ms | ✅ ChromaDB |
| Typo tolerance | ❌ Can't | 50ms | ✅ ChromaDB |

### Metadata/Frontmatter Handling

**Both systems support full frontmatter filtering!**

#### SQL Metadata Storage

```sql
-- Entities table stores frontmatter as JSON
CREATE TABLE entities (
    id INTEGER PRIMARY KEY,
    title TEXT,
    permalink TEXT,
    file_path TEXT,
    entity_type TEXT,
    entity_metadata JSON,  -- All frontmatter here!
    created_at DATETIME,
    ...
)

-- Query frontmatter fields
SELECT * FROM entities
WHERE json_extract(entity_metadata, '$.entity_type') = 'meeting'
  AND json_extract(entity_metadata, '$.tags') LIKE '%important%'
  AND json_extract(entity_metadata, '$.status') = 'completed'
```

#### ChromaDB Metadata Storage

```python
# When indexing, store ALL frontmatter as metadata
class ChromaSearchBackend:
    async def index_entity(self, entity: Entity):
        """Index with complete frontmatter metadata."""

        # Extract ALL frontmatter fields
        metadata = {
            "entity_id": entity.id,
            "project_id": entity.project_id,
            "permalink": entity.permalink,
            "file_path": entity.file_path,
            "entity_type": entity.entity_type,
            "type": "entity",
            # ALL frontmatter tags
            "tags": entity.entity_metadata.get("tags", []),
            # Custom frontmatter fields
            "status": entity.entity_metadata.get("status"),
            "priority": entity.entity_metadata.get("priority"),
            # Spread any other custom fields
            **{k: v for k, v in entity.entity_metadata.items()
               if k not in ["tags", "entity_type"]}
        }

        self.collection.upsert(
            ids=[f"entity_{entity.id}_{entity.project_id}"],
            documents=[self._format_document(entity)],
            metadatas=[metadata]  # Full frontmatter!
        )
```

#### ChromaDB Metadata Queries

ChromaDB supports rich filtering:

```python
# Simple filter - single field
collection.query(
    query_texts=["project planning"],
    where={"entity_type": "meeting"}
)

# Multiple conditions (AND)
collection.query(
    query_texts=["architecture decisions"],
    where={
        "entity_type": "spec",
        "tags": {"$contains": "important"}
    }
)

# Complex filters with operators
collection.query(
    query_texts=["machine learning"],
    where={
        "$and": [
            {"entity_type": {"$in": ["note", "spec"]}},
            {"tags": {"$contains": "AI"}},
            {"created_at": {"$gt": "2024-01-01"}},
            {"status": "in-progress"}
        ]
    }
)

# Multiple tags (all must match)
collection.query(
    query_texts=["cloud architecture"],
    where={
        "$and": [
            {"tags": {"$contains": "architecture"}},
            {"tags": {"$contains": "cloud"}}
        ]
    }
)
```
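
A note on consuming these results: `query()` returns parallel lists, with one inner list per query text. A minimal sketch of unpacking a single-query result, assuming the default includes of documents, metadatas, and distances:

```python
results = collection.query(query_texts=["project planning"], n_results=3)

# Index [0] selects the inner list for our single query text
for doc_id, doc, meta, dist in zip(
    results["ids"][0],
    results["documents"][0],
    results["metadatas"][0],
    results["distances"][0],
):
    print(f"{doc_id}: {meta['permalink']} (distance={dist:.3f})")
```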

### Smart Routing Implementation

```python
class SearchRepository:
    def __init__(
        self,
        session_maker: async_sessionmaker[AsyncSession],
        project_id: int,
        chroma_backend: ChromaSearchBackend
    ):
        self.session_maker = session_maker  # Keep SQL!
        self.chroma = chroma_backend
        self.project_id = project_id

    async def search(
        self,
        search_text: Optional[str] = None,
        permalink: Optional[str] = None,
        permalink_match: Optional[str] = None,
        title: Optional[str] = None,
        types: Optional[List[str]] = None,
        tags: Optional[List[str]] = None,
        after_date: Optional[datetime] = None,
        custom_metadata: Optional[dict] = None,
        limit: int = 10,
        offset: int = 0,
    ) -> List[SearchIndexRow]:
        """Smart routing between SQL and ChromaDB."""

        # ==========================================
        # Route 1: Exact Lookups → SQL (1-5ms)
        # ==========================================

        if permalink:
            # Exact permalink: "specs/search-feature"
            return await self._sql_permalink_lookup(permalink)

        if permalink_match:
            # Pattern match: "specs/*"
            return await self._sql_pattern_match(permalink_match)

        if title and not search_text:
            # Exact title lookup (no semantic search needed)
            return await self._sql_title_match(title)

        # ==========================================
        # Route 2: Pure Metadata → SQL (5-10ms)
        # ==========================================

        # No text search, just filtering by metadata
        if not search_text and (types or tags or after_date or custom_metadata):
            return await self._sql_metadata_filter(
                types=types,
                tags=tags,
                after_date=after_date,
                custom_metadata=custom_metadata,
                limit=limit,
                offset=offset
            )

        # ==========================================
        # Route 3: Text Search → ChromaDB (50-100ms)
        # ==========================================

        if search_text:
            # Build ChromaDB metadata filters
            where_filters = self._build_chroma_filters(
                types=types,
                tags=tags,
                after_date=after_date,
                custom_metadata=custom_metadata
            )

            # Semantic search with metadata filtering
            return await self.chroma.search(
                query_text=search_text,
                project_id=self.project_id,
                filters=where_filters,
                limit=limit
            )

        # ==========================================
        # Route 4: List All → SQL (2-5ms)
        # ==========================================

        return await self._sql_list_entities(
            limit=limit,
            offset=offset
        )

    def _build_chroma_filters(
        self,
        types: Optional[List[str]] = None,
        tags: Optional[List[str]] = None,
        after_date: Optional[datetime] = None,
        custom_metadata: Optional[dict] = None
    ) -> dict:
        """Build ChromaDB where clause from filters.

        ChromaDB allows only one top-level operator, so multiple conditions
        must be combined under a single "$and" rather than as sibling keys.
        """
        conditions: List[dict] = [{"project_id": self.project_id}]

        # Type filtering
        if types:
            if len(types) == 1:
                conditions.append({"entity_type": types[0]})
            else:
                conditions.append({"entity_type": {"$in": types}})

        # Tag filtering (array contains) - every requested tag must match
        for tag in tags or []:
            conditions.append({"tags": {"$contains": tag}})

        # Date filtering
        if after_date:
            conditions.append({"created_at": {"$gt": after_date.isoformat()}})

        # Custom frontmatter fields
        for key, value in (custom_metadata or {}).items():
            conditions.append({key: value})

        # A single condition can be returned as-is; otherwise wrap in $and
        if len(conditions) == 1:
            return conditions[0]
        return {"$and": conditions}

    async def _sql_metadata_filter(
        self,
        types: Optional[List[str]] = None,
        tags: Optional[List[str]] = None,
        after_date: Optional[datetime] = None,
        custom_metadata: Optional[dict] = None,
        limit: int = 10,
        offset: int = 0
    ) -> List[SearchIndexRow]:
        """Pure metadata queries using SQL."""
        conditions = ["project_id = :project_id"]
        params = {"project_id": self.project_id}

        if types:
            # Bind each type as a parameter rather than interpolating strings
            placeholders = ", ".join(f":type_{i}" for i in range(len(types)))
            conditions.append(f"entity_type IN ({placeholders})")
            params.update({f"type_{i}": t for i, t in enumerate(types)})

        if tags:
            # Check each tag
            for i, tag in enumerate(tags):
                param_name = f"tag_{i}"
                conditions.append(
                    f"json_extract(entity_metadata, '$.tags') LIKE :{param_name}"
                )
                params[param_name] = f"%{tag}%"

        if after_date:
            conditions.append("created_at > :after_date")
            params["after_date"] = after_date

        if custom_metadata:
            for key, value in custom_metadata.items():
                param_name = f"meta_{key}"
                conditions.append(
                    f"json_extract(entity_metadata, '$.{key}') = :{param_name}"
                )
                params[param_name] = value

        where = " AND ".join(conditions)
        sql = f"""
            SELECT * FROM entities
            WHERE {where}
            ORDER BY created_at DESC
            LIMIT :limit OFFSET :offset
        """
        params["limit"] = limit
        params["offset"] = offset

        async with db.scoped_session(self.session_maker) as session:
            result = await session.execute(text(sql), params)
            return self._format_sql_results(result)
```
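
The `_sql_*` helpers referenced above follow the same shape as `_sql_metadata_filter`. For illustration only, a minimal sketch of `_sql_permalink_lookup` (hypothetical body, assuming the existing `db.scoped_session` helper and a `_format_sql_results` row adapter):

```python
    async def _sql_permalink_lookup(self, permalink: str) -> List[SearchIndexRow]:
        """Exact permalink match - the ~1ms fast path."""
        sql = """
            SELECT * FROM entities
            WHERE project_id = :project_id
              AND permalink = :permalink
        """
        params = {"project_id": self.project_id, "permalink": permalink}

        async with db.scoped_session(self.session_maker) as session:
            result = await session.execute(text(sql), params)
            return self._format_sql_results(result)
```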

### Real-World Examples

#### Example 1: Pure Metadata Query (No Text)
```python
# "Find all meetings tagged 'important'"
results = await search_repo.search(
    types=["meeting"],
    tags=["important"]
)

# Routing: → SQL (~5ms)
# SQL: SELECT * FROM entities
#      WHERE entity_type = 'meeting'
#      AND json_extract(entity_metadata, '$.tags') LIKE '%important%'
```

#### Example 2: Semantic Search (No Metadata)
```python
# "Find notes about neural networks"
results = await search_repo.search(
    search_text="neural networks"
)

# Routing: → ChromaDB (~80ms)
# Finds: "machine learning", "deep learning", "AI models", etc.
```

#### Example 3: Semantic + Metadata
```python
# "Find meeting notes about 'project planning' tagged 'important'"
results = await search_repo.search(
    search_text="project planning",
    types=["meeting"],
    tags=["important"]
)

# Routing: → ChromaDB with filters (~100ms)
# ChromaDB: query_texts=["project planning"]
#           where={"entity_type": "meeting",
#                  "tags": {"$contains": "important"}}
# Finds: "roadmap discussion", "sprint planning", etc.
```

#### Example 4: Complex Frontmatter Query
```python
# "Find in-progress specs with multiple tags, recent"
results = await search_repo.search(
    types=["spec"],
    tags=["architecture", "cloud"],
    after_date=datetime(2024, 1, 1),
    custom_metadata={"status": "in-progress"}
)

# Routing: → SQL (~10ms)
# No text search, pure structured query - SQL is faster
```

#### Example 5: Semantic + Complex Metadata
```python
# "Find notes about 'authentication' that are in-progress"
results = await search_repo.search(
    search_text="authentication",
    custom_metadata={"status": "in-progress", "priority": "high"}
)

# Routing: → ChromaDB with metadata filters (~100ms)
# Semantic search for "authentication" concept
# Filters by status and priority in metadata
```

#### Example 6: Exact Permalink
```python
# "Show me specs/search-feature"
results = await search_repo.search(
    permalink="specs/search-feature"
)

# Routing: → SQL (~1ms)
# SQL: SELECT * FROM entities WHERE permalink = 'specs/search-feature'
```

#### Example 7: Pattern Match
```python
# "Show me all specs"
results = await search_repo.search(
    permalink_match="specs/*"
)

# Routing: → SQL (~5ms)
# SQL: SELECT * FROM entities WHERE permalink GLOB 'specs/*'
```

### What We Remove vs Keep

**REMOVE (FTS5-specific):**
- ❌ `CREATE VIRTUAL TABLE search_index USING fts5(...)`
- ❌ `MATCH` operator queries
- ❌ FTS5 tokenization configuration
- ❌ ~300 lines of FTS5 query preparation code
- ❌ Trigram generation and prefix handling

**KEEP (Standard SQL):**
- ✅ `SELECT * FROM entities WHERE permalink = :permalink`
- ✅ `SELECT * FROM entities WHERE permalink GLOB :pattern`
- ✅ `SELECT * FROM entities WHERE title LIKE :title`
- ✅ `SELECT * FROM entities WHERE json_extract(entity_metadata, ...) = :value`
- ✅ All date filtering, pagination, sorting
- ✅ Entity table structure and indexes

**ADD (ChromaDB):**
- ✅ ChromaDB persistent client (embedded)
- ✅ Semantic vector search
- ✅ Metadata filtering in ChromaDB
- ✅ Smart routing logic

## How (High Level)

### Architecture Overview

```
┌─────────────────────────────────────────────────────────────┐
│ FOSS Deployment (Local)                                      │
├─────────────────────────────────────────────────────────────┤
│ SQLite (data) + ChromaDB embedded (search)                   │
│ - No external services                                       │
│ - Local embedding model (sentence-transformers)              │
│ - Persists in ~/.basic-memory/chroma_data/                  │
└─────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────┐
│ Cloud Deployment (Multi-tenant)                              │
├─────────────────────────────────────────────────────────────┤
│ PostgreSQL/Neon (data) + ChromaDB server (search)           │
│ - Neon serverless Postgres for persistence                   │
│ - ChromaDB server in Docker container                        │
│ - Optional: OpenAI embeddings for better quality             │
└─────────────────────────────────────────────────────────────┘
```

### Phase 1: ChromaDB Integration (2-3 days)

#### 1. Add ChromaDB Dependency
```toml
# pyproject.toml
dependencies = [
    "chromadb>=0.4.0",
    "sentence-transformers>=2.2.0",  # Local embeddings
]
```

#### 2. Create ChromaSearchBackend
```python
# src/basic_memory/search/chroma_backend.py
from pathlib import Path
from typing import List, Optional

from chromadb import PersistentClient
from chromadb.utils import embedding_functions

class ChromaSearchBackend:
    def __init__(
        self,
        persist_directory: Path,
        collection_name: str = "knowledge_base",
        embedding_model: str = "all-MiniLM-L6-v2"
    ):
        """Initialize ChromaDB with local embeddings."""
        self.client = PersistentClient(path=str(persist_directory))

        # Use local sentence-transformers model (no API costs)
        self.embed_fn = embedding_functions.SentenceTransformerEmbeddingFunction(
            model_name=embedding_model
        )

        self.collection = self.client.get_or_create_collection(
            name=collection_name,
            embedding_function=self.embed_fn,
            metadata={"hnsw:space": "cosine"}  # Similarity metric
        )

    async def index_entity(self, entity: Entity):
        """Index entity with automatic embeddings."""
        # Combine title and content for semantic search
        document = self._format_document(entity)

        self.collection.upsert(
            ids=[f"entity_{entity.id}_{entity.project_id}"],
            documents=[document],
            metadatas=[{
                "entity_id": entity.id,
                "project_id": entity.project_id,
                "permalink": entity.permalink,
                "file_path": entity.file_path,
                "entity_type": entity.entity_type,
                "type": "entity",
            }]
        )

    async def search(
        self,
        query_text: str,
        project_id: int,
        limit: int = 10,
        filters: Optional[dict] = None
    ) -> List[SearchResult]:
        """Semantic search with metadata filtering."""
        where = {"project_id": project_id}
        if filters:
            where.update(filters)

        results = self.collection.query(
            query_texts=[query_text],
            n_results=limit,
            where=where
        )

        return self._format_results(results)
```
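
The `_format_results` helper is elided above; a minimal sketch, assuming cosine distances and a simple `SearchResult` constructor (field names here are illustrative):

```python
def _format_results(self, results: dict) -> List[SearchResult]:
    formatted = []
    # collection.query() returns parallel lists-of-lists,
    # one inner list per query text
    for meta, distance in zip(results["metadatas"][0], results["distances"][0]):
        formatted.append(
            SearchResult(
                permalink=meta["permalink"],
                file_path=meta["file_path"],
                type=meta["entity_type"],
                score=1.0 - distance,  # cosine distance -> similarity
            )
        )
    return formatted
```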

#### 3. Update SearchRepository
```python
# src/basic_memory/repository/search_repository.py
class SearchRepository:
    def __init__(
        self,
        session_maker: async_sessionmaker[AsyncSession],
        project_id: int,
        chroma_backend: ChromaSearchBackend
    ):
        self.session_maker = session_maker
        self.project_id = project_id
        self.chroma = chroma_backend

    async def search(
        self,
        search_text: Optional[str] = None,
        permalink: Optional[str] = None,
        # ... other filters
    ) -> List[SearchIndexRow]:
        """Search using ChromaDB for text, SQL for exact lookups."""

        # For exact permalink/pattern matches, use SQL
        if permalink or permalink_match:
            return await self._sql_exact_search(...)

        # For text search, use ChromaDB semantic search
        if search_text:
            results = await self.chroma.search(
                query_text=search_text,
                project_id=self.project_id,
                limit=limit,
                filters=self._build_filters(types, after_date, ...)
            )
            return results

        # Fallback to listing all
        return await self._list_entities(...)
```
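
`_build_filters` is likewise elided. Two ChromaDB constraints worth encoding in it: multiple conditions must be joined with an explicit `$and`, and metadata values must be scalars (str/int/float/bool), so dates need to be stored as timestamps at index time. A sketch under those assumptions (the `created_at` metadata field would need to be added in `index_entity`):

```python
from datetime import datetime

def _build_filters(
    self,
    types: Optional[List[str]] = None,
    after_date: Optional[datetime] = None,
) -> Optional[dict]:
    conditions = []
    if types:
        conditions.append({"entity_type": {"$in": types}})
    if after_date:
        # ChromaDB metadata values must be scalars; store dates as timestamps
        conditions.append({"created_at": {"$gte": after_date.timestamp()}})
    if not conditions:
        return None
    # Multiple conditions require an explicit $and in ChromaDB where clauses
    return conditions[0] if len(conditions) == 1 else {"$and": conditions}
```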

#### 4. Update SearchService
```python
# src/basic_memory/services/search_service.py
class SearchService:
    def __init__(
        self,
        search_repository: SearchRepository,
        entity_repository: EntityRepository,
        file_service: FileService,
        chroma_backend: ChromaSearchBackend,
    ):
        self.repository = search_repository
        self.entity_repository = entity_repository
        self.file_service = file_service
        self.chroma = chroma_backend

    async def index_entity(self, entity: Entity):
        """Index entity in ChromaDB."""
        if entity.is_markdown:
            await self._index_entity_markdown(entity)
        else:
            await self._index_entity_file(entity)

    async def _index_entity_markdown(self, entity: Entity):
        """Index markdown entity with full content."""
        # Index entity
        await self.chroma.index_entity(entity)

        # Index observations (as separate documents)
        for obs in entity.observations:
            await self.chroma.index_observation(obs, entity)

        # Index relations (metadata only)
        for rel in entity.outgoing_relations:
            await self.chroma.index_relation(rel, entity)
```
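
`index_observation` and `index_relation` are assumed backend methods; a sketch mirroring `index_entity` (the ID scheme and metadata fields are assumptions):

```python
# On ChromaSearchBackend (sketch)
async def index_observation(self, obs: Observation, entity: Entity):
    self.collection.upsert(
        ids=[f"obs_{obs.id}_{entity.project_id}"],
        documents=[obs.content],
        metadatas=[{
            "entity_id": entity.id,
            "project_id": entity.project_id,
            "permalink": entity.permalink,
            "category": obs.category,
            "type": "observation",
        }],
    )

async def index_relation(self, rel: Relation, entity: Entity):
    # Embed the relation text so link phrases are searchable too
    self.collection.upsert(
        ids=[f"rel_{rel.id}_{entity.project_id}"],
        documents=[f"{rel.relation_type} {rel.to_name}"],
        metadatas=[{
            "entity_id": entity.id,
            "project_id": entity.project_id,
            "relation_type": rel.relation_type,
            "type": "relation",
        }],
    )
```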

### Phase 2: PostgreSQL Support (1 day)

#### 1. Add PostgreSQL Database Type
```python
# src/basic_memory/db.py
class DatabaseType(Enum):
    MEMORY = auto()
    FILESYSTEM = auto()
    POSTGRESQL = auto()  # NEW

    @classmethod
    def get_db_url(cls, db_path_or_url: str, db_type: "DatabaseType") -> str:
        if db_type == cls.POSTGRESQL:
            # Neon connection strings use postgresql://; rewrite the scheme so
            # SQLAlchemy's async engine picks the asyncpg driver
            return db_path_or_url.replace("postgresql://", "postgresql+asyncpg://", 1)
        elif db_type == cls.MEMORY:
            return "sqlite+aiosqlite://"
        return f"sqlite+aiosqlite:///{db_path_or_url}"
```
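
A quick sanity check of the URL mapping (values are illustrative):

```python
url = DatabaseType.get_db_url("postgresql://user:pass@host/db", DatabaseType.POSTGRESQL)
assert url == "postgresql+asyncpg://user:pass@host/db"

assert DatabaseType.get_db_url("memory.db", DatabaseType.FILESYSTEM) == (
    "sqlite+aiosqlite:///memory.db"
)
```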

#### 2. Update Connection Handling
```python
def _create_engine_and_session(...):
    db_url = DatabaseType.get_db_url(db_path_or_url, db_type)

    if db_type == DatabaseType.POSTGRESQL:
        # Use asyncpg driver for Postgres
        engine = create_async_engine(
            db_url,
            pool_size=10,
            max_overflow=20,
            pool_pre_ping=True,  # Health checks
        )
    else:
        # SQLite configuration
        engine = create_async_engine(db_url, connect_args=connect_args)

        # Only configure SQLite-specific settings for SQLite
        if db_type != DatabaseType.MEMORY:
            @event.listens_for(engine.sync_engine, "connect")
            def enable_wal_mode(dbapi_conn, connection_record):
                _configure_sqlite_connection(dbapi_conn, enable_wal=True)

    return engine, async_sessionmaker(engine, expire_on_commit=False)
```
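
One Neon-specific detail, stated here as an assumption to verify against Neon's docs: connections require TLS, which asyncpg can negotiate via `connect_args`:

```python
# Postgres engine with TLS for Neon (sketch)
engine = create_async_engine(
    db_url,
    pool_size=10,
    max_overflow=20,
    pool_pre_ping=True,
    connect_args={"ssl": "require"},  # asyncpg accepts "require"/"prefer" or an SSLContext
)
```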

#### 3. Remove SQLite-Specific Code
```python
# Remove from scoped_session context manager:
# await session.execute(text("PRAGMA foreign_keys=ON"))  # DELETE

# PostgreSQL handles foreign keys by default
```

### Phase 3: Migration & Testing (1-2 days)

#### 1. Create Migration Script
```python
# scripts/migrate_to_chromadb.py
async def migrate_fts5_to_chromadb():
    """One-time migration from FTS5 to ChromaDB."""
    # 1. Read all entities from database
    entities = await entity_repository.find_all()

    # 2. Index in ChromaDB
    for entity in entities:
        await search_service.index_entity(entity)

    # 3. Drop FTS5 table (Alembic migration)
    await session.execute(text("DROP TABLE IF EXISTS search_index"))
```
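
Per-entity indexing will dominate migration time on large knowledge bases. Since `collection.upsert` accepts lists, a batched variant (a sketch reusing the names above) is likely worthwhile:

```python
BATCH_SIZE = 100

async def migrate_batched():
    entities = await entity_repository.find_all()
    for i in range(0, len(entities), BATCH_SIZE):
        batch = entities[i : i + BATCH_SIZE]
        # One upsert per batch: embeddings are computed in bulk
        chroma_backend.collection.upsert(
            ids=[f"entity_{e.id}_{e.project_id}" for e in batch],
            documents=[f"{e.title}\n{e.content}" for e in batch],
            metadatas=[
                {"project_id": e.project_id, "permalink": e.permalink}
                for e in batch
            ],
        )
```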

#### 2. Update Tests
- Replace FTS5 test fixtures with ChromaDB fixtures
- Test semantic search quality
- Test multi-project isolation in ChromaDB
- Benchmark performance vs FTS5

#### 3. Documentation Updates
- Update search documentation
- Add ChromaDB configuration guide
- Document embedding model options
- PostgreSQL deployment guide

### Configuration

```python
# config.py
class BasicMemoryConfig:
    # Database
    database_type: DatabaseType = DatabaseType.FILESYSTEM
    database_path: Path = Path.home() / ".basic-memory" / "memory.db"
    database_url: Optional[str] = None  # For Postgres: postgresql://...

    # Search
    chroma_persist_directory: Path = Path.home() / ".basic-memory" / "chroma_data"
    embedding_model: str = "all-MiniLM-L6-v2"  # Local model
    embedding_provider: str = "local"  # or "openai"
    openai_api_key: Optional[str] = None  # For cloud deployments
```
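
A small factory (sketch; the function name is not from the codebase) shows how `embedding_provider` would select the ChromaDB embedding function:

```python
from chromadb.utils import embedding_functions

def make_embedding_function(config: BasicMemoryConfig):
    if config.embedding_provider == "openai":
        return embedding_functions.OpenAIEmbeddingFunction(
            api_key=config.openai_api_key,
            model_name="text-embedding-3-small",
        )
    # Default: free, local sentence-transformers model
    return embedding_functions.SentenceTransformerEmbeddingFunction(
        model_name=config.embedding_model
    )
```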

### Deployment Configurations

#### Local (FOSS)
```yaml
# Default configuration
database_type: FILESYSTEM
database_path: ~/.basic-memory/memory.db
chroma_persist_directory: ~/.basic-memory/chroma_data
embedding_model: all-MiniLM-L6-v2
embedding_provider: local
```

#### Cloud (Docker Compose)
```yaml
services:
  postgres:
    image: postgres:15
    environment:
      POSTGRES_DB: basic_memory
      POSTGRES_PASSWORD: ${DB_PASSWORD}

  chromadb:
    image: chromadb/chroma:latest
    volumes:
      - chroma_data:/chroma/chroma
    environment:
      ALLOW_RESET: true

  app:
    environment:
      DATABASE_TYPE: POSTGRESQL
      DATABASE_URL: postgresql://postgres:${DB_PASSWORD}@postgres/basic_memory
      CHROMA_HOST: chromadb
      CHROMA_PORT: 8000
      EMBEDDING_PROVIDER: local  # or openai
```
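
In this topology the app reaches ChromaDB over HTTP instead of the embedded client, roughly:

```python
import os
import chromadb

# Connect to the chromadb service defined in docker-compose
client = chromadb.HttpClient(
    host=os.environ["CHROMA_HOST"],
    port=int(os.environ["CHROMA_PORT"]),
)
collection = client.get_or_create_collection(name="knowledge_base")
```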

## How to Evaluate

### Success Criteria

#### Functional Requirements
- ✅ Semantic search finds related concepts (e.g., "AI" finds "machine learning")
- ✅ Exact permalink/pattern matches work (e.g., `specs/*`)
- ✅ Multi-project isolation maintained
- ✅ All existing search filters work (type, date, metadata)
- ✅ MCP tools continue to work without changes
- ✅ Works with both SQLite and PostgreSQL

#### Performance Requirements
- ✅ Search latency < 200ms for 1000 documents (local embedding)
- ✅ Indexing time comparable to FTS5 (~10 files/sec)
- ✅ Initial sync time not significantly worse than current
- ✅ Memory footprint < 1GB for local deployments

#### Quality Requirements
- ✅ Better search relevance than FTS5 keyword matching
- ✅ Handles typos and word variations
- ✅ Finds semantically similar content

#### Deployment Requirements
- ✅ FOSS: Works out-of-box with no external services
- ✅ Cloud: Integrates with PostgreSQL (Neon)
- ✅ No breaking changes to MCP API
- ✅ Migration script for existing users

### Testing Procedure

#### 1. Unit Tests
```bash
# Test ChromaDB backend
pytest tests/test_chroma_backend.py

# Test search repository with ChromaDB
pytest tests/test_search_repository.py

# Test search service
pytest tests/test_search_service.py
```

#### 2. Integration Tests
```bash
# Test full search workflow
pytest test-int/test_search_integration.py

# Test with PostgreSQL
DATABASE_TYPE=POSTGRESQL pytest test-int/
```

#### 3. Semantic Search Quality Tests
```python
# Sketch of semantic similarity tests; `search_repo` is an assumed fixture
# wired to the ChromaDB backend, with the expected notes seeded beforehand.
import pytest

@pytest.mark.asyncio
async def test_semantic_similarity(search_repo):
    results = await search_repo.search(search_text="machine learning")
    titles = {r.title for r in results}
    assert titles & {"neural networks", "deep learning", "AI algorithms"}

    results = await search_repo.search(search_text="software architecture")
    titles = {r.title for r in results}
    assert titles & {"system design", "design patterns", "microservices"}
```

#### 4. Performance Benchmarks
```bash
# Run search benchmarks
pytest test-int/test_search_performance.py -v

# Measure:
# - Search latency (should be < 200ms)
# - Indexing throughput (should be ~10 files/sec)
# - Memory usage (should be < 1GB)
```

#### 5. Migration Testing
```bash
# Test migration from FTS5 to ChromaDB
python scripts/migrate_to_chromadb.py

# Verify all entities indexed
# Verify search results quality
# Verify no data loss
```

### Metrics

**Search Quality:**
- Semantic relevance score (manual evaluation)
- Precision/recall for common queries
- User satisfaction (qualitative)

**Performance:**
- Average search latency (ms)
- P95/P99 search latency
- Indexing throughput (files/sec)
- Memory usage (MB)

**Deployment:**
- Local deployment success rate
- Cloud deployment success rate
- Migration success rate

## Implementation Checklist

### Phase 1: ChromaDB Integration
- [ ] Add ChromaDB and sentence-transformers dependencies
- [ ] Create ChromaSearchBackend class
- [ ] Update SearchRepository to use ChromaDB
- [ ] Update SearchService indexing methods
- [ ] Remove FTS5 table creation code
- [ ] Update search query logic
- [ ] Add ChromaDB configuration to BasicMemoryConfig

### Phase 2: PostgreSQL Support
- [ ] Add DatabaseType.POSTGRESQL enum
- [ ] Update get_db_url() for Postgres connection strings
- [ ] Add asyncpg dependency
- [ ] Update engine creation for Postgres
- [ ] Remove SQLite-specific PRAGMA statements
- [ ] Test with Neon database

### Phase 3: Testing & Migration
- [ ] Write unit tests for ChromaSearchBackend
- [ ] Update search integration tests
- [ ] Add semantic search quality tests
- [ ] Create performance benchmarks
- [ ] Write migration script from FTS5
- [ ] Test migration with existing data
- [ ] Update documentation

### Phase 4: Deployment
- [ ] Update docker-compose.yml for cloud
- [ ] Document local FOSS deployment
- [ ] Document cloud PostgreSQL deployment
- [ ] Create migration guide for users
- [ ] Update MCP tool documentation

## Notes

### Embedding Model Trade-offs

**Local Model: `all-MiniLM-L6-v2`**
- Size: 80MB download
- Speed: ~50ms embedding time
- Dimensions: 384
- Cost: $0
- Quality: Good for general knowledge
- Best for: FOSS deployments

**OpenAI: `text-embedding-3-small`**
- Speed: ~100-200ms (API call)
- Dimensions: 1536
- Cost: ~$0.13 per 1M tokens (~$0.01 per 1000 notes)
- Quality: Excellent
- Best for: Cloud deployments with budget

### ChromaDB Storage

ChromaDB stores data in:
```
~/.basic-memory/chroma_data/
  ├── chroma.sqlite3        # Metadata
  ├── index/                # HNSW indexes
  └── collections/          # Vector data
```

Typical sizes:
- 100 notes: ~5MB
- 1000 notes: ~50MB
- 10000 notes: ~500MB

### Why Not Keep FTS5?

**Considered:** Hybrid approach (FTS5 for SQLite + tsvector for Postgres)
**Rejected because:**
- 2x the code to maintain
- 2x the tests to write
- 2x the bugs to fix
- Inconsistent search behavior between deployments
- ChromaDB provides better search quality anyway

**ChromaDB wins:**
- One implementation for both databases
- Better search quality (semantic!)
- Database-agnostic architecture
- Embedded mode for FOSS (no servers needed)

## Implementation

### Proposed Architecture

#### Option 1: ChromaDB Only (Simplest)

```python
import chromadb
from chromadb.utils import embedding_functions


class ChromaSearchBackend:
    def __init__(self, path: str, embedding_model: str = "all-MiniLM-L6-v2"):
        # For local: embedded client (no server!)
        self.client = chromadb.PersistentClient(path=path)

        # Use local embedding model (no API costs!)
        self.embed_fn = embedding_functions.SentenceTransformerEmbeddingFunction(
            model_name=embedding_model
        )

        self.collection = self.client.get_or_create_collection(
            name="knowledge_base",
            embedding_function=self.embed_fn
        )

    async def index_entity(self, entity: Entity):
        # ChromaDB generates embeddings automatically on upsert
        self.collection.upsert(
            ids=[str(entity.id)],
            documents=[f"{entity.title}\n{entity.content}"],
            metadatas=[{
                "permalink": entity.permalink,
                "type": entity.entity_type,
                "file_path": entity.file_path
            }]
        )

    async def search(self, query: str, filters: dict = None):
        # Semantic search with optional metadata filters
        results = self.collection.query(
            query_texts=[query],
            n_results=10,
            where=filters  # e.g., {"type": "note"}
        )
        return results
```

**Deployment:**
- Local (FOSS): ChromaDB embedded, local embedding model, NO servers
- Cloud: ChromaDB server OR still embedded (it's just a Python library!)

#### Option 2: Hybrid FTS + ChromaDB (Best UX)

```python
class HybridSearchBackend:
    def __init__(self):
        self.fts = SQLiteFTS5Backend()       # Fast keyword search
        self.chroma = ChromaSearchBackend()  # Semantic search

    async def search(self, query: str, search_type: str = "auto"):
        if search_type == "exact":
            # User wants an exact match: "specs/search-feature"
            return await self.fts.search(query)

        elif search_type == "semantic":
            # User wants related concepts
            return await self.chroma.search(query)

        else:  # "auto"
            # Check if the query looks like an exact match
            if "/" in query or query.startswith('"'):
                return await self.fts.search(query)

            # Otherwise use semantic search
            return await self.chroma.search(query)
```

### Embedding Options

#### Option A: Local Model (FREE, FOSS-friendly)

```python
# Uses sentence-transformers (runs locally)
# Model: ~100MB download
# Speed: ~50-100ms per embedding
# Cost: $0

from chromadb.utils import embedding_functions

embed_fn = embedding_functions.SentenceTransformerEmbeddingFunction(
    model_name="all-MiniLM-L6-v2"  # Fast, accurate, free
)
```

#### Option B: OpenAI Embeddings (Cloud only)

```python
# For cloud users who want the best quality
# Model: text-embedding-3-small
# Speed: ~100-200ms via API
# Cost: ~$0.13 per 1M tokens (~$0.01 per 1000 notes)

embed_fn = embedding_functions.OpenAIEmbeddingFunction(
    api_key="...",
    model_name="text-embedding-3-small"
)
```

### Performance Comparison

| Metric | Local (`all-MiniLM-L6-v2`) | OpenAI (`text-embedding-3-small`) |
|--------|----------------------------|-----------------------------------|
| Embedding time | ~50ms per note | ~100-200ms per note (API call) |
| Search time (1000 notes) | ~100ms | ~50ms |
| Memory | ~500MB (model + ChromaDB) | — |
| Cost | $0 | ~$0.01 per 1000 notes |
| Quality | Good (384 dimensions) | Excellent (1536 dimensions) |

### Recommendation: ChromaDB with Local Embeddings

Here's the plan:

#### Phase 1: Local ChromaDB (1-2 days)

FOSS version:
- SQLite for data persistence
- ChromaDB embedded for semantic search
- Local embedding model (no API costs)
- NO external services required

Benefits:
- ✅ Same deployment as current (just a Python package)
- ✅ Semantic search for better UX
- ✅ Free embeddings with a local model
- ✅ No servers needed

#### Phase 2: Postgres + ChromaDB Cloud (1-2 days)

Cloud version:
- Postgres for data persistence
- ChromaDB server for semantic search
- OpenAI embeddings (higher quality), OR keep local embeddings (cheaper)

#### Phase 3: Hybrid Search (optional, 1 day)

Add FTS for exact matches alongside ChromaDB:
- Quick keyword search when needed
- Semantic search for exploration
- Best of both worlds

### Code Estimate

Just ChromaDB (replacing FTS5):
- Remove FTS5 code: 2 hours
- Add ChromaDB backend: 4 hours
- Update search service: 2 hours
- Testing: 4 hours
- **Total: 1.5 days**

ChromaDB + Postgres migration:
- Add Postgres support: 4 hours
- Test with Neon: 2 hours
- **Total: +0.75 days**

**Grand total: 2-3 days for the complete migration**

### The Kicker

ChromaDB solves BOTH problems:
1. ✅ Works with SQLite AND Postgres (search is decoupled from the database)
2. ✅ No server needed for local use (embedded mode)
3. ✅ Better search than FTS5 (semantic!)
4. ✅ One implementation for both deployments

A prototype should demonstrate:
1. ChromaDB embedded with local embeddings
2. Example searches showing semantic matching
3. Performance benchmarks
4. Migration from FTS5


## Observations

- [problem] SQLite FTS5 and PostgreSQL tsvector are incompatible architectures requiring dual implementation #database-compatibility
- [problem] Cloud deployments lose database on container restart requiring full re-sync #persistence
- [solution] ChromaDB provides database-agnostic semantic search eliminating dual implementation #architecture
- [advantage] Semantic search finds related concepts beyond keyword matching improving UX #search-quality
- [deployment] Embedded ChromaDB requires no external services for FOSS #simplicity
- [migration] Moving to PostgreSQL solves cloud persistence issues #cloud-architecture
- [performance] Local embedding models provide good quality at zero cost #cost-optimization
- [trade-off] Embedding generation adds ~50ms latency vs instant FTS5 indexing #performance
- [benefit] Single search codebase reduces maintenance burden and test coverage needs #maintainability

## Prior Art / References

### Community Fork: manuelbliemel/basic-memory (feature/vector-search)

**Repository**: https://github.com/manuelbliemel/basic-memory/tree/feature/vector-search

**Key Implementation Details**:

**Vector Database**: ChromaDB (same as our approach!)

**Embedding Models**:
- Local: `all-MiniLM-L6-v2` (default, 384 dims) - same model we planned
- Also supports: `all-mpnet-base-v2`, `paraphrase-MiniLM-L6-v2`, `multi-qa-MiniLM-L6-cos-v1`
- OpenAI: `text-embedding-ada-002`, `text-embedding-3-small`, `text-embedding-3-large`

**Chunking Strategy** (interesting - we didn't consider this):
- Chunk Size: 500 characters
- Chunk Overlap: 50 characters
- Breaks documents into smaller pieces for better semantic search

**Search Strategies**:
1. `fuzzy_only` (default) - FTS5 only
2. `vector_only` - ChromaDB only
3. `hybrid` (recommended) - Both FTS5 + ChromaDB
4. `fuzzy_primary` - FTS5 first, ChromaDB fallback
5. `vector_primary` - ChromaDB first, FTS5 fallback

**Configuration**:
- Similarity Threshold: 0.1
- Max Results: 5
- Storage: `~/.basic-memory/chroma/`
- Config: `~/.basic-memory/config.json`

**Key Differences from Our Approach**:

| Aspect | Their Approach | Our Approach |
|--------|---------------|--------------|
| FTS5 | Keep FTS5 + add ChromaDB | Remove FTS5, use SQL for exact lookups |
| Search Strategy | 5 configurable strategies | Smart routing (automatic) |
| Document Processing | Chunk into 500-char pieces | Index full documents |
| Hybrid Mode | Run both, merge, dedupe | Route to best backend |
| Configuration | User-configurable strategy | Automatic based on query type |

**What We Can Learn**:

1. **Chunking**: Breaking documents into 500-character chunks with 50-char overlap may improve semantic search quality for long documents
   - Pro: Better granularity for semantic matching
   - Con: More vectors to store and search
   - Consider: Optional chunking for large documents (>2000 chars); see the sketch after this list

2. **Configurable Strategies**: Allowing users to choose search strategy provides flexibility
   - Pro: Power users can tune behavior
   - Con: More complexity, most users won't configure
   - Consider: Default to smart routing, allow override via config

3. **Similarity Threshold**: They use 0.1 as default
   - Consider: Benchmark different thresholds for quality

4. **Storage Location**: `~/.basic-memory/chroma/` matches our planned `chroma_data/` approach
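
For concreteness, the fork's chunking parameters (500-character windows, 50-character overlap) reduce to something like this sketch; the function name and shape are illustrative, not taken from the fork:

```python
def chunk_document(text: str, size: int = 500, overlap: int = 50) -> list[str]:
    """Split text into overlapping windows for embedding."""
    chunks = []
    step = size - overlap  # advance 450 chars per window
    for start in range(0, len(text), step):
        chunks.append(text[start : start + size])
    return chunks
```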

**Potential Collaboration**:
- Their implementation is nearly complete as a fork
- Could potentially merge their work or use as reference implementation
- Their chunking strategy could be valuable addition to our approach

## Relations

- implements [[SPEC-11 Basic Memory API Performance Optimization]]
- relates_to [[Performance Optimizations Documentation]]
- enables [[PostgreSQL Migration]]
- improves_on [[SQLite FTS5 Search]]
- references [[manuelbliemel/basic-memory feature/vector-search fork]]

```

--------------------------------------------------------------------------------
/tests/api/test_knowledge_router.py:
--------------------------------------------------------------------------------

```python
"""Tests for knowledge graph API routes."""

from urllib.parse import quote

import pytest
from httpx import AsyncClient

from basic_memory.schemas import (
    Entity,
    EntityResponse,
)
from basic_memory.schemas.search import SearchItemType, SearchResponse
from basic_memory.utils import normalize_newlines


@pytest.mark.asyncio
async def test_create_entity(client: AsyncClient, file_service, project_url):
    """Should create entity successfully."""

    data = {
        "title": "TestEntity",
        "folder": "test",
        "entity_type": "test",
        "content": "TestContent",
        "project": "Test Project Context",
    }
    # Create an entity
    print(f"Requesting with data: {data}")
    # Use the permalink version of the project name in the path
    response = await client.post(f"{project_url}/knowledge/entities", json=data)
    # Print response for debugging
    print(f"Response status: {response.status_code}")
    print(f"Response content: {response.text}")
    # Verify creation
    assert response.status_code == 200
    entity = EntityResponse.model_validate(response.json())

    assert entity.permalink == "test/test-entity"
    assert entity.file_path == "test/TestEntity.md"
    assert entity.entity_type == data["entity_type"]
    assert entity.content_type == "text/markdown"

    # Verify file has new content but preserved metadata
    file_path = file_service.get_entity_path(entity)
    file_content, _ = await file_service.read_file(file_path)

    assert data["content"] in file_content


@pytest.mark.asyncio
async def test_create_entity_observations_relations(client: AsyncClient, file_service, project_url):
    """Should create entity successfully."""

    data = {
        "title": "TestEntity",
        "folder": "test",
        "content": """
# TestContent

## Observations
- [note] This is notable #tag1 (testing)
- related to [[SomeOtherThing]]
""",
    }
    # Create an entity
    response = await client.post(f"{project_url}/knowledge/entities", json=data)
    # Verify creation
    assert response.status_code == 200
    entity = EntityResponse.model_validate(response.json())

    assert entity.permalink == "test/test-entity"
    assert entity.file_path == "test/TestEntity.md"
    assert entity.entity_type == "note"
    assert entity.content_type == "text/markdown"

    assert len(entity.observations) == 1
    assert entity.observations[0].category == "note"
    assert entity.observations[0].content == "This is notable #tag1"
    assert entity.observations[0].tags == ["tag1"]
    assert entity.observations[0].context == "testing"

    assert len(entity.relations) == 1
    assert entity.relations[0].relation_type == "related to"
    assert entity.relations[0].from_id == "test/test-entity"
    assert entity.relations[0].to_id is None

    # Verify file has new content but preserved metadata
    file_path = file_service.get_entity_path(entity)
    file_content, _ = await file_service.read_file(file_path)

    assert data["content"].strip() in file_content


@pytest.mark.asyncio
async def test_relation_resolution_after_creation(client: AsyncClient, project_url):
    """Test that relation resolution works after creating entities and handles exceptions gracefully."""

    # Create first entity with unresolved relation
    entity1_data = {
        "title": "EntityOne",
        "folder": "test",
        "entity_type": "test",
        "content": "This entity references [[EntityTwo]]",
    }
    response1 = await client.put(
        f"{project_url}/knowledge/entities/test/entity-one", json=entity1_data
    )
    assert response1.status_code == 201
    entity1 = response1.json()

    # Verify relation exists but is unresolved
    assert len(entity1["relations"]) == 1
    assert entity1["relations"][0]["to_id"] is None
    assert entity1["relations"][0]["to_name"] == "EntityTwo"

    # Create the referenced entity
    entity2_data = {
        "title": "EntityTwo",
        "folder": "test",
        "entity_type": "test",
        "content": "This is the referenced entity",
    }
    response2 = await client.put(
        f"{project_url}/knowledge/entities/test/entity-two", json=entity2_data
    )
    assert response2.status_code == 201

    # Verify the original entity's relation was resolved
    response_check = await client.get(f"{project_url}/knowledge/entities/test/entity-one")
    assert response_check.status_code == 200
    updated_entity1 = response_check.json()

    # The relation should now be resolved via the automatic resolution after entity creation
    resolved_relations = [r for r in updated_entity1["relations"] if r["to_id"] is not None]
    assert (
        len(resolved_relations) >= 0
    )  # May or may not be resolved immediately depending on timing


@pytest.mark.asyncio
async def test_relation_resolution_exception_handling(client: AsyncClient, project_url):
    """Test that relation resolution exceptions are handled gracefully."""
    import unittest.mock

    # Create an entity that would trigger relation resolution
    entity_data = {
        "title": "ExceptionTest",
        "folder": "test",
        "entity_type": "test",
        "content": "This entity has a [[Relation]]",
    }

    # Mock the sync service to raise an exception during relation resolution
    # We'll patch at the module level where it's imported
    with unittest.mock.patch(
        "basic_memory.api.routers.knowledge_router.SyncServiceDep",
        side_effect=lambda: unittest.mock.AsyncMock(),
    ) as mock_sync_service_dep:
        # Configure the mock sync service to raise an exception
        mock_sync_service = unittest.mock.AsyncMock()
        mock_sync_service.resolve_relations.side_effect = Exception("Sync service failed")
        mock_sync_service_dep.return_value = mock_sync_service

        # This should still succeed even though relation resolution fails
        response = await client.put(
            f"{project_url}/knowledge/entities/test/exception-test", json=entity_data
        )
        assert response.status_code == 201
        entity = response.json()

        # Verify the entity was still created successfully
        assert entity["title"] == "ExceptionTest"
        assert len(entity["relations"]) == 1  # Relation should still be there, just unresolved


@pytest.mark.asyncio
async def test_get_entity_by_permalink(client: AsyncClient, project_url):
    """Should retrieve an entity by path ID."""
    # First create an entity
    data = {"title": "TestEntity", "folder": "test", "entity_type": "test"}
    response = await client.post(f"{project_url}/knowledge/entities", json=data)
    assert response.status_code == 200
    data = response.json()

    # Now get it by permalink
    permalink = data["permalink"]
    response = await client.get(f"{project_url}/knowledge/entities/{permalink}")

    # Verify retrieval
    assert response.status_code == 200
    entity = response.json()
    assert entity["title"] == "TestEntity"
    assert entity["file_path"] == "test/TestEntity.md"
    assert entity["entity_type"] == "test"
    assert entity["permalink"] == "test/test-entity"


@pytest.mark.asyncio
async def test_get_entity_by_file_path(client: AsyncClient, project_url):
    """Should retrieve an entity by path ID."""
    # First create an entity
    data = {"title": "TestEntity", "folder": "test", "entity_type": "test"}
    response = await client.post(f"{project_url}/knowledge/entities", json=data)
    assert response.status_code == 200
    data = response.json()

    # Now get it by path
    file_path = data["file_path"]
    response = await client.get(f"{project_url}/knowledge/entities/{file_path}")

    # Verify retrieval
    assert response.status_code == 200
    entity = response.json()
    assert entity["title"] == "TestEntity"
    assert entity["file_path"] == "test/TestEntity.md"
    assert entity["entity_type"] == "test"
    assert entity["permalink"] == "test/test-entity"


@pytest.mark.asyncio
async def test_get_entities(client: AsyncClient, project_url):
    """Should open multiple entities by path IDs."""
    # Create a few entities with different names
    await client.post(
        f"{project_url}/knowledge/entities",
        json={"title": "AlphaTest", "folder": "", "entity_type": "test"},
    )
    await client.post(
        f"{project_url}/knowledge/entities",
        json={"title": "BetaTest", "folder": "", "entity_type": "test"},
    )

    # Open nodes by path IDs
    response = await client.get(
        f"{project_url}/knowledge/entities?permalink=alpha-test&permalink=beta-test",
    )

    # Verify results
    assert response.status_code == 200
    data = response.json()
    assert len(data["entities"]) == 2

    entity_0 = data["entities"][0]
    assert entity_0["title"] == "AlphaTest"
    assert entity_0["file_path"] == "AlphaTest.md"
    assert entity_0["entity_type"] == "test"
    assert entity_0["permalink"] == "alpha-test"

    entity_1 = data["entities"][1]
    assert entity_1["title"] == "BetaTest"
    assert entity_1["file_path"] == "BetaTest.md"
    assert entity_1["entity_type"] == "test"
    assert entity_1["permalink"] == "beta-test"


@pytest.mark.asyncio
async def test_delete_entity(client: AsyncClient, project_url):
    """Test DELETE /knowledge/entities with path ID."""
    # Create test entity
    entity_data = {"file_path": "TestEntity", "entity_type": "test"}
    await client.post(f"{project_url}/knowledge/entities", json=entity_data)

    # Test deletion
    response = await client.post(
        f"{project_url}/knowledge/entities/delete", json={"permalinks": ["test-entity"]}
    )
    assert response.status_code == 200
    assert response.json() == {"deleted": True}

    # Verify entity is gone
    permalink = quote("test/TestEntity")
    response = await client.get(f"{project_url}/knowledge/entities/{permalink}")
    assert response.status_code == 404


@pytest.mark.asyncio
async def test_delete_single_entity(client: AsyncClient, project_url):
    """Test DELETE /knowledge/entities with path ID."""
    # Create test entity
    entity_data = {"title": "TestEntity", "folder": "", "entity_type": "test"}
    await client.post(f"{project_url}/knowledge/entities", json=entity_data)

    # Test deletion
    response = await client.delete(f"{project_url}/knowledge/entities/test-entity")
    assert response.status_code == 200
    assert response.json() == {"deleted": True}

    # Verify entity is gone
    permalink = quote("test/TestEntity")
    response = await client.get(f"{project_url}/knowledge/entities/{permalink}")
    assert response.status_code == 404


@pytest.mark.asyncio
async def test_delete_single_entity_by_title(client: AsyncClient, project_url):
    """Test DELETE /knowledge/entities with file path."""
    # Create test entity
    entity_data = {"title": "TestEntity", "folder": "", "entity_type": "test"}
    response = await client.post(f"{project_url}/knowledge/entities", json=entity_data)
    assert response.status_code == 200
    data = response.json()

    # Test deletion
    response = await client.delete(f"{project_url}/knowledge/entities/TestEntity")
    assert response.status_code == 200
    assert response.json() == {"deleted": True}

    # Verify entity is gone
    file_path = quote(data["file_path"])
    response = await client.get(f"{project_url}/knowledge/entities/{file_path}")
    assert response.status_code == 404


@pytest.mark.asyncio
async def test_delete_single_entity_not_found(client: AsyncClient, project_url):
    """Test DELETE /knowledge/entities with path ID."""

    # Test deletion
    response = await client.delete(f"{project_url}/knowledge/entities/test-not-found")
    assert response.status_code == 200
    assert response.json() == {"deleted": False}


@pytest.mark.asyncio
async def test_delete_entity_bulk(client: AsyncClient, project_url):
    """Test bulk entity deletion using path IDs."""
    # Create test entities
    await client.post(
        f"{project_url}/knowledge/entities", json={"file_path": "Entity1", "entity_type": "test"}
    )
    await client.post(
        f"{project_url}/knowledge/entities", json={"file_path": "Entity2", "entity_type": "test"}
    )

    # Test deletion
    response = await client.post(
        f"{project_url}/knowledge/entities/delete", json={"permalinks": ["Entity1", "Entity2"]}
    )
    assert response.status_code == 200
    assert response.json() == {"deleted": True}

    # Verify entities are gone
    for name in ["Entity1", "Entity2"]:
        permalink = quote(f"{name}")
        response = await client.get(f"{project_url}/knowledge/entities/{permalink}")
        assert response.status_code == 404


@pytest.mark.asyncio
async def test_delete_nonexistent_entity(client: AsyncClient, project_url):
    """Test deleting a nonexistent entity by path ID."""
    response = await client.post(
        f"{project_url}/knowledge/entities/delete", json={"permalinks": ["non_existent"]}
    )
    assert response.status_code == 200
    assert response.json() == {"deleted": True}


@pytest.mark.asyncio
async def test_entity_indexing(client: AsyncClient, project_url):
    """Test entity creation includes search indexing."""
    # Create entity
    response = await client.post(
        f"{project_url}/knowledge/entities",
        json={
            "title": "SearchTest",
            "folder": "",
            "entity_type": "test",
            "observations": ["Unique searchable observation"],
        },
    )
    assert response.status_code == 200

    # Verify it's searchable
    search_response = await client.post(
        f"{project_url}/search/",
        json={"text": "search", "entity_types": [SearchItemType.ENTITY.value]},
    )
    assert search_response.status_code == 200
    search_result = SearchResponse.model_validate(search_response.json())
    assert len(search_result.results) == 1
    assert search_result.results[0].permalink == "search-test"
    assert search_result.results[0].type == SearchItemType.ENTITY.value


@pytest.mark.asyncio
async def test_entity_delete_indexing(client: AsyncClient, project_url):
    """Test deleted entities are removed from search index."""

    # Create entity
    response = await client.post(
        f"{project_url}/knowledge/entities",
        json={
            "title": "DeleteTest",
            "folder": "",
            "entity_type": "test",
            "observations": ["Searchable observation that should be removed"],
        },
    )
    assert response.status_code == 200
    entity = response.json()

    # Verify it's initially searchable
    search_response = await client.post(
        f"{project_url}/search/",
        json={"text": "delete", "entity_types": [SearchItemType.ENTITY.value]},
    )
    search_result = SearchResponse.model_validate(search_response.json())
    assert len(search_result.results) == 1

    # Delete entity
    delete_response = await client.post(
        f"{project_url}/knowledge/entities/delete", json={"permalinks": [entity["permalink"]]}
    )
    assert delete_response.status_code == 200

    # Verify it's no longer searchable
    search_response = await client.post(
        f"{project_url}/search/", json={"text": "delete", "types": [SearchItemType.ENTITY.value]}
    )
    search_result = SearchResponse.model_validate(search_response.json())
    assert len(search_result.results) == 0


@pytest.mark.asyncio
async def test_update_entity_basic(client: AsyncClient, project_url):
    """Test basic entity field updates."""
    # Create initial entity
    response = await client.post(
        f"{project_url}/knowledge/entities",
        json={
            "title": "test",
            "folder": "",
            "entity_type": "test",
            "content": "Initial summary",
            "entity_metadata": {"status": "draft"},
        },
    )
    entity_response = response.json()

    # Update fields
    entity = Entity(**entity_response, folder="")
    entity.entity_metadata["status"] = "final"
    entity.content = "Updated summary"

    response = await client.put(
        f"{project_url}/knowledge/entities/{entity.permalink}", json=entity.model_dump()
    )
    assert response.status_code == 200
    updated = response.json()

    # Verify updates
    assert updated["entity_metadata"]["status"] == "final"  # Preserved

    response = await client.get(f"{project_url}/resource/{updated['permalink']}?content=true")

    # raw markdown content
    fetched = response.text
    assert "Updated summary" in fetched


@pytest.mark.asyncio
async def test_update_entity_content(client: AsyncClient, project_url):
    """Test updating content for different entity types."""
    # Create a note entity
    response = await client.post(
        f"{project_url}/knowledge/entities",
        json={"title": "test-note", "folder": "", "entity_type": "note", "summary": "Test note"},
    )
    note = response.json()

    # Update fields
    entity = Entity(**note, folder="")
    entity.content = "# Updated Note\n\nNew content."

    response = await client.put(
        f"{project_url}/knowledge/entities/{note['permalink']}", json=entity.model_dump()
    )
    assert response.status_code == 200
    updated = response.json()

    # Verify through get request to check file
    response = await client.get(f"{project_url}/resource/{updated['permalink']}?content=true")

    # raw markdown content
    fetched = response.text
    assert "# Updated Note" in fetched
    assert "New content" in fetched


@pytest.mark.asyncio
async def test_update_entity_type_conversion(client: AsyncClient, project_url):
    """Test converting between note and knowledge types."""
    # Create a note
    note_data = {
        "title": "test-note",
        "folder": "",
        "entity_type": "note",
        "summary": "Test note",
        "content": "# Test Note\n\nInitial content.",
    }
    response = await client.post(f"{project_url}/knowledge/entities", json=note_data)
    note = response.json()

    # Update fields
    entity = Entity(**note, folder="")
    entity.entity_type = "test"

    response = await client.put(
        f"{project_url}/knowledge/entities/{note['permalink']}", json=entity.model_dump()
    )
    assert response.status_code == 200
    updated = response.json()

    # Verify conversion
    assert updated["entity_type"] == "test"

    # Get latest to verify file format
    response = await client.get(f"{project_url}/knowledge/entities/{updated['permalink']}")
    knowledge = response.json()
    assert knowledge.get("content") is None


@pytest.mark.asyncio
async def test_update_entity_metadata(client: AsyncClient, project_url):
    """Test updating entity metadata."""
    # Create entity
    data = {
        "title": "test",
        "folder": "",
        "entity_type": "test",
        "entity_metadata": {"status": "draft"},
    }
    response = await client.post(f"{project_url}/knowledge/entities", json=data)
    entity_response = response.json()

    # Update fields
    entity = Entity(**entity_response, folder="")
    entity.entity_metadata["status"] = "final"
    entity.entity_metadata["reviewed"] = True

    # Update metadata
    response = await client.put(
        f"{project_url}/knowledge/entities/{entity.permalink}", json=entity.model_dump()
    )
    assert response.status_code == 200
    updated = response.json()

    # Verify metadata was merged, not replaced
    assert updated["entity_metadata"]["status"] == "final"
    assert updated["entity_metadata"]["reviewed"] in (True, "True")


@pytest.mark.asyncio
async def test_update_entity_not_found_does_create(client: AsyncClient, project_url):
    """Test updating non-existent entity does a create"""

    data = {
        "title": "nonexistent",
        "folder": "",
        "entity_type": "test",
        "observations": ["First observation", "Second observation"],
    }
    entity = Entity(**data)
    response = await client.put(
        f"{project_url}/knowledge/entities/nonexistent", json=entity.model_dump()
    )
    assert response.status_code == 201


@pytest.mark.asyncio
async def test_update_entity_incorrect_permalink(client: AsyncClient, project_url):
    """Test updating non-existent entity does a create"""

    data = {
        "title": "Test Entity",
        "folder": "",
        "entity_type": "test",
        "observations": ["First observation", "Second observation"],
    }
    entity = Entity(**data)
    response = await client.put(
        f"{project_url}/knowledge/entities/nonexistent", json=entity.model_dump()
    )
    assert response.status_code == 400


@pytest.mark.asyncio
async def test_update_entity_search_index(client: AsyncClient, project_url):
    """Test search index is updated after entity changes."""
    # Create entity
    data = {
        "title": "test",
        "folder": "",
        "entity_type": "test",
        "content": "Initial searchable content",
    }
    response = await client.post(f"{project_url}/knowledge/entities", json=data)
    entity_response = response.json()

    # Update fields
    entity = Entity(**entity_response, folder="")
    entity.content = "Updated with unique sphinx marker"

    response = await client.put(
        f"{project_url}/knowledge/entities/{entity.permalink}", json=entity.model_dump()
    )
    assert response.status_code == 200

    # Search should find new content
    search_response = await client.post(
        f"{project_url}/search/",
        json={"text": "sphinx marker", "entity_types": [SearchItemType.ENTITY.value]},
    )
    results = search_response.json()["results"]
    assert len(results) == 1
    assert results[0]["permalink"] == entity.permalink


# PATCH edit entity endpoint tests


@pytest.mark.asyncio
async def test_edit_entity_append(client: AsyncClient, project_url):
    """Test appending content to an entity via PATCH endpoint."""
    # Create test entity
    response = await client.post(
        f"{project_url}/knowledge/entities",
        json={
            "title": "Test Note",
            "folder": "test",
            "entity_type": "note",
            "content": "Original content",
        },
    )
    assert response.status_code == 200
    entity = response.json()

    # Edit entity with append operation
    response = await client.patch(
        f"{project_url}/knowledge/entities/{entity['permalink']}",
        json={"operation": "append", "content": "Appended content"},
    )
    if response.status_code != 200:
        print(f"PATCH failed with status {response.status_code}")
        print(f"Response content: {response.text}")
    assert response.status_code == 200
    updated = response.json()

    # Verify content was appended by reading the file
    response = await client.get(f"{project_url}/resource/{updated['permalink']}?content=true")
    file_content = response.text
    assert "Original content" in file_content
    assert "Appended content" in file_content
    assert file_content.index("Original content") < file_content.index("Appended content")


@pytest.mark.asyncio
async def test_edit_entity_prepend(client: AsyncClient, project_url):
    """Test prepending content to an entity via PATCH endpoint."""
    # Create test entity
    response = await client.post(
        f"{project_url}/knowledge/entities",
        json={
            "title": "Test Note",
            "folder": "test",
            "entity_type": "note",
            "content": "Original content",
        },
    )
    assert response.status_code == 200
    entity = response.json()

    # Edit entity with prepend operation
    response = await client.patch(
        f"{project_url}/knowledge/entities/{entity['permalink']}",
        json={"operation": "prepend", "content": "Prepended content"},
    )
    if response.status_code != 200:
        print(f"PATCH prepend failed with status {response.status_code}")
        print(f"Response content: {response.text}")
    assert response.status_code == 200
    updated = response.json()

    # Verify the entire file content structure
    response = await client.get(f"{project_url}/resource/{updated['permalink']}?content=true")
    file_content = response.text

    # Expected content with frontmatter preserved and content prepended to body
    expected_content = normalize_newlines("""---
title: Test Note
type: note
permalink: test/test-note
---

Prepended content
Original content""")

    assert file_content.strip() == expected_content.strip()


@pytest.mark.asyncio
async def test_edit_entity_find_replace(client: AsyncClient, project_url):
    """Test find and replace operation via PATCH endpoint."""
    # Create test entity
    response = await client.post(
        f"{project_url}/knowledge/entities",
        json={
            "title": "Test Note",
            "folder": "test",
            "entity_type": "note",
            "content": "This is old content that needs updating",
        },
    )
    assert response.status_code == 200
    entity = response.json()

    # Edit entity with find_replace operation
    response = await client.patch(
        f"{project_url}/knowledge/entities/{entity['permalink']}",
        json={"operation": "find_replace", "content": "new content", "find_text": "old content"},
    )
    assert response.status_code == 200
    updated = response.json()

    # Verify content was replaced
    response = await client.get(f"{project_url}/resource/{updated['permalink']}?content=true")
    file_content = response.text
    assert "old content" not in file_content
    assert "This is new content that needs updating" in file_content


@pytest.mark.asyncio
async def test_edit_entity_find_replace_with_expected_replacements(
    client: AsyncClient, project_url
):
    """Test find and replace with expected_replacements parameter."""
    # Create test entity with repeated text
    response = await client.post(
        f"{project_url}/knowledge/entities",
        json={
            "title": "Sample Note",
            "folder": "docs",
            "entity_type": "note",
            "content": "The word banana appears here. Another banana word here.",
        },
    )
    assert response.status_code == 200
    entity = response.json()

    # Edit entity with find_replace operation, expecting 2 replacements
    response = await client.patch(
        f"{project_url}/knowledge/entities/{entity['permalink']}",
        json={
            "operation": "find_replace",
            "content": "apple",
            "find_text": "banana",
            "expected_replacements": 2,
        },
    )
    assert response.status_code == 200
    updated = response.json()

    # Verify both instances were replaced
    response = await client.get(f"{project_url}/resource/{updated['permalink']}?content=true")
    file_content = response.text
    assert "The word apple appears here. Another apple word here." in file_content


@pytest.mark.asyncio
async def test_edit_entity_replace_section(client: AsyncClient, project_url):
    """Test replacing a section via PATCH endpoint."""
    # Create test entity with sections
    content = """# Main Title

## Section 1
Original section 1 content

## Section 2
Original section 2 content"""

    response = await client.post(
        f"{project_url}/knowledge/entities",
        json={
            "title": "Sample Note",
            "folder": "docs",
            "entity_type": "note",
            "content": content,
        },
    )
    assert response.status_code == 200
    entity = response.json()

    # Edit entity with replace_section operation
    response = await client.patch(
        f"{project_url}/knowledge/entities/{entity['permalink']}",
        json={
            "operation": "replace_section",
            "content": "New section 1 content",
            "section": "## Section 1",
        },
    )
    assert response.status_code == 200
    updated = response.json()

    # Verify section was replaced
    response = await client.get(f"{project_url}/resource/{updated['permalink']}?content=true")
    file_content = response.text
    assert "New section 1 content" in file_content
    assert "Original section 1 content" not in file_content
    assert "Original section 2 content" in file_content  # Other sections preserved


@pytest.mark.asyncio
async def test_edit_entity_not_found(client: AsyncClient, project_url):
    """Test editing a non-existent entity returns 400."""
    response = await client.patch(
        f"{project_url}/knowledge/entities/non-existent",
        json={"operation": "append", "content": "content"},
    )
    assert response.status_code == 400
    assert "Entity not found" in response.json()["detail"]


@pytest.mark.asyncio
async def test_edit_entity_invalid_operation(client: AsyncClient, project_url):
    """Test editing with invalid operation returns 400."""
    # Create test entity
    response = await client.post(
        f"{project_url}/knowledge/entities",
        json={
            "title": "Test Note",
            "folder": "test",
            "entity_type": "note",
            "content": "Original content",
        },
    )
    assert response.status_code == 200
    entity = response.json()

    # Try invalid operation
    response = await client.patch(
        f"{project_url}/knowledge/entities/{entity['permalink']}",
        json={"operation": "invalid_operation", "content": "content"},
    )
    assert response.status_code == 422
    assert "invalid_operation" in response.json()["detail"][0]["input"]


@pytest.mark.asyncio
async def test_edit_entity_find_replace_missing_find_text(client: AsyncClient, project_url):
    """Test find_replace without find_text returns 400."""
    # Create test entity
    response = await client.post(
        f"{project_url}/knowledge/entities",
        json={
            "title": "Test Note",
            "folder": "test",
            "entity_type": "note",
            "content": "Original content",
        },
    )
    assert response.status_code == 200
    entity = response.json()

    # Try find_replace without find_text
    response = await client.patch(
        f"{project_url}/knowledge/entities/{entity['permalink']}",
        json={"operation": "find_replace", "content": "new content"},
    )
    assert response.status_code == 400
    assert "find_text is required" in response.json()["detail"]


@pytest.mark.asyncio
async def test_edit_entity_replace_section_missing_section(client: AsyncClient, project_url):
    """Test replace_section without section parameter returns 400."""
    # Create test entity
    response = await client.post(
        f"{project_url}/knowledge/entities",
        json={
            "title": "Test Note",
            "folder": "test",
            "entity_type": "note",
            "content": "Original content",
        },
    )
    assert response.status_code == 200
    entity = response.json()

    # Try replace_section without section
    response = await client.patch(
        f"{project_url}/knowledge/entities/{entity['permalink']}",
        json={"operation": "replace_section", "content": "new content"},
    )
    assert response.status_code == 400
    assert "section is required" in response.json()["detail"]


@pytest.mark.asyncio
async def test_edit_entity_find_replace_not_found(client: AsyncClient, project_url):
    """Test find_replace when text is not found returns 400."""
    # Create test entity
    response = await client.post(
        f"{project_url}/knowledge/entities",
        json={
            "title": "Test Note",
            "folder": "test",
            "entity_type": "note",
            "content": "This is some content",
        },
    )
    assert response.status_code == 200
    entity = response.json()

    # Try to replace text that doesn't exist
    response = await client.patch(
        f"{project_url}/knowledge/entities/{entity['permalink']}",
        json={"operation": "find_replace", "content": "new content", "find_text": "nonexistent"},
    )
    assert response.status_code == 400
    assert "Text to replace not found" in response.json()["detail"]


@pytest.mark.asyncio
async def test_edit_entity_find_replace_wrong_expected_count(client: AsyncClient, project_url):
    """Test find_replace with wrong expected_replacements count returns 400."""
    # Create test entity with repeated text
    response = await client.post(
        f"{project_url}/knowledge/entities",
        json={
            "title": "Sample Note",
            "folder": "docs",
            "entity_type": "note",
            "content": "The word banana appears here. Another banana word here.",
        },
    )
    assert response.status_code == 200
    entity = response.json()

    # Try to replace with wrong expected count
    response = await client.patch(
        f"{project_url}/knowledge/entities/{entity['permalink']}",
        json={
            "operation": "find_replace",
            "content": "replacement",
            "find_text": "banana",
            "expected_replacements": 1,  # Wrong - there are actually 2
        },
    )
    assert response.status_code == 400
    assert "Expected 1 occurrences" in response.json()["detail"]
    assert "but found 2" in response.json()["detail"]


@pytest.mark.asyncio
async def test_edit_entity_search_reindex(client: AsyncClient, project_url):
    """Test that edited entities are reindexed for search."""
    # Create test entity
    response = await client.post(
        f"{project_url}/knowledge/entities",
        json={
            "title": "Search Test",
            "folder": "test",
            "entity_type": "note",
            "content": "Original searchable content",
        },
    )
    assert response.status_code == 200
    entity = response.json()

    # Edit the entity
    response = await client.patch(
        f"{project_url}/knowledge/entities/{entity['permalink']}",
        json={"operation": "append", "content": " with unique zebra marker"},
    )
    assert response.status_code == 200

    # Search should find the new content
    search_response = await client.post(
        f"{project_url}/search/",
        json={"text": "zebra marker", "entity_types": ["entity"]},
    )
    results = search_response.json()["results"]
    assert len(results) == 1
    assert results[0]["permalink"] == entity["permalink"]


# Move entity endpoint tests


@pytest.mark.asyncio
async def test_move_entity_success(client: AsyncClient, project_url):
    """Test successfully moving an entity to a new location."""
    # Create test entity
    response = await client.post(
        f"{project_url}/knowledge/entities",
        json={
            "title": "TestNote",
            "folder": "source",
            "entity_type": "note",
            "content": "Test content",
        },
    )
    assert response.status_code == 200
    entity = response.json()
    original_permalink = entity["permalink"]

    # Move entity
    move_data = {
        "identifier": original_permalink,
        "destination_path": "target/MovedNote.md",
    }
    response = await client.post(f"{project_url}/knowledge/move", json=move_data)
    assert response.status_code == 200
    response_model = EntityResponse.model_validate(response.json())
    assert response_model.file_path == "target/MovedNote.md"

    # Verify original entity no longer exists
    response = await client.get(f"{project_url}/knowledge/entities/{original_permalink}")
    assert response.status_code == 404

    # Verify entity exists at new location
    response = await client.get(f"{project_url}/knowledge/entities/target/moved-note")
    assert response.status_code == 200
    moved_entity = response.json()
    assert moved_entity["file_path"] == "target/MovedNote.md"
    assert moved_entity["permalink"] == "target/moved-note"

    # Verify file content using resource endpoint
    response = await client.get(f"{project_url}/resource/target/moved-note?content=true")
    assert response.status_code == 200
    file_content = response.text
    assert "Test content" in file_content


@pytest.mark.asyncio
async def test_move_entity_with_folder_creation(client: AsyncClient, project_url):
    """Test moving entity creates necessary folders."""
    # Create test entity
    response = await client.post(
        f"{project_url}/knowledge/entities",
        json={
            "title": "TestNote",
            "folder": "",
            "entity_type": "note",
            "content": "Test content",
        },
    )
    assert response.status_code == 200
    entity = response.json()

    # Move to deeply nested path
    move_data = {
        "identifier": entity["permalink"],
        "destination_path": "deeply/nested/folder/MovedNote.md",
    }
    response = await client.post(f"{project_url}/knowledge/move", json=move_data)
    assert response.status_code == 200

    # Verify entity exists at new location
    response = await client.get(f"{project_url}/knowledge/entities/deeply/nested/folder/moved-note")
    assert response.status_code == 200
    moved_entity = response.json()
    assert moved_entity["file_path"] == "deeply/nested/folder/MovedNote.md"


@pytest.mark.asyncio
async def test_move_entity_with_observations_and_relations(client: AsyncClient, project_url):
    """Test moving entity preserves observations and relations."""
    # Create test entity with complex content
    content = """# Complex Entity

## Observations
- [note] Important observation #tag1
- [feature] Key feature #feature
- relation to [[SomeOtherEntity]]
- depends on [[Dependency]]

Some additional content."""

    response = await client.post(
        f"{project_url}/knowledge/entities",
        json={
            "title": "ComplexEntity",
            "folder": "source",
            "entity_type": "note",
            "content": content,
        },
    )
    assert response.status_code == 200
    entity = response.json()

    # Verify original observations and relations
    assert len(entity["observations"]) == 2
    assert len(entity["relations"]) == 2

    # Move entity
    move_data = {
        "identifier": entity["permalink"],
        "destination_path": "target/MovedComplex.md",
    }
    response = await client.post(f"{project_url}/knowledge/move", json=move_data)
    assert response.status_code == 200

    # Verify moved entity preserves data
    response = await client.get(f"{project_url}/knowledge/entities/target/moved-complex")
    assert response.status_code == 200
    moved_entity = response.json()

    # Check observations preserved
    assert len(moved_entity["observations"]) == 2
    obs_categories = {obs["category"] for obs in moved_entity["observations"]}
    assert obs_categories == {"note", "feature"}

    # Check relations preserved
    assert len(moved_entity["relations"]) == 2
    rel_types = {rel["relation_type"] for rel in moved_entity["relations"]}
    assert rel_types == {"relation to", "depends on"}

    # Verify file content preserved
    response = await client.get(f"{project_url}/resource/target/moved-complex?content=true")
    assert response.status_code == 200
    file_content = response.text
    assert "Important observation #tag1" in file_content
    assert "[[SomeOtherEntity]]" in file_content


@pytest.mark.asyncio
async def test_move_entity_search_reindexing(client: AsyncClient, project_url):
    """Test that moved entities are properly reindexed for search."""
    # Create searchable entity
    response = await client.post(
        f"{project_url}/knowledge/entities",
        json={
            "title": "SearchableNote",
            "folder": "source",
            "entity_type": "note",
            "content": "Unique searchable elephant content",
        },
    )
    assert response.status_code == 200
    entity = response.json()

    # Move entity
    move_data = {
        "identifier": entity["permalink"],
        "destination_path": "target/MovedSearchable.md",
    }
    response = await client.post(f"{project_url}/knowledge/move", json=move_data)
    assert response.status_code == 200

    # Search should find entity at new location
    search_response = await client.post(
        f"{project_url}/search/",
        json={"text": "elephant", "entity_types": [SearchItemType.ENTITY.value]},
    )
    results = search_response.json()["results"]
    assert len(results) == 1
    assert results[0]["permalink"] == "target/moved-searchable"


@pytest.mark.asyncio
async def test_move_entity_not_found(client: AsyncClient, project_url):
    """Test moving non-existent entity returns 400 error."""
    move_data = {
        "identifier": "non-existent-entity",
        "destination_path": "target/SomeFile.md",
    }
    response = await client.post(f"{project_url}/knowledge/move", json=move_data)
    assert response.status_code == 400
    assert "Entity not found" in response.json()["detail"]


@pytest.mark.asyncio
async def test_move_entity_invalid_destination_path(client: AsyncClient, project_url):
    """Test moving entity with invalid destination path."""
    # Create test entity
    response = await client.post(
        f"{project_url}/knowledge/entities",
        json={
            "title": "TestNote",
            "folder": "",
            "entity_type": "note",
            "content": "Test content",
        },
    )
    assert response.status_code == 200
    entity = response.json()

    # Test various invalid paths
    invalid_paths = [
        "/absolute/path.md",  # Absolute path
        "../parent/path.md",  # Parent directory
        "",  # Empty string
        "   ",  # Whitespace only
    ]

    for invalid_path in invalid_paths:
        move_data = {
            "identifier": entity["permalink"],
            "destination_path": invalid_path,
        }
        response = await client.post(f"{project_url}/knowledge/move", json=move_data)
        assert response.status_code == 422  # Validation error


@pytest.mark.asyncio
async def test_move_entity_destination_exists(client: AsyncClient, project_url):
    """Test moving entity to existing destination returns error."""
    # Create source entity
    response = await client.post(
        f"{project_url}/knowledge/entities",
        json={
            "title": "SourceNote",
            "folder": "source",
            "entity_type": "note",
            "content": "Source content",
        },
    )
    assert response.status_code == 200
    source_entity = response.json()

    # Create destination entity
    response = await client.post(
        f"{project_url}/knowledge/entities",
        json={
            "title": "DestinationNote",
            "folder": "target",
            "entity_type": "note",
            "content": "Destination content",
        },
    )
    assert response.status_code == 200

    # Try to move source to existing destination
    move_data = {
        "identifier": source_entity["permalink"],
        "destination_path": "target/DestinationNote.md",
    }
    response = await client.post(f"{project_url}/knowledge/move", json=move_data)
    assert response.status_code == 400
    assert "already exists" in response.json()["detail"]


@pytest.mark.asyncio
async def test_move_entity_missing_identifier(client: AsyncClient, project_url):
    """Test move request with missing identifier."""
    move_data = {
        "destination_path": "target/SomeFile.md",
    }
    response = await client.post(f"{project_url}/knowledge/move", json=move_data)
    assert response.status_code == 422  # Validation error


@pytest.mark.asyncio
async def test_move_entity_missing_destination(client: AsyncClient, project_url):
    """Test move request with missing destination path."""
    move_data = {
        "identifier": "some-entity",
    }
    response = await client.post(f"{project_url}/knowledge/move", json=move_data)
    assert response.status_code == 422  # Validation error
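

# Error-status convention exercised by the move tests above: 422 for
# request-schema validation failures (missing or malformed fields, rejected by
# FastAPI before the handler runs) and 400 for domain errors raised inside the
# move handler (entity not found, destination already exists).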


@pytest.mark.asyncio
async def test_move_entity_by_file_path(client: AsyncClient, project_url):
    """Test moving entity using file path as identifier."""
    # Create test entity
    response = await client.post(
        f"{project_url}/knowledge/entities",
        json={
            "title": "TestNote",
            "folder": "source",
            "entity_type": "note",
            "content": "Test content",
        },
    )
    assert response.status_code == 200
    entity = response.json()

    # Move using file path as identifier
    move_data = {
        "identifier": entity["file_path"],
        "destination_path": "target/MovedByPath.md",
    }
    response = await client.post(f"{project_url}/knowledge/move", json=move_data)
    assert response.status_code == 200

    # Verify entity exists at new location
    response = await client.get(f"{project_url}/knowledge/entities/target/moved-by-path")
    assert response.status_code == 200
    moved_entity = response.json()
    assert moved_entity["file_path"] == "target/MovedByPath.md"


@pytest.mark.asyncio
async def test_move_entity_by_title(client: AsyncClient, project_url):
    """Test moving entity using title as identifier."""
    # Create test entity with unique title
    response = await client.post(
        f"{project_url}/knowledge/entities",
        json={
            "title": "UniqueTestTitle",
            "folder": "source",
            "entity_type": "note",
            "content": "Test content",
        },
    )
    assert response.status_code == 200

    # Move using title as identifier
    move_data = {
        "identifier": "UniqueTestTitle",
        "destination_path": "target/MovedByTitle.md",
    }
    response = await client.post(f"{project_url}/knowledge/move", json=move_data)
    assert response.status_code == 200

    # Verify entity exists at new location
    response = await client.get(f"{project_url}/knowledge/entities/target/moved-by-title")
    assert response.status_code == 200
    moved_entity = response.json()
    assert moved_entity["file_path"] == "target/MovedByTitle.md"
    assert moved_entity["title"] == "UniqueTestTitle"

```