#
tokens: 49474/50000 5/416 files (page 16/19)
This is page 16 of 19. Use http://codebase.md/basicmachines-co/basic-memory?page={x} to view the full context.

# Directory Structure

```
├── .claude
│   ├── commands
│   │   ├── release
│   │   │   ├── beta.md
│   │   │   ├── changelog.md
│   │   │   ├── release-check.md
│   │   │   └── release.md
│   │   ├── spec.md
│   │   └── test-live.md
│   └── settings.json
├── .dockerignore
├── .env.example
├── .github
│   ├── dependabot.yml
│   ├── ISSUE_TEMPLATE
│   │   ├── bug_report.md
│   │   ├── config.yml
│   │   ├── documentation.md
│   │   └── feature_request.md
│   └── workflows
│       ├── claude-code-review.yml
│       ├── claude-issue-triage.yml
│       ├── claude.yml
│       ├── dev-release.yml
│       ├── docker.yml
│       ├── pr-title.yml
│       ├── release.yml
│       └── test.yml
├── .gitignore
├── .python-version
├── CHANGELOG.md
├── CITATION.cff
├── CLA.md
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── docker-compose-postgres.yml
├── docker-compose.yml
├── Dockerfile
├── docs
│   ├── ai-assistant-guide-extended.md
│   ├── ARCHITECTURE.md
│   ├── character-handling.md
│   ├── cloud-cli.md
│   ├── Docker.md
│   └── testing-coverage.md
├── justfile
├── LICENSE
├── llms-install.md
├── pyproject.toml
├── README.md
├── SECURITY.md
├── smithery.yaml
├── specs
│   ├── SPEC-1 Specification-Driven Development Process.md
│   ├── SPEC-10 Unified Deployment Workflow and Event Tracking.md
│   ├── SPEC-11 Basic Memory API Performance Optimization.md
│   ├── SPEC-12 OpenTelemetry Observability.md
│   ├── SPEC-13 CLI Authentication with Subscription Validation.md
│   ├── SPEC-14 Cloud Git Versioning & GitHub Backup.md
│   ├── SPEC-14- Cloud Git Versioning & GitHub Backup.md
│   ├── SPEC-15 Configuration Persistence via Tigris for Cloud Tenants.md
│   ├── SPEC-16 MCP Cloud Service Consolidation.md
│   ├── SPEC-17 Semantic Search with ChromaDB.md
│   ├── SPEC-18 AI Memory Management Tool.md
│   ├── SPEC-19 Sync Performance and Memory Optimization.md
│   ├── SPEC-2 Slash Commands Reference.md
│   ├── SPEC-20 Simplified Project-Scoped Rclone Sync.md
│   ├── SPEC-3 Agent Definitions.md
│   ├── SPEC-4 Notes Web UI Component Architecture.md
│   ├── SPEC-5 CLI Cloud Upload via WebDAV.md
│   ├── SPEC-6 Explicit Project Parameter Architecture.md
│   ├── SPEC-7 POC to spike Tigris Turso for local access to cloud data.md
│   ├── SPEC-8 TigrisFS Integration.md
│   ├── SPEC-9 Multi-Project Bidirectional Sync Architecture.md
│   ├── SPEC-9 Signed Header Tenant Information.md
│   └── SPEC-9-1 Follow-Ups- Conflict, Sync, and Observability.md
├── src
│   └── basic_memory
│       ├── __init__.py
│       ├── alembic
│       │   ├── alembic.ini
│       │   ├── env.py
│       │   ├── migrations.py
│       │   ├── script.py.mako
│       │   └── versions
│       │       ├── 314f1ea54dc4_add_postgres_full_text_search_support_.py
│       │       ├── 3dae7c7b1564_initial_schema.py
│       │       ├── 502b60eaa905_remove_required_from_entity_permalink.py
│       │       ├── 5fe1ab1ccebe_add_projects_table.py
│       │       ├── 647e7a75e2cd_project_constraint_fix.py
│       │       ├── 6830751f5fb6_merge_multiple_heads.py
│       │       ├── 9d9c1cb7d8f5_add_mtime_and_size_columns_to_entity_.py
│       │       ├── a1b2c3d4e5f6_fix_project_foreign_keys.py
│       │       ├── a2b3c4d5e6f7_add_search_index_entity_cascade.py
│       │       ├── b3c3938bacdb_relation_to_name_unique_index.py
│       │       ├── cc7172b46608_update_search_index_schema.py
│       │       ├── e7e1f4367280_add_scan_watermark_tracking_to_project.py
│       │       ├── f8a9b2c3d4e5_add_pg_trgm_for_fuzzy_link_resolution.py
│       │       └── g9a0b3c4d5e6_add_external_id_to_project_and_entity.py
│       ├── api
│       │   ├── __init__.py
│       │   ├── app.py
│       │   ├── container.py
│       │   ├── routers
│       │   │   ├── __init__.py
│       │   │   ├── directory_router.py
│       │   │   ├── importer_router.py
│       │   │   ├── knowledge_router.py
│       │   │   ├── management_router.py
│       │   │   ├── memory_router.py
│       │   │   ├── project_router.py
│       │   │   ├── prompt_router.py
│       │   │   ├── resource_router.py
│       │   │   ├── search_router.py
│       │   │   └── utils.py
│       │   ├── template_loader.py
│       │   └── v2
│       │       ├── __init__.py
│       │       └── routers
│       │           ├── __init__.py
│       │           ├── directory_router.py
│       │           ├── importer_router.py
│       │           ├── knowledge_router.py
│       │           ├── memory_router.py
│       │           ├── project_router.py
│       │           ├── prompt_router.py
│       │           ├── resource_router.py
│       │           └── search_router.py
│       ├── cli
│       │   ├── __init__.py
│       │   ├── app.py
│       │   ├── auth.py
│       │   ├── commands
│       │   │   ├── __init__.py
│       │   │   ├── cloud
│       │   │   │   ├── __init__.py
│       │   │   │   ├── api_client.py
│       │   │   │   ├── bisync_commands.py
│       │   │   │   ├── cloud_utils.py
│       │   │   │   ├── core_commands.py
│       │   │   │   ├── rclone_commands.py
│       │   │   │   ├── rclone_config.py
│       │   │   │   ├── rclone_installer.py
│       │   │   │   ├── upload_command.py
│       │   │   │   └── upload.py
│       │   │   ├── command_utils.py
│       │   │   ├── db.py
│       │   │   ├── format.py
│       │   │   ├── import_chatgpt.py
│       │   │   ├── import_claude_conversations.py
│       │   │   ├── import_claude_projects.py
│       │   │   ├── import_memory_json.py
│       │   │   ├── mcp.py
│       │   │   ├── project.py
│       │   │   ├── status.py
│       │   │   ├── telemetry.py
│       │   │   └── tool.py
│       │   ├── container.py
│       │   └── main.py
│       ├── config.py
│       ├── db.py
│       ├── deps
│       │   ├── __init__.py
│       │   ├── config.py
│       │   ├── db.py
│       │   ├── importers.py
│       │   ├── projects.py
│       │   ├── repositories.py
│       │   └── services.py
│       ├── deps.py
│       ├── file_utils.py
│       ├── ignore_utils.py
│       ├── importers
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── chatgpt_importer.py
│       │   ├── claude_conversations_importer.py
│       │   ├── claude_projects_importer.py
│       │   ├── memory_json_importer.py
│       │   └── utils.py
│       ├── markdown
│       │   ├── __init__.py
│       │   ├── entity_parser.py
│       │   ├── markdown_processor.py
│       │   ├── plugins.py
│       │   ├── schemas.py
│       │   └── utils.py
│       ├── mcp
│       │   ├── __init__.py
│       │   ├── async_client.py
│       │   ├── clients
│       │   │   ├── __init__.py
│       │   │   ├── directory.py
│       │   │   ├── knowledge.py
│       │   │   ├── memory.py
│       │   │   ├── project.py
│       │   │   ├── resource.py
│       │   │   └── search.py
│       │   ├── container.py
│       │   ├── project_context.py
│       │   ├── prompts
│       │   │   ├── __init__.py
│       │   │   ├── ai_assistant_guide.py
│       │   │   ├── continue_conversation.py
│       │   │   ├── recent_activity.py
│       │   │   ├── search.py
│       │   │   └── utils.py
│       │   ├── resources
│       │   │   ├── ai_assistant_guide.md
│       │   │   └── project_info.py
│       │   ├── server.py
│       │   └── tools
│       │       ├── __init__.py
│       │       ├── build_context.py
│       │       ├── canvas.py
│       │       ├── chatgpt_tools.py
│       │       ├── delete_note.py
│       │       ├── edit_note.py
│       │       ├── list_directory.py
│       │       ├── move_note.py
│       │       ├── project_management.py
│       │       ├── read_content.py
│       │       ├── read_note.py
│       │       ├── recent_activity.py
│       │       ├── search.py
│       │       ├── utils.py
│       │       ├── view_note.py
│       │       └── write_note.py
│       ├── models
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── knowledge.py
│       │   ├── project.py
│       │   └── search.py
│       ├── project_resolver.py
│       ├── repository
│       │   ├── __init__.py
│       │   ├── entity_repository.py
│       │   ├── observation_repository.py
│       │   ├── postgres_search_repository.py
│       │   ├── project_info_repository.py
│       │   ├── project_repository.py
│       │   ├── relation_repository.py
│       │   ├── repository.py
│       │   ├── search_index_row.py
│       │   ├── search_repository_base.py
│       │   ├── search_repository.py
│       │   └── sqlite_search_repository.py
│       ├── runtime.py
│       ├── schemas
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── cloud.py
│       │   ├── delete.py
│       │   ├── directory.py
│       │   ├── importer.py
│       │   ├── memory.py
│       │   ├── project_info.py
│       │   ├── prompt.py
│       │   ├── request.py
│       │   ├── response.py
│       │   ├── search.py
│       │   ├── sync_report.py
│       │   └── v2
│       │       ├── __init__.py
│       │       ├── entity.py
│       │       └── resource.py
│       ├── services
│       │   ├── __init__.py
│       │   ├── context_service.py
│       │   ├── directory_service.py
│       │   ├── entity_service.py
│       │   ├── exceptions.py
│       │   ├── file_service.py
│       │   ├── initialization.py
│       │   ├── link_resolver.py
│       │   ├── project_service.py
│       │   ├── search_service.py
│       │   └── service.py
│       ├── sync
│       │   ├── __init__.py
│       │   ├── background_sync.py
│       │   ├── coordinator.py
│       │   ├── sync_service.py
│       │   └── watch_service.py
│       ├── telemetry.py
│       ├── templates
│       │   └── prompts
│       │       ├── continue_conversation.hbs
│       │       └── search.hbs
│       └── utils.py
├── test-int
│   ├── BENCHMARKS.md
│   ├── cli
│   │   ├── test_project_commands_integration.py
│   │   └── test_version_integration.py
│   ├── conftest.py
│   ├── mcp
│   │   ├── test_build_context_underscore.py
│   │   ├── test_build_context_validation.py
│   │   ├── test_chatgpt_tools_integration.py
│   │   ├── test_default_project_mode_integration.py
│   │   ├── test_delete_note_integration.py
│   │   ├── test_edit_note_integration.py
│   │   ├── test_lifespan_shutdown_sync_task_cancellation_integration.py
│   │   ├── test_list_directory_integration.py
│   │   ├── test_move_note_integration.py
│   │   ├── test_project_management_integration.py
│   │   ├── test_project_state_sync_integration.py
│   │   ├── test_read_content_integration.py
│   │   ├── test_read_note_integration.py
│   │   ├── test_search_integration.py
│   │   ├── test_single_project_mcp_integration.py
│   │   └── test_write_note_integration.py
│   ├── test_db_wal_mode.py
│   └── test_disable_permalinks_integration.py
├── tests
│   ├── __init__.py
│   ├── api
│   │   ├── conftest.py
│   │   ├── test_api_container.py
│   │   ├── test_async_client.py
│   │   ├── test_continue_conversation_template.py
│   │   ├── test_directory_router.py
│   │   ├── test_importer_router.py
│   │   ├── test_knowledge_router.py
│   │   ├── test_management_router.py
│   │   ├── test_memory_router.py
│   │   ├── test_project_router_operations.py
│   │   ├── test_project_router.py
│   │   ├── test_prompt_router.py
│   │   ├── test_relation_background_resolution.py
│   │   ├── test_resource_router.py
│   │   ├── test_search_router.py
│   │   ├── test_search_template.py
│   │   ├── test_template_loader_helpers.py
│   │   ├── test_template_loader.py
│   │   └── v2
│   │       ├── __init__.py
│   │       ├── conftest.py
│   │       ├── test_directory_router.py
│   │       ├── test_importer_router.py
│   │       ├── test_knowledge_router.py
│   │       ├── test_memory_router.py
│   │       ├── test_project_router.py
│   │       ├── test_prompt_router.py
│   │       ├── test_resource_router.py
│   │       └── test_search_router.py
│   ├── cli
│   │   ├── cloud
│   │   │   ├── test_cloud_api_client_and_utils.py
│   │   │   ├── test_rclone_config_and_bmignore_filters.py
│   │   │   └── test_upload_path.py
│   │   ├── conftest.py
│   │   ├── test_auth_cli_auth.py
│   │   ├── test_cli_container.py
│   │   ├── test_cli_exit.py
│   │   ├── test_cli_tool_exit.py
│   │   ├── test_cli_tools.py
│   │   ├── test_cloud_authentication.py
│   │   ├── test_ignore_utils.py
│   │   ├── test_import_chatgpt.py
│   │   ├── test_import_claude_conversations.py
│   │   ├── test_import_claude_projects.py
│   │   ├── test_import_memory_json.py
│   │   ├── test_project_add_with_local_path.py
│   │   └── test_upload.py
│   ├── conftest.py
│   ├── db
│   │   └── test_issue_254_foreign_key_constraints.py
│   ├── importers
│   │   ├── test_conversation_indexing.py
│   │   ├── test_importer_base.py
│   │   └── test_importer_utils.py
│   ├── markdown
│   │   ├── __init__.py
│   │   ├── test_date_frontmatter_parsing.py
│   │   ├── test_entity_parser_error_handling.py
│   │   ├── test_entity_parser.py
│   │   ├── test_markdown_plugins.py
│   │   ├── test_markdown_processor.py
│   │   ├── test_observation_edge_cases.py
│   │   ├── test_parser_edge_cases.py
│   │   ├── test_relation_edge_cases.py
│   │   └── test_task_detection.py
│   ├── mcp
│   │   ├── clients
│   │   │   ├── __init__.py
│   │   │   └── test_clients.py
│   │   ├── conftest.py
│   │   ├── test_async_client_modes.py
│   │   ├── test_mcp_container.py
│   │   ├── test_obsidian_yaml_formatting.py
│   │   ├── test_permalink_collision_file_overwrite.py
│   │   ├── test_project_context.py
│   │   ├── test_prompts.py
│   │   ├── test_recent_activity_prompt_modes.py
│   │   ├── test_resources.py
│   │   ├── test_server_lifespan_branches.py
│   │   ├── test_tool_build_context.py
│   │   ├── test_tool_canvas.py
│   │   ├── test_tool_delete_note.py
│   │   ├── test_tool_edit_note.py
│   │   ├── test_tool_list_directory.py
│   │   ├── test_tool_move_note.py
│   │   ├── test_tool_project_management.py
│   │   ├── test_tool_read_content.py
│   │   ├── test_tool_read_note.py
│   │   ├── test_tool_recent_activity.py
│   │   ├── test_tool_resource.py
│   │   ├── test_tool_search.py
│   │   ├── test_tool_utils.py
│   │   ├── test_tool_view_note.py
│   │   ├── test_tool_write_note_kebab_filenames.py
│   │   ├── test_tool_write_note.py
│   │   └── tools
│   │       └── test_chatgpt_tools.py
│   ├── Non-MarkdownFileSupport.pdf
│   ├── README.md
│   ├── repository
│   │   ├── test_entity_repository_upsert.py
│   │   ├── test_entity_repository.py
│   │   ├── test_entity_upsert_issue_187.py
│   │   ├── test_observation_repository.py
│   │   ├── test_postgres_search_repository.py
│   │   ├── test_project_info_repository.py
│   │   ├── test_project_repository.py
│   │   ├── test_relation_repository.py
│   │   ├── test_repository.py
│   │   ├── test_search_repository_edit_bug_fix.py
│   │   └── test_search_repository.py
│   ├── schemas
│   │   ├── test_base_timeframe_minimum.py
│   │   ├── test_memory_serialization.py
│   │   ├── test_memory_url_validation.py
│   │   ├── test_memory_url.py
│   │   ├── test_relation_response_reference_resolution.py
│   │   ├── test_schemas.py
│   │   └── test_search.py
│   ├── Screenshot.png
│   ├── services
│   │   ├── test_context_service.py
│   │   ├── test_directory_service.py
│   │   ├── test_entity_service_disable_permalinks.py
│   │   ├── test_entity_service.py
│   │   ├── test_file_service.py
│   │   ├── test_initialization_cloud_mode_branches.py
│   │   ├── test_initialization.py
│   │   ├── test_link_resolver.py
│   │   ├── test_project_removal_bug.py
│   │   ├── test_project_service_operations.py
│   │   ├── test_project_service.py
│   │   └── test_search_service.py
│   ├── sync
│   │   ├── test_character_conflicts.py
│   │   ├── test_coordinator.py
│   │   ├── test_sync_service_incremental.py
│   │   ├── test_sync_service.py
│   │   ├── test_sync_wikilink_issue.py
│   │   ├── test_tmp_files.py
│   │   ├── test_watch_service_atomic_adds.py
│   │   ├── test_watch_service_edge_cases.py
│   │   ├── test_watch_service_reload.py
│   │   └── test_watch_service.py
│   ├── test_config.py
│   ├── test_deps.py
│   ├── test_production_cascade_delete.py
│   ├── test_project_resolver.py
│   ├── test_rclone_commands.py
│   ├── test_runtime.py
│   ├── test_telemetry.py
│   └── utils
│       ├── test_file_utils.py
│       ├── test_frontmatter_obsidian_compatible.py
│       ├── test_parse_tags.py
│       ├── test_permalink_formatting.py
│       ├── test_timezone_utils.py
│       ├── test_utf8_handling.py
│       └── test_validate_project_path.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/specs/SPEC-19 Sync Performance and Memory Optimization.md:
--------------------------------------------------------------------------------

```markdown
---
title: 'SPEC-19: Sync Performance and Memory Optimization'
type: spec
permalink: specs/spec-17-sync-performance-optimization
tags:
- performance
- memory
- sync
- optimization
- core
status: draft
---

# SPEC-19: Sync Performance and Memory Optimization

## Why

### Problem Statement

Current sync implementation causes Out-of-Memory (OOM) kills and poor performance on production systems:

**Evidence from Production**:
- **Tenant-6d2ff1a3**: OOM killed on 1GB machine
  - Files: 2,621 total (31 PDFs, 80MB binary data)
  - Memory: 1.5-1.7GB peak usage
  - Sync duration: 15+ minutes
  - Error: `Out of memory: Killed process 693 (python)`

**Root Causes**:

1. **Checksum-based scanning loads ALL files into memory**
   - `scan_directory()` computes checksums for ALL 2,624 files upfront
   - Results stored in multiple dicts (`ScanResult.files`, `SyncReport.checksums`)
   - Even unchanged files are fully read and checksummed

2. **Large files read entirely for checksums**
   - 16MB PDF → Full read into memory → Compute checksum
   - No streaming or chunked processing
   - TigrisFS caching compounds memory usage

3. **Unbounded concurrency**
   - All 2,624 files processed simultaneously
   - Each file loads full content into memory
   - No semaphore limiting concurrent operations

4. **Cloud-specific resource leaks**
   - aiohttp session leak in keepalive (not in context manager)
   - Circuit breaker resets every 30s sync cycle (ineffective)
   - Thundering herd: all tenants sync at :00 and :30

### Impact

- **Production stability**: OOM kills are unacceptable
- **User experience**: 15+ minute syncs are too slow
- **Cost**: Forced upgrades from 1GB → 2GB machines ($5-10/mo per tenant)
- **Scalability**: Current approach won't scale to 100+ tenants

### Architectural Decision

**Fix in basic-memory core first, NOT UberSync**

Rationale:
- Root causes are algorithmic, not architectural
- Benefits all users (CLI + Cloud)
- Lower risk than new centralized service
- Known solutions (rsync/rclone use same pattern)
- Can defer UberSync until metrics prove it necessary

## What

### Affected Components

**basic-memory (core)**:
- `src/basic_memory/sync/sync_service.py` - Core sync algorithm (~42KB)
- `src/basic_memory/models.py` - Entity model (add mtime/size columns)
- `src/basic_memory/file_utils.py` - Checksum computation functions
- `src/basic_memory/repository/entity_repository.py` - Database queries
- `alembic/versions/` - Database migration for schema changes

**basic-memory-cloud (wrapper)**:
- `apps/api/src/basic_memory_cloud_api/sync_worker.py` - Cloud sync wrapper
- Circuit breaker implementation
- Sync coordination logic

### Database Schema Changes

Add to Entity model:
```python
mtime: float  # File modification timestamp
size: int     # File size in bytes
```

## How (High Level)

### Phase 1: Core Algorithm Fixes (basic-memory)

**Priority: P0 - Critical**

#### 1.1 mtime-based Scanning (Issue #383)

Replace expensive checksum-based scanning with lightweight stat-based comparison:

```python
async def scan_directory(self, directory: Path) -> ScanResult:
    """Scan using mtime/size instead of checksums"""
    result = ScanResult()

    for root, dirnames, filenames in os.walk(str(directory)):
        for filename in filenames:
            path = Path(root) / filename
            rel_path = path.relative_to(directory).as_posix()
            stat = path.stat()

            # Store lightweight metadata instead of checksum
            result.files[rel_path] = {
                'mtime': stat.st_mtime,
                'size': stat.st_size
            }

    return result

async def scan(self, directory: Path):
    """Compare mtime/size, only compute checksums for changed files"""
    db_state = await self.get_db_file_state()  # Include mtime/size
    scan_result = await self.scan_directory(directory)

    for file_path, metadata in scan_result.files.items():
        db_metadata = db_state.get(file_path)

        # Only compute expensive checksum if mtime/size changed
        if not db_metadata or metadata['mtime'] != db_metadata['mtime']:
            checksum = await self._compute_checksum_streaming(file_path)
            # Process immediately, don't accumulate in memory
```

**Benefits**:
- No file reads during initial scan (just stat calls)
- ~90% reduction in memory usage
- ~10x faster scan phase
- Only checksum files that actually changed

#### 1.2 Streaming Checksum Computation (Issue #382)

For large files (>1MB), use chunked reading to avoid loading entire file:

```python
async def _compute_checksum_streaming(self, path: Path, chunk_size: int = 65536) -> str:
    """Compute checksum using 64KB chunks for large files"""
    hasher = hashlib.sha256()

    loop = asyncio.get_event_loop()

    def read_chunks():
        with open(path, 'rb') as f:
            while chunk := f.read(chunk_size):
                hasher.update(chunk)

    await loop.run_in_executor(None, read_chunks)
    return hasher.hexdigest()

async def _compute_checksum_async(self, file_path: Path) -> str:
    """Choose appropriate checksum method based on file size"""
    stat = file_path.stat()

    if stat.st_size > 1_048_576:  # 1MB threshold
        return await self._compute_checksum_streaming(file_path)
    else:
        # Small files: existing fast path
        content = await self._read_file_async(file_path)
        return compute_checksum(content)
```

**Benefits**:
- Constant memory usage regardless of file size
- 16MB PDF uses 64KB memory (not 16MB)
- Works well with TigrisFS network I/O

#### 1.3 Bounded Concurrency (Issue #198)

Add a semaphore to limit concurrent file operations (or consider aiofiles for fully async reads):

```python
class SyncService:
    def __init__(self, ...):
        # ... existing code ...
        self._file_semaphore = asyncio.Semaphore(10)  # Max 10 concurrent
        self._max_tracked_failures = 100  # LRU cache limit

    async def _read_file_async(self, file_path: Path) -> str:
        async with self._file_semaphore:
            loop = asyncio.get_event_loop()
            return await loop.run_in_executor(
                self._thread_pool,
                file_path.read_text,
                "utf-8"
            )

    async def _record_failure(self, path: str, error: str):
        # ... existing code ...

        # Implement LRU eviction
        if len(self._file_failures) > self._max_tracked_failures:
            self._file_failures.popitem(last=False)  # Remove oldest (assumes an OrderedDict)
```

**Benefits**:
- Maximum 10 files in memory at once (vs all 2,624)
- 90%+ reduction in peak memory usage
- Prevents unbounded memory growth on error-prone projects

### Phase 2: Cloud-Specific Fixes (basic-memory-cloud)

**Priority: P1 - High**

#### 2.1 Fix Resource Leaks

```python
# apps/api/src/basic_memory_cloud_api/sync_worker.py

async def send_keepalive():
    """Send keepalive pings using proper session management"""
    # Use context manager to ensure cleanup
    async with aiohttp.ClientSession(
        timeout=aiohttp.ClientTimeout(total=5)
    ) as session:
        while True:
            try:
                await session.get(f"https://{fly_app_name}.fly.dev/health")
                await asyncio.sleep(10)
            except asyncio.CancelledError:
                raise  # Exit cleanly
            except Exception as e:
                logger.warning(f"Keepalive failed: {e}")
```

#### 2.2 Improve Circuit Breaker

Track failures across sync cycles instead of resetting every 30s:

```python
# Persistent failure tracking
class SyncWorker:
    def __init__(self):
        self._persistent_failures: Dict[str, int] = {}  # file -> failure_count
        self._failure_window_start = time.time()

    async def should_skip_file(self, file_path: str) -> bool:
        # Skip files that failed >3 times in last hour
        if self._persistent_failures.get(file_path, 0) > 3:
            if time.time() - self._failure_window_start < 3600:
                return True
        return False
```

### Phase 3: Measurement & Decision

**Priority: P2 - Future**

After implementing Phases 1-2, collect metrics for 2 weeks:
- Memory usage per tenant sync
- Sync duration (scan + process)
- Concurrent sync load at peak times
- OOM incidents
- Resource costs

**UberSync Decision Criteria**:

Build centralized sync service ONLY if metrics show:
- ✅ Core fixes insufficient for >100 tenants
- ✅ Resource contention causing problems
- ✅ Need for tenant tier prioritization (paid > free)
- ✅ Cost savings justify complexity

Otherwise, defer UberSync as premature optimization.

## How to Evaluate

### Success Metrics (Phase 1)

**Memory Usage**:
- ✅ Peak memory <500MB for 2,000+ file projects (was 1.5-1.7GB)
- ✅ Memory usage linear with concurrent files (10 max), not total files
- ✅ Large file memory usage: 64KB chunks (not 16MB)

**Performance**:
- ✅ Initial scan <30 seconds (was 5+ minutes)
- ✅ Full sync <5 minutes for 2,000+ files (was 15+ minutes)
- ✅ Subsequent syncs <10 seconds (only changed files)

**Stability**:
- ✅ 2,000+ file projects run on 1GB machines
- ✅ Zero OOM kills in production
- ✅ No degradation with binary files (PDFs, images)

### Success Metrics (Phase 2)

**Resource Management**:
- ✅ Zero aiohttp session leaks (verified via monitoring)
- ✅ Circuit breaker prevents repeated failures (>3 fails = skip for 1 hour)
- ✅ Tenant syncs distributed over 30s window (no thundering herd)

**Observability**:
- ✅ Logfire traces show memory usage per sync
- ✅ Clear logging of skipped files and reasons
- ✅ Metrics on sync duration, file counts, failure rates

### Test Plan

**Unit Tests** (basic-memory):
- mtime comparison logic
- Streaming checksum correctness
- Semaphore limiting (mock 100 files, verify max 10 concurrent)
- LRU cache eviction
- Checksum computation: streaming vs non-streaming equivalence
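
The equivalence case is straightforward to express as a test; a minimal pytest-style sketch, assuming a `file_service` fixture that exposes the `compute_checksum()` method described later in this spec:

```python
import hashlib

import pytest


@pytest.mark.asyncio
async def test_streaming_checksum_matches_full_read(tmp_path, file_service):
    # 3MB file, above the 1MB streaming threshold
    big_file = tmp_path / "big.bin"
    big_file.write_bytes(b"x" * (3 * 1_048_576))

    # Streaming (chunked) checksum must equal the checksum of the full content
    expected = hashlib.sha256(big_file.read_bytes()).hexdigest()
    assert await file_service.compute_checksum(big_file) == expected
```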

**Integration Tests** (basic-memory):
- Large file handling (create 20MB test file)
- Mixed file types (text + binary)
- Changed file detection via mtime
- Sync with 1,000+ files

**Load Tests** (basic-memory-cloud):
- Test on tenant-6d2ff1a3 (2,621 files, 31 PDFs)
- Monitor memory during full sync with Logfire
- Measure scan and sync duration
- Run on 1GB machine (downgrade from 2GB to verify)
- Simulate 10 concurrent tenant syncs

**Regression Tests**:
- Verify existing sync scenarios still work
- CLI sync behavior unchanged
- File watcher integration unaffected

### Performance Benchmarks

Establish baseline, then compare after each phase:

| Metric | Baseline | Phase 1 Target | Phase 2 Target |
|--------|----------|----------------|----------------|
| Peak Memory (2,600 files) | 1.5-1.7GB | <500MB | <450MB |
| Initial Scan Time | 5+ min | <30 sec | <30 sec |
| Full Sync Time | 15+ min | <5 min | <5 min |
| Subsequent Sync | 2+ min | <10 sec | <10 sec |
| OOM Incidents/Week | 2-3 | 0 | 0 |
| Min RAM Required | 2GB | 1GB | 1GB |

## Implementation Phases

### Phase 0.5: Database Schema & Streaming Foundation

**Priority: P0 - Required for Phase 1**

This phase establishes the foundation for streaming sync with mtime-based change detection.

**Database Schema Changes**:
- [x] Add `mtime` column to Entity model (REAL type for float timestamp)
- [x] Add `size` column to Entity model (INTEGER type for file size in bytes)
- [x] Create Alembic migration for new columns (nullable initially)
- [x] Add indexes on `(file_path, project_id)` for optimistic upsert performance
- [ ] Backfill existing entities with mtime/size from filesystem
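
A minimal sketch of what the backfill could look like during the first scan after upgrade (the repository `update()` call is an assumption, following the naming used elsewhere in this spec):

```python
# Inside the per-file sync path: populate mtime/size for rows created before
# the migration (columns are nullable, so None means "not yet backfilled").
if db_entity is not None and (db_entity.mtime is None or db_entity.size is None):
    stat = file_path.stat()
    await self.entity_repository.update(
        db_entity.id,
        mtime=stat.st_mtime,
        size=stat.st_size,
    )
```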

**Streaming Architecture**:
- [x] Replace `os.walk()` with `os.scandir()` for cached stat info
- [ ] Eliminate `get_db_file_state()` - no upfront SELECT all entities
- [x] Implement streaming iterator `_scan_directory_streaming()`
- [x] Add `get_by_file_path()` optimized query (single file lookup)
- [x] Add `get_all_file_paths()` for deletion detection (paths only, no entities)
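
Taken together, these pieces allow deletion detection without loading entities upfront; a minimal sketch using the helpers named above (exact signatures and the `report.deleted` field are assumptions):

```python
seen_paths: set[str] = set()

# Stream the filesystem; one indexed lookup per file, no accumulation
async for file_path, stat_info in self._scan_directory_streaming(directory):
    seen_paths.add(file_path)
    db_entity = await self.entity_repository.get_by_file_path(file_path)
    if db_entity is None or db_entity.mtime != stat_info.st_mtime:
        await self._process_file(file_path)

# Deletion detection: anything the database knows about but the scan never saw
for db_path in await self.entity_repository.get_all_file_paths():
    if db_path not in seen_paths:
        report.deleted.add(db_path)
```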

**Benefits**:
- **50% fewer network calls** on Tigris (scandir returns cached stat)
- **No large dicts in memory** (process files one at a time)
- **Indexed lookups** instead of full table scan
- **Foundation for mtime comparison** (Phase 1)

**Code Changes**:

```python
# Before: Load all entities upfront
db_paths = await self.get_db_file_state()  # SELECT * FROM entity WHERE project_id = ?
scan_result = await self.scan_directory()  # os.walk() + stat() per file

# After: Stream and query incrementally
async for file_path, stat_info in self.scan_directory():  # scandir() with cached stat
    db_entity = await self.entity_repository.get_by_file_path(rel_path)  # Indexed lookup
    # Process immediately, no accumulation
```

**Files Modified**:
- `src/basic_memory/models.py` - Add mtime/size columns
- `alembic/versions/xxx_add_mtime_size.py` - Migration
- `src/basic_memory/sync/sync_service.py` - Streaming implementation
- `src/basic_memory/repository/entity_repository.py` - Add get_all_file_paths()

**Migration Strategy**:
```sql
-- Migration: Add nullable columns
ALTER TABLE entity ADD COLUMN mtime REAL;
ALTER TABLE entity ADD COLUMN size INTEGER;

-- Backfill from filesystem during first sync after upgrade
-- (Handled in sync_service on first scan)
```
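
The same change expressed as an Alembic migration might look like the following sketch (revision identifiers are placeholders, not the actual migration file):

```python
"""add mtime and size columns to entity (sketch)"""

import sqlalchemy as sa
from alembic import op

revision = "xxxxxxxxxxxx"       # placeholder
down_revision = "yyyyyyyyyyyy"  # placeholder


def upgrade() -> None:
    # Nullable initially; backfilled from the filesystem on first scan
    op.add_column("entity", sa.Column("mtime", sa.Float(), nullable=True))
    op.add_column("entity", sa.Column("size", sa.Integer(), nullable=True))


def downgrade() -> None:
    op.drop_column("entity", "size")
    op.drop_column("entity", "mtime")
```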

### Phase 1: Core Fixes

**mtime-based scanning**:
- [x] Add mtime/size columns to Entity model (completed in Phase 0.5)
- [x] Database migration (alembic) (completed in Phase 0.5)
- [x] Refactor `scan()` to use streaming architecture with mtime/size comparison
- [x] Update `sync_markdown_file()` and `sync_regular_file()` to store mtime/size in database
- [x] Only compute checksums for changed files (mtime/size differ)
- [x] Unit tests for streaming scan (6 tests passing)
- [ ] Integration test with 1,000 files (defer to benchmarks)

**Streaming checksums**:
- [x] Implement `_compute_checksum_streaming()` with chunked reading
- [x] Add file size threshold logic (1MB)
- [x] Test with large files (16MB PDF)
- [x] Verify memory usage stays constant
- [x] Test checksum equivalence (streaming vs non-streaming)

**Bounded concurrency**:
- [x] Add semaphore (10 concurrent) to `_read_file_async()` (already existed)
- [x] Add LRU cache for failures (100 max) (already existed)
- [ ] Review thread pool size configuration
- [ ] Load test with 2,000+ files
- [ ] Verify <500MB peak memory

**Cleanup & Optimization**:
- [x] Eliminate `get_db_file_state()` - no upfront SELECT all entities (streaming architecture complete)
- [x] Consolidate file operations in FileService (eliminate duplicate checksum logic)
- [x] Add aiofiles dependency (already present)
- [x] FileService streaming checksums for files >1MB
- [x] SyncService delegates all file operations to FileService
- [x] Complete true async I/O refactoring - all file operations use aiofiles
  - [x] Added `FileService.read_file_content()` using aiofiles
  - [x] Removed `SyncService._read_file_async()` wrapper method
  - [x] Removed `SyncService._compute_checksum_async()` wrapper method
  - [x] Inlined all 7 checksum calls to use `file_service.compute_checksum()` directly
  - [x] All file I/O operations now properly consolidated in FileService with non-blocking I/O
- [x] Removed sync_status_service completely (unnecessary complexity and state tracking)
  - [x] Removed `sync_status_service.py` and `sync_status` MCP tool
  - [x] Removed all `sync_status_tracker` calls from `sync_service.py`
  - [x] Removed migration status checks from MCP tools (`write_note`, `read_note`, `build_context`)
  - [x] Removed `check_migration_status()` and `wait_for_migration_or_return_status()` from `utils.py`
  - [x] Removed all related tests (4 test files deleted)
  - [x] All 1184 tests passing

**Phase 1 Implementation Summary:**

Phase 1 is now complete with all core fixes implemented and tested:

1. **Streaming Architecture** (Phase 0.5 + Phase 1):
   - Replaced `os.walk()` with `os.scandir()` for cached stat info
   - Eliminated upfront `get_db_file_state()` SELECT query
   - Implemented `_scan_directory_streaming()` for incremental processing
   - Added indexed `get_by_file_path()` lookups
   - Result: 50% fewer network calls on TigrisFS, no large dicts in memory

2. **mtime-based Change Detection**:
   - Added `mtime` and `size` columns to Entity model
   - Alembic migration completed and deployed
   - Only compute checksums when mtime/size differs from database
   - Result: ~90% reduction in checksum operations during typical syncs

3. **True Async I/O with aiofiles**:
   - All file operations consolidated in FileService
   - `FileService.compute_checksum()`: 64KB chunked reading for constant memory (lines 261-296 of file_service.py)
   - `FileService.read_file_content()`: Non-blocking file reads with aiofiles (lines 160-193 of file_service.py)
   - Removed all wrapper methods from SyncService (`_read_file_async`, `_compute_checksum_async`)
   - Semaphore controls concurrency (max 10 concurrent file operations)
   - Result: Constant memory usage regardless of file size, true non-blocking I/O

4. **Test Coverage**:
   - 41/43 sync tests passing (2 skipped as expected)
   - Circuit breaker tests updated for new architecture
   - Streaming checksum equivalence verified
   - All edge cases covered (large files, concurrent operations, failures)

**Key Files Modified**:
- `src/basic_memory/models.py` - Added mtime/size columns
- `alembic/versions/xxx_add_mtime_size.py` - Database migration
- `src/basic_memory/sync/sync_service.py` - Streaming implementation, removed wrapper methods
- `src/basic_memory/services/file_service.py` - Added `read_file_content()`, streaming checksums
- `src/basic_memory/repository/entity_repository.py` - Added `get_all_file_paths()`
- `tests/sync/test_sync_service.py` - Updated circuit breaker test mocks

**Performance Improvements Achieved**:
- Memory usage: Constant per file (64KB chunks) vs full file in memory
- Scan speed: Stat-only scan (no checksums for unchanged files)
- I/O efficiency: True async with aiofiles (no thread pool blocking)
- Network efficiency: 50% fewer calls on TigrisFS via scandir caching
- Architecture: Clean separation of concerns (FileService owns all file I/O)
- Reduced complexity: Removed unnecessary sync_status_service state tracking

**Observability**:
- [x] Added Logfire instrumentation to `sync_file()` and `sync_markdown_file()`
- [x] Logfire disabled by default via `ignore_no_config = true` in pyproject.toml
- [x] No telemetry in FOSS version unless explicitly configured
- [x] Cloud deployment can enable Logfire for performance monitoring

**Next Steps**: Phase 1.5 scan watermark optimization for large project performance.

### Phase 1.5: Scan Watermark Optimization

**Priority: P0 - Critical for Large Projects**

This phase addresses Issue #388 where large projects (1,460+ files) take 7+ minutes for sync operations even when no files have changed.

**Problem Analysis**:

From production data (tenant-0a20eb58):
- Total sync time: 420-450 seconds (7+ minutes) with 0 changes
- Scan phase: 321 seconds (75% of total time)
- Per-file cost: 220ms × 1,460 files = 5+ minutes
- Root cause: Network I/O to TigrisFS for stat operations (even with mtime columns)
- 15 concurrent syncs every 30 seconds compounds the problem

**Current Behavior** (Phase 1):
```python
async def scan(self, directory: Path):
    """Scan filesystem using mtime/size comparison"""
    # Still stats ALL 1,460 files every sync cycle
    async for file_path, stat_info in self._scan_directory_streaming():
        db_entity = await self.entity_repository.get_by_file_path(file_path)
        # Compare mtime/size, skip unchanged files
        # Only checksum if changed (✅ already optimized)
```

**Problem**: Even with mtime optimization, we stat every file on every scan. On TigrisFS (network FUSE mount), this means 1,460 network calls taking 5+ minutes.

**Solution: Scan Watermark + File Count Detection**

Track when we last scanned and how many files existed. Use filesystem-level filtering to only examine files modified since last scan.

**Key Insight**: File count changes signal deletions
- Count same → incremental scan (95% of syncs)
- Count increased → new files found by incremental (4% of syncs)
- Count decreased → files deleted, need full scan (1% of syncs)

**Database Schema Changes**:

Add to Project model:
```python
last_scan_timestamp: float | None  # Unix timestamp of last successful scan start
last_file_count: int | None        # Number of files found in last scan
```

**Implementation Strategy**:

```python
async def scan(self, directory: Path):
    """Smart scan using watermark and file count"""
    project = await self.project_repository.get_current()
    scan_start = time.time()  # Capture the START of this scan (see clock-skew edge case below)

    # Step 1: Quick file count (fast on TigrisFS: 1.4s for 1,460 files)
    current_count = await self._quick_count_files(directory)

    # Step 2: Determine scan strategy
    if project.last_file_count is None:
        # First sync ever → full scan
        file_paths = await self._scan_directory_full(directory)
        scan_type = "full_initial"

    elif current_count < project.last_file_count:
        # Files deleted → need full scan to detect which ones
        file_paths = await self._scan_directory_full(directory)
        scan_type = "full_deletions"
        logger.info(f"File count decreased ({project.last_file_count} → {current_count}), running full scan")

    elif project.last_scan_timestamp is not None:
        # Incremental scan: only files modified since last scan
        file_paths = await self._scan_directory_modified_since(
            directory,
            project.last_scan_timestamp
        )
        scan_type = "incremental"
        logger.info(f"Incremental scan since {project.last_scan_timestamp}, found {len(file_paths)} changed files")
    else:
        # Fallback to full scan
        file_paths = await self._scan_directory_full(directory)
        scan_type = "full_fallback"

    # Step 3: Process changed files (existing logic)
    for file_path in file_paths:
        await self._process_file(file_path)

    # Step 4: Update watermark AFTER successful scan
    await self.project_repository.update(
        project.id,
        last_scan_timestamp=scan_start,  # Start of THIS scan, not its completion time
        last_file_count=current_count
    )

    # Step 5: Record metrics
    logfire.metric_counter(f"sync.scan.{scan_type}").add(1)
    logfire.metric_histogram("sync.scan.files_scanned", unit="files").record(len(file_paths))
```

**Helper Methods**:

```python
async def _quick_count_files(self, directory: Path) -> int:
    """Fast file count using find command"""
    # TigrisFS: 1.4s for 1,460 files
    result = await asyncio.create_subprocess_shell(
        f'find "{directory}" -type f | wc -l',
        stdout=asyncio.subprocess.PIPE
    )
    stdout, _ = await result.communicate()
    return int(stdout.strip())

async def _scan_directory_modified_since(
    self,
    directory: Path,
    since_timestamp: float
) -> List[str]:
    """Use find -newermt for filesystem-level filtering"""
    # Convert timestamp to find-compatible format
    since_date = datetime.fromtimestamp(since_timestamp).strftime("%Y-%m-%d %H:%M:%S")

    # TigrisFS: 0.2s for 0 changed files (vs 5+ minutes for full scan)
    result = await asyncio.create_subprocess_shell(
        f'find "{directory}" -type f -newermt "{since_date}"',
        stdout=asyncio.subprocess.PIPE
    )
    stdout, _ = await result.communicate()

    # Convert absolute paths to relative
    file_paths = []
    for line in stdout.decode().splitlines():
        if line:
            rel_path = Path(line).relative_to(directory).as_posix()
            file_paths.append(rel_path)

    return file_paths
```

**TigrisFS Testing Results** (SSH to production-basic-memory-tenant-0a20eb58):

```bash
# Full file count
$ time find . -type f | wc -l
1460
real    0m1.362s  # ✅ Acceptable

# Incremental scan (1 hour window)
$ time find . -type f -newermt "2025-01-20 10:00:00" | wc -l
0
real    0m0.161s  # ✅ 8.5x faster!

# Incremental scan (24 hours)
$ time find . -type f -newermt "2025-01-19 11:00:00" | wc -l
0
real    0m0.239s  # ✅ 5.7x faster!
```

**Conclusion**: `find -newermt` works perfectly on TigrisFS and provides massive speedup.

**Expected Performance Improvements**:

| Scenario | Files Changed | Current Time | With Watermark | Speedup |
|----------|---------------|--------------|----------------|---------|
| No changes (common) | 0 | 420s | ~2s | 210x |
| Few changes | 5-10 | 420s | ~5s | 84x |
| Many changes | 100+ | 420s | ~30s | 14x |
| Deletions (rare) | N/A | 420s | 420s | 1x |

**Full sync breakdown** (1,460 files, 0 changes):
- File count: 1.4s
- Incremental scan: 0.2s
- Database updates: 0.4s
- **Total: ~2s (225x faster)**

**Metrics to Track**:

```python
# Scan type distribution
logfire.metric_counter("sync.scan.full_initial").add(1)
logfire.metric_counter("sync.scan.full_deletions").add(1)
logfire.metric_counter("sync.scan.incremental").add(1)

# Performance metrics
logfire.metric_histogram("sync.scan.duration", unit="ms").record(scan_ms)
logfire.metric_histogram("sync.scan.files_scanned", unit="files").record(file_count)
logfire.metric_histogram("sync.scan.files_changed", unit="files").record(changed_count)

# Watermark effectiveness
logfire.metric_histogram("sync.scan.watermark_age", unit="s").record(
    time.time() - project.last_scan_timestamp
)
```

**Edge Cases Handled**:

1. **First sync**: No watermark → full scan (expected)
2. **Deletions**: File count decreased → full scan (rare but correct)
3. **Clock skew**: Use scan start time, not end time (captures files created during scan)
4. **Scan failure**: Don't update watermark on failure (retry will re-scan)
5. **New files**: Count increased → incremental scan finds them (common, fast)

**Files to Modify**:
- `src/basic_memory/models.py` - Add last_scan_timestamp, last_file_count to Project
- `alembic/versions/xxx_add_scan_watermark.py` - Migration for new columns
- `src/basic_memory/sync/sync_service.py` - Implement watermark logic
- `src/basic_memory/repository/project_repository.py` - Update methods
- `tests/sync/test_sync_watermark.py` - Test watermark behavior

**Test Plan**:
- [x] SSH test on TigrisFS confirms `find -newermt` works (completed)
- [x] Unit tests for scan strategy selection (4 tests)
- [x] Unit tests for file count detection (integrated in strategy tests)
- [x] Integration test: verify incremental scan finds changed files (4 tests)
- [x] Integration test: verify deletion detection triggers full scan (2 tests)
- [ ] Load test on tenant-0a20eb58 (1,460 files) - pending production deployment
- [ ] Verify <3s for no-change sync - pending production deployment

**Implementation Status**: ✅ **COMPLETED**

**Code Changes** (Commit: `fb16055d`):
- ✅ Added `last_scan_timestamp` and `last_file_count` to Project model
- ✅ Created database migration `e7e1f4367280_add_scan_watermark_tracking_to_project.py`
- ✅ Implemented smart scan strategy selection in `sync_service.py`
- ✅ Added `_quick_count_files()` using `find | wc -l` (~1.4s for 1,460 files)
- ✅ Added `_scan_directory_modified_since()` using `find -newermt` (~0.2s)
- ✅ Added `_scan_directory_full()` wrapper for full scans
- ✅ Watermark update logic after successful sync (uses sync START time)
- ✅ Logfire metrics for scan types and performance tracking

**Test Coverage** (18 tests in `test_sync_service_incremental.py`):
- ✅ Scan strategy selection (4 tests)
  - First sync uses full scan
  - File count decreased triggers full scan
  - Same file count uses incremental scan
  - Increased file count uses incremental scan
- ✅ Incremental scan base cases (4 tests)
  - No changes scenario
  - Detects new files
  - Detects modified files
  - Detects multiple changes
- ✅ Deletion detection (2 tests)
  - Single file deletion
  - Multiple file deletions
- ✅ Move detection (2 tests)
  - Moves require full scan (renames don't update mtime)
  - Moves detected in full scan via checksum
- ✅ Watermark update (3 tests)
  - Watermark updated after successful sync
  - Watermark uses sync start time
  - File count accuracy
- ✅ Edge cases (3 tests)
  - Concurrent file changes
  - Empty directory handling
  - Respects .gitignore patterns

**Performance Expectations** (to be verified in production):
- No changes: 420s → ~2s (210x faster)
- Few changes (5-10): 420s → ~5s (84x faster)
- Many changes (100+): 420s → ~30s (14x faster)
- Deletions: 420s → 420s (full scan, rare case)

**Rollout Strategy**:
1. ✅ Code complete and tested (18 new tests, all passing)
2. ✅ Pushed to `phase-0.5-streaming-foundation` branch
3. ⏳ Windows CI tests running
4. 📊 Deploy to staging tenant with watermark optimization
5. 📊 Monitor scan performance metrics via Logfire
6. 📊 Verify no missed files (compare full vs incremental results)
7. 📊 Deploy to production tenant-0a20eb58
8. 📊 Measure actual improvement (expect 420s → 2-3s)

**Success Criteria**:
- ✅ Implementation complete with comprehensive tests
- [ ] No-change syncs complete in <3 seconds (was 420s) - pending production test
- [ ] Incremental scans (95% of cases) use watermark - pending production test
- [ ] Deletion detection works correctly (full scan when needed) - tested in unit tests ✅
- [ ] No files missed due to watermark logic - tested in unit tests ✅
- [ ] Metrics show scan type distribution matches expectations - pending production test

**Next Steps**:
1. Production deployment to tenant-0a20eb58
2. Measure actual performance improvements
3. Monitor metrics for 1 week
4. Phase 2 cloud-specific fixes
5. Phase 3 production measurement and UberSync decision

### Phase 2: Cloud Fixes 

**Resource leaks**:
- [ ] Fix aiohttp session context manager
- [ ] Implement persistent circuit breaker
- [ ] Add memory monitoring/alerts
- [ ] Test on production tenant

**Sync coordination**:
- [ ] Implement hash-based staggering (see the sketch after this list)
- [ ] Add jitter to sync intervals
- [ ] Load test with 10 concurrent tenants
- [ ] Verify no thundering herd
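
A minimal sketch of hash-based staggering with jitter, assuming a per-tenant delay computed before each sync cycle (helper name and window size are illustrative, not the actual cloud implementation):

```python
import hashlib
import random

SYNC_WINDOW_SECONDS = 30


def sync_delay(tenant_id: str, jitter_seconds: float = 2.0) -> float:
    """Stable offset within the sync window derived from the tenant id, plus jitter."""
    digest = hashlib.sha256(tenant_id.encode()).digest()
    offset = int.from_bytes(digest[:4], "big") % SYNC_WINDOW_SECONDS
    return offset + random.uniform(0, jitter_seconds)


# Usage: sleep for the computed delay before starting each tenant's sync, e.g.
#   await asyncio.sleep(sync_delay(tenant_id))
```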

### Phase 3: Measurement

**Deploy to production**:
- [ ] Deploy Phase 1+2 changes
- [ ] Downgrade tenant-6d2ff1a3 to 1GB
- [ ] Monitor for OOM incidents

**Collect metrics**:
- [ ] Memory usage patterns
- [ ] Sync duration distributions
- [ ] Concurrent sync load
- [ ] Cost analysis

**UberSync decision**:
- [ ] Review metrics against decision criteria
- [ ] Document findings
- [ ] Create SPEC-18 for UberSync if needed

## Related Issues

### basic-memory (core)
- [#383](https://github.com/basicmachines-co/basic-memory/issues/383) - Refactor sync to use mtime-based scanning
- [#382](https://github.com/basicmachines-co/basic-memory/issues/382) - Optimize memory for large file syncs
- [#371](https://github.com/basicmachines-co/basic-memory/issues/371) - aiofiles for non-blocking I/O (future)

### basic-memory-cloud
- [#198](https://github.com/basicmachines-co/basic-memory-cloud/issues/198) - Memory optimization for sync worker
- [#189](https://github.com/basicmachines-co/basic-memory-cloud/issues/189) - Circuit breaker for infinite retry loops

## References

**Standard sync tools using mtime**:
- rsync: Uses mtime-based comparison by default, only checksums on `--checksum` flag
- rclone: Default is mtime/size, `--checksum` mode optional
- syncthing: Block-level sync with mtime tracking

**fsnotify polling** (future consideration):
- [fsnotify/fsnotify#9](https://github.com/fsnotify/fsnotify/issues/9) - Polling mode for network filesystems

## Notes

### Why Not UberSync Now?

**Premature Optimization**:
- Current problems are algorithmic, not architectural
- No evidence that multi-tenant coordination is the issue
- Single tenant OOM proves algorithm is the problem

**Benefits of Core-First Approach**:
- ✅ Helps all users (CLI + Cloud)
- ✅ Lower risk (no new service)
- ✅ Clear path (issues specify fixes)
- ✅ Can defer UberSync until proven necessary

**When UberSync Makes Sense**:
- >100 active tenants causing resource contention
- Need for tenant tier prioritization (paid > free)
- Centralized observability requirements
- Cost optimization at scale

### Migration Strategy

**Backward Compatibility**:
- New mtime/size columns nullable initially
- Existing entities sync normally (compute mtime on first scan)
- No breaking changes to MCP API
- CLI behavior unchanged

**Rollout**:
1. Deploy to staging with test tenant
2. Validate memory/performance improvements
3. Deploy to production (blue-green)
4. Monitor for 1 week
5. Downgrade tenant machines if successful

## Further Considerations

### Version Control System (VCS) Integration

**Context:** Users frequently request git versioning, and large projects with PDFs/images pose memory challenges.

#### Git-Based Sync

**Approach:** Use git for change detection instead of custom mtime comparison.

```python
# Git automatically tracks changes
repo = git.Repo(project_path)
repo.git.add(A=True)
diff = repo.index.diff('HEAD')

for change in diff:
    if change.change_type == 'M':  # Modified
        await sync_file(change.b_path)
```

**Pros:**
- ✅ Proven, battle-tested change detection
- ✅ Built-in rename/move detection (similarity index)
- ✅ Efficient for cloud sync (git protocol over HTTP)
- ✅ Could enable version history as bonus feature
- ✅ Users want git integration anyway

**Cons:**
- ❌ User confusion (`.git` folder in knowledge base)
- ❌ Conflicts with existing git repos (submodule complexity)
- ❌ Adds dependency (git binary or dulwich/pygit2)
- ❌ Less control over sync logic
- ❌ Doesn't solve large file problem (PDFs still checksummed)
- ❌ Git LFS adds complexity

#### Jujutsu (jj) Alternative

**Why jj is compelling:**

1. **Working Copy as Source of Truth**
   - Git: Staging area is intermediate state
   - Jujutsu: Working copy IS a commit
   - Aligns with "files are source of truth" philosophy!

2. **Automatic Change Tracking**
   - No manual staging required
   - Working copy changes tracked automatically
   - Better fit for sync operations vs git's commit-centric model

3. **Conflict Handling**
   - User edits + sync changes both preserved
   - Operation log vs linear history
   - Built for operations, not just history

**Cons:**
- ❌ New/immature (2020 vs git's 2005)
- ❌ Not universally available
- ❌ Steeper learning curve for users
- ❌ No LFS equivalent yet
- ❌ Still doesn't solve large file checksumming

#### Git Index Format (Hybrid Approach)

**Best of both worlds:** Use git's index format without full git repo.

```python
from dulwich.index import Index  # Pure Python

# Use git index format for tracking
idx = Index(project_path / '.basic-memory' / 'index')

for file in files:
    stat = file.stat()
    if idx.get(file) and idx[file].mtime == stat.st_mtime:
        continue  # Unchanged (git's proven logic)

    await sync_file(file)
    idx[file] = (stat.st_mtime, stat.st_size, sha)
```

**Pros:**
- ✅ Git's proven change detection logic
- ✅ No user-visible `.git` folder
- ✅ No git dependency (pure Python)
- ✅ Full control over sync

**Cons:**
- ❌ Adds dependency (dulwich)
- ❌ Doesn't solve large files
- ❌ No built-in versioning

### Large File Handling

**Problem:** PDFs/images cause memory issues regardless of VCS choice.

**Solutions (Phase 1+):**

**1. Skip Checksums for Large Files**
```python
if stat.st_size > 10_000_000:  # 10MB threshold
    checksum = None  # Use mtime/size only
    logger.info(f"Skipping checksum for {file_path}")
```

**2. Partial Hashing**
```python
if file.suffix in ['.pdf', '.jpg', '.png']:
    # Hash first/last 64KB instead of entire file
    checksum = hash_partial(file, chunk_size=65536)
```
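
`hash_partial` above is a hypothetical helper; a minimal sketch of what it could look like (hashing the file size plus the first and last chunk instead of the whole file):

```python
import hashlib
from pathlib import Path


def hash_partial(path: Path, chunk_size: int = 65536) -> str:
    """Hash the file size plus the first and last chunk_size bytes."""
    size = path.stat().st_size
    hasher = hashlib.sha256()
    hasher.update(str(size).encode())
    with open(path, "rb") as f:
        hasher.update(f.read(chunk_size))  # first 64KB
        if size > 2 * chunk_size:
            f.seek(-chunk_size, 2)         # seek to the last 64KB from the end
            hasher.update(f.read(chunk_size))
    return hasher.hexdigest()
```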

**3. External Blob Storage**
```python
if stat.st_size > 10_000_000:
    blob_id = await upload_to_tigris_blob(file)
    entity.blob_id = blob_id
    entity.file_path = None  # Not in main sync
```

### Recommendation & Timeline

**Phase 0.5-1 (Now):** Custom streaming + mtime
- ✅ Solves urgent memory issues
- ✅ No dependencies
- ✅ Full control
- ✅ Skip checksums for large files (>10MB)
- ✅ Proven pattern (rsync/rclone)

**Phase 2 (After metrics):** Git index format exploration
```python
# Optional: Use git index for tracking if beneficial
from dulwich.index import Index
# No git repo, just index file format
```

**Future (User feature):** User-facing versioning
```python
# Let users opt into VCS:
basic-memory config set versioning git
basic-memory config set versioning jj
basic-memory config set versioning none  # Current behavior

# Integrate with their chosen workflow
# Not forced upon them
```

**Rationale:**
1. **Don't block on VCS decision** - Memory issues are P0
2. **Learn from deployment** - See actual usage patterns
3. **Keep options open** - Can add git/jj later
4. **Files as source of truth** - Core philosophy preserved
5. **Large files need attention regardless** - VCS won't solve that

**Decision Point:**
- If Phase 0.5/1 achieves memory targets → VCS integration deferred
- If users strongly request versioning → Add as opt-in feature
- If change detection becomes bottleneck → Explore git index format

## Agent Assignment

**Phase 1 Implementation**: `python-developer` agent
- Expertise in FastAPI, async Python, database migrations
- Handles basic-memory core changes

**Phase 2 Implementation**: `python-developer` agent
- Same agent continues with cloud-specific fixes
- Maintains consistency across phases

**Phase 3 Review**: `system-architect` agent
- Analyzes metrics and makes UberSync decision
- Creates SPEC-18 if centralized service needed

```

--------------------------------------------------------------------------------
/specs/SPEC-20 Simplified Project-Scoped Rclone Sync.md:
--------------------------------------------------------------------------------

```markdown
---
title: 'SPEC-20: Simplified Project-Scoped Rclone Sync'
date: 2025-01-27
updated: 2025-01-28
status: Implemented
priority: High
goal: Simplify cloud sync by making it project-scoped, safe by design, and closer to native rclone commands
parent: SPEC-8
---

## Executive Summary

The current rclone implementation (SPEC-8) has proven too complex with multiple footguns:
- Two workflows (mount vs bisync) with different directories causing confusion
- Multiple profiles (3 for mount, 3 for bisync) creating too much choice
- Directory conflicts (`~/basic-memory-cloud/` vs `~/basic-memory-cloud-sync/`)
- Auto-discovery of folders leading to errors
- Unclear what syncs and when

This spec proposes a **radical simplification**: project-scoped sync operations that are explicit, safe, and thin wrappers around rclone commands.

## Why

### Current Problems

**Complexity:**
- Users must choose between mount and bisync workflows
- Different directories for different workflows
- Profile selection (6 total profiles) overwhelms users
- Setup requires multiple steps with potential conflicts

**Footguns:**
- Renaming local folder breaks sync (no config tracking)
- Mount directory conflicts with bisync directory
- Auto-discovered folders create phantom projects
- Uninitialized bisync state causes failures
- Unclear which files sync (all files in root directory?)

**User Confusion:**
- "What does `bm sync` actually do?"
- "Is `~/basic-memory-cloud-sync/my-folder/` a project or just a folder?"
- "Why do I have two basic-memory directories?"
- "How do I sync just one project?"

### Design Principles (Revised)

1. **Projects are independent** - Each project manages its own sync state
2. **Global cloud mode** - You're either local or cloud (no per-project flag)
3. **Explicit operations** - No auto-discovery, no magic
4. **Safe by design** - Config tracks state, not filesystem
5. **Thin rclone wrappers** - Stay close to rclone commands
6. **One good way** - Remove choices that don't matter

## What

### New Architecture

#### Core Concepts

1. **Global Cloud Mode** (existing, keep as-is)
   - `cloud_mode_enabled` in config
   - `bm cloud login` enables it, `bm cloud logout` disables it
   - When enabled, ALL Basic Memory operations hit cloud API

2. **Project-Scoped Sync** (new)
   - Each project optionally has a `local_path` for local working copy
   - Sync operations are explicit: `bm project sync --name research`
   - Projects can live anywhere on disk, not forced into sync directory

3. **Simplified rclone Config** (new)
   - Single remote named `basic-memory-cloud` (not the tenant-scoped `basic-memory-{tenant_id}`)
   - One credential set per user
   - Config lives at `~/.config/rclone/rclone.conf`

#### Command Structure

```bash
# Setup (once)
bm cloud login                          # Authenticate, enable cloud mode
bm cloud setup                          # Install rclone, generate credentials

# Project creation with optional sync
bm project add research                 # Create cloud project (no local sync)
bm project add research --local ~/docs  # Create with local sync configured

# Or configure sync later
bm project sync-setup research ~/docs   # Configure local sync for existing project

# Project-scoped rclone operations
bm project sync --name research         # One-way sync (local → cloud)
bm project bisync --name research       # Two-way sync (local ↔ cloud)
bm project check --name research        # Verify integrity

# Advanced file operations
bm project ls --name research [path]    # List remote files
bm project copy --name research src dst # Copy files

# Batch operations
bm project sync --all                   # Sync all projects with local_sync_path
bm project bisync --all                 # Two-way sync all projects
```

#### Config Model

```json
{
  "cloud_mode": true,
  "cloud_host":  "https://cloud.basicmemory.com",

  "projects": {
    // Used in LOCAL mode only (simple name → path mapping)
    "main": "/Users/user/basic-memory"
  },

  "cloud_projects": {
    // Used in CLOUD mode for sync configuration
    "research": {
      "local_path": "~/Documents/research",
      "last_sync": "2025-01-27T10:00:00Z",
      "bisync_initialized": true
    },
    "work": {
      "local_path": "~/work",
      "last_sync": null,
      "bisync_initialized": false
    }
  }
}
```

**Note:** In cloud mode, the actual project list comes from the API (`GET /projects/projects`). The `cloud_projects` dict only stores local sync configuration.
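
A minimal sketch of how the CLI might read a project's sync configuration from this file (the helper name is illustrative, not part of the implementation):

```python
# Hypothetical helper — reads cloud_projects sync config from config.json.
import json
from pathlib import Path
from typing import Optional

CONFIG_PATH = Path("~/.basic-memory/config.json").expanduser()


def get_cloud_project_config(name: str) -> Optional[dict]:
    """Return the sync configuration dict for a cloud project, or None."""
    data = json.loads(CONFIG_PATH.read_text())
    entry = data.get("cloud_projects", {}).get(name)
    if entry is not None:
        # Resolve ~ so callers get an absolute local path
        entry["local_path"] = str(Path(entry["local_path"]).expanduser())
    return entry
```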

#### Rclone Config

```ini
# ~/.config/rclone/rclone.conf
[basic-memory-cloud]
type = s3
provider = Other
access_key_id = {scoped_access_key}
secret_access_key = {scoped_secret_key}
endpoint = https://fly.storage.tigris.dev
region = auto
```

### What Gets Removed

- ❌ Mount commands (`bm cloud mount`, `bm cloud unmount`, `bm cloud mount-status`)
- ❌ Profile selection (both mount and bisync profiles)
- ❌ `~/basic-memory-cloud/` directory (mount point)
- ❌ `~/basic-memory-cloud-sync/` directory (forced sync location)
- ❌ Auto-discovery of folders
- ❌ Separate `bisync-setup` command
- ❌ `bisync_config` in config.json
- ❌ Convenience commands (`bm sync`, `bm bisync` without project name)
- ❌ Complex state management for global sync

### What Gets Simplified

- ✅ One setup command: `bm cloud setup`
- ✅ One rclone remote: `basic-memory-cloud`
- ✅ One workflow: project-scoped bisync (remove mount)
- ✅ One set of defaults (balanced settings from SPEC-8)
- ✅ Clear project-to-path mapping in config
- ✅ Explicit sync operations only

### What Gets Added

- ✅ `bm project sync --name <project>` (one-way: local → cloud)
- ✅ `bm project bisync --name <project>` (two-way: local ↔ cloud)
- ✅ `bm project check --name <project>` (integrity verification)
- ✅ `bm project sync-setup <project> <local_path>` (configure sync)
- ✅ `bm project ls --name <project> [path]` (list remote files)
- ✅ `bm project copy --name <project> <src> <dst>` (copy files)
- ✅ `cloud_projects` dict in config.json

## How

### Phase 1: Project Model Updates

**1.1 Update Config Schema**

```python
# basic_memory/config.py
class Config(BaseModel):
    # ... existing fields ...
    cloud_mode: bool = False
    cloud_host: str = "https://cloud.basicmemory.com"

    # Local mode: simple name → path mapping
    projects: dict[str, str] = {}

    # Cloud mode: sync configuration per project
    cloud_projects: dict[str, CloudProjectConfig] = {}


class CloudProjectConfig(BaseModel):
    """Sync configuration for a cloud project."""
    local_path: str                        # Local working directory
    last_sync: Optional[datetime] = None   # Last successful sync
    bisync_initialized: bool = False       # Whether bisync baseline exists
```

**No database changes needed** - sync config lives in `~/.basic-memory/config.json` only.

### Phase 2: Simplified Rclone Configuration

**2.1 Simplify Remote Naming**

```python
# basic_memory/cli/commands/cloud/rclone_config.py

def configure_rclone_remote(
    access_key: str,
    secret_key: str,
    endpoint: str = "https://fly.storage.tigris.dev",
    region: str = "auto",
) -> None:
    """Configure single rclone remote named 'bm-cloud'."""

    config = load_rclone_config()

    # Single remote name (not tenant-specific)
    REMOTE_NAME = "basic-memory-cloud"

    if not config.has_section(REMOTE_NAME):
        config.add_section(REMOTE_NAME)

    config.set(REMOTE_NAME, "type", "s3")
    config.set(REMOTE_NAME, "provider", "Other")
    config.set(REMOTE_NAME, "access_key_id", access_key)
    config.set(REMOTE_NAME, "secret_access_key", secret_key)
    config.set(REMOTE_NAME, "endpoint", endpoint)
    config.set(REMOTE_NAME, "region", region)

    save_rclone_config(config)
```
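
The `load_rclone_config()` / `save_rclone_config()` helpers referenced above aren't defined in this spec; a minimal sketch using `configparser`, assuming rclone's default config location:

```python
# Sketch only — the real helpers may differ; assumes rclone's default config path.
import configparser
from pathlib import Path

RCLONE_CONFIG_PATH = Path("~/.config/rclone/rclone.conf").expanduser()


def load_rclone_config() -> configparser.ConfigParser:
    """Load the rclone INI config (empty config if the file doesn't exist yet)."""
    config = configparser.ConfigParser()
    if RCLONE_CONFIG_PATH.exists():
        config.read(RCLONE_CONFIG_PATH)
    return config


def save_rclone_config(config: configparser.ConfigParser) -> None:
    """Write the rclone INI config back to disk."""
    RCLONE_CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True)
    with RCLONE_CONFIG_PATH.open("w") as f:
        config.write(f)
```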

**2.2 Remove Profile Complexity**

Use a single set of balanced defaults (from SPEC-8 Phase 4 testing), mapped to rclone flags in the sketch after this list:
- `conflict_resolve`: `newer` (auto-resolve to most recent)
- `max_delete`: `25` (safety limit)
- `check_access`: `false` (skip for performance)
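
As a sketch, the defaults above translate into a fixed flag list (`check_access: false` simply means the `--check-access` flag is omitted):

```python
# Single default bisync profile — illustrative constant, not the actual code layout.
BISYNC_DEFAULT_FLAGS = [
    "--conflict-resolve=newer",  # auto-resolve conflicts to the most recent copy
    "--max-delete=25",           # abort if more than 25 deletes would propagate
    # check_access=false: no --check-access flag is added
]
```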

### Phase 3: Project-Scoped Rclone Commands

**3.1 Core Sync Operations**

```python
# basic_memory/cli/commands/cloud/rclone_commands.py

def get_project_remote(project: Project, bucket_name: str) -> str:
    """Build rclone remote path for project.

    Returns: basic-memory-cloud:bucket-name/research
    """
    # Strip leading slash from cloud path
    cloud_path = project.path.lstrip("/")
    return f"basic-memory-cloud:{bucket_name}/{cloud_path}"


def project_sync(
    project: Project,
    bucket_name: str,
    dry_run: bool = False,
    verbose: bool = False,
) -> bool:
    """One-way sync: local → cloud.

    Uses rclone sync to make cloud identical to local.
    """
    if not project.local_sync_path:
        raise RcloneError(f"Project {project.name} has no local_sync_path configured")

    local_path = Path(project.local_sync_path).expanduser()
    remote_path = get_project_remote(project, bucket_name)
    filter_path = get_bmignore_filter_path()

    cmd = [
        "rclone", "sync",
        str(local_path),
        remote_path,
        "--filters-file", str(filter_path),
    ]

    if verbose:
        cmd.append("--verbose")
    else:
        cmd.append("--progress")

    if dry_run:
        cmd.append("--dry-run")

    result = subprocess.run(cmd, text=True)
    return result.returncode == 0


def project_bisync(
    project: Project,
    bucket_name: str,
    dry_run: bool = False,
    resync: bool = False,
    verbose: bool = False,
) -> bool:
    """Two-way sync: local ↔ cloud.

    Uses rclone bisync with balanced defaults.
    """
    if not project.local_sync_path:
        raise RcloneError(f"Project {project.name} has no local_sync_path configured")

    local_path = Path(project.local_sync_path).expanduser()
    remote_path = get_project_remote(project, bucket_name)
    filter_path = get_bmignore_filter_path()
    state_path = get_project_bisync_state(project.name)

    # Ensure state directory exists
    state_path.mkdir(parents=True, exist_ok=True)

    cmd = [
        "rclone", "bisync",
        str(local_path),
        remote_path,
        "--create-empty-src-dirs",
        "--resilient",
        "--conflict-resolve=newer",
        "--max-delete=25",
        "--filters-file", str(filter_path),
        "--workdir", str(state_path),
    ]

    if verbose:
        cmd.append("--verbose")
    else:
        cmd.append("--progress")

    if dry_run:
        cmd.append("--dry-run")

    if resync:
        cmd.append("--resync")

    # Check if first run requires resync
    if not resync and not bisync_initialized(project.name) and not dry_run:
        raise RcloneError(
            f"First bisync for {project.name} requires --resync to establish baseline.\n"
            f"Run: bm project bisync --name {project.name} --resync"
        )

    result = subprocess.run(cmd, text=True)
    return result.returncode == 0


def project_check(
    project: Project,
    bucket_name: str,
    one_way: bool = False,
) -> bool:
    """Check integrity between local and cloud.

    Returns True if files match, False if differences found.
    """
    if not project.local_sync_path:
        raise RcloneError(f"Project {project.name} has no local_sync_path configured")

    local_path = Path(project.local_sync_path).expanduser()
    remote_path = get_project_remote(project, bucket_name)
    filter_path = get_bmignore_filter_path()

    cmd = [
        "rclone", "check",
        str(local_path),
        remote_path,
        "--filter-from", str(filter_path),
    ]

    if one_way:
        cmd.append("--one-way")

    result = subprocess.run(cmd, capture_output=True, text=True)
    return result.returncode == 0
```
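
The helpers referenced above (`get_project_bisync_state()`, `bisync_initialized()`, `get_bmignore_filter_path()`) aren't shown in this spec; a minimal sketch, assuming per-project state under `~/.basic-memory/bisync-state/` and a filter file generated from `.bmignore` (the filter filename below is illustrative):

```python
# Sketches of the helpers used by project_sync/project_bisync (actual code may differ).
from pathlib import Path

BISYNC_STATE_ROOT = Path("~/.basic-memory/bisync-state").expanduser()


def get_project_bisync_state(project_name: str) -> Path:
    """Directory holding rclone bisync listings/state for one project."""
    return BISYNC_STATE_ROOT / project_name


def bisync_initialized(project_name: str) -> bool:
    """True once a bisync baseline exists (state directory is non-empty)."""
    state = get_project_bisync_state(project_name)
    return state.exists() and any(state.iterdir())


def get_bmignore_filter_path() -> Path:
    """Rclone filter file generated from .bmignore patterns (filename illustrative)."""
    return Path("~/.basic-memory/.bmignore.rclone").expanduser()
```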

**3.2 Advanced File Operations**

```python
def project_ls(
    project: Project,
    bucket_name: str,
    path: Optional[str] = None,
) -> list[str]:
    """List files in remote project."""
    remote_path = get_project_remote(project, bucket_name)
    if path:
        remote_path = f"{remote_path}/{path}"

    cmd = ["rclone", "ls", remote_path]
    result = subprocess.run(cmd, capture_output=True, text=True, check=True)
    return result.stdout.splitlines()


def project_copy(
    project: Project,
    bucket_name: str,
    src: str,
    dst: str,
    dry_run: bool = False,
) -> bool:
    """Copy files within project scope."""
    # Implementation similar to sync
    pass
```

### Phase 4: CLI Integration

**4.1 Update Project Commands**

```python
# basic_memory/cli/commands/project.py

@project_app.command("add")
def add_project(
    name: str = typer.Argument(..., help="Name of the project"),
    path: str = typer.Argument(None, help="Path (required for local mode)"),
    local: str = typer.Option(None, "--local", help="Local sync path for cloud mode"),
    set_default: bool = typer.Option(False, "--default", help="Set as default"),
) -> None:
    """Add a new project.

    Cloud mode examples:
      bm project add research                    # No local sync
      bm project add research --local ~/docs     # With local sync

    Local mode example:
      bm project add research ~/Documents/research
    """
    config = ConfigManager().config

    if config.cloud_mode_enabled:
        # Cloud mode: auto-generate cloud path from name
        async def _add_project():
            async with get_client() as client:
                data = {
                    "name": name,
                    "path": generate_permalink(name),
                    "local_sync_path": local,  # Optional
                    "set_default": set_default,
                }
                response = await call_post(client, "/projects/projects", json=data)
                return ProjectStatusResponse.model_validate(response.json())
    else:
        # Local mode: path is required
        if path is None:
            console.print("[red]Error: path argument is required in local mode[/red]")
            raise typer.Exit(1)

        resolved_path = Path(os.path.abspath(os.path.expanduser(path))).as_posix()

        async def _add_project():
            async with get_client() as client:
                data = {
                    "name": name,
                    "path": resolved_path,
                    "set_default": set_default,
                }
                response = await call_post(client, "/projects/projects", json=data)
                return ProjectStatusResponse.model_validate(response.json())

    # Execute and display result
    result = asyncio.run(_add_project())
    console.print(f"[green]{result.message}[/green]")


@project_app.command("sync-setup")
def setup_project_sync(
    name: str = typer.Argument(..., help="Project name"),
    local_path: str = typer.Argument(..., help="Local sync directory"),
) -> None:
    """Configure local sync for an existing cloud project."""

    config = ConfigManager().config

    if not config.cloud_mode_enabled:
        console.print("[red]Error: sync-setup only available in cloud mode[/red]")
        raise typer.Exit(1)

    resolved_path = Path(os.path.abspath(os.path.expanduser(local_path))).as_posix()

    async def _update_project():
        async with get_client() as client:
            data = {"local_sync_path": resolved_path}
            project_permalink = generate_permalink(name)
            response = await call_patch(
                client,
                f"/projects/{project_permalink}",
                json=data,
            )
            return ProjectStatusResponse.model_validate(response.json())

    result = asyncio.run(_update_project())
    console.print(f"[green]{result.message}[/green]")
    console.print(f"\nLocal sync configured: {resolved_path}")
    console.print(f"\nTo sync: bm project bisync --name {name} --resync")
```

**4.2 New Sync Commands**

```python
# basic_memory/cli/commands/project.py

@project_app.command("sync")
def sync_project(
    name: Optional[str] = typer.Option(None, "--name", help="Project name"),
    all_projects: bool = typer.Option(False, "--all", help="Sync all projects"),
    dry_run: bool = typer.Option(False, "--dry-run", help="Preview changes"),
    verbose: bool = typer.Option(False, "--verbose", help="Show detailed output"),
) -> None:
    """One-way sync: local → cloud (make cloud identical to local)."""

    config = ConfigManager().config
    if not config.cloud_mode_enabled:
        console.print("[red]Error: sync only available in cloud mode[/red]")
        raise typer.Exit(1)

    # Determine projects to sync (either --all or a single --name is required)
    if all_projects:
        projects = get_all_sync_projects()
    elif name:
        projects = [get_project_by_name(name)]
    else:
        console.print("[red]Error: provide --name <project> or --all[/red]")
        raise typer.Exit(1)

    # Get bucket name
    tenant_info = asyncio.run(get_mount_info())
    bucket_name = tenant_info.bucket_name

    # Sync each project
    for project in projects:
        if not project.local_sync_path:
            console.print(f"[yellow]Skipping {project.name}: no local_sync_path[/yellow]")
            continue

        console.print(f"[blue]Syncing {project.name}...[/blue]")
        try:
            project_sync(project, bucket_name, dry_run=dry_run, verbose=verbose)
            console.print(f"[green]✓ {project.name} synced[/green]")
        except RcloneError as e:
            console.print(f"[red]✗ {project.name} failed: {e}[/red]")


@project_app.command("bisync")
def bisync_project(
    name: Optional[str] = typer.Option(None, "--name", help="Project name"),
    all_projects: bool = typer.Option(False, "--all", help="Bisync all projects"),
    dry_run: bool = typer.Option(False, "--dry-run", help="Preview changes"),
    resync: bool = typer.Option(False, "--resync", help="Force new baseline"),
    verbose: bool = typer.Option(False, "--verbose", help="Show detailed output"),
) -> None:
    """Two-way sync: local ↔ cloud (bidirectional sync)."""

    # Similar to sync but calls project_bisync()
    pass


@project_app.command("check")
def check_project(
    name: str = typer.Option(..., "--name", help="Project name"),
    one_way: bool = typer.Option(False, "--one-way", help="Check one direction only"),
) -> None:
    """Verify file integrity between local and cloud."""

    # Calls project_check()
    pass


@project_app.command("ls")
def list_project_files(
    name: str = typer.Option(..., "--name", help="Project name"),
    path: str = typer.Argument(None, help="Path within project"),
) -> None:
    """List files in remote project."""

    # Calls project_ls()
    pass
```
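
The `get_all_sync_projects()` / `get_project_by_name()` helpers used by `sync_project` aren't defined here; a sketch, assuming the project list comes from the cloud API and sync paths from `cloud_projects` in config (names and the placeholder API call are illustrative):

```python
# Sketch — helper names and the placeholder API call are illustrative.
from dataclasses import dataclass
from typing import Optional


@dataclass
class SyncProject:
    name: str
    path: str                              # cloud path, e.g. "/research"
    local_sync_path: Optional[str] = None  # from cloud_projects in config.json


def fetch_cloud_projects() -> list[SyncProject]:
    """Placeholder for GET /projects/projects; the real helper calls the cloud API."""
    return []


def get_all_sync_projects() -> list[SyncProject]:
    """All cloud projects that have a local sync path configured."""
    return [p for p in fetch_cloud_projects() if p.local_sync_path]


def get_project_by_name(name: str) -> SyncProject:
    """Look up a single cloud project by name."""
    for project in fetch_cloud_projects():
        if project.name == name:
            return project
    raise ValueError(f"Unknown project: {name}")
```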

**4.3 Update Cloud Setup**

```python
# basic_memory/cli/commands/cloud/core_commands.py

@cloud_app.command("setup")
def cloud_setup() -> None:
    """Set up cloud sync (install rclone and configure credentials)."""

    console.print("[bold blue]Basic Memory Cloud Setup[/bold blue]")
    console.print("Installing rclone and configuring credentials...\n")

    try:
        # Step 1: Install rclone
        console.print("[blue]Step 1: Installing rclone...[/blue]")
        install_rclone()

        # Step 2: Get tenant info
        console.print("\n[blue]Step 2: Getting tenant information...[/blue]")
        tenant_info = asyncio.run(get_mount_info())
        tenant_id = tenant_info.tenant_id

        console.print(f"[green]✓ Tenant: {tenant_id}[/green]")

        # Step 3: Generate credentials
        console.print("\n[blue]Step 3: Generating sync credentials...[/blue]")
        creds = asyncio.run(generate_mount_credentials(tenant_id))

        console.print("[green]✓ Generated credentials[/green]")

        # Step 4: Configure rclone (single remote: basic-memory-cloud)
        console.print("\n[blue]Step 4: Configuring rclone...[/blue]")
        configure_rclone_remote(
            access_key=creds.access_key,
            secret_key=creds.secret_key,
        )

        console.print("\n[bold green]✓ Cloud setup completed![/bold green]")
        console.print("\nNext steps:")
        console.print("  1. Create projects with local sync:")
        console.print("     bm project add research --local ~/Documents/research")
        console.print("\n  2. Or configure sync for existing projects:")
        console.print("     bm project sync-setup research ~/Documents/research")
        console.print("\n  3. Start syncing:")
        console.print("     bm project bisync --name research --resync")

    except Exception as e:
        console.print(f"\n[red]Setup failed: {e}[/red]")
        raise typer.Exit(1)
```

### Phase 5: Cleanup

**5.1 Remove Deprecated Commands**

```python
# Remove from cloud commands:
- cloud mount
- cloud unmount
- cloud mount-status
- bisync-setup
- Individual bisync command (moved to project bisync)

# Remove from root commands:
- bm sync (without project specification)
- bm bisync (without project specification)
```

**5.2 Remove Deprecated Code**

```python
# Files to remove:
- mount_commands.py (entire file)

# Functions to remove from rclone_config.py:
- MOUNT_PROFILES
- get_default_mount_path()
- build_mount_command()
- is_path_mounted()
- get_rclone_processes()
- kill_rclone_process()
- unmount_path()
- cleanup_orphaned_rclone_processes()

# Functions to remove from bisync_commands.py:
- BISYNC_PROFILES (use single default)
- setup_cloud_bisync() (replaced by cloud setup)
- run_bisync_watch() (can add back to project bisync if needed)
- show_bisync_status() (replaced by project list showing sync status)
```

**5.3 Update Configuration Schema**

```python
# Remove from config.json:
- bisync_config (no longer needed)

# The projects array is the source of truth for sync configuration
```

### Phase 6: Documentation Updates

**6.1 Update CLI Documentation**

```markdown
# docs/cloud-cli.md

## Project-Scoped Cloud Sync

Basic Memory cloud sync is project-scoped - each project can optionally be configured with a local working directory that syncs with the cloud.

### Setup (One Time)

1. Authenticate and enable cloud mode:
   ```bash
   bm cloud login
   ```

2. Install rclone and configure credentials:
   ```bash
   bm cloud setup
   ```

### Create Projects with Sync

Create a cloud project with optional local sync:

```bash
# Create project without local sync
bm project add research

# Create project with local sync
bm project add research --local ~/Documents/research
```

Or configure sync for existing (remote) project:

```bash
bm project sync-setup research ~/Documents/research
```

### Syncing Projects

**Two-way sync (recommended):**
```bash
# First time - establish baseline
bm project bisync --name research --resync

# Subsequent syncs
bm project bisync --name research

# Sync all projects with local_sync_path configured
bm project bisync --all
```

**One-way sync (local → cloud):**
```bash
bm project sync --name research
```

**Verify integrity:**
```bash
bm project check --name research
```

### Advanced Operations

**List remote files:**
```bash
bm project ls --name research
bm project ls --name research subfolder
```

**Preview changes before syncing:**
```bash
bm project bisync --name research --dry-run
```

**Verbose output for debugging:**
```bash
bm project bisync --name research --verbose
```

### Project Management

**List projects (shows sync status):**
```bash
bm project list
```

**Update sync path:**
```bash
bm project sync-setup research ~/new/path
```

**Remove project:**
```bash
bm project remove research
```
```

**6.2 Update SPEC-8**

Add to SPEC-8's "Implementation Notes" section:

```markdown
## Superseded by SPEC-20

The initial implementation in SPEC-8 proved too complex with multiple footguns:
- Mount vs bisync workflow confusion
- Multiple profiles creating decision paralysis
- Directory conflicts and auto-discovery errors

SPEC-20 supersedes the sync implementation with a simplified project-scoped approach while keeping the core Tigris infrastructure from SPEC-8.
```

## How to Evaluate

### Success Criteria

**1. Simplified Setup**
- [ ] `bm cloud setup` completes in one command
- [ ] Creates single rclone remote named `basic-memory-cloud`
- [ ] No profile selection required
- [ ] Clear next steps printed after setup

**2. Clear Project Model**
- [ ] Projects can be created with or without local sync
- [ ] `bm project list` shows sync status for each project
- [ ] `local_sync_path` stored in project config
- [ ] Renaming local folder doesn't break sync (config is source of truth)

**3. Working Sync Operations**
- [ ] `bm project sync --name <project>` performs one-way sync
- [ ] `bm project bisync --name <project>` performs two-way sync
- [ ] `bm project check --name <project>` verifies integrity
- [ ] `--all` flag syncs all configured projects
- [ ] `--dry-run` shows changes without applying
- [ ] First bisync requires `--resync` with clear error message

**4. Safety**
- [ ] Cannot sync project without `local_sync_path` configured
- [ ] Bisync state is per-project (not global)
- [ ] `.bmignore` patterns respected
- [ ] Max delete safety (25 files) prevents accidents
- [ ] Clear error messages for all failure modes

**5. Clean Removal**
- [ ] Mount commands removed
- [ ] Profile selection removed
- [ ] Global sync directory removed (`~/basic-memory-cloud-sync/`)
- [ ] Auto-discovery removed
- [ ] Convenience commands (`bm sync`) removed

**6. Documentation**
- [ ] Updated cloud-cli.md with new workflow
- [ ] Clear examples for common operations
- [ ] Migration guide for existing users
- [ ] Troubleshooting section

### Test Scenarios

**Scenario 1: New User Setup**
```bash
# Start fresh
bm cloud login
bm cloud setup
bm project add research --local ~/docs/research
bm project bisync --name research --resync
# Edit files locally
bm project bisync --name research
# Verify changes synced
```

**Scenario 2: Multiple Projects**
```bash
bm project add work --local ~/work
bm project add personal --local ~/personal
bm project bisync --all --resync
# Edit files in both projects
bm project bisync --all
```

**Scenario 3: Project Without Sync**
```bash
bm project add temp-notes
# Try to sync (should fail gracefully)
bm project bisync --name temp-notes
# Should see: "Project temp-notes has no local_sync_path configured"
```

**Scenario 4: Integrity Check**
```bash
bm project bisync --name research
# Manually edit file in cloud UI
bm project check --name research
# Should report differences
bm project bisync --name research
# Should sync changes back to local
```

**Scenario 5: Safety Features**
```bash
# Delete 30 files locally
bm project sync --name research
# Should fail with max delete error
# User reviews and confirms
bm project sync --name research  # After confirming
```

### Performance Targets

- Setup completes in < 30 seconds
- Single project sync < 5 seconds for small changes
- Bisync initialization (--resync) < 10 seconds for typical project
- Batch sync (--all) runs sequentially, so N projects take roughly N × 5 seconds

### Breaking Changes

This is a **breaking change** from SPEC-8 implementation:

**Migration Required:**
- Users must run `bm cloud setup` again
- Existing `~/basic-memory-cloud-sync/` directory abandoned
- Projects must be configured with `local_sync_path`
- Mount users must switch to bisync workflow

**Migration Guide:**
```bash
# 1. Note current project locations
bm project list

# 2. Re-run setup
bm cloud setup

# 3. Configure sync for each project
bm project sync-setup research ~/Documents/research
bm project sync-setup work ~/work

# 4. Establish baselines
bm project bisync --all --resync

# 5. Old directory can be deleted
rm -rf ~/basic-memory-cloud-sync/
```

## Dependencies

- **SPEC-8**: TigrisFS Integration (bucket provisioning, credentials)
- Python 3.12+
- rclone 1.64.0+
- Typer (CLI framework)
- Rich (console output)

## Risks

**Risk 1: User Confusion from Breaking Changes**
- Mitigation: Clear migration guide, version bump (0.16.0)
- Mitigation: Detect old config and print migration instructions

**Risk 2: Per-Project Bisync State Complexity**
- Mitigation: Use rclone's `--workdir` to isolate state per project
- Mitigation: Store in `~/.basic-memory/bisync-state/{project_name}/`

**Risk 3: Batch Operations Performance**
- Mitigation: Run syncs sequentially with progress indicators
- Mitigation: Add `--parallel` flag in future if needed

**Risk 4: Lost Features (Mount)**
- Mitigation: Document mount as experimental/advanced feature
- Mitigation: Can restore if users request it

## Open Questions

1. **Should we keep mount as experimental command?**
   - Lean toward: Remove entirely, focus on bisync
   - Alternative: Keep as `bm project mount --name <project>` (advanced)

   - Answer: remove

2. **Batch sync order?**
   - Alphabetical by project name?
   - By last modified time?
   - Let user specify order?
   - Answer: use the project order from the API (or from config)

3. **Credential refresh?**
   - Auto-detect expired credentials and re-run credential generation?
   - Or require manual `bm cloud setup` again?
   - Answer: manual setup is fine

4. **Watch mode for projects?**
   - `bm project bisync --name research --watch`?
   - Or removed entirely (users can use OS tools)?
   - Answer: remove for now; we can add it back later if it proves useful

5. **Project path validation?**
   - Ensure `local_path` exists before allowing bisync?
   - Or let rclone error naturally?
   - Answer: create the directory if needed; an existing directory is fine

## Implementation Checklist

### Phase 1: Config Schema (1-2 days) ✅
- [x] Add `CloudProjectConfig` model to `basic_memory/config.py`
- [x] Add `cloud_projects: dict[str, CloudProjectConfig]` to Config model
- [x] Test config loading/saving with new schema
- [x] Handle migration from old config format

### Phase 2: Rclone Config Simplification ✅
- [x] Update `configure_rclone_remote()` to use `basic-memory-cloud` as remote name
- [x] Remove `add_tenant_to_rclone_config()` (replaced by configure_rclone_remote)
- [x] Remove tenant_id from remote naming
- [x] Test rclone config generation
- [x] Clean up deprecated import references in bisync_commands.py and core_commands.py

### Phase 3: Project-Scoped Rclone Commands ✅
- [x] Create `src/basic_memory/cli/commands/cloud/rclone_commands.py`
- [x] Implement `get_project_remote(project, bucket_name)`
- [x] Implement `project_sync()` (one-way: local → cloud)
- [x] Implement `project_bisync()` (two-way: local ↔ cloud)
- [x] Implement `project_check()` (integrity verification)
- [x] Implement `project_ls()` (list remote files)
- [x] Add helper: `get_project_bisync_state(project_name)`
- [x] Add helper: `bisync_initialized(project_name)`
- [x] Add helper: `get_bmignore_filter_path()`
- [x] Add `SyncProject` dataclass for project representation
- [x] Write unit tests for rclone commands (22 tests, 99% coverage)
- [x] Temporarily disable mount commands in core_commands.py

### Phase 4: CLI Integration ✅
- [x] Update `project.py`: Add `--local-path` flag to `project add` command
- [x] Update `project.py`: Create `project sync-setup` command
- [x] Create `project.py`: Add `project sync` command
- [x] Create `project.py`: Add `project bisync` command
- [x] Create `project.py`: Add `project check` command
- [x] Create `project.py`: Add `project ls` command
- [x] Create `project.py`: Add `project bisync-reset` command
- [x] Import rclone_commands module and get_mount_info helper
- [x] Update `project list` to show local sync paths in cloud mode
- [x] Update `project list` to conditionally show columns based on config
- [x] Update `project remove` to clean up local directories and bisync state
- [x] Add automatic database sync trigger after file sync operations
- [x] Add path normalization to prevent S3 mount point leakage
- [x] Update `cloud/core_commands.py`: Simplified `cloud setup` command
- [x] Write unit tests for `project add --local-path` (4 tests passing)

### Phase 5: Cleanup ✅
- [x] Remove `mount_commands.py` (entire file)
- [x] Remove mount-related functions from `rclone_config.py`:
  - [x] `MOUNT_PROFILES`
  - [x] `get_default_mount_path()`
  - [x] `build_mount_command()`
  - [x] `is_path_mounted()`
  - [x] `get_rclone_processes()`
  - [x] `kill_rclone_process()`
  - [x] `unmount_path()`
  - [x] `cleanup_orphaned_rclone_processes()`
- [x] Remove from `bisync_commands.py`:
  - [x] `BISYNC_PROFILES` (use single default)
  - [x] `setup_cloud_bisync()`
  - [x] `run_bisync_watch()`
  - [x] `show_bisync_status()`
  - [x] `run_bisync()`
  - [x] `run_check()`
- [x] Remove `bisync_config` from config schema
- [x] Remove deprecated cloud commands:
  - [x] `cloud mount`
  - [x] `cloud unmount`
  - [x] Simplified `cloud setup` to just install rclone and configure credentials
- [x] Remove convenience commands:
  - [x] Root-level `bm sync` (removed - confusing in cloud mode, automatic in local mode)
- [x] Update tests to remove references to deprecated functionality
- [x] All typecheck errors resolved

### Phase 6: Documentation ✅
- [x] Update `docs/cloud-cli.md` with new workflow
- [x] Add troubleshooting section for empty directory issues
- [x] Add troubleshooting section for bisync state corruption
- [x] Document `bisync-reset` command usage
- [x] Update command reference with all new commands
- [x] Add examples for common workflows
- [ ] Add migration guide for existing users (deferred - no users on old system yet)
- [ ] Update SPEC-8 with "Superseded by SPEC-20" note (deferred)

### Testing & Validation ✅
- [x] Test Scenario 1: New user setup (manual testing complete)
- [x] Test Scenario 2: Multiple projects (manual testing complete)
- [x] Test Scenario 3: Project without sync (manual testing complete)
- [x] Test Scenario 4: Integrity check (manual testing complete)
- [x] Test Scenario 5: bisync-reset command (manual testing complete)
- [x] Test cleanup on remove (manual testing complete)
- [x] Verify all commands work end-to-end
- [x] Document known issues (empty directory bisync limitation)
- [ ] Automated integration tests (deferred)
- [ ] Test migration from SPEC-8 implementation (N/A - no users yet)

## Implementation Notes

### Key Improvements Added During Implementation

**1. Path Normalization (Critical Bug Fix)**

**Problem:** Files were syncing to `/app/data/app/data/project/` instead of `/app/data/project/`

**Root cause:**
- S3 bucket contains projects directly (e.g., `basic-memory-llc/`)
- Fly machine mounts bucket at `/app/data/`
- API returns paths like `/app/data/basic-memory-llc` (mount point + project)
- Rclone was using this full path, causing path doubling

**Solution (three layers; see the sketch below):**
- API side: Added `normalize_project_path()` in `project_router.py` to strip `/app/data/` prefix
- CLI side: Added defensive normalization in `project.py` commands
- Rclone side: Updated `get_project_remote()` to strip prefix before building remote path

**Files modified:**
- `src/basic_memory/api/routers/project_router.py` - API normalization
- `src/basic_memory/cli/commands/project.py` - CLI normalization
- `src/basic_memory/cli/commands/cloud/rclone_commands.py` - Rclone remote path construction
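
A minimal sketch of the normalization described above (the real `normalize_project_path()` may handle additional cases):

```python
# Sketch — strip the Fly mount prefix so rclone remote paths aren't doubled.
MOUNT_PREFIX = "/app/data/"


def normalize_project_path(path: str) -> str:
    """e.g. '/app/data/research' -> '/research'; other paths pass through unchanged."""
    if path.startswith(MOUNT_PREFIX):
        return "/" + path[len(MOUNT_PREFIX):]
    return path
```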

**2. Automatic Database Sync After File Operations**

**Enhancement:** After successful file sync or bisync, automatically trigger database sync via API

**Implementation:**
- After `project sync`: POST to `/{project}/project/sync`
- After `project bisync`: POST to `/{project}/project/sync` + update config timestamps
- Skip trigger on `--dry-run`
- Graceful error handling with warnings

**Benefit:** Files and database stay in sync automatically without manual intervention
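
A sketch of the post-sync trigger, assuming `get_client()` yields an httpx-style async client as in the Phase 4 examples (the real code may route through the shared request helpers):

```python
# Sketch — fire a database sync after a successful file sync (skipped on --dry-run).
async def trigger_db_sync(project_permalink: str) -> None:
    async with get_client() as client:
        try:
            await client.post(f"/{project_permalink}/project/sync")
        except Exception as e:
            # The file sync already succeeded; only warn about the follow-up trigger.
            console.print(f"[yellow]Warning: database sync trigger failed: {e}[/yellow]")
```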

**3. Enhanced Project Removal with Cleanup**

**Enhancement:** `bm project remove` now properly cleans up local artifacts

**Behavior with `--delete-notes`:**
- ✓ Removes project from cloud API
- ✓ Deletes cloud files
- ✓ Removes local sync directory
- ✓ Removes bisync state directory
- ✓ Removes `cloud_projects` config entry

**Behavior without `--delete-notes`:**
- ✓ Removes project from cloud API
- ✗ Keeps local files (shows path in message)
- ✓ Removes bisync state directory (cleanup)
- ✓ Removes `cloud_projects` config entry

**Files modified:**
- `src/basic_memory/cli/commands/project.py` - Enhanced `remove_project()` function

**4. Bisync State Reset Command**

**New command:** `bm project bisync-reset <project>`

**Purpose:** Clear bisync state when it becomes corrupted (e.g., after mixing dry-run and actual runs)

**What it does:**
- Removes all bisync metadata from `~/.basic-memory/bisync-state/{project}/`
- Forces fresh baseline on next `--resync`
- Safe operation (doesn't touch files)
- Also runs automatically on project removal

**Files created:**
- Added `bisync-reset` command to `src/basic_memory/cli/commands/project.py`
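
A minimal sketch of the reset logic described above:

```python
# Sketch — bisync-reset removes only per-project rclone state, never user files.
import shutil
from pathlib import Path


def reset_bisync_state(project_name: str) -> None:
    state_dir = Path("~/.basic-memory/bisync-state").expanduser() / project_name
    if state_dir.exists():
        shutil.rmtree(state_dir)
    # The next `bm project bisync --name <project> --resync` rebuilds the baseline.
```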

**5. Improved UI for Project List**

**Enhancements:**
- Shows "Local Path" column in cloud mode for projects with sync configured
- Conditionally shows/hides columns based on config:
  - Local Path: only in cloud mode
  - Default: only when `default_project_mode` is True
- Uses `no_wrap=True, overflow="fold"` to prevent path truncation
- Applies path normalization to prevent showing mount point details

**Files modified:**
- `src/basic_memory/cli/commands/project.py` - Enhanced `list_projects()` function

**6. Documentation of Known Issues**

**Issue documented:** Rclone bisync limitation with empty directories

**Problem:** "Empty prior Path1 listing. Cannot sync to an empty directory"

**Explanation:** Bisync creates listing files that track state. When both directories are completely empty, these listing files are considered invalid.

**Solution documented:** Add at least one file (like README.md) before running `--resync`

**Files updated:**
- `docs/cloud-cli.md` - Added troubleshooting sections for:
  - Empty directory issues
  - Bisync state corruption
  - Usage of `bisync-reset` command

### Rclone Flag Fix

**Bug fix:** Incorrect rclone flag causing sync failures

**Error:** `unknown flag: --filters-file`

**Fix:** Changed `--filters-file` to correct flag `--filter-from` in both `project_sync()` and `project_bisync()` functions

**Files modified:**
- `src/basic_memory/cli/commands/cloud/rclone_commands.py`

### Test Coverage

**Unit tests added:**
- `tests/cli/test_project_add_with_local_path.py` - 4 tests for `--local-path` functionality
  - Test with local path saves to config
  - Test without local path doesn't save to config
  - Test tilde expansion in paths
  - Test nested directory creation

**Manual testing completed:**
- All 10 project commands tested end-to-end
- Path normalization verified
- Database sync trigger verified
- Cleanup on remove verified
- Bisync state reset verified

## Future Enhancements (Out of Scope)

- **Per-project rclone profiles**: Allow advanced users to override defaults
- **Conflict resolution UI**: Interactive conflict resolution for bisync
- **Sync scheduling**: Automatic periodic sync without watch mode
- **Sync analytics**: Track sync frequency, data transferred, etc.
- **Multi-machine coordination**: Detect and warn about concurrent edits from different machines

```

--------------------------------------------------------------------------------
/tests/repository/test_entity_repository.py:
--------------------------------------------------------------------------------

```python
"""Tests for the EntityRepository."""

from datetime import datetime, timezone

import pytest
import pytest_asyncio
from sqlalchemy import select

from basic_memory import db
from basic_memory.models import Entity, Observation, Relation, Project
from basic_memory.repository.entity_repository import EntityRepository
from basic_memory.utils import generate_permalink


@pytest_asyncio.fixture
async def entity_with_observations(session_maker, sample_entity):
    """Create an entity with observations."""
    async with db.scoped_session(session_maker) as session:
        observations = [
            Observation(
                project_id=sample_entity.project_id,
                entity_id=sample_entity.id,
                content="First observation",
            ),
            Observation(
                project_id=sample_entity.project_id,
                entity_id=sample_entity.id,
                content="Second observation",
            ),
        ]
        session.add_all(observations)
        return sample_entity


@pytest_asyncio.fixture
async def related_results(session_maker, test_project: Project):
    """Create entities with relations between them."""
    async with db.scoped_session(session_maker) as session:
        source = Entity(
            project_id=test_project.id,
            title="source",
            entity_type="test",
            permalink="source/source",
            file_path="source/source.md",
            content_type="text/markdown",
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
        )
        target = Entity(
            project_id=test_project.id,
            title="target",
            entity_type="test",
            permalink="target/target",
            file_path="target/target.md",
            content_type="text/markdown",
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
        )
        session.add(source)
        session.add(target)
        await session.flush()

        relation = Relation(
            project_id=test_project.id,
            from_id=source.id,
            to_id=target.id,
            to_name=target.title,
            relation_type="connects_to",
        )
        session.add(relation)

        return source, target, relation


@pytest.mark.asyncio
async def test_create_entity(entity_repository: EntityRepository):
    """Test creating a new entity"""
    entity_data = {
        "project_id": entity_repository.project_id,
        "title": "Test",
        "entity_type": "test",
        "permalink": "test/test",
        "file_path": "test/test.md",
        "content_type": "text/markdown",
        "created_at": datetime.now(timezone.utc),
        "updated_at": datetime.now(timezone.utc),
    }
    entity = await entity_repository.create(entity_data)

    # Verify returned object
    assert entity.id is not None
    assert entity.title == "Test"
    assert isinstance(entity.created_at, datetime)
    assert isinstance(entity.updated_at, datetime)

    # Verify in database
    found = await entity_repository.find_by_id(entity.id)
    assert found is not None
    assert found.id is not None
    assert found.id == entity.id
    assert found.title == entity.title

    # assert relations are eagerly loaded
    assert len(entity.observations) == 0
    assert len(entity.relations) == 0


@pytest.mark.asyncio
async def test_create_all(entity_repository: EntityRepository):
    """Test creating a new entity"""
    entity_data = [
        {
            "project_id": entity_repository.project_id,
            "title": "Test_1",
            "entity_type": "test",
            "permalink": "test/test-1",
            "file_path": "test/test_1.md",
            "content_type": "text/markdown",
            "created_at": datetime.now(timezone.utc),
            "updated_at": datetime.now(timezone.utc),
        },
        {
            "project_id": entity_repository.project_id,
            "title": "Test-2",
            "entity_type": "test",
            "permalink": "test/test-2",
            "file_path": "test/test_2.md",
            "content_type": "text/markdown",
            "created_at": datetime.now(timezone.utc),
            "updated_at": datetime.now(timezone.utc),
        },
    ]
    entities = await entity_repository.create_all(entity_data)

    assert len(entities) == 2
    entity = entities[0]

    # Verify in database
    found = await entity_repository.find_by_id(entity.id)
    assert found is not None
    assert found.id is not None
    assert found.id == entity.id
    assert found.title == entity.title

    # assert relations are eagerly loaded
    assert len(entity.observations) == 0
    assert len(entity.relations) == 0


@pytest.mark.asyncio
async def test_find_by_id(entity_repository: EntityRepository, sample_entity: Entity):
    """Test finding an entity by ID"""
    found = await entity_repository.find_by_id(sample_entity.id)
    assert found is not None
    assert found.id == sample_entity.id
    assert found.title == sample_entity.title

    # Verify against direct database query
    async with db.scoped_session(entity_repository.session_maker) as session:
        stmt = select(Entity).where(Entity.id == sample_entity.id)
        result = await session.execute(stmt)
        db_entity = result.scalar_one()
        assert db_entity.id == found.id
        assert db_entity.title == found.title


@pytest.mark.asyncio
async def test_update_entity(entity_repository: EntityRepository, sample_entity: Entity):
    """Test updating an entity"""
    updated = await entity_repository.update(sample_entity.id, {"title": "Updated title"})
    assert updated is not None
    assert updated.title == "Updated title"

    # Verify in database
    async with db.scoped_session(entity_repository.session_maker) as session:
        stmt = select(Entity).where(Entity.id == sample_entity.id)
        result = await session.execute(stmt)
        db_entity = result.scalar_one()
        assert db_entity.title == "Updated title"


@pytest.mark.asyncio
async def test_update_entity_returns_with_relations_and_observations(
    entity_repository: EntityRepository, entity_with_observations, test_project: Project
):
    """Test that update() returns entity with observations and relations eagerly loaded."""
    entity = entity_with_observations

    # Create a target entity and relation
    async with db.scoped_session(entity_repository.session_maker) as session:
        target = Entity(
            project_id=test_project.id,
            title="target",
            entity_type="test",
            permalink="target/target",
            file_path="target/target.md",
            content_type="text/markdown",
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
        )
        session.add(target)
        await session.flush()

        relation = Relation(
            project_id=test_project.id,
            from_id=entity.id,
            to_id=target.id,
            to_name=target.title,
            relation_type="connects_to",
        )
        session.add(relation)

    # Now update the entity
    updated = await entity_repository.update(entity.id, {"title": "Updated with relations"})

    # Verify returned entity has observations and relations accessible
    # (would raise DetachedInstanceError if not eagerly loaded)
    assert updated is not None
    assert updated.title == "Updated with relations"

    # Access observations - should NOT raise DetachedInstanceError
    assert len(updated.observations) == 2
    assert updated.observations[0].content in ["First observation", "Second observation"]

    # Access relations - should NOT raise DetachedInstanceError
    assert len(updated.relations) == 1
    assert updated.relations[0].relation_type == "connects_to"
    assert updated.relations[0].to_name == "target"


@pytest.mark.asyncio
async def test_delete_entity(entity_repository: EntityRepository, sample_entity):
    """Test deleting an entity."""
    result = await entity_repository.delete(sample_entity.id)
    assert result is True

    # Verify deletion
    deleted = await entity_repository.find_by_id(sample_entity.id)
    assert deleted is None


@pytest.mark.asyncio
async def test_delete_entity_with_observations(
    entity_repository: EntityRepository, entity_with_observations
):
    """Test deleting an entity cascades to its observations."""
    entity = entity_with_observations

    result = await entity_repository.delete(entity.id)
    assert result is True

    # Verify entity deletion
    deleted = await entity_repository.find_by_id(entity.id)
    assert deleted is None

    # Verify observations were cascaded
    async with db.scoped_session(entity_repository.session_maker) as session:
        query = select(Observation).filter(Observation.entity_id == entity.id)
        result = await session.execute(query)
        remaining_observations = result.scalars().all()
        assert len(remaining_observations) == 0


@pytest.mark.asyncio
async def test_delete_entities_by_type(entity_repository: EntityRepository, sample_entity):
    """Test deleting entities by type."""
    result = await entity_repository.delete_by_fields(entity_type=sample_entity.entity_type)
    assert result is True

    # Verify deletion
    async with db.scoped_session(entity_repository.session_maker) as session:
        query = select(Entity).filter(Entity.entity_type == sample_entity.entity_type)
        result = await session.execute(query)
        remaining = result.scalars().all()
        assert len(remaining) == 0


@pytest.mark.asyncio
async def test_delete_entity_with_relations(entity_repository: EntityRepository, related_results):
    """Test deleting an entity cascades to its relations."""
    source, target, relation = related_results

    # Delete source entity
    result = await entity_repository.delete(source.id)
    assert result is True

    # Verify relation was cascaded
    async with db.scoped_session(entity_repository.session_maker) as session:
        query = select(Relation).filter(Relation.from_id == source.id)
        result = await session.execute(query)
        remaining_relations = result.scalars().all()
        assert len(remaining_relations) == 0

        # Verify target entity still exists
        target_exists = await entity_repository.find_by_id(target.id)
        assert target_exists is not None


@pytest.mark.asyncio
async def test_delete_nonexistent_entity(entity_repository: EntityRepository):
    """Test deleting an entity that doesn't exist."""
    result = await entity_repository.delete(0)
    assert result is False


@pytest_asyncio.fixture
async def test_entities(session_maker, test_project: Project):
    """Create multiple test entities."""
    async with db.scoped_session(session_maker) as session:
        entities = [
            Entity(
                project_id=test_project.id,
                title="entity1",
                entity_type="test",
                permalink="type1/entity1",
                file_path="type1/entity1.md",
                content_type="text/markdown",
                created_at=datetime.now(timezone.utc),
                updated_at=datetime.now(timezone.utc),
            ),
            Entity(
                project_id=test_project.id,
                title="entity2",
                entity_type="test",
                permalink="type1/entity2",
                file_path="type1/entity2.md",
                content_type="text/markdown",
                created_at=datetime.now(timezone.utc),
                updated_at=datetime.now(timezone.utc),
            ),
            Entity(
                project_id=test_project.id,
                title="entity3",
                entity_type="test",
                permalink="type2/entity3",
                file_path="type2/entity3.md",
                content_type="text/markdown",
                created_at=datetime.now(timezone.utc),
                updated_at=datetime.now(timezone.utc),
            ),
        ]
        session.add_all(entities)
        return entities


@pytest.mark.asyncio
async def test_find_by_permalinks(entity_repository: EntityRepository, test_entities):
    """Test finding multiple entities by their type/name pairs."""
    # Test finding multiple entities
    permalinks = [e.permalink for e in test_entities]
    found = await entity_repository.find_by_permalinks(permalinks)
    assert len(found) == 3
    names = {e.title for e in found}
    assert names == {"entity1", "entity2", "entity3"}

    # Test finding subset of entities
    permalinks = [e.permalink for e in test_entities if e.title != "entity2"]
    found = await entity_repository.find_by_permalinks(permalinks)
    assert len(found) == 2
    names = {e.title for e in found}
    assert names == {"entity1", "entity3"}

    # Test with non-existent entities
    permalinks = ["type1/entity1", "type3/nonexistent"]
    found = await entity_repository.find_by_permalinks(permalinks)
    assert len(found) == 1
    assert found[0].title == "entity1"

    # Test empty input
    found = await entity_repository.find_by_permalinks([])
    assert len(found) == 0


@pytest.mark.asyncio
async def test_generate_permalink_from_file_path():
    """Test permalink generation from different file paths."""
    test_cases = [
        ("docs/My Feature.md", "docs/my-feature"),
        ("specs/API (v2).md", "specs/api-v2"),
        ("notes/2024/Q1 Planning!!!.md", "notes/2024/q1-planning"),
        ("test/Über File.md", "test/uber-file"),
        ("docs/my_feature_name.md", "docs/my-feature-name"),
        ("specs/multiple--dashes.md", "specs/multiple-dashes"),
        ("notes/trailing/space/ file.md", "notes/trailing/space/file"),
    ]

    for input_path, expected in test_cases:
        result = generate_permalink(input_path)
        assert result == expected, f"Failed for {input_path}"
        # Verify the result passes validation
        Entity(
            title="test",
            entity_type="test",
            permalink=result,
            file_path=input_path,
            content_type="text/markdown",
        )  # This will raise ValueError if invalid


@pytest.mark.asyncio
async def test_get_by_title(entity_repository: EntityRepository, session_maker):
    """Test getting an entity by title."""
    # Create test entities
    async with db.scoped_session(session_maker) as session:
        entities = [
            Entity(
                project_id=entity_repository.project_id,
                title="Unique Title",
                entity_type="test",
                permalink="test/unique-title",
                file_path="test/unique-title.md",
                content_type="text/markdown",
                created_at=datetime.now(timezone.utc),
                updated_at=datetime.now(timezone.utc),
            ),
            Entity(
                project_id=entity_repository.project_id,
                title="Another Title",
                entity_type="test",
                permalink="test/another-title",
                file_path="test/another-title.md",
                content_type="text/markdown",
                created_at=datetime.now(timezone.utc),
                updated_at=datetime.now(timezone.utc),
            ),
            Entity(
                project_id=entity_repository.project_id,
                title="Another Title",
                entity_type="test",
                permalink="test/another-title-1",
                file_path="test/another-title-1.md",
                content_type="text/markdown",
                created_at=datetime.now(timezone.utc),
                updated_at=datetime.now(timezone.utc),
            ),
        ]
        session.add_all(entities)
        await session.flush()

    # Test getting by exact title
    found = await entity_repository.get_by_title("Unique Title")
    assert found is not None
    assert len(found) == 1
    assert found[0].title == "Unique Title"

    # Test case sensitivity
    found = await entity_repository.get_by_title("unique title")
    assert not found  # Should be case-sensitive

    # Test non-existent title
    found = await entity_repository.get_by_title("Non Existent")
    assert not found

    # Test multiple rows found
    found = await entity_repository.get_by_title("Another Title")
    assert len(found) == 2


@pytest.mark.asyncio
async def test_get_by_file_path(entity_repository: EntityRepository, session_maker):
    """Test getting an entity by title."""
    # Create test entities
    async with db.scoped_session(session_maker) as session:
        entities = [
            Entity(
                project_id=entity_repository.project_id,
                title="Unique Title",
                entity_type="test",
                permalink="test/unique-title",
                file_path="test/unique-title.md",
                content_type="text/markdown",
                created_at=datetime.now(timezone.utc),
                updated_at=datetime.now(timezone.utc),
            ),
        ]
        session.add_all(entities)
        await session.flush()

    # Test getting by file_path
    found = await entity_repository.get_by_file_path("test/unique-title.md")
    assert found is not None
    assert found.title == "Unique Title"

    # Test non-existent file_path
    found = await entity_repository.get_by_file_path("not/a/real/file.md")
    assert found is None


@pytest.mark.asyncio
async def test_get_distinct_directories(entity_repository: EntityRepository, session_maker):
    """Test getting distinct directory paths from entity file paths."""
    # Create test entities with various directory structures
    async with db.scoped_session(session_maker) as session:
        entities = [
            Entity(
                project_id=entity_repository.project_id,
                title="File 1",
                entity_type="test",
                permalink="docs/guides/file1",
                file_path="docs/guides/file1.md",
                content_type="text/markdown",
                created_at=datetime.now(timezone.utc),
                updated_at=datetime.now(timezone.utc),
            ),
            Entity(
                project_id=entity_repository.project_id,
                title="File 2",
                entity_type="test",
                permalink="docs/guides/file2",
                file_path="docs/guides/file2.md",
                content_type="text/markdown",
                created_at=datetime.now(timezone.utc),
                updated_at=datetime.now(timezone.utc),
            ),
            Entity(
                project_id=entity_repository.project_id,
                title="File 3",
                entity_type="test",
                permalink="docs/api/file3",
                file_path="docs/api/file3.md",
                content_type="text/markdown",
                created_at=datetime.now(timezone.utc),
                updated_at=datetime.now(timezone.utc),
            ),
            Entity(
                project_id=entity_repository.project_id,
                title="File 4",
                entity_type="test",
                permalink="specs/file4",
                file_path="specs/file4.md",
                content_type="text/markdown",
                created_at=datetime.now(timezone.utc),
                updated_at=datetime.now(timezone.utc),
            ),
            Entity(
                project_id=entity_repository.project_id,
                title="File 5",
                entity_type="test",
                permalink="notes/2024/q1/file5",
                file_path="notes/2024/q1/file5.md",
                content_type="text/markdown",
                created_at=datetime.now(timezone.utc),
                updated_at=datetime.now(timezone.utc),
            ),
        ]
        session.add_all(entities)
        await session.flush()

    # Get distinct directories
    directories = await entity_repository.get_distinct_directories()

    # Verify directories are extracted correctly
    assert isinstance(directories, list)
    assert len(directories) > 0

    # Should include all parent directories but not filenames
    expected_dirs = {
        "docs",
        "docs/guides",
        "docs/api",
        "notes",
        "notes/2024",
        "notes/2024/q1",
        "specs",
    }
    assert set(directories) == expected_dirs

    # Verify results are sorted
    assert directories == sorted(directories)

    # Verify no file paths are included
    for dir_path in directories:
        assert not dir_path.endswith(".md")
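
    # Illustrative only (not the repository implementation): the directory set asserted
    # above can be reproduced with plain path arithmetic over the file paths, e.g.
    #
    #     from pathlib import PurePosixPath
    #     dirs: set[str] = set()
    #     for file_path in ("docs/guides/file1.md", "notes/2024/q1/file5.md"):
    #         dirs.update(str(p) for p in PurePosixPath(file_path).parents if str(p) != ".")
    #     # dirs == {"docs", "docs/guides", "notes", "notes/2024", "notes/2024/q1"}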


@pytest.mark.asyncio
async def test_get_distinct_directories_empty_db(entity_repository: EntityRepository):
    """Test getting distinct directories when database is empty."""
    directories = await entity_repository.get_distinct_directories()
    assert directories == []


@pytest.mark.asyncio
async def test_find_by_directory_prefix(entity_repository: EntityRepository, session_maker):
    """Test finding entities by directory prefix."""
    # Create test entities in various directories
    async with db.scoped_session(session_maker) as session:
        entities = [
            Entity(
                project_id=entity_repository.project_id,
                title="File 1",
                entity_type="test",
                permalink="docs/file1",
                file_path="docs/file1.md",
                content_type="text/markdown",
                created_at=datetime.now(timezone.utc),
                updated_at=datetime.now(timezone.utc),
            ),
            Entity(
                project_id=entity_repository.project_id,
                title="File 2",
                entity_type="test",
                permalink="docs/guides/file2",
                file_path="docs/guides/file2.md",
                content_type="text/markdown",
                created_at=datetime.now(timezone.utc),
                updated_at=datetime.now(timezone.utc),
            ),
            Entity(
                project_id=entity_repository.project_id,
                title="File 3",
                entity_type="test",
                permalink="docs/api/file3",
                file_path="docs/api/file3.md",
                content_type="text/markdown",
                created_at=datetime.now(timezone.utc),
                updated_at=datetime.now(timezone.utc),
            ),
            Entity(
                project_id=entity_repository.project_id,
                title="File 4",
                entity_type="test",
                permalink="specs/file4",
                file_path="specs/file4.md",
                content_type="text/markdown",
                created_at=datetime.now(timezone.utc),
                updated_at=datetime.now(timezone.utc),
            ),
        ]
        session.add_all(entities)
        await session.flush()

    # Test finding all entities in "docs" directory and subdirectories
    docs_entities = await entity_repository.find_by_directory_prefix("docs")
    assert len(docs_entities) == 3
    file_paths = {e.file_path for e in docs_entities}
    assert file_paths == {"docs/file1.md", "docs/guides/file2.md", "docs/api/file3.md"}

    # Test finding entities in "docs/guides" subdirectory
    guides_entities = await entity_repository.find_by_directory_prefix("docs/guides")
    assert len(guides_entities) == 1
    assert guides_entities[0].file_path == "docs/guides/file2.md"

    # Test finding entities in "specs" directory
    specs_entities = await entity_repository.find_by_directory_prefix("specs")
    assert len(specs_entities) == 1
    assert specs_entities[0].file_path == "specs/file4.md"

    # Test with root directory (empty string)
    all_entities = await entity_repository.find_by_directory_prefix("")
    assert len(all_entities) == 4

    # Test with root directory (slash)
    all_entities = await entity_repository.find_by_directory_prefix("/")
    assert len(all_entities) == 4

    # Test with non-existent directory
    nonexistent = await entity_repository.find_by_directory_prefix("nonexistent")
    assert len(nonexistent) == 0


@pytest.mark.asyncio
async def test_find_by_directory_prefix_basic_fields_only(
    entity_repository: EntityRepository, session_maker
):
    """Test that find_by_directory_prefix returns basic entity fields.

    Note: This method uses use_query_options=False for performance,
    so it doesn't eager load relationships. Directory trees only need
    basic entity fields.
    """
    # Create test entity
    async with db.scoped_session(session_maker) as session:
        entity = Entity(
            project_id=entity_repository.project_id,
            title="Test Entity",
            entity_type="test",
            permalink="docs/test",
            file_path="docs/test.md",
            content_type="text/markdown",
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
        )
        session.add(entity)
        await session.flush()

    # Query entity by directory prefix
    entities = await entity_repository.find_by_directory_prefix("docs")
    assert len(entities) == 1

    # Verify basic fields are present (all we need for directory trees)
    entity = entities[0]
    assert entity.title == "Test Entity"
    assert entity.file_path == "docs/test.md"
    assert entity.permalink == "docs/test"
    assert entity.entity_type == "test"
    assert entity.content_type == "text/markdown"
    assert entity.updated_at is not None
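
    # Sketch (assumption about internals, not the actual code): with use_query_options=False
    # the repository would issue a plain select filtered by the directory prefix, e.g.
    #
    #     stmt = select(Entity).where(
    #         Entity.project_id == self.project_id,
    #         Entity.file_path.like(f"{prefix}/%"),
    #     )
    #
    # without attaching selectinload(Entity.observations) / selectinload(Entity.relations),
    # so relationship collections are never eagerly loaded for directory trees.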


@pytest.mark.asyncio
async def test_get_all_file_paths(entity_repository: EntityRepository, session_maker):
    """Test getting all file paths for deletion detection during sync."""
    # Create test entities with various file paths
    async with db.scoped_session(session_maker) as session:
        entities = [
            Entity(
                project_id=entity_repository.project_id,
                title="File 1",
                entity_type="test",
                permalink="docs/file1",
                file_path="docs/file1.md",
                content_type="text/markdown",
                created_at=datetime.now(timezone.utc),
                updated_at=datetime.now(timezone.utc),
            ),
            Entity(
                project_id=entity_repository.project_id,
                title="File 2",
                entity_type="test",
                permalink="specs/file2",
                file_path="specs/file2.md",
                content_type="text/markdown",
                created_at=datetime.now(timezone.utc),
                updated_at=datetime.now(timezone.utc),
            ),
            Entity(
                project_id=entity_repository.project_id,
                title="File 3",
                entity_type="test",
                permalink="notes/file3",
                file_path="notes/file3.md",
                content_type="text/markdown",
                created_at=datetime.now(timezone.utc),
                updated_at=datetime.now(timezone.utc),
            ),
        ]
        session.add_all(entities)
        await session.flush()

    # Get all file paths
    file_paths = await entity_repository.get_all_file_paths()

    # Verify results
    assert isinstance(file_paths, list)
    assert len(file_paths) == 3
    assert set(file_paths) == {"docs/file1.md", "specs/file2.md", "notes/file3.md"}


@pytest.mark.asyncio
async def test_get_all_file_paths_empty_db(entity_repository: EntityRepository):
    """Test getting all file paths when database is empty."""
    file_paths = await entity_repository.get_all_file_paths()
    assert file_paths == []


@pytest.mark.asyncio
async def test_get_all_file_paths_performance(entity_repository: EntityRepository, session_maker):
    """Test that get_all_file_paths doesn't load entities or relationships.

    This method is optimized for deletion detection during streaming sync.
    It should only query file_path strings, not full entity objects.
    """
    # Create test entity with observations and relations
    async with db.scoped_session(session_maker) as session:
        # Create entities
        entity1 = Entity(
            project_id=entity_repository.project_id,
            title="Entity 1",
            entity_type="test",
            permalink="test/entity1",
            file_path="test/entity1.md",
            content_type="text/markdown",
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
        )
        entity2 = Entity(
            project_id=entity_repository.project_id,
            title="Entity 2",
            entity_type="test",
            permalink="test/entity2",
            file_path="test/entity2.md",
            content_type="text/markdown",
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
        )
        session.add_all([entity1, entity2])
        await session.flush()

        # Add observations to entity1
        observation = Observation(
            project_id=entity_repository.project_id,
            entity_id=entity1.id,
            content="Test observation",
            category="note",
        )
        session.add(observation)

        # Add relation between entities
        relation = Relation(
            project_id=entity_repository.project_id,
            from_id=entity1.id,
            to_id=entity2.id,
            to_name=entity2.title,
            relation_type="relates_to",
        )
        session.add(relation)
        await session.flush()

    # Get all file paths - should be fast and not load relationships
    file_paths = await entity_repository.get_all_file_paths()

    # Verify results - just file paths, no entities or relationships loaded
    assert len(file_paths) == 2
    assert set(file_paths) == {"test/entity1.md", "test/entity2.md"}

    # Result should be list of strings, not entity objects
    for path in file_paths:
        assert isinstance(path, str)
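
    # Sketch (assumption, not the actual query): selecting only the column keeps this cheap,
    # e.g.
    #
    #     stmt = select(Entity.file_path).where(Entity.project_id == self.project_id)
    #     result = await session.execute(stmt)
    #     paths = list(result.scalars().all())
    #
    # No Entity rows, observations, or relations are materialized.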


@pytest.mark.asyncio
async def test_get_all_file_paths_project_isolation(
    entity_repository: EntityRepository, session_maker
):
    """Test that get_all_file_paths only returns paths from the current project."""
    # Create entities in the repository's project
    async with db.scoped_session(session_maker) as session:
        entity1 = Entity(
            project_id=entity_repository.project_id,
            title="Project 1 File",
            entity_type="test",
            permalink="test/file1",
            file_path="test/file1.md",
            content_type="text/markdown",
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
        )
        session.add(entity1)
        await session.flush()

        # Create a second project
        project2 = Project(name="other-project", path="/tmp/other")
        session.add(project2)
        await session.flush()

        # Create entity in different project
        entity2 = Entity(
            project_id=project2.id,
            title="Project 2 File",
            entity_type="test",
            permalink="test/file2",
            file_path="test/file2.md",
            content_type="text/markdown",
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
        )
        session.add(entity2)
        await session.flush()

    # Get all file paths for project 1
    file_paths = await entity_repository.get_all_file_paths()

    # Should only include files from project 1
    assert len(file_paths) == 1
    assert file_paths == ["test/file1.md"]


# -------------------------------------------------------------------------
# Tests for lightweight permalink resolution methods
# -------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_permalink_exists(entity_repository: EntityRepository, sample_entity: Entity):
    """Test checking if a permalink exists without loading full entity."""
    # Existing permalink should return True
    assert await entity_repository.permalink_exists(sample_entity.permalink) is True  # pyright: ignore [reportArgumentType]

    # Non-existent permalink should return False
    assert await entity_repository.permalink_exists("nonexistent/permalink") is False


@pytest.mark.asyncio
async def test_permalink_exists_project_isolation(
    entity_repository: EntityRepository, session_maker
):
    """Test that permalink_exists respects project isolation."""
    async with db.scoped_session(session_maker) as session:
        # Create entity in repository's project
        entity1 = Entity(
            project_id=entity_repository.project_id,
            title="Project 1 Entity",
            entity_type="test",
            permalink="test/entity1",
            file_path="test/entity1.md",
            content_type="text/markdown",
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
        )
        session.add(entity1)

        # Create a second project containing an entity that exists only there
        project2 = Project(name="other-project", path="/tmp/other")
        session.add(project2)
        await session.flush()

        entity2 = Entity(
            project_id=project2.id,
            title="Project 2 Entity",
            entity_type="test",
            permalink="test/entity2",
            file_path="test/entity2.md",
            content_type="text/markdown",
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
        )
        session.add(entity2)

    # Should find entity1's permalink in project 1
    assert await entity_repository.permalink_exists("test/entity1") is True

    # Should NOT find entity2's permalink (it's in project 2)
    assert await entity_repository.permalink_exists("test/entity2") is False


@pytest.mark.asyncio
async def test_get_file_path_for_permalink(
    entity_repository: EntityRepository, sample_entity: Entity
):
    """Test getting file_path for a permalink without loading full entity."""
    # Existing permalink should return file_path
    file_path = await entity_repository.get_file_path_for_permalink(sample_entity.permalink)  # pyright: ignore [reportArgumentType]
    assert file_path == sample_entity.file_path

    # Non-existent permalink should return None
    result = await entity_repository.get_file_path_for_permalink("nonexistent/permalink")
    assert result is None


@pytest.mark.asyncio
async def test_get_permalink_for_file_path(
    entity_repository: EntityRepository, sample_entity: Entity
):
    """Test getting permalink for a file_path without loading full entity."""
    # Existing file_path should return permalink
    permalink = await entity_repository.get_permalink_for_file_path(sample_entity.file_path)
    assert permalink == sample_entity.permalink

    # Non-existent file_path should return None
    result = await entity_repository.get_permalink_for_file_path("nonexistent/path.md")
    assert result is None


@pytest.mark.asyncio
async def test_get_all_permalinks(entity_repository: EntityRepository, session_maker):
    """Test getting all permalinks without loading full entities."""
    async with db.scoped_session(session_maker) as session:
        entity1 = Entity(
            project_id=entity_repository.project_id,
            title="Entity 1",
            entity_type="test",
            permalink="test/entity1",
            file_path="test/entity1.md",
            content_type="text/markdown",
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
        )
        entity2 = Entity(
            project_id=entity_repository.project_id,
            title="Entity 2",
            entity_type="test",
            permalink="test/entity2",
            file_path="test/entity2.md",
            content_type="text/markdown",
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
        )
        session.add_all([entity1, entity2])

    permalinks = await entity_repository.get_all_permalinks()

    assert len(permalinks) == 2
    assert set(permalinks) == {"test/entity1", "test/entity2"}

    # Results should be strings, not entities
    for permalink in permalinks:
        assert isinstance(permalink, str)


@pytest.mark.asyncio
async def test_get_permalink_to_file_path_map(entity_repository: EntityRepository, session_maker):
    """Test getting permalink -> file_path mapping for bulk operations."""
    async with db.scoped_session(session_maker) as session:
        entity1 = Entity(
            project_id=entity_repository.project_id,
            title="Entity 1",
            entity_type="test",
            permalink="test/entity1",
            file_path="test/entity1.md",
            content_type="text/markdown",
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
        )
        entity2 = Entity(
            project_id=entity_repository.project_id,
            title="Entity 2",
            entity_type="test",
            permalink="test/entity2",
            file_path="test/entity2.md",
            content_type="text/markdown",
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
        )
        session.add_all([entity1, entity2])

    mapping = await entity_repository.get_permalink_to_file_path_map()

    assert len(mapping) == 2
    assert mapping["test/entity1"] == "test/entity1.md"
    assert mapping["test/entity2"] == "test/entity2.md"


@pytest.mark.asyncio
async def test_get_file_path_to_permalink_map(entity_repository: EntityRepository, session_maker):
    """Test getting file_path -> permalink mapping for bulk operations."""
    async with db.scoped_session(session_maker) as session:
        entity1 = Entity(
            project_id=entity_repository.project_id,
            title="Entity 1",
            entity_type="test",
            permalink="test/entity1",
            file_path="test/entity1.md",
            content_type="text/markdown",
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
        )
        entity2 = Entity(
            project_id=entity_repository.project_id,
            title="Entity 2",
            entity_type="test",
            permalink="test/entity2",
            file_path="test/entity2.md",
            content_type="text/markdown",
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
        )
        session.add_all([entity1, entity2])

    mapping = await entity_repository.get_file_path_to_permalink_map()

    assert len(mapping) == 2
    assert mapping["test/entity1.md"] == "test/entity1"
    assert mapping["test/entity2.md"] == "test/entity2"

```

--------------------------------------------------------------------------------
/specs/SPEC-9 Multi-Project Bidirectional Sync Architecture.md:
--------------------------------------------------------------------------------

```markdown
---
title: 'SPEC-9: Multi-Project Bidirectional Sync Architecture'
type: spec
permalink: specs/spec-9-multi-project-bisync
tags:
- cloud
- bisync
- architecture
- multi-project
---

# SPEC-9: Multi-Project Bidirectional Sync Architecture

## Status: ✅ Implementation Complete

**Completed Phases:**
- ✅ Phase 1: Cloud Mode Toggle & Config
- ✅ Phase 2: Bisync Updates (Multi-Project)
- ✅ Phase 3: Sync Command Dual Mode
- ✅ Phase 4: Remove Duplicate Commands & Cloud Mode Auth
- ✅ Phase 5: Mount Updates
- ✅ Phase 6: Safety & Validation
- ⏸️ Phase 7: Cloud-Side Implementation (Deferred to cloud repo)
- ✅ Phase 8.1: Testing (All test scenarios validated)
- ✅ Phase 8.2: Documentation (Core docs complete, demos pending)

**Key Achievements:**
- Unified CLI: `bm sync`, `bm project`, `bm tool` work transparently in both local and cloud modes
- Multi-project sync: Single `bm sync` operation handles all projects bidirectionally
- Cloud mode toggle: `bm cloud login` / `bm cloud logout` switches modes seamlessly
- Integrity checking: `bm cloud check` verifies file matching without data transfer
- Directory isolation: Mount and bisync use separate directories with conflict prevention
- Clean UX: No RCLONE_TEST files, clear error messages, transparent implementation

## Why

**Current State:**
SPEC-8 implemented rclone bisync for cloud file synchronization, but has several architectural limitations:
1. Syncs only a single project subdirectory (`bucket:/basic-memory`)
2. Requires separate `bm cloud` command namespace, duplicating existing CLI commands
3. Users must learn different commands for local vs cloud operations
4. RCLONE_TEST marker files clutter user directories

**Problems:**
1. **Duplicate Commands**: `bm project` vs `bm cloud project`; `bm tool` has no cloud equivalent at all
2. **Inconsistent UX**: Same operations require different command syntax depending on mode
3. **Single Project Sync**: Users can only sync one project at a time
4. **Manual Coordination**: Creating new projects requires manual coordination between local and cloud
5. **Confusing Artifacts**: RCLONE_TEST marker files confuse users

**Goals:**
- **Unified CLI**: All existing `bm` commands work in both local and cloud mode via toggle
- **Multi-Project Sync**: Single sync operation handles all projects bidirectionally
- **Simple Mode Switch**: `bm cloud login` enables cloud mode, `logout` returns to local
- **Automatic Registration**: Projects auto-register on both local and cloud sides
- **Clean UX**: Remove unnecessary safety checks and confusing artifacts

## Cloud Access Paradigm: The Dropbox Model

**Mental Model Shift:**

Basic Memory cloud access follows the **Dropbox/iCloud paradigm** - not a per-project cloud connection model.

**What This Means:**

```
Traditional Project-Based Model (❌ Not This):
  bm cloud mount --project work      # Mount individual project
  bm cloud mount --project personal  # Mount another project
  bm cloud sync --project research   # Sync specific project
  → Multiple connections, multiple credentials, complex management

Dropbox Model (✅ This):
  bm cloud mount                     # One mount, all projects
  bm sync                            # One sync, all projects
  ~/basic-memory-cloud/              # One folder, all content
  → Single connection, organized by folders (projects)
```

**Key Principles:**

1. **Mount/Bisync = Access Methods, Not Project Tools**
   - Mount: Read-through cache to cloud (like Dropbox folder)
   - Bisync: Bidirectional sync with cloud (like Dropbox sync)
   - Both operate at **bucket level** (all projects)

2. **Projects = Organization Within Cloud Space**
   - Projects are folders within your cloud storage
   - Creating a folder creates a project (auto-discovered)
   - Projects are managed via `bm project` commands

3. **One Cloud Space Per Machine**
   - One set of IAM credentials per tenant
   - One mount point: `~/basic-memory-cloud/`
   - One bisync directory: `~/basic-memory-cloud-sync/` (default)
   - All projects accessible through this single entry point

4. **Why This Works Better**
   - **Credential Management**: One credential set, not N sets per project
   - **Resource Efficiency**: One rclone process, not N processes
   - **Familiar Pattern**: Users already understand Dropbox/iCloud
   - **Operational Simplicity**: `mount` once, `unmount` once
   - **Scales Naturally**: Add projects by creating folders, not reconfiguring cloud access

**User Journey:**

```bash
# Setup cloud access (once)
bm cloud login
bm cloud mount  # or: bm cloud setup for bisync

# Work with projects (create folders as needed)
cd ~/basic-memory-cloud/
mkdir my-new-project
echo "# Notes" > my-new-project/readme.md

# Cloud auto-discovers and registers project
# No additional cloud configuration needed
```

This paradigm shift means **mount and bisync are infrastructure concerns**, while **projects are content organization**. Users think about their knowledge, not about cloud plumbing.

## What

This spec affects:

1. **Cloud Mode Toggle** (`config.py`, `async_client.py`):
   - Add `cloud_mode` flag to `~/.basic-memory/config.json`
   - Set/unset `BASIC_MEMORY_PROXY_URL` based on cloud mode
   - `bm cloud login` enables cloud mode, `logout` disables it
   - All CLI commands respect cloud mode via existing async_client

2. **Unified CLI Commands**:
   - **Remove**: `bm cloud project` commands (duplicate of `bm project`)
   - **Enhance**: `bm sync` co-opted for bisync in cloud mode
   - **Keep**: `bm cloud login/logout/status/setup` for mode management
   - **Result**: `bm project`, `bm tool`, `bm sync` work in both modes

3. **Bisync Integration** (`bisync_commands.py`):
   - Remove `--check-access` (no RCLONE_TEST files)
   - Sync bucket root (all projects), not single subdirectory
   - Project auto-registration before sync
   - `bm sync` triggers bisync in cloud mode
   - `bm sync --watch` for continuous sync

4. **Config Structure**:
   ```json
   {
     "cloud_mode": true,
     "cloud_host": "https://cloud.basicmemory.com",
     "auth_tokens": {...},
     "bisync_config": {
       "profile": "balanced",
       "sync_dir": "~/basic-memory-cloud-sync"
     }
   }
   ```

5. **User Workflows**:
   - **Enable cloud**: `bm cloud login` → all commands work remotely
   - **Create projects**: `bm project add "name"` creates on cloud
   - **Sync files**: `bm sync` runs bisync (all projects)
   - **Use tools**: `bm tool write-note` creates notes on cloud
   - **Disable cloud**: `bm cloud logout` → back to local mode

## Implementation Tasks

### Phase 1: Cloud Mode Toggle & Config (Foundation) ✅

**1.1 Update Config Schema**
- [x] Add `cloud_mode: bool = False` to Config model
- [x] Add `bisync_config: dict` with `profile` and `sync_dir` fields
- [x] Ensure `cloud_host` field exists
- [x] Add config migration for existing users (defaults handle this)

**1.2 Update async_client.py**
- [x] Read `cloud_mode` from config (not just environment)
- [x] Set `BASIC_MEMORY_PROXY_URL` from config when `cloud_mode=true`
- [x] Priority: env var > config.cloud_host (if cloud_mode) > None (local ASGI)
- [ ] Test both local and cloud mode routing

**1.3 Update Login/Logout Commands**
- [x] `bm cloud login`: Set `cloud_mode=true` and save config
- [x] `bm cloud login`: Set `BASIC_MEMORY_PROXY_URL` environment variable
- [x] `bm cloud logout`: Set `cloud_mode=false` and save config
- [x] `bm cloud logout`: Clear `BASIC_MEMORY_PROXY_URL` environment variable
- [x] `bm cloud status`: Show current mode (local/cloud), connection status

**1.4 Skip Initialization in Cloud Mode** ✅
- [x] Update `ensure_initialization()` to check `cloud_mode` and return early
- [x] Document that `config.projects` is only used in local mode
- [x] Cloud manages its own projects via API, no local reconciliation needed
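
A minimal sketch of the early return described above (illustrative only; `reconcile_local_projects` is a stand-in name, not the actual function):

```python
def ensure_initialization(config) -> None:
    """Skip local project reconciliation entirely when cloud mode is active."""
    if config.cloud_mode:
        return  # cloud manages its own projects via the API
    reconcile_local_projects(config.projects)  # local-mode path (stand-in name)
```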

### Phase 2: Bisync Updates (Multi-Project)

**2.1 Remove RCLONE_TEST Files** ✅
- [x] Update all bisync profiles: `check_access=False`
- [x] Remove RCLONE_TEST creation from `setup_cloud_bisync()`
- [x] Remove RCLONE_TEST upload logic
- [ ] Update documentation

**2.2 Sync Bucket Root (All Projects)** ✅
- [x] Change remote path from `bucket:/basic-memory` to `bucket:/` in `build_bisync_command()`
- [x] Update `setup_cloud_bisync()` to use bucket root
- [ ] Test with multiple projects

**2.3 Project Auto-Registration (Bisync)** ✅
- [x] Add `fetch_cloud_projects()` function (GET /proxy/projects/projects)
- [x] Add `scan_local_directories()` function
- [x] Add `create_cloud_project()` function (POST /proxy/projects/projects)
- [x] Integrate into `run_bisync()`: fetch → scan → create missing → sync
- [x] Wait for API 201 response before syncing

**2.4 Bisync Directory Configuration** ✅
- [x] Add `--dir` parameter to `bm cloud bisync-setup`
- [x] Store bisync directory in config
- [x] Default to `~/basic-memory-cloud-sync/`
- [x] Add `validate_bisync_directory()` safety check
- [x] Update `get_default_mount_path()` to return fixed `~/basic-memory-cloud/`

**2.5 Sync/Status API Infrastructure** ✅ (commit d48b1dc)
- [x] Create `POST /{project}/project/sync` endpoint for background sync
- [x] Create `POST /{project}/project/status` endpoint for scan-only status
- [x] Create `SyncReportResponse` Pydantic schema
- [x] Refactor CLI `sync` command to use API endpoint
- [x] Refactor CLI `status` command to use API endpoint
- [x] Create `command_utils.py` with shared `run_sync()` function
- [x] Update `notify_container_sync()` to call `run_sync()` for each project
- [x] Update all tests to match new API-based implementation

### Phase 3: Sync Command Dual Mode ✅

**3.1 Update `bm sync` Command** ✅
- [x] Check `config.cloud_mode` at start
- [x] If `cloud_mode=false`: Run existing local sync
- [x] If `cloud_mode=true`: Run bisync
- [x] Add `--watch` parameter for continuous sync
- [x] Add `--interval` parameter (default 60 seconds)
- [x] Error if `--watch` used in local mode with helpful message

**3.2 Watch Mode for Bisync** ✅
- [x] Implement `run_bisync_watch()` with interval loop
- [x] Add `--interval` parameter (default 60 seconds)
- [x] Handle errors gracefully, continue on failure
- [x] Show sync progress and status
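
For illustration, the interval loop described above could look roughly like this (`run_bisync()` is assumed to wrap the rclone bisync invocation; names are not the exact implementation):

```python
import time

def run_bisync_watch(profile: str = "balanced", interval: int = 60) -> None:
    """Re-run bisync every `interval` seconds, continuing after failures."""
    while True:
        try:
            run_bisync(profile=profile)  # assumed helper wrapping `rclone bisync`
        except Exception as exc:  # keep watching even if a single sync fails
            print(f"sync failed: {exc}; retrying in {interval}s")
        time.sleep(interval)
```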

**3.3 Integrity Check Command** ✅
- [x] Implement `bm cloud check` command using `rclone check`
- [x] Read-only operation that verifies file matching
- [x] Error with helpful messages if rclone/bisync not set up
- [x] Support `--one-way` flag for faster checks
- [x] Transparent about rclone implementation
- [x] Suggest `bm sync` to resolve differences

**Implementation Notes:**
- `bm sync` adapts to cloud mode automatically - users don't need separate commands
- `bm cloud bisync` kept for power users with full options (--dry-run, --resync, --profile, --verbose)
- `bm cloud check` provides integrity verification without transferring data
- Design philosophy: Simplicity for everyday use, transparency about implementation

### Phase 4: Remove Duplicate Commands & Cloud Mode Auth ✅

**4.0 Cloud Mode Authentication** ✅
- [x] Update `async_client.py` to support dual auth sources
- [x] FastMCP context auth (cloud service mode) via `inject_auth_header()`
- [x] JWT token file auth (CLI cloud mode) via `CLIAuth.get_valid_token()`
- [x] Automatic token refresh for CLI cloud mode
- [x] Remove `BASIC_MEMORY_PROXY_URL` environment variable dependency
- [x] Simplify to use only `config.cloud_mode` + `config.cloud_host`

**4.1 Delete `bm cloud project` Commands** ✅
- [x] Remove `bm cloud project list` (use `bm project list`)
- [x] Remove `bm cloud project add` (use `bm project add`)
- [x] Update `core_commands.py` to remove project_app subcommands
- [x] Keep only: `login`, `logout`, `status`, `setup`, `mount`, `unmount`, bisync commands
- [x] Remove unused imports (Table, generate_permalink, os)
- [x] Clean up environment variable references in login/logout

**4.2 CLI Command Cloud Mode Integration** ✅
- [x] Add runtime `cloud_mode_enabled` checks to all CLI commands
- [x] Update `list_projects()` to conditionally authenticate based on cloud mode
- [x] Update `remove_project()` to conditionally authenticate based on cloud mode
- [x] Update `run_sync()` to conditionally authenticate based on cloud mode
- [x] Update `get_project_info()` to conditionally authenticate based on cloud mode
- [x] Update `run_status()` to conditionally authenticate based on cloud mode
- [x] Remove auth from `set_default_project()` (local-only command, no cloud version)
- [x] Create CLI integration tests (`test-int/cli/`) to validate both local and cloud modes
- [x] Replace mock-heavy CLI tests with integration tests (deleted 5 mock test files)

**4.3 OAuth Authentication Fixes** ✅
- [x] Restore missing `SettingsConfigDict` in `BasicMemoryConfig`
- [x] Fix environment variable reading with `BASIC_MEMORY_` prefix
- [x] Fix `.env` file loading
- [x] Fix extra field handling for config files
- [x] Resolve `bm cloud login` OAuth failure ("Something went wrong" error)
- [x] Implement PKCE (Proof Key for Code Exchange) for device flow
- [x] Generate code verifier and SHA256 challenge for device authorization
- [x] Send code_verifier with token polling requests
- [x] Support both PKCE-required and PKCE-optional OAuth clients
- [x] Verify authentication flow works end-to-end with staging and production
- [x] Document WorkOS requirement: redirect URI must be configured even for device flow

**4.4 Update Documentation**
- [ ] Update `cloud-cli.md` with cloud mode toggle workflow
- [ ] Document `bm cloud login` → use normal commands
- [ ] Add examples of cloud mode usage
- [ ] Document mount vs bisync directory isolation
- [ ] Add troubleshooting section

### Phase 5: Mount Updates ✅

**5.1 Fixed Mount Directory** ✅
- [x] Change mount path to `~/basic-memory-cloud/` (fixed, no tenant ID)
- [x] Update `get_default_mount_path()` function
- [x] Remove configurability (fixed location)
- [x] Update mount commands to use new path

**5.2 Mount at Bucket Root** ✅
- [x] Ensure mount uses bucket root (not subdirectory)
- [x] Test with multiple projects
- [x] Verify all projects visible in mount

**Implementation:** Mount uses fixed `~/basic-memory-cloud/` directory and syncs entire bucket root `basic-memory-{tenant_id}:{bucket_name}` for all projects.
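
A rough sketch of what that mount invocation implies (illustrative only; the real command and rclone flags may differ):

```python
from pathlib import Path
import subprocess

def mount_all_projects(tenant_id: str, bucket_name: str) -> None:
    """Mount the bucket root (all projects) at the fixed ~/basic-memory-cloud/ path."""
    mount_dir = Path.home() / "basic-memory-cloud"
    mount_dir.mkdir(exist_ok=True)
    subprocess.run(
        ["rclone", "nfsmount", f"basic-memory-{tenant_id}:{bucket_name}", str(mount_dir)],
        check=True,
    )
```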

### Phase 6: Safety & Validation ✅

**6.1 Directory Conflict Prevention** ✅
- [x] Implement `validate_bisync_directory()` check
- [x] Detect if bisync dir == mount dir
- [x] Detect if bisync dir is currently mounted
- [x] Show clear error messages with solutions

**6.2 State Management** ✅
- [x] Use `--workdir` for bisync state
- [x] Store state in `~/.basic-memory/bisync-state/{tenant-id}/`
- [x] Ensure state directory created before bisync

**Implementation:** `validate_bisync_directory()` prevents conflicts by checking directory equality and mount status. State managed in isolated `~/.basic-memory/bisync-state/{tenant-id}/` directory using `--workdir` flag.
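
A sketch of how the state directory and `--workdir` flag fit into the bisync invocation (illustrative; the flag set shown is not exhaustive):

```python
from pathlib import Path

def build_bisync_command(tenant_id: str, bucket_name: str, sync_dir: Path) -> list[str]:
    """Assemble an rclone bisync command that keeps state out of the sync directory."""
    state_dir = Path.home() / ".basic-memory" / "bisync-state" / tenant_id
    state_dir.mkdir(parents=True, exist_ok=True)
    return [
        "rclone", "bisync",
        str(sync_dir), f"basic-memory-{tenant_id}:{bucket_name}",
        "--workdir", str(state_dir),
        "--conflict-resolve=newer",
        "--max-delete=25",
    ]
```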

### Phase 7: Cloud-Side Implementation (Deferred to Cloud Repo)

**7.1 Project Discovery Service (Cloud)** - Deferred
- [ ] Create `ProjectDiscoveryService` background job
- [ ] Scan `/app/data/` every 2 minutes
- [ ] Auto-register new directories as projects
- [ ] Log discovery events
- [ ] Handle errors gracefully

**7.2 Project API Updates (Cloud)** - Deferred
- [ ] Ensure `POST /proxy/projects/projects` creates directory synchronously
- [ ] Return 201 with project details
- [ ] Ensure directory ready immediately after creation

**Note:** Phase 7 is cloud-side work that belongs in the basic-memory-cloud repository. The CLI-side implementation (Phase 2.3 auto-registration) is complete and working - it calls the existing cloud API endpoints.

### Phase 8: Testing & Documentation

**8.1 Test Scenarios**
- [x] Test: Cloud mode toggle (login/logout)
- [x] Test: Local-first project creation (bisync)
- [x] Test: Cloud-first project creation (API)
- [x] Test: Multi-project bidirectional sync
- [x] Test: MCP tools in cloud mode
- [x] Test: Watch mode continuous sync
- [x] Test: Safety profile protection (max_delete implemented)
- [x] Test: No RCLONE_TEST files (check_access=False in all profiles)
- [x] Test: Mount/bisync directory isolation (validate_bisync_directory)
- [x] Test: Integrity check command (bm cloud check)

**8.2 Documentation**
- [x] Update cloud-cli.md with cloud mode instructions
- [x] Document Dropbox model paradigm
- [x] Update command reference with new commands
- [x] Document `bm sync` dual mode behavior
- [x] Document `bm cloud check` command
- [x] Document directory structure and fixed paths
- [ ] Update README with quick start
- [ ] Create migration guide for existing users
- [ ] Create video/GIF demos

### Success Criteria Checklist

- [x] `bm cloud login` enables cloud mode for all commands
- [x] `bm cloud logout` reverts to local mode
- [x] `bm project`, `bm tool`, `bm sync` work transparently in both modes
- [x] `bm sync` runs bisync in cloud mode, local sync in local mode
- [x] Single sync operation handles all projects bidirectionally
- [x] Local directories auto-create cloud projects via API
- [x] Cloud projects auto-sync to local directories
- [x] No RCLONE_TEST files in user directories
- [x] Bisync profiles provide safety via `max_delete` limits
- [x] `bm sync --watch` enables continuous sync
- [x] No duplicate `bm cloud project` commands (removed)
- [x] `bm cloud check` command for integrity verification
- [ ] Documentation covers cloud mode toggle and workflows
- [ ] Edge cases handled gracefully with clear errors

## How (High Level)

### Architecture Overview

**Cloud Mode Toggle:**
```
┌─────────────────────────────────────┐
│  bm cloud login                     │
│  ├─ Authenticate via OAuth          │
│  ├─ Set cloud_mode: true in config  │
│  └─ Set BASIC_MEMORY_PROXY_URL      │
└─────────────────────────────────────┘
           ↓
┌─────────────────────────────────────┐
│  All CLI commands use async_client  │
│  ├─ async_client checks proxy URL   │
│  ├─ If set: HTTP to cloud           │
│  └─ If not: Local ASGI              │
└─────────────────────────────────────┘
           ↓
┌─────────────────────────────────────┐
│  bm project add "work"              │
│  bm tool write-note ...             │
│  bm sync (triggers bisync)          │
│  → All work against cloud           │
└─────────────────────────────────────┘
```

**Storage Hierarchy:**
```
Cloud Container:                   Bucket:                      Local Sync Dir:
/app/data/ (mounted) ←→ production-tenant-{id}/ ←→ ~/basic-memory-cloud-sync/
├── basic-memory/               ├── basic-memory/               ├── basic-memory/
│   ├── notes/                  │   ├── notes/                  │   ├── notes/
│   └── concepts/               │   └── concepts/               │   └── concepts/
├── work-project/               ├── work-project/               ├── work-project/
│   └── tasks/                  │   └── tasks/                  │   └── tasks/
└── personal/                   └── personal/                   └── personal/
    └── journal/                    └── journal/                    └── journal/

Bidirectional sync via rclone bisync
```

### Sync Flow

**`bm sync` execution (in cloud mode):**

1. **Check cloud mode**
   ```python
   if not config.cloud_mode:
       # Run normal local file sync
       run_local_sync()
       return

   # Cloud mode: Run bisync
   ```

2. **Fetch cloud projects**
   ```python
   # GET /proxy/projects/projects (via async_client)
   cloud_projects = fetch_cloud_projects()
   cloud_project_names = {p["name"] for p in cloud_projects["projects"]}
   ```

3. **Scan local sync directory**
   ```python
   sync_dir = config.bisync_config["sync_dir"]  # ~/basic-memory-cloud-sync
   local_dirs = [d.name for d in sync_dir.iterdir()
                 if d.is_dir() and not d.name.startswith('.')]
   ```

4. **Create missing cloud projects**
   ```python
   for dir_name in local_dirs:
       if dir_name not in cloud_project_names:
           # POST /proxy/projects/projects (via async_client)
           create_cloud_project(name=dir_name)
           # Blocks until 201 response
   ```

5. **Run bisync on bucket root**
   ```bash
   rclone bisync \
     ~/basic-memory-cloud-sync \
     basic-memory-{tenant}:{bucket} \
     --filters-file ~/.basic-memory/.bmignore.rclone \
     --conflict-resolve=newer \
     --max-delete=25
   # Syncs ALL project subdirectories bidirectionally
   ```

6. **Notify cloud to refresh** (commit d48b1dc)
   ```python
   # After rclone bisync completes, sync each project's database
   for project in cloud_projects:
       # POST /{project}/project/sync (via async_client)
       # Triggers background sync for this project
       await run_sync(project=project_name)
   ```

### Key Changes

**1. Cloud Mode via Config**

**Config changes:**
```python
class Config:
    cloud_mode: bool = False
    cloud_host: str = "https://cloud.basicmemory.com"
    bisync_config: dict = {
        "profile": "balanced",
        "sync_dir": "~/basic-memory-cloud-sync"
    }
```

**async_client.py behavior:**
```python
def create_client() -> AsyncClient:
    # Check config first, then environment
    config = ConfigManager().config
    proxy_url = os.getenv("BASIC_MEMORY_PROXY_URL") or \
                (config.cloud_host if config.cloud_mode else None)

    if proxy_url:
        return AsyncClient(base_url=proxy_url)  # HTTP to cloud
    else:
        return AsyncClient(transport=ASGITransport(...))  # Local ASGI
```

**2. Login/Logout Sets Cloud Mode**

```python
# bm cloud login
async def login():
    # Existing OAuth flow...
    success = await auth.login()
    if success:
        config.cloud_mode = True
        config.save()
        os.environ["BASIC_MEMORY_PROXY_URL"] = config.cloud_host
```

```python
# bm cloud logout
def logout():
    config.cloud_mode = False
    config.save()
    os.environ.pop("BASIC_MEMORY_PROXY_URL", None)
```

**3. Remove Duplicate Commands**

**Delete:**
- `bm cloud project list` → use `bm project list`
- `bm cloud project add` → use `bm project add`

**Keep:**
- `bm cloud login` - Enable cloud mode
- `bm cloud logout` - Disable cloud mode
- `bm cloud status` - Show current mode & connection
- `bm cloud setup` - Initial bisync setup
- `bm cloud bisync` - Power-user command with full options
- `bm cloud check` - Verify file integrity between local and cloud

**4. Sync Command Dual Mode**

```python
# bm sync
def sync_command(watch: bool = False, profile: str = "balanced"):
    config = ConfigManager().config

    if config.cloud_mode:
        # Run bisync for cloud sync
        run_bisync(profile=profile, watch=watch)
    else:
        # Run local file sync
        run_local_sync()
```

**5. Remove RCLONE_TEST Files**

```python
# All profiles: check_access=False
BISYNC_PROFILES = {
    "safe": RcloneBisyncProfile(check_access=False, max_delete=10),
    "balanced": RcloneBisyncProfile(check_access=False, max_delete=25),
    "fast": RcloneBisyncProfile(check_access=False, max_delete=50),
}
```

**6. Sync Bucket Root (All Projects)**

```python
# Sync entire bucket, not subdirectory
rclone_remote = f"basic-memory-{tenant_id}:{bucket_name}"
```

## How to Evaluate

### Test Scenarios

**1. Cloud Mode Toggle**
```bash
# Start in local mode
bm project list
# → Shows local projects

# Enable cloud mode
bm cloud login
# → Authenticates, sets cloud_mode=true

bm project list
# → Now shows cloud projects (same command!)

# Disable cloud mode
bm cloud logout

bm project list
# → Back to local projects
```

**Expected:** ✅ Single command works in both modes

**2. Local-First Project Creation (Cloud Mode)**
```bash
# Enable cloud mode
bm cloud login

# Create new project locally in sync dir
mkdir ~/basic-memory-cloud-sync/my-research
echo "# Research Notes" > ~/basic-memory-cloud-sync/my-research/index.md

# Run sync (triggers bisync in cloud mode)
bm sync

# Verify:
# - Cloud project created automatically via API
# - Files synced to bucket:/my-research/
# - Cloud database updated
# - `bm project list` shows new project
```

**Expected:** ✅ Project visible in cloud project list

**3. Cloud-First Project Creation**
```bash
# In cloud mode
bm project add "work-notes"
# → Creates project on cloud (via async_client HTTP)

# Run sync
bm sync

# Verify:
# - Local directory ~/basic-memory-cloud-sync/work-notes/ created
# - Files sync bidirectionally
# - Can use `bm tool write-note` to add content remotely
```

**Expected:** ✅ Project accessible via all CLI commands

**4. Multi-Project Bidirectional Sync**
```bash
# Setup: 3 projects in cloud mode
# Modify files in all 3 locally and remotely

bm sync

# Verify:
# - All 3 projects sync simultaneously
# - Changes propagate correctly
# - No cross-project interference
```

**Expected:** ✅ All projects in sync state

**5. MCP Tools Work in Cloud Mode**
```bash
# In cloud mode
bm tool write-note \
  --title "Meeting Notes" \
  --folder "work-notes" \
  --content "Discussion points..."

# Verify:
# - Note created on cloud (via async_client HTTP)
# - Next `bm sync` pulls note to local
# - Note appears in ~/basic-memory-cloud-sync/work-notes/
```

**Expected:** ✅ Tools work transparently in cloud mode

**6. Watch Mode Continuous Sync**
```bash
# In cloud mode
bm sync --watch

# While running:
# - Create local folder → auto-creates cloud project
# - Edit files locally → syncs to cloud
# - Edit files remotely → syncs to local
# - Create project via API → appears locally

# Verify:
# - Continuous bidirectional sync
# - New projects handled automatically
# - No manual intervention needed
```

**Expected:** ✅ Seamless continuous sync

**7. Safety Profile Protection**
```bash
# Create project with 15 files locally
# Delete project from cloud (simulate error)

bm sync --profile safe

# Verify:
# - Bisync detects 15 pending deletions
# - Exceeds max_delete=10 limit
# - Aborts with clear error
# - No files deleted locally
```

**Expected:** ✅ Safety limit prevents data loss

**8. No RCLONE_TEST Files**
```bash
# After setup and multiple syncs
ls -la ~/basic-memory-cloud-sync/

# Verify:
# - No RCLONE_TEST files
# - No .rclone state files (in ~/.basic-memory/bisync-state/)
# - Clean directory structure
```

**Expected:** ✅ User directory stays clean

### Success Criteria

- [x] `bm cloud login` enables cloud mode for all commands
- [x] `bm cloud logout` reverts to local mode
- [x] `bm project`, `bm tool`, `bm sync` work in both modes transparently
- [x] `bm sync` runs bisync in cloud mode, local sync in local mode
- [x] Single sync operation handles all projects bidirectionally
- [x] Local directories auto-create cloud projects via API
- [x] Cloud projects auto-sync to local directories
- [x] No RCLONE_TEST files in user directories
- [x] Bisync profiles provide safety via `max_delete` limits
- [x] `bm sync --watch` enables continuous sync
- [x] No duplicate `bm cloud project` commands (removed)
- [x] `bm cloud check` command for integrity verification
- [ ] Documentation covers cloud mode toggle and workflows
- [ ] Edge cases handled gracefully with clear errors

## Notes

### API Contract

**Cloud must provide:**

1. **Project Management APIs:**
   - `GET /proxy/projects/projects` - List all projects
   - `POST /proxy/projects/projects` - Create project synchronously
   - `POST /proxy/sync` - Trigger cache refresh

2. **Project Discovery Service (Background):**
   - **Purpose**: Auto-register projects created via mount, direct bucket uploads, or any non-API method
   - **Interval**: Every 2 minutes
   - **Behavior**:
     - Scan `/app/data/` for directories
     - Register any directory not already in project database
     - Log discovery events
   - **Implementation**:
     ```python
     class ProjectDiscoveryService:
         """Background service to auto-discover projects from filesystem."""

         async def run(self):
             """Scan /app/data/ and register new project directories."""
             data_path = Path("/app/data")

             for dir_path in data_path.iterdir():
                 # Skip hidden and special directories
                 if not dir_path.is_dir() or dir_path.name.startswith('.'):
                     continue

                 project_name = dir_path.name

                 # Check if project already registered
                 project = await self.project_repo.get_by_name(project_name)
                 if not project:
                     # Auto-register new project
                     await self.project_repo.create(
                         name=project_name,
                         path=str(dir_path)
                     )
                     logger.info(f"Auto-discovered project: {project_name}")
     ```

**Project Creation (API-based):**
- API creates `/app/data/{project-name}/` directory
- Registers project in database
- Returns 201 with project details
- Directory ready for bisync immediately
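
For illustration, the client-side call behind this flow might look like the following (the JSON body shape is an assumption based on `create_cloud_project(name=...)` above, not a documented schema):

```python
import httpx

async def create_cloud_project(name: str, cloud_host: str, token: str) -> dict:
    """Create a project on the cloud and block until the 201 response."""
    async with httpx.AsyncClient(base_url=cloud_host) as client:
        response = await client.post(
            "/proxy/projects/projects",
            json={"name": name},  # assumed request body
            headers={"Authorization": f"Bearer {token}"},
        )
        response.raise_for_status()  # expect 201 with project details
        return response.json()
```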

**Project Creation (Discovery-based):**
- User creates folder via mount: `~/basic-memory-cloud/new-project/`
- Files appear in `/app/data/new-project/` (mounted bucket)
- Discovery service finds directory on next scan (within 2 minutes)
- Auto-registers as project
- User sees project in `bm project list` after discovery

**Why Both Methods:**
- **API**: Immediate registration when using bisync (client-side scan + API call)
- **Discovery**: Delayed registration when using mount (no API call hook)
- **Result**: Projects created ANY way (API, mount, bisync, WebDAV) eventually registered
- **Trade-off**: 2-minute delay for mount-created projects is acceptable

### Mount vs Bisync Directory Isolation

**Critical Safety Requirement**: Mount and bisync MUST use different directories to prevent conflicts.

**The Dropbox Model Applied:**

Both mount and bisync operate at **bucket level** (all projects), following the Dropbox/iCloud paradigm:

```
~/basic-memory-cloud/          # Mount: Read-through cache (like Dropbox folder)
├── work-notes/
├── personal/
└── research/

~/basic-memory-cloud-sync/         # Bisync: Bidirectional sync (like Dropbox sync folder)
├── work-notes/
├── personal/
└── research/
```

**Mount Directory (Fixed):**
```bash
# Fixed location, not configurable
~/basic-memory-cloud/
```
- **Scope**: Entire bucket (all projects)
- **Method**: NFS mount via `rclone nfsmount`
- **Behavior**: Read-through cache to cloud bucket
- **Credentials**: One IAM credential set per tenant
- **Process**: One rclone mount process
- **Use Case**: Quick access, browsing, light editing
- **Known Issue**: Obsidian compatibility problems with NFS
- **Not Configurable**: Fixed location prevents user error

**Why Fixed Location:**
- One mount point per machine (like `/Users/you/Dropbox`)
- Prevents credential proliferation (one credential set, not N)
- Prevents multiple mount processes (resource efficiency)
- Familiar pattern users already understand
- Simple operations: `mount` once, `unmount` once

**Bisync Directory (User Configurable):**
```bash
# Default location
~/basic-memory-cloud-sync/

# User can override
bm cloud setup --dir ~/my-knowledge-base
```
- **Scope**: Entire bucket (all projects)
- **Method**: Bidirectional sync via `rclone bisync`
- **Behavior**: Full local copy with periodic sync
- **Credentials**: Same IAM credential set as mount
- **Use Case**: Full offline access, reliable editing, Obsidian support
- **Configurable**: Users may want specific locations (external drive, existing folder structure)

**Why User Configurable:**
- Users have preferences for where local copies live
- May want sync folder on external drive
- May want to integrate with existing folder structure
- Default works for most, option available for power users

**Conflict Prevention:**
```python
def validate_bisync_directory(bisync_dir: Path):
    """Ensure bisync directory doesn't conflict with mount."""
    mount_dir = Path.home() / "basic-memory-cloud"

    if bisync_dir.resolve() == mount_dir.resolve():
        raise BisyncError(
            f"Cannot use {bisync_dir} for bisync - it's the mount directory!\n"
            f"Mount and bisync must use different directories.\n\n"
            f"Options:\n"
            f"  1. Use default: ~/basic-memory-cloud-sync/\n"
            f"  2. Specify different directory: --dir ~/my-sync-folder"
        )

    # Check if mount is active at this location
    result = subprocess.run(["mount"], capture_output=True, text=True)
    if str(bisync_dir) in result.stdout and "rclone" in result.stdout:
        raise BisyncError(
            f"{bisync_dir} is currently mounted via 'bm cloud mount'\n"
            f"Cannot use mounted directory for bisync.\n\n"
            f"Either:\n"
            f"  1. Unmount first: bm cloud unmount\n"
            f"  2. Use different directory for bisync"
        )
```

**Why This Matters:**
- Mounting and syncing the SAME directory would create infinite loops
- rclone mount → bisync detects changes → syncs to bucket → mount sees changes → triggers bisync → ∞
- Separate directories = clean separation of concerns
- Mount is read-heavy caching layer, bisync is write-heavy bidirectional sync

### Future Enhancements

**Phase 2 (Not in this spec):**
- **Near Real-Time Sync**: Integrate `watch_service.py` with cloud mode
  - Watch service detects local changes (already battle-tested)
  - Queue changes in memory
  - Use `rclone copy` for individual file sync (near instant)
  - Example: `rclone copyto ~/sync/project/file.md tenant:{bucket}/project/file.md`
  - Fallback to full `rclone bisync` every N seconds for bidirectional changes
  - Provides near real-time sync without polling overhead
- Per-project bisync profiles (different safety levels per project)
- Selective project sync (exclude specific projects from sync)
- Project deletion workflow (cascade to cloud/local)
- Conflict resolution UI/CLI

**Phase 3:**
- Project sharing between tenants
- Incremental backup/restore
- Sync statistics and bandwidth monitoring
- Mobile app integration with cloud mode

### Related Specs

- **SPEC-8**: TigrisFS Integration - Original bisync implementation
- **SPEC-6**: Explicit Project Parameter Architecture - Multi-project foundations
- **SPEC-5**: CLI Cloud Upload via WebDAV - Cloud file operations

### Implementation Notes

**Architectural Simplifications:**
- **Unified CLI**: Eliminated duplicate commands by using mode toggle
- **Single Entry Point**: All commands route through `async_client` which handles mode
- **Config-Driven**: Cloud mode stored in persistent config, not just environment
- **Transparent Routing**: Existing commands work without modification in cloud mode

**Complexity Trade-offs:**
- Removed: Separate `bm cloud project` command namespace
- Removed: Complex state detection for new projects
- Removed: RCLONE_TEST marker file management
- Added: Simple cloud_mode flag and config integration
- Added: Simple project list comparison before sync
- Relied on: Existing bisync profile safety mechanisms
- Result: Significantly simpler, more maintainable code

**User Experience:**
- **Mental Model**: "Toggle cloud mode, use normal commands"
- **No Learning Curve**: Same commands work locally and in cloud
- **Minimal Config**: Just login/logout to switch modes
- **Safety**: Profile system gives users control over safety/speed trade-offs
- **"Just Works"**: Create folders anywhere, they sync automatically

**Migration Path:**
- Existing `bm cloud project` users: Use `bm project` instead
- Existing `bm cloud bisync` becomes `bm sync` in cloud mode
- Config automatically migrates on first `bm cloud login`


## Testing


Initial Setup (One Time)

1. Login to cloud and enable cloud mode:
bm cloud login
# → Authenticates via OAuth
# → Sets cloud_mode=true in config
# → Sets BASIC_MEMORY_PROXY_URL environment variable
# → All CLI commands now route to cloud

2. Check cloud mode status:
bm cloud status
# → Shows: Mode: Cloud (enabled)
# → Shows: Host: https://cloud.basicmemory.com
# → Checks cloud health

3. Set up bidirectional sync:
bm cloud bisync-setup
# Or with custom directory:
bm cloud bisync-setup --dir ~/my-sync-folder

# This will:
# → Install rclone (if not already installed)
# → Get tenant info (tenant_id, bucket_name)
# → Generate scoped IAM credentials
# → Configure rclone with credentials
# → Create sync directory (default: ~/basic-memory-cloud-sync/)
# → Validate no conflict with mount directory
# → Run initial --resync to establish baseline

Normal Usage

4. Create local project and sync:
# Create a local project directory
mkdir ~/basic-memory-cloud-sync/my-research
echo "# Research Notes" > ~/basic-memory-cloud-sync/my-research/readme.md

# Run sync
bm cloud bisync

# Auto-magic happens:
# → Checks for new local directories
# → Finds "my-research" not in cloud
# → Creates project on cloud via POST /proxy/projects/projects
# → Runs bidirectional sync (all projects)
# → Syncs to bucket root (all projects synced together)

5. Watch mode for continuous sync:
bm cloud bisync --watch
# Or with custom interval:
bm cloud bisync --watch --interval 30

# → Syncs every 60 seconds (or custom interval)
# → Auto-registers new projects on each run
# → Press Ctrl+C to stop

6. Check bisync status:
bm cloud bisync-status
# → Shows tenant ID
# → Shows sync directory path
# → Shows initialization status
# → Shows last sync time
# → Lists available profiles (safe/balanced/fast)

7. Manual sync with different profiles:
# Safe mode (max 10 deletes, preserves conflicts)
bm cloud bisync --profile safe

# Balanced mode (max 25 deletes, auto-resolve to newer) - default
bm cloud bisync --profile balanced

# Fast mode (max 50 deletes, skip verification)
bm cloud bisync --profile fast

8. Dry run to preview changes:
bm cloud bisync --dry-run
# → Shows what would be synced without making changes

9. Force resync (if needed):
bm cloud bisync --resync
# → Establishes new baseline
# → Use if sync state is corrupted

10. Check file integrity:
bm cloud check
# → Verifies all files match between local and cloud
# → Read-only operation (no data transfer)
# → Shows differences if any found

# Faster one-way check
bm cloud check --one-way
# → Only checks for missing files on destination

### Verify Cloud Mode Integration

11. Test that all commands work in cloud mode:
# List cloud projects (not local)
bm project list

# Create project on cloud
bm project add "work-notes"

# Use MCP tools against cloud
bm tool write-note --title "Test" --folder "my-research" --content "Hello"

# All of these work against cloud because cloud_mode=true

12. Switch back to local mode:
bm cloud logout
# → Sets cloud_mode=false
# → Clears BASIC_MEMORY_PROXY_URL
# → All commands now work locally again

### Expected Directory Structure

~/basic-memory-cloud-sync/          # Your local sync directory
├── my-research/                    # Auto-created cloud project
│   ├── readme.md
│   └── notes.md
├── work-notes/                     # Another project
│   └── tasks.md
└── personal/                       # Another project
    └── journal.md

# All sync bidirectionally with:
bucket:/                            # Cloud bucket root
├── my-research/
├── work-notes/
└── personal/

### Key Points to Test

1. ✅ Cloud mode toggle works (login/logout)
2. ✅ Bisync setup validates directory (no conflict with mount)
3. ✅ Local directories auto-create cloud projects
4. ✅ All projects sync together (bucket root)
5. ✅ No RCLONE_TEST files created
6. ✅ Changes sync bidirectionally
7. ✅ Watch mode continuous sync works
8. ✅ Profile safety limits work (max_delete)
9. ✅ `bm sync` adapts to cloud mode automatically
10. ✅ `bm cloud check` verifies file integrity without side effects

```

--------------------------------------------------------------------------------
/tests/mcp/test_tool_write_note.py:
--------------------------------------------------------------------------------

```python
"""Tests for note tools that exercise the full stack with SQLite."""

from textwrap import dedent
import pytest

from basic_memory.mcp.tools import write_note, read_note, delete_note
from basic_memory.utils import normalize_newlines


@pytest.mark.asyncio
async def test_write_note(app, test_project):
    """Test creating a new note.

    Should:
    - Create entity with correct type and content
    - Save markdown content
    - Handle tags correctly
    - Return valid permalink
    """
    result = await write_note.fn(
        project=test_project.name,
        title="Test Note",
        folder="test",
        content="# Test\nThis is a test note",
        tags=["test", "documentation"],
    )

    assert result
    assert "# Created note" in result
    assert f"project: {test_project.name}" in result
    assert "file_path: test/Test Note.md" in result
    assert "permalink: test/test-note" in result
    assert "## Tags" in result
    assert "- test, documentation" in result
    assert f"[Session: Using project '{test_project.name}']" in result

    # Try reading it back via permalink
    content = await read_note.fn("test/test-note", project=test_project.name)
    assert (
        normalize_newlines(
            dedent("""
        ---
        title: Test Note
        type: note
        permalink: test/test-note
        tags:
        - test
        - documentation
        ---
        
        # Test
        This is a test note
        """).strip()
        )
        in content
    )


@pytest.mark.asyncio
async def test_write_note_no_tags(app, test_project):
    """Test creating a note without tags."""
    result = await write_note.fn(
        project=test_project.name, title="Simple Note", folder="test", content="Just some text"
    )

    assert result
    assert "# Created note" in result
    assert f"project: {test_project.name}" in result
    assert "file_path: test/Simple Note.md" in result
    assert "permalink: test/simple-note" in result
    assert f"[Session: Using project '{test_project.name}']" in result
    # Should be able to read it back
    content = await read_note.fn("test/simple-note", project=test_project.name)
    assert (
        normalize_newlines(
            dedent("""
        ---
        title: Simple Note
        type: note
        permalink: test/simple-note
        ---
        
        Just some text
        """).strip()
        )
        in content
    )


@pytest.mark.asyncio
async def test_write_note_update_existing(app, test_project):
    """Test creating a new note.

    Should:
    - Create entity with correct type and content
    - Save markdown content
    - Handle tags correctly
    - Return valid permalink
    """
    result = await write_note.fn(
        project=test_project.name,
        title="Test Note",
        folder="test",
        content="# Test\nThis is a test note",
        tags=["test", "documentation"],
    )

    assert result  # Got a valid permalink
    assert "# Created note" in result
    assert f"project: {test_project.name}" in result
    assert "file_path: test/Test Note.md" in result
    assert "permalink: test/test-note" in result
    assert "## Tags" in result
    assert "- test, documentation" in result
    assert f"[Session: Using project '{test_project.name}']" in result

    result = await write_note.fn(
        project=test_project.name,
        title="Test Note",
        folder="test",
        content="# Test\nThis is an updated note",
        tags=["test", "documentation"],
    )
    assert "# Updated note" in result
    assert f"project: {test_project.name}" in result
    assert "file_path: test/Test Note.md" in result
    assert "permalink: test/test-note" in result
    assert "## Tags" in result
    assert "- test, documentation" in result
    assert f"[Session: Using project '{test_project.name}']" in result

    # Try reading it back
    content = await read_note.fn("test/test-note", project=test_project.name)
    assert (
        normalize_newlines(
            dedent(
                """
        ---
        title: Test Note
        type: note
        permalink: test/test-note
        tags:
        - test
        - documentation
        ---
        
        # Test
        This is an updated note
        """
            ).strip()
        )
        == content
    )


@pytest.mark.asyncio
async def test_issue_93_write_note_respects_custom_permalink_new_note(app, test_project):
    """Test that write_note respects custom permalinks in frontmatter for new notes (Issue #93)"""

    # Create a note with custom permalink in frontmatter
    content_with_custom_permalink = dedent("""
        ---
        permalink: custom/my-desired-permalink
        ---

        # My New Note

        This note has a custom permalink specified in frontmatter.

        - [note] Testing if custom permalink is respected
    """).strip()

    result = await write_note.fn(
        project=test_project.name,
        title="My New Note",
        folder="notes",
        content=content_with_custom_permalink,
    )

    # Verify the custom permalink is respected
    assert "# Created note" in result
    assert f"project: {test_project.name}" in result
    assert "file_path: notes/My New Note.md" in result
    assert "permalink: custom/my-desired-permalink" in result
    assert f"[Session: Using project '{test_project.name}']" in result


@pytest.mark.asyncio
async def test_issue_93_write_note_respects_custom_permalink_existing_note(app, test_project):
    """Test that write_note respects custom permalinks when updating existing notes (Issue #93)"""

    # Step 1: Create initial note (auto-generated permalink)
    result1 = await write_note.fn(
        project=test_project.name,
        title="Existing Note",
        folder="test",
        content="Initial content without custom permalink",
    )

    assert "# Created note" in result1
    assert f"project: {test_project.name}" in result1

    # Extract the auto-generated permalink
    initial_permalink = None
    for line in result1.split("\n"):
        if line.startswith("permalink:"):
            initial_permalink = line.split(":", 1)[1].strip()
            break

    assert initial_permalink is not None

    # Step 2: Update with content that includes custom permalink in frontmatter
    updated_content = dedent("""
        ---
        permalink: custom/new-permalink
        ---

        # Existing Note

        Updated content with custom permalink in frontmatter.

        - [note] Custom permalink should be respected on update
    """).strip()

    result2 = await write_note.fn(
        project=test_project.name,
        title="Existing Note",
        folder="test",
        content=updated_content,
    )

    # Verify the custom permalink is respected
    assert "# Updated note" in result2
    assert f"project: {test_project.name}" in result2
    assert "permalink: custom/new-permalink" in result2
    assert f"permalink: {initial_permalink}" not in result2
    assert f"[Session: Using project '{test_project.name}']" in result2


@pytest.mark.asyncio
async def test_delete_note_existing(app, test_project):
    """Test deleting a new note.

    Should:
    - Create entity with correct type and content
    - Return valid permalink
    - Delete the note
    """
    result = await write_note.fn(
        project=test_project.name,
        title="Test Note",
        folder="test",
        content="# Test\nThis is a test note",
        tags=["test", "documentation"],
    )

    assert result
    assert f"project: {test_project.name}" in result

    deleted = await delete_note.fn("test/test-note", project=test_project.name)
    assert deleted is True


@pytest.mark.asyncio
async def test_delete_note_doesnt_exist(app, test_project):
    """Test deleting a new note.

    Should:
    - Delete the note
    - verify returns false
    """
    deleted = await delete_note.fn("doesnt-exist", project=test_project.name)
    assert deleted is False


@pytest.mark.asyncio
async def test_write_note_with_tag_array_from_bug_report(app, test_project):
    """Test creating a note with a tag array as reported in issue #38.

    This reproduces the exact payload from the bug report where Cursor
    was passing an array of tags and getting a type mismatch error.
    """
    # This is the exact payload from the bug report
    bug_payload = {
        "project": test_project.name,
        "title": "Title",
        "folder": "folder",
        "content": "CONTENT",
        "tags": ["hipporag", "search", "fallback", "symfony", "error-handling"],
    }

    # Try to call the function with this data directly
    result = await write_note.fn(**bug_payload)

    assert result
    assert f"project: {test_project.name}" in result
    assert "permalink: folder/title" in result
    assert "Tags" in result
    assert "hipporag" in result
    assert f"[Session: Using project '{test_project.name}']" in result


@pytest.mark.asyncio
async def test_write_note_verbose(app, test_project):
    """Test creating a new note.

    Should:
    - Create entity with correct type and content
    - Save markdown content
    - Handle tags correctly
    - Return valid permalink
    """
    result = await write_note.fn(
        project=test_project.name,
        title="Test Note",
        folder="test",
        content="""
# Test\nThis is a test note

- [note] First observation
- relates to [[Knowledge]]

""",
        tags=["test", "documentation"],
    )

    assert "# Created note" in result
    assert f"project: {test_project.name}" in result
    assert "file_path: test/Test Note.md" in result
    assert "permalink: test/test-note" in result
    assert "## Observations" in result
    assert "- note: 1" in result
    assert "## Relations" in result
    assert "## Tags" in result
    assert "- test, documentation" in result
    assert f"[Session: Using project '{test_project.name}']" in result


@pytest.mark.asyncio
async def test_write_note_preserves_custom_metadata(app, project_config, test_project):
    """Test that updating a note preserves custom metadata fields.

    Reproduces issue #36 where custom frontmatter fields like Status
    were being lost when updating notes with the write_note tool.

    Should:
    - Create a note with custom frontmatter
    - Update the note with new content
    - Verify custom frontmatter is preserved
    """
    # First, create a note with custom metadata using write_note
    await write_note.fn(
        project=test_project.name,
        title="Custom Metadata Note",
        folder="test",
        content="# Initial content",
        tags=["test"],
    )

    # Read the note to get its permalink
    content = await read_note.fn("test/custom-metadata-note", project=test_project.name)

    # Now directly update the file with custom frontmatter
    # We need to use a direct file update to add custom frontmatter
    import frontmatter

    file_path = project_config.home / "test" / "Custom Metadata Note.md"
    post = frontmatter.load(file_path)

    # Add custom frontmatter
    post["Status"] = "In Progress"
    post["Priority"] = "High"
    post["Version"] = "1.0"

    # Write the file back
    with open(file_path, "w") as f:
        f.write(frontmatter.dumps(post))

    # Now update the note using write_note
    result = await write_note.fn(
        project=test_project.name,
        title="Custom Metadata Note",
        folder="test",
        content="# Updated content",
        tags=["test", "updated"],
    )

    # Verify the update was successful
    assert (
        f"Updated note\nproject: {test_project.name}\nfile_path: test/Custom Metadata Note.md"
    ) in result
    assert f"project: {test_project.name}" in result

    # Read the note back and check if custom frontmatter is preserved
    content = await read_note.fn("test/custom-metadata-note", project=test_project.name)

    # Custom frontmatter should be preserved
    assert "Status: In Progress" in content
    assert "Priority: High" in content
    # Version might be quoted as '1.0' due to YAML serialization
    assert "Version:" in content  # Just check that the field exists
    assert "1.0" in content  # And that the value exists somewhere

    # And new content should be there
    assert "# Updated content" in content

    # And tags should be updated (without # prefix)
    assert "- test" in content
    assert "- updated" in content


@pytest.mark.asyncio
async def test_write_note_preserves_content_frontmatter(app, test_project):
    """Test creating a new note."""
    await write_note.fn(
        project=test_project.name,
        title="Test Note",
        folder="test",
        content=dedent(
            """
            ---
            title: Test Note
            type: note
            version: 1.0
            author: name
            ---
            # Test

            This is a test note
            """
        ),
        tags=["test", "documentation"],
    )

    # Try reading it back via permalink
    content = await read_note.fn("test/test-note", project=test_project.name)
    assert (
        normalize_newlines(
            dedent(
                """
            ---
            title: Test Note
            type: note
            permalink: test/test-note
            version: 1.0
            author: name
            tags:
            - test
            - documentation
            ---

            # Test

            This is a test note
            """
            ).strip()
        )
        in content
    )


@pytest.mark.asyncio
async def test_write_note_permalink_collision_fix_issue_139(app, test_project):
    """Test fix for GitHub Issue #139: UNIQUE constraint failed: entity.permalink.

    This reproduces the exact scenario described in the issue:
    1. Create a note with title "Note 1"
    2. Create another note with title "Note 2"
    3. Try to create/replace first note again with same title "Note 1"

    Before the fix, step 3 would fail with UNIQUE constraint error.
    After the fix, it should either update the existing note or create with unique permalink.
    """
    # Step 1: Create first note
    result1 = await write_note.fn(
        project=test_project.name,
        title="Note 1",
        folder="test",
        content="Original content for note 1",
    )
    assert "# Created note" in result1
    assert f"project: {test_project.name}" in result1
    assert "permalink: test/note-1" in result1

    # Step 2: Create second note with different title
    result2 = await write_note.fn(
        project=test_project.name, title="Note 2", folder="test", content="Content for note 2"
    )
    assert "# Created note" in result2
    assert f"project: {test_project.name}" in result2
    assert "permalink: test/note-2" in result2

    # Step 3: Try to create/replace first note again
    # This scenario would trigger the UNIQUE constraint failure before the fix
    result3 = await write_note.fn(
        project=test_project.name,
        title="Note 1",  # Same title as first note
        folder="test",  # Same folder as first note
        content="Replacement content for note 1",  # Different content
    )

    # This should not raise a UNIQUE constraint failure error
    # It should succeed and either:
    # 1. Update the existing note (preferred behavior)
    # 2. Create a new note with unique permalink (fallback behavior)

    assert result3 is not None
    assert f"project: {test_project.name}" in result3
    assert "Updated note" in result3 or "Created note" in result3

    # The result should contain either the original permalink or a unique one
    assert "permalink: test/note-1" in result3 or "permalink: test/note-1-1" in result3

    # Verify we can read back the content
    if "permalink: test/note-1" in result3:
        # Updated existing note case
        content = await read_note.fn("test/note-1", project=test_project.name)
        assert "Replacement content for note 1" in content
    else:
        # Created new note with unique permalink case
        content = await read_note.fn("test/note-1-1", project=test_project.name)
        assert "Replacement content for note 1" in content
        # Original note should still exist
        original_content = await read_note.fn("test/note-1", project=test_project.name)
        assert "Original content for note 1" in original_content


@pytest.mark.asyncio
async def test_write_note_with_custom_entity_type(app, test_project):
    """Test creating a note with custom entity_type parameter.

    This test verifies the fix for Issue #144 where entity_type parameter
    was hardcoded to "note" instead of allowing custom types.
    """
    result = await write_note.fn(
        project=test_project.name,
        title="Test Guide",
        folder="guides",
        content="# Guide Content\nThis is a guide",
        tags=["guide", "documentation"],
        note_type="guide",
    )

    assert result
    assert "# Created note" in result
    assert f"project: {test_project.name}" in result
    assert "file_path: guides/Test Guide.md" in result
    assert "permalink: guides/test-guide" in result
    assert "## Tags" in result
    assert "- guide, documentation" in result
    assert f"[Session: Using project '{test_project.name}']" in result

    # Verify the entity type is correctly set in the frontmatter
    content = await read_note.fn("guides/test-guide", project=test_project.name)
    assert (
        normalize_newlines(
            dedent("""
        ---
        title: Test Guide
        type: guide
        permalink: guides/test-guide
        tags:
        - guide
        - documentation
        ---

        # Guide Content
        This is a guide
        """).strip()
        )
        in content
    )


@pytest.mark.asyncio
async def test_write_note_with_report_entity_type(app, test_project):
    """Test creating a note with note_type="report"."""
    result = await write_note.fn(
        project=test_project.name,
        title="Monthly Report",
        folder="reports",
        content="# Monthly Report\nThis is a monthly report",
        tags=["report", "monthly"],
        note_type="report",
    )

    assert result
    assert "# Created note" in result
    assert f"project: {test_project.name}" in result
    assert "file_path: reports/Monthly Report.md" in result
    assert "permalink: reports/monthly-report" in result
    assert f"[Session: Using project '{test_project.name}']" in result

    # Verify the entity type is correctly set in the frontmatter
    content = await read_note.fn("reports/monthly-report", project=test_project.name)
    assert "type: report" in content
    assert "# Monthly Report" in content


@pytest.mark.asyncio
async def test_write_note_with_config_entity_type(app, test_project):
    """Test creating a note with note_type="config"."""
    result = await write_note.fn(
        project=test_project.name,
        title="System Config",
        folder="config",
        content="# System Configuration\nThis is a config file",
        note_type="config",
    )

    assert result
    assert "# Created note" in result
    assert f"project: {test_project.name}" in result
    assert "file_path: config/System Config.md" in result
    assert "permalink: config/system-config" in result
    assert f"[Session: Using project '{test_project.name}']" in result

    # Verify the entity type is correctly set in the frontmatter
    content = await read_note.fn("config/system-config", project=test_project.name)
    assert "type: config" in content
    assert "# System Configuration" in content


@pytest.mark.asyncio
async def test_write_note_entity_type_default_behavior(app, test_project):
    """Test that the entity_type parameter defaults to "note" when not specified.

    This ensures backward compatibility - existing code that doesn't specify
    entity_type should continue to work as before.
    """
    result = await write_note.fn(
        project=test_project.name,
        title="Default Type Test",
        folder="test",
        content="# Default Type Test\nThis should be type 'note'",
        tags=["test"],
    )

    assert result
    assert "# Created note" in result
    assert f"project: {test_project.name}" in result
    assert "file_path: test/Default Type Test.md" in result
    assert "permalink: test/default-type-test" in result
    assert f"[Session: Using project '{test_project.name}']" in result

    # Verify the entity type defaults to "note"
    content = await read_note.fn("test/default-type-test", project=test_project.name)
    assert "type: note" in content
    assert "# Default Type Test" in content


@pytest.mark.asyncio
async def test_write_note_update_existing_with_different_entity_type(app, test_project):
    """Test updating an existing note with a different entity_type."""
    # Create initial note as "note" type
    result1 = await write_note.fn(
        project=test_project.name,
        title="Changeable Type",
        folder="test",
        content="# Initial Content\nThis starts as a note",
        tags=["test"],
        note_type="note",
    )

    assert result1
    assert "# Created note" in result1
    assert f"project: {test_project.name}" in result1

    # Update the same note with a different note_type
    result2 = await write_note.fn(
        project=test_project.name,
        title="Changeable Type",
        folder="test",
        content="# Updated Content\nThis is now a guide",
        tags=["guide"],
        note_type="guide",
    )

    assert result2
    assert "# Updated note" in result2
    assert f"project: {test_project.name}" in result2

    # Verify the entity type was updated
    content = await read_note.fn("test/changeable-type", project=test_project.name)
    assert "type: guide" in content
    assert "# Updated Content" in content
    assert "- guide" in content


@pytest.mark.asyncio
async def test_write_note_respects_frontmatter_entity_type(app, test_project):
    """Test that entity_type in frontmatter is respected when parameter is not provided.

    This verifies that when write_note is called without entity_type parameter,
    but the content includes frontmatter with a 'type' field, that type is respected
    instead of defaulting to 'note'.
    """
    note = dedent("""
        ---
        title: Test Guide
        type: guide
        permalink: guides/test-guide
        tags:
        - guide
        - documentation
        ---

        # Guide Content
        This is a guide
        """).strip()

    # Call write_note without entity_type parameter - it should respect frontmatter type
    result = await write_note.fn(
        project=test_project.name, title="Test Guide", folder="guides", content=note
    )

    assert result
    assert "# Created note" in result
    assert f"project: {test_project.name}" in result
    assert "file_path: guides/Test Guide.md" in result
    assert "permalink: guides/test-guide" in result
    assert f"[Session: Using project '{test_project.name}']" in result

    # Verify the entity type from frontmatter is respected (should be "guide", not "note")
    content = await read_note.fn("guides/test-guide", project=test_project.name)
    assert "type: guide" in content
    assert "# Guide Content" in content
    assert "- guide" in content
    assert "- documentation" in content


class TestWriteNoteSecurityValidation:
    """Test write_note security validation features."""

    @pytest.mark.asyncio
    async def test_write_note_blocks_path_traversal_unix(self, app, test_project):
        """Test that Unix-style path traversal attacks are blocked in folder parameter."""
        # Test various Unix-style path traversal patterns
        attack_folders = [
            "../",
            "../../",
            "../../../",
            "../secrets",
            "../../etc",
            "../../../etc/passwd_folder",
            "notes/../../../etc",
            "folder/../../outside",
            "../../../../malicious",
        ]

        for attack_folder in attack_folders:
            result = await write_note.fn(
                project=test_project.name,
                title="Test Note",
                folder=attack_folder,
                content="# Test Content\nThis should be blocked by security validation.",
            )

            assert isinstance(result, str)
            assert "# Error" in result
            assert "paths must stay within project boundaries" in result
            assert attack_folder in result

    @pytest.mark.asyncio
    async def test_write_note_blocks_path_traversal_windows(self, app, test_project):
        """Test that Windows-style path traversal attacks are blocked in folder parameter."""
        # Test various Windows-style path traversal patterns
        attack_folders = [
            "..\\",
            "..\\..\\",
            "..\\..\\..\\",
            "..\\secrets",
            "..\\..\\Windows",
            "..\\..\\..\\Windows\\System32",
            "notes\\..\\..\\..\\Windows",
            "\\\\server\\share",
            "\\\\..\\..\\Windows",
        ]

        for attack_folder in attack_folders:
            result = await write_note.fn(
                project=test_project.name,
                title="Test Note",
                folder=attack_folder,
                content="# Test Content\nThis should be blocked by security validation.",
            )

            assert isinstance(result, str)
            assert "# Error" in result
            assert "paths must stay within project boundaries" in result
            assert attack_folder in result

    @pytest.mark.asyncio
    async def test_write_note_blocks_absolute_paths(self, app, test_project):
        """Test that absolute paths are blocked in folder parameter."""
        # Test various absolute path patterns
        attack_folders = [
            "/etc",
            "/home/user",
            "/var/log",
            "/root",
            "C:\\Windows",
            "C:\\Users\\user",
            "D:\\secrets",
            "/tmp/malicious",
            "/usr/local/evil",
        ]

        for attack_folder in attack_folders:
            result = await write_note.fn(
                project=test_project.name,
                title="Test Note",
                folder=attack_folder,
                content="# Test Content\nThis should be blocked by security validation.",
            )

            assert isinstance(result, str)
            assert "# Error" in result
            assert "paths must stay within project boundaries" in result
            assert attack_folder in result

    @pytest.mark.asyncio
    async def test_write_note_blocks_home_directory_access(self, app, test_project):
        """Test that home directory access patterns are blocked in folder parameter."""
        # Test various home directory access patterns
        attack_folders = [
            "~",
            "~/",
            "~/secrets",
            "~/.ssh",
            "~/Documents",
            "~\\AppData",
            "~\\Desktop",
            "~/.env_folder",
        ]

        for attack_folder in attack_folders:
            result = await write_note.fn(
                project=test_project.name,
                title="Test Note",
                folder=attack_folder,
                content="# Test Content\nThis should be blocked by security validation.",
            )

            assert isinstance(result, str)
            assert "# Error" in result
            assert "paths must stay within project boundaries" in result
            assert attack_folder in result

    @pytest.mark.asyncio
    async def test_write_note_blocks_mixed_attack_patterns(self, app, test_project):
        """Test that mixed legitimate/attack patterns are blocked in folder parameter."""
        # Test mixed patterns that start legitimate but contain attacks
        attack_folders = [
            "notes/../../../etc",
            "docs/../../.env_folder",
            "legitimate/path/../../.ssh",
            "project/folder/../../../Windows",
            "valid/folder/../../home/user",
            "assets/../../../tmp/evil",
        ]

        for attack_folder in attack_folders:
            result = await write_note.fn(
                project=test_project.name,
                title="Test Note",
                folder=attack_folder,
                content="# Test Content\nThis should be blocked by security validation.",
            )

            assert isinstance(result, str)
            assert "# Error" in result
            assert "paths must stay within project boundaries" in result

    @pytest.mark.asyncio
    async def test_write_note_allows_safe_folder_paths(self, app, test_project):
        """Test that legitimate folder paths are still allowed."""
        # Test various safe folder patterns
        safe_folders = [
            "notes",
            "docs",
            "projects/2025",
            "archive/old-notes",
            "deep/nested/directory/structure",
            "folder/subfolder",
            "research/ml",
            "meeting-notes",
        ]

        for safe_folder in safe_folders:
            result = await write_note.fn(
                project=test_project.name,
                title=f"Test Note in {safe_folder.replace('/', '-')}",
                folder=safe_folder,
                content="# Test Content\nThis should work normally with security validation.",
                tags=["test", "security"],
            )

            # Should succeed (not a security error)
            assert isinstance(result, str)
            assert "# Error" not in result
            assert "paths must stay within project boundaries" not in result
            # Should be normal successful creation/update
            assert ("# Created note" in result) or ("# Updated note" in result)
            assert safe_folder in result  # Should show in file_path

    @pytest.mark.asyncio
    async def test_write_note_empty_folder_security(self, app, test_project):
        """Test that empty folder parameter is handled securely."""
        # Empty folder should be allowed (creates in root)
        result = await write_note.fn(
            project=test_project.name,
            title="Root Note",
            folder="",
            content="# Root Note\nThis note should be created in the project root.",
        )

        assert isinstance(result, str)
        # Empty folder should not trigger security error
        assert "# Error" not in result
        assert "paths must stay within project boundaries" not in result
        # Should succeed normally
        assert ("# Created note" in result) or ("# Updated note" in result)

    @pytest.mark.asyncio
    async def test_write_note_none_folder_security(self, app, test_project):
        """Test that default folder behavior works securely when folder is omitted."""
        # The write_note function requires folder parameter, but we can test with empty string
        # which effectively creates in project root
        result = await write_note.fn(
            project=test_project.name,
            title="Root Folder Note",
            folder="",  # Empty string instead of None since folder is required
            content="# Root Folder Note\nThis note should be created in the project root.",
        )

        assert isinstance(result, str)
        # Empty folder should not trigger security error
        assert "# Error" not in result
        assert "paths must stay within project boundaries" not in result
        # Should succeed normally
        assert ("# Created note" in result) or ("# Updated note" in result)

    @pytest.mark.asyncio
    async def test_write_note_current_directory_references_security(self, app, test_project):
        """Test that current directory references are handled securely."""
        # Test current directory references (should be safe)
        safe_folders = [
            "./notes",
            "folder/./subfolder",
            "./folder/subfolder",
        ]

        for safe_folder in safe_folders:
            result = await write_note.fn(
                project=test_project.name,
                title=f"Current Dir Test {safe_folder.replace('/', '-').replace('.', 'dot')}",
                folder=safe_folder,
                content="# Current Directory Test\nThis should work with current directory references.",
            )

            assert isinstance(result, str)
            # Should NOT contain security error message
            assert "# Error" not in result
            assert "paths must stay within project boundaries" not in result
            # Should succeed normally
            assert ("# Created note" in result) or ("# Updated note" in result)

    @pytest.mark.asyncio
    async def test_write_note_security_with_all_parameters(self, app, test_project):
        """Test security validation works with all write_note parameters."""
        # Test that security validation is applied even when all other parameters are provided
        result = await write_note.fn(
            project=test_project.name,
            title="Security Test with All Params",
            folder="../../../etc/malicious",
            content="# Malicious Content\nThis should be blocked by security validation.",
            tags=["malicious", "test"],
            note_type="guide",
        )

        assert isinstance(result, str)
        assert "# Error" in result
        assert "paths must stay within project boundaries" in result
        assert "../../../etc/malicious" in result

    @pytest.mark.asyncio
    async def test_write_note_security_logging(self, app, test_project, caplog):
        """Test that security violations are properly logged."""
        # Attempt path traversal attack
        result = await write_note.fn(
            project=test_project.name,
            title="Security Logging Test",
            folder="../../../etc/passwd_folder",
            content="# Test Content\nThis should trigger security logging.",
        )

        assert "# Error" in result
        assert "paths must stay within project boundaries" in result

        # Check that security violation was logged
        # Note: This test may need adjustment based on the actual logging setup
        # The security validation should generate a warning log entry

    @pytest.mark.asyncio
    async def test_write_note_preserves_functionality_with_security(self, app, test_project):
        """Test that security validation doesn't break normal note creation functionality."""
        # Create a note with all features to ensure security validation doesn't interfere
        result = await write_note.fn(
            project=test_project.name,
            title="Full Feature Security Test",
            folder="security-tests",
            content=dedent("""
                # Full Feature Security Test

                This note tests that security validation doesn't break normal functionality.

                ## Observations
                - [security] Path validation working correctly #security
                - [feature] All features still functional #test

                ## Relations
                - relates_to [[Security Implementation]]
                - depends_on [[Path Validation]]

                Additional content with various formatting.
            """).strip(),
            tags=["security", "test", "full-feature"],
            note_type="guide",
        )

        # Should succeed normally
        assert isinstance(result, str)
        assert "# Error" not in result
        assert "paths must stay within project boundaries" not in result
        assert "# Created note" in result
        assert "file_path: security-tests/Full Feature Security Test.md" in result
        assert "permalink: security-tests/full-feature-security-test" in result

        # Should process observations and relations
        assert "## Observations" in result
        assert "## Relations" in result
        assert "## Tags" in result

        # Should show proper counts
        assert "security: 1" in result
        assert "feature: 1" in result


class TestWriteNoteSecurityEdgeCases:
    """Test edge cases for write_note security validation."""

    @pytest.mark.asyncio
    async def test_write_note_unicode_folder_attacks(self, app, test_project):
        """Test that Unicode-based path traversal attempts are blocked."""
        # Test Unicode path traversal attempts
        unicode_attack_folders = [
            "notes/文档/../../../etc",  # Chinese characters
            "docs/café/../../secrets",  # Accented characters
            "files/αβγ/../../../malicious",  # Greek characters
        ]

        for attack_folder in unicode_attack_folders:
            result = await write_note.fn(
                project=test_project.name,
                title="Unicode Attack Test",
                folder=attack_folder,
                content="# Unicode Attack\nThis should be blocked.",
            )

            assert isinstance(result, str)
            assert "# Error" in result
            assert "paths must stay within project boundaries" in result

    @pytest.mark.asyncio
    async def test_write_note_very_long_attack_folder(self, app, test_project):
        """Test handling of very long attack folder paths."""
        # Create a very long path traversal attack
        long_attack_folder = "../" * 1000 + "etc/malicious"

        result = await write_note.fn(
            project=test_project.name,
            title="Long Attack Test",
            folder=long_attack_folder,
            content="# Long Attack\nThis should be blocked.",
        )

        assert isinstance(result, str)
        assert "# Error" in result
        assert "paths must stay within project boundaries" in result

    @pytest.mark.asyncio
    async def test_write_note_case_variations_attacks(self, app, test_project):
        """Test that case variations don't bypass security."""
        # Test case variations (though case sensitivity depends on filesystem)
        case_attack_folders = [
            "../ETC",
            "../Etc/SECRETS",
            "..\\WINDOWS",
            "~/SECRETS",
        ]

        for attack_folder in case_attack_folders:
            result = await write_note.fn(
                project=test_project.name,
                title="Case Variation Attack Test",
                folder=attack_folder,
                content="# Case Attack\nThis should be blocked.",
            )

            assert isinstance(result, str)
            assert "# Error" in result
            assert "paths must stay within project boundaries" in result

    @pytest.mark.asyncio
    async def test_write_note_whitespace_in_attack_folders(self, app, test_project):
        """Test that whitespace doesn't help bypass security."""
        # Test attack folders with various whitespace
        whitespace_attack_folders = [
            " ../../../etc ",
            "\t../../../secrets\t",
            " ..\\..\\Windows ",
            "notes/ ../../ malicious",
        ]

        for attack_folder in whitespace_attack_folders:
            result = await write_note.fn(
                project=test_project.name,
                title="Whitespace Attack Test",
                folder=attack_folder,
                content="# Whitespace Attack\nThis should be blocked.",
            )

            assert isinstance(result, str)
            # The attack should still be blocked even with whitespace
            if ".." in attack_folder.strip() or "~" in attack_folder.strip():
                assert "# Error" in result
                assert "paths must stay within project boundaries" in result

```