This is page 17 of 19. Use http://codebase.md/basicmachines-co/basic-memory?page={x} to view the full context.
# Directory Structure
```
├── .claude
│ ├── commands
│ │ ├── release
│ │ │ ├── beta.md
│ │ │ ├── changelog.md
│ │ │ ├── release-check.md
│ │ │ └── release.md
│ │ ├── spec.md
│ │ └── test-live.md
│ └── settings.json
├── .dockerignore
├── .env.example
├── .github
│ ├── dependabot.yml
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.md
│ │ ├── config.yml
│ │ ├── documentation.md
│ │ └── feature_request.md
│ └── workflows
│ ├── claude-code-review.yml
│ ├── claude-issue-triage.yml
│ ├── claude.yml
│ ├── dev-release.yml
│ ├── docker.yml
│ ├── pr-title.yml
│ ├── release.yml
│ └── test.yml
├── .gitignore
├── .python-version
├── CHANGELOG.md
├── CITATION.cff
├── CLA.md
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── docker-compose-postgres.yml
├── docker-compose.yml
├── Dockerfile
├── docs
│ ├── ai-assistant-guide-extended.md
│ ├── ARCHITECTURE.md
│ ├── character-handling.md
│ ├── cloud-cli.md
│ ├── Docker.md
│ └── testing-coverage.md
├── justfile
├── LICENSE
├── llms-install.md
├── pyproject.toml
├── README.md
├── SECURITY.md
├── smithery.yaml
├── specs
│ ├── SPEC-1 Specification-Driven Development Process.md
│ ├── SPEC-10 Unified Deployment Workflow and Event Tracking.md
│ ├── SPEC-11 Basic Memory API Performance Optimization.md
│ ├── SPEC-12 OpenTelemetry Observability.md
│ ├── SPEC-13 CLI Authentication with Subscription Validation.md
│ ├── SPEC-14 Cloud Git Versioning & GitHub Backup.md
│ ├── SPEC-14- Cloud Git Versioning & GitHub Backup.md
│ ├── SPEC-15 Configuration Persistence via Tigris for Cloud Tenants.md
│ ├── SPEC-16 MCP Cloud Service Consolidation.md
│ ├── SPEC-17 Semantic Search with ChromaDB.md
│ ├── SPEC-18 AI Memory Management Tool.md
│ ├── SPEC-19 Sync Performance and Memory Optimization.md
│ ├── SPEC-2 Slash Commands Reference.md
│ ├── SPEC-20 Simplified Project-Scoped Rclone Sync.md
│ ├── SPEC-3 Agent Definitions.md
│ ├── SPEC-4 Notes Web UI Component Architecture.md
│ ├── SPEC-5 CLI Cloud Upload via WebDAV.md
│ ├── SPEC-6 Explicit Project Parameter Architecture.md
│ ├── SPEC-7 POC to spike Tigris Turso for local access to cloud data.md
│ ├── SPEC-8 TigrisFS Integration.md
│ ├── SPEC-9 Multi-Project Bidirectional Sync Architecture.md
│ ├── SPEC-9 Signed Header Tenant Information.md
│ └── SPEC-9-1 Follow-Ups- Conflict, Sync, and Observability.md
├── src
│ └── basic_memory
│ ├── __init__.py
│ ├── alembic
│ │ ├── alembic.ini
│ │ ├── env.py
│ │ ├── migrations.py
│ │ ├── script.py.mako
│ │ └── versions
│ │ ├── 314f1ea54dc4_add_postgres_full_text_search_support_.py
│ │ ├── 3dae7c7b1564_initial_schema.py
│ │ ├── 502b60eaa905_remove_required_from_entity_permalink.py
│ │ ├── 5fe1ab1ccebe_add_projects_table.py
│ │ ├── 647e7a75e2cd_project_constraint_fix.py
│ │ ├── 6830751f5fb6_merge_multiple_heads.py
│ │ ├── 9d9c1cb7d8f5_add_mtime_and_size_columns_to_entity_.py
│ │ ├── a1b2c3d4e5f6_fix_project_foreign_keys.py
│ │ ├── a2b3c4d5e6f7_add_search_index_entity_cascade.py
│ │ ├── b3c3938bacdb_relation_to_name_unique_index.py
│ │ ├── cc7172b46608_update_search_index_schema.py
│ │ ├── e7e1f4367280_add_scan_watermark_tracking_to_project.py
│ │ ├── f8a9b2c3d4e5_add_pg_trgm_for_fuzzy_link_resolution.py
│ │ └── g9a0b3c4d5e6_add_external_id_to_project_and_entity.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── app.py
│ │ ├── container.py
│ │ ├── routers
│ │ │ ├── __init__.py
│ │ │ ├── directory_router.py
│ │ │ ├── importer_router.py
│ │ │ ├── knowledge_router.py
│ │ │ ├── management_router.py
│ │ │ ├── memory_router.py
│ │ │ ├── project_router.py
│ │ │ ├── prompt_router.py
│ │ │ ├── resource_router.py
│ │ │ ├── search_router.py
│ │ │ └── utils.py
│ │ ├── template_loader.py
│ │ └── v2
│ │ ├── __init__.py
│ │ └── routers
│ │ ├── __init__.py
│ │ ├── directory_router.py
│ │ ├── importer_router.py
│ │ ├── knowledge_router.py
│ │ ├── memory_router.py
│ │ ├── project_router.py
│ │ ├── prompt_router.py
│ │ ├── resource_router.py
│ │ └── search_router.py
│ ├── cli
│ │ ├── __init__.py
│ │ ├── app.py
│ │ ├── auth.py
│ │ ├── commands
│ │ │ ├── __init__.py
│ │ │ ├── cloud
│ │ │ │ ├── __init__.py
│ │ │ │ ├── api_client.py
│ │ │ │ ├── bisync_commands.py
│ │ │ │ ├── cloud_utils.py
│ │ │ │ ├── core_commands.py
│ │ │ │ ├── rclone_commands.py
│ │ │ │ ├── rclone_config.py
│ │ │ │ ├── rclone_installer.py
│ │ │ │ ├── upload_command.py
│ │ │ │ └── upload.py
│ │ │ ├── command_utils.py
│ │ │ ├── db.py
│ │ │ ├── format.py
│ │ │ ├── import_chatgpt.py
│ │ │ ├── import_claude_conversations.py
│ │ │ ├── import_claude_projects.py
│ │ │ ├── import_memory_json.py
│ │ │ ├── mcp.py
│ │ │ ├── project.py
│ │ │ ├── status.py
│ │ │ ├── telemetry.py
│ │ │ └── tool.py
│ │ ├── container.py
│ │ └── main.py
│ ├── config.py
│ ├── db.py
│ ├── deps
│ │ ├── __init__.py
│ │ ├── config.py
│ │ ├── db.py
│ │ ├── importers.py
│ │ ├── projects.py
│ │ ├── repositories.py
│ │ └── services.py
│ ├── deps.py
│ ├── file_utils.py
│ ├── ignore_utils.py
│ ├── importers
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── chatgpt_importer.py
│ │ ├── claude_conversations_importer.py
│ │ ├── claude_projects_importer.py
│ │ ├── memory_json_importer.py
│ │ └── utils.py
│ ├── markdown
│ │ ├── __init__.py
│ │ ├── entity_parser.py
│ │ ├── markdown_processor.py
│ │ ├── plugins.py
│ │ ├── schemas.py
│ │ └── utils.py
│ ├── mcp
│ │ ├── __init__.py
│ │ ├── async_client.py
│ │ ├── clients
│ │ │ ├── __init__.py
│ │ │ ├── directory.py
│ │ │ ├── knowledge.py
│ │ │ ├── memory.py
│ │ │ ├── project.py
│ │ │ ├── resource.py
│ │ │ └── search.py
│ │ ├── container.py
│ │ ├── project_context.py
│ │ ├── prompts
│ │ │ ├── __init__.py
│ │ │ ├── ai_assistant_guide.py
│ │ │ ├── continue_conversation.py
│ │ │ ├── recent_activity.py
│ │ │ ├── search.py
│ │ │ └── utils.py
│ │ ├── resources
│ │ │ ├── ai_assistant_guide.md
│ │ │ └── project_info.py
│ │ ├── server.py
│ │ └── tools
│ │ ├── __init__.py
│ │ ├── build_context.py
│ │ ├── canvas.py
│ │ ├── chatgpt_tools.py
│ │ ├── delete_note.py
│ │ ├── edit_note.py
│ │ ├── list_directory.py
│ │ ├── move_note.py
│ │ ├── project_management.py
│ │ ├── read_content.py
│ │ ├── read_note.py
│ │ ├── recent_activity.py
│ │ ├── search.py
│ │ ├── utils.py
│ │ ├── view_note.py
│ │ └── write_note.py
│ ├── models
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── knowledge.py
│ │ ├── project.py
│ │ └── search.py
│ ├── project_resolver.py
│ ├── repository
│ │ ├── __init__.py
│ │ ├── entity_repository.py
│ │ ├── observation_repository.py
│ │ ├── postgres_search_repository.py
│ │ ├── project_info_repository.py
│ │ ├── project_repository.py
│ │ ├── relation_repository.py
│ │ ├── repository.py
│ │ ├── search_index_row.py
│ │ ├── search_repository_base.py
│ │ ├── search_repository.py
│ │ └── sqlite_search_repository.py
│ ├── runtime.py
│ ├── schemas
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── cloud.py
│ │ ├── delete.py
│ │ ├── directory.py
│ │ ├── importer.py
│ │ ├── memory.py
│ │ ├── project_info.py
│ │ ├── prompt.py
│ │ ├── request.py
│ │ ├── response.py
│ │ ├── search.py
│ │ ├── sync_report.py
│ │ └── v2
│ │ ├── __init__.py
│ │ ├── entity.py
│ │ └── resource.py
│ ├── services
│ │ ├── __init__.py
│ │ ├── context_service.py
│ │ ├── directory_service.py
│ │ ├── entity_service.py
│ │ ├── exceptions.py
│ │ ├── file_service.py
│ │ ├── initialization.py
│ │ ├── link_resolver.py
│ │ ├── project_service.py
│ │ ├── search_service.py
│ │ └── service.py
│ ├── sync
│ │ ├── __init__.py
│ │ ├── background_sync.py
│ │ ├── coordinator.py
│ │ ├── sync_service.py
│ │ └── watch_service.py
│ ├── telemetry.py
│ ├── templates
│ │ └── prompts
│ │ ├── continue_conversation.hbs
│ │ └── search.hbs
│ └── utils.py
├── test-int
│ ├── BENCHMARKS.md
│ ├── cli
│ │ ├── test_project_commands_integration.py
│ │ └── test_version_integration.py
│ ├── conftest.py
│ ├── mcp
│ │ ├── test_build_context_underscore.py
│ │ ├── test_build_context_validation.py
│ │ ├── test_chatgpt_tools_integration.py
│ │ ├── test_default_project_mode_integration.py
│ │ ├── test_delete_note_integration.py
│ │ ├── test_edit_note_integration.py
│ │ ├── test_lifespan_shutdown_sync_task_cancellation_integration.py
│ │ ├── test_list_directory_integration.py
│ │ ├── test_move_note_integration.py
│ │ ├── test_project_management_integration.py
│ │ ├── test_project_state_sync_integration.py
│ │ ├── test_read_content_integration.py
│ │ ├── test_read_note_integration.py
│ │ ├── test_search_integration.py
│ │ ├── test_single_project_mcp_integration.py
│ │ └── test_write_note_integration.py
│ ├── test_db_wal_mode.py
│ └── test_disable_permalinks_integration.py
├── tests
│ ├── __init__.py
│ ├── api
│ │ ├── conftest.py
│ │ ├── test_api_container.py
│ │ ├── test_async_client.py
│ │ ├── test_continue_conversation_template.py
│ │ ├── test_directory_router.py
│ │ ├── test_importer_router.py
│ │ ├── test_knowledge_router.py
│ │ ├── test_management_router.py
│ │ ├── test_memory_router.py
│ │ ├── test_project_router_operations.py
│ │ ├── test_project_router.py
│ │ ├── test_prompt_router.py
│ │ ├── test_relation_background_resolution.py
│ │ ├── test_resource_router.py
│ │ ├── test_search_router.py
│ │ ├── test_search_template.py
│ │ ├── test_template_loader_helpers.py
│ │ ├── test_template_loader.py
│ │ └── v2
│ │ ├── __init__.py
│ │ ├── conftest.py
│ │ ├── test_directory_router.py
│ │ ├── test_importer_router.py
│ │ ├── test_knowledge_router.py
│ │ ├── test_memory_router.py
│ │ ├── test_project_router.py
│ │ ├── test_prompt_router.py
│ │ ├── test_resource_router.py
│ │ └── test_search_router.py
│ ├── cli
│ │ ├── cloud
│ │ │ ├── test_cloud_api_client_and_utils.py
│ │ │ ├── test_rclone_config_and_bmignore_filters.py
│ │ │ └── test_upload_path.py
│ │ ├── conftest.py
│ │ ├── test_auth_cli_auth.py
│ │ ├── test_cli_container.py
│ │ ├── test_cli_exit.py
│ │ ├── test_cli_tool_exit.py
│ │ ├── test_cli_tools.py
│ │ ├── test_cloud_authentication.py
│ │ ├── test_ignore_utils.py
│ │ ├── test_import_chatgpt.py
│ │ ├── test_import_claude_conversations.py
│ │ ├── test_import_claude_projects.py
│ │ ├── test_import_memory_json.py
│ │ ├── test_project_add_with_local_path.py
│ │ └── test_upload.py
│ ├── conftest.py
│ ├── db
│ │ └── test_issue_254_foreign_key_constraints.py
│ ├── importers
│ │ ├── test_conversation_indexing.py
│ │ ├── test_importer_base.py
│ │ └── test_importer_utils.py
│ ├── markdown
│ │ ├── __init__.py
│ │ ├── test_date_frontmatter_parsing.py
│ │ ├── test_entity_parser_error_handling.py
│ │ ├── test_entity_parser.py
│ │ ├── test_markdown_plugins.py
│ │ ├── test_markdown_processor.py
│ │ ├── test_observation_edge_cases.py
│ │ ├── test_parser_edge_cases.py
│ │ ├── test_relation_edge_cases.py
│ │ └── test_task_detection.py
│ ├── mcp
│ │ ├── clients
│ │ │ ├── __init__.py
│ │ │ └── test_clients.py
│ │ ├── conftest.py
│ │ ├── test_async_client_modes.py
│ │ ├── test_mcp_container.py
│ │ ├── test_obsidian_yaml_formatting.py
│ │ ├── test_permalink_collision_file_overwrite.py
│ │ ├── test_project_context.py
│ │ ├── test_prompts.py
│ │ ├── test_recent_activity_prompt_modes.py
│ │ ├── test_resources.py
│ │ ├── test_server_lifespan_branches.py
│ │ ├── test_tool_build_context.py
│ │ ├── test_tool_canvas.py
│ │ ├── test_tool_delete_note.py
│ │ ├── test_tool_edit_note.py
│ │ ├── test_tool_list_directory.py
│ │ ├── test_tool_move_note.py
│ │ ├── test_tool_project_management.py
│ │ ├── test_tool_read_content.py
│ │ ├── test_tool_read_note.py
│ │ ├── test_tool_recent_activity.py
│ │ ├── test_tool_resource.py
│ │ ├── test_tool_search.py
│ │ ├── test_tool_utils.py
│ │ ├── test_tool_view_note.py
│ │ ├── test_tool_write_note_kebab_filenames.py
│ │ ├── test_tool_write_note.py
│ │ └── tools
│ │ └── test_chatgpt_tools.py
│ ├── Non-MarkdownFileSupport.pdf
│ ├── README.md
│ ├── repository
│ │ ├── test_entity_repository_upsert.py
│ │ ├── test_entity_repository.py
│ │ ├── test_entity_upsert_issue_187.py
│ │ ├── test_observation_repository.py
│ │ ├── test_postgres_search_repository.py
│ │ ├── test_project_info_repository.py
│ │ ├── test_project_repository.py
│ │ ├── test_relation_repository.py
│ │ ├── test_repository.py
│ │ ├── test_search_repository_edit_bug_fix.py
│ │ └── test_search_repository.py
│ ├── schemas
│ │ ├── test_base_timeframe_minimum.py
│ │ ├── test_memory_serialization.py
│ │ ├── test_memory_url_validation.py
│ │ ├── test_memory_url.py
│ │ ├── test_relation_response_reference_resolution.py
│ │ ├── test_schemas.py
│ │ └── test_search.py
│ ├── Screenshot.png
│ ├── services
│ │ ├── test_context_service.py
│ │ ├── test_directory_service.py
│ │ ├── test_entity_service_disable_permalinks.py
│ │ ├── test_entity_service.py
│ │ ├── test_file_service.py
│ │ ├── test_initialization_cloud_mode_branches.py
│ │ ├── test_initialization.py
│ │ ├── test_link_resolver.py
│ │ ├── test_project_removal_bug.py
│ │ ├── test_project_service_operations.py
│ │ ├── test_project_service.py
│ │ └── test_search_service.py
│ ├── sync
│ │ ├── test_character_conflicts.py
│ │ ├── test_coordinator.py
│ │ ├── test_sync_service_incremental.py
│ │ ├── test_sync_service.py
│ │ ├── test_sync_wikilink_issue.py
│ │ ├── test_tmp_files.py
│ │ ├── test_watch_service_atomic_adds.py
│ │ ├── test_watch_service_edge_cases.py
│ │ ├── test_watch_service_reload.py
│ │ └── test_watch_service.py
│ ├── test_config.py
│ ├── test_deps.py
│ ├── test_production_cascade_delete.py
│ ├── test_project_resolver.py
│ ├── test_rclone_commands.py
│ ├── test_runtime.py
│ ├── test_telemetry.py
│ └── utils
│ ├── test_file_utils.py
│ ├── test_frontmatter_obsidian_compatible.py
│ ├── test_parse_tags.py
│ ├── test_permalink_formatting.py
│ ├── test_timezone_utils.py
│ ├── test_utf8_handling.py
│ └── test_validate_project_path.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/specs/SPEC-17 Semantic Search with ChromaDB.md:
--------------------------------------------------------------------------------
```markdown
---
title: 'SPEC-17: Semantic Search with ChromaDB'
type: spec
permalink: specs/spec-17-semantic-search-chromadb
tags:
- search
- chromadb
- semantic-search
- vector-database
- postgres-migration
---
# SPEC-17: Semantic Search with ChromaDB
Why ChromaDB for Knowledge Management
Your users aren't just searching for keywords - they're trying to:
- "Find notes related to this concept"
- "Show me similar ideas"
- "What else did I write about this topic?"
Example:
# User searches: "AI ethics"
# FTS5/MeiliSearch finds:
- "AI ethics guidelines" ✅
- "ethical AI development" ✅
- "artificial intelligence" ❌ No keyword match
# ChromaDB finds:
- "AI ethics guidelines" ✅
- "ethical AI development" ✅
- "artificial intelligence" ✅ Semantic match!
- "bias in ML models" ✅ Related concept
- "responsible technology" ✅ Similar theme
- "neural network fairness" ✅ Connected idea
ChromaDB vs MeiliSearch vs Typesense
| Feature | ChromaDB | MeiliSearch | Typesense |
|------------------|--------------------|--------------------|--------------------|
| Semantic Search | ✅ Excellent | ❌ No | ❌ No |
| Keyword Search | ⚠️ Via metadata | ✅ Excellent | ✅ Excellent |
| Local Deployment | ✅ Embedded mode | ⚠️ Server required | ⚠️ Server required |
| No Server Needed | ✅ YES! | ❌ No | ❌ No |
| Embedding Cost | ~$0.13/1M tokens | None | None |
| Search Speed | 50-200ms | 10-50ms | 10-50ms |
| Best For | Semantic discovery | Exact terms | Exact terms |
The Killer Feature: Embedded Mode
ChromaDB has an embedded client that runs in-process - NO SERVER NEEDED!
# Local (FOSS) - ChromaDB embedded in Python process
import chromadb
client = chromadb.PersistentClient(path="/path/to/chroma_data")
collection = client.get_or_create_collection("knowledge_base")
# Add documents
collection.add(
ids=["note1", "note2"],
documents=["AI ethics", "Neural networks"],
metadatas=[{"type": "note"}, {"type": "spec"}]
)
# Search - NO API calls, runs locally!
results = collection.query(
query_texts=["machine learning"],
n_results=10
)
## Why
### Current Problem: Database Persistence in Cloud
In cloud deployments, `memory.db` (SQLite) doesn't persist across Docker container restarts. This means:
- Database must be rebuilt on every container restart
- Initial sync takes ~49 seconds for 500 files (after optimization in #352)
- Users experience delays on each deployment
### Search Architecture Issues
Current SQLite FTS5 implementation creates a **dual-implementation problem** for PostgreSQL migration:
- FTS5 (SQLite) uses `VIRTUAL TABLE` with `MATCH` queries
- PostgreSQL full-text search uses `TSVECTOR` with `@@` operator
- These are fundamentally incompatible architectures
- Would require **2x search code** and **2x tests** to support both
**Example of incompatibility:**
```python
# SQLite FTS5
"content_stems MATCH :text"
# PostgreSQL
"content_vector @@ plainto_tsquery(:text)"
```
### Search Quality Limitations
Current keyword-based FTS5 has limitations:
- No semantic understanding (search "AI" doesn't find "machine learning")
- No word relationships (search "neural networks" doesn't find "deep learning")
- Limited typo tolerance
- No relevance ranking beyond keyword matching
### Strategic Goal: PostgreSQL Migration
Moving to PostgreSQL (Neon) for cloud deployments would:
- ✅ Solve persistence issues (database survives restarts)
- ✅ Enable multi-tenant architecture
- ✅ Deliver better performance for large datasets
- ✅ Support cloud-native scaling
**But requires solving the search compatibility problem.**
## What
Migrate from SQLite FTS5 to **ChromaDB** for semantic vector search across all deployments.
**Key insight:** ChromaDB is **database-agnostic** - it works with both SQLite and PostgreSQL, eliminating the dual-implementation problem.
### Affected Areas
- Search implementation (`src/basic_memory/repository/search_repository.py`)
- Search service (`src/basic_memory/services/search_service.py`)
- Search models (`src/basic_memory/models/search.py`)
- Database initialization (`src/basic_memory/db.py`)
- MCP search tools (`src/basic_memory/mcp/tools/search.py`)
- Dependencies (`pyproject.toml` - add ChromaDB)
- Alembic migrations (FTS5 table removal)
- Documentation
### What Changes
**Removed:**
- SQLite FTS5 virtual table
- `MATCH` query syntax
- FTS5-specific tokenization and prefix handling
- ~300 lines of FTS5 query preparation code
**Added:**
- ChromaDB persistent client (embedded mode)
- Vector embedding generation
- Semantic similarity search
- Local embedding model (`sentence-transformers`)
- Collection management for multi-project support
### What Stays the Same
- Search API interface (MCP tools, REST endpoints)
- Entity/Observation/Relation indexing workflow
- Multi-project isolation
- Search filtering by type, date, metadata
- Pagination and result formatting
- **All SQL queries for exact lookups and metadata filtering**
## Hybrid Architecture: SQL + ChromaDB
**Critical Design Decision:** ChromaDB **complements** SQL; it doesn't **replace** it.
### Why Hybrid?
ChromaDB is excellent for semantic text search but terrible for exact lookups. SQL is perfect for exact lookups and structured queries. We use both:
```
┌─────────────────────────────────────────────────┐
│ Search Request │
└─────────────────────────────────────────────────┘
▼
┌────────────────────────┐
│ SearchRepository │
│ (Smart Router) │
└────────────────────────┘
▼ ▼
┌───────────┐ ┌──────────────┐
│ SQL │ │ ChromaDB │
│ Queries │ │ Semantic │
└───────────┘ └──────────────┘
▼ ▼
Exact lookups Text search
- Permalink - Semantic similarity
- Pattern match - Related concepts
- Title exact - Typo tolerance
- Metadata filter - Fuzzy matching
- Date ranges
```
### When to Use Each
#### Use SQL For (Fast & Exact)
**Exact Permalink Lookup:**
```python
# Find by exact permalink - SQL wins
"SELECT * FROM entities WHERE permalink = 'specs/search-feature'"
# ~1ms, perfect for exact matches
# ChromaDB would be: ~50ms, wasteful
```
**Pattern Matching:**
```python
# Find all specs - SQL wins
"SELECT * FROM entities WHERE permalink GLOB 'specs/*'"
# ~5ms, perfect for wildcards
# ChromaDB doesn't support glob patterns
```
**Pure Metadata Queries:**
```python
# Find all meetings tagged "important" - SQL wins
"SELECT * FROM entities
WHERE json_extract(entity_metadata, '$.entity_type') = 'meeting'
AND json_extract(entity_metadata, '$.tags') LIKE '%important%'"
# ~5ms, structured query
# No text search needed, SQL is faster and simpler
```
**Date Filtering:**
```python
# Find recent specs - SQL wins
"SELECT * FROM entities
WHERE entity_type = 'spec'
AND created_at > '2024-01-01'
ORDER BY created_at DESC"
# ~2ms, perfect for structured data
```
#### Use ChromaDB For (Semantic & Fuzzy)
**Semantic Content Search:**
```python
# Find notes about "neural networks" - ChromaDB wins
collection.query(query_texts=["neural networks"])
# Finds: "machine learning", "deep learning", "AI models"
# ~50-100ms, semantic understanding
# SQL FTS5 would only find exact keyword matches
```
**Text Search + Metadata:**
```python
# Find meeting notes about "project planning" tagged "important"
collection.query(
query_texts=["project planning"],
where={
"entity_type": "meeting",
"tags": {"$contains": "important"}
}
)
# ~100ms, semantic search with filters
# Finds: "roadmap discussion", "sprint planning", etc.
```
**Typo Tolerance:**
```python
# User types "serch feature" (typo) - ChromaDB wins
collection.query(query_texts=["serch feature"])
# Still finds: "search feature" documents
# ~50-100ms, fuzzy matching
# SQL would find nothing
```
### Performance Comparison
| Query Type | SQL | ChromaDB | Winner |
|-----------|-----|----------|--------|
| Exact permalink | 1-2ms | 50ms | ✅ SQL |
| Pattern match (specs/*) | 5-10ms | N/A | ✅ SQL |
| Pure metadata filter | 5ms | 50ms | ✅ SQL |
| Semantic text search | ❌ Can't | 50-100ms | ✅ ChromaDB |
| Text + metadata | ❌ Keywords only | 100ms | ✅ ChromaDB |
| Typo tolerance | ❌ Can't | 50ms | ✅ ChromaDB |
### Metadata/Frontmatter Handling
**Both systems support full frontmatter filtering!**
#### SQL Metadata Storage
```python
# Entities table stores frontmatter as JSON
CREATE TABLE entities (
id INTEGER PRIMARY KEY,
title TEXT,
permalink TEXT,
file_path TEXT,
entity_type TEXT,
entity_metadata JSON, -- All frontmatter here!
created_at DATETIME,
...
)
# Query frontmatter fields
SELECT * FROM entities
WHERE json_extract(entity_metadata, '$.entity_type') = 'meeting'
AND json_extract(entity_metadata, '$.tags') LIKE '%important%'
AND json_extract(entity_metadata, '$.status') = 'completed'
```
#### ChromaDB Metadata Storage
```python
# When indexing, store ALL frontmatter as metadata
class ChromaSearchBackend:
async def index_entity(self, entity: Entity):
"""Index with complete frontmatter metadata."""
# Extract ALL frontmatter fields
metadata = {
"entity_id": entity.id,
"project_id": entity.project_id,
"permalink": entity.permalink,
"file_path": entity.file_path,
"entity_type": entity.entity_type,
"type": "entity",
# ALL frontmatter tags
"tags": entity.entity_metadata.get("tags", []),
# Custom frontmatter fields
"status": entity.entity_metadata.get("status"),
"priority": entity.entity_metadata.get("priority"),
# Spread any other custom fields
**{k: v for k, v in entity.entity_metadata.items()
if k not in ["tags", "entity_type"]}
}
self.collection.upsert(
ids=[f"entity_{entity.id}_{entity.project_id}"],
documents=[self._format_document(entity)],
metadatas=[metadata] # Full frontmatter!
)
```
#### ChromaDB Metadata Queries
ChromaDB supports rich filtering:
```python
# Simple filter - single field
collection.query(
query_texts=["project planning"],
where={"entity_type": "meeting"}
)
# Multiple conditions (AND)
collection.query(
query_texts=["architecture decisions"],
where={
"entity_type": "spec",
"tags": {"$contains": "important"}
}
)
# Complex filters with operators
collection.query(
query_texts=["machine learning"],
where={
"$and": [
{"entity_type": {"$in": ["note", "spec"]}},
{"tags": {"$contains": "AI"}},
{"created_at": {"$gt": "2024-01-01"}},
{"status": "in-progress"}
]
}
)
# Multiple tags (all must match)
collection.query(
query_texts=["cloud architecture"],
where={
"$and": [
{"tags": {"$contains": "architecture"}},
{"tags": {"$contains": "cloud"}}
]
}
)
```
### Smart Routing Implementation
```python
class SearchRepository:
def __init__(
self,
session_maker: async_sessionmaker[AsyncSession],
project_id: int,
chroma_backend: ChromaSearchBackend
):
self.sql = session_maker # Keep SQL!
self.chroma = chroma_backend
self.project_id = project_id
async def search(
self,
search_text: Optional[str] = None,
permalink: Optional[str] = None,
permalink_match: Optional[str] = None,
title: Optional[str] = None,
types: Optional[List[str]] = None,
tags: Optional[List[str]] = None,
after_date: Optional[datetime] = None,
custom_metadata: Optional[dict] = None,
limit: int = 10,
offset: int = 0,
) -> List[SearchIndexRow]:
"""Smart routing between SQL and ChromaDB."""
# ==========================================
# Route 1: Exact Lookups → SQL (1-5ms)
# ==========================================
if permalink:
# Exact permalink: "specs/search-feature"
return await self._sql_permalink_lookup(permalink)
if permalink_match:
# Pattern match: "specs/*"
return await self._sql_pattern_match(permalink_match)
if title and not search_text:
# Exact title lookup (no semantic search needed)
return await self._sql_title_match(title)
# ==========================================
# Route 2: Pure Metadata → SQL (5-10ms)
# ==========================================
# No text search, just filtering by metadata
if not search_text and (types or tags or after_date or custom_metadata):
return await self._sql_metadata_filter(
types=types,
tags=tags,
after_date=after_date,
custom_metadata=custom_metadata,
limit=limit,
offset=offset
)
# ==========================================
# Route 3: Text Search → ChromaDB (50-100ms)
# ==========================================
if search_text:
# Build ChromaDB metadata filters
where_filters = self._build_chroma_filters(
types=types,
tags=tags,
after_date=after_date,
custom_metadata=custom_metadata
)
# Semantic search with metadata filtering
return await self.chroma.search(
query_text=search_text,
project_id=self.project_id,
where=where_filters,
limit=limit
)
# ==========================================
# Route 4: List All → SQL (2-5ms)
# ==========================================
return await self._sql_list_entities(
limit=limit,
offset=offset
)
def _build_chroma_filters(
self,
types: Optional[List[str]] = None,
tags: Optional[List[str]] = None,
after_date: Optional[datetime] = None,
custom_metadata: Optional[dict] = None
) -> dict:
"""Build ChromaDB where clause from filters."""
filters = {"project_id": self.project_id}
# Type filtering
if types:
if len(types) == 1:
filters["entity_type"] = types[0]
else:
filters["entity_type"] = {"$in": types}
# Tag filtering (array contains)
if tags:
if len(tags) == 1:
filters["tags"] = {"$contains": tags[0]}
else:
# Multiple tags - all must match
filters = {
"$and": [
filters,
*[{"tags": {"$contains": tag}} for tag in tags]
]
}
# Date filtering
if after_date:
filters["created_at"] = {"$gt": after_date.isoformat()}
# Custom frontmatter fields
if custom_metadata:
filters.update(custom_metadata)
return filters
async def _sql_metadata_filter(
self,
types: Optional[List[str]] = None,
tags: Optional[List[str]] = None,
after_date: Optional[datetime] = None,
custom_metadata: Optional[dict] = None,
limit: int = 10,
offset: int = 0
) -> List[SearchIndexRow]:
"""Pure metadata queries using SQL."""
conditions = ["project_id = :project_id"]
params = {"project_id": self.project_id}
if types:
type_list = ", ".join(f"'{t}'" for t in types)
conditions.append(f"entity_type IN ({type_list})")
if tags:
# Check each tag
for i, tag in enumerate(tags):
param_name = f"tag_{i}"
conditions.append(
f"json_extract(entity_metadata, '$.tags') LIKE :{param_name}"
)
params[param_name] = f"%{tag}%"
if after_date:
conditions.append("created_at > :after_date")
params["after_date"] = after_date
if custom_metadata:
for key, value in custom_metadata.items():
param_name = f"meta_{key}"
conditions.append(
f"json_extract(entity_metadata, '$.{key}') = :{param_name}"
)
params[param_name] = value
where = " AND ".join(conditions)
sql = f"""
SELECT * FROM entities
WHERE {where}
ORDER BY created_at DESC
LIMIT :limit OFFSET :offset
"""
params["limit"] = limit
params["offset"] = offset
async with db.scoped_session(self.session_maker) as session:
result = await session.execute(text(sql), params)
return self._format_sql_results(result)
```
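The routing code above calls SQL helpers (`_sql_permalink_lookup`, `_sql_pattern_match`, ...) that are not shown. A minimal sketch of the exact-permalink helper, assuming the existing `entities` table, the `db.scoped_session` helper, and a `_format_sql_results` method as used elsewhere in this spec:
```python
from sqlalchemy import text  # already used by the metadata filter above

async def _sql_permalink_lookup(self, permalink: str) -> List[SearchIndexRow]:
    """Route 1: exact permalink lookup, scoped to the current project."""
    sql = """
        SELECT * FROM entities
        WHERE project_id = :project_id AND permalink = :permalink
        LIMIT 1
    """
    async with db.scoped_session(self.session_maker) as session:
        result = await session.execute(
            text(sql), {"project_id": self.project_id, "permalink": permalink}
        )
        return self._format_sql_results(result)
```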
### Real-World Examples
#### Example 1: Pure Metadata Query (No Text)
```python
# "Find all meetings tagged 'important'"
results = await search_repo.search(
types=["meeting"],
tags=["important"]
)
# Routing: → SQL (~5ms)
# SQL: SELECT * FROM entities
# WHERE entity_type = 'meeting'
# AND json_extract(entity_metadata, '$.tags') LIKE '%important%'
```
#### Example 2: Semantic Search (No Metadata)
```python
# "Find notes about neural networks"
results = await search_repo.search(
search_text="neural networks"
)
# Routing: → ChromaDB (~80ms)
# Finds: "machine learning", "deep learning", "AI models", etc.
```
#### Example 3: Semantic + Metadata
```python
# "Find meeting notes about 'project planning' tagged 'important'"
results = await search_repo.search(
search_text="project planning",
types=["meeting"],
tags=["important"]
)
# Routing: → ChromaDB with filters (~100ms)
# ChromaDB: query_texts=["project planning"]
# where={"entity_type": "meeting",
# "tags": {"$contains": "important"}}
# Finds: "roadmap discussion", "sprint planning", etc.
```
#### Example 4: Complex Frontmatter Query
```python
# "Find in-progress specs with multiple tags, recent"
results = await search_repo.search(
types=["spec"],
tags=["architecture", "cloud"],
after_date=datetime(2024, 1, 1),
custom_metadata={"status": "in-progress"}
)
# Routing: → SQL (~10ms)
# No text search, pure structured query - SQL is faster
```
#### Example 5: Semantic + Complex Metadata
```python
# "Find notes about 'authentication' that are in-progress"
results = await search_repo.search(
search_text="authentication",
custom_metadata={"status": "in-progress", "priority": "high"}
)
# Routing: → ChromaDB with metadata filters (~100ms)
# Semantic search for "authentication" concept
# Filters by status and priority in metadata
```
#### Example 6: Exact Permalink
```python
# "Show me specs/search-feature"
results = await search_repo.search(
permalink="specs/search-feature"
)
# Routing: → SQL (~1ms)
# SQL: SELECT * FROM entities WHERE permalink = 'specs/search-feature'
```
#### Example 7: Pattern Match
```python
# "Show me all specs"
results = await search_repo.search(
permalink_match="specs/*"
)
# Routing: → SQL (~5ms)
# SQL: SELECT * FROM entities WHERE permalink GLOB 'specs/*'
```
### What We Remove vs Keep
**REMOVE (FTS5-specific):**
- ❌ `CREATE VIRTUAL TABLE search_index USING fts5(...)`
- ❌ `MATCH` operator queries
- ❌ FTS5 tokenization configuration
- ❌ ~300 lines of FTS5 query preparation code
- ❌ Trigram generation and prefix handling
**KEEP (Standard SQL):**
- ✅ `SELECT * FROM entities WHERE permalink = :permalink`
- ✅ `SELECT * FROM entities WHERE permalink GLOB :pattern`
- ✅ `SELECT * FROM entities WHERE title LIKE :title`
- ✅ `SELECT * FROM entities WHERE json_extract(entity_metadata, ...) = :value`
- ✅ All date filtering, pagination, sorting
- ✅ Entity table structure and indexes
**ADD (ChromaDB):**
- ✅ ChromaDB persistent client (embedded)
- ✅ Semantic vector search
- ✅ Metadata filtering in ChromaDB
- ✅ Smart routing logic
## How (High Level)
### Architecture Overview
```
┌─────────────────────────────────────────────────────────────┐
│ FOSS Deployment (Local) │
├─────────────────────────────────────────────────────────────┤
│ SQLite (data) + ChromaDB embedded (search) │
│ - No external services │
│ - Local embedding model (sentence-transformers) │
│ - Persists in ~/.basic-memory/chroma_data/ │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ Cloud Deployment (Multi-tenant) │
├─────────────────────────────────────────────────────────────┤
│ PostgreSQL/Neon (data) + ChromaDB server (search) │
│ - Neon serverless Postgres for persistence │
│ - ChromaDB server in Docker container │
│ - Optional: OpenAI embeddings for better quality │
└─────────────────────────────────────────────────────────────┘
```
### Phase 1: ChromaDB Integration (2-3 days)
#### 1. Add ChromaDB Dependency
```toml
# pyproject.toml
dependencies = [
"chromadb>=0.4.0",
"sentence-transformers>=2.2.0", # Local embeddings
]
```
#### 2. Create ChromaSearchBackend
```python
# src/basic_memory/search/chroma_backend.py
from chromadb import PersistentClient
from chromadb.utils import embedding_functions
class ChromaSearchBackend:
def __init__(
self,
persist_directory: Path,
collection_name: str = "knowledge_base",
embedding_model: str = "all-MiniLM-L6-v2"
):
"""Initialize ChromaDB with local embeddings."""
self.client = PersistentClient(path=str(persist_directory))
# Use local sentence-transformers model (no API costs)
self.embed_fn = embedding_functions.SentenceTransformerEmbeddingFunction(
model_name=embedding_model
)
self.collection = self.client.get_or_create_collection(
name=collection_name,
embedding_function=self.embed_fn,
metadata={"hnsw:space": "cosine"} # Similarity metric
)
async def index_entity(self, entity: Entity):
"""Index entity with automatic embeddings."""
# Combine title and content for semantic search
document = self._format_document(entity)
self.collection.upsert(
ids=[f"entity_{entity.id}_{entity.project_id}"],
documents=[document],
metadatas=[{
"entity_id": entity.id,
"project_id": entity.project_id,
"permalink": entity.permalink,
"file_path": entity.file_path,
"entity_type": entity.entity_type,
"type": "entity",
}]
)
async def search(
self,
query_text: str,
project_id: int,
limit: int = 10,
filters: dict = None
) -> List[SearchResult]:
"""Semantic search with metadata filtering."""
where = {"project_id": project_id}
if filters:
where.update(filters)
results = self.collection.query(
query_texts=[query_text],
n_results=limit,
where=where
)
return self._format_results(results)
```
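The backend above references `_format_document()` without defining it. A minimal sketch, assuming the entity exposes `title`, an optional `content` body, and `observations` (field names follow this spec's earlier examples, not necessarily the final models):
```python
def _format_document(self, entity: Entity) -> str:
    """Combine title, body, and observation text into one embeddable document."""
    parts = [entity.title or ""]
    if getattr(entity, "content", None):
        parts.append(entity.content)
    for obs in getattr(entity, "observations", []) or []:
        parts.append(obs.content)
    return "\n".join(p for p in parts if p)
```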
#### 3. Update SearchRepository
```python
# src/basic_memory/repository/search_repository.py
class SearchRepository:
def __init__(
self,
session_maker: async_sessionmaker[AsyncSession],
project_id: int,
chroma_backend: ChromaSearchBackend
):
self.session_maker = session_maker
self.project_id = project_id
self.chroma = chroma_backend
async def search(
self,
search_text: Optional[str] = None,
permalink: Optional[str] = None,
# ... other filters
) -> List[SearchIndexRow]:
"""Search using ChromaDB for text, SQL for exact lookups."""
# For exact permalink/pattern matches, use SQL
if permalink or permalink_match:
return await self._sql_exact_search(...)
# For text search, use ChromaDB semantic search
if search_text:
results = await self.chroma.search(
query_text=search_text,
project_id=self.project_id,
limit=limit,
filters=self._build_filters(types, after_date, ...)
)
return results
# Fallback to listing all
return await self._list_entities(...)
```
#### 4. Update SearchService
```python
# src/basic_memory/services/search_service.py
class SearchService:
def __init__(
self,
search_repository: SearchRepository,
entity_repository: EntityRepository,
file_service: FileService,
chroma_backend: ChromaSearchBackend,
):
self.repository = search_repository
self.entity_repository = entity_repository
self.file_service = file_service
self.chroma = chroma_backend
async def index_entity(self, entity: Entity):
"""Index entity in ChromaDB."""
if entity.is_markdown:
await self._index_entity_markdown(entity)
else:
await self._index_entity_file(entity)
async def _index_entity_markdown(self, entity: Entity):
"""Index markdown entity with full content."""
# Index entity
await self.chroma.index_entity(entity)
# Index observations (as separate documents)
for obs in entity.observations:
await self.chroma.index_observation(obs, entity)
# Index relations (metadata only)
for rel in entity.outgoing_relations:
await self.chroma.index_relation(rel, entity)
```
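The service above also calls `chroma.index_observation()`, which is not shown. A sketch that mirrors `index_entity()`, storing each observation as its own document so semantic search can surface individual observations (names are assumptions):
```python
async def index_observation(self, observation: Observation, entity: Entity):
    """Index a single observation as a standalone ChromaDB document."""
    self.collection.upsert(
        ids=[f"observation_{observation.id}_{entity.project_id}"],
        documents=[observation.content],
        metadatas=[{
            "type": "observation",
            "entity_id": entity.id,
            "project_id": entity.project_id,
            "permalink": entity.permalink,
            "category": observation.category,
        }],
    )
```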
### Phase 2: PostgreSQL Support (1 day)
#### 1. Add PostgreSQL Database Type
```python
# src/basic_memory/db.py
class DatabaseType(Enum):
MEMORY = auto()
FILESYSTEM = auto()
POSTGRESQL = auto() # NEW
@classmethod
def get_db_url(cls, db_path_or_url: str, db_type: "DatabaseType") -> str:
if db_type == cls.POSTGRESQL:
return db_path_or_url # Neon connection string
elif db_type == cls.MEMORY:
return "sqlite+aiosqlite://"
return f"sqlite+aiosqlite:///{db_path_or_url}"
```
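Usage sketch for the helper above; the Neon hostname is a placeholder, and the connection string is assumed to already carry the `asyncpg` driver:
```python
DatabaseType.get_db_url("/home/user/.basic-memory/memory.db", DatabaseType.FILESYSTEM)
# -> "sqlite+aiosqlite:////home/user/.basic-memory/memory.db"

DatabaseType.get_db_url(
    "postgresql+asyncpg://user:pass@ep-example.neon.tech/basic_memory",
    DatabaseType.POSTGRESQL,
)
# -> returned unchanged (Postgres URLs pass straight through)
```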
#### 2. Update Connection Handling
```python
def _create_engine_and_session(...):
db_url = DatabaseType.get_db_url(db_path_or_url, db_type)
if db_type == DatabaseType.POSTGRESQL:
# Use asyncpg driver for Postgres
engine = create_async_engine(
db_url,
pool_size=10,
max_overflow=20,
pool_pre_ping=True, # Health checks
)
else:
# SQLite configuration
engine = create_async_engine(db_url, connect_args=connect_args)
# Only configure SQLite-specific settings for SQLite
if db_type != DatabaseType.MEMORY:
@event.listens_for(engine.sync_engine, "connect")
def enable_wal_mode(dbapi_conn, connection_record):
_configure_sqlite_connection(dbapi_conn, enable_wal=True)
return engine, async_sessionmaker(engine, expire_on_commit=False)
```
#### 3. Remove SQLite-Specific Code
```python
# Remove from scoped_session context manager:
# await session.execute(text("PRAGMA foreign_keys=ON")) # DELETE
# PostgreSQL handles foreign keys by default
```
### Phase 3: Migration & Testing (1-2 days)
#### 1. Create Migration Script
```python
# scripts/migrate_to_chromadb.py
async def migrate_fts5_to_chromadb():
"""One-time migration from FTS5 to ChromaDB."""
# 1. Read all entities from database
entities = await entity_repository.find_all()
# 2. Index in ChromaDB
for entity in entities:
await search_service.index_entity(entity)
# 3. Drop FTS5 table (Alembic migration)
await session.execute(text("DROP TABLE IF EXISTS search_index"))
```
#### 2. Update Tests
- Replace FTS5 test fixtures with ChromaDB fixtures
- Test semantic search quality
- Test multi-project isolation in ChromaDB
- Benchmark performance vs FTS5
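For the fixture work above, a minimal pytest fixture sketch (hypothetical import path and fixture name, following the Phase 1 module layout):
```python
import pytest

from basic_memory.search.chroma_backend import ChromaSearchBackend  # hypothetical path


@pytest.fixture
def chroma_backend(tmp_path):
    """Throwaway embedded ChromaDB instance per test."""
    return ChromaSearchBackend(
        persist_directory=tmp_path / "chroma_data",
        collection_name="test_knowledge_base",
    )
```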
#### 3. Documentation Updates
- Update search documentation
- Add ChromaDB configuration guide
- Document embedding model options
- PostgreSQL deployment guide
### Configuration
```python
# config.py
class BasicMemoryConfig:
# Database
database_type: DatabaseType = DatabaseType.FILESYSTEM
database_path: Path = Path.home() / ".basic-memory" / "memory.db"
database_url: Optional[str] = None # For Postgres: postgresql://...
# Search
chroma_persist_directory: Path = Path.home() / ".basic-memory" / "chroma_data"
embedding_model: str = "all-MiniLM-L6-v2" # Local model
embedding_provider: str = "local" # or "openai"
openai_api_key: Optional[str] = None # For cloud deployments
```
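The config above exposes `embedding_provider`, but the wiring is not shown. A sketch of a factory that picks the embedding function from config; both helpers are standard `chromadb.utils.embedding_functions` classes, while the factory itself is an assumption:
```python
from chromadb.utils import embedding_functions


def build_embedding_function(config: BasicMemoryConfig):
    """Pick local or OpenAI embeddings based on config (sketch)."""
    if config.embedding_provider == "openai":
        return embedding_functions.OpenAIEmbeddingFunction(
            api_key=config.openai_api_key,
            model_name="text-embedding-3-small",
        )
    return embedding_functions.SentenceTransformerEmbeddingFunction(
        model_name=config.embedding_model
    )
```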
### Deployment Configurations
#### Local (FOSS)
```yaml
# Default configuration
database_type: FILESYSTEM
database_path: ~/.basic-memory/memory.db
chroma_persist_directory: ~/.basic-memory/chroma_data
embedding_model: all-MiniLM-L6-v2
embedding_provider: local
```
#### Cloud (Docker Compose)
```yaml
services:
postgres:
image: postgres:15
environment:
POSTGRES_DB: basic_memory
POSTGRES_PASSWORD: ${DB_PASSWORD}
chromadb:
image: chromadb/chroma:latest
volumes:
- chroma_data:/chroma/chroma
environment:
ALLOW_RESET: true
app:
environment:
DATABASE_TYPE: POSTGRESQL
DATABASE_URL: postgresql://postgres:${DB_PASSWORD}@postgres/basic_memory
CHROMA_HOST: chromadb
CHROMA_PORT: 8000
EMBEDDING_PROVIDER: local # or openai
```
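The compose file above implies two client modes: embedded locally, HTTP to the `chromadb` service in the cloud. A sketch of how the backend might choose between them based on the `CHROMA_HOST`/`CHROMA_PORT` variables (the selection logic is an assumption; `PersistentClient` and `HttpClient` are standard chromadb constructors):
```python
import os

import chromadb


def build_chroma_client(persist_directory: str):
    """Embedded client for FOSS; HTTP client when CHROMA_HOST is set (sketch)."""
    host = os.environ.get("CHROMA_HOST")
    if host:
        port = int(os.environ.get("CHROMA_PORT", "8000"))
        return chromadb.HttpClient(host=host, port=port)
    return chromadb.PersistentClient(path=persist_directory)
```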
## How to Evaluate
### Success Criteria
#### Functional Requirements
- ✅ Semantic search finds related concepts (e.g., "AI" finds "machine learning")
- ✅ Exact permalink/pattern matches work (e.g., `specs/*`)
- ✅ Multi-project isolation maintained
- ✅ All existing search filters work (type, date, metadata)
- ✅ MCP tools continue to work without changes
- ✅ Works with both SQLite and PostgreSQL
#### Performance Requirements
- ✅ Search latency < 200ms for 1000 documents (local embedding)
- ✅ Indexing time comparable to FTS5 (~10 files/sec)
- ✅ Initial sync time not significantly worse than current
- ✅ Memory footprint < 1GB for local deployments
#### Quality Requirements
- ✅ Better search relevance than FTS5 keyword matching
- ✅ Handles typos and word variations
- ✅ Finds semantically similar content
#### Deployment Requirements
- ✅ FOSS: Works out-of-box with no external services
- ✅ Cloud: Integrates with PostgreSQL (Neon)
- ✅ No breaking changes to MCP API
- ✅ Migration script for existing users
### Testing Procedure
#### 1. Unit Tests
```bash
# Test ChromaDB backend
pytest tests/test_chroma_backend.py
# Test search repository with ChromaDB
pytest tests/test_search_repository.py
# Test search service
pytest tests/test_search_service.py
```
#### 2. Integration Tests
```bash
# Test full search workflow
pytest test-int/test_search_integration.py
# Test with PostgreSQL
DATABASE_TYPE=POSTGRESQL pytest test-int/
```
#### 3. Semantic Search Quality Tests
```python
# Illustrative semantic-quality checks (sketch): a hypothetical
# `semantic_search` fixture wraps the ChromaDB-backed search service.
import pytest

CASES = [
    ("machine learning", ["neural networks", "deep learning", "AI algorithms"]),
    ("software architecture", ["system design", "design patterns", "microservices"]),
]


@pytest.mark.asyncio
@pytest.mark.parametrize("query,related", CASES)
async def test_semantic_similarity(semantic_search, query, related):
    titles = " ".join(r.title for r in await semantic_search(query)).lower()
    assert any(term.lower() in titles for term in related)
```
#### 4. Performance Benchmarks
```bash
# Run search benchmarks
pytest test-int/test_search_performance.py -v
# Measure:
- Search latency (should be < 200ms)
- Indexing throughput (should be ~10 files/sec)
- Memory usage (should be < 1GB)
```
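A sketch of what the latency check in `test-int/test_search_performance.py` could look like; the `chroma_backend` and `indexed_1000_notes` fixtures are hypothetical:
```python
import time

import pytest


@pytest.mark.asyncio
async def test_search_latency_under_200ms(chroma_backend, indexed_1000_notes):
    start = time.perf_counter()
    await chroma_backend.search(query_text="machine learning", project_id=1, limit=10)
    elapsed_ms = (time.perf_counter() - start) * 1000
    assert elapsed_ms < 200, f"semantic search took {elapsed_ms:.0f}ms"
```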
#### 5. Migration Testing
```bash
# Test migration from FTS5 to ChromaDB
python scripts/migrate_to_chromadb.py
# Verify all entities indexed
# Verify search results quality
# Verify no data loss
```
### Metrics
**Search Quality:**
- Semantic relevance score (manual evaluation)
- Precision/recall for common queries
- User satisfaction (qualitative)
**Performance:**
- Average search latency (ms)
- P95/P99 search latency
- Indexing throughput (files/sec)
- Memory usage (MB)
**Deployment:**
- Local deployment success rate
- Cloud deployment success rate
- Migration success rate
## Implementation Checklist
### Phase 1: ChromaDB Integration
- [ ] Add ChromaDB and sentence-transformers dependencies
- [ ] Create ChromaSearchBackend class
- [ ] Update SearchRepository to use ChromaDB
- [ ] Update SearchService indexing methods
- [ ] Remove FTS5 table creation code
- [ ] Update search query logic
- [ ] Add ChromaDB configuration to BasicMemoryConfig
### Phase 2: PostgreSQL Support
- [ ] Add DatabaseType.POSTGRESQL enum
- [ ] Update get_db_url() for Postgres connection strings
- [ ] Add asyncpg dependency
- [ ] Update engine creation for Postgres
- [ ] Remove SQLite-specific PRAGMA statements
- [ ] Test with Neon database
### Phase 3: Testing & Migration
- [ ] Write unit tests for ChromaSearchBackend
- [ ] Update search integration tests
- [ ] Add semantic search quality tests
- [ ] Create performance benchmarks
- [ ] Write migration script from FTS5
- [ ] Test migration with existing data
- [ ] Update documentation
### Phase 4: Deployment
- [ ] Update docker-compose.yml for cloud
- [ ] Document local FOSS deployment
- [ ] Document cloud PostgreSQL deployment
- [ ] Create migration guide for users
- [ ] Update MCP tool documentation
## Notes
### Embedding Model Trade-offs
**Local Model: `all-MiniLM-L6-v2`**
- Size: 80MB download
- Speed: ~50ms embedding time
- Dimensions: 384
- Cost: $0
- Quality: Good for general knowledge
- Best for: FOSS deployments
**OpenAI: `text-embedding-3-small`**
- Speed: ~100-200ms (API call)
- Dimensions: 1536
- Cost: ~$0.13 per 1M tokens (~$0.01 per 1000 notes)
- Quality: Excellent
- Best for: Cloud deployments with budget
### ChromaDB Storage
ChromaDB stores data in:
```
~/.basic-memory/chroma_data/
├── chroma.sqlite3 # Metadata
├── index/ # HNSW indexes
└── collections/ # Vector data
```
Typical sizes:
- 100 notes: ~5MB
- 1000 notes: ~50MB
- 10000 notes: ~500MB
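To sanity-check these figures on a real install, a small sketch that counts indexed documents via the collection API and sums the persist directory on disk:
```python
import os

import chromadb


def chroma_stats(persist_directory: str, collection_name: str = "knowledge_base"):
    """Return (document count, on-disk size in MB) for a ChromaDB persist dir."""
    client = chromadb.PersistentClient(path=persist_directory)
    count = client.get_or_create_collection(collection_name).count()
    size_bytes = sum(
        os.path.getsize(os.path.join(root, name))
        for root, _, files in os.walk(persist_directory)
        for name in files
    )
    return count, size_bytes / 1_000_000
```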
### Why Not Keep FTS5?
**Considered:** Hybrid approach (FTS5 for SQLite + tsvector for Postgres)
**Rejected because:**
- 2x the code to maintain
- 2x the tests to write
- 2x the bugs to fix
- Inconsistent search behavior between deployments
- ChromaDB provides better search quality anyway
**ChromaDB wins:**
- One implementation for both databases
- Better search quality (semantic!)
- Database-agnostic architecture
- Embedded mode for FOSS (no servers needed)
## Implementation
Proposed Architecture
Option 1: ChromaDB Only (Simplest)
class ChromaSearchBackend:
def __init__(self, path: str, embedding_model: str = "all-MiniLM-L6-v2"):
# For local: embedded client (no server!)
self.client = chromadb.PersistentClient(path=path)
# Use local embedding model (no API costs!)
from chromadb.utils import embedding_functions
self.embed_fn = embedding_functions.SentenceTransformerEmbeddingFunction(
model_name=embedding_model
)
self.collection = self.client.get_or_create_collection(
name="knowledge_base",
embedding_function=self.embed_fn
)
async def index_entity(self, entity: Entity):
# ChromaDB handles embeddings automatically!
self.collection.upsert(
ids=[str(entity.id)],
documents=[f"{entity.title}\n{entity.content}"],
metadatas=[{
"permalink": entity.permalink,
"type": entity.entity_type,
"file_path": entity.file_path
}]
)
async def search(self, query: str, filters: dict = None):
# Semantic search with optional metadata filters
results = self.collection.query(
query_texts=[query],
n_results=10,
where=filters # e.g., {"type": "note"}
)
return results
Deployment:
- Local (FOSS): ChromaDB embedded, local embedding model, NO servers
- Cloud: ChromaDB server OR still embedded (it's just a Python lib!)
Option 2: Hybrid FTS + ChromaDB (Best UX)
class HybridSearchBackend:
def __init__(self):
self.fts = SQLiteFTS5Backend() # Fast keyword search
self.chroma = ChromaSearchBackend() # Semantic search
async def search(self, query: str, search_type: str = "auto"):
if search_type == "exact":
# User wants exact match: "specs/search-feature"
return await self.fts.search(query)
elif search_type == "semantic":
# User wants related concepts
return await self.chroma.search(query)
else: # "auto"
# Check if query looks like exact match
if "/" in query or query.startswith('"'):
return await self.fts.search(query)
# Otherwise use semantic search
return await self.chroma.search(query)
Embedding Options
Option A: Local Model (FREE, FOSS-friendly)
# Uses sentence-transformers (runs locally)
# Model: ~100MB download
# Speed: ~50-100ms for embedding
# Cost: $0
from chromadb.utils import embedding_functions
embed_fn = embedding_functions.SentenceTransformerEmbeddingFunction(
model_name="all-MiniLM-L6-v2" # Fast, accurate, free
)
Option B: OpenAI Embeddings (Cloud only)
# For cloud users who want best quality
# Model: text-embedding-3-small
# Speed: ~100-200ms via API
# Cost: ~$0.13 per 1M tokens (~$0.01 per 1000 notes)
embed_fn = embedding_functions.OpenAIEmbeddingFunction(
api_key="...",
model_name="text-embedding-3-small"
)
Performance Comparison
Local embedding model: all-MiniLM-L6-v2
Embedding time: ~50ms per note
Search time: ~100ms for 1000 notes
Memory: ~500MB (model + ChromaDB)
Cost: $0
Quality: Good (384 dimensions)
OpenAI embeddings: text-embedding-3-small
Embedding time: ~100-200ms per note (API call)
Search time: ~50ms for 1000 notes
Cost: ~$0.01 per 1000 notes
Quality: Excellent (1536 dimensions)
My Recommendation: ChromaDB with Local Embeddings
Here's the plan:
Phase 1: Local ChromaDB (1-2 days)
# FOSS version
- SQLite for data persistence
- ChromaDB embedded for semantic search
- Local embedding model (no API costs)
- NO external services required
Benefits:
- ✅ Same deployment as current (just Python package)
- ✅ Semantic search for better UX
- ✅ Free embeddings with local model
- ✅ No servers needed
Phase 2: Postgres + ChromaDB Cloud (1-2 days)
# Cloud version
- Postgres for data persistence
- ChromaDB server for semantic search
- OpenAI embeddings (higher quality)
- OR keep local embeddings (cheaper)
Phase 3: Hybrid Search (optional, 1 day)
# Add FTS for exact matches alongside ChromaDB
- Quick keyword search when needed
- Semantic search for exploration
- Best of both worlds
Code Estimate
Just ChromaDB (replacing FTS5):
- Remove FTS5 code: 2 hours
- Add ChromaDB backend: 4 hours
- Update search service: 2 hours
- Testing: 4 hours
- Total: 1.5 days
ChromaDB + Postgres migration:
- Add Postgres support: 4 hours
- Test with Neon: 2 hours
- Total: +0.75 days
Grand total: 2-3 days for complete migration
The Kicker
ChromaDB solves BOTH problems:
1. ✅ Works with SQLite AND Postgres (the vector store lives outside the database)
2. ✅ No server needed for local (embedded mode)
3. ✅ Better search than FTS5 (semantic!)
4. ✅ One implementation for both deployments
A prototype could demonstrate:
1. ChromaDB embedded with local embeddings
2. Example searches showing semantic matching
3. Performance benchmarks
4. Migration from FTS5
## Observations
- [problem] SQLite FTS5 and PostgreSQL tsvector are incompatible architectures requiring dual implementation #database-compatibility
- [problem] Cloud deployments lose database on container restart requiring full re-sync #persistence
- [solution] ChromaDB provides database-agnostic semantic search eliminating dual implementation #architecture
- [advantage] Semantic search finds related concepts beyond keyword matching improving UX #search-quality
- [deployment] Embedded ChromaDB requires no external services for FOSS #simplicity
- [migration] Moving to PostgreSQL solves cloud persistence issues #cloud-architecture
- [performance] Local embedding models provide good quality at zero cost #cost-optimization
- [trade-off] Embedding generation adds ~50ms latency vs instant FTS5 indexing #performance
- [benefit] Single search codebase reduces maintenance burden and test coverage needs #maintainability
## Prior Art / References
### Community Fork: manuelbliemel/basic-memory (feature/vector-search)
**Repository**: https://github.com/manuelbliemel/basic-memory/tree/feature/vector-search
**Key Implementation Details**:
**Vector Database**: ChromaDB (same as our approach!)
**Embedding Models**:
- Local: `all-MiniLM-L6-v2` (default, 384 dims) - same model we planned
- Also supports: `all-mpnet-base-v2`, `paraphrase-MiniLM-L6-v2`, `multi-qa-MiniLM-L6-cos-v1`
- OpenAI: `text-embedding-ada-002`, `text-embedding-3-small`, `text-embedding-3-large`
**Chunking Strategy** (interesting - we didn't consider this):
- Chunk Size: 500 characters
- Chunk Overlap: 50 characters
- Breaks documents into smaller pieces for better semantic search
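A minimal sketch of that chunking scheme (illustrative only, not code from the fork):
```python
def chunk_document(text: str, chunk_size: int = 500, overlap: int = 50) -> list[str]:
    """Split text into overlapping chunks (500 chars with 50-char overlap by default)."""
    chunks = []
    step = chunk_size - overlap
    for start in range(0, max(len(text), 1), step):
        chunk = text[start:start + chunk_size]
        if chunk:
            chunks.append(chunk)
    return chunks
```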
**Search Strategies**:
1. `fuzzy_only` (default) - FTS5 only
2. `vector_only` - ChromaDB only
3. `hybrid` (recommended) - Both FTS5 + ChromaDB
4. `fuzzy_primary` - FTS5 first, ChromaDB fallback
5. `vector_primary` - ChromaDB first, FTS5 fallback
**Configuration**:
- Similarity Threshold: 0.1
- Max Results: 5
- Storage: `~/.basic-memory/chroma/`
- Config: `~/.basic-memory/config.json`
**Key Differences from Our Approach**:
| Aspect | Their Approach | Our Approach |
|--------|---------------|--------------|
| FTS5 | Keep FTS5 + add ChromaDB | Remove FTS5, use SQL for exact lookups |
| Search Strategy | 5 configurable strategies | Smart routing (automatic) |
| Document Processing | Chunk into 500-char pieces | Index full documents |
| Hybrid Mode | Run both, merge, dedupe | Route to best backend |
| Configuration | User-configurable strategy | Automatic based on query type |
**What We Can Learn**:
1. **Chunking**: Breaking documents into 500-character chunks with 50-char overlap may improve semantic search quality for long documents
- Pro: Better granularity for semantic matching
- Con: More vectors to store and search
- Consider: Optional chunking for large documents (>2000 chars)
2. **Configurable Strategies**: Allowing users to choose search strategy provides flexibility
- Pro: Power users can tune behavior
- Con: More complexity, most users won't configure
- Consider: Default to smart routing, allow override via config
3. **Similarity Threshold**: They use 0.1 as default
- Consider: Benchmark different thresholds for quality
4. **Storage Location**: `~/.basic-memory/chroma/` matches our planned `chroma_data/` approach
**Potential Collaboration**:
- Their implementation is nearly complete as a fork
- Could potentially merge their work or use as reference implementation
- Their chunking strategy could be valuable addition to our approach
## Relations
- implements [[SPEC-11 Basic Memory API Performance Optimization]]
- relates_to [[Performance Optimizations Documentation]]
- enables [[PostgreSQL Migration]]
- improves_on [[SQLite FTS5 Search]]
- references [[manuelbliemel/basic-memory feature/vector-search fork]]
```
--------------------------------------------------------------------------------
/tests/api/test_knowledge_router.py:
--------------------------------------------------------------------------------
```python
"""Tests for knowledge graph API routes."""
from urllib.parse import quote
import pytest
from httpx import AsyncClient
from basic_memory.schemas import (
Entity,
EntityResponse,
)
from basic_memory.schemas.search import SearchItemType, SearchResponse
from basic_memory.utils import normalize_newlines
@pytest.mark.asyncio
async def test_create_entity(client: AsyncClient, file_service, project_url):
"""Should create entity successfully."""
data = {
"title": "TestEntity",
"folder": "test",
"entity_type": "test",
"content": "TestContent",
"project": "Test Project Context",
}
# Create an entity
print(f"Requesting with data: {data}")
# Use the permalink version of the project name in the path
response = await client.post(f"{project_url}/knowledge/entities", json=data)
# Print response for debugging
print(f"Response status: {response.status_code}")
print(f"Response content: {response.text}")
# Verify creation
assert response.status_code == 200
entity = EntityResponse.model_validate(response.json())
assert entity.permalink == "test/test-entity"
assert entity.file_path == "test/TestEntity.md"
assert entity.entity_type == data["entity_type"]
assert entity.content_type == "text/markdown"
# Verify file has new content but preserved metadata
file_path = file_service.get_entity_path(entity)
file_content, _ = await file_service.read_file(file_path)
assert data["content"] in file_content
@pytest.mark.asyncio
async def test_create_entity_observations_relations(client: AsyncClient, file_service, project_url):
"""Should create entity successfully."""
data = {
"title": "TestEntity",
"folder": "test",
"content": """
# TestContent
## Observations
- [note] This is notable #tag1 (testing)
- related to [[SomeOtherThing]]
""",
}
# Create an entity
response = await client.post(f"{project_url}/knowledge/entities", json=data)
# Verify creation
assert response.status_code == 200
entity = EntityResponse.model_validate(response.json())
assert entity.permalink == "test/test-entity"
assert entity.file_path == "test/TestEntity.md"
assert entity.entity_type == "note"
assert entity.content_type == "text/markdown"
assert len(entity.observations) == 1
assert entity.observations[0].category == "note"
assert entity.observations[0].content == "This is notable #tag1"
assert entity.observations[0].tags == ["tag1"]
assert entity.observations[0].context == "testing"
assert len(entity.relations) == 1
assert entity.relations[0].relation_type == "related to"
assert entity.relations[0].from_id == "test/test-entity"
assert entity.relations[0].to_id is None
# Verify file has new content but preserved metadata
file_path = file_service.get_entity_path(entity)
file_content, _ = await file_service.read_file(file_path)
assert data["content"].strip() in file_content
@pytest.mark.asyncio
async def test_relation_resolution_after_creation(client: AsyncClient, project_url):
"""Test that relation resolution works after creating entities and handles exceptions gracefully."""
# Create first entity with unresolved relation
entity1_data = {
"title": "EntityOne",
"folder": "test",
"entity_type": "test",
"content": "This entity references [[EntityTwo]]",
}
response1 = await client.put(
f"{project_url}/knowledge/entities/test/entity-one", json=entity1_data
)
assert response1.status_code == 201
entity1 = response1.json()
# Verify relation exists but is unresolved
assert len(entity1["relations"]) == 1
assert entity1["relations"][0]["to_id"] is None
assert entity1["relations"][0]["to_name"] == "EntityTwo"
# Create the referenced entity
entity2_data = {
"title": "EntityTwo",
"folder": "test",
"entity_type": "test",
"content": "This is the referenced entity",
}
response2 = await client.put(
f"{project_url}/knowledge/entities/test/entity-two", json=entity2_data
)
assert response2.status_code == 201
# Verify the original entity's relation was resolved
response_check = await client.get(f"{project_url}/knowledge/entities/test/entity-one")
assert response_check.status_code == 200
updated_entity1 = response_check.json()
# The relation should now be resolved via the automatic resolution after entity creation
resolved_relations = [r for r in updated_entity1["relations"] if r["to_id"] is not None]
assert (
len(resolved_relations) >= 0
) # May or may not be resolved immediately depending on timing
@pytest.mark.asyncio
async def test_get_entity_by_permalink(client: AsyncClient, project_url):
"""Should retrieve an entity by path ID."""
# First create an entity
data = {"title": "TestEntity", "folder": "test", "entity_type": "test"}
response = await client.post(f"{project_url}/knowledge/entities", json=data)
assert response.status_code == 200
data = response.json()
# Now get it by permalink
permalink = data["permalink"]
response = await client.get(f"{project_url}/knowledge/entities/{permalink}")
# Verify retrieval
assert response.status_code == 200
entity = response.json()
assert entity["title"] == "TestEntity"
assert entity["file_path"] == "test/TestEntity.md"
assert entity["entity_type"] == "test"
assert entity["permalink"] == "test/test-entity"
@pytest.mark.asyncio
async def test_get_entity_by_file_path(client: AsyncClient, project_url):
"""Should retrieve an entity by path ID."""
# First create an entity
data = {"title": "TestEntity", "folder": "test", "entity_type": "test"}
response = await client.post(f"{project_url}/knowledge/entities", json=data)
assert response.status_code == 200
data = response.json()
# Now get it by path
file_path = data["file_path"]
response = await client.get(f"{project_url}/knowledge/entities/{file_path}")
# Verify retrieval
assert response.status_code == 200
entity = response.json()
assert entity["title"] == "TestEntity"
assert entity["file_path"] == "test/TestEntity.md"
assert entity["entity_type"] == "test"
assert entity["permalink"] == "test/test-entity"
@pytest.mark.asyncio
async def test_get_entities(client: AsyncClient, project_url):
"""Should open multiple entities by path IDs."""
# Create a few entities with different names
await client.post(
f"{project_url}/knowledge/entities",
json={"title": "AlphaTest", "folder": "", "entity_type": "test"},
)
await client.post(
f"{project_url}/knowledge/entities",
json={"title": "BetaTest", "folder": "", "entity_type": "test"},
)
# Open nodes by path IDs
response = await client.get(
f"{project_url}/knowledge/entities?permalink=alpha-test&permalink=beta-test",
)
# Verify results
assert response.status_code == 200
data = response.json()
assert len(data["entities"]) == 2
entity_0 = data["entities"][0]
assert entity_0["title"] == "AlphaTest"
assert entity_0["file_path"] == "AlphaTest.md"
assert entity_0["entity_type"] == "test"
assert entity_0["permalink"] == "alpha-test"
entity_1 = data["entities"][1]
assert entity_1["title"] == "BetaTest"
assert entity_1["file_path"] == "BetaTest.md"
assert entity_1["entity_type"] == "test"
assert entity_1["permalink"] == "beta-test"
@pytest.mark.asyncio
async def test_delete_entity(client: AsyncClient, project_url):
"""Test DELETE /knowledge/entities with path ID."""
# Create test entity
entity_data = {"file_path": "TestEntity", "entity_type": "test"}
await client.post(f"{project_url}/knowledge/entities", json=entity_data)
# Test deletion
response = await client.post(
f"{project_url}/knowledge/entities/delete", json={"permalinks": ["test-entity"]}
)
assert response.status_code == 200
assert response.json() == {"deleted": True}
# Verify entity is gone
permalink = quote("test/TestEntity")
response = await client.get(f"{project_url}/knowledge/entities/{permalink}")
assert response.status_code == 404
@pytest.mark.asyncio
async def test_delete_single_entity(client: AsyncClient, project_url):
"""Test DELETE /knowledge/entities with path ID."""
# Create test entity
entity_data = {"title": "TestEntity", "folder": "", "entity_type": "test"}
await client.post(f"{project_url}/knowledge/entities", json=entity_data)
# Test deletion
response = await client.delete(f"{project_url}/knowledge/entities/test-entity")
assert response.status_code == 200
assert response.json() == {"deleted": True}
# Verify entity is gone
permalink = quote("test/TestEntity")
response = await client.get(f"{project_url}/knowledge/entities/{permalink}")
assert response.status_code == 404
@pytest.mark.asyncio
async def test_delete_single_entity_by_title(client: AsyncClient, project_url):
"""Test DELETE /knowledge/entities with file path."""
# Create test entity
entity_data = {"title": "TestEntity", "folder": "", "entity_type": "test"}
response = await client.post(f"{project_url}/knowledge/entities", json=entity_data)
assert response.status_code == 200
data = response.json()
# Test deletion
response = await client.delete(f"{project_url}/knowledge/entities/TestEntity")
assert response.status_code == 200
assert response.json() == {"deleted": True}
# Verify entity is gone
file_path = quote(data["file_path"])
response = await client.get(f"{project_url}/knowledge/entities/{file_path}")
assert response.status_code == 404
@pytest.mark.asyncio
async def test_delete_single_entity_not_found(client: AsyncClient, project_url):
"""Test DELETE /knowledge/entities with path ID."""
# Test deletion
response = await client.delete(f"{project_url}/knowledge/entities/test-not-found")
assert response.status_code == 200
assert response.json() == {"deleted": False}
@pytest.mark.asyncio
async def test_delete_entity_bulk(client: AsyncClient, project_url):
"""Test bulk entity deletion using path IDs."""
# Create test entities
await client.post(
f"{project_url}/knowledge/entities", json={"file_path": "Entity1", "entity_type": "test"}
)
await client.post(
f"{project_url}/knowledge/entities", json={"file_path": "Entity2", "entity_type": "test"}
)
# Test deletion
response = await client.post(
f"{project_url}/knowledge/entities/delete", json={"permalinks": ["Entity1", "Entity2"]}
)
assert response.status_code == 200
assert response.json() == {"deleted": True}
# Verify entities are gone
for name in ["Entity1", "Entity2"]:
permalink = quote(f"{name}")
response = await client.get(f"{project_url}/knowledge/entities/{permalink}")
assert response.status_code == 404
@pytest.mark.asyncio
async def test_delete_nonexistent_entity(client: AsyncClient, project_url):
"""Test deleting a nonexistent entity by path ID."""
response = await client.post(
f"{project_url}/knowledge/entities/delete", json={"permalinks": ["non_existent"]}
)
assert response.status_code == 200
assert response.json() == {"deleted": True}
@pytest.mark.asyncio
async def test_entity_indexing(client: AsyncClient, project_url):
"""Test entity creation includes search indexing."""
# Create entity
response = await client.post(
f"{project_url}/knowledge/entities",
json={
"title": "SearchTest",
"folder": "",
"entity_type": "test",
"observations": ["Unique searchable observation"],
},
)
assert response.status_code == 200
# Verify it's searchable
search_response = await client.post(
f"{project_url}/search/",
json={"text": "search", "entity_types": [SearchItemType.ENTITY.value]},
)
assert search_response.status_code == 200
search_result = SearchResponse.model_validate(search_response.json())
assert len(search_result.results) == 1
assert search_result.results[0].permalink == "search-test"
assert search_result.results[0].type == SearchItemType.ENTITY.value
@pytest.mark.asyncio
async def test_entity_delete_indexing(client: AsyncClient, project_url):
"""Test deleted entities are removed from search index."""
# Create entity
response = await client.post(
f"{project_url}/knowledge/entities",
json={
"title": "DeleteTest",
"folder": "",
"entity_type": "test",
"observations": ["Searchable observation that should be removed"],
},
)
assert response.status_code == 200
entity = response.json()
# Verify it's initially searchable
search_response = await client.post(
f"{project_url}/search/",
json={"text": "delete", "entity_types": [SearchItemType.ENTITY.value]},
)
search_result = SearchResponse.model_validate(search_response.json())
assert len(search_result.results) == 1
# Delete entity
delete_response = await client.post(
f"{project_url}/knowledge/entities/delete", json={"permalinks": [entity["permalink"]]}
)
assert delete_response.status_code == 200
# Verify it's no longer searchable
search_response = await client.post(
f"{project_url}/search/", json={"text": "delete", "types": [SearchItemType.ENTITY.value]}
)
search_result = SearchResponse.model_validate(search_response.json())
assert len(search_result.results) == 0
@pytest.mark.asyncio
async def test_update_entity_basic(client: AsyncClient, project_url):
"""Test basic entity field updates."""
# Create initial entity
response = await client.post(
f"{project_url}/knowledge/entities",
json={
"title": "test",
"folder": "",
"entity_type": "test",
"content": "Initial summary",
"entity_metadata": {"status": "draft"},
},
)
entity_response = response.json()
# Update fields
entity = Entity(**entity_response, folder="")
entity.entity_metadata["status"] = "final"
entity.content = "Updated summary"
response = await client.put(
f"{project_url}/knowledge/entities/{entity.permalink}", json=entity.model_dump()
)
assert response.status_code == 200
updated = response.json()
# Verify updates
assert updated["entity_metadata"]["status"] == "final" # Preserved
response = await client.get(f"{project_url}/resource/{updated['permalink']}?content=true")
# raw markdown content
fetched = response.text
assert "Updated summary" in fetched
@pytest.mark.asyncio
async def test_update_entity_content(client: AsyncClient, project_url):
"""Test updating content for different entity types."""
# Create a note entity
response = await client.post(
f"{project_url}/knowledge/entities",
json={"title": "test-note", "folder": "", "entity_type": "note", "summary": "Test note"},
)
note = response.json()
# Update fields
entity = Entity(**note, folder="")
entity.content = "# Updated Note\n\nNew content."
response = await client.put(
f"{project_url}/knowledge/entities/{note['permalink']}", json=entity.model_dump()
)
assert response.status_code == 200
updated = response.json()
# Verify through get request to check file
response = await client.get(f"{project_url}/resource/{updated['permalink']}?content=true")
# raw markdown content
fetched = response.text
assert "# Updated Note" in fetched
assert "New content" in fetched
@pytest.mark.asyncio
async def test_update_entity_type_conversion(client: AsyncClient, project_url):
"""Test converting between note and knowledge types."""
# Create a note
note_data = {
"title": "test-note",
"folder": "",
"entity_type": "note",
"summary": "Test note",
"content": "# Test Note\n\nInitial content.",
}
response = await client.post(f"{project_url}/knowledge/entities", json=note_data)
note = response.json()
# Update fields
entity = Entity(**note, folder="")
entity.entity_type = "test"
response = await client.put(
f"{project_url}/knowledge/entities/{note['permalink']}", json=entity.model_dump()
)
assert response.status_code == 200
updated = response.json()
# Verify conversion
assert updated["entity_type"] == "test"
# Get latest to verify file format
response = await client.get(f"{project_url}/knowledge/entities/{updated['permalink']}")
knowledge = response.json()
assert knowledge.get("content") is None
@pytest.mark.asyncio
async def test_update_entity_metadata(client: AsyncClient, project_url):
"""Test updating entity metadata."""
# Create entity
data = {
"title": "test",
"folder": "",
"entity_type": "test",
"entity_metadata": {"status": "draft"},
}
response = await client.post(f"{project_url}/knowledge/entities", json=data)
entity_response = response.json()
# Update fields
entity = Entity(**entity_response, folder="")
entity.entity_metadata["status"] = "final"
entity.entity_metadata["reviewed"] = True
# Update metadata
response = await client.put(
f"{project_url}/knowledge/entities/{entity.permalink}", json=entity.model_dump()
)
assert response.status_code == 200
updated = response.json()
# Verify metadata was merged, not replaced
assert updated["entity_metadata"]["status"] == "final"
assert updated["entity_metadata"]["reviewed"] in (True, "True")
@pytest.mark.asyncio
async def test_update_entity_not_found_does_create(client: AsyncClient, project_url):
"""Test updating non-existent entity does a create"""
data = {
"title": "nonexistent",
"folder": "",
"entity_type": "test",
"observations": ["First observation", "Second observation"],
}
entity = Entity(**data)
response = await client.put(
f"{project_url}/knowledge/entities/nonexistent", json=entity.model_dump()
)
assert response.status_code == 201
@pytest.mark.asyncio
async def test_update_entity_incorrect_permalink(client: AsyncClient, project_url):
"""Test updating non-existent entity does a create"""
data = {
"title": "Test Entity",
"folder": "",
"entity_type": "test",
"observations": ["First observation", "Second observation"],
}
entity = Entity(**data)
response = await client.put(
f"{project_url}/knowledge/entities/nonexistent", json=entity.model_dump()
)
assert response.status_code == 400
@pytest.mark.asyncio
async def test_update_entity_search_index(client: AsyncClient, project_url):
"""Test search index is updated after entity changes."""
# Create entity
data = {
"title": "test",
"folder": "",
"entity_type": "test",
"content": "Initial searchable content",
}
response = await client.post(f"{project_url}/knowledge/entities", json=data)
entity_response = response.json()
# Update fields
entity = Entity(**entity_response, folder="")
entity.content = "Updated with unique sphinx marker"
response = await client.put(
f"{project_url}/knowledge/entities/{entity.permalink}", json=entity.model_dump()
)
assert response.status_code == 200
# Search should find new content
search_response = await client.post(
f"{project_url}/search/",
json={"text": "sphinx marker", "entity_types": [SearchItemType.ENTITY.value]},
)
results = search_response.json()["results"]
assert len(results) == 1
assert results[0]["permalink"] == entity.permalink
# PATCH edit entity endpoint tests
@pytest.mark.asyncio
async def test_edit_entity_append(client: AsyncClient, project_url):
"""Test appending content to an entity via PATCH endpoint."""
# Create test entity
response = await client.post(
f"{project_url}/knowledge/entities",
json={
"title": "Test Note",
"folder": "test",
"entity_type": "note",
"content": "Original content",
},
)
assert response.status_code == 200
entity = response.json()
# Edit entity with append operation
response = await client.patch(
f"{project_url}/knowledge/entities/{entity['permalink']}",
json={"operation": "append", "content": "Appended content"},
)
if response.status_code != 200:
print(f"PATCH failed with status {response.status_code}")
print(f"Response content: {response.text}")
assert response.status_code == 200
updated = response.json()
# Verify content was appended by reading the file
response = await client.get(f"{project_url}/resource/{updated['permalink']}?content=true")
file_content = response.text
assert "Original content" in file_content
assert "Appended content" in file_content
assert file_content.index("Original content") < file_content.index("Appended content")
@pytest.mark.asyncio
async def test_edit_entity_prepend(client: AsyncClient, project_url):
"""Test prepending content to an entity via PATCH endpoint."""
# Create test entity
response = await client.post(
f"{project_url}/knowledge/entities",
json={
"title": "Test Note",
"folder": "test",
"entity_type": "note",
"content": "Original content",
},
)
assert response.status_code == 200
entity = response.json()
# Edit entity with prepend operation
response = await client.patch(
f"{project_url}/knowledge/entities/{entity['permalink']}",
json={"operation": "prepend", "content": "Prepended content"},
)
if response.status_code != 200:
print(f"PATCH prepend failed with status {response.status_code}")
print(f"Response content: {response.text}")
assert response.status_code == 200
updated = response.json()
# Verify the entire file content structure
response = await client.get(f"{project_url}/resource/{updated['permalink']}?content=true")
file_content = response.text
# Expected content with frontmatter preserved and content prepended to body
expected_content = normalize_newlines("""---
title: Test Note
type: note
permalink: test/test-note
---
Prepended content
Original content""")
assert file_content.strip() == expected_content.strip()
@pytest.mark.asyncio
async def test_edit_entity_find_replace(client: AsyncClient, project_url):
"""Test find and replace operation via PATCH endpoint."""
# Create test entity
response = await client.post(
f"{project_url}/knowledge/entities",
json={
"title": "Test Note",
"folder": "test",
"entity_type": "note",
"content": "This is old content that needs updating",
},
)
assert response.status_code == 200
entity = response.json()
# Edit entity with find_replace operation
response = await client.patch(
f"{project_url}/knowledge/entities/{entity['permalink']}",
json={"operation": "find_replace", "content": "new content", "find_text": "old content"},
)
assert response.status_code == 200
updated = response.json()
# Verify content was replaced
response = await client.get(f"{project_url}/resource/{updated['permalink']}?content=true")
file_content = response.text
assert "old content" not in file_content
assert "This is new content that needs updating" in file_content
@pytest.mark.asyncio
async def test_edit_entity_find_replace_with_expected_replacements(
client: AsyncClient, project_url
):
"""Test find and replace with expected_replacements parameter."""
# Create test entity with repeated text
response = await client.post(
f"{project_url}/knowledge/entities",
json={
"title": "Sample Note",
"folder": "docs",
"entity_type": "note",
"content": "The word banana appears here. Another banana word here.",
},
)
assert response.status_code == 200
entity = response.json()
# Edit entity with find_replace operation, expecting 2 replacements
response = await client.patch(
f"{project_url}/knowledge/entities/{entity['permalink']}",
json={
"operation": "find_replace",
"content": "apple",
"find_text": "banana",
"expected_replacements": 2,
},
)
assert response.status_code == 200
updated = response.json()
# Verify both instances were replaced
response = await client.get(f"{project_url}/resource/{updated['permalink']}?content=true")
file_content = response.text
assert "The word apple appears here. Another apple word here." in file_content
@pytest.mark.asyncio
async def test_edit_entity_replace_section(client: AsyncClient, project_url):
"""Test replacing a section via PATCH endpoint."""
# Create test entity with sections
content = """# Main Title
## Section 1
Original section 1 content
## Section 2
Original section 2 content"""
response = await client.post(
f"{project_url}/knowledge/entities",
json={
"title": "Sample Note",
"folder": "docs",
"entity_type": "note",
"content": content,
},
)
assert response.status_code == 200
entity = response.json()
# Edit entity with replace_section operation
response = await client.patch(
f"{project_url}/knowledge/entities/{entity['permalink']}",
json={
"operation": "replace_section",
"content": "New section 1 content",
"section": "## Section 1",
},
)
assert response.status_code == 200
updated = response.json()
# Verify section was replaced
response = await client.get(f"{project_url}/resource/{updated['permalink']}?content=true")
file_content = response.text
assert "New section 1 content" in file_content
assert "Original section 1 content" not in file_content
assert "Original section 2 content" in file_content # Other sections preserved
@pytest.mark.asyncio
async def test_edit_entity_not_found(client: AsyncClient, project_url):
"""Test editing a non-existent entity returns 400."""
response = await client.patch(
f"{project_url}/knowledge/entities/non-existent",
json={"operation": "append", "content": "content"},
)
assert response.status_code == 400
assert "Entity not found" in response.json()["detail"]
@pytest.mark.asyncio
async def test_edit_entity_invalid_operation(client: AsyncClient, project_url):
"""Test editing with invalid operation returns 400."""
# Create test entity
response = await client.post(
f"{project_url}/knowledge/entities",
json={
"title": "Test Note",
"folder": "test",
"entity_type": "note",
"content": "Original content",
},
)
assert response.status_code == 200
entity = response.json()
# Try invalid operation
response = await client.patch(
f"{project_url}/knowledge/entities/{entity['permalink']}",
json={"operation": "invalid_operation", "content": "content"},
)
assert response.status_code == 422
assert "invalid_operation" in response.json()["detail"][0]["input"]
@pytest.mark.asyncio
async def test_edit_entity_find_replace_missing_find_text(client: AsyncClient, project_url):
"""Test find_replace without find_text returns 400."""
# Create test entity
response = await client.post(
f"{project_url}/knowledge/entities",
json={
"title": "Test Note",
"folder": "test",
"entity_type": "note",
"content": "Original content",
},
)
assert response.status_code == 200
entity = response.json()
# Try find_replace without find_text
response = await client.patch(
f"{project_url}/knowledge/entities/{entity['permalink']}",
json={"operation": "find_replace", "content": "new content"},
)
assert response.status_code == 400
assert "find_text is required" in response.json()["detail"]
@pytest.mark.asyncio
async def test_edit_entity_replace_section_missing_section(client: AsyncClient, project_url):
"""Test replace_section without section parameter returns 400."""
# Create test entity
response = await client.post(
f"{project_url}/knowledge/entities",
json={
"title": "Test Note",
"folder": "test",
"entity_type": "note",
"content": "Original content",
},
)
assert response.status_code == 200
entity = response.json()
# Try replace_section without section
response = await client.patch(
f"{project_url}/knowledge/entities/{entity['permalink']}",
json={"operation": "replace_section", "content": "new content"},
)
assert response.status_code == 400
assert "section is required" in response.json()["detail"]
@pytest.mark.asyncio
async def test_edit_entity_find_replace_not_found(client: AsyncClient, project_url):
"""Test find_replace when text is not found returns 400."""
# Create test entity
response = await client.post(
f"{project_url}/knowledge/entities",
json={
"title": "Test Note",
"folder": "test",
"entity_type": "note",
"content": "This is some content",
},
)
assert response.status_code == 200
entity = response.json()
# Try to replace text that doesn't exist
response = await client.patch(
f"{project_url}/knowledge/entities/{entity['permalink']}",
json={"operation": "find_replace", "content": "new content", "find_text": "nonexistent"},
)
assert response.status_code == 400
assert "Text to replace not found" in response.json()["detail"]
@pytest.mark.asyncio
async def test_edit_entity_find_replace_wrong_expected_count(client: AsyncClient, project_url):
"""Test find_replace with wrong expected_replacements count returns 400."""
# Create test entity with repeated text
response = await client.post(
f"{project_url}/knowledge/entities",
json={
"title": "Sample Note",
"folder": "docs",
"entity_type": "note",
"content": "The word banana appears here. Another banana word here.",
},
)
assert response.status_code == 200
entity = response.json()
# Try to replace with wrong expected count
response = await client.patch(
f"{project_url}/knowledge/entities/{entity['permalink']}",
json={
"operation": "find_replace",
"content": "replacement",
"find_text": "banana",
"expected_replacements": 1, # Wrong - there are actually 2
},
)
assert response.status_code == 400
assert "Expected 1 occurrences" in response.json()["detail"]
assert "but found 2" in response.json()["detail"]
@pytest.mark.asyncio
async def test_edit_entity_search_reindex(client: AsyncClient, project_url):
"""Test that edited entities are reindexed for search."""
# Create test entity
response = await client.post(
f"{project_url}/knowledge/entities",
json={
"title": "Search Test",
"folder": "test",
"entity_type": "note",
"content": "Original searchable content",
},
)
assert response.status_code == 200
entity = response.json()
# Edit the entity
response = await client.patch(
f"{project_url}/knowledge/entities/{entity['permalink']}",
json={"operation": "append", "content": " with unique zebra marker"},
)
assert response.status_code == 200
# Search should find the new content
search_response = await client.post(
f"{project_url}/search/",
json={"text": "zebra marker", "entity_types": ["entity"]},
)
results = search_response.json()["results"]
assert len(results) == 1
assert results[0]["permalink"] == entity["permalink"]
# Move entity endpoint tests
@pytest.mark.asyncio
async def test_move_entity_success(client: AsyncClient, project_url):
"""Test successfully moving an entity to a new location."""
# Create test entity
response = await client.post(
f"{project_url}/knowledge/entities",
json={
"title": "TestNote",
"folder": "source",
"entity_type": "note",
"content": "Test content",
},
)
assert response.status_code == 200
entity = response.json()
original_permalink = entity["permalink"]
# Move entity
move_data = {
"identifier": original_permalink,
"destination_path": "target/MovedNote.md",
}
response = await client.post(f"{project_url}/knowledge/move", json=move_data)
assert response.status_code == 200
response_model = EntityResponse.model_validate(response.json())
assert response_model.file_path == "target/MovedNote.md"
# Verify original entity no longer exists
response = await client.get(f"{project_url}/knowledge/entities/{original_permalink}")
assert response.status_code == 404
# Verify entity exists at new location
response = await client.get(f"{project_url}/knowledge/entities/target/moved-note")
assert response.status_code == 200
moved_entity = response.json()
assert moved_entity["file_path"] == "target/MovedNote.md"
assert moved_entity["permalink"] == "target/moved-note"
# Verify file content using resource endpoint
response = await client.get(f"{project_url}/resource/target/moved-note?content=true")
assert response.status_code == 200
file_content = response.text
assert "Test content" in file_content
@pytest.mark.asyncio
async def test_move_entity_with_folder_creation(client: AsyncClient, project_url):
"""Test moving entity creates necessary folders."""
# Create test entity
response = await client.post(
f"{project_url}/knowledge/entities",
json={
"title": "TestNote",
"folder": "",
"entity_type": "note",
"content": "Test content",
},
)
assert response.status_code == 200
entity = response.json()
# Move to deeply nested path
move_data = {
"identifier": entity["permalink"],
"destination_path": "deeply/nested/folder/MovedNote.md",
}
response = await client.post(f"{project_url}/knowledge/move", json=move_data)
assert response.status_code == 200
# Verify entity exists at new location
response = await client.get(f"{project_url}/knowledge/entities/deeply/nested/folder/moved-note")
assert response.status_code == 200
moved_entity = response.json()
assert moved_entity["file_path"] == "deeply/nested/folder/MovedNote.md"
@pytest.mark.asyncio
async def test_move_entity_with_observations_and_relations(client: AsyncClient, project_url):
"""Test moving entity preserves observations and relations."""
# Create test entity with complex content
content = """# Complex Entity
## Observations
- [note] Important observation #tag1
- [feature] Key feature #feature
- relation to [[SomeOtherEntity]]
- depends on [[Dependency]]
Some additional content."""
response = await client.post(
f"{project_url}/knowledge/entities",
json={
"title": "ComplexEntity",
"folder": "source",
"entity_type": "note",
"content": content,
},
)
assert response.status_code == 200
entity = response.json()
# Verify original observations and relations
assert len(entity["observations"]) == 2
assert len(entity["relations"]) == 2
# Move entity
move_data = {
"identifier": entity["permalink"],
"destination_path": "target/MovedComplex.md",
}
response = await client.post(f"{project_url}/knowledge/move", json=move_data)
assert response.status_code == 200
# Verify moved entity preserves data
response = await client.get(f"{project_url}/knowledge/entities/target/moved-complex")
assert response.status_code == 200
moved_entity = response.json()
# Check observations preserved
assert len(moved_entity["observations"]) == 2
obs_categories = {obs["category"] for obs in moved_entity["observations"]}
assert obs_categories == {"note", "feature"}
# Check relations preserved
assert len(moved_entity["relations"]) == 2
rel_types = {rel["relation_type"] for rel in moved_entity["relations"]}
assert rel_types == {"relation to", "depends on"}
# Verify file content preserved
response = await client.get(f"{project_url}/resource/target/moved-complex?content=true")
assert response.status_code == 200
file_content = response.text
assert "Important observation #tag1" in file_content
assert "[[SomeOtherEntity]]" in file_content
@pytest.mark.asyncio
async def test_move_entity_search_reindexing(client: AsyncClient, project_url):
"""Test that moved entities are properly reindexed for search."""
# Create searchable entity
response = await client.post(
f"{project_url}/knowledge/entities",
json={
"title": "SearchableNote",
"folder": "source",
"entity_type": "note",
"content": "Unique searchable elephant content",
},
)
assert response.status_code == 200
entity = response.json()
# Move entity
move_data = {
"identifier": entity["permalink"],
"destination_path": "target/MovedSearchable.md",
}
response = await client.post(f"{project_url}/knowledge/move", json=move_data)
assert response.status_code == 200
# Search should find entity at new location
search_response = await client.post(
f"{project_url}/search/",
json={"text": "elephant", "entity_types": [SearchItemType.ENTITY.value]},
)
results = search_response.json()["results"]
assert len(results) == 1
assert results[0]["permalink"] == "target/moved-searchable"
@pytest.mark.asyncio
async def test_move_entity_not_found(client: AsyncClient, project_url):
"""Test moving non-existent entity returns 400 error."""
move_data = {
"identifier": "non-existent-entity",
"destination_path": "target/SomeFile.md",
}
response = await client.post(f"{project_url}/knowledge/move", json=move_data)
assert response.status_code == 400
assert "Entity not found" in response.json()["detail"]
@pytest.mark.asyncio
async def test_move_entity_invalid_destination_path(client: AsyncClient, project_url):
"""Test moving entity with invalid destination path."""
# Create test entity
response = await client.post(
f"{project_url}/knowledge/entities",
json={
"title": "TestNote",
"folder": "",
"entity_type": "note",
"content": "Test content",
},
)
assert response.status_code == 200
entity = response.json()
# Test various invalid paths
invalid_paths = [
"/absolute/path.md", # Absolute path
"../parent/path.md", # Parent directory
"", # Empty string
" ", # Whitespace only
]
for invalid_path in invalid_paths:
move_data = {
"identifier": entity["permalink"],
"destination_path": invalid_path,
}
response = await client.post(f"{project_url}/knowledge/move", json=move_data)
assert response.status_code == 422 # Validation error
@pytest.mark.asyncio
async def test_move_entity_destination_exists(client: AsyncClient, project_url):
"""Test moving entity to existing destination returns error."""
# Create source entity
response = await client.post(
f"{project_url}/knowledge/entities",
json={
"title": "SourceNote",
"folder": "source",
"entity_type": "note",
"content": "Source content",
},
)
assert response.status_code == 200
source_entity = response.json()
# Create destination entity
response = await client.post(
f"{project_url}/knowledge/entities",
json={
"title": "DestinationNote",
"folder": "target",
"entity_type": "note",
"content": "Destination content",
},
)
assert response.status_code == 200
# Try to move source to existing destination
move_data = {
"identifier": source_entity["permalink"],
"destination_path": "target/DestinationNote.md",
}
response = await client.post(f"{project_url}/knowledge/move", json=move_data)
assert response.status_code == 400
assert "already exists" in response.json()["detail"]
@pytest.mark.asyncio
async def test_move_entity_missing_identifier(client: AsyncClient, project_url):
"""Test move request with missing identifier."""
move_data = {
"destination_path": "target/SomeFile.md",
}
response = await client.post(f"{project_url}/knowledge/move", json=move_data)
assert response.status_code == 422 # Validation error
@pytest.mark.asyncio
async def test_move_entity_missing_destination(client: AsyncClient, project_url):
"""Test move request with missing destination path."""
move_data = {
"identifier": "some-entity",
}
response = await client.post(f"{project_url}/knowledge/move", json=move_data)
assert response.status_code == 422 # Validation error
@pytest.mark.asyncio
async def test_move_entity_by_file_path(client: AsyncClient, project_url):
"""Test moving entity using file path as identifier."""
# Create test entity
response = await client.post(
f"{project_url}/knowledge/entities",
json={
"title": "TestNote",
"folder": "source",
"entity_type": "note",
"content": "Test content",
},
)
assert response.status_code == 200
entity = response.json()
# Move using file path as identifier
move_data = {
"identifier": entity["file_path"],
"destination_path": "target/MovedByPath.md",
}
response = await client.post(f"{project_url}/knowledge/move", json=move_data)
assert response.status_code == 200
# Verify entity exists at new location
response = await client.get(f"{project_url}/knowledge/entities/target/moved-by-path")
assert response.status_code == 200
moved_entity = response.json()
assert moved_entity["file_path"] == "target/MovedByPath.md"
@pytest.mark.asyncio
async def test_move_entity_by_title(client: AsyncClient, project_url):
"""Test moving entity using title as identifier."""
# Create test entity with unique title
response = await client.post(
f"{project_url}/knowledge/entities",
json={
"title": "UniqueTestTitle",
"folder": "source",
"entity_type": "note",
"content": "Test content",
},
)
assert response.status_code == 200
# Move using title as identifier
move_data = {
"identifier": "UniqueTestTitle",
"destination_path": "target/MovedByTitle.md",
}
response = await client.post(f"{project_url}/knowledge/move", json=move_data)
assert response.status_code == 200
# Verify entity exists at new location
response = await client.get(f"{project_url}/knowledge/entities/target/moved-by-title")
assert response.status_code == 200
moved_entity = response.json()
assert moved_entity["file_path"] == "target/MovedByTitle.md"
assert moved_entity["title"] == "UniqueTestTitle"
```
--------------------------------------------------------------------------------
/tests/sync/test_sync_service.py:
--------------------------------------------------------------------------------
```python
"""Test general sync behavior."""
import asyncio
import os
from datetime import datetime, timezone
from pathlib import Path
from textwrap import dedent
import pytest
from basic_memory.config import ProjectConfig, BasicMemoryConfig
from basic_memory.models import Entity
from basic_memory.repository import EntityRepository
from basic_memory.schemas.search import SearchQuery
from basic_memory.services import EntityService, FileService
from basic_memory.services.search_service import SearchService
from basic_memory.sync.sync_service import SyncService
async def create_test_file(path: Path, content: str = "test content") -> None:
"""Create a test file with given content."""
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(content)
async def touch_file(path: Path) -> None:
    """Touch a file to update its mtime (for watermark testing)."""
    # Read and rewrite to update mtime
    content = path.read_text()
    await asyncio.sleep(0.5)  # Non-blocking sleep; ensures mtime changes and is newer than watermark (500ms)
    path.write_text(content)
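# Hedged alternative sketch (an assumption, not part of the original helpers): the mtime
# could also be pushed forward explicitly with os.utime, avoiding the 500ms sleep at the
# cost of relying on the filesystem honoring the timestamps it is given.
async def bump_mtime(path: Path, seconds: float = 1.0) -> None:
    """Hypothetical helper: advance a file's mtime without rewriting its content."""
    stats = path.stat()
    os.utime(path, (stats.st_atime, stats.st_mtime + seconds))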
async def force_full_scan(sync_service: SyncService) -> None:
"""Force next sync to do a full scan by clearing watermark (for testing moves/deletions)."""
if sync_service.entity_repository.project_id is not None:
project = await sync_service.project_repository.find_by_id(
sync_service.entity_repository.project_id
)
if project:
await sync_service.project_repository.update(
project.id,
{
"last_scan_timestamp": None,
"last_file_count": None,
},
)
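# With the watermark cleared, the next sync() call walks every file in the project instead
# of only files whose mtime is newer than last_scan_timestamp, which is what lets the tests
# below detect renames and out-of-band modifications deterministically.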
@pytest.mark.asyncio
async def test_forward_reference_resolution(
sync_service: SyncService,
project_config: ProjectConfig,
entity_service: EntityService,
):
"""Test that forward references get resolved when target file is created."""
project_dir = project_config.home
# First create a file with a forward reference
source_content = """
---
type: knowledge
---
# Source Document
## Relations
- depends_on [[target-doc]]
- depends_on [[target-doc]] # duplicate
"""
await create_test_file(project_dir / "source.md", source_content)
# Initial sync - should create forward reference
await sync_service.sync(project_config.home)
# Verify forward reference
source = await entity_service.get_by_permalink("source")
assert len(source.relations) == 1
assert source.relations[0].to_id is None
assert source.relations[0].to_name == "target-doc"
# Now create the target file
target_content = """
---
type: knowledge
---
# Target Doc
Target content
"""
target_file = project_dir / "target_doc.md"
await create_test_file(target_file, target_content)
# Force full scan to ensure the new file is detected
# Incremental scans have timing precision issues with watermarks on some filesystems
await force_full_scan(sync_service)
# Sync again - should resolve the reference
await sync_service.sync(project_config.home)
# Verify reference is now resolved
source = await entity_service.get_by_permalink("source")
target = await entity_service.get_by_permalink("target-doc")
assert len(source.relations) == 1
assert source.relations[0].to_id == target.id
assert source.relations[0].to_name == target.title
@pytest.mark.asyncio
async def test_resolve_relations_deletes_duplicate_unresolved_relation(
sync_service: SyncService,
project_config: ProjectConfig,
entity_service: EntityService,
):
"""Test that resolve_relations deletes duplicate unresolved relations on IntegrityError.
When resolving a forward reference would create a duplicate (from_id, to_id, relation_type),
the unresolved relation should be deleted since a resolved version already exists.
"""
from basic_memory.models import Relation
project_dir = project_config.home
# Create source entity
source_content = """
---
type: knowledge
---
# Source Entity
Content
"""
await create_test_file(project_dir / "source.md", source_content)
# Create target entity
target_content = """
---
type: knowledge
title: Target Entity
---
# Target Entity
Content
"""
await create_test_file(project_dir / "target.md", target_content)
# Sync to create both entities
await sync_service.sync(project_config.home)
source = await entity_service.get_by_permalink("source")
target = await entity_service.get_by_permalink("target")
# Create a resolved relation (already exists) that the unresolved one would become.
resolved_relation = Relation(
from_id=source.id,
to_id=target.id,
to_name=target.title,
relation_type="relates_to",
)
await sync_service.relation_repository.add(resolved_relation)
# Create an unresolved relation that will resolve to target
unresolved_relation = Relation(
from_id=source.id,
to_id=None, # Unresolved
to_name="target", # Will resolve to target entity
relation_type="relates_to",
)
await sync_service.relation_repository.add(unresolved_relation)
unresolved_id = unresolved_relation.id
# Verify we have the unresolved relation
source = await entity_service.get_by_permalink("source")
unresolved_outgoing = [r for r in source.outgoing_relations if r.to_id is None]
assert len(unresolved_outgoing) == 1
assert unresolved_outgoing[0].id == unresolved_id
assert unresolved_outgoing[0].to_name == "target"
# Call resolve_relations - should hit a real IntegrityError (unique constraint) and delete
# the duplicate unresolved relation.
await sync_service.resolve_relations()
# Verify the unresolved relation was deleted
deleted = await sync_service.relation_repository.find_by_id(unresolved_id)
assert deleted is None
# Verify no unresolved relations remain
unresolved = await sync_service.relation_repository.find_unresolved_relations()
assert len(unresolved) == 0
# Verify only the resolved relation remains
source = await entity_service.get_by_permalink("source")
assert len(source.outgoing_relations) == 1
assert source.outgoing_relations[0].to_id == target.id
@pytest.mark.asyncio
async def test_sync(
sync_service: SyncService, project_config: ProjectConfig, entity_service: EntityService
):
"""Test basic knowledge sync functionality."""
# Create test files
project_dir = project_config.home
# New entity with relation
new_content = """
---
type: knowledge
permalink: concept/test-concept
created: 2023-01-01
modified: 2023-01-01
---
# Test Concept
A test concept.
## Observations
- [design] Core feature
## Relations
- depends_on [[concept/other]]
"""
await create_test_file(project_dir / "concept/test_concept.md", new_content)
# Create related entity in DB that will be deleted
# because file was not found
other = Entity(
permalink="concept/other",
title="Other",
entity_type="test",
file_path="concept/other.md",
checksum="12345678",
content_type="text/markdown",
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
)
await entity_service.repository.add(other)
# Run sync
await sync_service.sync(project_config.home)
# Verify results
entities = await entity_service.repository.find_all()
assert len(entities) == 1
# Find new entity
test_concept = next(e for e in entities if e.permalink == "concept/test-concept")
assert test_concept.entity_type == "knowledge"
# Verify relation was created
# with forward link
entity = await entity_service.get_by_permalink(test_concept.permalink)
relations = entity.relations
assert len(relations) == 1, "Expected 1 relation for entity"
assert relations[0].to_name == "concept/other"
@pytest.mark.asyncio
async def test_sync_hidden_file(
sync_service: SyncService, project_config: ProjectConfig, entity_service: EntityService
):
"""Test basic knowledge sync functionality."""
# Create test files
project_dir = project_config.home
# hidden file
await create_test_file(project_dir / "concept/.hidden.md", "hidden")
# Run sync
await sync_service.sync(project_config.home)
# Verify results
entities = await entity_service.repository.find_all()
assert len(entities) == 0
@pytest.mark.asyncio
async def test_sync_entity_with_nonexistent_relations(
sync_service: SyncService, project_config: ProjectConfig
):
"""Test syncing an entity that references nonexistent entities."""
project_dir = project_config.home
# Create entity that references entities we haven't created yet
content = """
---
type: knowledge
permalink: concept/depends-on-future
created: 2024-01-01
modified: 2024-01-01
---
# Test Dependencies
## Observations
- [design] Testing future dependencies
## Relations
- depends_on [[concept/not_created_yet]]
- uses [[concept/also_future]]
"""
await create_test_file(project_dir / "concept/depends_on_future.md", content)
# Sync
await sync_service.sync(project_config.home)
    # Verify entity created with unresolved forward relations
entity = await sync_service.entity_service.repository.get_by_permalink(
"concept/depends-on-future"
)
assert entity is not None
assert len(entity.relations) == 2
assert entity.relations[0].to_name == "concept/not_created_yet"
assert entity.relations[1].to_name == "concept/also_future"
@pytest.mark.asyncio
async def test_sync_entity_circular_relations(
sync_service: SyncService, project_config: ProjectConfig
):
"""Test syncing entities with circular dependencies."""
project_dir = project_config.home
# Create entity A that depends on B
content_a = """
---
type: knowledge
permalink: concept/entity-a
created: 2024-01-01
modified: 2024-01-01
---
# Entity A
## Observations
- First entity in circular reference
## Relations
- depends_on [[concept/entity-b]]
"""
await create_test_file(project_dir / "concept/entity_a.md", content_a)
# Create entity B that depends on A
content_b = """
---
type: knowledge
permalink: concept/entity-b
created: 2024-01-01
modified: 2024-01-01
---
# Entity B
## Observations
- Second entity in circular reference
## Relations
- depends_on [[concept/entity-a]]
"""
await create_test_file(project_dir / "concept/entity_b.md", content_b)
# Sync
await sync_service.sync(project_config.home)
# Verify both entities and their relations
entity_a = await sync_service.entity_service.repository.get_by_permalink("concept/entity-a")
entity_b = await sync_service.entity_service.repository.get_by_permalink("concept/entity-b")
# outgoing relations
assert len(entity_a.outgoing_relations) == 1
assert len(entity_b.outgoing_relations) == 1
# incoming relations
assert len(entity_a.incoming_relations) == 1
assert len(entity_b.incoming_relations) == 1
# all relations
assert len(entity_a.relations) == 2
assert len(entity_b.relations) == 2
# Verify circular reference works
a_relation = entity_a.outgoing_relations[0]
assert a_relation.to_id == entity_b.id
b_relation = entity_b.outgoing_relations[0]
assert b_relation.to_id == entity_a.id
@pytest.mark.asyncio
async def test_sync_entity_duplicate_relations(
sync_service: SyncService, project_config: ProjectConfig
):
"""Test handling of duplicate relations in an entity."""
project_dir = project_config.home
# Create target entity first
target_content = """
---
type: knowledge
permalink: concept/target
created: 2024-01-01
modified: 2024-01-01
---
# Target Entity
## Observations
- something to observe
"""
await create_test_file(project_dir / "concept/target.md", target_content)
# Create entity with duplicate relations
content = """
---
type: knowledge
permalink: concept/duplicate-relations
created: 2024-01-01
modified: 2024-01-01
---
# Test Duplicates
## Observations
- this has a lot of relations
## Relations
- depends_on [[concept/target]]
- depends_on [[concept/target]] # Duplicate
- uses [[concept/target]] # Different relation type
- uses [[concept/target]] # Duplicate of different type
"""
await create_test_file(project_dir / "concept/duplicate_relations.md", content)
# Sync
await sync_service.sync(project_config.home)
# Verify duplicates are handled
entity = await sync_service.entity_service.repository.get_by_permalink(
"concept/duplicate-relations"
)
# Count relations by type
relation_counts = {}
for rel in entity.relations:
relation_counts[rel.relation_type] = relation_counts.get(rel.relation_type, 0) + 1
# Should only have one of each type
assert relation_counts["depends_on"] == 1
assert relation_counts["uses"] == 1
@pytest.mark.asyncio
async def test_sync_entity_with_random_categories(
sync_service: SyncService, project_config: ProjectConfig
):
"""Test handling of random observation categories."""
project_dir = project_config.home
content = """
---
type: knowledge
permalink: concept/invalid-category
created: 2024-01-01
modified: 2024-01-01
---
# Test Categories
## Observations
- [random category] This is fine
- [ a space category] Should default to note
- This one is not an observation, should be ignored
- [design] This is valid
"""
await create_test_file(project_dir / "concept/invalid_category.md", content)
# Sync
await sync_service.sync(project_config.home)
# Verify observations
entity = await sync_service.entity_service.repository.get_by_permalink(
"concept/invalid-category"
)
assert len(entity.observations) == 3
categories = [obs.category for obs in entity.observations]
    # Arbitrary categories are preserved (leading/trailing whitespace trimmed)
    assert "random category" in categories
    assert "a space category" in categories
    # Standard categories are preserved as well
    assert "design" in categories
@pytest.mark.skip("sometimes fails")
@pytest.mark.asyncio
async def test_sync_entity_with_order_dependent_relations(
sync_service: SyncService, project_config: ProjectConfig
):
"""Test that order of entity syncing doesn't affect relation creation."""
project_dir = project_config.home
# Create several interrelated entities
entities = {
"a": """
---
type: knowledge
permalink: concept/entity-a
created: 2024-01-01
modified: 2024-01-01
---
# Entity A
## Observations
- depends on b
- depends on c
## Relations
- depends_on [[concept/entity-b]]
- depends_on [[concept/entity-c]]
""",
"b": """
---
type: knowledge
permalink: concept/entity-b
created: 2024-01-01
modified: 2024-01-01
---
# Entity B
## Observations
- depends on c
## Relations
- depends_on [[concept/entity-c]]
""",
"c": """
---
type: knowledge
permalink: concept/entity-c
created: 2024-01-01
modified: 2024-01-01
---
# Entity C
## Observations
- depends on a
## Relations
- depends_on [[concept/entity-a]]
""",
}
# Create files in different orders and verify results are the same
for name, content in entities.items():
await create_test_file(project_dir / f"concept/entity_{name}.md", content)
# Sync
await sync_service.sync(project_config.home)
# Verify all relations are created correctly regardless of order
entity_a = await sync_service.entity_service.repository.get_by_permalink("concept/entity-a")
entity_b = await sync_service.entity_service.repository.get_by_permalink("concept/entity-b")
entity_c = await sync_service.entity_service.repository.get_by_permalink("concept/entity-c")
# Verify outgoing relations by checking actual targets
a_outgoing_targets = {rel.to_id for rel in entity_a.outgoing_relations}
assert entity_b.id in a_outgoing_targets, (
f"A should depend on B. A's targets: {a_outgoing_targets}, B's ID: {entity_b.id}"
)
assert entity_c.id in a_outgoing_targets, (
f"A should depend on C. A's targets: {a_outgoing_targets}, C's ID: {entity_c.id}"
)
assert len(entity_a.outgoing_relations) == 2, "A should have exactly 2 outgoing relations"
b_outgoing_targets = {rel.to_id for rel in entity_b.outgoing_relations}
assert entity_c.id in b_outgoing_targets, "B should depend on C"
assert len(entity_b.outgoing_relations) == 1, "B should have exactly 1 outgoing relation"
c_outgoing_targets = {rel.to_id for rel in entity_c.outgoing_relations}
assert entity_a.id in c_outgoing_targets, "C should depend on A"
assert len(entity_c.outgoing_relations) == 1, "C should have exactly 1 outgoing relation"
# Verify incoming relations by checking actual sources
a_incoming_sources = {rel.from_id for rel in entity_a.incoming_relations}
assert entity_c.id in a_incoming_sources, "A should have incoming relation from C"
b_incoming_sources = {rel.from_id for rel in entity_b.incoming_relations}
assert entity_a.id in b_incoming_sources, "B should have incoming relation from A"
c_incoming_sources = {rel.from_id for rel in entity_c.incoming_relations}
assert entity_a.id in c_incoming_sources, "C should have incoming relation from A"
assert entity_b.id in c_incoming_sources, "C should have incoming relation from B"
@pytest.mark.asyncio
async def test_sync_empty_directories(sync_service: SyncService, project_config: ProjectConfig):
"""Test syncing empty directories."""
await sync_service.sync(project_config.home)
# Should not raise exceptions for empty dirs
assert project_config.home.exists()
@pytest.mark.skip("flaky on Windows due to filesystem timing precision")
@pytest.mark.asyncio
async def test_sync_file_modified_during_sync(
sync_service: SyncService, project_config: ProjectConfig
):
"""Test handling of files that change during sync process."""
# Create initial files
doc_path = project_config.home / "changing.md"
await create_test_file(
doc_path,
"""
---
type: knowledge
id: changing
created: 2024-01-01
modified: 2024-01-01
---
# Knowledge File
## Observations
- This is a test
""",
)
# Setup async modification during sync
async def modify_file():
await asyncio.sleep(0.1) # Small delay to ensure sync has started
doc_path.write_text("Modified during sync")
# Run sync and modification concurrently
await asyncio.gather(sync_service.sync(project_config.home), modify_file())
# Verify final state
doc = await sync_service.entity_service.repository.get_by_permalink("changing")
assert doc is not None
# if we failed in the middle of a sync, the next one should fix it.
if doc.checksum is None:
await sync_service.sync(project_config.home)
doc = await sync_service.entity_service.repository.get_by_permalink("changing")
assert doc.checksum is not None
@pytest.mark.asyncio
async def test_permalink_formatting(
sync_service: SyncService, project_config: ProjectConfig, entity_service: EntityService
):
"""Test that permalinks are properly formatted during sync."""
# Test cases with different filename formats
test_files = {
# filename -> expected permalink
"my_awesome_feature.md": "my-awesome-feature",
"MIXED_CASE_NAME.md": "mixed-case-name",
"spaces and_underscores.md": "spaces-and-underscores",
"design/model_refactor.md": "design/model-refactor",
"test/multiple_word_directory/feature_name.md": "test/multiple-word-directory/feature-name",
}
# Create test files
content: str = """
---
type: knowledge
created: 2024-01-01
modified: 2024-01-01
---
# Test File
Testing permalink generation.
"""
for filename, _ in test_files.items():
await create_test_file(project_config.home / filename, content)
# Run sync once after all files are created
await sync_service.sync(project_config.home)
# Verify permalinks
entities = await entity_service.repository.find_all()
for filename, expected_permalink in test_files.items():
# Find entity for this file
entity = next(e for e in entities if e.file_path == filename)
assert entity.permalink == expected_permalink, (
f"File {filename} should have permalink {expected_permalink}"
)
@pytest.mark.asyncio
async def test_handle_entity_deletion(
test_graph,
sync_service: SyncService,
entity_repository: EntityRepository,
search_service: SearchService,
):
"""Test deletion of entity cleans up search index."""
root_entity = test_graph["root"]
# Delete the entity
await sync_service.handle_delete(root_entity.file_path)
# Verify entity is gone from db
assert await entity_repository.get_by_permalink(root_entity.permalink) is None
# Verify entity is gone from search index
entity_results = await search_service.search(SearchQuery(text=root_entity.title))
assert len(entity_results) == 0
obs_results = await search_service.search(SearchQuery(text="Root note 1"))
assert len(obs_results) == 0
# Verify relations from root entity are gone
# (Postgres stemming would match "connects_to" with "connected_to", so use permalink)
rel_results = await search_service.search(SearchQuery(permalink=root_entity.permalink))
assert len(rel_results) == 0
@pytest.mark.asyncio
async def test_sync_preserves_timestamps(
sync_service: SyncService,
project_config: ProjectConfig,
entity_service: EntityService,
):
"""Test that sync preserves file timestamps and frontmatter dates."""
project_dir = project_config.home
# Create a file with explicit frontmatter dates
frontmatter_content = """
---
type: knowledge
---
# Explicit Dates
Testing frontmatter dates
"""
await create_test_file(project_dir / "explicit_dates.md", frontmatter_content)
# Create a file without dates (will use file timestamps)
file_dates_content = """
---
type: knowledge
---
# File Dates
Testing file timestamps
"""
file_path = project_dir / "file_dates3.md"
await create_test_file(file_path, file_dates_content)
# Run sync
await sync_service.sync(project_config.home)
# Check explicit frontmatter dates
explicit_entity = await entity_service.get_by_permalink("explicit-dates")
assert explicit_entity.created_at is not None
assert explicit_entity.updated_at is not None
# Check file timestamps
file_entity = await entity_service.get_by_permalink("file-dates3")
file_stats = file_path.stat()
# Compare using epoch timestamps to handle timezone differences correctly
# This ensures we're comparing the actual points in time, not display representations
entity_created_epoch = file_entity.created_at.timestamp()
entity_updated_epoch = file_entity.updated_at.timestamp()
# Allow 2s difference on Windows due to filesystem timing precision
tolerance = 2 if os.name == "nt" else 1
assert abs(entity_created_epoch - file_stats.st_ctime) < tolerance
assert abs(entity_updated_epoch - file_stats.st_mtime) < tolerance # Allow tolerance difference
@pytest.mark.asyncio
async def test_sync_updates_timestamps_on_file_modification(
sync_service: SyncService,
project_config: ProjectConfig,
entity_service: EntityService,
):
"""Test that sync updates entity timestamps when files are modified.
This test specifically validates that when an existing file is modified and re-synced,
the updated_at timestamp in the database reflects the file's actual modification time,
not the database operation time. This is critical for accurate temporal ordering in
search and recent_activity queries.
"""
project_dir = project_config.home
# Create initial file
initial_content = """
---
type: knowledge
---
# Test File
Initial content for timestamp test
"""
file_path = project_dir / "timestamp_test.md"
await create_test_file(file_path, initial_content)
# Initial sync
await sync_service.sync(project_config.home)
# Get initial entity and timestamps
entity_before = await entity_service.get_by_permalink("timestamp-test")
initial_updated_at = entity_before.updated_at
# Modify the file content and update mtime to be newer than watermark
modified_content = """
---
type: knowledge
---
# Test File
Modified content for timestamp test
## Observations
- [test] This was modified
"""
file_path.write_text(modified_content)
# Touch file to ensure mtime is newer than watermark
# This uses our helper which sleeps 500ms and rewrites to guarantee mtime change
await touch_file(file_path)
# Get the file's modification time after our changes
file_stats_after_modification = file_path.stat()
# Force full scan to ensure the modified file is detected
# (incremental scans have timing precision issues with watermarks on some filesystems)
await force_full_scan(sync_service)
# Re-sync the modified file
await sync_service.sync(project_config.home)
# Get entity after re-sync
entity_after = await entity_service.get_by_permalink("timestamp-test")
# Verify that updated_at changed
assert entity_after.updated_at != initial_updated_at, (
"updated_at should change when file is modified"
)
# Verify that updated_at matches the file's modification time, not db operation time
entity_updated_epoch = entity_after.updated_at.timestamp()
file_mtime = file_stats_after_modification.st_mtime
# Allow 2s difference on Windows due to filesystem timing precision
tolerance = 2 if os.name == "nt" else 1
assert abs(entity_updated_epoch - file_mtime) < tolerance, (
f"Entity updated_at ({entity_after.updated_at}) should match file mtime "
f"({datetime.fromtimestamp(file_mtime)}) within {tolerance}s tolerance"
)
# Verify the content was actually updated
assert len(entity_after.observations) == 1
assert entity_after.observations[0].content == "This was modified"
@pytest.mark.asyncio
async def test_file_move_updates_search_index(
sync_service: SyncService,
project_config: ProjectConfig,
search_service: SearchService,
):
"""Test that moving a file updates its path in the search index."""
project_dir = project_config.home
# Create initial file
content = """
---
type: knowledge
---
# Test Move
Content for move test
"""
old_path = project_dir / "old" / "test_move.md"
old_path.parent.mkdir(parents=True)
await create_test_file(old_path, content)
# Initial sync
await sync_service.sync(project_config.home)
# Move the file
new_path = project_dir / "new" / "moved_file.md"
new_path.parent.mkdir(parents=True)
old_path.rename(new_path)
# Force full scan to detect the move
# (rename doesn't update mtime, so incremental scan won't find it)
await force_full_scan(sync_service)
# Second sync should detect the move
await sync_service.sync(project_config.home)
# Check search index has updated path
results = await search_service.search(SearchQuery(text="Content for move test"))
assert len(results) == 1
assert results[0].file_path == new_path.relative_to(project_dir).as_posix()
@pytest.mark.asyncio
async def test_sync_null_checksum_cleanup(
sync_service: SyncService,
project_config: ProjectConfig,
entity_service: EntityService,
):
"""Test handling of entities with null checksums from incomplete syncs."""
# Create entity with null checksum (simulating incomplete sync)
entity = Entity(
permalink="concept/incomplete",
title="Incomplete",
entity_type="test",
file_path="concept/incomplete.md",
checksum=None, # Null checksum
content_type="text/markdown",
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
)
await entity_service.repository.add(entity)
# Create corresponding file
content = """
---
type: knowledge
id: concept/incomplete
created: 2024-01-01
modified: 2024-01-01
---
# Incomplete Entity
## Observations
- Testing cleanup
"""
await create_test_file(project_config.home / "concept/incomplete.md", content)
# Run sync
await sync_service.sync(project_config.home)
# Verify entity was properly synced
updated = await entity_service.get_by_permalink("concept/incomplete")
assert updated.checksum is not None
@pytest.mark.asyncio
async def test_sync_permalink_resolved(
sync_service: SyncService, project_config: ProjectConfig, file_service: FileService, app_config
):
"""Test that we resolve duplicate permalinks on sync ."""
project_dir = project_config.home
# Create initial file
content = """
---
type: knowledge
---
# Test Move
Content for move test
"""
old_path = project_dir / "old" / "test_move.md"
old_path.parent.mkdir(parents=True)
await create_test_file(old_path, content)
# Initial sync
await sync_service.sync(project_config.home)
# Move the file
new_path = project_dir / "new" / "moved_file.md"
new_path.parent.mkdir(parents=True)
old_path.rename(new_path)
# Force full scan to detect the move
# (rename doesn't update mtime, so incremental scan won't find it)
await force_full_scan(sync_service)
# Sync again
await sync_service.sync(project_config.home)
file_content, _ = await file_service.read_file(new_path)
assert "permalink: new/moved-file" in file_content
# Create another that has the same permalink
content = """
---
type: knowledge
permalink: new/moved-file
---
# Test Move
Content for move test
"""
old_path = project_dir / "old" / "test_move.md"
old_path.parent.mkdir(parents=True, exist_ok=True)
await create_test_file(old_path, content)
# Force full scan to detect the new file
# (file just created may not be newer than watermark due to timing precision)
await force_full_scan(sync_service)
# Sync new file
await sync_service.sync(project_config.home)
# assert permalink is unique
file_content, _ = await file_service.read_file(old_path)
assert "permalink: new/moved-file-1" in file_content
@pytest.mark.asyncio
async def test_sync_permalink_resolved_on_update(
sync_service: SyncService,
project_config: ProjectConfig,
file_service: FileService,
):
"""Test that sync resolves permalink conflicts on update."""
project_dir = project_config.home
one_file = project_dir / "one.md"
two_file = project_dir / "two.md"
await create_test_file(
one_file,
content=dedent(
"""
---
permalink: one
---
test content
"""
),
)
await create_test_file(
two_file,
content=dedent(
"""
---
permalink: two
---
test content
"""
),
)
# Run sync
await sync_service.sync(project_config.home)
# Check permalinks
file_one_content, _ = await file_service.read_file(one_file)
assert "permalink: one" in file_one_content
file_two_content, _ = await file_service.read_file(two_file)
assert "permalink: two" in file_two_content
# update the second file with a duplicate permalink
updated_content = """
---
title: two.md
type: note
permalink: one
tags: []
---
test content
"""
two_file.write_text(updated_content)
# Force full scan to detect the modified file
# (file just modified may not be newer than watermark due to timing precision)
await force_full_scan(sync_service)
# Run sync
await sync_service.sync(project_config.home)
# Check permalinks
file_two_content, _ = await file_service.read_file(two_file)
assert "permalink: two" in file_two_content
# new content with duplicate permalink
new_content = """
---
title: new.md
type: note
permalink: one
tags: []
---
test content
"""
new_file = project_dir / "new.md"
await create_test_file(new_file, new_content)
# Force full scan to detect the new file
# (file just created may not be newer than watermark due to timing precision)
await force_full_scan(sync_service)
# Run another time
await sync_service.sync(project_config.home)
# Should have deduplicated permalink
new_file_content, _ = await file_service.read_file(new_file)
assert "permalink: one-1" in new_file_content
@pytest.mark.asyncio
async def test_sync_permalink_not_created_if_no_frontmatter(
sync_service: SyncService,
project_config: ProjectConfig,
file_service: FileService,
):
"""Test that sync resolves permalink conflicts on update."""
project_dir = project_config.home
file = project_dir / "one.md"
await create_test_file(file)
# Run sync
await sync_service.sync(project_config.home)
# Check permalink not created
file_content, _ = await file_service.read_file(file)
assert "permalink:" not in file_content
@pytest.fixture
def test_config_update_permamlinks_on_move(app_config) -> BasicMemoryConfig:
"""Test configuration using in-memory DB."""
app_config.update_permalinks_on_move = True
return app_config
@pytest.mark.asyncio
async def test_sync_permalink_updated_on_move(
test_config_update_permamlinks_on_move: BasicMemoryConfig,
project_config: ProjectConfig,
sync_service: SyncService,
file_service: FileService,
):
"""Test that we update a permalink on a file move if set in config ."""
project_dir = project_config.home
# Create initial file
content = dedent(
"""
---
type: knowledge
---
# Test Move
Content for move test
"""
)
old_path = project_dir / "old" / "test_move.md"
old_path.parent.mkdir(parents=True)
await create_test_file(old_path, content)
# Initial sync
await sync_service.sync(project_config.home)
# verify permalink
old_content, _ = await file_service.read_file(old_path)
assert "permalink: old/test-move" in old_content
# Move the file
new_path = project_dir / "new" / "moved_file.md"
new_path.parent.mkdir(parents=True)
old_path.rename(new_path)
# Force full scan to detect the move
# (rename doesn't update mtime, so incremental scan won't find it)
await force_full_scan(sync_service)
# Sync again
await sync_service.sync(project_config.home)
file_content, _ = await file_service.read_file(new_path)
assert "permalink: new/moved-file" in file_content
@pytest.mark.asyncio
async def test_sync_non_markdown_files(sync_service, project_config, test_files):
"""Test syncing non-markdown files."""
report = await sync_service.sync(project_config.home)
assert report.total == 2
# Check files were detected
assert test_files["pdf"].name in [f for f in report.new]
assert test_files["image"].name in [f for f in report.new]
# Verify entities were created
pdf_entity = await sync_service.entity_repository.get_by_file_path(str(test_files["pdf"].name))
assert pdf_entity is not None, "PDF entity should have been created"
assert pdf_entity.content_type == "application/pdf"
image_entity = await sync_service.entity_repository.get_by_file_path(
str(test_files["image"].name)
)
assert image_entity.content_type == "image/png"
@pytest.mark.asyncio
async def test_sync_non_markdown_files_modified(
sync_service, project_config, test_files, file_service
):
"""Test syncing non-markdown files."""
report = await sync_service.sync(project_config.home)
assert report.total == 2
# Check files were detected
assert test_files["pdf"].name in [f for f in report.new]
assert test_files["image"].name in [f for f in report.new]
test_files["pdf"].write_text("New content")
test_files["image"].write_text("New content")
# Force full scan to detect the modified files
# (files just modified may not be newer than watermark due to timing precision)
await force_full_scan(sync_service)
report = await sync_service.sync(project_config.home)
assert len(report.modified) == 2
pdf_file_content, pdf_checksum = await file_service.read_file(test_files["pdf"].name)
image_file_content, img_checksum = await file_service.read_file(test_files["image"].name)
pdf_entity = await sync_service.entity_repository.get_by_file_path(str(test_files["pdf"].name))
image_entity = await sync_service.entity_repository.get_by_file_path(
str(test_files["image"].name)
)
assert pdf_entity.checksum == pdf_checksum
assert image_entity.checksum == img_checksum
@pytest.mark.asyncio
async def test_sync_non_markdown_files_move(sync_service, project_config, test_files):
"""Test syncing non-markdown files updates permalink"""
report = await sync_service.sync(project_config.home)
assert report.total == 2
# Check files were detected
assert test_files["pdf"].name in [f for f in report.new]
assert test_files["image"].name in [f for f in report.new]
test_files["pdf"].rename(project_config.home / "moved_pdf.pdf")
# Force full scan to detect the move
# (rename doesn't update mtime, so incremental scan won't find it)
await force_full_scan(sync_service)
report2 = await sync_service.sync(project_config.home)
assert len(report2.moves) == 1
# Verify entity is updated
pdf_entity = await sync_service.entity_repository.get_by_file_path("moved_pdf.pdf")
assert pdf_entity is not None
assert pdf_entity.permalink is None
@pytest.mark.asyncio
async def test_sync_non_markdown_files_deleted(sync_service, project_config, test_files):
"""Test syncing non-markdown files updates permalink"""
report = await sync_service.sync(project_config.home)
assert report.total == 2
# Check files were detected
assert test_files["pdf"].name in [f for f in report.new]
assert test_files["image"].name in [f for f in report.new]
test_files["pdf"].unlink()
report2 = await sync_service.sync(project_config.home)
assert len(report2.deleted) == 1
# Verify entity is deleted
pdf_entity = await sync_service.entity_repository.get_by_file_path("moved_pdf.pdf")
assert pdf_entity is None
@pytest.mark.asyncio
async def test_sync_non_markdown_files_move_with_delete(
sync_service, project_config, test_files, file_service
):
"""Test syncing non-markdown files handles file deletes and renames during sync"""
# Create initial files
await create_test_file(project_config.home / "doc.pdf", "content1")
await create_test_file(project_config.home / "other/doc-1.pdf", "content2")
# Initial sync
await sync_service.sync(project_config.home)
# First move/delete the original file to make way for the move
(project_config.home / "doc.pdf").unlink()
(project_config.home / "other/doc-1.pdf").rename(project_config.home / "doc.pdf")
# Sync again
await sync_service.sync(project_config.home)
# Verify the changes
moved_entity = await sync_service.entity_repository.get_by_file_path("doc.pdf")
assert moved_entity is not None
assert moved_entity.permalink is None
file_content, _ = await file_service.read_file("doc.pdf")
assert "content2" in file_content
@pytest.mark.asyncio
async def test_sync_relation_to_non_markdown_file(
sync_service: SyncService, project_config: ProjectConfig, file_service: FileService, test_files
):
"""Test that sync resolves permalink conflicts on update."""
project_dir = project_config.home
content = f"""
---
title: a note
type: note
tags: []
---
- relates_to [[{test_files["pdf"].name}]]
"""
note_file = project_dir / "note.md"
await create_test_file(note_file, content)
# Run sync
await sync_service.sync(project_config.home)
# Check permalinks
file_one_content, _ = await file_service.read_file(note_file)
assert (
f"""---
title: a note
type: note
tags: []
permalink: note
---
- relates_to [[{test_files["pdf"].name}]]
""".strip()
== file_one_content
)
@pytest.mark.asyncio
async def test_sync_regular_file_race_condition_handling(
sync_service: SyncService, project_config: ProjectConfig
):
"""Test that sync_regular_file handles race condition with IntegrityError (lines 380-401)."""
from datetime import datetime, timezone
# Create a test file
test_file = project_config.home / "test_race.md"
test_content = """
---
type: knowledge
---
# Test Race Condition
This is a test file for race condition handling.
"""
await create_test_file(test_file, test_content)
rel_path = test_file.relative_to(project_config.home).as_posix()
# Create an existing entity with the same file_path to force a real DB IntegrityError
# on the "add" call (same effect as the race-condition branch).
await sync_service.entity_repository.add(
Entity(
entity_type="file",
file_path=rel_path,
checksum="old_checksum",
title="Test Race Condition",
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
content_type="text/markdown",
mtime=None,
size=None,
)
)
# Call sync_regular_file (new=True) - should fall back to update path
entity, checksum = await sync_service.sync_regular_file(rel_path, new=True)
assert entity is not None
assert entity.file_path == rel_path
assert entity.checksum == checksum
@pytest.mark.asyncio
async def test_circuit_breaker_should_skip_after_three_recorded_failures(
sync_service: SyncService, project_config: ProjectConfig
):
"""Circuit breaker: after 3 recorded failures, unchanged file should be skipped."""
project_dir = project_config.home
test_file = project_dir / "failing_file.md"
await create_test_file(test_file, "---\ntype: note\n---\ncontent\n")
rel_path = test_file.relative_to(project_dir).as_posix()
await sync_service._record_failure(rel_path, "failure 1")
await sync_service._record_failure(rel_path, "failure 2")
await sync_service._record_failure(rel_path, "failure 3")
assert await sync_service._should_skip_file(rel_path) is True
assert rel_path in sync_service._file_failures
assert sync_service._file_failures[rel_path].count == 3
@pytest.mark.asyncio
async def test_circuit_breaker_resets_when_checksum_changes(
sync_service: SyncService, project_config: ProjectConfig
):
"""Circuit breaker: if file checksum changes, it should be retried (not skipped)."""
project_dir = project_config.home
test_file = project_dir / "changing_file.md"
await create_test_file(test_file, "---\ntype: note\n---\ncontent\n")
rel_path = test_file.relative_to(project_dir).as_posix()
await sync_service._record_failure(rel_path, "failure 1")
await sync_service._record_failure(rel_path, "failure 2")
await sync_service._record_failure(rel_path, "failure 3")
assert await sync_service._should_skip_file(rel_path) is True
# Change content → checksum changes → _should_skip_file should reset and allow retry
test_file.write_text("---\ntype: note\n---\nchanged content\n")
assert await sync_service._should_skip_file(rel_path) is False
assert rel_path not in sync_service._file_failures
@pytest.mark.asyncio
async def test_record_failure_uses_empty_checksum_when_checksum_computation_fails(
sync_service: SyncService,
):
"""_record_failure() should not crash if checksum computation fails."""
missing_path = "does-not-exist.md"
await sync_service._record_failure(missing_path, "boom")
assert missing_path in sync_service._file_failures
assert sync_service._file_failures[missing_path].last_checksum == ""
@pytest.mark.asyncio
async def test_sync_fatal_error_terminates_sync_immediately(
sync_service: SyncService, project_config: ProjectConfig, entity_service: EntityService
):
"""Test that SyncFatalError terminates sync immediately without circuit breaker retry.
This tests the fix for issue #188 where project deletion during sync should
terminate immediately rather than retrying each file 3 times.
"""
pytest.skip(
"SyncFatalError behavior is excluded from coverage and not reliably reproducible "
"without patching (depends on project deletion during sync)."
)
@pytest.mark.asyncio
async def test_scan_directory_basic(sync_service: SyncService, project_config: ProjectConfig):
"""Test basic streaming directory scan functionality."""
project_dir = project_config.home
# Create test files in different directories
await create_test_file(project_dir / "root.md", "root content")
await create_test_file(project_dir / "subdir/file1.md", "file 1 content")
await create_test_file(project_dir / "subdir/file2.md", "file 2 content")
await create_test_file(project_dir / "subdir/nested/file3.md", "file 3 content")
# Collect results from streaming iterator
results = []
async for file_path, stat_info in sync_service.scan_directory(project_dir):
rel_path = Path(file_path).relative_to(project_dir).as_posix()
results.append((rel_path, stat_info))
# Verify all files were found
file_paths = {rel_path for rel_path, _ in results}
assert "root.md" in file_paths
assert "subdir/file1.md" in file_paths
assert "subdir/file2.md" in file_paths
assert "subdir/nested/file3.md" in file_paths
assert len(file_paths) == 4
# Verify stat info is present for each file
for rel_path, stat_info in results:
assert stat_info is not None
assert stat_info.st_size > 0 # Files have content
assert stat_info.st_mtime > 0 # Have modification time
@pytest.mark.asyncio
async def test_scan_directory_respects_ignore_patterns(
sync_service: SyncService, project_config: ProjectConfig
):
"""Test that streaming scan respects .gitignore patterns."""
project_dir = project_config.home
# Create .gitignore file in project (will be used along with .bmignore)
(project_dir / ".gitignore").write_text("*.ignored\n.hidden/\n")
# Reload ignore patterns using project's .gitignore
from basic_memory.ignore_utils import load_gitignore_patterns
sync_service._ignore_patterns = load_gitignore_patterns(project_dir)
# Create test files - some should be ignored
await create_test_file(project_dir / "included.md", "included")
await create_test_file(project_dir / "excluded.ignored", "excluded")
await create_test_file(project_dir / ".hidden/secret.md", "secret")
await create_test_file(project_dir / "subdir/file.md", "file")
# Collect results
results = []
async for file_path, stat_info in sync_service.scan_directory(project_dir):
rel_path = Path(file_path).relative_to(project_dir).as_posix()
results.append(rel_path)
# Verify ignored files were not returned
assert "included.md" in results
assert "subdir/file.md" in results
assert "excluded.ignored" not in results
assert ".hidden/secret.md" not in results
assert ".bmignore" not in results # .bmignore itself should be ignored
@pytest.mark.asyncio
async def test_scan_directory_cached_stat_info(
sync_service: SyncService, project_config: ProjectConfig
):
"""Test that streaming scan provides cached stat info (no redundant stat calls)."""
project_dir = project_config.home
# Create test file
test_file = project_dir / "test.md"
await create_test_file(test_file, "test content")
# Get stat info from streaming scan
async for file_path, stat_info in sync_service.scan_directory(project_dir):
if Path(file_path).name == "test.md":
# Get independent stat for comparison
independent_stat = test_file.stat()
# Verify stat info matches (cached stat should be accurate)
assert stat_info.st_size == independent_stat.st_size
assert abs(stat_info.st_mtime - independent_stat.st_mtime) < 1 # Allow 1s tolerance
assert abs(stat_info.st_ctime - independent_stat.st_ctime) < 1
break
@pytest.mark.asyncio
async def test_scan_directory_empty_directory(
sync_service: SyncService, project_config: ProjectConfig
):
"""Test streaming scan on empty directory (ignoring hidden files)."""
project_dir = project_config.home
# Directory exists but has no user files (may have .basic-memory config dir)
assert project_dir.exists()
# Don't create any user files - just scan empty directory
# Scan should yield no results (hidden files are ignored by default)
results = []
async for file_path, stat_info in sync_service.scan_directory(project_dir):
results.append(file_path)
# Should find no files (config dirs are hidden and ignored)
assert len(results) == 0
@pytest.mark.asyncio
async def test_scan_directory_handles_permission_error(
sync_service: SyncService, project_config: ProjectConfig
):
"""Test that streaming scan handles permission errors gracefully."""
import sys
# Skip on Windows - permission handling is different
if sys.platform == "win32":
pytest.skip("Permission tests not reliable on Windows")
project_dir = project_config.home
# Create accessible file
await create_test_file(project_dir / "accessible.md", "accessible")
# Create restricted directory
restricted_dir = project_dir / "restricted"
restricted_dir.mkdir()
await create_test_file(restricted_dir / "secret.md", "secret")
# Remove read permission from restricted directory
restricted_dir.chmod(0o000)
try:
# Scan should handle permission error and continue
results = []
async for file_path, stat_info in sync_service.scan_directory(project_dir):
rel_path = Path(file_path).relative_to(project_dir).as_posix()
results.append(rel_path)
# Should have found accessible file but not restricted one
assert "accessible.md" in results
assert "restricted/secret.md" not in results
finally:
# Restore permissions for cleanup
restricted_dir.chmod(0o755)
@pytest.mark.asyncio
async def test_scan_directory_non_markdown_files(
sync_service: SyncService, project_config: ProjectConfig
):
"""Test that streaming scan finds all file types, not just markdown."""
project_dir = project_config.home
# Create various file types
await create_test_file(project_dir / "doc.md", "markdown")
(project_dir / "image.png").write_bytes(b"PNG content")
(project_dir / "data.json").write_text('{"key": "value"}')
(project_dir / "script.py").write_text("print('hello')")
# Collect results
results = []
async for file_path, stat_info in sync_service.scan_directory(project_dir):
rel_path = Path(file_path).relative_to(project_dir).as_posix()
results.append(rel_path)
# All files should be found
assert "doc.md" in results
assert "image.png" in results
assert "data.json" in results
assert "script.py" in results
@pytest.mark.asyncio
async def test_file_service_checksum_correctness(
sync_service: SyncService, project_config: ProjectConfig
):
"""Test that FileService computes correct checksums."""
import hashlib
project_dir = project_config.home
# Test small markdown file
small_content = "Test content for checksum validation" * 10
small_file = project_dir / "small.md"
await create_test_file(small_file, small_content)
rel_path = small_file.relative_to(project_dir).as_posix()
checksum = await sync_service.file_service.compute_checksum(rel_path)
# Verify checksum is correct
expected = hashlib.sha256(small_content.encode("utf-8")).hexdigest()
assert checksum == expected
assert len(checksum) == 64 # SHA256 hex digest length
@pytest.mark.asyncio
async def test_sync_handles_file_not_found_gracefully(
sync_service: SyncService, project_config: ProjectConfig
):
"""Test that FileNotFoundError during sync is handled gracefully.
This tests the fix for issue #386 where files existing in the database
but missing from the filesystem would crash the sync worker.
"""
project_dir = project_config.home
# Create a test file
test_file = project_dir / "missing_file.md"
await create_test_file(
test_file,
dedent(
"""
---
type: knowledge
permalink: missing-file
---
# Missing File
Content that will disappear
"""
),
)
# Sync to add entity to database
await sync_service.sync(project_dir)
# Verify entity was created
entity = await sync_service.entity_repository.get_by_file_path("missing_file.md")
assert entity is not None
assert entity.permalink == "missing-file"
# Delete the file but leave the entity in database (simulating inconsistency)
test_file.unlink()
# Sync the missing file directly: sync_markdown_file will raise FileNotFoundError naturally,
# and sync_file() should treat it as deletion.
await sync_service.sync_file("missing_file.md", new=False)
# Entity should be deleted from database
entity = await sync_service.entity_repository.get_by_file_path("missing_file.md")
assert entity is None, "Orphaned entity should be deleted when file is not found"
```
--------------------------------------------------------------------------------
/src/basic_memory/sync/sync_service.py:
--------------------------------------------------------------------------------
```python
"""Service for syncing files between filesystem and database."""
import asyncio
import os
import sys
import time
from collections import OrderedDict
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import AsyncIterator, Dict, List, Optional, Set, Tuple
import aiofiles.os
from loguru import logger
from sqlalchemy.exc import IntegrityError
from basic_memory import db
from basic_memory.config import BasicMemoryConfig, ConfigManager
from basic_memory.file_utils import has_frontmatter
from basic_memory.ignore_utils import load_bmignore_patterns, should_ignore_path
from basic_memory.markdown import EntityParser, MarkdownProcessor
from basic_memory.models import Entity, Project
from basic_memory.repository import (
EntityRepository,
RelationRepository,
ObservationRepository,
ProjectRepository,
)
from basic_memory.repository.search_repository import create_search_repository
from basic_memory.services import EntityService, FileService
from basic_memory.services.exceptions import SyncFatalError
from basic_memory.services.link_resolver import LinkResolver
from basic_memory.services.search_service import SearchService
# Circuit breaker configuration
MAX_CONSECUTIVE_FAILURES = 3
@dataclass
class FileFailureInfo:
"""Track failure information for a file that repeatedly fails to sync.
Attributes:
count: Number of consecutive failures
first_failure: Timestamp of first failure in current sequence
last_failure: Timestamp of most recent failure
last_error: Error message from most recent failure
last_checksum: Checksum of file when it last failed (for detecting file changes)
"""
count: int
first_failure: datetime
last_failure: datetime
last_error: str
last_checksum: str
@dataclass
class SkippedFile:
"""Information about a file that was skipped due to repeated failures.
Attributes:
path: File path relative to project root
reason: Error message from last failure
failure_count: Number of consecutive failures
first_failed: Timestamp of first failure
"""
path: str
reason: str
failure_count: int
first_failed: datetime
@dataclass
class SyncReport:
"""Report of file changes found compared to database state.
Attributes:
total: Total number of changes detected (new + modified + deleted + moves)
new: Files that exist on disk but not in database
modified: Files that exist in both but have different checksums
deleted: Files that exist in database but not on disk
moves: Files that have been moved from one location to another
checksums: Checksums computed for new/modified files found by the scan
skipped_files: Files that were skipped due to repeated failures
"""
# We keep paths as strings in sets/dicts for easier serialization
new: Set[str] = field(default_factory=set)
modified: Set[str] = field(default_factory=set)
deleted: Set[str] = field(default_factory=set)
moves: Dict[str, str] = field(default_factory=dict) # old_path -> new_path
checksums: Dict[str, str] = field(default_factory=dict) # path -> checksum
skipped_files: List[SkippedFile] = field(default_factory=list)
@property
def total(self) -> int:
"""Total number of changes."""
return len(self.new) + len(self.modified) + len(self.deleted) + len(self.moves)
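# Example (hypothetical counts): a scan that finds 2 new files, 1 modified
# file, and 1 move reports total == 4. Skipped files are listed separately
# in skipped_files and are not counted toward total.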
@dataclass
class ScanResult:
"""Result of scanning a directory."""
# file_path -> checksum
files: Dict[str, str] = field(default_factory=dict)
# checksum -> file_path
checksums: Dict[str, str] = field(default_factory=dict)
# file_path -> error message
errors: Dict[str, str] = field(default_factory=dict)
class SyncService:
"""Syncs documents and knowledge files with database."""
def __init__(
self,
app_config: BasicMemoryConfig,
entity_service: EntityService,
entity_parser: EntityParser,
entity_repository: EntityRepository,
relation_repository: RelationRepository,
project_repository: ProjectRepository,
search_service: SearchService,
file_service: FileService,
):
self.app_config = app_config
self.entity_service = entity_service
self.entity_parser = entity_parser
self.entity_repository = entity_repository
self.relation_repository = relation_repository
self.project_repository = project_repository
self.search_service = search_service
self.file_service = file_service
# Load ignore patterns once at initialization for performance
self._ignore_patterns = load_bmignore_patterns()
# Circuit breaker: track file failures to prevent infinite retry loops
# Use OrderedDict for LRU behavior with bounded size to prevent unbounded memory growth
self._file_failures: OrderedDict[str, FileFailureInfo] = OrderedDict()
self._max_tracked_failures = 100 # Limit failure cache size
async def _should_skip_file(self, path: str) -> bool:
"""Check if file should be skipped due to repeated failures.
Computes current file checksum and compares with last failed checksum.
If checksums differ, file has changed and we should retry.
Args:
path: File path to check
Returns:
True if file should be skipped, False otherwise
"""
if path not in self._file_failures:
return False
failure_info = self._file_failures[path]
# Check if failure count exceeds threshold
if failure_info.count < MAX_CONSECUTIVE_FAILURES:
return False
# Compute current checksum to see if file changed
try:
current_checksum = await self.file_service.compute_checksum(path)
# If checksum changed, file was modified - reset and retry
if current_checksum != failure_info.last_checksum:
logger.info(
f"File {path} changed since last failure (checksum differs), "
f"resetting failure count and retrying"
)
del self._file_failures[path]
return False
except Exception as e:
# If we can't compute checksum, log but still skip to avoid infinite loops
logger.warning(f"Failed to compute checksum for {path}: {e}")
# File unchanged and exceeded threshold - skip it
return True
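# Example (hypothetical path and values): after three calls to
# _record_failure("notes/bad.md", ...) while the file content is unchanged,
# _should_skip_file("notes/bad.md") returns True. If the file is later edited
# so its checksum no longer matches last_checksum, the failure record is
# dropped and the check returns False, allowing the file to be retried.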
async def _record_failure(self, path: str, error: str) -> None:
"""Record a file sync failure for circuit breaker tracking.
Uses LRU cache with bounded size to prevent unbounded memory growth.
Args:
path: File path that failed
error: Error message from the failure
"""
now = datetime.now()
# Compute checksum for failure tracking
try:
checksum = await self.file_service.compute_checksum(path)
except Exception:
# If checksum fails, use empty string (better than crashing)
checksum = ""
if path in self._file_failures:
# Update existing failure record and move to end (most recently used)
failure_info = self._file_failures.pop(path)
failure_info.count += 1
failure_info.last_failure = now
failure_info.last_error = error
failure_info.last_checksum = checksum
self._file_failures[path] = failure_info
logger.warning(
f"File sync failed (attempt {failure_info.count}/{MAX_CONSECUTIVE_FAILURES}): "
f"path={path}, error={error}"
)
# Log when threshold is reached
if failure_info.count >= MAX_CONSECUTIVE_FAILURES:
logger.error(
f"File {path} has failed {MAX_CONSECUTIVE_FAILURES} times and will be skipped. "
f"First failure: {failure_info.first_failure}, Last error: {error}"
)
else:
# Create new failure record
self._file_failures[path] = FileFailureInfo(
count=1,
first_failure=now,
last_failure=now,
last_error=error,
last_checksum=checksum,
)
logger.debug(f"Recording first failure for {path}: {error}")
# Enforce cache size limit - remove oldest entry if over limit
if len(self._file_failures) > self._max_tracked_failures:
removed_path, removed_info = self._file_failures.popitem(last=False)
logger.debug(
f"Evicting oldest failure record from cache: path={removed_path}, "
f"failures={removed_info.count}"
)
def _clear_failure(self, path: str) -> None:
"""Clear failure tracking for a file after successful sync.
Args:
path: File path that successfully synced
"""
if path in self._file_failures:
logger.info(f"Clearing failure history for {path} after successful sync")
del self._file_failures[path]
async def sync(
self, directory: Path, project_name: Optional[str] = None, force_full: bool = False
) -> SyncReport:
"""Sync all files with database and update scan watermark.
Args:
directory: Directory to sync
project_name: Optional project name
force_full: If True, force a full scan bypassing watermark optimization
"""
start_time = time.time()
sync_start_timestamp = time.time() # Capture at start for watermark
logger.info(f"Sync operation started for directory: {directory} (force_full={force_full})")
# initial paths from db to sync
# path -> checksum
report = await self.scan(directory, force_full=force_full)
# order of sync matters to resolve relations effectively
logger.info(
f"Sync changes detected: new_files={len(report.new)}, modified_files={len(report.modified)}, "
+ f"deleted_files={len(report.deleted)}, moved_files={len(report.moves)}"
)
# sync moves first
for old_path, new_path in report.moves.items():
# in the case where a file has been deleted and replaced by another file
# it will show up in the move and modified lists, so handle it in modified
if new_path in report.modified:
report.modified.remove(new_path)
logger.debug(
f"File marked as moved and modified: old_path={old_path}, new_path={new_path}"
)
else:
await self.handle_move(old_path, new_path)
# deleted next
for path in report.deleted:
await self.handle_delete(path)
# then new and modified
for path in report.new:
entity, _ = await self.sync_file(path, new=True)
# Track if file was skipped
if entity is None and await self._should_skip_file(path):
failure_info = self._file_failures[path]
report.skipped_files.append(
SkippedFile(
path=path,
reason=failure_info.last_error,
failure_count=failure_info.count,
first_failed=failure_info.first_failure,
)
)
for path in report.modified:
entity, _ = await self.sync_file(path, new=False)
# Track if file was skipped
if entity is None and await self._should_skip_file(path):
failure_info = self._file_failures[path]
report.skipped_files.append(
SkippedFile(
path=path,
reason=failure_info.last_error,
failure_count=failure_info.count,
first_failed=failure_info.first_failure,
)
)
# Only resolve relations if there were actual changes
# If no files changed, no new unresolved relations could have been created
if report.total > 0:
await self.resolve_relations()
else:
logger.info("Skipping relation resolution - no file changes detected")
# Update scan watermark after successful sync
# Use the timestamp from sync start (not end) to ensure we catch files
# created during the sync on the next iteration
current_file_count = await self._quick_count_files(directory)
if self.entity_repository.project_id is not None:
project = await self.project_repository.find_by_id(self.entity_repository.project_id)
if project:
await self.project_repository.update(
project.id,
{
"last_scan_timestamp": sync_start_timestamp,
"last_file_count": current_file_count,
},
)
logger.debug(
f"Updated scan watermark: timestamp={sync_start_timestamp}, "
f"file_count={current_file_count}"
)
duration_ms = int((time.time() - start_time) * 1000)
# Log summary with skipped files if any
if report.skipped_files:
logger.warning(
f"Sync completed with {len(report.skipped_files)} skipped files: "
f"directory={directory}, total_changes={report.total}, "
f"skipped={len(report.skipped_files)}, duration_ms={duration_ms}"
)
for skipped in report.skipped_files:
logger.warning(
f"Skipped file: path={skipped.path}, "
f"failures={skipped.failure_count}, reason={skipped.reason}"
)
else:
logger.info(
f"Sync operation completed: directory={directory}, "
f"total_changes={report.total}, duration_ms={duration_ms}"
)
return report
async def scan(self, directory: Path, force_full: bool = False) -> SyncReport:
"""Smart scan using watermark and file count for large project optimization.
Uses scan watermark tracking to dramatically reduce scan time for large projects:
- Tracks last_scan_timestamp and last_file_count in Project model
- Uses `find -newermt` for incremental scanning (only changed files)
- Falls back to full scan when deletions detected (file count decreased)
Expected performance:
- No changes: 225x faster (2s vs 450s for 1,460 files on TigrisFS)
- Few changes: 84x faster (5s vs 420s)
- Deletions: Full scan (rare, acceptable)
Architecture:
- Get current file count quickly (find | wc -l: 1.4s)
- Compare with last_file_count to detect deletions
- If no deletions: incremental scan with find -newermt (0.2s)
- Process changed files with mtime-based comparison
Args:
directory: Directory to scan
force_full: If True, bypass watermark optimization and force full scan
"""
scan_start_time = time.time()
report = SyncReport()
# Get current project to check watermark
if self.entity_repository.project_id is None:
raise ValueError("Entity repository has no project_id set")
project = await self.project_repository.find_by_id(self.entity_repository.project_id)
if project is None:
raise ValueError(f"Project not found: {self.entity_repository.project_id}")
# Step 1: Quick file count
logger.debug("Counting files in directory")
current_count = await self._quick_count_files(directory)
logger.debug(f"Found {current_count} files in directory")
# Step 2: Determine scan strategy based on watermark and file count
if force_full:
# User explicitly requested full scan → bypass watermark optimization
scan_type = "full_forced"
logger.info("Force full scan requested, bypassing watermark optimization")
file_paths_to_scan = await self._scan_directory_full(directory)
elif project.last_file_count is None:
# First sync ever → full scan
scan_type = "full_initial"
logger.info("First sync for this project, performing full scan")
file_paths_to_scan = await self._scan_directory_full(directory)
elif current_count < project.last_file_count:
# Files deleted → need full scan to detect which ones
scan_type = "full_deletions"
logger.info(
f"File count decreased ({project.last_file_count} → {current_count}), "
f"running full scan to detect deletions"
)
file_paths_to_scan = await self._scan_directory_full(directory)
elif project.last_scan_timestamp is not None:
# Incremental scan: only files modified since last scan
scan_type = "incremental"
logger.info(
f"Running incremental scan for files modified since {project.last_scan_timestamp}"
)
file_paths_to_scan = await self._scan_directory_modified_since(
directory, project.last_scan_timestamp
)
logger.info(
f"Incremental scan found {len(file_paths_to_scan)} potentially changed files"
)
else:
# Fallback to full scan (no watermark available)
scan_type = "full_fallback"
logger.warning("No scan watermark available, falling back to full scan")
file_paths_to_scan = await self._scan_directory_full(directory)
# Step 3: Process each file with mtime-based comparison
scanned_paths: Set[str] = set()
changed_checksums: Dict[str, str] = {}
logger.debug(f"Processing {len(file_paths_to_scan)} files with mtime-based comparison")
for rel_path in file_paths_to_scan:
scanned_paths.add(rel_path)
# Get file stats
abs_path = directory / rel_path
if not abs_path.exists():
# File was deleted between scan and now (race condition)
continue
stat_info = abs_path.stat()
# Indexed lookup - single file query (not full table scan)
db_entity = await self.entity_repository.get_by_file_path(rel_path)
if db_entity is None:
# New file - need checksum for move detection
checksum = await self.file_service.compute_checksum(rel_path)
report.new.add(rel_path)
changed_checksums[rel_path] = checksum
logger.trace(f"New file detected: {rel_path}")
continue
# File exists in DB - check if mtime/size changed
db_mtime = db_entity.mtime
db_size = db_entity.size
fs_mtime = stat_info.st_mtime
fs_size = stat_info.st_size
# Compare mtime and size (like rsync/rclone)
# Allow small epsilon for float comparison (0.01s = 10ms)
mtime_changed = db_mtime is None or abs(fs_mtime - db_mtime) > 0.01
size_changed = db_size is None or fs_size != db_size
if mtime_changed or size_changed:
# File modified - compute checksum
checksum = await self.file_service.compute_checksum(rel_path)
db_checksum = db_entity.checksum
# Only mark as modified if checksum actually differs
# (handles cases where mtime changed but content didn't, e.g., git operations)
if checksum != db_checksum:
report.modified.add(rel_path)
changed_checksums[rel_path] = checksum
logger.trace(
f"Modified file detected: {rel_path}, "
f"mtime_changed={mtime_changed}, size_changed={size_changed}"
)
else:
# File unchanged - no checksum needed
logger.trace(f"File unchanged (mtime/size match): {rel_path}")
# Step 4: Detect moves (for both full and incremental scans)
# Check if any "new" files are actually moves by matching checksums
for new_path in list(report.new): # Use list() to allow modification during iteration
new_checksum = changed_checksums.get(new_path)
if not new_checksum:
continue
# Look for existing entity with same checksum but different path
# This could be a move or a copy
existing_entities = await self.entity_repository.find_by_checksum(new_checksum)
for candidate in existing_entities:
if candidate.file_path == new_path:
# Same path, skip (shouldn't happen for "new" files but be safe)
continue
# Check if the old path still exists on disk
old_path_abs = directory / candidate.file_path
if old_path_abs.exists():
# Original still exists → this is a copy, not a move
logger.trace(
f"File copy detected (not move): {candidate.file_path} copied to {new_path}"
)
continue
# Original doesn't exist → this is a move!
report.moves[candidate.file_path] = new_path
report.new.remove(new_path)
logger.trace(f"Move detected: {candidate.file_path} -> {new_path}")
break # Only match first candidate
# Step 5: Detect deletions (only for full scans)
# Incremental scans can't reliably detect deletions since they only see modified files
if scan_type in ("full_initial", "full_deletions", "full_fallback", "full_forced"):
# Use optimized query for just file paths (not full entities)
db_file_paths = await self.entity_repository.get_all_file_paths()
logger.debug(f"Found {len(db_file_paths)} db paths for deletion detection")
for db_path in db_file_paths:
if db_path not in scanned_paths:
# File in DB but not on filesystem
# Check if it was already detected as a move
if db_path in report.moves:
# Already handled as a move, skip
continue
# File was deleted
report.deleted.add(db_path)
logger.trace(f"Deleted file detected: {db_path}")
# Store checksums for files that need syncing
report.checksums = changed_checksums
scan_duration_ms = int((time.time() - scan_start_time) * 1000)
logger.info(
f"Completed {scan_type} scan for directory {directory} in {scan_duration_ms}ms, "
f"found {report.total} changes (new={len(report.new)}, "
f"modified={len(report.modified)}, deleted={len(report.deleted)}, "
f"moves={len(report.moves)})"
)
return report
async def sync_file(
self, path: str, new: bool = True
) -> Tuple[Optional[Entity], Optional[str]]:
"""Sync a single file with circuit breaker protection.
Args:
path: Path to file to sync
new: Whether this is a new file
Returns:
Tuple of (entity, checksum) or (None, None) if sync fails or file is skipped
"""
# Check if file should be skipped due to repeated failures
if await self._should_skip_file(path):
logger.warning(f"Skipping file due to repeated failures: {path}")
return None, None
try:
logger.debug(
f"Syncing file path={path} is_new={new} is_markdown={self.file_service.is_markdown(path)}"
)
if self.file_service.is_markdown(path):
entity, checksum = await self.sync_markdown_file(path, new)
else:
entity, checksum = await self.sync_regular_file(path, new)
if entity is not None:
await self.search_service.index_entity(entity)
# Clear failure tracking on successful sync
self._clear_failure(path)
logger.debug(
f"File sync completed, path={path}, entity_id={entity.id}, checksum={checksum[:8]}"
)
return entity, checksum
except FileNotFoundError:
# File exists in database but not on filesystem
# This indicates a database/filesystem inconsistency - treat as deletion
logger.warning(
f"File not found during sync, treating as deletion: path={path}. "
"This may indicate a race condition or manual file deletion."
)
await self.handle_delete(path)
return None, None
except Exception as e:
# Check if this is a fatal error (or caused by one)
# Fatal errors like project deletion should terminate sync immediately
if isinstance(e, SyncFatalError) or isinstance(
e.__cause__, SyncFatalError
): # pragma: no cover
logger.error(f"Fatal sync error encountered, terminating sync: path={path}")
raise
# Otherwise treat as recoverable file-level error
error_msg = str(e)
logger.error(f"Failed to sync file: path={path}, error={error_msg}")
# Record failure for circuit breaker
await self._record_failure(path, error_msg)
return None, None
async def sync_markdown_file(self, path: str, new: bool = True) -> Tuple[Optional[Entity], str]:
"""Sync a markdown file with full processing.
Args:
path: Path to markdown file
new: Whether this is a new file
Returns:
Tuple of (entity, checksum)
"""
# Parse markdown first to get any existing permalink
logger.debug(f"Parsing markdown file, path: {path}, new: {new}")
file_content = await self.file_service.read_file_content(path)
file_contains_frontmatter = has_frontmatter(file_content)
# Get file timestamps for tracking modification times
file_metadata = await self.file_service.get_file_metadata(path)
created = file_metadata.created_at
modified = file_metadata.modified_at
# Parse markdown content with file metadata (avoids redundant file read/stat)
# This enables cloud implementations (S3FileService) to provide metadata from head_object
abs_path = self.file_service.base_path / path
entity_markdown = await self.entity_parser.parse_markdown_content(
file_path=abs_path,
content=file_content,
mtime=file_metadata.modified_at.timestamp(),
ctime=file_metadata.created_at.timestamp(),
)
# if the file contains frontmatter, resolve a permalink (unless disabled)
if file_contains_frontmatter and not self.app_config.disable_permalinks:
# Resolve permalink - skip conflict checks during bulk sync for performance
permalink = await self.entity_service.resolve_permalink(
path, markdown=entity_markdown, skip_conflict_check=True
)
# If permalink changed, update the file
if permalink != entity_markdown.frontmatter.permalink:
logger.info(
f"Updating permalink for path: {path}, old_permalink: {entity_markdown.frontmatter.permalink}, new_permalink: {permalink}"
)
entity_markdown.frontmatter.metadata["permalink"] = permalink
await self.file_service.update_frontmatter(path, {"permalink": permalink})
# if the file is new, create an entity
if new:
# Create entity with final permalink
logger.debug(f"Creating new entity from markdown, path={path}")
await self.entity_service.create_entity_from_markdown(Path(path), entity_markdown)
# otherwise we need to update the entity and observations
else:
logger.debug(f"Updating entity from markdown, path={path}")
await self.entity_service.update_entity_and_observations(Path(path), entity_markdown)
# Update relations and search index
entity = await self.entity_service.update_entity_relations(path, entity_markdown)
# After updating relations, we need to compute the checksum again
# This is necessary for files with wikilinks to ensure consistent checksums
# after relation processing is complete
final_checksum = await self.file_service.compute_checksum(path)
# Update checksum, timestamps, and file metadata from file system
# Store mtime/size for efficient change detection in future scans
# This ensures temporal ordering in search and recent activity uses actual file modification times
await self.entity_repository.update(
entity.id,
{
"checksum": final_checksum,
"created_at": created,
"updated_at": modified,
"mtime": file_metadata.modified_at.timestamp(),
"size": file_metadata.size,
},
)
logger.debug(
f"Markdown sync completed: path={path}, entity_id={entity.id}, "
f"observation_count={len(entity.observations)}, relation_count={len(entity.relations)}, "
f"checksum={final_checksum[:8]}"
)
# Return the final checksum to ensure everything is consistent
return entity, final_checksum
async def sync_regular_file(self, path: str, new: bool = True) -> Tuple[Optional[Entity], str]:
"""Sync a non-markdown file with basic tracking.
Args:
path: Path to file
new: Whether this is a new file
Returns:
Tuple of (entity, checksum)
"""
checksum = await self.file_service.compute_checksum(path)
if new:
# Generate permalink from path - skip conflict checks during bulk sync
await self.entity_service.resolve_permalink(path, skip_conflict_check=True)
# get file timestamps
file_metadata = await self.file_service.get_file_metadata(path)
created = file_metadata.created_at
modified = file_metadata.modified_at
# get mime type
content_type = self.file_service.content_type(path)
file_path = Path(path)
try:
entity = await self.entity_repository.add(
Entity(
entity_type="file",
file_path=path,
checksum=checksum,
title=file_path.name,
created_at=created,
updated_at=modified,
content_type=content_type,
mtime=file_metadata.modified_at.timestamp(),
size=file_metadata.size,
)
)
return entity, checksum
except IntegrityError as e:
# Handle race condition where entity was created by another process
msg = str(e)
if (
"UNIQUE constraint failed: entity.file_path" in msg
or "uix_entity_file_path_project" in msg
or "duplicate key value violates unique constraint" in msg
and "file_path" in msg
):
logger.info(
f"Entity already exists for file_path={path}, updating instead of creating"
)
# Treat as update instead of create
entity = await self.entity_repository.get_by_file_path(path)
if entity is None: # pragma: no cover
logger.error(f"Entity not found after constraint violation, path={path}")
raise ValueError(f"Entity not found after constraint violation: {path}")
# Re-get file metadata since we're in update path
file_metadata_for_update = await self.file_service.get_file_metadata(path)
updated = await self.entity_repository.update(
entity.id,
{
"file_path": path,
"checksum": checksum,
"mtime": file_metadata_for_update.modified_at.timestamp(),
"size": file_metadata_for_update.size,
},
)
if updated is None: # pragma: no cover
logger.error(f"Failed to update entity, entity_id={entity.id}, path={path}")
raise ValueError(f"Failed to update entity with ID {entity.id}")
return updated, checksum
else:
# Re-raise if it's a different integrity error
raise # pragma: no cover
else:
# Get file timestamps for updating modification time
file_metadata = await self.file_service.get_file_metadata(path)
modified = file_metadata.modified_at
entity = await self.entity_repository.get_by_file_path(path)
if entity is None: # pragma: no cover
logger.error(f"Entity not found for existing file, path={path}")
raise ValueError(f"Entity not found for existing file: {path}")
# Update checksum, modification time, and file metadata from file system
# Store mtime/size for efficient change detection in future scans
updated = await self.entity_repository.update(
entity.id,
{
"file_path": path,
"checksum": checksum,
"updated_at": modified,
"mtime": file_metadata.modified_at.timestamp(),
"size": file_metadata.size,
},
)
if updated is None: # pragma: no cover
logger.error(f"Failed to update entity, entity_id={entity.id}, path={path}")
raise ValueError(f"Failed to update entity with ID {entity.id}")
return updated, checksum
async def handle_delete(self, file_path: str):
"""Handle complete entity deletion including search index cleanup."""
# First get entity to get permalink before deletion
entity = await self.entity_repository.get_by_file_path(file_path)
if entity:
logger.info(
f"Deleting entity with file_path={file_path}, entity_id={entity.id}, permalink={entity.permalink}"
)
# Delete from db (this cascades to observations/relations)
await self.entity_service.delete_entity_by_file_path(file_path)
# Clean up search index
permalinks = (
[entity.permalink]
+ [o.permalink for o in entity.observations]
+ [r.permalink for r in entity.relations]
)
logger.debug(
f"Cleaning up search index for entity_id={entity.id}, file_path={file_path}, "
f"index_entries={len(permalinks)}"
)
for permalink in permalinks:
if permalink:
await self.search_service.delete_by_permalink(permalink)
else:
await self.search_service.delete_by_entity_id(entity.id)
async def handle_move(self, old_path: str, new_path: str):
"""Handle a file move: update file_path (and permalink if configured), then re-index."""
logger.debug("Moving entity", old_path=old_path, new_path=new_path)
entity = await self.entity_repository.get_by_file_path(old_path)
if entity:
# Check if destination path is already occupied by another entity
existing_at_destination = await self.entity_repository.get_by_file_path(new_path)
if existing_at_destination and existing_at_destination.id != entity.id:
# Handle the conflict - this could be a file swap or replacement scenario
logger.warning(
f"File path conflict detected during move: "
f"entity_id={entity.id} trying to move from '{old_path}' to '{new_path}', "
f"but entity_id={existing_at_destination.id} already occupies '{new_path}'"
)
# Check if this is a file swap (the destination entity is being moved to our old path)
# This would indicate a simultaneous move operation
old_path_after_swap = await self.entity_repository.get_by_file_path(old_path)
if old_path_after_swap and old_path_after_swap.id == existing_at_destination.id:
logger.info(f"Detected file swap between '{old_path}' and '{new_path}'")
# This is a swap scenario - both moves should succeed
# We'll allow this to proceed since the other file has moved out
else:
# This is a conflict where the destination is occupied
raise ValueError(
f"Cannot move entity from '{old_path}' to '{new_path}': "
f"destination path is already occupied by another file. "
f"This may be caused by: "
f"1. Conflicting file names with different character encodings, "
f"2. Case sensitivity differences (e.g., 'Finance/' vs 'finance/'), "
f"3. Character conflicts between hyphens in filenames and generated permalinks, "
f"4. Files with similar names containing special characters. "
f"Try renaming one of the conflicting files to resolve this issue."
)
# Update file_path in all cases
updates = {"file_path": new_path}
# If configured, also update permalink to match new path
if (
self.app_config.update_permalinks_on_move
and not self.app_config.disable_permalinks
and self.file_service.is_markdown(new_path)
):
# generate new permalink value - skip conflict checks during bulk sync
new_permalink = await self.entity_service.resolve_permalink(
new_path, skip_conflict_check=True
)
# write to file and get new checksum
new_checksum = await self.file_service.update_frontmatter(
new_path, {"permalink": new_permalink}
)
updates["permalink"] = new_permalink
updates["checksum"] = new_checksum
logger.info(
f"Updating permalink on move,old_permalink={entity.permalink}"
f"new_permalink={new_permalink}"
f"new_checksum={new_checksum}"
)
try:
updated = await self.entity_repository.update(entity.id, updates)
except Exception as e:
# Catch any database integrity errors and provide helpful context
if "UNIQUE constraint failed" in str(e):
logger.error(
f"Database constraint violation during move: "
f"entity_id={entity.id}, old_path='{old_path}', new_path='{new_path}'"
)
raise ValueError(
f"Cannot complete move from '{old_path}' to '{new_path}': "
f"a database constraint was violated. This usually indicates "
f"a file path or permalink conflict. Please check for: "
f"1. Duplicate file names, "
f"2. Case sensitivity issues (e.g., 'File.md' vs 'file.md'), "
f"3. Character encoding conflicts in file names."
) from e
else:
# Re-raise other exceptions as-is
raise
if updated is None: # pragma: no cover
logger.error(
"Failed to update entity path"
f"entity_id={entity.id}"
f"old_path={old_path}"
f"new_path={new_path}"
)
raise ValueError(f"Failed to update entity path for ID {entity.id}")
logger.debug(
"Entity path updated"
f"entity_id={entity.id} "
f"permalink={entity.permalink} "
f"old_path={old_path} "
f"new_path={new_path} "
)
# update search index
await self.search_service.index_entity(updated)
async def resolve_relations(self, entity_id: int | None = None):
"""Try to resolve unresolved relations.
Args:
entity_id: If provided, only resolve relations for this specific entity.
Otherwise, resolve all unresolved relations in the database.
"""
if entity_id:
# Only get unresolved relations for the specific entity
unresolved_relations = (
await self.relation_repository.find_unresolved_relations_for_entity(entity_id)
)
logger.info(
f"Resolving forward references for entity {entity_id}",
count=len(unresolved_relations),
)
else:
# Get all unresolved relations (original behavior)
unresolved_relations = await self.relation_repository.find_unresolved_relations()
logger.info("Resolving all forward references", count=len(unresolved_relations))
for relation in unresolved_relations:
logger.trace(
"Attempting to resolve relation "
f"relation_id={relation.id} "
f"from_id={relation.from_id} "
f"to_name={relation.to_name}"
)
resolved_entity = await self.entity_service.link_resolver.resolve_link(relation.to_name)
# ignore reference to self
if resolved_entity and resolved_entity.id != relation.from_id:
logger.debug(
"Resolved forward reference "
f"relation_id={relation.id} "
f"from_id={relation.from_id} "
f"to_name={relation.to_name} "
f"resolved_id={resolved_entity.id} "
f"resolved_title={resolved_entity.title}",
)
try:
await self.relation_repository.update(
relation.id,
{
"to_id": resolved_entity.id,
"to_name": resolved_entity.title,
},
)
# update search index only on successful resolution
await self.search_service.index_entity(resolved_entity)
except IntegrityError:
# IntegrityError means a relation with this (from_id, to_id, relation_type)
# already exists. The UPDATE was rolled back, so our unresolved relation
# (to_id=NULL) still exists in the database. We delete it because:
# 1. It's redundant - a resolved relation already captures this relationship
# 2. If we don't delete it, future syncs will try to resolve it again
# and get the same IntegrityError
logger.debug(
"Deleting duplicate unresolved relation "
f"relation_id={relation.id} "
f"from_id={relation.from_id} "
f"to_name={relation.to_name} "
f"resolved_to_id={resolved_entity.id}"
)
try:
await self.relation_repository.delete(relation.id)
except Exception as e:
# Log but don't fail - the relation may have been deleted already
logger.debug(f"Could not delete duplicate relation {relation.id}: {e}")
async def _quick_count_files(self, directory: Path) -> int:
"""Fast file count using find command.
Uses subprocess to leverage OS-level file counting which is much faster
than Python iteration, especially on network filesystems like TigrisFS.
On Windows, subprocess is not supported with SelectorEventLoop (which we use
to avoid aiosqlite cleanup issues), so we fall back to Python-based counting.
Args:
directory: Directory to count files in
Returns:
Number of files in directory (recursive)
"""
# Windows with SelectorEventLoop doesn't support subprocess
if sys.platform == "win32":
count = 0
async for _ in self.scan_directory(directory):
count += 1
return count
process = await asyncio.create_subprocess_shell(
f'find "{directory}" -type f | wc -l',
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
if process.returncode != 0:
error_msg = stderr.decode().strip()
logger.error(
f"FILE COUNT OPTIMIZATION FAILED: find command failed with exit code {process.returncode}, "
f"error: {error_msg}. Falling back to manual count. "
f"This will slow down watermark detection!"
)
# Fallback: count using scan_directory
count = 0
async for _ in self.scan_directory(directory):
count += 1
return count
return int(stdout.strip())
async def _scan_directory_modified_since(
self, directory: Path, since_timestamp: float
) -> List[str]:
"""Use find -newermt for filesystem-level filtering of modified files.
This is dramatically faster than scanning all files and comparing mtimes,
especially on network filesystems like TigrisFS where stat operations are expensive.
On Windows, subprocess is not supported with SelectorEventLoop (which we use
to avoid aiosqlite cleanup issues), so we implement mtime filtering in Python.
Args:
directory: Directory to scan
since_timestamp: Unix timestamp to find files newer than
Returns:
List of relative file paths modified since the timestamp (respects .bmignore)
"""
# Windows with SelectorEventLoop doesn't support subprocess
# Implement mtime filtering in Python to preserve watermark optimization
if sys.platform == "win32":
file_paths = []
async for file_path_str, stat_info in self.scan_directory(directory):
if stat_info.st_mtime > since_timestamp:
rel_path = Path(file_path_str).relative_to(directory).as_posix()
file_paths.append(rel_path)
return file_paths
# Convert timestamp to find-compatible format
since_date = datetime.fromtimestamp(since_timestamp).strftime("%Y-%m-%d %H:%M:%S")
process = await asyncio.create_subprocess_shell(
f'find "{directory}" -type f -newermt "{since_date}"',
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await process.communicate()
if process.returncode != 0:
error_msg = stderr.decode().strip()
logger.error(
f"SCAN OPTIMIZATION FAILED: find -newermt command failed with exit code {process.returncode}, "
f"error: {error_msg}. Falling back to full scan. "
f"This will cause slow syncs on large projects!"
)
# Fallback to full scan
return await self._scan_directory_full(directory)
# Convert absolute paths to relative and filter through ignore patterns
file_paths = []
for line in stdout.decode().splitlines():
if line:
try:
abs_path = Path(line)
rel_path = abs_path.relative_to(directory).as_posix()
# Apply ignore patterns (same as scan_directory)
if should_ignore_path(abs_path, directory, self._ignore_patterns):
logger.trace(f"Ignoring path per .bmignore: {rel_path}")
continue
file_paths.append(rel_path)
except ValueError:
# Path is not relative to directory, skip it
logger.warning(f"Skipping file not under directory: {line}")
continue
return file_paths
async def _scan_directory_full(self, directory: Path) -> List[str]:
"""Full directory scan returning all file paths.
Uses scan_directory() which respects .bmignore patterns.
Args:
directory: Directory to scan
Returns:
List of relative file paths (respects .bmignore)
"""
file_paths = []
async for file_path_str, _ in self.scan_directory(directory):
rel_path = Path(file_path_str).relative_to(directory).as_posix()
file_paths.append(rel_path)
return file_paths
async def scan_directory(self, directory: Path) -> AsyncIterator[Tuple[str, os.stat_result]]:
"""Stream files from directory using aiofiles.os.scandir() with cached stat info.
This method uses aiofiles.os.scandir() to leverage async I/O and cached stat
information from directory entries. This reduces network I/O by 50% on network
filesystems like TigrisFS by avoiding redundant stat() calls.
Args:
directory: Directory to scan
Yields:
Tuples of (absolute_file_path, stat_info) for each file
"""
try:
entries = await aiofiles.os.scandir(directory)
except PermissionError:
logger.warning(f"Permission denied scanning directory: {directory}")
return
results = []
subdirs = []
for entry in entries:
entry_path = Path(entry.path)
# Check ignore patterns
if should_ignore_path(entry_path, directory, self._ignore_patterns):
logger.trace(f"Ignoring path per .bmignore: {entry_path.relative_to(directory)}")
continue
if entry.is_dir(follow_symlinks=False):
# Collect subdirectories to recurse into
subdirs.append(entry_path)
elif entry.is_file(follow_symlinks=False):
# Get cached stat info (no extra syscall!)
stat_info = entry.stat(follow_symlinks=False)
results.append((entry.path, stat_info))
# Yield files from current directory
for file_path, stat_info in results:
yield (file_path, stat_info)
# Recurse into subdirectories
for subdir in subdirs:
async for result in self.scan_directory(subdir):
yield result
async def get_sync_service(project: Project) -> SyncService: # pragma: no cover
"""Get sync service instance with all dependencies."""
app_config = ConfigManager().config
_, session_maker = await db.get_or_create_db(
db_path=app_config.database_path, db_type=db.DatabaseType.FILESYSTEM
)
project_path = Path(project.path)
entity_parser = EntityParser(project_path)
markdown_processor = MarkdownProcessor(entity_parser, app_config=app_config)
file_service = FileService(project_path, markdown_processor, app_config=app_config)
# Initialize repositories
entity_repository = EntityRepository(session_maker, project_id=project.id)
observation_repository = ObservationRepository(session_maker, project_id=project.id)
relation_repository = RelationRepository(session_maker, project_id=project.id)
search_repository = create_search_repository(session_maker, project_id=project.id)
project_repository = ProjectRepository(session_maker)
# Initialize services
search_service = SearchService(search_repository, entity_repository, file_service)
link_resolver = LinkResolver(entity_repository, search_service)
# Initialize services
entity_service = EntityService(
entity_parser,
entity_repository,
observation_repository,
relation_repository,
file_service,
link_resolver,
)
# Create sync service
sync_service = SyncService(
app_config=app_config,
entity_service=entity_service,
entity_parser=entity_parser,
entity_repository=entity_repository,
relation_repository=relation_repository,
project_repository=project_repository,
search_service=search_service,
file_service=file_service,
)
return sync_service
```
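The two `find`-based helpers above are the performance-sensitive part of watermark detection. Below is a minimal standalone sketch of that fast path, not the project's API: `project_dir` is a hypothetical local directory, the POSIX `find`/`wc` tools are assumed to be available (no Windows fallback), and the `.bmignore` filtering that the real methods apply is omitted.

```python
import asyncio
from datetime import datetime
from pathlib import Path


async def quick_count_files(directory: Path) -> int:
    # Count files recursively with an OS-level `find | wc -l` pipeline, which avoids
    # per-file stat() calls and is much faster on network filesystems.
    proc = await asyncio.create_subprocess_shell(
        f'find "{directory}" -type f | wc -l',
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, _ = await proc.communicate()
    if proc.returncode != 0:
        raise RuntimeError("find failed; a full Python scan would be the fallback")
    return int(stdout.strip())


async def files_modified_since(directory: Path, since_timestamp: float) -> list[str]:
    # Let the filesystem filter by mtime via `find -newermt`, then return paths
    # relative to the scanned directory (ignore patterns are not applied here).
    since_date = datetime.fromtimestamp(since_timestamp).strftime("%Y-%m-%d %H:%M:%S")
    proc = await asyncio.create_subprocess_shell(
        f'find "{directory}" -type f -newermt "{since_date}"',
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, _ = await proc.communicate()
    if proc.returncode != 0:
        raise RuntimeError("find -newermt failed; a full scan would be the fallback")
    return [
        Path(line).relative_to(directory).as_posix()
        for line in stdout.decode().splitlines()
        if line
    ]


if __name__ == "__main__":
    # Hypothetical usage against the current directory: print the file count and
    # the files changed in roughly the last hour.
    project_dir = Path(".").resolve()
    print(asyncio.run(quick_count_files(project_dir)))
    one_hour_ago = datetime.now().timestamp() - 3600
    print(asyncio.run(files_modified_since(project_dir, one_hour_ago)))
```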