This is page 16 of 19. Use http://codebase.md/basicmachines-co/basic-memory?page={x} to view the full context.
# Directory Structure
```
├── .claude
│ ├── commands
│ │ ├── release
│ │ │ ├── beta.md
│ │ │ ├── changelog.md
│ │ │ ├── release-check.md
│ │ │ └── release.md
│ │ ├── spec.md
│ │ └── test-live.md
│ └── settings.json
├── .dockerignore
├── .env.example
├── .github
│ ├── dependabot.yml
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.md
│ │ ├── config.yml
│ │ ├── documentation.md
│ │ └── feature_request.md
│ └── workflows
│ ├── claude-code-review.yml
│ ├── claude-issue-triage.yml
│ ├── claude.yml
│ ├── dev-release.yml
│ ├── docker.yml
│ ├── pr-title.yml
│ ├── release.yml
│ └── test.yml
├── .gitignore
├── .python-version
├── CHANGELOG.md
├── CITATION.cff
├── CLA.md
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── docker-compose-postgres.yml
├── docker-compose.yml
├── Dockerfile
├── docs
│ ├── ai-assistant-guide-extended.md
│ ├── ARCHITECTURE.md
│ ├── character-handling.md
│ ├── cloud-cli.md
│ ├── Docker.md
│ └── testing-coverage.md
├── justfile
├── LICENSE
├── llms-install.md
├── pyproject.toml
├── README.md
├── SECURITY.md
├── smithery.yaml
├── specs
│ ├── SPEC-1 Specification-Driven Development Process.md
│ ├── SPEC-10 Unified Deployment Workflow and Event Tracking.md
│ ├── SPEC-11 Basic Memory API Performance Optimization.md
│ ├── SPEC-12 OpenTelemetry Observability.md
│ ├── SPEC-13 CLI Authentication with Subscription Validation.md
│ ├── SPEC-14 Cloud Git Versioning & GitHub Backup.md
│ ├── SPEC-14- Cloud Git Versioning & GitHub Backup.md
│ ├── SPEC-15 Configuration Persistence via Tigris for Cloud Tenants.md
│ ├── SPEC-16 MCP Cloud Service Consolidation.md
│ ├── SPEC-17 Semantic Search with ChromaDB.md
│ ├── SPEC-18 AI Memory Management Tool.md
│ ├── SPEC-19 Sync Performance and Memory Optimization.md
│ ├── SPEC-2 Slash Commands Reference.md
│ ├── SPEC-20 Simplified Project-Scoped Rclone Sync.md
│ ├── SPEC-3 Agent Definitions.md
│ ├── SPEC-4 Notes Web UI Component Architecture.md
│ ├── SPEC-5 CLI Cloud Upload via WebDAV.md
│ ├── SPEC-6 Explicit Project Parameter Architecture.md
│ ├── SPEC-7 POC to spike Tigris Turso for local access to cloud data.md
│ ├── SPEC-8 TigrisFS Integration.md
│ ├── SPEC-9 Multi-Project Bidirectional Sync Architecture.md
│ ├── SPEC-9 Signed Header Tenant Information.md
│ └── SPEC-9-1 Follow-Ups- Conflict, Sync, and Observability.md
├── src
│ └── basic_memory
│ ├── __init__.py
│ ├── alembic
│ │ ├── alembic.ini
│ │ ├── env.py
│ │ ├── migrations.py
│ │ ├── script.py.mako
│ │ └── versions
│ │ ├── 314f1ea54dc4_add_postgres_full_text_search_support_.py
│ │ ├── 3dae7c7b1564_initial_schema.py
│ │ ├── 502b60eaa905_remove_required_from_entity_permalink.py
│ │ ├── 5fe1ab1ccebe_add_projects_table.py
│ │ ├── 647e7a75e2cd_project_constraint_fix.py
│ │ ├── 6830751f5fb6_merge_multiple_heads.py
│ │ ├── 9d9c1cb7d8f5_add_mtime_and_size_columns_to_entity_.py
│ │ ├── a1b2c3d4e5f6_fix_project_foreign_keys.py
│ │ ├── a2b3c4d5e6f7_add_search_index_entity_cascade.py
│ │ ├── b3c3938bacdb_relation_to_name_unique_index.py
│ │ ├── cc7172b46608_update_search_index_schema.py
│ │ ├── e7e1f4367280_add_scan_watermark_tracking_to_project.py
│ │ ├── f8a9b2c3d4e5_add_pg_trgm_for_fuzzy_link_resolution.py
│ │ └── g9a0b3c4d5e6_add_external_id_to_project_and_entity.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── app.py
│ │ ├── container.py
│ │ ├── routers
│ │ │ ├── __init__.py
│ │ │ ├── directory_router.py
│ │ │ ├── importer_router.py
│ │ │ ├── knowledge_router.py
│ │ │ ├── management_router.py
│ │ │ ├── memory_router.py
│ │ │ ├── project_router.py
│ │ │ ├── prompt_router.py
│ │ │ ├── resource_router.py
│ │ │ ├── search_router.py
│ │ │ └── utils.py
│ │ ├── template_loader.py
│ │ └── v2
│ │ ├── __init__.py
│ │ └── routers
│ │ ├── __init__.py
│ │ ├── directory_router.py
│ │ ├── importer_router.py
│ │ ├── knowledge_router.py
│ │ ├── memory_router.py
│ │ ├── project_router.py
│ │ ├── prompt_router.py
│ │ ├── resource_router.py
│ │ └── search_router.py
│ ├── cli
│ │ ├── __init__.py
│ │ ├── app.py
│ │ ├── auth.py
│ │ ├── commands
│ │ │ ├── __init__.py
│ │ │ ├── cloud
│ │ │ │ ├── __init__.py
│ │ │ │ ├── api_client.py
│ │ │ │ ├── bisync_commands.py
│ │ │ │ ├── cloud_utils.py
│ │ │ │ ├── core_commands.py
│ │ │ │ ├── rclone_commands.py
│ │ │ │ ├── rclone_config.py
│ │ │ │ ├── rclone_installer.py
│ │ │ │ ├── upload_command.py
│ │ │ │ └── upload.py
│ │ │ ├── command_utils.py
│ │ │ ├── db.py
│ │ │ ├── format.py
│ │ │ ├── import_chatgpt.py
│ │ │ ├── import_claude_conversations.py
│ │ │ ├── import_claude_projects.py
│ │ │ ├── import_memory_json.py
│ │ │ ├── mcp.py
│ │ │ ├── project.py
│ │ │ ├── status.py
│ │ │ ├── telemetry.py
│ │ │ └── tool.py
│ │ ├── container.py
│ │ └── main.py
│ ├── config.py
│ ├── db.py
│ ├── deps
│ │ ├── __init__.py
│ │ ├── config.py
│ │ ├── db.py
│ │ ├── importers.py
│ │ ├── projects.py
│ │ ├── repositories.py
│ │ └── services.py
│ ├── deps.py
│ ├── file_utils.py
│ ├── ignore_utils.py
│ ├── importers
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── chatgpt_importer.py
│ │ ├── claude_conversations_importer.py
│ │ ├── claude_projects_importer.py
│ │ ├── memory_json_importer.py
│ │ └── utils.py
│ ├── markdown
│ │ ├── __init__.py
│ │ ├── entity_parser.py
│ │ ├── markdown_processor.py
│ │ ├── plugins.py
│ │ ├── schemas.py
│ │ └── utils.py
│ ├── mcp
│ │ ├── __init__.py
│ │ ├── async_client.py
│ │ ├── clients
│ │ │ ├── __init__.py
│ │ │ ├── directory.py
│ │ │ ├── knowledge.py
│ │ │ ├── memory.py
│ │ │ ├── project.py
│ │ │ ├── resource.py
│ │ │ └── search.py
│ │ ├── container.py
│ │ ├── project_context.py
│ │ ├── prompts
│ │ │ ├── __init__.py
│ │ │ ├── ai_assistant_guide.py
│ │ │ ├── continue_conversation.py
│ │ │ ├── recent_activity.py
│ │ │ ├── search.py
│ │ │ └── utils.py
│ │ ├── resources
│ │ │ ├── ai_assistant_guide.md
│ │ │ └── project_info.py
│ │ ├── server.py
│ │ └── tools
│ │ ├── __init__.py
│ │ ├── build_context.py
│ │ ├── canvas.py
│ │ ├── chatgpt_tools.py
│ │ ├── delete_note.py
│ │ ├── edit_note.py
│ │ ├── list_directory.py
│ │ ├── move_note.py
│ │ ├── project_management.py
│ │ ├── read_content.py
│ │ ├── read_note.py
│ │ ├── recent_activity.py
│ │ ├── search.py
│ │ ├── utils.py
│ │ ├── view_note.py
│ │ └── write_note.py
│ ├── models
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── knowledge.py
│ │ ├── project.py
│ │ └── search.py
│ ├── project_resolver.py
│ ├── repository
│ │ ├── __init__.py
│ │ ├── entity_repository.py
│ │ ├── observation_repository.py
│ │ ├── postgres_search_repository.py
│ │ ├── project_info_repository.py
│ │ ├── project_repository.py
│ │ ├── relation_repository.py
│ │ ├── repository.py
│ │ ├── search_index_row.py
│ │ ├── search_repository_base.py
│ │ ├── search_repository.py
│ │ └── sqlite_search_repository.py
│ ├── runtime.py
│ ├── schemas
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── cloud.py
│ │ ├── delete.py
│ │ ├── directory.py
│ │ ├── importer.py
│ │ ├── memory.py
│ │ ├── project_info.py
│ │ ├── prompt.py
│ │ ├── request.py
│ │ ├── response.py
│ │ ├── search.py
│ │ ├── sync_report.py
│ │ └── v2
│ │ ├── __init__.py
│ │ ├── entity.py
│ │ └── resource.py
│ ├── services
│ │ ├── __init__.py
│ │ ├── context_service.py
│ │ ├── directory_service.py
│ │ ├── entity_service.py
│ │ ├── exceptions.py
│ │ ├── file_service.py
│ │ ├── initialization.py
│ │ ├── link_resolver.py
│ │ ├── project_service.py
│ │ ├── search_service.py
│ │ └── service.py
│ ├── sync
│ │ ├── __init__.py
│ │ ├── background_sync.py
│ │ ├── coordinator.py
│ │ ├── sync_service.py
│ │ └── watch_service.py
│ ├── telemetry.py
│ ├── templates
│ │ └── prompts
│ │ ├── continue_conversation.hbs
│ │ └── search.hbs
│ └── utils.py
├── test-int
│ ├── BENCHMARKS.md
│ ├── cli
│ │ ├── test_project_commands_integration.py
│ │ └── test_version_integration.py
│ ├── conftest.py
│ ├── mcp
│ │ ├── test_build_context_underscore.py
│ │ ├── test_build_context_validation.py
│ │ ├── test_chatgpt_tools_integration.py
│ │ ├── test_default_project_mode_integration.py
│ │ ├── test_delete_note_integration.py
│ │ ├── test_edit_note_integration.py
│ │ ├── test_lifespan_shutdown_sync_task_cancellation_integration.py
│ │ ├── test_list_directory_integration.py
│ │ ├── test_move_note_integration.py
│ │ ├── test_project_management_integration.py
│ │ ├── test_project_state_sync_integration.py
│ │ ├── test_read_content_integration.py
│ │ ├── test_read_note_integration.py
│ │ ├── test_search_integration.py
│ │ ├── test_single_project_mcp_integration.py
│ │ └── test_write_note_integration.py
│ ├── test_db_wal_mode.py
│ └── test_disable_permalinks_integration.py
├── tests
│ ├── __init__.py
│ ├── api
│ │ ├── conftest.py
│ │ ├── test_api_container.py
│ │ ├── test_async_client.py
│ │ ├── test_continue_conversation_template.py
│ │ ├── test_directory_router.py
│ │ ├── test_importer_router.py
│ │ ├── test_knowledge_router.py
│ │ ├── test_management_router.py
│ │ ├── test_memory_router.py
│ │ ├── test_project_router_operations.py
│ │ ├── test_project_router.py
│ │ ├── test_prompt_router.py
│ │ ├── test_relation_background_resolution.py
│ │ ├── test_resource_router.py
│ │ ├── test_search_router.py
│ │ ├── test_search_template.py
│ │ ├── test_template_loader_helpers.py
│ │ ├── test_template_loader.py
│ │ └── v2
│ │ ├── __init__.py
│ │ ├── conftest.py
│ │ ├── test_directory_router.py
│ │ ├── test_importer_router.py
│ │ ├── test_knowledge_router.py
│ │ ├── test_memory_router.py
│ │ ├── test_project_router.py
│ │ ├── test_prompt_router.py
│ │ ├── test_resource_router.py
│ │ └── test_search_router.py
│ ├── cli
│ │ ├── cloud
│ │ │ ├── test_cloud_api_client_and_utils.py
│ │ │ ├── test_rclone_config_and_bmignore_filters.py
│ │ │ └── test_upload_path.py
│ │ ├── conftest.py
│ │ ├── test_auth_cli_auth.py
│ │ ├── test_cli_container.py
│ │ ├── test_cli_exit.py
│ │ ├── test_cli_tool_exit.py
│ │ ├── test_cli_tools.py
│ │ ├── test_cloud_authentication.py
│ │ ├── test_ignore_utils.py
│ │ ├── test_import_chatgpt.py
│ │ ├── test_import_claude_conversations.py
│ │ ├── test_import_claude_projects.py
│ │ ├── test_import_memory_json.py
│ │ ├── test_project_add_with_local_path.py
│ │ └── test_upload.py
│ ├── conftest.py
│ ├── db
│ │ └── test_issue_254_foreign_key_constraints.py
│ ├── importers
│ │ ├── test_conversation_indexing.py
│ │ ├── test_importer_base.py
│ │ └── test_importer_utils.py
│ ├── markdown
│ │ ├── __init__.py
│ │ ├── test_date_frontmatter_parsing.py
│ │ ├── test_entity_parser_error_handling.py
│ │ ├── test_entity_parser.py
│ │ ├── test_markdown_plugins.py
│ │ ├── test_markdown_processor.py
│ │ ├── test_observation_edge_cases.py
│ │ ├── test_parser_edge_cases.py
│ │ ├── test_relation_edge_cases.py
│ │ └── test_task_detection.py
│ ├── mcp
│ │ ├── clients
│ │ │ ├── __init__.py
│ │ │ └── test_clients.py
│ │ ├── conftest.py
│ │ ├── test_async_client_modes.py
│ │ ├── test_mcp_container.py
│ │ ├── test_obsidian_yaml_formatting.py
│ │ ├── test_permalink_collision_file_overwrite.py
│ │ ├── test_project_context.py
│ │ ├── test_prompts.py
│ │ ├── test_recent_activity_prompt_modes.py
│ │ ├── test_resources.py
│ │ ├── test_server_lifespan_branches.py
│ │ ├── test_tool_build_context.py
│ │ ├── test_tool_canvas.py
│ │ ├── test_tool_delete_note.py
│ │ ├── test_tool_edit_note.py
│ │ ├── test_tool_list_directory.py
│ │ ├── test_tool_move_note.py
│ │ ├── test_tool_project_management.py
│ │ ├── test_tool_read_content.py
│ │ ├── test_tool_read_note.py
│ │ ├── test_tool_recent_activity.py
│ │ ├── test_tool_resource.py
│ │ ├── test_tool_search.py
│ │ ├── test_tool_utils.py
│ │ ├── test_tool_view_note.py
│ │ ├── test_tool_write_note_kebab_filenames.py
│ │ ├── test_tool_write_note.py
│ │ └── tools
│ │ └── test_chatgpt_tools.py
│ ├── Non-MarkdownFileSupport.pdf
│ ├── README.md
│ ├── repository
│ │ ├── test_entity_repository_upsert.py
│ │ ├── test_entity_repository.py
│ │ ├── test_entity_upsert_issue_187.py
│ │ ├── test_observation_repository.py
│ │ ├── test_postgres_search_repository.py
│ │ ├── test_project_info_repository.py
│ │ ├── test_project_repository.py
│ │ ├── test_relation_repository.py
│ │ ├── test_repository.py
│ │ ├── test_search_repository_edit_bug_fix.py
│ │ └── test_search_repository.py
│ ├── schemas
│ │ ├── test_base_timeframe_minimum.py
│ │ ├── test_memory_serialization.py
│ │ ├── test_memory_url_validation.py
│ │ ├── test_memory_url.py
│ │ ├── test_relation_response_reference_resolution.py
│ │ ├── test_schemas.py
│ │ └── test_search.py
│ ├── Screenshot.png
│ ├── services
│ │ ├── test_context_service.py
│ │ ├── test_directory_service.py
│ │ ├── test_entity_service_disable_permalinks.py
│ │ ├── test_entity_service.py
│ │ ├── test_file_service.py
│ │ ├── test_initialization_cloud_mode_branches.py
│ │ ├── test_initialization.py
│ │ ├── test_link_resolver.py
│ │ ├── test_project_removal_bug.py
│ │ ├── test_project_service_operations.py
│ │ ├── test_project_service.py
│ │ └── test_search_service.py
│ ├── sync
│ │ ├── test_character_conflicts.py
│ │ ├── test_coordinator.py
│ │ ├── test_sync_service_incremental.py
│ │ ├── test_sync_service.py
│ │ ├── test_sync_wikilink_issue.py
│ │ ├── test_tmp_files.py
│ │ ├── test_watch_service_atomic_adds.py
│ │ ├── test_watch_service_edge_cases.py
│ │ ├── test_watch_service_reload.py
│ │ └── test_watch_service.py
│ ├── test_config.py
│ ├── test_deps.py
│ ├── test_production_cascade_delete.py
│ ├── test_project_resolver.py
│ ├── test_rclone_commands.py
│ ├── test_runtime.py
│ ├── test_telemetry.py
│ └── utils
│ ├── test_file_utils.py
│ ├── test_frontmatter_obsidian_compatible.py
│ ├── test_parse_tags.py
│ ├── test_permalink_formatting.py
│ ├── test_timezone_utils.py
│ ├── test_utf8_handling.py
│ └── test_validate_project_path.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/specs/SPEC-19 Sync Performance and Memory Optimization.md:
--------------------------------------------------------------------------------
```markdown
---
title: 'SPEC-19: Sync Performance and Memory Optimization'
type: spec
permalink: specs/spec-17-sync-performance-optimization
tags:
- performance
- memory
- sync
- optimization
- core
status: draft
---
# SPEC-19: Sync Performance and Memory Optimization
## Why
### Problem Statement
Current sync implementation causes Out-of-Memory (OOM) kills and poor performance on production systems:
**Evidence from Production**:
- **Tenant-6d2ff1a3**: OOM killed on 1GB machine
- Files: 2,621 total (31 PDFs, 80MB binary data)
- Memory: 1.5-1.7GB peak usage
- Sync duration: 15+ minutes
- Error: `Out of memory: Killed process 693 (python)`
**Root Causes**:
1. **Checksum-based scanning loads ALL files into memory**
- `scan_directory()` computes checksums for ALL 2,621 files upfront
- Results stored in multiple dicts (`ScanResult.files`, `SyncReport.checksums`)
- Even unchanged files are fully read and checksummed
2. **Large files read entirely for checksums**
- 16MB PDF → Full read into memory → Compute checksum
- No streaming or chunked processing
- TigrisFS caching compounds memory usage
3. **Unbounded concurrency**
- All 2,621 files processed simultaneously
- Each file loads full content into memory
- No semaphore limiting concurrent operations
4. **Cloud-specific resource leaks**
- aiohttp session leak in keepalive (not in context manager)
- Circuit breaker resets every 30s sync cycle (ineffective)
- Thundering herd: all tenants sync at :00 and :30
### Impact
- **Production stability**: OOM kills are unacceptable
- **User experience**: 15+ minute syncs are too slow
- **Cost**: Forced upgrades from 1GB → 2GB machines ($5-10/mo per tenant)
- **Scalability**: Current approach won't scale to 100+ tenants
### Architectural Decision
**Fix in basic-memory core first, NOT UberSync**
Rationale:
- Root causes are algorithmic, not architectural
- Benefits all users (CLI + Cloud)
- Lower risk than new centralized service
- Known solutions (rsync/rclone use the same pattern)
- Can defer UberSync until metrics prove it necessary
## What
### Affected Components
**basic-memory (core)**:
- `src/basic_memory/sync/sync_service.py` - Core sync algorithm (~42KB)
- `src/basic_memory/models/knowledge.py` - Entity model (add mtime/size columns)
- `src/basic_memory/file_utils.py` - Checksum computation functions
- `src/basic_memory/repository/entity_repository.py` - Database queries
- `src/basic_memory/alembic/versions/` - Database migration for schema changes
**basic-memory-cloud (wrapper)**:
- `apps/api/src/basic_memory_cloud_api/sync_worker.py` - Cloud sync wrapper
- Circuit breaker implementation
- Sync coordination logic
### Database Schema Changes
Add to Entity model:
```python
mtime: float # File modification timestamp
size: int # File size in bytes
```
## How (High Level)
### Phase 1: Core Algorithm Fixes (basic-memory)
**Priority: P0 - Critical**
#### 1.1 mtime-based Scanning (Issue #383)
Replace expensive checksum-based scanning with lightweight stat-based comparison:
```python
async def scan_directory(self, directory: Path) -> ScanResult:
"""Scan using mtime/size instead of checksums"""
result = ScanResult()
for root, dirnames, filenames in os.walk(str(directory)):
for filename in filenames:
rel_path = path.relative_to(directory).as_posix()
stat = path.stat()
# Store lightweight metadata instead of checksum
result.files[rel_path] = {
'mtime': stat.st_mtime,
'size': stat.st_size
}
return result
async def scan(self, directory: Path):
"""Compare mtime/size, only compute checksums for changed files"""
db_state = await self.get_db_file_state() # Include mtime/size
scan_result = await self.scan_directory(directory)
for file_path, metadata in scan_result.files.items():
db_metadata = db_state.get(file_path)
# Only compute expensive checksum if mtime/size changed
if not db_metadata or metadata['mtime'] != db_metadata['mtime']:
checksum = await self._compute_checksum_streaming(file_path)
# Process immediately, don't accumulate in memory
```
**Benefits**:
- No file reads during initial scan (just stat calls)
- ~90% reduction in memory usage
- ~10x faster scan phase
- Only checksum files that actually changed
#### 1.2 Streaming Checksum Computation (Issue #382)
For large files (>1MB), use chunked reading to avoid loading the entire file into memory:
```python
async def _compute_checksum_streaming(self, path: Path, chunk_size: int = 65536) -> str:
"""Compute checksum using 64KB chunks for large files"""
hasher = hashlib.sha256()
loop = asyncio.get_event_loop()
def read_chunks():
with open(path, 'rb') as f:
while chunk := f.read(chunk_size):
hasher.update(chunk)
await loop.run_in_executor(None, read_chunks)
return hasher.hexdigest()
async def _compute_checksum_async(self, file_path: Path) -> str:
"""Choose appropriate checksum method based on file size"""
stat = file_path.stat()
if stat.st_size > 1_048_576: # 1MB threshold
return await self._compute_checksum_streaming(file_path)
else:
# Small files: existing fast path
content = await self._read_file_async(file_path)
return compute_checksum(content)
```
**Benefits**:
- Constant memory usage regardless of file size
- 16MB PDF uses 64KB memory (not 16MB)
- Works well with TigrisFS network I/O
#### 1.3 Bounded Concurrency (Issue #198)
Add a semaphore to limit concurrent file operations (alternatively, consider using aiofiles for true async reads):
```python
class SyncService:
def __init__(self, ...):
# ... existing code ...
self._file_semaphore = asyncio.Semaphore(10) # Max 10 concurrent
self._max_tracked_failures = 100 # LRU cache limit
async def _read_file_async(self, file_path: Path) -> str:
async with self._file_semaphore:
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
self._thread_pool,
file_path.read_text,
"utf-8"
)
async def _record_failure(self, path: str, error: str):
# ... existing code ...
# Implement LRU eviction
if len(self._file_failures) > self._max_tracked_failures:
self._file_failures.popitem(last=False) # Remove oldest
```
**Benefits**:
- Maximum 10 files in memory at once (vs all 2,621)
- 90%+ reduction in peak memory usage
- Prevents unbounded memory growth on error-prone projects
### Phase 2: Cloud-Specific Fixes (basic-memory-cloud)
**Priority: P1 - High**
#### 2.1 Fix Resource Leaks
```python
# apps/api/src/basic_memory_cloud_api/sync_worker.py
async def send_keepalive():
"""Send keepalive pings using proper session management"""
# Use context manager to ensure cleanup
async with aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=5)
) as session:
while True:
try:
await session.get(f"https://{fly_app_name}.fly.dev/health")
await asyncio.sleep(10)
except asyncio.CancelledError:
raise # Exit cleanly
except Exception as e:
logger.warning(f"Keepalive failed: {e}")
```
#### 2.2 Improve Circuit Breaker
Track failures across sync cycles instead of resetting every 30s:
```python
# Persistent failure tracking
class SyncWorker:
def __init__(self):
self._persistent_failures: Dict[str, int] = {} # file -> failure_count
self._failure_window_start = time.time()
async def should_skip_file(self, file_path: str) -> bool:
# Skip files that failed >3 times in last hour
if self._persistent_failures.get(file_path, 0) > 3:
if time.time() - self._failure_window_start < 3600:
return True
return False
```
### Phase 3: Measurement & Decision
**Priority: P2 - Future**
After implementing Phases 1-2, collect metrics for 2 weeks:
- Memory usage per tenant sync
- Sync duration (scan + process)
- Concurrent sync load at peak times
- OOM incidents
- Resource costs
**UberSync Decision Criteria**:
Build centralized sync service ONLY if metrics show:
- ✅ Core fixes insufficient for >100 tenants
- ✅ Resource contention causing problems
- ✅ Need for tenant tier prioritization (paid > free)
- ✅ Cost savings justify complexity
Otherwise, defer UberSync as premature optimization.
## How to Evaluate
### Success Metrics (Phase 1)
**Memory Usage**:
- ✅ Peak memory <500MB for 2,000+ file projects (was 1.5-1.7GB)
- ✅ Memory usage linear with concurrent files (10 max), not total files
- ✅ Large file memory usage: 64KB chunks (not 16MB)
**Performance**:
- ✅ Initial scan <30 seconds (was 5+ minutes)
- ✅ Full sync <5 minutes for 2,000+ files (was 15+ minutes)
- ✅ Subsequent syncs <10 seconds (only changed files)
**Stability**:
- ✅ 2,000+ file projects run on 1GB machines
- ✅ Zero OOM kills in production
- ✅ No degradation with binary files (PDFs, images)
### Success Metrics (Phase 2)
**Resource Management**:
- ✅ Zero aiohttp session leaks (verified via monitoring)
- ✅ Circuit breaker prevents repeated failures (>3 fails = skip for 1 hour)
- ✅ Tenant syncs distributed over 30s window (no thundering herd)
**Observability**:
- ✅ Logfire traces show memory usage per sync
- ✅ Clear logging of skipped files and reasons
- ✅ Metrics on sync duration, file counts, failure rates
### Test Plan
**Unit Tests** (basic-memory):
- mtime comparison logic
- Streaming checksum correctness
- Semaphore limiting (mock 100 files, verify max 10 concurrent)
- LRU cache eviction
- Checksum computation: streaming vs non-streaming equivalence
**Integration Tests** (basic-memory):
- Large file handling (create 20MB test file)
- Mixed file types (text + binary)
- Changed file detection via mtime
- Sync with 1,000+ files
**Load Tests** (basic-memory-cloud):
- Test on tenant-6d2ff1a3 (2,621 files, 31 PDFs)
- Monitor memory during full sync with Logfire
- Measure scan and sync duration
- Run on 1GB machine (downgrade from 2GB to verify)
- Simulate 10 concurrent tenant syncs
**Regression Tests**:
- Verify existing sync scenarios still work
- CLI sync behavior unchanged
- File watcher integration unaffected
### Performance Benchmarks
Establish baseline, then compare after each phase:
| Metric | Baseline | Phase 1 Target | Phase 2 Target |
|--------|----------|----------------|----------------|
| Peak Memory (2,600 files) | 1.5-1.7GB | <500MB | <450MB |
| Initial Scan Time | 5+ min | <30 sec | <30 sec |
| Full Sync Time | 15+ min | <5 min | <5 min |
| Subsequent Sync | 2+ min | <10 sec | <10 sec |
| OOM Incidents/Week | 2-3 | 0 | 0 |
| Min RAM Required | 2GB | 1GB | 1GB |
## Implementation Phases
### Phase 0.5: Database Schema & Streaming Foundation
**Priority: P0 - Required for Phase 1**
This phase establishes the foundation for streaming sync with mtime-based change detection.
**Database Schema Changes**:
- [x] Add `mtime` column to Entity model (REAL type for float timestamp)
- [x] Add `size` column to Entity model (INTEGER type for file size in bytes)
- [x] Create Alembic migration for new columns (nullable initially)
- [x] Add indexes on `(file_path, project_id)` for optimistic upsert performance
- [ ] Backfill existing entities with mtime/size from filesystem
**Streaming Architecture**:
- [x] Replace `os.walk()` with `os.scandir()` for cached stat info
- [x] Eliminate `get_db_file_state()` - no upfront SELECT all entities (completed in Phase 1, see "Cleanup & Optimization")
- [x] Implement streaming iterator `_scan_directory_streaming()`
- [x] Add `get_by_file_path()` optimized query (single file lookup)
- [x] Add `get_all_file_paths()` for deletion detection (paths only, no entities)
**Benefits**:
- **50% fewer network calls** on TigrisFS (scandir returns cached stat)
- **No large dicts in memory** (process files one at a time)
- **Indexed lookups** instead of full table scan
- **Foundation for mtime comparison** (Phase 1)
**Code Changes**:
```python
# Before: Load all entities upfront
db_paths = await self.get_db_file_state() # SELECT * FROM entity WHERE project_id = ?
scan_result = await self.scan_directory() # os.walk() + stat() per file
# After: Stream and query incrementally
async for file_path, stat_info in self.scan_directory(): # scandir() with cached stat
db_entity = await self.entity_repository.get_by_file_path(rel_path) # Indexed lookup
# Process immediately, no accumulation
```
**Files Modified**:
- `src/basic_memory/models/knowledge.py` - Add mtime/size columns
- `src/basic_memory/alembic/versions/9d9c1cb7d8f5_add_mtime_and_size_columns_to_entity_.py` - Migration
- `src/basic_memory/sync/sync_service.py` - Streaming implementation
- `src/basic_memory/repository/entity_repository.py` - Add get_all_file_paths()
**Migration Strategy**:
```sql
-- Migration: Add nullable columns
ALTER TABLE entity ADD COLUMN mtime REAL;
ALTER TABLE entity ADD COLUMN size INTEGER;
-- Backfill from filesystem during first sync after upgrade
-- (Handled in sync_service on first scan)
```
### Phase 1: Core Fixes
**mtime-based scanning**:
- [x] Add mtime/size columns to Entity model (completed in Phase 0.5)
- [x] Database migration (alembic) (completed in Phase 0.5)
- [x] Refactor `scan()` to use streaming architecture with mtime/size comparison
- [x] Update `sync_markdown_file()` and `sync_regular_file()` to store mtime/size in database
- [x] Only compute checksums for changed files (mtime/size differ)
- [x] Unit tests for streaming scan (6 tests passing)
- [ ] Integration test with 1,000 files (defer to benchmarks)
**Streaming checksums**:
- [x] Implement `_compute_checksum_streaming()` with chunked reading
- [x] Add file size threshold logic (1MB)
- [x] Test with large files (16MB PDF)
- [x] Verify memory usage stays constant
- [x] Test checksum equivalence (streaming vs non-streaming)
**Bounded concurrency**:
- [x] Add semaphore (10 concurrent) to `_read_file_async()` (already existed)
- [x] Add LRU cache for failures (100 max) (already existed)
- [ ] Review thread pool size configuration
- [ ] Load test with 2,000+ files
- [ ] Verify <500MB peak memory
**Cleanup & Optimization**:
- [x] Eliminate `get_db_file_state()` - no upfront SELECT all entities (streaming architecture complete)
- [x] Consolidate file operations in FileService (eliminate duplicate checksum logic)
- [x] Add aiofiles dependency (already present)
- [x] FileService streaming checksums for files >1MB
- [x] SyncService delegates all file operations to FileService
- [x] Complete true async I/O refactoring - all file operations use aiofiles
- [x] Added `FileService.read_file_content()` using aiofiles
- [x] Removed `SyncService._read_file_async()` wrapper method
- [x] Removed `SyncService._compute_checksum_async()` wrapper method
- [x] Inlined all 7 checksum calls to use `file_service.compute_checksum()` directly
- [x] All file I/O operations now properly consolidated in FileService with non-blocking I/O
- [x] Removed sync_status_service completely (unnecessary complexity and state tracking)
- [x] Removed `sync_status_service.py` and `sync_status` MCP tool
- [x] Removed all `sync_status_tracker` calls from `sync_service.py`
- [x] Removed migration status checks from MCP tools (`write_note`, `read_note`, `build_context`)
- [x] Removed `check_migration_status()` and `wait_for_migration_or_return_status()` from `utils.py`
- [x] Removed all related tests (4 test files deleted)
- [x] All 1184 tests passing
**Phase 1 Implementation Summary:**
Phase 1 is now complete with all core fixes implemented and tested:
1. **Streaming Architecture** (Phase 0.5 + Phase 1):
- Replaced `os.walk()` with `os.scandir()` for cached stat info
- Eliminated upfront `get_db_file_state()` SELECT query
- Implemented `_scan_directory_streaming()` for incremental processing
- Added indexed `get_by_file_path()` lookups
- Result: 50% fewer network calls on TigrisFS, no large dicts in memory
2. **mtime-based Change Detection**:
- Added `mtime` and `size` columns to Entity model
- Alembic migration completed and deployed
- Only compute checksums when mtime/size differs from database
- Result: ~90% reduction in checksum operations during typical syncs
3. **True Async I/O with aiofiles**:
- All file operations consolidated in FileService
- `FileService.compute_checksum()`: 64KB chunked reading for constant memory (lines 261-296 of file_service.py)
- `FileService.read_file_content()`: Non-blocking file reads with aiofiles (lines 160-193 of file_service.py)
- Removed all wrapper methods from SyncService (`_read_file_async`, `_compute_checksum_async`)
- Semaphore controls concurrency (max 10 concurrent file operations)
- Result: Constant memory usage regardless of file size, true non-blocking I/O
4. **Test Coverage**:
- 41/43 sync tests passing (2 skipped as expected)
- Circuit breaker tests updated for new architecture
- Streaming checksum equivalence verified
- All edge cases covered (large files, concurrent operations, failures)
**Key Files Modified**:
- `src/basic_memory/models/knowledge.py` - Added mtime/size columns
- `src/basic_memory/alembic/versions/9d9c1cb7d8f5_add_mtime_and_size_columns_to_entity_.py` - Database migration
- `src/basic_memory/sync/sync_service.py` - Streaming implementation, removed wrapper methods
- `src/basic_memory/services/file_service.py` - Added `read_file_content()`, streaming checksums
- `src/basic_memory/repository/entity_repository.py` - Added `get_all_file_paths()`
- `tests/sync/test_sync_service.py` - Updated circuit breaker test mocks
**Performance Improvements Achieved**:
- Memory usage: Constant per file (64KB chunks) vs full file in memory
- Scan speed: Stat-only scan (no checksums for unchanged files)
- I/O efficiency: True async with aiofiles (no thread pool blocking)
- Network efficiency: 50% fewer calls on TigrisFS via scandir caching
- Architecture: Clean separation of concerns (FileService owns all file I/O)
- Reduced complexity: Removed unnecessary sync_status_service state tracking
**Observability**:
- [x] Added Logfire instrumentation to `sync_file()` and `sync_markdown_file()`
- [x] Logfire disabled by default via `ignore_no_config = true` in pyproject.toml
- [x] No telemetry in FOSS version unless explicitly configured
- [x] Cloud deployment can enable Logfire for performance monitoring
**Next Steps**: Phase 1.5 scan watermark optimization for large project performance.
### Phase 1.5: Scan Watermark Optimization
**Priority: P0 - Critical for Large Projects**
This phase addresses Issue #388 where large projects (1,460+ files) take 7+ minutes for sync operations even when no files have changed.
**Problem Analysis**:
From production data (tenant-0a20eb58):
- Total sync time: 420-450 seconds (7+ minutes) with 0 changes
- Scan phase: 321 seconds (75% of total time)
- Per-file cost: 220ms × 1,460 files = 5+ minutes
- Root cause: Network I/O to TigrisFS for stat operations (even with mtime columns)
- 15 concurrent syncs every 30 seconds compounds the problem
**Current Behavior** (Phase 1):
```python
async def scan(self, directory: Path):
"""Scan filesystem using mtime/size comparison"""
# Still stats ALL 1,460 files every sync cycle
async for file_path, stat_info in self._scan_directory_streaming():
db_entity = await self.entity_repository.get_by_file_path(file_path)
# Compare mtime/size, skip unchanged files
# Only checksum if changed (✅ already optimized)
```
**Problem**: Even with mtime optimization, we stat every file on every scan. On TigrisFS (network FUSE mount), this means 1,460 network calls taking 5+ minutes.
**Solution: Scan Watermark + File Count Detection**
Track when we last scanned and how many files existed. Use filesystem-level filtering to only examine files modified since last scan.
**Key Insight**: File count changes signal deletions
- Count same → incremental scan (95% of syncs)
- Count increased → new files found by incremental (4% of syncs)
- Count decreased → files deleted, need full scan (1% of syncs)
**Database Schema Changes**:
Add to Project model:
```python
last_scan_timestamp: float | None # Unix timestamp of last successful scan start
last_file_count: int | None # Number of files found in last scan
```
**Implementation Strategy**:
```python
async def scan(self, directory: Path):
    """Smart scan using watermark and file count.

    Chooses the cheapest scan strategy that is still correct:

    - no stored file count       -> full scan ("full_initial")
    - file count decreased       -> full scan ("full_deletions")
    - stored watermark available -> incremental scan ("incremental")
    - anything else              -> full scan ("full_fallback")

    The watermark is only persisted after every file has been processed, so
    an exception anywhere in the scan leaves the previous watermark in place
    and the next run re-scans the same window.

    Args:
        directory: Project root directory to scan.
    """
    # Capture the watermark BEFORE touching the filesystem. Storing the time
    # at the END of the scan would skip any file modified while the scan was
    # running: its mtime would be older than the stored watermark.
    scan_started_at = time.time()

    project = await self.project_repository.get_current()

    # Step 1: Quick file count (fast on TigrisFS: 1.4s for 1,460 files)
    current_count = await self._quick_count_files(directory)

    # Step 2: Determine scan strategy
    if project.last_file_count is None:
        # First sync ever → full scan
        file_paths = await self._scan_directory_full(directory)
        scan_type = "full_initial"
    elif current_count < project.last_file_count:
        # Files deleted → need full scan to detect which ones
        file_paths = await self._scan_directory_full(directory)
        scan_type = "full_deletions"
        logger.info(f"File count decreased ({project.last_file_count} → {current_count}), running full scan")
    elif project.last_scan_timestamp is not None:
        # Incremental scan: only files modified since last scan
        file_paths = await self._scan_directory_modified_since(
            directory,
            project.last_scan_timestamp
        )
        scan_type = "incremental"
        logger.info(f"Incremental scan since {project.last_scan_timestamp}, found {len(file_paths)} changed files")
    else:
        # Fallback to full scan
        file_paths = await self._scan_directory_full(directory)
        scan_type = "full_fallback"

    # Step 3: Process changed files (existing logic)
    for file_path in file_paths:
        await self._process_file(file_path)

    # Step 4: Update watermark AFTER successful scan
    await self.project_repository.update(
        project.id,
        last_scan_timestamp=scan_started_at,  # Start of THIS scan
        last_file_count=current_count
    )

    # Step 5: Record metrics
    logfire.metric_counter(f"sync.scan.{scan_type}").add(1)
    logfire.metric_histogram("sync.scan.files_scanned", unit="files").record(len(file_paths))
```
**Helper Methods**:
```python
async def _quick_count_files(self, directory: Path) -> int:
"""Fast file count using find command"""
# TigrisFS: 1.4s for 1,460 files
result = await asyncio.create_subprocess_shell(
f'find "{directory}" -type f | wc -l',
stdout=asyncio.subprocess.PIPE
)
stdout, _ = await result.communicate()
return int(stdout.strip())
async def _scan_directory_modified_since(
self,
directory: Path,
since_timestamp: float
) -> List[str]:
"""Use find -newermt for filesystem-level filtering"""
# Convert timestamp to find-compatible format
since_date = datetime.fromtimestamp(since_timestamp).strftime("%Y-%m-%d %H:%M:%S")
# TigrisFS: 0.2s for 0 changed files (vs 5+ minutes for full scan)
result = await asyncio.create_subprocess_shell(
f'find "{directory}" -type f -newermt "{since_date}"',
stdout=asyncio.subprocess.PIPE
)
stdout, _ = await result.communicate()
# Convert absolute paths to relative
file_paths = []
for line in stdout.decode().splitlines():
if line:
rel_path = Path(line).relative_to(directory).as_posix()
file_paths.append(rel_path)
return file_paths
```
**TigrisFS Testing Results** (SSH to production-basic-memory-tenant-0a20eb58):
```bash
# Full file count
$ time find . -type f | wc -l
1460
real 0m1.362s # ✅ Acceptable
# Incremental scan (1 hour window)
$ time find . -type f -newermt "2025-01-20 10:00:00" | wc -l
0
real 0m0.161s # ✅ 8.5x faster!
# Incremental scan (24 hours)
$ time find . -type f -newermt "2025-01-19 11:00:00" | wc -l
0
real 0m0.239s # ✅ 5.7x faster!
```
**Conclusion**: `find -newermt` works perfectly on TigrisFS and provides massive speedup.
**Expected Performance Improvements**:
| Scenario | Files Changed | Current Time | With Watermark | Speedup |
|----------|---------------|--------------|----------------|---------|
| No changes (common) | 0 | 420s | ~2s | 210x |
| Few changes | 5-10 | 420s | ~5s | 84x |
| Many changes | 100+ | 420s | ~30s | 14x |
| Deletions (rare) | N/A | 420s | 420s | 1x |
**Full sync breakdown** (1,460 files, 0 changes):
- File count: 1.4s
- Incremental scan: 0.2s
- Database updates: 0.4s
- **Total: ~2s (~210x faster than the 420s baseline)**
**Metrics to Track**:
```python
# Scan type distribution
logfire.metric_counter("sync.scan.full_initial").add(1)
logfire.metric_counter("sync.scan.full_deletions").add(1)
logfire.metric_counter("sync.scan.incremental").add(1)
# Performance metrics
logfire.metric_histogram("sync.scan.duration", unit="ms").record(scan_ms)
logfire.metric_histogram("sync.scan.files_scanned", unit="files").record(file_count)
logfire.metric_histogram("sync.scan.files_changed", unit="files").record(changed_count)
# Watermark effectiveness
logfire.metric_histogram("sync.scan.watermark_age", unit="s").record(
time.time() - project.last_scan_timestamp
)
```
**Edge Cases Handled**:
1. **First sync**: No watermark → full scan (expected)
2. **Deletions**: File count decreased → full scan (rare but correct)
3. **Files changed during a scan**: Use scan start time as the watermark, not end time, so files created or modified while the scan is running are picked up by the next incremental scan
4. **Scan failure**: Don't update watermark on failure (retry will re-scan)
5. **New files**: Count increased → incremental scan finds them (common, fast)
**Files to Modify**:
- `src/basic_memory/models.py` - Add last_scan_timestamp, last_file_count to Project
- `alembic/versions/xxx_add_scan_watermark.py` - Migration for new columns
- `src/basic_memory/sync/sync_service.py` - Implement watermark logic
- `src/basic_memory/repository/project_repository.py` - Update methods
- `tests/sync/test_sync_watermark.py` - Test watermark behavior
**Test Plan**:
- [x] SSH test on TigrisFS confirms `find -newermt` works (completed)
- [x] Unit tests for scan strategy selection (4 tests)
- [x] Unit tests for file count detection (integrated in strategy tests)
- [x] Integration test: verify incremental scan finds changed files (4 tests)
- [x] Integration test: verify deletion detection triggers full scan (2 tests)
- [ ] Load test on tenant-0a20eb58 (1,460 files) - pending production deployment
- [ ] Verify <3s for no-change sync - pending production deployment
**Implementation Status**: ✅ **COMPLETED**
**Code Changes** (Commit: `fb16055d`):
- ✅ Added `last_scan_timestamp` and `last_file_count` to Project model
- ✅ Created database migration `e7e1f4367280_add_scan_watermark_tracking_to_project.py`
- ✅ Implemented smart scan strategy selection in `sync_service.py`
- ✅ Added `_quick_count_files()` using `find | wc -l` (~1.4s for 1,460 files)
- ✅ Added `_scan_directory_modified_since()` using `find -newermt` (~0.2s)
- ✅ Added `_scan_directory_full()` wrapper for full scans
- ✅ Watermark update logic after successful sync (uses sync START time)
- ✅ Logfire metrics for scan types and performance tracking
**Test Coverage** (18 tests in `test_sync_service_incremental.py`):
- ✅ Scan strategy selection (4 tests)
- First sync uses full scan
- File count decreased triggers full scan
- Same file count uses incremental scan
- Increased file count uses incremental scan
- ✅ Incremental scan base cases (4 tests)
- No changes scenario
- Detects new files
- Detects modified files
- Detects multiple changes
- ✅ Deletion detection (2 tests)
- Single file deletion
- Multiple file deletions
- ✅ Move detection (2 tests)
- Moves require full scan (renames don't update mtime)
- Moves detected in full scan via checksum
- ✅ Watermark update (3 tests)
- Watermark updated after successful sync
- Watermark uses sync start time
- File count accuracy
- ✅ Edge cases (3 tests)
- Concurrent file changes
- Empty directory handling
- Respects .gitignore patterns
**Performance Expectations** (to be verified in production):
- No changes: 420s → ~2s (210x faster)
- Few changes (5-10): 420s → ~5s (84x faster)
- Many changes (100+): 420s → ~30s (14x faster)
- Deletions: 420s → 420s (full scan, rare case)
**Rollout Strategy**:
1. ✅ Code complete and tested (18 new tests, all passing)
2. ✅ Pushed to `phase-0.5-streaming-foundation` branch
3. ⏳ Windows CI tests running
4. 📊 Deploy to staging tenant with watermark optimization
5. 📊 Monitor scan performance metrics via Logfire
6. 📊 Verify no missed files (compare full vs incremental results)
7. 📊 Deploy to production tenant-0a20eb58
8. 📊 Measure actual improvement (expect 420s → 2-3s)
**Success Criteria**:
- ✅ Implementation complete with comprehensive tests
- [ ] No-change syncs complete in <3 seconds (was 420s) - pending production test
- [ ] Incremental scans (95% of cases) use watermark - pending production test
- [ ] Deletion detection works correctly (full scan when needed) - tested in unit tests ✅
- [ ] No files missed due to watermark logic - tested in unit tests ✅
- [ ] Metrics show scan type distribution matches expectations - pending production test
**Next Steps**:
1. Production deployment to tenant-0a20eb58
2. Measure actual performance improvements
3. Monitor metrics for 1 week
4. Phase 2 cloud-specific fixes
5. Phase 3 production measurement and UberSync decision
### Phase 2: Cloud Fixes
**Resource leaks**:
- [ ] Fix aiohttp session context manager
- [ ] Implement persistent circuit breaker
- [ ] Add memory monitoring/alerts
- [ ] Test on production tenant
**Sync coordination**:
- [ ] Implement hash-based staggering
- [ ] Add jitter to sync intervals
- [ ] Load test with 10 concurrent tenants
- [ ] Verify no thundering herd
### Phase 3: Measurement
**Deploy to production**:
- [ ] Deploy Phase 1+2 changes
- [ ] Downgrade tenant-6d2ff1a3 to 1GB
- [ ] Monitor for OOM incidents
**Collect metrics**:
- [ ] Memory usage patterns
- [ ] Sync duration distributions
- [ ] Concurrent sync load
- [ ] Cost analysis
**UberSync decision**:
- [ ] Review metrics against decision criteria
- [ ] Document findings
- [ ] Create a dedicated UberSync spec if needed (SPEC-18 is already assigned to the AI Memory Management Tool)
## Related Issues
### basic-memory (core)
- [#383](https://github.com/basicmachines-co/basic-memory/issues/383) - Refactor sync to use mtime-based scanning
- [#382](https://github.com/basicmachines-co/basic-memory/issues/382) - Optimize memory for large file syncs
- [#371](https://github.com/basicmachines-co/basic-memory/issues/371) - aiofiles for non-blocking I/O (future)
### basic-memory-cloud
- [#198](https://github.com/basicmachines-co/basic-memory-cloud/issues/198) - Memory optimization for sync worker
- [#189](https://github.com/basicmachines-co/basic-memory-cloud/issues/189) - Circuit breaker for infinite retry loops
## References
**Standard sync tools using mtime**:
- rsync: Uses mtime-based comparison by default, only checksums on `--checksum` flag
- rclone: Default is mtime/size, `--checksum` mode optional
- syncthing: Block-level sync with mtime tracking
**fsnotify polling** (future consideration):
- [fsnotify/fsnotify#9](https://github.com/fsnotify/fsnotify/issues/9) - Polling mode for network filesystems
## Notes
### Why Not UberSync Now?
**Premature Optimization**:
- Current problems are algorithmic, not architectural
- No evidence that multi-tenant coordination is the issue
- Single tenant OOM proves algorithm is the problem
**Benefits of Core-First Approach**:
- ✅ Helps all users (CLI + Cloud)
- ✅ Lower risk (no new service)
- ✅ Clear path (issues specify fixes)
- ✅ Can defer UberSync until proven necessary
**When UberSync Makes Sense**:
- >100 active tenants causing resource contention
- Need for tenant tier prioritization (paid > free)
- Centralized observability requirements
- Cost optimization at scale
### Migration Strategy
**Backward Compatibility**:
- New mtime/size columns nullable initially
- Existing entities sync normally (compute mtime on first scan)
- No breaking changes to MCP API
- CLI behavior unchanged
**Rollout**:
1. Deploy to staging with test tenant
2. Validate memory/performance improvements
3. Deploy to production (blue-green)
4. Monitor for 1 week
5. Downgrade tenant machines if successful
## Further Considerations
### Version Control System (VCS) Integration
**Context:** Users frequently request git versioning, and large projects with PDFs/images pose memory challenges.
#### Git-Based Sync
**Approach:** Use git for change detection instead of custom mtime comparison.
```python
# Git automatically tracks changes
repo = git.Repo(project_path)
repo.git.add(A=True)
diff = repo.index.diff('HEAD')
for change in diff:
if change.change_type == 'M': # Modified
await sync_file(change.b_path)
```
**Pros:**
- ✅ Proven, battle-tested change detection
- ✅ Built-in rename/move detection (similarity index)
- ✅ Efficient for cloud sync (git protocol over HTTP)
- ✅ Could enable version history as bonus feature
- ✅ Users want git integration anyway
**Cons:**
- ❌ User confusion (`.git` folder in knowledge base)
- ❌ Conflicts with existing git repos (submodule complexity)
- ❌ Adds dependency (git binary or dulwich/pygit2)
- ❌ Less control over sync logic
- ❌ Doesn't solve large file problem (PDFs still checksummed)
- ❌ Git LFS adds complexity
#### Jujutsu (jj) Alternative
**Why jj is compelling:**
1. **Working Copy as Source of Truth**
- Git: Staging area is intermediate state
- Jujutsu: Working copy IS a commit
- Aligns with "files are source of truth" philosophy!
2. **Automatic Change Tracking**
- No manual staging required
- Working copy changes tracked automatically
- Better fit for sync operations vs git's commit-centric model
3. **Conflict Handling**
- User edits + sync changes both preserved
- Operation log vs linear history
- Built for operations, not just history
**Cons:**
- ❌ New/immature (2020 vs git's 2005)
- ❌ Not universally available
- ❌ Steeper learning curve for users
- ❌ No LFS equivalent yet
- ❌ Still doesn't solve large file checksumming
#### Git Index Format (Hybrid Approach)
**Best of both worlds:** Use git's index format without full git repo.
```python
from dulwich.index import Index # Pure Python
# Use git index format for tracking
idx = Index(project_path / '.basic-memory' / 'index')
for file in files:
stat = file.stat()
if idx.get(file) and idx[file].mtime == stat.st_mtime:
continue # Unchanged (git's proven logic)
await sync_file(file)
idx[file] = (stat.st_mtime, stat.st_size, sha)
```
**Pros:**
- ✅ Git's proven change detection logic
- ✅ No user-visible `.git` folder
- ✅ No git dependency (pure Python)
- ✅ Full control over sync
**Cons:**
- ❌ Adds dependency (dulwich)
- ❌ Doesn't solve large files
- ❌ No built-in versioning
### Large File Handling
**Problem:** PDFs/images cause memory issues regardless of VCS choice.
**Solutions (Phase 1+):**
**1. Skip Checksums for Large Files**
```python
if stat.st_size > 10_000_000: # 10MB threshold
checksum = None # Use mtime/size only
logger.info(f"Skipping checksum for {file_path}")
```
**2. Partial Hashing**
```python
if file.suffix in ['.pdf', '.jpg', '.png']:
# Hash first/last 64KB instead of entire file
checksum = hash_partial(file, chunk_size=65536)
```
**3. External Blob Storage**
```python
if stat.st_size > 10_000_000:
blob_id = await upload_to_tigris_blob(file)
entity.blob_id = blob_id
entity.file_path = None # Not in main sync
```
### Recommendation & Timeline
**Phase 0.5-1 (Now):** Custom streaming + mtime
- ✅ Solves urgent memory issues
- ✅ No dependencies
- ✅ Full control
- ✅ Skip checksums for large files (>10MB)
- ✅ Proven pattern (rsync/rclone)
**Phase 2 (After metrics):** Git index format exploration
```python
# Optional: Use git index for tracking if beneficial
from dulwich.index import Index
# No git repo, just index file format
```
**Future (User feature):** User-facing versioning
```bash
# Let users opt into VCS:
basic-memory config set versioning git
basic-memory config set versioning jj
basic-memory config set versioning none # Current behavior
# Integrate with their chosen workflow
# Not forced upon them
```
**Rationale:**
1. **Don't block on VCS decision** - Memory issues are P0
2. **Learn from deployment** - See actual usage patterns
3. **Keep options open** - Can add git/jj later
4. **Files as source of truth** - Core philosophy preserved
5. **Large files need attention regardless** - VCS won't solve that
**Decision Point:**
- If Phase 0.5/1 achieves memory targets → VCS integration deferred
- If users strongly request versioning → Add as opt-in feature
- If change detection becomes bottleneck → Explore git index format
## Agent Assignment
**Phase 1 Implementation**: `python-developer` agent
- Expertise in FastAPI, async Python, database migrations
- Handles basic-memory core changes
**Phase 2 Implementation**: `python-developer` agent
- Same agent continues with cloud-specific fixes
- Maintains consistency across phases
**Phase 3 Review**: `system-architect` agent
- Analyzes metrics and makes UberSync decision
- Creates a dedicated UberSync spec if a centralized service is needed (SPEC-18 is already assigned to the AI Memory Management Tool)
```
--------------------------------------------------------------------------------
/specs/SPEC-20 Simplified Project-Scoped Rclone Sync.md:
--------------------------------------------------------------------------------
```markdown
---
title: 'SPEC-20: Simplified Project-Scoped Rclone Sync'
date: 2025-01-27
updated: 2025-01-28
status: Implemented
priority: High
goal: Simplify cloud sync by making it project-scoped, safe by design, and closer to native rclone commands
parent: SPEC-8
---
## Executive Summary
The current rclone implementation (SPEC-8) has proven too complex with multiple footguns:
- Two workflows (mount vs bisync) with different directories causing confusion
- Multiple profiles (3 for mount, 3 for bisync) creating too much choice
- Directory conflicts (`~/basic-memory-cloud/` vs `~/basic-memory-cloud-sync/`)
- Auto-discovery of folders leading to errors
- Unclear what syncs and when
This spec proposes a **radical simplification**: project-scoped sync operations that are explicit, safe, and thin wrappers around rclone commands.
## Why
### Current Problems
**Complexity:**
- Users must choose between mount and bisync workflows
- Different directories for different workflows
- Profile selection (6 total profiles) overwhelms users
- Setup requires multiple steps with potential conflicts
**Footguns:**
- Renaming local folder breaks sync (no config tracking)
- Mount directory conflicts with bisync directory
- Auto-discovered folders create phantom projects
- Uninitialized bisync state causes failures
- Unclear which files sync (all files in root directory?)
**User Confusion:**
- "What does `bm sync` actually do?"
- "Is `~/basic-memory-cloud-sync/my-folder/` a project or just a folder?"
- "Why do I have two basic-memory directories?"
- "How do I sync just one project?"
### Design Principles (Revised)
1. **Projects are independent** - Each project manages its own sync state
2. **Global cloud mode** - You're either local or cloud (no per-project flag)
3. **Explicit operations** - No auto-discovery, no magic
4. **Safe by design** - Config tracks state, not filesystem
5. **Thin rclone wrappers** - Stay close to rclone commands
6. **One good way** - Remove choices that don't matter
## What
### New Architecture
#### Core Concepts
1. **Global Cloud Mode** (existing, keep as-is)
- `cloud_mode_enabled` in config
- `bm cloud login` enables it, `bm cloud logout` disables it
- When enabled, ALL Basic Memory operations hit cloud API
2. **Project-Scoped Sync** (new)
- Each project optionally has a `local_path` for local working copy
- Sync operations are explicit: `bm project sync --name research`
- Projects can live anywhere on disk, not forced into sync directory
3. **Simplified rclone Config** (new)
- Single remote named `basic-memory-cloud` (not `basic-memory-{tenant_id}`)
- One credential set per user
- Config lives at `~/.config/rclone/rclone.conf`
#### Command Structure
```bash
# Setup (once)
bm cloud login # Authenticate, enable cloud mode
bm cloud setup # Install rclone, generate credentials
# Project creation with optional sync
bm project add research # Create cloud project (no local sync)
bm project add research --local ~/docs # Create with local sync configured
# Or configure sync later
bm project sync-setup research ~/docs # Configure local sync for existing project
# Project-scoped rclone operations
bm project sync --name research # One-way sync (local → cloud)
bm project bisync --name research # Two-way sync (local ↔ cloud)
bm project check --name research # Verify integrity
# Advanced file operations
bm project ls --name research [path] # List remote files
bm project copy --name research src dst # Copy files
# Batch operations
bm project sync --all # Sync all projects with local_sync_path
bm project bisync --all # Two-way sync all projects
```
#### Config Model
```jsonc
{
"cloud_mode": true,
"cloud_host": "https://cloud.basicmemory.com",
"projects": {
// Used in LOCAL mode only (simple name → path mapping)
"main": "/Users/user/basic-memory"
},
"cloud_projects": {
// Used in CLOUD mode for sync configuration
"research": {
"local_path": "~/Documents/research",
"last_sync": "2025-01-27T10:00:00Z",
"bisync_initialized": true
},
"work": {
"local_path": "~/work",
"last_sync": null,
"bisync_initialized": false
}
}
}
```
**Note:** In cloud mode, the actual project list comes from the API (`GET /projects/projects`). The `cloud_projects` dict only stores local sync configuration.
#### Rclone Config
```ini
# ~/.config/rclone/rclone.conf
[basic-memory-cloud]
type = s3
provider = Other
access_key_id = {scoped_access_key}
secret_access_key = {scoped_secret_key}
endpoint = https://fly.storage.tigris.dev
region = auto
```
### What Gets Removed
- ❌ Mount commands (`bm cloud mount`, `bm cloud unmount`, `bm cloud mount-status`)
- ❌ Profile selection (both mount and bisync profiles)
- ❌ `~/basic-memory-cloud/` directory (mount point)
- ❌ `~/basic-memory-cloud-sync/` directory (forced sync location)
- ❌ Auto-discovery of folders
- ❌ Separate `bisync-setup` command
- ❌ `bisync_config` in config.json
- ❌ Convenience commands (`bm sync`, `bm bisync` without project name)
- ❌ Complex state management for global sync
### What Gets Simplified
- ✅ One setup command: `bm cloud setup`
- ✅ One rclone remote: `basic-memory-cloud`
- ✅ One workflow: project-scoped bisync (remove mount)
- ✅ One set of defaults (balanced settings from SPEC-8)
- ✅ Clear project-to-path mapping in config
- ✅ Explicit sync operations only
### What Gets Added
- ✅ `bm project sync --name <project>` (one-way: local → cloud)
- ✅ `bm project bisync --name <project>` (two-way: local ↔ cloud)
- ✅ `bm project check --name <project>` (integrity verification)
- ✅ `bm project sync-setup <project> <local_path>` (configure sync)
- ✅ `bm project ls --name <project> [path]` (list remote files)
- ✅ `bm project copy --name <project> <src> <dst>` (copy files)
- ✅ `cloud_projects` dict in config.json
## How
### Phase 1: Project Model Updates
**1.1 Update Config Schema**
```python
# basic_memory/config.py
# CloudProjectConfig is declared first: Config's `cloud_projects` annotation
# refers to it by name, and that name must already exist when the Config
# class body is evaluated.
class CloudProjectConfig(BaseModel):
    """Sync configuration for a cloud project."""

    local_path: str  # Local working directory
    last_sync: Optional[datetime] = None  # Last successful sync
    bisync_initialized: bool = False  # Whether bisync baseline exists


class Config(BaseModel):
    # ... existing fields ...
    cloud_mode: bool = False
    cloud_host: str = "https://cloud.basicmemory.com"

    # Local mode: simple name → path mapping
    projects: dict[str, str] = {}

    # Cloud mode: sync configuration per project
    cloud_projects: dict[str, CloudProjectConfig] = {}
```
**No database changes needed** - sync config lives in `~/.basic-memory/config.json` only.
### Phase 2: Simplified Rclone Configuration
**2.1 Simplify Remote Naming**
```python
# basic_memory/cli/commands/cloud/rclone_config.py
def configure_rclone_remote(
    access_key: str,
    secret_key: str,
    endpoint: str = "https://fly.storage.tigris.dev",
    region: str = "auto",
) -> None:
    """Configure single rclone remote named 'basic-memory-cloud'.

    Creates the remote's section if it is missing, overwrites its S3 settings
    with the given values, and saves the rclone config.

    Args:
        access_key: Value written to ``access_key_id``.
        secret_key: Value written to ``secret_access_key``.
        endpoint: Value written to ``endpoint``.
        region: Value written to ``region``.
    """
    config = load_rclone_config()

    # Single remote name (not tenant-specific)
    REMOTE_NAME = "basic-memory-cloud"

    if not config.has_section(REMOTE_NAME):
        config.add_section(REMOTE_NAME)

    config.set(REMOTE_NAME, "type", "s3")
    config.set(REMOTE_NAME, "provider", "Other")
    config.set(REMOTE_NAME, "access_key_id", access_key)
    config.set(REMOTE_NAME, "secret_access_key", secret_key)
    config.set(REMOTE_NAME, "endpoint", endpoint)
    config.set(REMOTE_NAME, "region", region)

    save_rclone_config(config)
```
**2.2 Remove Profile Complexity**
Use single set of balanced defaults (from SPEC-8 Phase 4 testing):
- `conflict_resolve`: `newer` (auto-resolve to most recent)
- `max_delete`: `25` (safety limit)
- `check_access`: `false` (skip for performance)
### Phase 3: Project-Scoped Rclone Commands
**3.1 Core Sync Operations**
```python
# basic_memory/cli/commands/cloud/rclone_commands.py
def get_project_remote(project: Project, bucket_name: str) -> str:
    """Build rclone remote path for project.

    Args:
        project: Project whose ``path`` is appended under the bucket.
        bucket_name: Bucket that holds the project data.

    Returns: basic-memory-cloud:bucket-name/app/data/research
    """
    # Strip leading slash from cloud path
    cloud_path = project.path.lstrip("/")
    return f"basic-memory-cloud:{bucket_name}/{cloud_path}"
def project_sync(
    project: Project,
    bucket_name: str,
    dry_run: bool = False,
    verbose: bool = False,
) -> bool:
    """One-way sync: local → cloud.

    Uses rclone sync to make cloud identical to local.

    Args:
        project: Project to sync; must have ``local_sync_path`` set.
        bucket_name: Bucket used to build the remote path.
        dry_run: Append ``--dry-run`` to the rclone command.
        verbose: Append ``--verbose``; otherwise ``--progress`` is used.

    Returns:
        True when rclone exits with status 0, False otherwise.

    Raises:
        RcloneError: If the project has no ``local_sync_path`` configured.
    """
    if not project.local_sync_path:
        raise RcloneError(f"Project {project.name} has no local_sync_path configured")

    local_path = Path(project.local_sync_path).expanduser()
    remote_path = get_project_remote(project, bucket_name)
    filter_path = get_bmignore_filter_path()

    cmd = [
        "rclone", "sync",
        str(local_path),
        remote_path,
        # `--filters-file` is a bisync-specific flag; `rclone sync` reads
        # filter rules with `--filter-from` (the same flag project_check uses).
        "--filter-from", str(filter_path),
    ]

    if verbose:
        cmd.append("--verbose")
    else:
        cmd.append("--progress")

    if dry_run:
        cmd.append("--dry-run")

    result = subprocess.run(cmd, text=True)
    return result.returncode == 0
def project_bisync(
    project: Project,
    bucket_name: str,
    dry_run: bool = False,
    resync: bool = False,
    verbose: bool = False,
) -> bool:
    """Two-way sync: local ↔ cloud.

    Runs ``rclone bisync`` with the balanced defaults and a per-project
    working directory for bisync state.

    Returns:
        True when rclone exits with status 0, False otherwise.

    Raises:
        RcloneError: If no ``local_sync_path`` is configured, or if this is a
            real (non-dry-run) first bisync and ``resync`` was not requested.
    """
    if not project.local_sync_path:
        raise RcloneError(f"Project {project.name} has no local_sync_path configured")

    source_dir = Path(project.local_sync_path).expanduser()
    destination = get_project_remote(project, bucket_name)
    filters = get_bmignore_filter_path()
    workdir = get_project_bisync_state(project.name)

    # The bisync state directory must exist before rclone is pointed at it
    workdir.mkdir(parents=True, exist_ok=True)

    cmd = ["rclone", "bisync", str(source_dir), destination]
    cmd += [
        "--create-empty-src-dirs",
        "--resilient",
        "--conflict-resolve=newer",
        "--max-delete=25",
        "--filters-file", str(filters),
        "--workdir", str(workdir),
    ]
    cmd.append("--verbose" if verbose else "--progress")
    if dry_run:
        cmd.append("--dry-run")
    if resync:
        cmd.append("--resync")

    # A first real run needs --resync to establish the baseline
    baseline_ok = resync or bisync_initialized(project.name) or dry_run
    if not baseline_ok:
        raise RcloneError(
            f"First bisync for {project.name} requires --resync to establish baseline.\n"
            f"Run: bm project bisync --name {project.name} --resync"
        )

    completed = subprocess.run(cmd, text=True)
    return completed.returncode == 0
def project_check(
    project: Project,
    bucket_name: str,
    one_way: bool = False,
) -> bool:
    """Check integrity between local and cloud.

    Runs ``rclone check`` with captured output.

    Returns:
        True when rclone exits with status 0 (files match), False otherwise.

    Raises:
        RcloneError: If the project has no ``local_sync_path`` configured.
    """
    if not project.local_sync_path:
        raise RcloneError(f"Project {project.name} has no local_sync_path configured")

    source_dir = Path(project.local_sync_path).expanduser()
    destination = get_project_remote(project, bucket_name)
    filters = get_bmignore_filter_path()

    cmd = ["rclone", "check", str(source_dir), destination, "--filter-from", str(filters)]
    if one_way:
        cmd += ["--one-way"]

    completed = subprocess.run(cmd, capture_output=True, text=True)
    return completed.returncode == 0
```
**3.2 Advanced File Operations**
```python
def project_ls(
    project: Project,
    bucket_name: str,
    path: Optional[str] = None,
) -> list[str]:
    """List files in remote project.

    Runs ``rclone ls`` on the project's remote (optionally on a sub-path) and
    returns its stdout split into lines. Because ``check=True`` is passed,
    a non-zero rclone exit raises ``subprocess.CalledProcessError``.
    """
    target = get_project_remote(project, bucket_name)
    if path:
        target = "/".join((target, path))

    completed = subprocess.run(
        ["rclone", "ls", target],
        capture_output=True,
        text=True,
        check=True,
    )
    return completed.stdout.splitlines()
def project_copy(
project: Project,
bucket_name: str,
src: str,
dst: str,
dry_run: bool = False,
) -> bool:
"""Copy files within project scope.

Placeholder in this spec: the body is not implemented yet, so the function
currently returns None rather than the annotated bool.
"""
# Implementation similar to sync
pass
```
### Phase 4: CLI Integration
**4.1 Update Project Commands**
```python
# basic_memory/cli/commands/project.py
@project_app.command("add")
def add_project(
    name: str = typer.Argument(..., help="Name of the project"),
    path: str = typer.Argument(None, help="Path (required for local mode)"),
    local: str = typer.Option(None, "--local", help="Local sync path for cloud mode"),
    set_default: bool = typer.Option(False, "--default", help="Set as default"),
) -> None:
    """Add a new project.

    Cloud mode examples:
    bm project add research # No local sync
    bm project add research --local ~/docs # With local sync

    Local mode example:
    bm project add research ~/Documents/research
    """
    config = ConfigManager().config

    if config.cloud_mode_enabled:
        # Resolve the optional local sync path the same way `sync-setup` does,
        # so a relative or `~` path is stored as an absolute POSIX path instead
        # of depending on the directory the command happened to run from.
        local_sync_path = (
            Path(os.path.abspath(os.path.expanduser(local))).as_posix() if local else local
        )
        # Cloud mode: auto-generate cloud path from name
        data = {
            "name": name,
            "path": generate_permalink(name),
            "local_sync_path": local_sync_path,  # Optional
            "set_default": set_default,
        }
    else:
        # Local mode: path is required
        if path is None:
            console.print("[red]Error: path argument is required in local mode[/red]")
            raise typer.Exit(1)

        resolved_path = Path(os.path.abspath(os.path.expanduser(path))).as_posix()
        data = {
            "name": name,
            "path": resolved_path,
            "set_default": set_default,
        }

    # Both modes post to the same endpoint; only the payload differs.
    async def _add_project():
        async with get_client() as client:
            response = await call_post(client, "/projects/projects", json=data)
            return ProjectStatusResponse.model_validate(response.json())

    # Execute and display result
    result = asyncio.run(_add_project())
    console.print(f"[green]{result.message}[/green]")
@project_app.command("sync-setup")
def setup_project_sync(
    name: str = typer.Argument(..., help="Project name"),
    local_path: str = typer.Argument(..., help="Local sync directory"),
) -> None:
    """Configure local sync for an existing cloud project."""
    # Guard: this command is only meaningful when cloud mode is on
    if not ConfigManager().config.cloud_mode_enabled:
        console.print("[red]Error: sync-setup only available in cloud mode[/red]")
        raise typer.Exit(1)

    # Expand `~` and make the path absolute before sending it to the API
    expanded = os.path.expanduser(local_path)
    resolved_path = Path(os.path.abspath(expanded)).as_posix()

    async def _update_project():
        # PATCH the project (addressed by permalink) with the new sync path
        async with get_client() as client:
            project_permalink = generate_permalink(name)
            response = await call_patch(
                client,
                f"/projects/{project_permalink}",
                json={"local_sync_path": resolved_path},
            )
            return ProjectStatusResponse.model_validate(response.json())

    result = asyncio.run(_update_project())

    console.print(f"[green]{result.message}[/green]")
    console.print(f"\nLocal sync configured: {resolved_path}")
    console.print(f"\nTo sync: bm project bisync --name {name} --resync")
```
**4.2 New Sync Commands**
```python
# basic_memory/cli/commands/project.py
@project_app.command("sync")
def sync_project(
    # Optional so that `bm project sync --all` works without `--name`.
    name: str = typer.Option(None, "--name", help="Project name"),
    all_projects: bool = typer.Option(False, "--all", help="Sync all projects"),
    dry_run: bool = typer.Option(False, "--dry-run", help="Preview changes"),
    verbose: bool = typer.Option(False, "--verbose", help="Show detailed output"),
) -> None:
    """One-way sync: local → cloud (make cloud identical to local)."""
    config = ConfigManager().config
    if not config.cloud_mode_enabled:
        console.print("[red]Error: sync only available in cloud mode[/red]")
        raise typer.Exit(1)

    if not all_projects and not name:
        console.print("[red]Error: specify --name <project> or --all[/red]")
        raise typer.Exit(1)

    # Get projects to sync
    if all_projects:
        projects = get_all_sync_projects()
    else:
        projects = [get_project_by_name(name)]

    # Get bucket name
    tenant_info = asyncio.run(get_mount_info())
    bucket_name = tenant_info.bucket_name

    # Sync each project; keep going after a failure so one bad project does
    # not block the rest, but remember it for the exit status.
    any_failed = False
    for project in projects:
        if not project.local_sync_path:
            console.print(f"[yellow]Skipping {project.name}: no local_sync_path[/yellow]")
            continue

        console.print(f"[blue]Syncing {project.name}...[/blue]")
        try:
            succeeded = project_sync(project, bucket_name, dry_run=dry_run, verbose=verbose)
        except RcloneError as e:
            console.print(f"[red]✗ {project.name} failed: {e}[/red]")
            any_failed = True
            continue

        # project_sync reports a non-zero rclone exit by returning False, not
        # by raising, so the return value has to be checked before claiming success.
        if succeeded:
            console.print(f"[green]✓ {project.name} synced[/green]")
        else:
            console.print(f"[red]✗ {project.name} failed: rclone exited with a non-zero status[/red]")
            any_failed = True

    if any_failed:
        raise typer.Exit(1)
@project_app.command("bisync")
def bisync_project(
    # Default None (not ...) so that `bm project bisync --all` works without --name.
    name: str = typer.Option(None, "--name", help="Project name (required unless --all)"),
    all_projects: bool = typer.Option(False, "--all", help="Bisync all projects"),
    dry_run: bool = typer.Option(False, "--dry-run", help="Preview changes"),
    resync: bool = typer.Option(False, "--resync", help="Force new baseline"),
    verbose: bool = typer.Option(False, "--verbose", help="Show detailed output"),
) -> None:
    """Two-way sync: local ↔ cloud (bidirectional sync).

    Design stub: the body is not implemented in this spec.
    """
    # Similar to sync but calls project_bisync(); the implementation must
    # reject the call when neither --name nor --all is given.
    pass
@project_app.command("check")
def check_project(
    name: str = typer.Option(..., "--name", help="Project name"),
    one_way: bool = typer.Option(False, "--one-way", help="Check one direction only"),
) -> None:
    """Verify file integrity between local and cloud.

    Design stub: intended to delegate to project_check(); no body yet.
    """
    # Calls project_check()
    return None
@project_app.command("ls")
def list_project_files(
    name: str = typer.Option(..., "--name", help="Project name"),
    path: str = typer.Argument(None, help="Path within project"),
) -> None:
    """List files in remote project.

    Design stub: intended to delegate to project_ls(); no body yet.
    """
    # Calls project_ls()
    return None
```
**4.3 Update Cloud Setup**
```python
# basic_memory/cli/commands/cloud/core_commands.py
@cloud_app.command("setup")
def cloud_setup() -> None:
    """Set up cloud sync (install rclone and configure credentials).

    Runs four steps in order: install rclone, fetch tenant info, generate
    credentials for that tenant, and configure the rclone remote. Any
    exception raised along the way is printed and converted to exit code 1.
    """
    console.print("[bold blue]Basic Memory Cloud Setup[/bold blue]")
    console.print("Installing rclone and configuring credentials...\n")

    # Printed line by line once every step has succeeded.
    closing_lines = (
        "\n[bold green]✓ Cloud setup completed![/bold green]",
        "\nNext steps:",
        " 1. Create projects with local sync:",
        " bm project add research --local ~/Documents/research",
        "\n 2. Or configure sync for existing projects:",
        " bm project sync-setup research ~/Documents/research",
        "\n 3. Start syncing:",
        " bm project bisync --name research --resync",
    )

    try:
        # Step 1: Install rclone
        console.print("[blue]Step 1: Installing rclone...[/blue]")
        install_rclone()

        # Step 2: Get tenant info
        console.print("\n[blue]Step 2: Getting tenant information...[/blue]")
        tenant_id = asyncio.run(get_mount_info()).tenant_id
        console.print(f"[green]✓ Tenant: {tenant_id}[/green]")

        # Step 3: Generate credentials
        console.print("\n[blue]Step 3: Generating sync credentials...[/blue]")
        creds = asyncio.run(generate_mount_credentials(tenant_id))
        console.print("[green]✓ Generated credentials[/green]")

        # Step 4: Configure rclone (single remote: bm-cloud)
        console.print("\n[blue]Step 4: Configuring rclone...[/blue]")
        configure_rclone_remote(
            access_key=creds.access_key,
            secret_key=creds.secret_key,
        )

        for line in closing_lines:
            console.print(line)
    except Exception as e:
        console.print(f"\n[red]Setup failed: {e}[/red]")
        raise typer.Exit(1)
```
### Phase 5: Cleanup
**5.1 Remove Deprecated Commands**
```python
# Remove from cloud commands:
- cloud mount
- cloud unmount
- cloud mount-status
- bisync-setup
- Individual bisync command (moved to project bisync)
# Remove from root commands:
- bm sync (without project specification)
- bm bisync (without project specification)
```
**5.2 Remove Deprecated Code**
```python
# Files to remove:
- mount_commands.py (entire file)
# Functions to remove from rclone_config.py:
- MOUNT_PROFILES
- get_default_mount_path()
- build_mount_command()
- is_path_mounted()
- get_rclone_processes()
- kill_rclone_process()
- unmount_path()
- cleanup_orphaned_rclone_processes()
# Functions to remove from bisync_commands.py:
- BISYNC_PROFILES (use single default)
- setup_cloud_bisync() (replaced by cloud setup)
- run_bisync_watch() (can add back to project bisync if needed)
- show_bisync_status() (replaced by project list showing sync status)
```
**5.3 Update Configuration Schema**
```python
# Remove from config.json:
- bisync_config (no longer needed)
# The projects array is the source of truth for sync configuration
```
### Phase 6: Documentation Updates
**6.1 Update CLI Documentation**
```markdown
# docs/cloud-cli.md
## Project-Scoped Cloud Sync
Basic Memory cloud sync is project-scoped - each project can optionally be configured with a local working directory that syncs with the cloud.
### Setup (One Time)
1. Authenticate and enable cloud mode:
```bash
bm cloud login
```
2. Install rclone and configure credentials:
```bash
bm cloud setup
```
### Create Projects with Sync
Create a cloud project with optional local sync:
```bash
# Create project without local sync
bm project add research
# Create project with local sync
bm project add research --local ~/Documents/research
```
Or configure sync for an existing (remote) project:
```bash
bm project sync-setup research ~/Documents/research
```
### Syncing Projects
**Two-way sync (recommended):**
```bash
# First time - establish baseline
bm project bisync --name research --resync
# Subsequent syncs
bm project bisync --name research
# Sync all projects with local_sync_path configured
bm project bisync --all
```
**One-way sync (local → cloud):**
```bash
bm project sync --name research
```
**Verify integrity:**
```bash
bm project check --name research
```
### Advanced Operations
**List remote files:**
```bash
bm project ls --name research
bm project ls --name research subfolder
```
**Preview changes before syncing:**
```bash
bm project bisync --name research --dry-run
```
**Verbose output for debugging:**
```bash
bm project bisync --name research --verbose
```
### Project Management
**List projects (shows sync status):**
```bash
bm project list
```
**Update sync path:**
```bash
bm project sync-setup research ~/new/path
```
**Remove project:**
```bash
bm project remove research
```
```
**6.2 Update SPEC-8**
Add to SPEC-8's "Implementation Notes" section:
```markdown
## Superseded by SPEC-20
The initial implementation in SPEC-8 proved too complex with multiple footguns:
- Mount vs bisync workflow confusion
- Multiple profiles creating decision paralysis
- Directory conflicts and auto-discovery errors
SPEC-20 supersedes the sync implementation with a simplified project-scoped approach while keeping the core Tigris infrastructure from SPEC-8.
```
## How to Evaluate
### Success Criteria
**1. Simplified Setup**
- [ ] `bm cloud setup` completes in one command
- [ ] Creates single rclone remote named `bm-cloud`
- [ ] No profile selection required
- [ ] Clear next steps printed after setup
**2. Clear Project Model**
- [ ] Projects can be created with or without local sync
- [ ] `bm project list` shows sync status for each project
- [ ] `local_sync_path` stored in project config
- [ ] Renaming local folder doesn't break sync (config is source of truth)
**3. Working Sync Operations**
- [ ] `bm project sync --name <project>` performs one-way sync
- [ ] `bm project bisync --name <project>` performs two-way sync
- [ ] `bm project check --name <project>` verifies integrity
- [ ] `--all` flag syncs all configured projects
- [ ] `--dry-run` shows changes without applying
- [ ] First bisync requires `--resync` with clear error message
**4. Safety**
- [ ] Cannot sync project without `local_sync_path` configured
- [ ] Bisync state is per-project (not global)
- [ ] `.bmignore` patterns respected
- [ ] Max delete safety (25 files) prevents accidents
- [ ] Clear error messages for all failure modes
**5. Clean Removal**
- [ ] Mount commands removed
- [ ] Profile selection removed
- [ ] Global sync directory removed (`~/basic-memory-cloud-sync/`)
- [ ] Auto-discovery removed
- [ ] Convenience commands (`bm sync`) removed
**6. Documentation**
- [ ] Updated cloud-cli.md with new workflow
- [ ] Clear examples for common operations
- [ ] Migration guide for existing users
- [ ] Troubleshooting section
### Test Scenarios
**Scenario 1: New User Setup**
```bash
# Start fresh
bm cloud login
bm cloud setup
bm project add research --local ~/docs/research
bm project bisync --name research --resync
# Edit files locally
bm project bisync --name research
# Verify changes synced
```
**Scenario 2: Multiple Projects**
```bash
bm project add work --local ~/work
bm project add personal --local ~/personal
bm project bisync --all --resync
# Edit files in both projects
bm project bisync --all
```
**Scenario 3: Project Without Sync**
```bash
bm project add temp-notes
# Try to sync (should fail gracefully)
bm project bisync --name temp-notes
# Should see: "Project temp-notes has no local_sync_path configured"
```
**Scenario 4: Integrity Check**
```bash
bm project bisync --name research
# Manually edit file in cloud UI
bm project check --name research
# Should report differences
bm project bisync --name research
# Should sync changes back to local
```
**Scenario 5: Safety Features**
```bash
# Delete 30 files locally
bm project sync --name research
# Should fail with max delete error
# User reviews and confirms
bm project sync --name research # After confirming
```
### Performance Targets
- Setup completes in < 30 seconds
- Single project sync < 5 seconds for small changes
- Bisync initialization (--resync) < 10 seconds for typical project
- Batch sync (`--all`) of N projects < N × 5 seconds (projects are synced sequentially)
### Breaking Changes
This is a **breaking change** from SPEC-8 implementation:
**Migration Required:**
- Users must run `bm cloud setup` again
- Existing `~/basic-memory-cloud-sync/` directory abandoned
- Projects must be configured with `local_sync_path`
- Mount users must switch to bisync workflow
**Migration Guide:**
```bash
# 1. Note current project locations
bm project list
# 2. Re-run setup
bm cloud setup
# 3. Configure sync for each project
bm project sync-setup research ~/Documents/research
bm project sync-setup work ~/work
# 4. Establish baselines
bm project bisync --all --resync
# 5. Old directory can be deleted
rm -rf ~/basic-memory-cloud-sync/
```
## Dependencies
- **SPEC-8**: TigrisFS Integration (bucket provisioning, credentials)
- Python 3.12+
- rclone 1.64.0+
- Typer (CLI framework)
- Rich (console output)
## Risks
**Risk 1: User Confusion from Breaking Changes**
- Mitigation: Clear migration guide, version bump (0.16.0)
- Mitigation: Detect old config and print migration instructions
**Risk 2: Per-Project Bisync State Complexity**
- Mitigation: Use rclone's `--workdir` to isolate state per project
- Mitigation: Store in `~/.basic-memory/bisync-state/{project_name}/`
**Risk 3: Batch Operations Performance**
- Mitigation: Run syncs sequentially with progress indicators
- Mitigation: Add `--parallel` flag in future if needed
**Risk 4: Lost Features (Mount)**
- Mitigation: Document mount as experimental/advanced feature
- Mitigation: Can restore if users request it
## Open Questions
1. **Should we keep mount as experimental command?**
- Lean toward: Remove entirely, focus on bisync
- Alternative: Keep as `bm project mount --name <project>` (advanced)
- Answer: remove
2. **Batch sync order?**
- Alphabetical by project name?
- By last modified time?
- Let user specify order?
- Answer: use the project order returned by the API or config
3. **Credential refresh?**
- Auto-detect expired credentials and re-run credential generation?
- Or require manual `bm cloud setup` again?
- Answer: manual setup is fine
4. **Watch mode for projects?**
- `bm project bisync --name research --watch`?
- Or removed entirely (users can use OS tools)?
- Answer: remove for now; we can add it back later if it proves useful
5. **Project path validation?**
- Ensure `local_path` exists before allowing bisync?
- Or let rclone error naturally?
- Answer: create the directory if needed; an existing directory is fine
## Implementation Checklist
### Phase 1: Config Schema (1-2 days) ✅
- [x] Add `CloudProjectConfig` model to `basic_memory/config.py`
- [x] Add `cloud_projects: dict[str, CloudProjectConfig]` to Config model
- [x] Test config loading/saving with new schema
- [x] Handle migration from old config format
### Phase 2: Rclone Config Simplification ✅
- [x] Update `configure_rclone_remote()` to use `basic-memory-cloud` as remote name (the design sections of this spec refer to the remote as `bm-cloud`)
- [x] Remove `add_tenant_to_rclone_config()` (replaced by configure_rclone_remote)
- [x] Remove tenant_id from remote naming
- [x] Test rclone config generation
- [x] Clean up deprecated import references in bisync_commands.py and core_commands.py
### Phase 3: Project-Scoped Rclone Commands ✅
- [x] Create `src/basic_memory/cli/commands/cloud/rclone_commands.py`
- [x] Implement `get_project_remote(project, bucket_name)`
- [x] Implement `project_sync()` (one-way: local → cloud)
- [x] Implement `project_bisync()` (two-way: local ↔ cloud)
- [x] Implement `project_check()` (integrity verification)
- [x] Implement `project_ls()` (list remote files)
- [x] Add helper: `get_project_bisync_state(project_name)`
- [x] Add helper: `bisync_initialized(project_name)`
- [x] Add helper: `get_bmignore_filter_path()`
- [x] Add `SyncProject` dataclass for project representation
- [x] Write unit tests for rclone commands (22 tests, 99% coverage)
- [x] Temporarily disable mount commands in core_commands.py
### Phase 4: CLI Integration ✅
- [x] Update `project.py`: Add `--local-path` flag to `project add` command (shown as `--local` in the design examples in this spec)
- [x] Update `project.py`: Create `project sync-setup` command
- [x] Update `project.py`: Add `project sync` command
- [x] Update `project.py`: Add `project bisync` command
- [x] Update `project.py`: Add `project check` command
- [x] Update `project.py`: Add `project ls` command
- [x] Update `project.py`: Add `project bisync-reset` command
- [x] Import rclone_commands module and get_mount_info helper
- [x] Update `project list` to show local sync paths in cloud mode
- [x] Update `project list` to conditionally show columns based on config
- [x] Update `project remove` to clean up local directories and bisync state
- [x] Add automatic database sync trigger after file sync operations
- [x] Add path normalization to prevent S3 mount point leakage
- [x] Update `cloud/core_commands.py`: Simplified `cloud setup` command
- [x] Write unit tests for `project add --local-path` (4 tests passing)
### Phase 5: Cleanup ✅
- [x] Remove `mount_commands.py` (entire file)
- [x] Remove mount-related functions from `rclone_config.py`:
- [x] `MOUNT_PROFILES`
- [x] `get_default_mount_path()`
- [x] `build_mount_command()`
- [x] `is_path_mounted()`
- [x] `get_rclone_processes()`
- [x] `kill_rclone_process()`
- [x] `unmount_path()`
- [x] `cleanup_orphaned_rclone_processes()`
- [x] Remove from `bisync_commands.py`:
- [x] `BISYNC_PROFILES` (use single default)
- [x] `setup_cloud_bisync()`
- [x] `run_bisync_watch()`
- [x] `show_bisync_status()`
- [x] `run_bisync()`
- [x] `run_check()`
- [x] Remove `bisync_config` from config schema
- [x] Remove deprecated cloud commands:
- [x] `cloud mount`
- [x] `cloud unmount`
- [x] Simplified `cloud setup` to just install rclone and configure credentials
- [x] Remove convenience commands:
- [x] Root-level `bm sync` (removed - confusing in cloud mode, automatic in local mode)
- [x] Update tests to remove references to deprecated functionality
- [x] All typecheck errors resolved
### Phase 6: Documentation ✅
- [x] Update `docs/cloud-cli.md` with new workflow
- [x] Add troubleshooting section for empty directory issues
- [x] Add troubleshooting section for bisync state corruption
- [x] Document `bisync-reset` command usage
- [x] Update command reference with all new commands
- [x] Add examples for common workflows
- [ ] Add migration guide for existing users (deferred - no users on old system yet)
- [ ] Update SPEC-8 with "Superseded by SPEC-20" note (deferred)
### Testing & Validation ✅
- [x] Test Scenario 1: New user setup (manual testing complete)
- [x] Test Scenario 2: Multiple projects (manual testing complete)
- [x] Test Scenario 3: Project without sync (manual testing complete)
- [x] Test Scenario 4: Integrity check (manual testing complete)
- [x] Test Scenario 5: bisync-reset command (manual testing complete)
- [x] Test cleanup on remove (manual testing complete)
- [x] Verify all commands work end-to-end
- [x] Document known issues (empty directory bisync limitation)
- [ ] Automated integration tests (deferred)
- [ ] Test migration from SPEC-8 implementation (N/A - no users yet)
## Implementation Notes
### Key Improvements Added During Implementation
**1. Path Normalization (Critical Bug Fix)**
**Problem:** Files were syncing to `/app/data/app/data/project/` instead of `/app/data/project/`
**Root cause:**
- S3 bucket contains projects directly (e.g., `basic-memory-llc/`)
- Fly machine mounts bucket at `/app/data/`
- API returns paths like `/app/data/basic-memory-llc` (mount point + project)
- Rclone was using this full path, causing path doubling
**Solution (three layers):**
- API side: Added `normalize_project_path()` in `project_router.py` to strip `/app/data/` prefix
- CLI side: Added defensive normalization in `project.py` commands
- Rclone side: Updated `get_project_remote()` to strip prefix before building remote path
**Files modified:**
- `src/basic_memory/api/routers/project_router.py` - API normalization
- `src/basic_memory/cli/commands/project.py` - CLI normalization
- `src/basic_memory/cli/commands/cloud/rclone_commands.py` - Rclone remote path construction
**2. Automatic Database Sync After File Operations**
**Enhancement:** After successful file sync or bisync, automatically trigger database sync via API
**Implementation:**
- After `project sync`: POST to `/{project}/project/sync`
- After `project bisync`: POST to `/{project}/project/sync` + update config timestamps
- Skip trigger on `--dry-run`
- Graceful error handling with warnings
**Benefit:** Files and database stay in sync automatically without manual intervention
**3. Enhanced Project Removal with Cleanup**
**Enhancement:** `bm project remove` now properly cleans up local artifacts
**Behavior with `--delete-notes`:**
- ✓ Removes project from cloud API
- ✓ Deletes cloud files
- ✓ Removes local sync directory
- ✓ Removes bisync state directory
- ✓ Removes `cloud_projects` config entry
**Behavior without `--delete-notes`:**
- ✓ Removes project from cloud API
- ✗ Keeps local files (shows path in message)
- ✓ Removes bisync state directory (cleanup)
- ✓ Removes `cloud_projects` config entry
**Files modified:**
- `src/basic_memory/cli/commands/project.py` - Enhanced `remove_project()` function
**4. Bisync State Reset Command**
**New command:** `bm project bisync-reset <project>`
**Purpose:** Clear bisync state when it becomes corrupted (e.g., after mixing dry-run and actual runs)
**What it does:**
- Removes all bisync metadata from `~/.basic-memory/bisync-state/{project}/`
- Forces fresh baseline on next `--resync`
- Safe operation (doesn't touch files)
- Also runs automatically on project removal
**Files modified:**
- Added `bisync-reset` command to `src/basic_memory/cli/commands/project.py`
**5. Improved UI for Project List**
**Enhancements:**
- Shows "Local Path" column in cloud mode for projects with sync configured
- Conditionally shows/hides columns based on config:
- Local Path: only in cloud mode
- Default: only when `default_project_mode` is True
- Uses `no_wrap=True, overflow="fold"` to prevent path truncation
- Applies path normalization to prevent showing mount point details
**Files modified:**
- `src/basic_memory/cli/commands/project.py` - Enhanced `list_projects()` function
**6. Documentation of Known Issues**
**Issue documented:** Rclone bisync limitation with empty directories
**Problem:** "Empty prior Path1 listing. Cannot sync to an empty directory"
**Explanation:** Bisync creates listing files that track state. When both directories are completely empty, these listing files are considered invalid.
**Solution documented:** Add at least one file (like README.md) before running `--resync`
**Files updated:**
- `docs/cloud-cli.md` - Added troubleshooting sections for:
- Empty directory issues
- Bisync state corruption
- Usage of `bisync-reset` command
### Rclone Flag Fix
**Bug fix:** Incorrect rclone flag causing sync failures
**Error:** `unknown flag: --filters-file`
**Fix:** Changed `--filters-file` to correct flag `--filter-from` in both `project_sync()` and `project_bisync()` functions
**Files modified:**
- `src/basic_memory/cli/commands/cloud/rclone_commands.py`
### Test Coverage
**Unit tests added:**
- `tests/cli/test_project_add_with_local_path.py` - 4 tests for `--local-path` functionality
- Test with local path saves to config
- Test without local path doesn't save to config
- Test tilde expansion in paths
- Test nested directory creation
**Manual testing completed:**
- All 10 project commands tested end-to-end
- Path normalization verified
- Database sync trigger verified
- Cleanup on remove verified
- Bisync state reset verified
## Future Enhancements (Out of Scope)
- **Per-project rclone profiles**: Allow advanced users to override defaults
- **Conflict resolution UI**: Interactive conflict resolution for bisync
- **Sync scheduling**: Automatic periodic sync without watch mode
- **Sync analytics**: Track sync frequency, data transferred, etc.
- **Multi-machine coordination**: Detect and warn about concurrent edits from different machines
```
--------------------------------------------------------------------------------
/tests/repository/test_entity_repository.py:
--------------------------------------------------------------------------------
```python
"""Tests for the EntityRepository."""
from datetime import datetime, timezone
import pytest
import pytest_asyncio
from sqlalchemy import select
from basic_memory import db
from basic_memory.models import Entity, Observation, Relation, Project
from basic_memory.repository.entity_repository import EntityRepository
from basic_memory.utils import generate_permalink
@pytest_asyncio.fixture
async def entity_with_observations(session_maker, sample_entity):
    """Attach two observations to ``sample_entity`` and return the entity.

    Both observations carry the entity's ``project_id`` and ``id`` and are
    added through a session opened with ``db.scoped_session``.
    """
    contents = ("First observation", "Second observation")
    async with db.scoped_session(session_maker) as session:
        session.add_all(
            [
                Observation(
                    project_id=sample_entity.project_id,
                    entity_id=sample_entity.id,
                    content=text,
                )
                for text in contents
            ]
        )
        return sample_entity
@pytest_asyncio.fixture
async def related_results(session_maker, test_project: Project):
    """Persist a ``source`` and a ``target`` entity joined by one relation.

    Returns:
        The tuple ``(source, target, relation)``.
    """
    # Fields that are the same for both entities.
    shared = {
        "project_id": test_project.id,
        "entity_type": "test",
        "content_type": "text/markdown",
    }
    async with db.scoped_session(session_maker) as session:
        source = Entity(
            title="source",
            permalink="source/source",
            file_path="source/source.md",
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
            **shared,
        )
        target = Entity(
            title="target",
            permalink="target/target",
            file_path="target/target.md",
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
            **shared,
        )
        session.add(source)
        session.add(target)
        # Flush before building the relation, which reads source.id / target.id.
        await session.flush()

        relation = Relation(
            project_id=test_project.id,
            from_id=source.id,
            to_id=target.id,
            to_name=target.title,
            relation_type="connects_to",
        )
        session.add(relation)
        return source, target, relation
@pytest.mark.asyncio
async def test_create_entity(entity_repository: EntityRepository):
    """create() stores an entity that find_by_id() can read back."""
    payload = dict(
        project_id=entity_repository.project_id,
        title="Test",
        entity_type="test",
        permalink="test/test",
        file_path="test/test.md",
        content_type="text/markdown",
        created_at=datetime.now(timezone.utc),
        updated_at=datetime.now(timezone.utc),
    )
    created = await entity_repository.create(payload)

    # The returned object has an id and datetime timestamps.
    assert created.id is not None
    assert created.title == "Test"
    assert isinstance(created.created_at, datetime)
    assert isinstance(created.updated_at, datetime)

    # A separate lookup finds the same row.
    stored = await entity_repository.find_by_id(created.id)
    assert stored is not None
    assert stored.id is not None
    assert stored.id == created.id
    assert stored.title == created.title

    # Collections on the returned entity are readable (eagerly loaded) and empty.
    assert len(created.observations) == 0
    assert len(created.relations) == 0
@pytest.mark.asyncio
async def test_create_all(entity_repository: EntityRepository):
    """create_all() stores several entities in one call."""
    payloads = [
        dict(
            project_id=entity_repository.project_id,
            title="Test_1",
            entity_type="test",
            permalink="test/test-1",
            file_path="test/test_1.md",
            content_type="text/markdown",
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
        ),
        dict(
            project_id=entity_repository.project_id,
            title="Test-2",
            entity_type="test",
            permalink="test/test-2",
            file_path="test/test_2.md",
            content_type="text/markdown",
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
        ),
    ]
    created = await entity_repository.create_all(payloads)
    assert len(created) == 2

    # Only the first returned entity is checked against the database.
    first = created[0]
    stored = await entity_repository.find_by_id(first.id)
    assert stored is not None
    assert stored.id is not None
    assert stored.id == first.id
    assert stored.title == first.title

    # Collections on the returned entity are readable (eagerly loaded) and empty.
    assert len(first.observations) == 0
    assert len(first.relations) == 0
@pytest.mark.asyncio
async def test_find_by_id(entity_repository: EntityRepository, sample_entity: Entity):
    """find_by_id() returns the same row as a direct SELECT on the id."""
    located = await entity_repository.find_by_id(sample_entity.id)
    assert located is not None
    assert located.id == sample_entity.id
    assert located.title == sample_entity.title

    # Cross-check with a query that bypasses the repository.
    async with db.scoped_session(entity_repository.session_maker) as session:
        rows = await session.execute(select(Entity).where(Entity.id == sample_entity.id))
        row = rows.scalar_one()
        assert row.id == located.id
        assert row.title == located.title
@pytest.mark.asyncio
async def test_update_entity(entity_repository: EntityRepository, sample_entity: Entity):
    """update() changes the title both on the returned object and in the database."""
    changed = await entity_repository.update(sample_entity.id, {"title": "Updated title"})
    assert changed is not None
    assert changed.title == "Updated title"

    # Read the row back directly to confirm the change was persisted.
    async with db.scoped_session(entity_repository.session_maker) as session:
        rows = await session.execute(select(Entity).where(Entity.id == sample_entity.id))
        row = rows.scalar_one()
        assert row.title == "Updated title"
@pytest.mark.asyncio
async def test_update_entity_returns_with_relations_and_observations(
    entity_repository: EntityRepository, entity_with_observations, test_project: Project
):
    """update() returns an entity whose observations and relations can be read."""
    subject = entity_with_observations

    # Give the entity one outgoing relation to a newly created target.
    async with db.scoped_session(entity_repository.session_maker) as session:
        target = Entity(
            project_id=test_project.id,
            title="target",
            entity_type="test",
            permalink="target/target",
            file_path="target/target.md",
            content_type="text/markdown",
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
        )
        session.add(target)
        # Flush before building the relation, which reads target.id.
        await session.flush()
        session.add(
            Relation(
                project_id=test_project.id,
                from_id=subject.id,
                to_id=target.id,
                to_name=target.title,
                relation_type="connects_to",
            )
        )

    changed = await entity_repository.update(subject.id, {"title": "Updated with relations"})

    # Reading the collections below would raise DetachedInstanceError
    # if they had not been eagerly loaded.
    assert changed is not None
    assert changed.title == "Updated with relations"

    assert len(changed.observations) == 2
    assert changed.observations[0].content in ["First observation", "Second observation"]

    assert len(changed.relations) == 1
    only_relation = changed.relations[0]
    assert only_relation.relation_type == "connects_to"
    assert only_relation.to_name == "target"
@pytest.mark.asyncio
async def test_delete_entity(entity_repository: EntityRepository, sample_entity):
    """delete() returns True and the entity can no longer be found."""
    outcome = await entity_repository.delete(sample_entity.id)
    assert outcome is True

    # The row must be gone afterwards.
    assert await entity_repository.find_by_id(sample_entity.id) is None
@pytest.mark.asyncio
async def test_delete_entity_with_observations(
    entity_repository: EntityRepository, entity_with_observations
):
    """Deleting an entity also removes the observations that belong to it."""
    subject = entity_with_observations
    outcome = await entity_repository.delete(subject.id)
    assert outcome is True

    # The entity itself is gone.
    assert await entity_repository.find_by_id(subject.id) is None

    # No observation still points at the deleted entity.
    async with db.scoped_session(entity_repository.session_maker) as session:
        rows = await session.execute(
            select(Observation).filter(Observation.entity_id == subject.id)
        )
        leftovers = rows.scalars().all()
        assert len(leftovers) == 0
@pytest.mark.asyncio
async def test_delete_entities_by_type(entity_repository: EntityRepository, sample_entity):
    """delete_by_fields(entity_type=...) removes every entity of that type."""
    wanted_type = sample_entity.entity_type
    outcome = await entity_repository.delete_by_fields(entity_type=wanted_type)
    assert outcome is True

    # No entity of that type may remain.
    async with db.scoped_session(entity_repository.session_maker) as session:
        rows = await session.execute(
            select(Entity).filter(Entity.entity_type == sample_entity.entity_type)
        )
        leftovers = rows.scalars().all()
        assert len(leftovers) == 0
@pytest.mark.asyncio
async def test_delete_entity_with_relations(entity_repository: EntityRepository, related_results):
    """Deleting the source entity removes its relations but leaves the target."""
    source, target, _relation = related_results

    outcome = await entity_repository.delete(source.id)
    assert outcome is True

    # No relation may still originate from the deleted source.
    async with db.scoped_session(entity_repository.session_maker) as session:
        rows = await session.execute(select(Relation).filter(Relation.from_id == source.id))
        leftovers = rows.scalars().all()
        assert len(leftovers) == 0

    # The entity on the other end of the relation is untouched.
    surviving_target = await entity_repository.find_by_id(target.id)
    assert surviving_target is not None
@pytest.mark.asyncio
async def test_delete_nonexistent_entity(entity_repository: EntityRepository):
    """delete() returns False for an id with no matching entity (id 0 here)."""
    outcome = await entity_repository.delete(0)
    assert outcome is False
@pytest_asyncio.fixture
async def test_entities(session_maker, test_project: Project):
    """Persist three entities (two under ``type1/``, one under ``type2/``) and return them."""
    # Fields that are the same for all three entities.
    shared = {
        "project_id": test_project.id,
        "entity_type": "test",
        "content_type": "text/markdown",
    }
    async with db.scoped_session(session_maker) as session:
        first = Entity(
            title="entity1",
            permalink="type1/entity1",
            file_path="type1/entity1.md",
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
            **shared,
        )
        second = Entity(
            title="entity2",
            permalink="type1/entity2",
            file_path="type1/entity2.md",
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
            **shared,
        )
        third = Entity(
            title="entity3",
            permalink="type2/entity3",
            file_path="type2/entity3.md",
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
            **shared,
        )
        entities = [first, second, third]
        session.add_all(entities)
        return entities
@pytest.mark.asyncio
async def test_find_by_permalinks(entity_repository: EntityRepository, test_entities):
    """Exercise find_by_permalinks with full, partial, partly-unknown and empty input."""
    # Every fixture permalink -> all three entities come back.
    every_permalink = [entity.permalink for entity in test_entities]
    matches = await entity_repository.find_by_permalinks(every_permalink)
    assert len(matches) == 3
    assert {match.title for match in matches} == {"entity1", "entity2", "entity3"}

    # Leaving out entity2 -> only the other two are returned.
    without_entity2 = [
        entity.permalink for entity in test_entities if entity.title != "entity2"
    ]
    matches = await entity_repository.find_by_permalinks(without_entity2)
    assert len(matches) == 2
    assert {match.title for match in matches} == {"entity1", "entity3"}

    # A permalink with no entity behind it is simply absent from the result.
    matches = await entity_repository.find_by_permalinks(["type1/entity1", "type3/nonexistent"])
    assert len(matches) == 1
    assert matches[0].title == "entity1"

    # An empty request gives an empty answer.
    matches = await entity_repository.find_by_permalinks([])
    assert len(matches) == 0
@pytest.mark.asyncio
async def test_generate_permalink_from_file_path():
    """Check generate_permalink against a table of file paths and expected permalinks."""
    # file path -> permalink the generator is expected to produce
    expected_by_path = {
        "docs/My Feature.md": "docs/my-feature",
        "specs/API (v2).md": "specs/api-v2",
        "notes/2024/Q1 Planning!!!.md": "notes/2024/q1-planning",
        "test/Über File.md": "test/uber-file",
        "docs/my_feature_name.md": "docs/my-feature-name",
        "specs/multiple--dashes.md": "specs/multiple-dashes",
        "notes/trailing/space/ file.md": "notes/trailing/space/file",
    }

    for input_path, expected in expected_by_path.items():
        permalink = generate_permalink(input_path)
        assert permalink == expected, f"Failed for {input_path}"

        # Constructing an Entity with the generated permalink doubles as a
        # validation check: this will raise ValueError if invalid.
        Entity(
            title="test",
            entity_type="test",
            permalink=permalink,
            file_path=input_path,
            content_type="text/markdown",
        )
@pytest.mark.asyncio
async def test_get_by_title(entity_repository: EntityRepository, session_maker):
    """get_by_title matches titles exactly (case-sensitively) and returns every match."""
    # (title, permalink, file_path); "Another Title" is stored twice on purpose.
    rows = [
        ("Unique Title", "test/unique-title", "test/unique-title.md"),
        ("Another Title", "test/another-title", "test/another-title.md"),
        ("Another Title", "test/another-title-1", "test/another-title-1.md"),
    ]
    async with db.scoped_session(session_maker) as session:
        session.add_all(
            [
                Entity(
                    project_id=entity_repository.project_id,
                    title=title,
                    entity_type="test",
                    permalink=permalink,
                    file_path=file_path,
                    content_type="text/markdown",
                    created_at=datetime.now(timezone.utc),
                    updated_at=datetime.now(timezone.utc),
                )
                for title, permalink, file_path in rows
            ]
        )
        await session.flush()

    # Exact title -> exactly one hit.
    exact = await entity_repository.get_by_title("Unique Title")
    assert exact is not None
    assert len(exact) == 1
    assert exact[0].title == "Unique Title"

    # Same words in lower case -> nothing (lookup should be case-sensitive).
    lowercase = await entity_repository.get_by_title("unique title")
    assert not lowercase

    # Title that was never stored -> nothing.
    unknown = await entity_repository.get_by_title("Non Existent")
    assert not unknown

    # Title shared by two entities -> both are returned.
    shared = await entity_repository.get_by_title("Another Title")
    assert len(shared) == 2
@pytest.mark.asyncio
async def test_get_by_file_path(entity_repository: EntityRepository, session_maker):
    """get_by_file_path returns the entity stored at a path, or None for an unknown path."""
    stored = Entity(
        project_id=entity_repository.project_id,
        title="Unique Title",
        entity_type="test",
        permalink="test/unique-title",
        file_path="test/unique-title.md",
        content_type="text/markdown",
        created_at=datetime.now(timezone.utc),
        updated_at=datetime.now(timezone.utc),
    )
    async with db.scoped_session(session_maker) as session:
        session.add_all([stored])
        await session.flush()

    # Known path -> the matching entity.
    hit = await entity_repository.get_by_file_path("test/unique-title.md")
    assert hit is not None
    assert hit.title == "Unique Title"

    # Unknown path -> None.
    miss = await entity_repository.get_by_file_path("not/a/real/file.md")
    assert miss is None
@pytest.mark.asyncio
async def test_get_distinct_directories(entity_repository: EntityRepository, session_maker):
    """get_distinct_directories returns every ancestor directory once, sorted, without files."""
    # (title, permalink, file_path) spread over nested and sibling directories.
    rows = [
        ("File 1", "docs/guides/file1", "docs/guides/file1.md"),
        ("File 2", "docs/guides/file2", "docs/guides/file2.md"),
        ("File 3", "docs/api/file3", "docs/api/file3.md"),
        ("File 4", "specs/file4", "specs/file4.md"),
        ("File 5", "notes/2024/q1/file5", "notes/2024/q1/file5.md"),
    ]
    async with db.scoped_session(session_maker) as session:
        session.add_all(
            [
                Entity(
                    project_id=entity_repository.project_id,
                    title=title,
                    entity_type="test",
                    permalink=permalink,
                    file_path=file_path,
                    content_type="text/markdown",
                    created_at=datetime.now(timezone.utc),
                    updated_at=datetime.now(timezone.utc),
                )
                for title, permalink, file_path in rows
            ]
        )
        await session.flush()

    directories = await entity_repository.get_distinct_directories()

    # Shape of the result: a non-empty list.
    assert isinstance(directories, list)
    assert len(directories) > 0

    # Each directory level of each file path is expected, never the file itself.
    assert set(directories) == {
        "docs",
        "docs/guides",
        "docs/api",
        "notes",
        "notes/2024",
        "notes/2024/q1",
        "specs",
    }

    # Ordering: already sorted.
    assert directories == sorted(directories)

    # No entry may be a markdown file path.
    for directory in directories:
        assert not directory.endswith(".md")
@pytest.mark.asyncio
async def test_get_distinct_directories_empty_db(entity_repository: EntityRepository):
    """With no entities created by the test, the directory list is expected to be empty."""
    listed = await entity_repository.get_distinct_directories()
    assert listed == []
@pytest.mark.asyncio
async def test_find_by_directory_prefix(entity_repository: EntityRepository, session_maker):
    """find_by_directory_prefix returns entities under a directory, including subdirectories."""
    # (title, permalink, file_path): three files below docs/, one below specs/.
    rows = [
        ("File 1", "docs/file1", "docs/file1.md"),
        ("File 2", "docs/guides/file2", "docs/guides/file2.md"),
        ("File 3", "docs/api/file3", "docs/api/file3.md"),
        ("File 4", "specs/file4", "specs/file4.md"),
    ]
    async with db.scoped_session(session_maker) as session:
        session.add_all(
            [
                Entity(
                    project_id=entity_repository.project_id,
                    title=title,
                    entity_type="test",
                    permalink=permalink,
                    file_path=file_path,
                    content_type="text/markdown",
                    created_at=datetime.now(timezone.utc),
                    updated_at=datetime.now(timezone.utc),
                )
                for title, permalink, file_path in rows
            ]
        )
        await session.flush()

    # "docs" covers the directory itself and both of its subdirectories.
    under_docs = await entity_repository.find_by_directory_prefix("docs")
    assert len(under_docs) == 3
    assert {e.file_path for e in under_docs} == {
        "docs/file1.md",
        "docs/guides/file2.md",
        "docs/api/file3.md",
    }

    # A subdirectory prefix narrows the match to that subdirectory.
    under_guides = await entity_repository.find_by_directory_prefix("docs/guides")
    assert len(under_guides) == 1
    assert under_guides[0].file_path == "docs/guides/file2.md"

    # A sibling top-level directory only yields its own file.
    under_specs = await entity_repository.find_by_directory_prefix("specs")
    assert len(under_specs) == 1
    assert under_specs[0].file_path == "specs/file4.md"

    # Both spellings of the root ("" and "/") return everything.
    for root_prefix in ("", "/"):
        everything = await entity_repository.find_by_directory_prefix(root_prefix)
        assert len(everything) == 4

    # A directory nobody uses yields nothing.
    under_missing = await entity_repository.find_by_directory_prefix("nonexistent")
    assert len(under_missing) == 0
@pytest.mark.asyncio
async def test_find_by_directory_prefix_basic_fields_only(
    entity_repository: EntityRepository, session_maker
):
    """Entities returned by find_by_directory_prefix expose their basic column values.

    Note: per the original test's own documentation, the method runs with
    use_query_options=False for performance and therefore does not eager load
    relationships; a directory tree only needs the basic entity fields, which
    is all this test inspects.
    """
    stored = Entity(
        project_id=entity_repository.project_id,
        title="Test Entity",
        entity_type="test",
        permalink="docs/test",
        file_path="docs/test.md",
        content_type="text/markdown",
        created_at=datetime.now(timezone.utc),
        updated_at=datetime.now(timezone.utc),
    )
    async with db.scoped_session(session_maker) as session:
        session.add(stored)
        await session.flush()

    found = await entity_repository.find_by_directory_prefix("docs")
    assert len(found) == 1

    # Only plain columns are checked; relationships are deliberately not touched.
    loaded = found[0]
    assert loaded.title == "Test Entity"
    assert loaded.file_path == "docs/test.md"
    assert loaded.permalink == "docs/test"
    assert loaded.entity_type == "test"
    assert loaded.content_type == "text/markdown"
    assert loaded.updated_at is not None
@pytest.mark.asyncio
async def test_get_all_file_paths(entity_repository: EntityRepository, session_maker):
    """get_all_file_paths lists the file path of every stored entity."""
    # (title, permalink, file_path), one file in each of three directories.
    rows = [
        ("File 1", "docs/file1", "docs/file1.md"),
        ("File 2", "specs/file2", "specs/file2.md"),
        ("File 3", "notes/file3", "notes/file3.md"),
    ]
    async with db.scoped_session(session_maker) as session:
        session.add_all(
            [
                Entity(
                    project_id=entity_repository.project_id,
                    title=title,
                    entity_type="test",
                    permalink=permalink,
                    file_path=file_path,
                    content_type="text/markdown",
                    created_at=datetime.now(timezone.utc),
                    updated_at=datetime.now(timezone.utc),
                )
                for title, permalink, file_path in rows
            ]
        )
        await session.flush()

    listed = await entity_repository.get_all_file_paths()

    # A list containing exactly the three stored paths, in any order.
    assert isinstance(listed, list)
    assert len(listed) == 3
    assert set(listed) == {"docs/file1.md", "specs/file2.md", "notes/file3.md"}
@pytest.mark.asyncio
async def test_get_all_file_paths_empty_db(entity_repository: EntityRepository):
    """With no entities created by the test, the file path list is expected to be empty."""
    listed = await entity_repository.get_all_file_paths()
    assert listed == []
@pytest.mark.asyncio
async def test_get_all_file_paths_performance(entity_repository: EntityRepository, session_maker):
    """get_all_file_paths yields plain strings even when entities have children.

    The method is meant for deletion detection during streaming sync, so it
    should return file_path strings only rather than full entity objects.
    This test attaches an observation and a relation to the entities and then
    checks the type and content of what comes back.
    """
    project_id = entity_repository.project_id

    async with db.scoped_session(session_maker) as session:
        first = Entity(
            project_id=project_id,
            title="Entity 1",
            entity_type="test",
            permalink="test/entity1",
            file_path="test/entity1.md",
            content_type="text/markdown",
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
        )
        second = Entity(
            project_id=project_id,
            title="Entity 2",
            entity_type="test",
            permalink="test/entity2",
            file_path="test/entity2.md",
            content_type="text/markdown",
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
        )
        session.add_all([first, second])
        # Flush before reading first.id / second.id for the child rows below.
        await session.flush()

        # One observation hanging off the first entity.
        session.add(
            Observation(
                project_id=project_id,
                entity_id=first.id,
                content="Test observation",
                category="note",
            )
        )

        # One relation from the first entity to the second.
        session.add(
            Relation(
                project_id=project_id,
                from_id=first.id,
                to_id=second.id,
                to_name=second.title,
                relation_type="relates_to",
            )
        )
        await session.flush()

    listed = await entity_repository.get_all_file_paths()

    # Exactly the two entity paths are reported.
    assert len(listed) == 2
    assert set(listed) == {"test/entity1.md", "test/entity2.md"}

    # Every item is a bare string, not an entity object.
    for item in listed:
        assert isinstance(item, str)
@pytest.mark.asyncio
async def test_get_all_file_paths_project_isolation(
    entity_repository: EntityRepository, session_maker
):
    """get_all_file_paths ignores entities that belong to a different project."""
    async with db.scoped_session(session_maker) as session:
        # Entity owned by the project the repository is bound to.
        own_entity = Entity(
            project_id=entity_repository.project_id,
            title="Project 1 File",
            entity_type="test",
            permalink="test/file1",
            file_path="test/file1.md",
            content_type="text/markdown",
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
        )
        session.add(own_entity)
        await session.flush()

        # A second project; flushed so that its id can be read below.
        other_project = Project(name="other-project", path="/tmp/other")
        session.add(other_project)
        await session.flush()

        # Entity owned by that second project.
        foreign_entity = Entity(
            project_id=other_project.id,
            title="Project 2 File",
            entity_type="test",
            permalink="test/file2",
            file_path="test/file2.md",
            content_type="text/markdown",
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
        )
        session.add(foreign_entity)
        await session.flush()

    listed = await entity_repository.get_all_file_paths()

    # Only the repository's own project contributes a path.
    assert len(listed) == 1
    assert listed == ["test/file1.md"]
# -------------------------------------------------------------------------
# Tests for lightweight permalink resolution methods
# -------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_permalink_exists(entity_repository: EntityRepository, sample_entity: Entity):
    """permalink_exists answers True for a stored permalink and False for an unknown one."""
    # The fixture entity's permalink is known to the repository.
    known = await entity_repository.permalink_exists(sample_entity.permalink)  # pyright: ignore [reportArgumentType]
    assert known is True

    # A permalink nobody uses is reported as absent.
    unknown = await entity_repository.permalink_exists("nonexistent/permalink")
    assert unknown is False
@pytest.mark.asyncio
async def test_permalink_exists_project_isolation(
    entity_repository: EntityRepository, session_maker
):
    """permalink_exists only sees permalinks of the repository's own project."""
    async with db.scoped_session(session_maker) as session:
        # Entity in the project the repository is bound to.
        own_entity = Entity(
            project_id=entity_repository.project_id,
            title="Project 1 Entity",
            entity_type="test",
            permalink="test/entity1",
            file_path="test/entity1.md",
            content_type="text/markdown",
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
        )
        session.add(own_entity)

        # A second project; flushed so that its id can be read below.
        other_project = Project(name="other-project", path="/tmp/other")
        session.add(other_project)
        await session.flush()

        # Entity in the second project, carrying a permalink ("test/entity2")
        # that does not occur in the repository's project.
        foreign_entity = Entity(
            project_id=other_project.id,
            title="Project 2 Entity",
            entity_type="test",
            permalink="test/entity2",
            file_path="test/entity2.md",
            content_type="text/markdown",
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
        )
        session.add(foreign_entity)

    # The repository's own permalink is found.
    own_found = await entity_repository.permalink_exists("test/entity1")
    assert own_found is True

    # The other project's permalink is not visible through this repository.
    foreign_found = await entity_repository.permalink_exists("test/entity2")
    assert foreign_found is False
@pytest.mark.asyncio
async def test_get_file_path_for_permalink(
    entity_repository: EntityRepository, sample_entity: Entity
):
    """get_file_path_for_permalink maps a stored permalink to its file path, else None."""
    # Stored permalink -> the fixture entity's file path.
    resolved = await entity_repository.get_file_path_for_permalink(sample_entity.permalink)  # pyright: ignore [reportArgumentType]
    assert resolved == sample_entity.file_path

    # Unknown permalink -> None.
    unresolved = await entity_repository.get_file_path_for_permalink("nonexistent/permalink")
    assert unresolved is None
@pytest.mark.asyncio
async def test_get_permalink_for_file_path(
    entity_repository: EntityRepository, sample_entity: Entity
):
    """get_permalink_for_file_path maps a stored file path to its permalink, else None."""
    # Stored file path -> the fixture entity's permalink.
    resolved = await entity_repository.get_permalink_for_file_path(sample_entity.file_path)
    assert resolved == sample_entity.permalink

    # Unknown file path -> None.
    unresolved = await entity_repository.get_permalink_for_file_path("nonexistent/path.md")
    assert unresolved is None
@pytest.mark.asyncio
async def test_get_all_permalinks(entity_repository: EntityRepository, session_maker):
    """get_all_permalinks returns the permalink string of every stored entity."""
    # (title, permalink, file_path)
    rows = [
        ("Entity 1", "test/entity1", "test/entity1.md"),
        ("Entity 2", "test/entity2", "test/entity2.md"),
    ]
    async with db.scoped_session(session_maker) as session:
        session.add_all(
            [
                Entity(
                    project_id=entity_repository.project_id,
                    title=title,
                    entity_type="test",
                    permalink=permalink,
                    file_path=file_path,
                    content_type="text/markdown",
                    created_at=datetime.now(timezone.utc),
                    updated_at=datetime.now(timezone.utc),
                )
                for title, permalink, file_path in rows
            ]
        )

    listed = await entity_repository.get_all_permalinks()
    assert len(listed) == 2
    assert set(listed) == {"test/entity1", "test/entity2"}

    # Each item must be a bare string rather than an entity object.
    for item in listed:
        assert isinstance(item, str)
@pytest.mark.asyncio
async def test_get_permalink_to_file_path_map(entity_repository: EntityRepository, session_maker):
    """get_permalink_to_file_path_map returns a permalink -> file_path lookup."""
    # (title, permalink, file_path)
    rows = [
        ("Entity 1", "test/entity1", "test/entity1.md"),
        ("Entity 2", "test/entity2", "test/entity2.md"),
    ]
    async with db.scoped_session(session_maker) as session:
        session.add_all(
            [
                Entity(
                    project_id=entity_repository.project_id,
                    title=title,
                    entity_type="test",
                    permalink=permalink,
                    file_path=file_path,
                    content_type="text/markdown",
                    created_at=datetime.now(timezone.utc),
                    updated_at=datetime.now(timezone.utc),
                )
                for title, permalink, file_path in rows
            ]
        )

    lookup = await entity_repository.get_permalink_to_file_path_map()

    # One key per stored entity, each pointing at that entity's file path.
    assert len(lookup) == 2
    assert lookup["test/entity1"] == "test/entity1.md"
    assert lookup["test/entity2"] == "test/entity2.md"
@pytest.mark.asyncio
async def test_get_file_path_to_permalink_map(entity_repository: EntityRepository, session_maker):
    """get_file_path_to_permalink_map returns a file_path -> permalink lookup."""
    # (title, permalink, file_path)
    rows = [
        ("Entity 1", "test/entity1", "test/entity1.md"),
        ("Entity 2", "test/entity2", "test/entity2.md"),
    ]
    async with db.scoped_session(session_maker) as session:
        session.add_all(
            [
                Entity(
                    project_id=entity_repository.project_id,
                    title=title,
                    entity_type="test",
                    permalink=permalink,
                    file_path=file_path,
                    content_type="text/markdown",
                    created_at=datetime.now(timezone.utc),
                    updated_at=datetime.now(timezone.utc),
                )
                for title, permalink, file_path in rows
            ]
        )

    lookup = await entity_repository.get_file_path_to_permalink_map()

    # One key per stored entity, each pointing at that entity's permalink.
    assert len(lookup) == 2
    assert lookup["test/entity1.md"] == "test/entity1"
    assert lookup["test/entity2.md"] == "test/entity2"
```
--------------------------------------------------------------------------------
/specs/SPEC-9 Multi-Project Bidirectional Sync Architecture.md:
--------------------------------------------------------------------------------
```markdown
---
title: 'SPEC-9: Multi-Project Bidirectional Sync Architecture'
type: spec
permalink: specs/spec-9-multi-project-bisync
tags:
- cloud
- bisync
- architecture
- multi-project
---
# SPEC-9: Multi-Project Bidirectional Sync Architecture
## Status: ✅ Implementation Complete
**Completed Phases:**
- ✅ Phase 1: Cloud Mode Toggle & Config
- ✅ Phase 2: Bisync Updates (Multi-Project)
- ✅ Phase 3: Sync Command Dual Mode
- ✅ Phase 4: Remove Duplicate Commands & Cloud Mode Auth
- ✅ Phase 5: Mount Updates
- ✅ Phase 6: Safety & Validation
- ⏸️ Phase 7: Cloud-Side Implementation (Deferred to cloud repo)
- ✅ Phase 8.1: Testing (All test scenarios validated)
- ✅ Phase 8.2: Documentation (Core docs complete, demos pending)
**Key Achievements:**
- Unified CLI: `bm sync`, `bm project`, `bm tool` work transparently in both local and cloud modes
- Multi-project sync: Single `bm sync` operation handles all projects bidirectionally
- Cloud mode toggle: `bm cloud login` / `bm cloud logout` switches modes seamlessly
- Integrity checking: `bm cloud check` verifies file matching without data transfer
- Directory isolation: Mount and bisync use separate directories with conflict prevention
- Clean UX: No RCLONE_TEST files, clear error messages, transparent implementation
## Why
**Current State:**
SPEC-8 implemented rclone bisync for cloud file synchronization, but has several architectural limitations:
1. Syncs only a single project subdirectory (`bucket:/basic-memory`)
2. Requires separate `bm cloud` command namespace, duplicating existing CLI commands
3. Users must learn different commands for local vs cloud operations
4. RCLONE_TEST marker files clutter user directories
**Problems:**
1. **Duplicate Commands**: `bm project` vs `bm cloud project`; `bm tool` has no cloud equivalent at all
2. **Inconsistent UX**: Same operations require different command syntax depending on mode
3. **Single Project Sync**: Users can only sync one project at a time
4. **Manual Coordination**: Creating new projects requires manual coordination between local and cloud
5. **Confusing Artifacts**: RCLONE_TEST marker files confuse users
**Goals:**
- **Unified CLI**: All existing `bm` commands work in both local and cloud mode via toggle
- **Multi-Project Sync**: Single sync operation handles all projects bidirectionally
- **Simple Mode Switch**: `bm cloud login` enables cloud mode, `logout` returns to local
- **Automatic Registration**: Projects auto-register on both local and cloud sides
- **Clean UX**: Remove unnecessary safety checks and confusing artifacts
## Cloud Access Paradigm: The Dropbox Model
**Mental Model Shift:**
Basic Memory cloud access follows the **Dropbox/iCloud paradigm** - not a per-project cloud connection model.
**What This Means:**
```
Traditional Project-Based Model (❌ Not This):
bm cloud mount --project work # Mount individual project
bm cloud mount --project personal # Mount another project
bm cloud sync --project research # Sync specific project
→ Multiple connections, multiple credentials, complex management
Dropbox Model (✅ This):
bm cloud mount # One mount, all projects
bm sync # One sync, all projects
~/basic-memory-cloud/ # One folder, all content
→ Single connection, organized by folders (projects)
```
**Key Principles:**
1. **Mount/Bisync = Access Methods, Not Project Tools**
- Mount: Read-through cache to cloud (like Dropbox folder)
- Bisync: Bidirectional sync with cloud (like Dropbox sync)
- Both operate at **bucket level** (all projects)
2. **Projects = Organization Within Cloud Space**
- Projects are folders within your cloud storage
- Creating a folder creates a project (auto-discovered)
- Projects are managed via `bm project` commands
3. **One Cloud Space Per Machine**
- One set of IAM credentials per tenant
- One mount point: `~/basic-memory-cloud/`
- One bisync directory: `~/basic-memory-cloud-sync/` (default)
- All projects accessible through this single entry point
4. **Why This Works Better**
- **Credential Management**: One credential set, not N sets per project
- **Resource Efficiency**: One rclone process, not N processes
- **Familiar Pattern**: Users already understand Dropbox/iCloud
- **Operational Simplicity**: `mount` once, `unmount` once
- **Scales Naturally**: Add projects by creating folders, not reconfiguring cloud access
**User Journey:**
```bash
# Setup cloud access (once)
bm cloud login
bm cloud mount # or: bm cloud setup for bisync
# Work with projects (create folders as needed)
cd ~/basic-memory-cloud/
mkdir my-new-project
echo "# Notes" > my-new-project/readme.md
# Cloud auto-discovers and registers project
# No additional cloud configuration needed
```
This paradigm shift means **mount and bisync are infrastructure concerns**, while **projects are content organization**. Users think about their knowledge, not about cloud plumbing.
## What
This spec affects:
1. **Cloud Mode Toggle** (`config.py`, `async_client.py`):
- Add `cloud_mode` flag to `~/.basic-memory/config.json`
- Set/unset `BASIC_MEMORY_PROXY_URL` based on cloud mode
- `bm cloud login` enables cloud mode, `logout` disables it
- All CLI commands respect cloud mode via existing async_client
2. **Unified CLI Commands**:
- **Remove**: `bm cloud project` commands (duplicate of `bm project`)
- **Enhance**: `bm sync` co-opted for bisync in cloud mode
- **Keep**: `bm cloud login/logout/status/setup` for mode management
- **Result**: `bm project`, `bm tool`, `bm sync` work in both modes
3. **Bisync Integration** (`bisync_commands.py`):
- Remove `--check-access` (no RCLONE_TEST files)
- Sync bucket root (all projects), not single subdirectory
- Project auto-registration before sync
- `bm sync` triggers bisync in cloud mode
- `bm sync --watch` for continuous sync
4. **Config Structure**:
```json
{
"cloud_mode": true,
"cloud_host": "https://cloud.basicmemory.com",
"auth_tokens": {...},
"bisync_config": {
"profile": "balanced",
"sync_dir": "~/basic-memory-cloud-sync"
}
}
```
5. **User Workflows**:
- **Enable cloud**: `bm cloud login` → all commands work remotely
- **Create projects**: `bm project add "name"` creates on cloud
- **Sync files**: `bm sync` runs bisync (all projects)
- **Use tools**: `bm tool write-note` creates notes on cloud
- **Disable cloud**: `bm cloud logout` → back to local mode
## Implementation Tasks
### Phase 1: Cloud Mode Toggle & Config (Foundation) ✅
**1.1 Update Config Schema**
- [x] Add `cloud_mode: bool = False` to Config model
- [x] Add `bisync_config: dict` with `profile` and `sync_dir` fields
- [x] Ensure `cloud_host` field exists
- [x] Add config migration for existing users (defaults handle this)
**1.2 Update async_client.py**
- [x] Read `cloud_mode` from config (not just environment)
- [x] Set `BASIC_MEMORY_PROXY_URL` from config when `cloud_mode=true`
- [x] Priority: env var > config.cloud_host (if cloud_mode) > None (local ASGI)
- [ ] Test both local and cloud mode routing
**1.3 Update Login/Logout Commands**
- [x] `bm cloud login`: Set `cloud_mode=true` and save config
- [x] `bm cloud login`: Set `BASIC_MEMORY_PROXY_URL` environment variable
- [x] `bm cloud logout`: Set `cloud_mode=false` and save config
- [x] `bm cloud logout`: Clear `BASIC_MEMORY_PROXY_URL` environment variable
- [x] `bm cloud status`: Show current mode (local/cloud), connection status
**1.4 Skip Initialization in Cloud Mode** ✅
- [x] Update `ensure_initialization()` to check `cloud_mode` and return early
- [x] Document that `config.projects` is only used in local mode
- [x] Cloud manages its own projects via API, no local reconciliation needed
### Phase 2: Bisync Updates (Multi-Project) ✅
**2.1 Remove RCLONE_TEST Files** ✅
- [x] Update all bisync profiles: `check_access=False`
- [x] Remove RCLONE_TEST creation from `setup_cloud_bisync()`
- [x] Remove RCLONE_TEST upload logic
- [ ] Update documentation
**2.2 Sync Bucket Root (All Projects)** ✅
- [x] Change remote path from `bucket:/basic-memory` to `bucket:/` in `build_bisync_command()`
- [x] Update `setup_cloud_bisync()` to use bucket root
- [ ] Test with multiple projects
**2.3 Project Auto-Registration (Bisync)** ✅
- [x] Add `fetch_cloud_projects()` function (GET /proxy/projects/projects)
- [x] Add `scan_local_directories()` function
- [x] Add `create_cloud_project()` function (POST /proxy/projects/projects)
- [x] Integrate into `run_bisync()`: fetch → scan → create missing → sync
- [x] Wait for API 201 response before syncing
**2.4 Bisync Directory Configuration** ✅
- [x] Add `--dir` parameter to `bm cloud bisync-setup`
- [x] Store bisync directory in config
- [x] Default to `~/basic-memory-cloud-sync/`
- [x] Add `validate_bisync_directory()` safety check
- [x] Update `get_default_mount_path()` to return fixed `~/basic-memory-cloud/`
**2.5 Sync/Status API Infrastructure** ✅ (commit d48b1dc)
- [x] Create `POST /{project}/project/sync` endpoint for background sync
- [x] Create `POST /{project}/project/status` endpoint for scan-only status
- [x] Create `SyncReportResponse` Pydantic schema
- [x] Refactor CLI `sync` command to use API endpoint
- [x] Refactor CLI `status` command to use API endpoint
- [x] Create `command_utils.py` with shared `run_sync()` function
- [x] Update `notify_container_sync()` to call `run_sync()` for each project
- [x] Update all tests to match new API-based implementation
### Phase 3: Sync Command Dual Mode ✅
**3.1 Update `bm sync` Command** ✅
- [x] Check `config.cloud_mode` at start
- [x] If `cloud_mode=false`: Run existing local sync
- [x] If `cloud_mode=true`: Run bisync
- [x] Add `--watch` parameter for continuous sync
- [x] Add `--interval` parameter (default 60 seconds)
- [x] Error if `--watch` used in local mode with helpful message
**3.2 Watch Mode for Bisync** ✅
- [x] Implement `run_bisync_watch()` with interval loop
- [x] Add `--interval` parameter (default 60 seconds)
- [x] Handle errors gracefully, continue on failure
- [x] Show sync progress and status
**3.3 Integrity Check Command** ✅
- [x] Implement `bm cloud check` command using `rclone check`
- [x] Read-only operation that verifies file matching
- [x] Error with helpful messages if rclone/bisync not set up
- [x] Support `--one-way` flag for faster checks
- [x] Transparent about rclone implementation
- [x] Suggest `bm sync` to resolve differences
**Implementation Notes:**
- `bm sync` adapts to cloud mode automatically - users don't need separate commands
- `bm cloud bisync` kept for power users with full options (--dry-run, --resync, --profile, --verbose)
- `bm cloud check` provides integrity verification without transferring data
- Design philosophy: Simplicity for everyday use, transparency about implementation
### Phase 4: Remove Duplicate Commands & Cloud Mode Auth ✅
**4.0 Cloud Mode Authentication** ✅
- [x] Update `async_client.py` to support dual auth sources
- [x] FastMCP context auth (cloud service mode) via `inject_auth_header()`
- [x] JWT token file auth (CLI cloud mode) via `CLIAuth.get_valid_token()`
- [x] Automatic token refresh for CLI cloud mode
- [x] Remove `BASIC_MEMORY_PROXY_URL` environment variable dependency
- [x] Simplify to use only `config.cloud_mode` + `config.cloud_host`
**4.1 Delete `bm cloud project` Commands** ✅
- [x] Remove `bm cloud project list` (use `bm project list`)
- [x] Remove `bm cloud project add` (use `bm project add`)
- [x] Update `core_commands.py` to remove project_app subcommands
- [x] Keep only: `login`, `logout`, `status`, `setup`, `mount`, `unmount`, bisync commands
- [x] Remove unused imports (Table, generate_permalink, os)
- [x] Clean up environment variable references in login/logout
**4.2 CLI Command Cloud Mode Integration** ✅
- [x] Add runtime `cloud_mode_enabled` checks to all CLI commands
- [x] Update `list_projects()` to conditionally authenticate based on cloud mode
- [x] Update `remove_project()` to conditionally authenticate based on cloud mode
- [x] Update `run_sync()` to conditionally authenticate based on cloud mode
- [x] Update `get_project_info()` to conditionally authenticate based on cloud mode
- [x] Update `run_status()` to conditionally authenticate based on cloud mode
- [x] Remove auth from `set_default_project()` (local-only command, no cloud version)
- [x] Create CLI integration tests (`test-int/cli/`) to validate both local and cloud modes
- [x] Replace mock-heavy CLI tests with integration tests (deleted 5 mock test files)
**4.3 OAuth Authentication Fixes** ✅
- [x] Restore missing `SettingsConfigDict` in `BasicMemoryConfig`
- [x] Fix environment variable reading with `BASIC_MEMORY_` prefix
- [x] Fix `.env` file loading
- [x] Fix extra field handling for config files
- [x] Resolve `bm cloud login` OAuth failure ("Something went wrong" error)
- [x] Implement PKCE (Proof Key for Code Exchange) for device flow
- [x] Generate code verifier and SHA256 challenge for device authorization
- [x] Send code_verifier with token polling requests
- [x] Support both PKCE-required and PKCE-optional OAuth clients
- [x] Verify authentication flow works end-to-end with staging and production
- [x] Document WorkOS requirement: redirect URI must be configured even for device flow
**4.4 Update Documentation**
- [ ] Update `cloud-cli.md` with cloud mode toggle workflow
- [ ] Document `bm cloud login` → use normal commands
- [ ] Add examples of cloud mode usage
- [ ] Document mount vs bisync directory isolation
- [ ] Add troubleshooting section
### Phase 5: Mount Updates ✅
**5.1 Fixed Mount Directory** ✅
- [x] Change mount path to `~/basic-memory-cloud/` (fixed, no tenant ID)
- [x] Update `get_default_mount_path()` function
- [x] Remove configurability (fixed location)
- [x] Update mount commands to use new path
**5.2 Mount at Bucket Root** ✅
- [x] Ensure mount uses bucket root (not subdirectory)
- [x] Test with multiple projects
- [x] Verify all projects visible in mount
**Implementation:** Mount uses fixed `~/basic-memory-cloud/` directory and syncs entire bucket root `basic-memory-{tenant_id}:{bucket_name}` for all projects.
### Phase 6: Safety & Validation ✅
**6.1 Directory Conflict Prevention** ✅
- [x] Implement `validate_bisync_directory()` check
- [x] Detect if bisync dir == mount dir
- [x] Detect if bisync dir is currently mounted
- [x] Show clear error messages with solutions
**6.2 State Management** ✅
- [x] Use `--workdir` for bisync state
- [x] Store state in `~/.basic-memory/bisync-state/{tenant-id}/`
- [x] Ensure state directory created before bisync
**Implementation:** `validate_bisync_directory()` prevents conflicts by checking directory equality and mount status. State managed in isolated `~/.basic-memory/bisync-state/{tenant-id}/` directory using `--workdir` flag.
### Phase 7: Cloud-Side Implementation (Deferred to Cloud Repo)
**7.1 Project Discovery Service (Cloud)** - Deferred
- [ ] Create `ProjectDiscoveryService` background job
- [ ] Scan `/app/data/` every 2 minutes
- [ ] Auto-register new directories as projects
- [ ] Log discovery events
- [ ] Handle errors gracefully
**7.2 Project API Updates (Cloud)** - Deferred
- [ ] Ensure `POST /proxy/projects/projects` creates directory synchronously
- [ ] Return 201 with project details
- [ ] Ensure directory ready immediately after creation
**Note:** Phase 7 is cloud-side work that belongs in the basic-memory-cloud repository. The CLI-side implementation (Phase 2.3 auto-registration) is complete and working - it calls the existing cloud API endpoints.
### Phase 8: Testing & Documentation
**8.1 Test Scenarios**
- [x] Test: Cloud mode toggle (login/logout)
- [x] Test: Local-first project creation (bisync)
- [x] Test: Cloud-first project creation (API)
- [x] Test: Multi-project bidirectional sync
- [x] Test: MCP tools in cloud mode
- [x] Test: Watch mode continuous sync
- [x] Test: Safety profile protection (max_delete implemented)
- [x] Test: No RCLONE_TEST files (check_access=False in all profiles)
- [x] Test: Mount/bisync directory isolation (validate_bisync_directory)
- [x] Test: Integrity check command (bm cloud check)
**8.2 Documentation**
- [x] Update cloud-cli.md with cloud mode instructions
- [x] Document Dropbox model paradigm
- [x] Update command reference with new commands
- [x] Document `bm sync` dual mode behavior
- [x] Document `bm cloud check` command
- [x] Document directory structure and fixed paths
- [ ] Update README with quick start
- [ ] Create migration guide for existing users
- [ ] Create video/GIF demos
### Success Criteria Checklist
- [x] `bm cloud login` enables cloud mode for all commands
- [x] `bm cloud logout` reverts to local mode
- [x] `bm project`, `bm tool`, `bm sync` work transparently in both modes
- [x] `bm sync` runs bisync in cloud mode, local sync in local mode
- [x] Single sync operation handles all projects bidirectionally
- [x] Local directories auto-create cloud projects via API
- [x] Cloud projects auto-sync to local directories
- [x] No RCLONE_TEST files in user directories
- [x] Bisync profiles provide safety via `max_delete` limits
- [x] `bm sync --watch` enables continuous sync
- [x] No duplicate `bm cloud project` commands (removed)
- [x] `bm cloud check` command for integrity verification
- [ ] Documentation covers cloud mode toggle and workflows
- [ ] Edge cases handled gracefully with clear errors
## How (High Level)
### Architecture Overview
**Cloud Mode Toggle:**
```
┌─────────────────────────────────────┐
│ bm cloud login │
│ ├─ Authenticate via OAuth │
│ ├─ Set cloud_mode: true in config │
│ └─ Set BASIC_MEMORY_PROXY_URL │
└─────────────────────────────────────┘
↓
┌─────────────────────────────────────┐
│ All CLI commands use async_client │
│ ├─ async_client checks proxy URL │
│ ├─ If set: HTTP to cloud │
│ └─ If not: Local ASGI │
└─────────────────────────────────────┘
↓
┌─────────────────────────────────────┐
│ bm project add "work" │
│ bm tool write-note ... │
│ bm sync (triggers bisync) │
│ → All work against cloud │
└─────────────────────────────────────┘
```
**Storage Hierarchy:**
```
Cloud Container: Bucket: Local Sync Dir:
/app/data/ (mounted) ←→ production-tenant-{id}/ ←→ ~/basic-memory-cloud-sync/
├── basic-memory/ ├── basic-memory/ ├── basic-memory/
│ ├── notes/ │ ├── notes/ │ ├── notes/
│ └── concepts/ │ └── concepts/ │ └── concepts/
├── work-project/ ├── work-project/ ├── work-project/
│ └── tasks/ │ └── tasks/ │ └── tasks/
└── personal/ └── personal/ └── personal/
└── journal/ └── journal/ └── journal/
Bidirectional sync via rclone bisync
```
### Sync Flow
**`bm sync` execution (in cloud mode):**
1. **Check cloud mode**
```python
if not config.cloud_mode:
# Run normal local file sync
run_local_sync()
return
# Cloud mode: Run bisync
```
2. **Fetch cloud projects**
```python
# GET /proxy/projects/projects (via async_client)
cloud_projects = fetch_cloud_projects()
cloud_project_names = {p["name"] for p in cloud_projects["projects"]}
```
3. **Scan local sync directory**
```python
# The config stores the sync directory as a string, so expand "~" and wrap it
# in a Path before listing its entries.
sync_dir = Path(config.bisync_config["sync_dir"]).expanduser()  # ~/basic-memory-cloud-sync
# Every visible (non-dot) sub-directory is treated as a project
local_dirs = [d.name for d in sync_dir.iterdir()
              if d.is_dir() and not d.name.startswith('.')]
```
4. **Create missing cloud projects**
```python
for dir_name in local_dirs:
if dir_name not in cloud_project_names:
# POST /proxy/projects/projects (via async_client)
create_cloud_project(name=dir_name)
# Blocks until 201 response
```
5. **Run bisync on bucket root**
```bash
rclone bisync \
~/basic-memory-cloud-sync \
basic-memory-{tenant}:{bucket} \
--filters-file ~/.basic-memory/.bmignore.rclone \
--conflict-resolve=newer \
--max-delete=25
# Syncs ALL project subdirectories bidirectionally
```
6. **Notify cloud to refresh** (commit d48b1dc)
```python
# After rclone bisync completes, sync each project's database
for project in cloud_projects:
# POST /{project}/project/sync (via async_client)
# Triggers background sync for this project
await run_sync(project=project_name)
```
### Key Changes
**1. Cloud Mode via Config**
**Config changes:**
```python
class Config:
cloud_mode: bool = False
cloud_host: str = "https://cloud.basicmemory.com"
bisync_config: dict = {
"profile": "balanced",
"sync_dir": "~/basic-memory-cloud-sync"
}
```
**async_client.py behavior:**
```python
def create_client() -> AsyncClient:
    """Return a client that talks to the cloud over HTTP or to the local app via ASGI."""
    # The BASIC_MEMORY_PROXY_URL environment variable takes precedence; otherwise
    # fall back to config.cloud_host, but only when config.cloud_mode is enabled.
    config = ConfigManager().config
    proxy_url = os.getenv("BASIC_MEMORY_PROXY_URL") or \
        (config.cloud_host if config.cloud_mode else None)
    if proxy_url:
        return AsyncClient(base_url=proxy_url)  # HTTP to cloud
    else:
        return AsyncClient(transport=ASGITransport(...))  # Local ASGI
```
**2. Login/Logout Sets Cloud Mode**
```python
# bm cloud login
async def login():
# Existing OAuth flow...
success = await auth.login()
if success:
config.cloud_mode = True
config.save()
os.environ["BASIC_MEMORY_PROXY_URL"] = config.cloud_host
```
```python
# bm cloud logout
def logout():
config.cloud_mode = False
config.save()
os.environ.pop("BASIC_MEMORY_PROXY_URL", None)
```
**3. Remove Duplicate Commands**
**Delete:**
- `bm cloud project list` → use `bm project list`
- `bm cloud project add` → use `bm project add`
**Keep:**
- `bm cloud login` - Enable cloud mode
- `bm cloud logout` - Disable cloud mode
- `bm cloud status` - Show current mode & connection
- `bm cloud setup` - Initial bisync setup
- `bm cloud bisync` - Power-user command with full options
- `bm cloud check` - Verify file integrity between local and cloud
**4. Sync Command Dual Mode**
```python
# bm sync
def sync_command(watch: bool = False, profile: str = "balanced"):
config = ConfigManager().config
if config.cloud_mode:
# Run bisync for cloud sync
run_bisync(profile=profile, watch=watch)
else:
# Run local file sync
run_local_sync()
```
**5. Remove RCLONE_TEST Files**
```python
# All profiles: check_access=False
BISYNC_PROFILES = {
"safe": RcloneBisyncProfile(check_access=False, max_delete=10),
"balanced": RcloneBisyncProfile(check_access=False, max_delete=25),
"fast": RcloneBisyncProfile(check_access=False, max_delete=50),
}
```
**6. Sync Bucket Root (All Projects)**
```python
# Sync entire bucket, not subdirectory
rclone_remote = f"basic-memory-{tenant_id}:{bucket_name}"
```
## How to Evaluate
### Test Scenarios
**1. Cloud Mode Toggle**
```bash
# Start in local mode
bm project list
# → Shows local projects
# Enable cloud mode
bm cloud login
# → Authenticates, sets cloud_mode=true
bm project list
# → Now shows cloud projects (same command!)
# Disable cloud mode
bm cloud logout
bm project list
# → Back to local projects
```
**Expected:** ✅ Single command works in both modes
**2. Local-First Project Creation (Cloud Mode)**
```bash
# Enable cloud mode
bm cloud login
# Create new project locally in sync dir
mkdir ~/basic-memory-cloud-sync/my-research
echo "# Research Notes" > ~/basic-memory-cloud-sync/my-research/index.md
# Run sync (triggers bisync in cloud mode)
bm sync
# Verify:
# - Cloud project created automatically via API
# - Files synced to bucket:/my-research/
# - Cloud database updated
# - `bm project list` shows new project
```
**Expected:** ✅ Project visible in cloud project list
**3. Cloud-First Project Creation**
```bash
# In cloud mode
bm project add "work-notes"
# → Creates project on cloud (via async_client HTTP)
# Run sync
bm sync
# Verify:
# - Local directory ~/basic-memory-cloud-sync/work-notes/ created
# - Files sync bidirectionally
# - Can use `bm tool write-note` to add content remotely
```
**Expected:** ✅ Project accessible via all CLI commands
**4. Multi-Project Bidirectional Sync**
```bash
# Setup: 3 projects in cloud mode
# Modify files in all 3 locally and remotely
bm sync
# Verify:
# - All 3 projects sync simultaneously
# - Changes propagate correctly
# - No cross-project interference
```
**Expected:** ✅ All projects in sync state
**5. MCP Tools Work in Cloud Mode**
```bash
# In cloud mode
bm tool write-note \
--title "Meeting Notes" \
--folder "work-notes" \
--content "Discussion points..."
# Verify:
# - Note created on cloud (via async_client HTTP)
# - Next `bm sync` pulls note to local
# - Note appears in ~/basic-memory-cloud-sync/work-notes/
```
**Expected:** ✅ Tools work transparently in cloud mode
**6. Watch Mode Continuous Sync**
```bash
# In cloud mode
bm sync --watch
# While running:
# - Create local folder → auto-creates cloud project
# - Edit files locally → syncs to cloud
# - Edit files remotely → syncs to local
# - Create project via API → appears locally
# Verify:
# - Continuous bidirectional sync
# - New projects handled automatically
# - No manual intervention needed
```
**Expected:** ✅ Seamless continuous sync
**7. Safety Profile Protection**
```bash
# Create project with 15 files locally
# Delete project from cloud (simulate error)
bm sync --profile safe
# Verify:
# - Bisync detects 15 pending deletions
# - Exceeds max_delete=10 limit
# - Aborts with clear error
# - No files deleted locally
```
**Expected:** ✅ Safety limit prevents data loss
**8. No RCLONE_TEST Files**
```bash
# After setup and multiple syncs
ls -la ~/basic-memory-cloud-sync/
# Verify:
# - No RCLONE_TEST files
# - No .rclone state files (in ~/.basic-memory/bisync-state/)
# - Clean directory structure
```
**Expected:** ✅ User directory stays clean
### Success Criteria
- [x] `bm cloud login` enables cloud mode for all commands
- [x] `bm cloud logout` reverts to local mode
- [x] `bm project`, `bm tool`, `bm sync` work in both modes transparently
- [x] `bm sync` runs bisync in cloud mode, local sync in local mode
- [x] Single sync operation handles all projects bidirectionally
- [x] Local directories auto-create cloud projects via API
- [x] Cloud projects auto-sync to local directories
- [x] No RCLONE_TEST files in user directories
- [x] Bisync profiles provide safety via `max_delete` limits
- [x] `bm sync --watch` enables continuous sync
- [x] No duplicate `bm cloud project` commands (removed)
- [x] `bm cloud check` command for integrity verification
- [ ] Documentation covers cloud mode toggle and workflows
- [ ] Edge cases handled gracefully with clear errors
## Notes
### API Contract
**Cloud must provide:**
1. **Project Management APIs:**
- `GET /proxy/projects/projects` - List all projects
- `POST /proxy/projects/projects` - Create project synchronously
- `POST /proxy/sync` - Trigger cache refresh
2. **Project Discovery Service (Background):**
- **Purpose**: Auto-register projects created via mount, direct bucket uploads, or any non-API method
- **Interval**: Every 2 minutes
- **Behavior**:
- Scan `/app/data/` for directories
- Register any directory not already in project database
- Log discovery events
- **Implementation**:
```python
class ProjectDiscoveryService:
"""Background service to auto-discover projects from filesystem."""
async def run(self):
"""Scan /app/data/ and register new project directories."""
data_path = Path("/app/data")
for dir_path in data_path.iterdir():
# Skip hidden and special directories
if not dir_path.is_dir() or dir_path.name.startswith('.'):
continue
project_name = dir_path.name
# Check if project already registered
project = await self.project_repo.get_by_name(project_name)
if not project:
# Auto-register new project
await self.project_repo.create(
name=project_name,
path=str(dir_path)
)
logger.info(f"Auto-discovered project: {project_name}")
```
**Project Creation (API-based):**
- API creates `/app/data/{project-name}/` directory
- Registers project in database
- Returns 201 with project details
- Directory ready for bisync immediately
**Project Creation (Discovery-based):**
- User creates folder via mount: `~/basic-memory-cloud/new-project/`
- Files appear in `/app/data/new-project/` (mounted bucket)
- Discovery service finds directory on next scan (within 2 minutes)
- Auto-registers as project
- User sees project in `bm project list` after discovery
**Why Both Methods:**
- **API**: Immediate registration when using bisync (client-side scan + API call)
- **Discovery**: Delayed registration when using mount (no API call hook)
- **Result**: Projects created ANY way (API, mount, bisync, WebDAV) eventually registered
- **Trade-off**: 2-minute delay for mount-created projects is acceptable
### Mount vs Bisync Directory Isolation
**Critical Safety Requirement**: Mount and bisync MUST use different directories to prevent conflicts.
**The Dropbox Model Applied:**
Both mount and bisync operate at **bucket level** (all projects), following the Dropbox/iCloud paradigm:
```
~/basic-memory-cloud/ # Mount: Read-through cache (like Dropbox folder)
├── work-notes/
├── personal/
└── research/
~/basic-memory-cloud-sync/ # Bisync: Bidirectional sync (like Dropbox sync folder)
├── work-notes/
├── personal/
└── research/
```
**Mount Directory (Fixed):**
```bash
# Fixed location, not configurable
~/basic-memory-cloud/
```
- **Scope**: Entire bucket (all projects)
- **Method**: NFS mount via `rclone nfsmount`
- **Behavior**: Read-through cache to cloud bucket
- **Credentials**: One IAM credential set per tenant
- **Process**: One rclone mount process
- **Use Case**: Quick access, browsing, light editing
- **Known Issue**: Obsidian compatibility problems with NFS
- **Not Configurable**: Fixed location prevents user error
**Why Fixed Location:**
- One mount point per machine (like `/Users/you/Dropbox`)
- Prevents credential proliferation (one credential set, not N)
- Prevents multiple mount processes (resource efficiency)
- Familiar pattern users already understand
- Simple operations: `mount` once, `unmount` once
**Bisync Directory (User Configurable):**
```bash
# Default location
~/basic-memory-cloud-sync/
# User can override
bm cloud setup --dir ~/my-knowledge-base
```
- **Scope**: Entire bucket (all projects)
- **Method**: Bidirectional sync via `rclone bisync`
- **Behavior**: Full local copy with periodic sync
- **Credentials**: Same IAM credential set as mount
- **Use Case**: Full offline access, reliable editing, Obsidian support
- **Configurable**: Users may want specific locations (external drive, existing folder structure)
**Why User Configurable:**
- Users have preferences for where local copies live
- May want sync folder on external drive
- May want to integrate with existing folder structure
- Default works for most, option available for power users
**Conflict Prevention:**
```python
def validate_bisync_directory(bisync_dir: Path):
    """Ensure bisync directory doesn't conflict with mount.

    Args:
        bisync_dir: Directory the user wants to use for bisync.

    Raises:
        BisyncError: If ``bisync_dir`` resolves to the fixed mount directory,
            or if the ``mount`` output lists it on an rclone entry.
    """
    mount_dir = Path.home() / "basic-memory-cloud"
    if bisync_dir.resolve() == mount_dir.resolve():
        raise BisyncError(
            f"Cannot use {bisync_dir} for bisync - it's the mount directory!\n"
            f"Mount and bisync must use different directories.\n\n"
            f"Options:\n"
            f"  1. Use default: ~/basic-memory-cloud-sync/\n"
            f"  2. Specify different directory: --dir ~/my-sync-folder"
        )

    # Check if mount is active at this location.
    # Match per line of the mount table: the path and "rclone" must appear on
    # the SAME entry. Checking the whole output at once would flag a directory
    # that sits on some other mount (e.g. an external drive) whenever any
    # unrelated rclone mount happens to be active.
    result = subprocess.run(["mount"], capture_output=True, text=True)
    bisync_path = str(bisync_dir)
    mounted_via_rclone = any(
        bisync_path in entry and "rclone" in entry
        for entry in result.stdout.splitlines()
    )
    if mounted_via_rclone:
        raise BisyncError(
            f"{bisync_dir} is currently mounted via 'bm cloud mount'\n"
            f"Cannot use mounted directory for bisync.\n\n"
            f"Either:\n"
            f"  1. Unmount first: bm cloud unmount\n"
            f"  2. Use different directory for bisync"
        )
```
**Why This Matters:**
- Mounting and syncing the SAME directory would create infinite loops
- rclone mount → bisync detects changes → syncs to bucket → mount sees changes → triggers bisync → ∞
- Separate directories = clean separation of concerns
- Mount is read-heavy caching layer, bisync is write-heavy bidirectional sync
### Future Enhancements
**Phase 2 (Not in this spec):**
- **Near Real-Time Sync**: Integrate `watch_service.py` with cloud mode
- Watch service detects local changes (already battle-tested)
- Queue changes in memory
- Use `rclone copyto` for individual file sync (near instant)
- Example: `rclone copyto ~/sync/project/file.md tenant:{bucket}/project/file.md`
- Fallback to full `rclone bisync` every N seconds for bidirectional changes
- Provides near real-time sync without polling overhead
- Per-project bisync profiles (different safety levels per project)
- Selective project sync (exclude specific projects from sync)
- Project deletion workflow (cascade to cloud/local)
- Conflict resolution UI/CLI
**Phase 3:**
- Project sharing between tenants
- Incremental backup/restore
- Sync statistics and bandwidth monitoring
- Mobile app integration with cloud mode
### Related Specs
- **SPEC-8**: TigrisFS Integration - Original bisync implementation
- **SPEC-6**: Explicit Project Parameter Architecture - Multi-project foundations
- **SPEC-5**: CLI Cloud Upload via WebDAV - Cloud file operations
### Implementation Notes
**Architectural Simplifications:**
- **Unified CLI**: Eliminated duplicate commands by using mode toggle
- **Single Entry Point**: All commands route through `async_client` which handles mode
- **Config-Driven**: Cloud mode stored in persistent config, not just environment
- **Transparent Routing**: Existing commands work without modification in cloud mode
**Complexity Trade-offs:**
- Removed: Separate `bm cloud project` command namespace
- Removed: Complex state detection for new projects
- Removed: RCLONE_TEST marker file management
- Added: Simple cloud_mode flag and config integration
- Added: Simple project list comparison before sync
- Relied on: Existing bisync profile safety mechanisms
- Result: Significantly simpler, more maintainable code
**User Experience:**
- **Mental Model**: "Toggle cloud mode, use normal commands"
- **No Learning Curve**: Same commands work locally and in cloud
- **Minimal Config**: Just login/logout to switch modes
- **Safety**: Profile system gives users control over safety/speed trade-offs
- **"Just Works"**: Create folders anywhere, they sync automatically
**Migration Path:**
- Existing `bm cloud project` users: Use `bm project` instead
- Existing `bm cloud bisync` users: `bm sync` now runs bisync in cloud mode (`bm cloud bisync` remains available for advanced options)
- Config automatically migrates on first `bm cloud login`
## Testing
Initial Setup (One Time)
1. Login to cloud and enable cloud mode:
bm cloud login
# → Authenticates via OAuth
# → Sets cloud_mode=true in config
# → Sets BASIC_MEMORY_PROXY_URL environment variable
# → All CLI commands now route to cloud
2. Check cloud mode status:
bm cloud status
# → Shows: Mode: Cloud (enabled)
# → Shows: Host: https://cloud.basicmemory.com
# → Checks cloud health
3. Set up bidirectional sync:
bm cloud bisync-setup
# Or with custom directory:
bm cloud bisync-setup --dir ~/my-sync-folder
# This will:
# → Install rclone (if not already installed)
# → Get tenant info (tenant_id, bucket_name)
# → Generate scoped IAM credentials
# → Configure rclone with credentials
# → Create sync directory (default: ~/basic-memory-cloud-sync/)
# → Validate no conflict with mount directory
# → Run initial --resync to establish baseline
Normal Usage
4. Create local project and sync:
# Create a local project directory
mkdir ~/basic-memory-cloud-sync/my-research
echo "# Research Notes" > ~/basic-memory-cloud-sync/my-research/readme.md
# Run sync
bm cloud bisync
# Auto-magic happens:
# → Checks for new local directories
# → Finds "my-research" not in cloud
# → Creates project on cloud via POST /proxy/projects/projects
# → Runs bidirectional sync (all projects)
# → Syncs to bucket root (all projects synced together)
5. Watch mode for continuous sync:
bm cloud bisync --watch
# Or with custom interval:
bm cloud bisync --watch --interval 30
# → Syncs every 60 seconds (or custom interval)
# → Auto-registers new projects on each run
# → Press Ctrl+C to stop
6. Check bisync status:
bm cloud bisync-status
# → Shows tenant ID
# → Shows sync directory path
# → Shows initialization status
# → Shows last sync time
# → Lists available profiles (safe/balanced/fast)
7. Manual sync with different profiles:
# Safe mode (max 10 deletes, preserves conflicts)
bm cloud bisync --profile safe
# Balanced mode (max 25 deletes, auto-resolve to newer) - default
bm cloud bisync --profile balanced
# Fast mode (max 50 deletes, skip verification)
bm cloud bisync --profile fast
8. Dry run to preview changes:
bm cloud bisync --dry-run
# → Shows what would be synced without making changes
9. Force resync (if needed):
bm cloud bisync --resync
# → Establishes new baseline
# → Use if sync state is corrupted
10. Check file integrity:
bm cloud check
# → Verifies all files match between local and cloud
# → Read-only operation (no data transfer)
# → Shows differences if any found
# Faster one-way check
bm cloud check --one-way
# → Only checks for missing files on destination
Verify Cloud Mode Integration
11. Test that all commands work in cloud mode:
# List cloud projects (not local)
bm project list
# Create project on cloud
bm project add "work-notes"
# Use MCP tools against cloud
bm tool write-note --title "Test" --folder "my-research" --content "Hello"
# All of these work against cloud because cloud_mode=true
12. Switch back to local mode:
bm cloud logout
# → Sets cloud_mode=false
# → Clears BASIC_MEMORY_PROXY_URL
# → All commands now work locally again
Expected Directory Structure
~/basic-memory-cloud-sync/ # Your local sync directory
├── my-research/ # Auto-created cloud project
│ ├── readme.md
│ └── notes.md
├── work-notes/ # Another project
│ └── tasks.md
└── personal/ # Another project
└── journal.md
# All sync bidirectionally with:
bucket:/ # Cloud bucket root
├── my-research/
├── work-notes/
└── personal/
Key Points to Test
1. ✅ Cloud mode toggle works (login/logout)
2. ✅ Bisync setup validates directory (no conflict with mount)
3. ✅ Local directories auto-create cloud projects
4. ✅ All projects sync together (bucket root)
5. ✅ No RCLONE_TEST files created
6. ✅ Changes sync bidirectionally
7. ✅ Watch mode continuous sync works
8. ✅ Profile safety limits work (max_delete)
9. ✅ `bm sync` adapts to cloud mode automatically
10. ✅ `bm cloud check` verifies file integrity without side effects
```
--------------------------------------------------------------------------------
/tests/mcp/test_tool_write_note.py:
--------------------------------------------------------------------------------
```python
"""Tests for note tools that exercise the full stack with SQLite."""
from textwrap import dedent
import pytest
from basic_memory.mcp.tools import write_note, read_note, delete_note
from basic_memory.utils import normalize_newlines
@pytest.mark.asyncio
async def test_write_note(app, test_project):
    """Write a tagged note and confirm it can be read back by permalink.

    Checks that:
    - the tool summary reports a creation with project, file path and permalink
    - the tags are listed in the summary
    - reading the permalink back yields the expected frontmatter and body
    """
    project_name = test_project.name
    summary = await write_note.fn(
        project=project_name,
        title="Test Note",
        folder="test",
        content="# Test\nThis is a test note",
        tags=["test", "documentation"],
    )

    assert summary
    for fragment in (
        "# Created note",
        f"project: {project_name}",
        "file_path: test/Test Note.md",
        "permalink: test/test-note",
        "## Tags",
        "- test, documentation",
        f"[Session: Using project '{project_name}']",
    ):
        assert fragment in summary

    # Round-trip: fetch the note through its permalink and look for the markdown
    stored = await read_note.fn("test/test-note", project=project_name)
    expected_markdown = normalize_newlines(
        dedent("""
---
title: Test Note
type: note
permalink: test/test-note
tags:
- test
- documentation
---
# Test
This is a test note
""").strip()
    )
    assert expected_markdown in stored
@pytest.mark.asyncio
async def test_write_note_no_tags(app, test_project):
    """Write a note with no tags and confirm the stored markdown has no tags section."""
    project_name = test_project.name
    summary = await write_note.fn(
        project=project_name,
        title="Simple Note",
        folder="test",
        content="Just some text",
    )

    assert summary
    for fragment in (
        "# Created note",
        f"project: {project_name}",
        "file_path: test/Simple Note.md",
        "permalink: test/simple-note",
        f"[Session: Using project '{project_name}']",
    ):
        assert fragment in summary

    # The note must be retrievable through its generated permalink
    stored = await read_note.fn("test/simple-note", project=project_name)
    expected_markdown = normalize_newlines(
        dedent("""
---
title: Simple Note
type: note
permalink: test/simple-note
---
Just some text
""").strip()
    )
    assert expected_markdown in stored
@pytest.mark.asyncio
async def test_write_note_update_existing(app, test_project):
    """Test updating an existing note by writing it a second time.

    Should:
    - Report "Created note" on the first write
    - Report "Updated note" on the second write with the same title and folder
    - Keep the same file path, permalink and tags in both summaries
    - Return exactly the updated markdown when the note is read back
    """
    # First write: creates the note
    result = await write_note.fn(
        project=test_project.name,
        title="Test Note",
        folder="test",
        content="# Test\nThis is a test note",
        tags=["test", "documentation"],
    )
    assert result  # Non-empty summary returned
    assert "# Created note" in result
    assert f"project: {test_project.name}" in result
    assert "file_path: test/Test Note.md" in result
    assert "permalink: test/test-note" in result
    assert "## Tags" in result
    assert "- test, documentation" in result
    assert f"[Session: Using project '{test_project.name}']" in result
    # Second write: same title/folder, different body -> reported as an update
    result = await write_note.fn(
        project=test_project.name,
        title="Test Note",
        folder="test",
        content="# Test\nThis is an updated note",
        tags=["test", "documentation"],
    )
    assert "# Updated note" in result
    assert f"project: {test_project.name}" in result
    assert "file_path: test/Test Note.md" in result
    assert "permalink: test/test-note" in result
    assert "## Tags" in result
    assert "- test, documentation" in result
    assert f"[Session: Using project '{test_project.name}']" in result
    # Try reading it back; unlike the create tests this compares for exact
    # equality, so the whole returned document must match the expected markdown
    content = await read_note.fn("test/test-note", project=test_project.name)
    assert (
        normalize_newlines(
            dedent(
"""
---
title: Test Note
type: note
permalink: test/test-note
tags:
- test
- documentation
---
# Test
This is an updated note
"""
            ).strip()
        )
        == content
    )
@pytest.mark.asyncio
async def test_issue_93_write_note_respects_custom_permalink_new_note(app, test_project):
    """Issue #93: a permalink given in frontmatter is kept when a new note is created."""
    project_name = test_project.name

    # Note body whose frontmatter carries an explicit permalink
    note_body = dedent("""
---
permalink: custom/my-desired-permalink
---
# My New Note
This note has a custom permalink specified in frontmatter.
- [note] Testing if custom permalink is respected
""").strip()

    summary = await write_note.fn(
        project=project_name,
        title="My New Note",
        folder="notes",
        content=note_body,
    )

    # The summary must report the frontmatter permalink
    for fragment in (
        "# Created note",
        f"project: {project_name}",
        "file_path: notes/My New Note.md",
        "permalink: custom/my-desired-permalink",
        f"[Session: Using project '{project_name}']",
    ):
        assert fragment in summary
@pytest.mark.asyncio
async def test_issue_93_write_note_respects_custom_permalink_existing_note(app, test_project):
    """Issue #93: a frontmatter permalink replaces the generated one when a note is updated."""
    project_name = test_project.name

    # Phase 1: create the note and let the tool generate its permalink
    created = await write_note.fn(
        project=project_name,
        title="Existing Note",
        folder="test",
        content="Initial content without custom permalink",
    )
    assert "# Created note" in created
    assert f"project: {project_name}" in created

    # Pull the generated permalink out of the first "permalink:" line
    auto_permalink = next(
        (
            line.split(":", 1)[1].strip()
            for line in created.split("\n")
            if line.startswith("permalink:")
        ),
        None,
    )
    assert auto_permalink is not None

    # Phase 2: rewrite the same note with an explicit permalink in frontmatter
    replacement_body = dedent("""
---
permalink: custom/new-permalink
---
# Existing Note
Updated content with custom permalink in frontmatter.
- [note] Custom permalink should be respected on update
""").strip()
    updated = await write_note.fn(
        project=project_name,
        title="Existing Note",
        folder="test",
        content=replacement_body,
    )

    # The update summary must show the custom permalink, not the generated one
    assert "# Updated note" in updated
    assert f"project: {project_name}" in updated
    assert "permalink: custom/new-permalink" in updated
    assert f"permalink: {auto_permalink}" not in updated
    assert f"[Session: Using project '{project_name}']" in updated
@pytest.mark.asyncio
async def test_delete_note_existing(app, test_project):
    """Deleting a note that was just written reports success.

    Steps:
    - Write a note with write_note
    - Delete it by permalink with delete_note
    - Expect delete_note to return True
    """
    project_name = test_project.name

    created = await write_note.fn(
        project=project_name,
        title="Test Note",
        folder="test",
        content="# Test\nThis is a test note",
        tags=["test", "documentation"],
    )
    assert created
    assert f"project: {project_name}" in created

    was_deleted = await delete_note.fn("test/test-note", project=project_name)
    assert was_deleted is True
@pytest.mark.asyncio
async def test_delete_note_doesnt_exist(app, test_project):
    """Deleting an identifier that was never written reports failure.

    Steps:
    - Call delete_note with an identifier no test has created
    - Expect delete_note to return False
    """
    outcome = await delete_note.fn("doesnt-exist", project=test_project.name)
    assert outcome is False
@pytest.mark.asyncio
async def test_write_note_with_tag_array_from_bug_report(app, test_project):
    """write_note accepts tags passed as a list, as reported in issue #38.

    The keyword arguments mirror the payload from the bug report, where
    Cursor sent an array of tags and received a type mismatch error.
    """
    project_name = test_project.name

    # Payload copied from the bug report (project name supplied by the fixture).
    bug_payload = dict(
        project=project_name,
        title="Title",
        folder="folder",
        content="CONTENT",
        tags=["hipporag", "search", "fallback", "symfony", "error-handling"],
    )

    result = await write_note.fn(**bug_payload)

    assert result
    for fragment in (
        f"project: {project_name}",
        "permalink: folder/title",
        "Tags",
        "hipporag",
        f"[Session: Using project '{project_name}']",
    ):
        assert fragment in result
@pytest.mark.asyncio
async def test_write_note_verbose(app, test_project):
    """Creating a note with an observation and a relation yields a detailed summary.

    The summary is expected to contain:
    - the creation header, project, file path and permalink
    - an Observations section with a per-category count
    - a Relations section
    - a Tags section listing the supplied tags
    """
    project_name = test_project.name

    # Same text as a triple-quoted block: leading newline, heading, one
    # observation line and one relation line, each newline-terminated.
    note_body = (
        "\n"
        "# Test\nThis is a test note\n"
        "- [note] First observation\n"
        "- relates to [[Knowledge]]\n"
    )

    result = await write_note.fn(
        project=project_name,
        title="Test Note",
        folder="test",
        content=note_body,
        tags=["test", "documentation"],
    )

    expected_fragments = (
        "# Created note",
        f"project: {project_name}",
        "file_path: test/Test Note.md",
        "permalink: test/test-note",
        "## Observations",
        "- note: 1",
        "## Relations",
        "## Tags",
        "- test, documentation",
        f"[Session: Using project '{project_name}']",
    )
    for fragment in expected_fragments:
        assert fragment in result
@pytest.mark.asyncio
async def test_write_note_preserves_custom_metadata(app, project_config, test_project):
    """Updating a note through write_note keeps custom frontmatter fields.

    Reproduces issue #36, where custom frontmatter fields such as Status
    were lost when a note was updated with the write_note tool.

    Steps:
    - Create a note with write_note
    - Add custom frontmatter fields directly to the file on disk
    - Update the note with write_note
    - Verify the custom fields, the new content and the new tags are all present
    """
    # First, create a note using write_note
    await write_note.fn(
        project=test_project.name,
        title="Custom Metadata Note",
        folder="test",
        content="# Initial content",
        tags=["test"],
    )

    # Read the note once by permalink before editing the file on disk. The
    # returned text is not needed, so it is intentionally not bound to a name.
    await read_note.fn("test/custom-metadata-note", project=test_project.name)

    # Custom fields are added by editing the file directly.
    import frontmatter

    file_path = project_config.home / "test" / "Custom Metadata Note.md"
    post = frontmatter.load(file_path)

    # Add custom frontmatter
    post["Status"] = "In Progress"
    post["Priority"] = "High"
    post["Version"] = "1.0"

    # Write the file back with an explicit encoding so the bytes on disk do
    # not depend on the platform's default locale.
    with open(file_path, "w", encoding="utf-8") as f:
        f.write(frontmatter.dumps(post))

    # Now update the note using write_note
    result = await write_note.fn(
        project=test_project.name,
        title="Custom Metadata Note",
        folder="test",
        content="# Updated content",
        tags=["test", "updated"],
    )

    # Verify the update was successful. The project name comes from the
    # fixture rather than a hard-coded literal, matching the other assertions.
    assert (
        f"Updated note\nproject: {test_project.name}\nfile_path: test/Custom Metadata Note.md"
    ) in result
    assert f"project: {test_project.name}" in result

    # Read the note back and check that custom frontmatter is preserved
    content = await read_note.fn("test/custom-metadata-note", project=test_project.name)

    assert "Status: In Progress" in content
    assert "Priority: High" in content
    # Version might be quoted as '1.0' due to YAML serialization
    assert "Version:" in content  # Just check that the field exists
    assert "1.0" in content  # And that the value exists somewhere

    # And new content should be there
    assert "# Updated content" in content

    # And tags should be updated (without # prefix)
    assert "- test" in content
    assert "- updated" in content
@pytest.mark.asyncio
async def test_write_note_preserves_content_frontmatter(app, test_project):
    """Frontmatter fields embedded in the content survive a write/read round trip.

    The note is written with extra ``version`` and ``author`` fields in its
    own frontmatter; reading it back must show those fields alongside the
    permalink and tags.
    """
    project_name = test_project.name

    source_text = dedent(
        """
        ---
        title: Test Note
        type: note
        version: 1.0
        author: name
        ---
        # Test
        This is a test note
        """
    )
    await write_note.fn(
        project=project_name,
        title="Test Note",
        folder="test",
        content=source_text,
        tags=["test", "documentation"],
    )

    # Read the note back via its permalink
    content = await read_note.fn("test/test-note", project=project_name)

    expected_text = normalize_newlines(
        dedent(
            """
            ---
            title: Test Note
            type: note
            permalink: test/test-note
            version: 1.0
            author: name
            tags:
            - test
            - documentation
            ---
            # Test
            This is a test note
            """
        ).strip()
    )
    assert expected_text in content
@pytest.mark.asyncio
async def test_write_note_permalink_collision_fix_issue_139(app, test_project):
    """Regression test for GitHub Issue #139: UNIQUE constraint failed: entity.permalink.

    Scenario from the issue:
    1. Create a note with title "Note 1"
    2. Create another note with title "Note 2"
    3. Create/replace the first note again with the same title "Note 1"

    Before the fix, step 3 failed with a UNIQUE constraint error. After the
    fix it must succeed, either by updating the existing note or by creating
    a new note under a unique permalink.
    """
    # Step 1: Create first note
    result1 = await write_note.fn(
        project=test_project.name,
        title="Note 1",
        folder="test",
        content="Original content for note 1",
    )
    assert "# Created note" in result1
    assert f"project: {test_project.name}" in result1
    assert "permalink: test/note-1" in result1

    # Step 2: Create second note with different title
    result2 = await write_note.fn(
        project=test_project.name, title="Note 2", folder="test", content="Content for note 2"
    )
    assert "# Created note" in result2
    assert f"project: {test_project.name}" in result2
    assert "permalink: test/note-2" in result2

    # Step 3: Try to create/replace first note again.
    # This scenario triggered the UNIQUE constraint failure before the fix.
    result3 = await write_note.fn(
        project=test_project.name,
        title="Note 1",  # Same title as first note
        folder="test",  # Same folder as first note
        content="Replacement content for note 1",  # Different content
    )

    # This should not raise a UNIQUE constraint failure error. It should either:
    # 1. Update the existing note (preferred behavior)
    # 2. Create a new note with unique permalink (fallback behavior)
    assert result3 is not None
    assert f"project: {test_project.name}" in result3
    assert "Updated note" in result3 or "Created note" in result3

    # The result should contain either the original permalink or a unique one
    assert "permalink: test/note-1" in result3 or "permalink: test/note-1-1" in result3

    # "permalink: test/note-1" is a prefix of "permalink: test/note-1-1", so the
    # longer string has to be tested first; otherwise the fallback case can
    # never be distinguished from the update case.
    if "permalink: test/note-1-1" in result3:
        # Created new note with unique permalink case
        content = await read_note.fn("test/note-1-1", project=test_project.name)
        assert "Replacement content for note 1" in content
        # Original note should still exist
        original_content = await read_note.fn("test/note-1", project=test_project.name)
        assert "Original content for note 1" in original_content
    else:
        # Updated existing note case
        content = await read_note.fn("test/note-1", project=test_project.name)
        assert "Replacement content for note 1" in content
@pytest.mark.asyncio
async def test_write_note_with_custom_entity_type(app, test_project):
    """A note written with ``note_type="guide"`` is stored with ``type: guide``.

    Covers the fix for Issue #144, where the entity type was hardcoded to
    "note" instead of honouring a caller-supplied type.
    """
    project_name = test_project.name

    result = await write_note.fn(
        project=project_name,
        title="Test Guide",
        folder="guides",
        content="# Guide Content\nThis is a guide",
        tags=["guide", "documentation"],
        note_type="guide",
    )

    assert result
    for fragment in (
        "# Created note",
        f"project: {project_name}",
        "file_path: guides/Test Guide.md",
        "permalink: guides/test-guide",
        "## Tags",
        "- guide, documentation",
        f"[Session: Using project '{project_name}']",
    ):
        assert fragment in result

    # The stored frontmatter must carry the requested type.
    content = await read_note.fn("guides/test-guide", project=project_name)
    expected_text = normalize_newlines(
        dedent(
            """
            ---
            title: Test Guide
            type: guide
            permalink: guides/test-guide
            tags:
            - guide
            - documentation
            ---
            # Guide Content
            This is a guide
            """
        ).strip()
    )
    assert expected_text in content
@pytest.mark.asyncio
async def test_write_note_with_report_entity_type(app, test_project):
    """A note written with ``note_type="report"`` is stored with ``type: report``."""
    project_name = test_project.name

    result = await write_note.fn(
        project=project_name,
        title="Monthly Report",
        folder="reports",
        content="# Monthly Report\nThis is a monthly report",
        tags=["report", "monthly"],
        note_type="report",
    )

    assert result
    for fragment in (
        "# Created note",
        f"project: {project_name}",
        "file_path: reports/Monthly Report.md",
        "permalink: reports/monthly-report",
        f"[Session: Using project '{project_name}']",
    ):
        assert fragment in result

    # The stored note must show the requested type and the heading.
    stored = await read_note.fn("reports/monthly-report", project=project_name)
    assert "type: report" in stored
    assert "# Monthly Report" in stored
@pytest.mark.asyncio
async def test_write_note_with_config_entity_type(app, test_project):
    """A note written with ``note_type="config"`` is stored with ``type: config``."""
    project_name = test_project.name

    result = await write_note.fn(
        project=project_name,
        title="System Config",
        folder="config",
        content="# System Configuration\nThis is a config file",
        note_type="config",
    )

    assert result
    for fragment in (
        "# Created note",
        f"project: {project_name}",
        "file_path: config/System Config.md",
        "permalink: config/system-config",
        f"[Session: Using project '{project_name}']",
    ):
        assert fragment in result

    # The stored note must show the requested type and the heading.
    stored = await read_note.fn("config/system-config", project=project_name)
    assert "type: config" in stored
    assert "# System Configuration" in stored
@pytest.mark.asyncio
async def test_write_note_entity_type_default_behavior(app, test_project):
    """Omitting the type argument stores the note with ``type: note``.

    Guards backward compatibility: callers that never pass a type must keep
    getting the same result as before.
    """
    project_name = test_project.name

    result = await write_note.fn(
        project=project_name,
        title="Default Type Test",
        folder="test",
        content="# Default Type Test\nThis should be type 'note'",
        tags=["test"],
    )

    assert result
    for fragment in (
        "# Created note",
        f"project: {project_name}",
        "file_path: test/Default Type Test.md",
        "permalink: test/default-type-test",
        f"[Session: Using project '{project_name}']",
    ):
        assert fragment in result

    # The stored note must carry the default type.
    stored = await read_note.fn("test/default-type-test", project=project_name)
    assert "type: note" in stored
    assert "# Default Type Test" in stored
@pytest.mark.asyncio
async def test_write_note_update_existing_with_different_entity_type(app, test_project):
    """Rewriting an existing note with another ``note_type`` changes its stored type."""
    project_name = test_project.name
    shared_args = dict(project=project_name, title="Changeable Type", folder="test")

    # First write: the note starts out as type "note".
    first = await write_note.fn(
        content="# Initial Content\nThis starts as a note",
        tags=["test"],
        note_type="note",
        **shared_args,
    )
    assert first
    assert "# Created note" in first
    assert f"project: {project_name}" in first

    # Second write: same title and folder, now with type "guide".
    second = await write_note.fn(
        content="# Updated Content\nThis is now a guide",
        tags=["guide"],
        note_type="guide",
        **shared_args,
    )
    assert second
    assert "# Updated note" in second
    assert f"project: {project_name}" in second

    # The stored note must reflect the new type, content and tag.
    stored = await read_note.fn("test/changeable-type", project=project_name)
    for fragment in ("type: guide", "# Updated Content", "- guide"):
        assert fragment in stored
@pytest.mark.asyncio
async def test_write_note_respects_frontmatter_entity_type(app, test_project):
    """A ``type`` field in the content's frontmatter is used when no type argument is given.

    write_note is called without a type argument while the content declares
    ``type: guide``; the stored note must keep "guide" rather than fall back
    to "note".
    """
    project_name = test_project.name

    note_text = dedent(
        """
        ---
        title: Test Guide
        type: guide
        permalink: guides/test-guide
        tags:
        - guide
        - documentation
        ---
        # Guide Content
        This is a guide
        """
    ).strip()

    # No type argument here: the frontmatter is the only source of the type.
    result = await write_note.fn(
        project=project_name,
        title="Test Guide",
        folder="guides",
        content=note_text,
    )

    assert result
    for fragment in (
        "# Created note",
        f"project: {project_name}",
        "file_path: guides/Test Guide.md",
        "permalink: guides/test-guide",
        f"[Session: Using project '{project_name}']",
    ):
        assert fragment in result

    # The stored type must be "guide", and the frontmatter tags must be present.
    stored = await read_note.fn("guides/test-guide", project=project_name)
    for fragment in ("type: guide", "# Guide Content", "- guide", "- documentation"):
        assert fragment in stored
class TestWriteNoteSecurityValidation:
    """Checks that write_note validates its ``folder`` argument against path escapes."""

    @staticmethod
    def _assert_rejected(result, folder=None):
        """Assert ``result`` is the security error text; optionally that it echoes ``folder``."""
        assert isinstance(result, str)
        assert "# Error" in result
        assert "paths must stay within project boundaries" in result
        if folder is not None:
            assert folder in result

    @staticmethod
    def _assert_accepted(result):
        """Assert ``result`` is a normal create/update summary with no security error."""
        assert isinstance(result, str)
        assert "# Error" not in result
        assert "paths must stay within project boundaries" not in result
        assert ("# Created note" in result) or ("# Updated note" in result)

    @pytest.mark.asyncio
    async def test_write_note_blocks_path_traversal_unix(self, app, test_project):
        """Unix-style ``../`` traversal in the folder parameter is rejected."""
        unix_traversals = (
            "../",
            "../../",
            "../../../",
            "../secrets",
            "../../etc",
            "../../../etc/passwd_folder",
            "notes/../../../etc",
            "folder/../../outside",
            "../../../../malicious",
        )
        for folder in unix_traversals:
            outcome = await write_note.fn(
                project=test_project.name,
                title="Test Note",
                folder=folder,
                content="# Test Content\nThis should be blocked by security validation.",
            )
            self._assert_rejected(outcome, folder)

    @pytest.mark.asyncio
    async def test_write_note_blocks_path_traversal_windows(self, app, test_project):
        """Windows-style backslash traversal and UNC paths in the folder parameter are rejected."""
        windows_traversals = (
            "..\\",
            "..\\..\\",
            "..\\..\\..\\",
            "..\\secrets",
            "..\\..\\Windows",
            "..\\..\\..\\Windows\\System32",
            "notes\\..\\..\\..\\Windows",
            "\\\\server\\share",
            "\\\\..\\..\\Windows",
        )
        for folder in windows_traversals:
            outcome = await write_note.fn(
                project=test_project.name,
                title="Test Note",
                folder=folder,
                content="# Test Content\nThis should be blocked by security validation.",
            )
            self._assert_rejected(outcome, folder)

    @pytest.mark.asyncio
    async def test_write_note_blocks_absolute_paths(self, app, test_project):
        """Absolute POSIX and drive-letter paths in the folder parameter are rejected."""
        absolute_paths = (
            "/etc",
            "/home/user",
            "/var/log",
            "/root",
            "C:\\Windows",
            "C:\\Users\\user",
            "D:\\secrets",
            "/tmp/malicious",
            "/usr/local/evil",
        )
        for folder in absolute_paths:
            outcome = await write_note.fn(
                project=test_project.name,
                title="Test Note",
                folder=folder,
                content="# Test Content\nThis should be blocked by security validation.",
            )
            self._assert_rejected(outcome, folder)

    @pytest.mark.asyncio
    async def test_write_note_blocks_home_directory_access(self, app, test_project):
        """Folders beginning with ``~`` are rejected."""
        home_paths = (
            "~",
            "~/",
            "~/secrets",
            "~/.ssh",
            "~/Documents",
            "~\\AppData",
            "~\\Desktop",
            "~/.env_folder",
        )
        for folder in home_paths:
            outcome = await write_note.fn(
                project=test_project.name,
                title="Test Note",
                folder=folder,
                content="# Test Content\nThis should be blocked by security validation.",
            )
            self._assert_rejected(outcome, folder)

    @pytest.mark.asyncio
    async def test_write_note_blocks_mixed_attack_patterns(self, app, test_project):
        """Folders that start with a legitimate segment but then traverse upward are rejected."""
        mixed_paths = (
            "notes/../../../etc",
            "docs/../../.env_folder",
            "legitimate/path/../../.ssh",
            "project/folder/../../../Windows",
            "valid/folder/../../home/user",
            "assets/../../../tmp/evil",
        )
        for folder in mixed_paths:
            outcome = await write_note.fn(
                project=test_project.name,
                title="Test Note",
                folder=folder,
                content="# Test Content\nThis should be blocked by security validation.",
            )
            # This test does not check that the folder is echoed back.
            self._assert_rejected(outcome)

    @pytest.mark.asyncio
    async def test_write_note_allows_safe_folder_paths(self, app, test_project):
        """Ordinary relative folders, including nested ones, are still accepted."""
        allowed_folders = (
            "notes",
            "docs",
            "projects/2025",
            "archive/old-notes",
            "deep/nested/directory/structure",
            "folder/subfolder",
            "research/ml",
            "meeting-notes",
        )
        for safe_folder in allowed_folders:
            outcome = await write_note.fn(
                project=test_project.name,
                title=f"Test Note in {safe_folder.replace('/', '-')}",
                folder=safe_folder,
                content="# Test Content\nThis should work normally with security validation.",
                tags=["test", "security"],
            )
            # Must be a normal create/update, not a security error.
            self._assert_accepted(outcome)
            # The folder should appear in the summary (in file_path).
            assert safe_folder in outcome

    @pytest.mark.asyncio
    async def test_write_note_empty_folder_security(self, app, test_project):
        """An empty folder string is accepted and does not trigger the security error."""
        outcome = await write_note.fn(
            project=test_project.name,
            title="Root Note",
            folder="",
            content="# Root Note\nThis note should be created in the project root.",
        )
        self._assert_accepted(outcome)

    @pytest.mark.asyncio
    async def test_write_note_none_folder_security(self, app, test_project):
        """The "no folder" case, expressed as an empty string, is accepted.

        An empty string stands in for an omitted folder because the folder
        parameter is treated as required by this test.
        """
        outcome = await write_note.fn(
            project=test_project.name,
            title="Root Folder Note",
            folder="",  # Empty string instead of None since folder is required
            content="# Root Folder Note\nThis note should be created in the project root.",
        )
        self._assert_accepted(outcome)

    @pytest.mark.asyncio
    async def test_write_note_current_directory_references_security(self, app, test_project):
        """Folders containing ``./`` segments are accepted."""
        dot_folders = (
            "./notes",
            "folder/./subfolder",
            "./folder/subfolder",
        )
        for safe_folder in dot_folders:
            outcome = await write_note.fn(
                project=test_project.name,
                title=f"Current Dir Test {safe_folder.replace('/', '-').replace('.', 'dot')}",
                folder=safe_folder,
                content="# Current Directory Test\nThis should work with current directory references.",
            )
            self._assert_accepted(outcome)

    @pytest.mark.asyncio
    async def test_write_note_security_with_all_parameters(self, app, test_project):
        """The folder is still validated when every optional argument is supplied."""
        hostile_folder = "../../../etc/malicious"
        outcome = await write_note.fn(
            project=test_project.name,
            title="Security Test with All Params",
            folder=hostile_folder,
            content="# Malicious Content\nThis should be blocked by security validation.",
            tags=["malicious", "test"],
            note_type="guide",
        )
        self._assert_rejected(outcome, hostile_folder)

    @pytest.mark.asyncio
    async def test_write_note_security_logging(self, app, test_project, caplog):
        """A traversal attempt yields the security error.

        No log assertion is made here; only the returned error text is checked.
        """
        outcome = await write_note.fn(
            project=test_project.name,
            title="Security Logging Test",
            folder="../../../etc/passwd_folder",
            content="# Test Content\nThis should trigger security logging.",
        )
        assert "# Error" in outcome
        assert "paths must stay within project boundaries" in outcome
        # Check that security violation was logged
        # Note: This test may need adjustment based on the actual logging setup
        # The security validation should generate a warning log entry

    @pytest.mark.asyncio
    async def test_write_note_preserves_functionality_with_security(self, app, test_project):
        """A fully featured note in a safe folder is created with all sections reported."""
        note_text = dedent(
            """
            # Full Feature Security Test
            This note tests that security validation doesn't break normal functionality.
            ## Observations
            - [security] Path validation working correctly #security
            - [feature] All features still functional #test
            ## Relations
            - relates_to [[Security Implementation]]
            - depends_on [[Path Validation]]
            Additional content with various formatting.
            """
        ).strip()

        outcome = await write_note.fn(
            project=test_project.name,
            title="Full Feature Security Test",
            folder="security-tests",
            content=note_text,
            tags=["security", "test", "full-feature"],
            note_type="guide",
        )

        # No security error may be present.
        assert isinstance(outcome, str)
        assert "# Error" not in outcome
        assert "paths must stay within project boundaries" not in outcome

        # Creation details, section headers and per-category counts.
        for fragment in (
            "# Created note",
            "file_path: security-tests/Full Feature Security Test.md",
            "permalink: security-tests/full-feature-security-test",
            "## Observations",
            "## Relations",
            "## Tags",
            "security: 1",
            "feature: 1",
        ):
            assert fragment in outcome
class TestWriteNoteSecurityEdgeCases:
    """Edge cases for write_note folder validation."""

    @staticmethod
    def _assert_rejected(result):
        """Assert ``result`` is the security error text."""
        assert isinstance(result, str)
        assert "# Error" in result
        assert "paths must stay within project boundaries" in result

    @pytest.mark.asyncio
    async def test_write_note_unicode_folder_attacks(self, app, test_project):
        """Traversal paths that pass through non-ASCII folder names are rejected."""
        unicode_traversals = (
            "notes/文档/../../../etc",  # Chinese characters
            "docs/café/../../secrets",  # Accented characters
            "files/αβγ/../../../malicious",  # Greek characters
        )
        for folder in unicode_traversals:
            outcome = await write_note.fn(
                project=test_project.name,
                title="Unicode Attack Test",
                folder=folder,
                content="# Unicode Attack\nThis should be blocked.",
            )
            self._assert_rejected(outcome)

    @pytest.mark.asyncio
    async def test_write_note_very_long_attack_folder(self, app, test_project):
        """A traversal path made of 1000 ``../`` segments is rejected."""
        climb = "../" * 1000
        outcome = await write_note.fn(
            project=test_project.name,
            title="Long Attack Test",
            folder=climb + "etc/malicious",
            content="# Long Attack\nThis should be blocked.",
        )
        self._assert_rejected(outcome)

    @pytest.mark.asyncio
    async def test_write_note_case_variations_attacks(self, app, test_project):
        """Upper/mixed-case target names do not let a traversal through."""
        # Case sensitivity itself depends on the filesystem.
        cased_traversals = (
            "../ETC",
            "../Etc/SECRETS",
            "..\\WINDOWS",
            "~/SECRETS",
        )
        for folder in cased_traversals:
            outcome = await write_note.fn(
                project=test_project.name,
                title="Case Variation Attack Test",
                folder=folder,
                content="# Case Attack\nThis should be blocked.",
            )
            self._assert_rejected(outcome)

    @pytest.mark.asyncio
    async def test_write_note_whitespace_in_attack_folders(self, app, test_project):
        """Surrounding or embedded whitespace does not let a traversal through."""
        padded_traversals = (
            " ../../../etc ",
            "\t../../../secrets\t",
            " ..\\..\\Windows ",
            "notes/ ../../ malicious",
        )
        for folder in padded_traversals:
            outcome = await write_note.fn(
                project=test_project.name,
                title="Whitespace Attack Test",
                folder=folder,
                content="# Whitespace Attack\nThis should be blocked.",
            )
            assert isinstance(outcome, str)

            # The error is only required when the trimmed folder still
            # contains a traversal or home-directory marker.
            trimmed = folder.strip()
            if ".." in trimmed or "~" in trimmed:
                assert "# Error" in outcome
                assert "paths must stay within project boundaries" in outcome
```