#
tokens: 47201/50000 4/416 files (page 17/19)
This is page 17 of 19. Use http://codebase.md/basicmachines-co/basic-memory?page={x} to view the full context.

# Directory Structure

```
├── .claude
│   ├── commands
│   │   ├── release
│   │   │   ├── beta.md
│   │   │   ├── changelog.md
│   │   │   ├── release-check.md
│   │   │   └── release.md
│   │   ├── spec.md
│   │   └── test-live.md
│   └── settings.json
├── .dockerignore
├── .env.example
├── .github
│   ├── dependabot.yml
│   ├── ISSUE_TEMPLATE
│   │   ├── bug_report.md
│   │   ├── config.yml
│   │   ├── documentation.md
│   │   └── feature_request.md
│   └── workflows
│       ├── claude-code-review.yml
│       ├── claude-issue-triage.yml
│       ├── claude.yml
│       ├── dev-release.yml
│       ├── docker.yml
│       ├── pr-title.yml
│       ├── release.yml
│       └── test.yml
├── .gitignore
├── .python-version
├── CHANGELOG.md
├── CITATION.cff
├── CLA.md
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── docker-compose-postgres.yml
├── docker-compose.yml
├── Dockerfile
├── docs
│   ├── ai-assistant-guide-extended.md
│   ├── ARCHITECTURE.md
│   ├── character-handling.md
│   ├── cloud-cli.md
│   ├── Docker.md
│   └── testing-coverage.md
├── justfile
├── LICENSE
├── llms-install.md
├── pyproject.toml
├── README.md
├── SECURITY.md
├── smithery.yaml
├── specs
│   ├── SPEC-1 Specification-Driven Development Process.md
│   ├── SPEC-10 Unified Deployment Workflow and Event Tracking.md
│   ├── SPEC-11 Basic Memory API Performance Optimization.md
│   ├── SPEC-12 OpenTelemetry Observability.md
│   ├── SPEC-13 CLI Authentication with Subscription Validation.md
│   ├── SPEC-14 Cloud Git Versioning & GitHub Backup.md
│   ├── SPEC-14- Cloud Git Versioning & GitHub Backup.md
│   ├── SPEC-15 Configuration Persistence via Tigris for Cloud Tenants.md
│   ├── SPEC-16 MCP Cloud Service Consolidation.md
│   ├── SPEC-17 Semantic Search with ChromaDB.md
│   ├── SPEC-18 AI Memory Management Tool.md
│   ├── SPEC-19 Sync Performance and Memory Optimization.md
│   ├── SPEC-2 Slash Commands Reference.md
│   ├── SPEC-20 Simplified Project-Scoped Rclone Sync.md
│   ├── SPEC-3 Agent Definitions.md
│   ├── SPEC-4 Notes Web UI Component Architecture.md
│   ├── SPEC-5 CLI Cloud Upload via WebDAV.md
│   ├── SPEC-6 Explicit Project Parameter Architecture.md
│   ├── SPEC-7 POC to spike Tigris Turso for local access to cloud data.md
│   ├── SPEC-8 TigrisFS Integration.md
│   ├── SPEC-9 Multi-Project Bidirectional Sync Architecture.md
│   ├── SPEC-9 Signed Header Tenant Information.md
│   └── SPEC-9-1 Follow-Ups- Conflict, Sync, and Observability.md
├── src
│   └── basic_memory
│       ├── __init__.py
│       ├── alembic
│       │   ├── alembic.ini
│       │   ├── env.py
│       │   ├── migrations.py
│       │   ├── script.py.mako
│       │   └── versions
│       │       ├── 314f1ea54dc4_add_postgres_full_text_search_support_.py
│       │       ├── 3dae7c7b1564_initial_schema.py
│       │       ├── 502b60eaa905_remove_required_from_entity_permalink.py
│       │       ├── 5fe1ab1ccebe_add_projects_table.py
│       │       ├── 647e7a75e2cd_project_constraint_fix.py
│       │       ├── 6830751f5fb6_merge_multiple_heads.py
│       │       ├── 9d9c1cb7d8f5_add_mtime_and_size_columns_to_entity_.py
│       │       ├── a1b2c3d4e5f6_fix_project_foreign_keys.py
│       │       ├── a2b3c4d5e6f7_add_search_index_entity_cascade.py
│       │       ├── b3c3938bacdb_relation_to_name_unique_index.py
│       │       ├── cc7172b46608_update_search_index_schema.py
│       │       ├── e7e1f4367280_add_scan_watermark_tracking_to_project.py
│       │       ├── f8a9b2c3d4e5_add_pg_trgm_for_fuzzy_link_resolution.py
│       │       └── g9a0b3c4d5e6_add_external_id_to_project_and_entity.py
│       ├── api
│       │   ├── __init__.py
│       │   ├── app.py
│       │   ├── container.py
│       │   ├── routers
│       │   │   ├── __init__.py
│       │   │   ├── directory_router.py
│       │   │   ├── importer_router.py
│       │   │   ├── knowledge_router.py
│       │   │   ├── management_router.py
│       │   │   ├── memory_router.py
│       │   │   ├── project_router.py
│       │   │   ├── prompt_router.py
│       │   │   ├── resource_router.py
│       │   │   ├── search_router.py
│       │   │   └── utils.py
│       │   ├── template_loader.py
│       │   └── v2
│       │       ├── __init__.py
│       │       └── routers
│       │           ├── __init__.py
│       │           ├── directory_router.py
│       │           ├── importer_router.py
│       │           ├── knowledge_router.py
│       │           ├── memory_router.py
│       │           ├── project_router.py
│       │           ├── prompt_router.py
│       │           ├── resource_router.py
│       │           └── search_router.py
│       ├── cli
│       │   ├── __init__.py
│       │   ├── app.py
│       │   ├── auth.py
│       │   ├── commands
│       │   │   ├── __init__.py
│       │   │   ├── cloud
│       │   │   │   ├── __init__.py
│       │   │   │   ├── api_client.py
│       │   │   │   ├── bisync_commands.py
│       │   │   │   ├── cloud_utils.py
│       │   │   │   ├── core_commands.py
│       │   │   │   ├── rclone_commands.py
│       │   │   │   ├── rclone_config.py
│       │   │   │   ├── rclone_installer.py
│       │   │   │   ├── upload_command.py
│       │   │   │   └── upload.py
│       │   │   ├── command_utils.py
│       │   │   ├── db.py
│       │   │   ├── format.py
│       │   │   ├── import_chatgpt.py
│       │   │   ├── import_claude_conversations.py
│       │   │   ├── import_claude_projects.py
│       │   │   ├── import_memory_json.py
│       │   │   ├── mcp.py
│       │   │   ├── project.py
│       │   │   ├── status.py
│       │   │   ├── telemetry.py
│       │   │   └── tool.py
│       │   ├── container.py
│       │   └── main.py
│       ├── config.py
│       ├── db.py
│       ├── deps
│       │   ├── __init__.py
│       │   ├── config.py
│       │   ├── db.py
│       │   ├── importers.py
│       │   ├── projects.py
│       │   ├── repositories.py
│       │   └── services.py
│       ├── deps.py
│       ├── file_utils.py
│       ├── ignore_utils.py
│       ├── importers
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── chatgpt_importer.py
│       │   ├── claude_conversations_importer.py
│       │   ├── claude_projects_importer.py
│       │   ├── memory_json_importer.py
│       │   └── utils.py
│       ├── markdown
│       │   ├── __init__.py
│       │   ├── entity_parser.py
│       │   ├── markdown_processor.py
│       │   ├── plugins.py
│       │   ├── schemas.py
│       │   └── utils.py
│       ├── mcp
│       │   ├── __init__.py
│       │   ├── async_client.py
│       │   ├── clients
│       │   │   ├── __init__.py
│       │   │   ├── directory.py
│       │   │   ├── knowledge.py
│       │   │   ├── memory.py
│       │   │   ├── project.py
│       │   │   ├── resource.py
│       │   │   └── search.py
│       │   ├── container.py
│       │   ├── project_context.py
│       │   ├── prompts
│       │   │   ├── __init__.py
│       │   │   ├── ai_assistant_guide.py
│       │   │   ├── continue_conversation.py
│       │   │   ├── recent_activity.py
│       │   │   ├── search.py
│       │   │   └── utils.py
│       │   ├── resources
│       │   │   ├── ai_assistant_guide.md
│       │   │   └── project_info.py
│       │   ├── server.py
│       │   └── tools
│       │       ├── __init__.py
│       │       ├── build_context.py
│       │       ├── canvas.py
│       │       ├── chatgpt_tools.py
│       │       ├── delete_note.py
│       │       ├── edit_note.py
│       │       ├── list_directory.py
│       │       ├── move_note.py
│       │       ├── project_management.py
│       │       ├── read_content.py
│       │       ├── read_note.py
│       │       ├── recent_activity.py
│       │       ├── search.py
│       │       ├── utils.py
│       │       ├── view_note.py
│       │       └── write_note.py
│       ├── models
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── knowledge.py
│       │   ├── project.py
│       │   └── search.py
│       ├── project_resolver.py
│       ├── repository
│       │   ├── __init__.py
│       │   ├── entity_repository.py
│       │   ├── observation_repository.py
│       │   ├── postgres_search_repository.py
│       │   ├── project_info_repository.py
│       │   ├── project_repository.py
│       │   ├── relation_repository.py
│       │   ├── repository.py
│       │   ├── search_index_row.py
│       │   ├── search_repository_base.py
│       │   ├── search_repository.py
│       │   └── sqlite_search_repository.py
│       ├── runtime.py
│       ├── schemas
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── cloud.py
│       │   ├── delete.py
│       │   ├── directory.py
│       │   ├── importer.py
│       │   ├── memory.py
│       │   ├── project_info.py
│       │   ├── prompt.py
│       │   ├── request.py
│       │   ├── response.py
│       │   ├── search.py
│       │   ├── sync_report.py
│       │   └── v2
│       │       ├── __init__.py
│       │       ├── entity.py
│       │       └── resource.py
│       ├── services
│       │   ├── __init__.py
│       │   ├── context_service.py
│       │   ├── directory_service.py
│       │   ├── entity_service.py
│       │   ├── exceptions.py
│       │   ├── file_service.py
│       │   ├── initialization.py
│       │   ├── link_resolver.py
│       │   ├── project_service.py
│       │   ├── search_service.py
│       │   └── service.py
│       ├── sync
│       │   ├── __init__.py
│       │   ├── background_sync.py
│       │   ├── coordinator.py
│       │   ├── sync_service.py
│       │   └── watch_service.py
│       ├── telemetry.py
│       ├── templates
│       │   └── prompts
│       │       ├── continue_conversation.hbs
│       │       └── search.hbs
│       └── utils.py
├── test-int
│   ├── BENCHMARKS.md
│   ├── cli
│   │   ├── test_project_commands_integration.py
│   │   └── test_version_integration.py
│   ├── conftest.py
│   ├── mcp
│   │   ├── test_build_context_underscore.py
│   │   ├── test_build_context_validation.py
│   │   ├── test_chatgpt_tools_integration.py
│   │   ├── test_default_project_mode_integration.py
│   │   ├── test_delete_note_integration.py
│   │   ├── test_edit_note_integration.py
│   │   ├── test_lifespan_shutdown_sync_task_cancellation_integration.py
│   │   ├── test_list_directory_integration.py
│   │   ├── test_move_note_integration.py
│   │   ├── test_project_management_integration.py
│   │   ├── test_project_state_sync_integration.py
│   │   ├── test_read_content_integration.py
│   │   ├── test_read_note_integration.py
│   │   ├── test_search_integration.py
│   │   ├── test_single_project_mcp_integration.py
│   │   └── test_write_note_integration.py
│   ├── test_db_wal_mode.py
│   └── test_disable_permalinks_integration.py
├── tests
│   ├── __init__.py
│   ├── api
│   │   ├── conftest.py
│   │   ├── test_api_container.py
│   │   ├── test_async_client.py
│   │   ├── test_continue_conversation_template.py
│   │   ├── test_directory_router.py
│   │   ├── test_importer_router.py
│   │   ├── test_knowledge_router.py
│   │   ├── test_management_router.py
│   │   ├── test_memory_router.py
│   │   ├── test_project_router_operations.py
│   │   ├── test_project_router.py
│   │   ├── test_prompt_router.py
│   │   ├── test_relation_background_resolution.py
│   │   ├── test_resource_router.py
│   │   ├── test_search_router.py
│   │   ├── test_search_template.py
│   │   ├── test_template_loader_helpers.py
│   │   ├── test_template_loader.py
│   │   └── v2
│   │       ├── __init__.py
│   │       ├── conftest.py
│   │       ├── test_directory_router.py
│   │       ├── test_importer_router.py
│   │       ├── test_knowledge_router.py
│   │       ├── test_memory_router.py
│   │       ├── test_project_router.py
│   │       ├── test_prompt_router.py
│   │       ├── test_resource_router.py
│   │       └── test_search_router.py
│   ├── cli
│   │   ├── cloud
│   │   │   ├── test_cloud_api_client_and_utils.py
│   │   │   ├── test_rclone_config_and_bmignore_filters.py
│   │   │   └── test_upload_path.py
│   │   ├── conftest.py
│   │   ├── test_auth_cli_auth.py
│   │   ├── test_cli_container.py
│   │   ├── test_cli_exit.py
│   │   ├── test_cli_tool_exit.py
│   │   ├── test_cli_tools.py
│   │   ├── test_cloud_authentication.py
│   │   ├── test_ignore_utils.py
│   │   ├── test_import_chatgpt.py
│   │   ├── test_import_claude_conversations.py
│   │   ├── test_import_claude_projects.py
│   │   ├── test_import_memory_json.py
│   │   ├── test_project_add_with_local_path.py
│   │   └── test_upload.py
│   ├── conftest.py
│   ├── db
│   │   └── test_issue_254_foreign_key_constraints.py
│   ├── importers
│   │   ├── test_conversation_indexing.py
│   │   ├── test_importer_base.py
│   │   └── test_importer_utils.py
│   ├── markdown
│   │   ├── __init__.py
│   │   ├── test_date_frontmatter_parsing.py
│   │   ├── test_entity_parser_error_handling.py
│   │   ├── test_entity_parser.py
│   │   ├── test_markdown_plugins.py
│   │   ├── test_markdown_processor.py
│   │   ├── test_observation_edge_cases.py
│   │   ├── test_parser_edge_cases.py
│   │   ├── test_relation_edge_cases.py
│   │   └── test_task_detection.py
│   ├── mcp
│   │   ├── clients
│   │   │   ├── __init__.py
│   │   │   └── test_clients.py
│   │   ├── conftest.py
│   │   ├── test_async_client_modes.py
│   │   ├── test_mcp_container.py
│   │   ├── test_obsidian_yaml_formatting.py
│   │   ├── test_permalink_collision_file_overwrite.py
│   │   ├── test_project_context.py
│   │   ├── test_prompts.py
│   │   ├── test_recent_activity_prompt_modes.py
│   │   ├── test_resources.py
│   │   ├── test_server_lifespan_branches.py
│   │   ├── test_tool_build_context.py
│   │   ├── test_tool_canvas.py
│   │   ├── test_tool_delete_note.py
│   │   ├── test_tool_edit_note.py
│   │   ├── test_tool_list_directory.py
│   │   ├── test_tool_move_note.py
│   │   ├── test_tool_project_management.py
│   │   ├── test_tool_read_content.py
│   │   ├── test_tool_read_note.py
│   │   ├── test_tool_recent_activity.py
│   │   ├── test_tool_resource.py
│   │   ├── test_tool_search.py
│   │   ├── test_tool_utils.py
│   │   ├── test_tool_view_note.py
│   │   ├── test_tool_write_note_kebab_filenames.py
│   │   ├── test_tool_write_note.py
│   │   └── tools
│   │       └── test_chatgpt_tools.py
│   ├── Non-MarkdownFileSupport.pdf
│   ├── README.md
│   ├── repository
│   │   ├── test_entity_repository_upsert.py
│   │   ├── test_entity_repository.py
│   │   ├── test_entity_upsert_issue_187.py
│   │   ├── test_observation_repository.py
│   │   ├── test_postgres_search_repository.py
│   │   ├── test_project_info_repository.py
│   │   ├── test_project_repository.py
│   │   ├── test_relation_repository.py
│   │   ├── test_repository.py
│   │   ├── test_search_repository_edit_bug_fix.py
│   │   └── test_search_repository.py
│   ├── schemas
│   │   ├── test_base_timeframe_minimum.py
│   │   ├── test_memory_serialization.py
│   │   ├── test_memory_url_validation.py
│   │   ├── test_memory_url.py
│   │   ├── test_relation_response_reference_resolution.py
│   │   ├── test_schemas.py
│   │   └── test_search.py
│   ├── Screenshot.png
│   ├── services
│   │   ├── test_context_service.py
│   │   ├── test_directory_service.py
│   │   ├── test_entity_service_disable_permalinks.py
│   │   ├── test_entity_service.py
│   │   ├── test_file_service.py
│   │   ├── test_initialization_cloud_mode_branches.py
│   │   ├── test_initialization.py
│   │   ├── test_link_resolver.py
│   │   ├── test_project_removal_bug.py
│   │   ├── test_project_service_operations.py
│   │   ├── test_project_service.py
│   │   └── test_search_service.py
│   ├── sync
│   │   ├── test_character_conflicts.py
│   │   ├── test_coordinator.py
│   │   ├── test_sync_service_incremental.py
│   │   ├── test_sync_service.py
│   │   ├── test_sync_wikilink_issue.py
│   │   ├── test_tmp_files.py
│   │   ├── test_watch_service_atomic_adds.py
│   │   ├── test_watch_service_edge_cases.py
│   │   ├── test_watch_service_reload.py
│   │   └── test_watch_service.py
│   ├── test_config.py
│   ├── test_deps.py
│   ├── test_production_cascade_delete.py
│   ├── test_project_resolver.py
│   ├── test_rclone_commands.py
│   ├── test_runtime.py
│   ├── test_telemetry.py
│   └── utils
│       ├── test_file_utils.py
│       ├── test_frontmatter_obsidian_compatible.py
│       ├── test_parse_tags.py
│       ├── test_permalink_formatting.py
│       ├── test_timezone_utils.py
│       ├── test_utf8_handling.py
│       └── test_validate_project_path.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/specs/SPEC-17 Semantic Search with ChromaDB.md:
--------------------------------------------------------------------------------

```markdown
---
title: 'SPEC-17: Semantic Search with ChromaDB'
type: spec
permalink: specs/spec-17-semantic-search-chromadb
tags:
- search
- chromadb
- semantic-search
- vector-database
- postgres-migration
---

# SPEC-17: Semantic Search with ChromaDB

**Why ChromaDB for Knowledge Management**

Your users aren't just searching for keywords - they're trying to:
- "Find notes related to this concept"
- "Show me similar ideas"
- "What else did I write about this topic?"

Example:
    # User searches: "AI ethics"

    # FTS5/MeiliSearch finds:
    - "AI ethics guidelines"     ✅
    - "ethical AI development"   ✅
    - "artificial intelligence"  ❌ No keyword match

    # ChromaDB finds:
    - "AI ethics guidelines"     ✅
    - "ethical AI development"   ✅
    - "artificial intelligence"  ✅ Semantic match!
    - "bias in ML models"        ✅ Related concept
    - "responsible technology"   ✅ Similar theme
    - "neural network fairness"  ✅ Connected idea

**ChromaDB vs MeiliSearch vs Typesense**

| Feature          | ChromaDB           | MeiliSearch        | Typesense          |
|------------------|--------------------|--------------------|--------------------|
| Semantic Search  | ✅ Excellent        | ❌ No               | ❌ No               |
| Keyword Search   | ⚠️ Via metadata    | ✅ Excellent        | ✅ Excellent        |
| Local Deployment | ✅ Embedded mode    | ⚠️ Server required | ⚠️ Server required |
| No Server Needed | ✅ YES!             | ❌ No               | ❌ No               |
| Embedding Cost   | ~$0.13/1M tokens   | None               | None               |
| Search Speed     | 50-200ms           | 10-50ms            | 10-50ms            |
| Best For         | Semantic discovery | Exact terms        | Exact terms        |

**The Killer Feature: Embedded Mode**

ChromaDB has an embedded client that runs in-process - NO SERVER NEEDED!

```python
# Local (FOSS) - ChromaDB embedded in Python process
import chromadb

client = chromadb.PersistentClient(path="/path/to/chroma_data")
collection = client.get_or_create_collection("knowledge_base")

# Add documents
collection.add(
  ids=["note1", "note2"],
  documents=["AI ethics", "Neural networks"],
  metadatas=[{"type": "note"}, {"type": "spec"}]
)

# Search - NO API calls, runs locally!
results = collection.query(
  query_texts=["machine learning"],
  n_results=10
)
```


## Why

### Current Problem: Database Persistence in Cloud
In cloud deployments, `memory.db` (SQLite) doesn't persist across Docker container restarts. This means:
- Database must be rebuilt on every container restart
- Initial sync takes ~49 seconds for 500 files (after optimization in #352)
- Users experience delays on each deployment

### Search Architecture Issues
Current SQLite FTS5 implementation creates a **dual-implementation problem** for PostgreSQL migration:
- FTS5 (SQLite) uses `VIRTUAL TABLE` with `MATCH` queries
- PostgreSQL full-text search uses `TSVECTOR` with `@@` operator
- These are fundamentally incompatible architectures
- Would require **2x search code** and **2x tests** to support both

**Example of incompatibility:**
```python
# SQLite FTS5
"content_stems MATCH :text"

# PostgreSQL
"content_vector @@ plainto_tsquery(:text)"
```

### Search Quality Limitations
Current keyword-based FTS5 has limitations:
- No semantic understanding (search "AI" doesn't find "machine learning")
- No word relationships (search "neural networks" doesn't find "deep learning")
- Limited typo tolerance
- No relevance ranking beyond keyword matching

### Strategic Goal: PostgreSQL Migration
Moving to PostgreSQL (Neon) for cloud deployments would:
- ✅ Solve persistence issues (database survives restarts)
- ✅ Enable multi-tenant architecture
- ✅ Better performance for large datasets
- ✅ Support for cloud-native scaling

**But requires solving the search compatibility problem.**

## What

Migrate from SQLite FTS5 to **ChromaDB** for semantic vector search across all deployments.

**Key insight:** ChromaDB is **database-agnostic** - it works with both SQLite and PostgreSQL, eliminating the dual-implementation problem.

### Affected Areas
- Search implementation (`src/basic_memory/repository/search_repository.py`)
- Search service (`src/basic_memory/services/search_service.py`)
- Search models (`src/basic_memory/models/search.py`)
- Database initialization (`src/basic_memory/db.py`)
- MCP search tools (`src/basic_memory/mcp/tools/search.py`)
- Dependencies (`pyproject.toml` - add ChromaDB)
- Alembic migrations (FTS5 table removal)
- Documentation

### What Changes
**Removed:**
- SQLite FTS5 virtual table
- `MATCH` query syntax
- FTS5-specific tokenization and prefix handling
- ~300 lines of FTS5 query preparation code

**Added:**
- ChromaDB persistent client (embedded mode)
- Vector embedding generation
- Semantic similarity search
- Local embedding model (`sentence-transformers`)
- Collection management for multi-project support

### What Stays the Same
- Search API interface (MCP tools, REST endpoints)
- Entity/Observation/Relation indexing workflow
- Multi-project isolation
- Search filtering by type, date, metadata
- Pagination and result formatting
- **All SQL queries for exact lookups and metadata filtering**

## Hybrid Architecture: SQL + ChromaDB

**Critical Design Decision:** ChromaDB **complements** SQL; it doesn't **replace** it.

### Why Hybrid?

ChromaDB is excellent for semantic text search but terrible for exact lookups. SQL is perfect for exact lookups and structured queries. We use both:

```
┌─────────────────────────────────────────────────┐
│ Search Request                                   │
└─────────────────────────────────────────────────┘
                    ▼
       ┌────────────────────────┐
       │ SearchRepository       │
       │  (Smart Router)        │
       └────────────────────────┘
              ▼           ▼
  ┌───────────┐      ┌──────────────┐
  │ SQL       │      │ ChromaDB     │
  │ Queries   │      │ Semantic     │
  └───────────┘      └──────────────┘
       ▼                    ▼
  Exact lookups      Text search
  - Permalink        - Semantic similarity
  - Pattern match    - Related concepts
  - Title exact      - Typo tolerance
  - Metadata filter  - Fuzzy matching
  - Date ranges
```

### When to Use Each

#### Use SQL For (Fast & Exact)

**Exact Permalink Lookup:**
```python
# Find by exact permalink - SQL wins
"SELECT * FROM entities WHERE permalink = 'specs/search-feature'"
# ~1ms, perfect for exact matches

# ChromaDB would be: ~50ms, wasteful
```

**Pattern Matching:**
```python
# Find all specs - SQL wins
"SELECT * FROM entities WHERE permalink GLOB 'specs/*'"
# ~5ms, perfect for wildcards

# ChromaDB doesn't support glob patterns
```

**Pure Metadata Queries:**
```python
# Find all meetings tagged "important" - SQL wins
"SELECT * FROM entities
 WHERE json_extract(entity_metadata, '$.entity_type') = 'meeting'
 AND json_extract(entity_metadata, '$.tags') LIKE '%important%'"
# ~5ms, structured query

# No text search needed, SQL is faster and simpler
```

**Date Filtering:**
```python
# Find recent specs - SQL wins
"SELECT * FROM entities
 WHERE entity_type = 'spec'
 AND created_at > '2024-01-01'
 ORDER BY created_at DESC"
# ~2ms, perfect for structured data
```

#### Use ChromaDB For (Semantic & Fuzzy)

**Semantic Content Search:**
```python
# Find notes about "neural networks" - ChromaDB wins
collection.query(query_texts=["neural networks"])
# Finds: "machine learning", "deep learning", "AI models"
# ~50-100ms, semantic understanding

# SQL FTS5 would only find exact keyword matches
```

**Text Search + Metadata:**
```python
# Find meeting notes about "project planning" tagged "important"
collection.query(
    query_texts=["project planning"],
    where={
        "entity_type": "meeting",
        "tags": {"$contains": "important"}
    }
)
# ~100ms, semantic search with filters
# Finds: "roadmap discussion", "sprint planning", etc.
```

**Typo Tolerance:**
```python
# User types "serch feature" (typo) - ChromaDB wins
collection.query(query_texts=["serch feature"])
# Still finds: "search feature" documents
# ~50-100ms, fuzzy matching

# SQL would find nothing
```

### Performance Comparison

| Query Type | SQL | ChromaDB | Winner |
|-----------|-----|----------|--------|
| Exact permalink | 1-2ms | 50ms | ✅ SQL |
| Pattern match (specs/*) | 5-10ms | N/A | ✅ SQL |
| Pure metadata filter | 5ms | 50ms | ✅ SQL |
| Semantic text search | ❌ Can't | 50-100ms | ✅ ChromaDB |
| Text + metadata | ❌ Keywords only | 100ms | ✅ ChromaDB |
| Typo tolerance | ❌ Can't | 50ms | ✅ ChromaDB |

### Metadata/Frontmatter Handling

**Both systems support full frontmatter filtering!**

#### SQL Metadata Storage

```python
# Entities table stores frontmatter as JSON
CREATE TABLE entities (
    id INTEGER PRIMARY KEY,
    title TEXT,
    permalink TEXT,
    file_path TEXT,
    entity_type TEXT,
    entity_metadata JSON,  -- All frontmatter here!
    created_at DATETIME,
    ...
)

# Query frontmatter fields
SELECT * FROM entities
WHERE json_extract(entity_metadata, '$.entity_type') = 'meeting'
  AND json_extract(entity_metadata, '$.tags') LIKE '%important%'
  AND json_extract(entity_metadata, '$.status') = 'completed'
```

#### ChromaDB Metadata Storage

```python
# When indexing, store ALL frontmatter as metadata
class ChromaSearchBackend:
    async def index_entity(self, entity: Entity):
        """Index with complete frontmatter metadata."""

        # Extract ALL frontmatter fields
        metadata = {
            "entity_id": entity.id,
            "project_id": entity.project_id,
            "permalink": entity.permalink,
            "file_path": entity.file_path,
            "entity_type": entity.entity_type,
            "type": "entity",
            # ALL frontmatter tags
            "tags": entity.entity_metadata.get("tags", []),
            # Custom frontmatter fields
            "status": entity.entity_metadata.get("status"),
            "priority": entity.entity_metadata.get("priority"),
            # Spread any other custom fields
            **{k: v for k, v in entity.entity_metadata.items()
               if k not in ["tags", "entity_type"]}
        }

        self.collection.upsert(
            ids=[f"entity_{entity.id}_{entity.project_id}"],
            documents=[self._format_document(entity)],
            metadatas=[metadata]  # Full frontmatter!
        )
```

#### ChromaDB Metadata Queries

ChromaDB supports rich filtering:

```python
# Simple filter - single field
collection.query(
    query_texts=["project planning"],
    where={"entity_type": "meeting"}
)

# Multiple conditions (AND)
collection.query(
    query_texts=["architecture decisions"],
    where={
        "entity_type": "spec",
        "tags": {"$contains": "important"}
    }
)

# Complex filters with operators
collection.query(
    query_texts=["machine learning"],
    where={
        "$and": [
            {"entity_type": {"$in": ["note", "spec"]}},
            {"tags": {"$contains": "AI"}},
            {"created_at": {"$gt": "2024-01-01"}},
            {"status": "in-progress"}
        ]
    }
)

# Multiple tags (all must match)
collection.query(
    query_texts=["cloud architecture"],
    where={
        "$and": [
            {"tags": {"$contains": "architecture"}},
            {"tags": {"$contains": "cloud"}}
        ]
    }
)
```

### Smart Routing Implementation

```python
class SearchRepository:
    def __init__(
        self,
        session_maker: async_sessionmaker[AsyncSession],
        project_id: int,
        chroma_backend: ChromaSearchBackend
    ):
        self.session_maker = session_maker  # Keep SQL!
        self.chroma = chroma_backend
        self.project_id = project_id

    async def search(
        self,
        search_text: Optional[str] = None,
        permalink: Optional[str] = None,
        permalink_match: Optional[str] = None,
        title: Optional[str] = None,
        types: Optional[List[str]] = None,
        tags: Optional[List[str]] = None,
        after_date: Optional[datetime] = None,
        custom_metadata: Optional[dict] = None,
        limit: int = 10,
        offset: int = 0,
    ) -> List[SearchIndexRow]:
        """Smart routing between SQL and ChromaDB."""

        # ==========================================
        # Route 1: Exact Lookups → SQL (1-5ms)
        # ==========================================

        if permalink:
            # Exact permalink: "specs/search-feature"
            return await self._sql_permalink_lookup(permalink)

        if permalink_match:
            # Pattern match: "specs/*"
            return await self._sql_pattern_match(permalink_match)

        if title and not search_text:
            # Exact title lookup (no semantic search needed)
            return await self._sql_title_match(title)

        # ==========================================
        # Route 2: Pure Metadata → SQL (5-10ms)
        # ==========================================

        # No text search, just filtering by metadata
        if not search_text and (types or tags or after_date or custom_metadata):
            return await self._sql_metadata_filter(
                types=types,
                tags=tags,
                after_date=after_date,
                custom_metadata=custom_metadata,
                limit=limit,
                offset=offset
            )

        # ==========================================
        # Route 3: Text Search → ChromaDB (50-100ms)
        # ==========================================

        if search_text:
            # Build ChromaDB metadata filters
            where_filters = self._build_chroma_filters(
                types=types,
                tags=tags,
                after_date=after_date,
                custom_metadata=custom_metadata
            )

            # Semantic search with metadata filtering
            return await self.chroma.search(
                query_text=search_text,
                project_id=self.project_id,
                where=where_filters,
                limit=limit
            )

        # ==========================================
        # Route 4: List All → SQL (2-5ms)
        # ==========================================

        return await self._sql_list_entities(
            limit=limit,
            offset=offset
        )

    def _build_chroma_filters(
        self,
        types: Optional[List[str]] = None,
        tags: Optional[List[str]] = None,
        after_date: Optional[datetime] = None,
        custom_metadata: Optional[dict] = None
    ) -> dict:
        """Build ChromaDB where clause from filters."""
        filters = {"project_id": self.project_id}

        # Type filtering
        if types:
            if len(types) == 1:
                filters["entity_type"] = types[0]
            else:
                filters["entity_type"] = {"$in": types}

        # Tag filtering (array contains)
        if tags:
            if len(tags) == 1:
                filters["tags"] = {"$contains": tags[0]}
            else:
                # Multiple tags - all must match
                filters = {
                    "$and": [
                        filters,
                        *[{"tags": {"$contains": tag}} for tag in tags]
                    ]
                }

        # Date filtering
        if after_date:
            filters["created_at"] = {"$gt": after_date.isoformat()}

        # Custom frontmatter fields
        if custom_metadata:
            filters.update(custom_metadata)

        return filters

    async def _sql_metadata_filter(
        self,
        types: Optional[List[str]] = None,
        tags: Optional[List[str]] = None,
        after_date: Optional[datetime] = None,
        custom_metadata: Optional[dict] = None,
        limit: int = 10,
        offset: int = 0
    ) -> List[SearchIndexRow]:
        """Pure metadata queries using SQL."""
        conditions = ["project_id = :project_id"]
        params = {"project_id": self.project_id}

        if types:
            type_list = ", ".join(f"'{t}'" for t in types)
            conditions.append(f"entity_type IN ({type_list})")

        if tags:
            # Check each tag
            for i, tag in enumerate(tags):
                param_name = f"tag_{i}"
                conditions.append(
                    f"json_extract(entity_metadata, '$.tags') LIKE :{param_name}"
                )
                params[param_name] = f"%{tag}%"

        if after_date:
            conditions.append("created_at > :after_date")
            params["after_date"] = after_date

        if custom_metadata:
            for key, value in custom_metadata.items():
                param_name = f"meta_{key}"
                conditions.append(
                    f"json_extract(entity_metadata, '$.{key}') = :{param_name}"
                )
                params[param_name] = value

        where = " AND ".join(conditions)
        sql = f"""
            SELECT * FROM entities
            WHERE {where}
            ORDER BY created_at DESC
            LIMIT :limit OFFSET :offset
        """
        params["limit"] = limit
        params["offset"] = offset

        async with db.scoped_session(self.session_maker) as session:
            result = await session.execute(text(sql), params)
            return self._format_sql_results(result)
```

### Real-World Examples

#### Example 1: Pure Metadata Query (No Text)
```python
# "Find all meetings tagged 'important'"
results = await search_repo.search(
    types=["meeting"],
    tags=["important"]
)

# Routing: → SQL (~5ms)
# SQL: SELECT * FROM entities
#      WHERE entity_type = 'meeting'
#      AND json_extract(entity_metadata, '$.tags') LIKE '%important%'
```

#### Example 2: Semantic Search (No Metadata)
```python
# "Find notes about neural networks"
results = await search_repo.search(
    search_text="neural networks"
)

# Routing: → ChromaDB (~80ms)
# Finds: "machine learning", "deep learning", "AI models", etc.
```

#### Example 3: Semantic + Metadata
```python
# "Find meeting notes about 'project planning' tagged 'important'"
results = await search_repo.search(
    search_text="project planning",
    types=["meeting"],
    tags=["important"]
)

# Routing: → ChromaDB with filters (~100ms)
# ChromaDB: query_texts=["project planning"]
#           where={"entity_type": "meeting",
#                  "tags": {"$contains": "important"}}
# Finds: "roadmap discussion", "sprint planning", etc.
```

#### Example 4: Complex Frontmatter Query
```python
# "Find in-progress specs with multiple tags, recent"
results = await search_repo.search(
    types=["spec"],
    tags=["architecture", "cloud"],
    after_date=datetime(2024, 1, 1),
    custom_metadata={"status": "in-progress"}
)

# Routing: → SQL (~10ms)
# No text search, pure structured query - SQL is faster
```

#### Example 5: Semantic + Complex Metadata
```python
# "Find notes about 'authentication' that are in-progress"
results = await search_repo.search(
    search_text="authentication",
    custom_metadata={"status": "in-progress", "priority": "high"}
)

# Routing: → ChromaDB with metadata filters (~100ms)
# Semantic search for "authentication" concept
# Filters by status and priority in metadata
```

#### Example 6: Exact Permalink
```python
# "Show me specs/search-feature"
results = await search_repo.search(
    permalink="specs/search-feature"
)

# Routing: → SQL (~1ms)
# SQL: SELECT * FROM entities WHERE permalink = 'specs/search-feature'
```

#### Example 7: Pattern Match
```python
# "Show me all specs"
results = await search_repo.search(
    permalink_match="specs/*"
)

# Routing: → SQL (~5ms)
# SQL: SELECT * FROM entities WHERE permalink GLOB 'specs/*'
```

### What We Remove vs Keep

**REMOVE (FTS5-specific):**
- ❌ `CREATE VIRTUAL TABLE search_index USING fts5(...)`
- ❌ `MATCH` operator queries
- ❌ FTS5 tokenization configuration
- ❌ ~300 lines of FTS5 query preparation code
- ❌ Trigram generation and prefix handling

**KEEP (Standard SQL):**
- ✅ `SELECT * FROM entities WHERE permalink = :permalink`
- ✅ `SELECT * FROM entities WHERE permalink GLOB :pattern`
- ✅ `SELECT * FROM entities WHERE title LIKE :title`
- ✅ `SELECT * FROM entities WHERE json_extract(entity_metadata, ...) = :value`
- ✅ All date filtering, pagination, sorting
- ✅ Entity table structure and indexes

**ADD (ChromaDB):**
- ✅ ChromaDB persistent client (embedded)
- ✅ Semantic vector search
- ✅ Metadata filtering in ChromaDB
- ✅ Smart routing logic

## How (High Level)

### Architecture Overview

```
┌─────────────────────────────────────────────────────────────┐
│ FOSS Deployment (Local)                                      │
├─────────────────────────────────────────────────────────────┤
│ SQLite (data) + ChromaDB embedded (search)                   │
│ - No external services                                       │
│ - Local embedding model (sentence-transformers)              │
│ - Persists in ~/.basic-memory/chroma_data/                  │
└─────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────┐
│ Cloud Deployment (Multi-tenant)                              │
├─────────────────────────────────────────────────────────────┤
│ PostgreSQL/Neon (data) + ChromaDB server (search)           │
│ - Neon serverless Postgres for persistence                   │
│ - ChromaDB server in Docker container                        │
│ - Optional: OpenAI embeddings for better quality             │
└─────────────────────────────────────────────────────────────┘
```

### Phase 1: ChromaDB Integration (2-3 days)

#### 1. Add ChromaDB Dependency
```toml
# pyproject.toml
dependencies = [
    "chromadb>=0.4.0",
    "sentence-transformers>=2.2.0",  # Local embeddings
]
```

#### 2. Create ChromaSearchBackend
```python
# src/basic_memory/search/chroma_backend.py
from chromadb import PersistentClient
from chromadb.utils import embedding_functions

class ChromaSearchBackend:
    def __init__(
        self,
        persist_directory: Path,
        collection_name: str = "knowledge_base",
        embedding_model: str = "all-MiniLM-L6-v2"
    ):
        """Initialize ChromaDB with local embeddings."""
        self.client = PersistentClient(path=str(persist_directory))

        # Use local sentence-transformers model (no API costs)
        self.embed_fn = embedding_functions.SentenceTransformerEmbeddingFunction(
            model_name=embedding_model
        )

        self.collection = self.client.get_or_create_collection(
            name=collection_name,
            embedding_function=self.embed_fn,
            metadata={"hnsw:space": "cosine"}  # Similarity metric
        )

    async def index_entity(self, entity: Entity):
        """Index entity with automatic embeddings."""
        # Combine title and content for semantic search
        document = self._format_document(entity)

        self.collection.upsert(
            ids=[f"entity_{entity.id}_{entity.project_id}"],
            documents=[document],
            metadatas=[{
                "entity_id": entity.id,
                "project_id": entity.project_id,
                "permalink": entity.permalink,
                "file_path": entity.file_path,
                "entity_type": entity.entity_type,
                "type": "entity",
            }]
        )

    async def search(
        self,
        query_text: str,
        project_id: int,
        limit: int = 10,
        filters: dict = None
    ) -> List[SearchResult]:
        """Semantic search with metadata filtering."""
        where = {"project_id": project_id}
        if filters:
            where.update(filters)

        results = self.collection.query(
            query_texts=[query_text],
            n_results=limit,
            where=where
        )

        return self._format_results(results)
```

#### 3. Update SearchRepository
```python
# src/basic_memory/repository/search_repository.py
class SearchRepository:
    def __init__(
        self,
        session_maker: async_sessionmaker[AsyncSession],
        project_id: int,
        chroma_backend: ChromaSearchBackend
    ):
        self.session_maker = session_maker
        self.project_id = project_id
        self.chroma = chroma_backend

    async def search(
        self,
        search_text: Optional[str] = None,
        permalink: Optional[str] = None,
        # ... other filters
    ) -> List[SearchIndexRow]:
        """Search using ChromaDB for text, SQL for exact lookups."""

        # For exact permalink/pattern matches, use SQL
        if permalink or permalink_match:
            return await self._sql_exact_search(...)

        # For text search, use ChromaDB semantic search
        if search_text:
            results = await self.chroma.search(
                query_text=search_text,
                project_id=self.project_id,
                limit=limit,
                filters=self._build_filters(types, after_date, ...)
            )
            return results

        # Fallback to listing all
        return await self._list_entities(...)
```

#### 4. Update SearchService
```python
# src/basic_memory/services/search_service.py
class SearchService:
    def __init__(
        self,
        search_repository: SearchRepository,
        entity_repository: EntityRepository,
        file_service: FileService,
        chroma_backend: ChromaSearchBackend,
    ):
        self.repository = search_repository
        self.entity_repository = entity_repository
        self.file_service = file_service
        self.chroma = chroma_backend

    async def index_entity(self, entity: Entity):
        """Index entity in ChromaDB."""
        if entity.is_markdown:
            await self._index_entity_markdown(entity)
        else:
            await self._index_entity_file(entity)

    async def _index_entity_markdown(self, entity: Entity):
        """Index markdown entity with full content."""
        # Index entity
        await self.chroma.index_entity(entity)

        # Index observations (as separate documents)
        for obs in entity.observations:
            await self.chroma.index_observation(obs, entity)

        # Index relations (metadata only)
        for rel in entity.outgoing_relations:
            await self.chroma.index_relation(rel, entity)
```

### Phase 2: PostgreSQL Support (1 day)

#### 1. Add PostgreSQL Database Type
```python
# src/basic_memory/db.py
class DatabaseType(Enum):
    MEMORY = auto()
    FILESYSTEM = auto()
    POSTGRESQL = auto()  # NEW

    @classmethod
    def get_db_url(cls, db_path_or_url: str, db_type: "DatabaseType") -> str:
        if db_type == cls.POSTGRESQL:
            return db_path_or_url  # Neon connection string
        elif db_type == cls.MEMORY:
            return "sqlite+aiosqlite://"
        return f"sqlite+aiosqlite:///{db_path_or_url}"
```

#### 2. Update Connection Handling
```python
def _create_engine_and_session(...):
    db_url = DatabaseType.get_db_url(db_path_or_url, db_type)

    if db_type == DatabaseType.POSTGRESQL:
        # Use asyncpg driver for Postgres
        engine = create_async_engine(
            db_url,
            pool_size=10,
            max_overflow=20,
            pool_pre_ping=True,  # Health checks
        )
    else:
        # SQLite configuration
        engine = create_async_engine(db_url, connect_args=connect_args)

        # Only configure SQLite-specific settings for SQLite
        if db_type != DatabaseType.MEMORY:
            @event.listens_for(engine.sync_engine, "connect")
            def enable_wal_mode(dbapi_conn, connection_record):
                _configure_sqlite_connection(dbapi_conn, enable_wal=True)

    return engine, async_sessionmaker(engine, expire_on_commit=False)
```

#### 3. Remove SQLite-Specific Code
```python
# Remove from scoped_session context manager:
# await session.execute(text("PRAGMA foreign_keys=ON"))  # DELETE

# PostgreSQL handles foreign keys by default
```

### Phase 3: Migration & Testing (1-2 days)

#### 1. Create Migration Script
```python
# scripts/migrate_to_chromadb.py
async def migrate_fts5_to_chromadb():
    """One-time migration from FTS5 to ChromaDB."""
    # 1. Read all entities from database
    entities = await entity_repository.find_all()

    # 2. Index in ChromaDB
    for entity in entities:
        await search_service.index_entity(entity)

    # 3. Drop FTS5 table (Alembic migration)
    await session.execute(text("DROP TABLE IF EXISTS search_index"))
```

#### 2. Update Tests
- Replace FTS5 test fixtures with ChromaDB fixtures
- Test semantic search quality
- Test multi-project isolation in ChromaDB (see the fixture sketch below)
- Benchmark performance vs FTS5
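
A minimal fixture sketch of what these tests could look like, driving ChromaDB's embedded client directly (the `chromadb` calls are the library's public API; the test name, ids, and sample documents are hypothetical placeholders):

```python
# Hypothetical test sketch: multi-project isolation via metadata filtering.
import chromadb
import pytest


@pytest.fixture
def collection(tmp_path):
    client = chromadb.PersistentClient(path=str(tmp_path / "chroma_data"))
    return client.get_or_create_collection("knowledge_base")


def test_multi_project_isolation(collection):
    # Two projects share one collection; project_id metadata keeps them apart.
    collection.add(
        ids=["e1", "e2"],
        documents=["AI ethics guidelines", "Grocery shopping list"],
        metadatas=[{"project_id": 1}, {"project_id": 2}],
    )
    results = collection.query(
        query_texts=["responsible AI"],
        n_results=5,
        where={"project_id": 1},
    )
    assert results["ids"][0] == ["e1"]  # project 2 content never leaks in
```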

#### 3. Documentation Updates
- Update search documentation
- Add ChromaDB configuration guide
- Document embedding model options
- PostgreSQL deployment guide

### Configuration

```python
# config.py
class BasicMemoryConfig:
    # Database
    database_type: DatabaseType = DatabaseType.FILESYSTEM
    database_path: Path = Path.home() / ".basic-memory" / "memory.db"
    database_url: Optional[str] = None  # For Postgres: postgresql://...

    # Search
    chroma_persist_directory: Path = Path.home() / ".basic-memory" / "chroma_data"
    embedding_model: str = "all-MiniLM-L6-v2"  # Local model
    embedding_provider: str = "local"  # or "openai"
    openai_api_key: Optional[str] = None  # For cloud deployments
```

### Deployment Configurations

#### Local (FOSS)
```yaml
# Default configuration
database_type: FILESYSTEM
database_path: ~/.basic-memory/memory.db
chroma_persist_directory: ~/.basic-memory/chroma_data
embedding_model: all-MiniLM-L6-v2
embedding_provider: local
```

#### Cloud (Docker Compose)
```yaml
services:
  postgres:
    image: postgres:15
    environment:
      POSTGRES_DB: basic_memory
      POSTGRES_PASSWORD: ${DB_PASSWORD}

  chromadb:
    image: chromadb/chroma:latest
    volumes:
      - chroma_data:/chroma/chroma
    environment:
      ALLOW_RESET: true

  app:
    environment:
      DATABASE_TYPE: POSTGRESQL
      DATABASE_URL: postgresql://postgres:${DB_PASSWORD}@postgres/basic_memory
      CHROMA_HOST: chromadb
      CHROMA_PORT: 8000
      EMBEDDING_PROVIDER: local  # or openai
```
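
In this topology the app talks to the ChromaDB container over HTTP instead of using the embedded client. A sketch of that wiring, assuming the `CHROMA_HOST`/`CHROMA_PORT` variables from the compose file above (the helper name is hypothetical; `chromadb.HttpClient` and `chromadb.PersistentClient` are the library's client entry points):

```python
# Hypothetical helper: choose embedded vs server mode from the environment.
import os

import chromadb


def make_chroma_client():
    host = os.getenv("CHROMA_HOST")
    if host:
        # Cloud: connect to the chromadb service from docker-compose
        port = int(os.getenv("CHROMA_PORT", "8000"))
        return chromadb.HttpClient(host=host, port=port)
    # Local FOSS: embedded client, no server required
    return chromadb.PersistentClient(
        path=os.path.expanduser("~/.basic-memory/chroma_data")
    )


collection = make_chroma_client().get_or_create_collection("knowledge_base")
```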

## How to Evaluate

### Success Criteria

#### Functional Requirements
- ✅ Semantic search finds related concepts (e.g., "AI" finds "machine learning")
- ✅ Exact permalink/pattern matches work (e.g., `specs/*`)
- ✅ Multi-project isolation maintained
- ✅ All existing search filters work (type, date, metadata)
- ✅ MCP tools continue to work without changes
- ✅ Works with both SQLite and PostgreSQL

#### Performance Requirements
- ✅ Search latency < 200ms for 1000 documents (local embedding)
- ✅ Indexing time comparable to FTS5 (~10 files/sec)
- ✅ Initial sync time not significantly worse than current
- ✅ Memory footprint < 1GB for local deployments

#### Quality Requirements
- ✅ Better search relevance than FTS5 keyword matching
- ✅ Handles typos and word variations
- ✅ Finds semantically similar content

#### Deployment Requirements
- ✅ FOSS: Works out-of-box with no external services
- ✅ Cloud: Integrates with PostgreSQL (Neon)
- ✅ No breaking changes to MCP API
- ✅ Migration script for existing users

### Testing Procedure

#### 1. Unit Tests
```bash
# Test ChromaDB backend
pytest tests/test_chroma_backend.py

# Test search repository with ChromaDB
pytest tests/test_search_repository.py

# Test search service
pytest tests/test_search_service.py
```

#### 2. Integration Tests
```bash
# Test full search workflow
pytest test-int/test_search_integration.py

# Test with PostgreSQL
DATABASE_TYPE=POSTGRESQL pytest test-int/
```

#### 3. Semantic Search Quality Tests
```python
# Test semantic similarity
search("machine learning") should find:
- "neural networks"
- "deep learning"
- "AI algorithms"

search("software architecture") should find:
- "system design"
- "design patterns"
- "microservices"
```
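
The pseudo-spec above could be made executable roughly as follows (a sketch only: the document set, ids, and ranking assertion are placeholders, and results depend on the embedding model):

```python
# Hypothetical quality check: related concepts should outrank unrelated ones.
import chromadb

client = chromadb.PersistentClient(path="/tmp/chroma_quality_check")
collection = client.get_or_create_collection("quality_check")
collection.add(
    ids=["nn", "dl", "misc"],
    documents=["neural networks", "deep learning", "grocery shopping list"],
)

ranked = collection.query(query_texts=["machine learning"], n_results=3)["ids"][0]

# Both related documents should rank above the unrelated one.
assert ranked.index("nn") < ranked.index("misc")
assert ranked.index("dl") < ranked.index("misc")
```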

#### 4. Performance Benchmarks
```bash
# Run search benchmarks
pytest test-int/test_search_performance.py -v

# Measure:
# - Search latency (should be < 200ms)
# - Indexing throughput (should be ~10 files/sec)
# - Memory usage (should be < 1GB)
```

#### 5. Migration Testing
```bash
# Test migration from FTS5 to ChromaDB
python scripts/migrate_to_chromadb.py

# Verify all entities indexed
# Verify search results quality
# Verify no data loss
```

### Metrics

**Search Quality:**
- Semantic relevance score (manual evaluation)
- Precision/recall for common queries
- User satisfaction (qualitative)

**Performance:**
- Average search latency (ms)
- P95/P99 search latency
- Indexing throughput (files/sec)
- Memory usage (MB)

**Deployment:**
- Local deployment success rate
- Cloud deployment success rate
- Migration success rate

## Implementation Checklist

### Phase 1: ChromaDB Integration
- [ ] Add ChromaDB and sentence-transformers dependencies
- [ ] Create ChromaSearchBackend class
- [ ] Update SearchRepository to use ChromaDB
- [ ] Update SearchService indexing methods
- [ ] Remove FTS5 table creation code
- [ ] Update search query logic
- [ ] Add ChromaDB configuration to BasicMemoryConfig

### Phase 2: PostgreSQL Support
- [ ] Add DatabaseType.POSTGRESQL enum
- [ ] Update get_db_url() for Postgres connection strings
- [ ] Add asyncpg dependency
- [ ] Update engine creation for Postgres
- [ ] Remove SQLite-specific PRAGMA statements
- [ ] Test with Neon database

### Phase 3: Testing & Migration
- [ ] Write unit tests for ChromaSearchBackend
- [ ] Update search integration tests
- [ ] Add semantic search quality tests
- [ ] Create performance benchmarks
- [ ] Write migration script from FTS5
- [ ] Test migration with existing data
- [ ] Update documentation

### Phase 4: Deployment
- [ ] Update docker-compose.yml for cloud
- [ ] Document local FOSS deployment
- [ ] Document cloud PostgreSQL deployment
- [ ] Create migration guide for users
- [ ] Update MCP tool documentation

## Notes

### Embedding Model Trade-offs

**Local Model: `all-MiniLM-L6-v2`**
- Size: 80MB download
- Speed: ~50ms embedding time
- Dimensions: 384
- Cost: $0
- Quality: Good for general knowledge
- Best for: FOSS deployments

**OpenAI: `text-embedding-3-small`**
- Speed: ~100-200ms (API call)
- Dimensions: 1536
- Cost: ~$0.13 per 1M tokens (~$0.01 per 1000 notes)
- Quality: Excellent
- Best for: Cloud deployments with budget (provider selection sketched below)
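
A small sketch of how the `embedding_provider` setting from the Configuration section could select between these models (the factory function is hypothetical; the two embedding-function classes are ChromaDB's):

```python
# Hypothetical factory mapping embedding_provider to a ChromaDB embedding function.
from typing import Optional

from chromadb.utils import embedding_functions


def make_embedding_function(provider: str, model: str, openai_api_key: Optional[str] = None):
    if provider == "openai":
        # Cloud option: higher quality, per-token cost
        return embedding_functions.OpenAIEmbeddingFunction(
            api_key=openai_api_key,
            model_name=model,  # e.g. "text-embedding-3-small"
        )
    # Default FOSS option: local model, no API calls
    return embedding_functions.SentenceTransformerEmbeddingFunction(model_name=model)
```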

### ChromaDB Storage

ChromaDB stores data in:
```
~/.basic-memory/chroma_data/
  ├── chroma.sqlite3        # Metadata
  ├── index/                # HNSW indexes
  └── collections/          # Vector data
```

Typical sizes (a quick sanity check is sketched below):
- 100 notes: ~5MB
- 1000 notes: ~50MB
- 10000 notes: ~500MB
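
A quick way to check those numbers against a local install (assumes the default persist directory from the Configuration section):

```python
# Report document count and on-disk footprint of the local ChromaDB store.
from pathlib import Path

import chromadb

persist_dir = Path.home() / ".basic-memory" / "chroma_data"
collection = chromadb.PersistentClient(path=str(persist_dir)).get_or_create_collection(
    "knowledge_base"
)

size_mb = sum(f.stat().st_size for f in persist_dir.rglob("*") if f.is_file()) / 1_000_000
print(f"{collection.count()} documents, ~{size_mb:.0f} MB on disk")
```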

### Why Not Keep FTS5?

**Considered:** Hybrid approach (FTS5 for SQLite + tsvector for Postgres)
**Rejected because:**
- 2x the code to maintain
- 2x the tests to write
- 2x the bugs to fix
- Inconsistent search behavior between deployments
- ChromaDB provides better search quality anyway

**ChromaDB wins:**
- One implementation for both databases
- Better search quality (semantic!)
- Database-agnostic architecture
- Embedded mode for FOSS (no servers needed)

## Implementation

  Proposed Architecture

  Option 1: ChromaDB Only (Simplest)

```python
import chromadb


class ChromaSearchBackend:
    def __init__(self, path: str, embedding_model: str = "all-MiniLM-L6-v2"):
        # For local: embedded client (no server!)
        self.client = chromadb.PersistentClient(path=path)

        # Use local embedding model (no API costs!)
        from chromadb.utils import embedding_functions
        self.embed_fn = embedding_functions.SentenceTransformerEmbeddingFunction(
            model_name=embedding_model
        )

        self.collection = self.client.get_or_create_collection(
            name="knowledge_base",
            embedding_function=self.embed_fn
        )

    async def index_entity(self, entity: Entity):
        # ChromaDB handles embeddings automatically!
        self.collection.upsert(
            ids=[str(entity.id)],
            documents=[f"{entity.title}\n{entity.content}"],
            metadatas=[{
                "permalink": entity.permalink,
                "type": entity.entity_type,
                "file_path": entity.file_path
            }]
        )

    async def search(self, query: str, filters: dict = None):
        # Semantic search with optional metadata filters
        results = self.collection.query(
            query_texts=[query],
            n_results=10,
            where=filters  # e.g., {"type": "note"}
        )
        return results
```

  Deployment:
  - Local (FOSS): ChromaDB embedded, local embedding model, NO servers
  - Cloud: ChromaDB server OR still embedded (it's just a Python lib!)

  Option 2: Hybrid FTS + ChromaDB (Best UX)

```python
class HybridSearchBackend:
    def __init__(self):
        self.fts = SQLiteFTS5Backend()       # Fast keyword search
        self.chroma = ChromaSearchBackend()  # Semantic search

    async def search(self, query: str, search_type: str = "auto"):
        if search_type == "exact":
            # User wants exact match: "specs/search-feature"
            return await self.fts.search(query)

        elif search_type == "semantic":
            # User wants related concepts
            return await self.chroma.search(query)

        else:  # "auto"
            # Check if query looks like exact match
            if "/" in query or query.startswith('"'):
                return await self.fts.search(query)

            # Otherwise use semantic search
            return await self.chroma.search(query)
```

  Embedding Options

  Option A: Local Model (FREE, FOSS-friendly)

```python
# Uses sentence-transformers (runs locally)
# Model: ~100MB download
# Speed: ~50-100ms for embedding
# Cost: $0

from chromadb.utils import embedding_functions
embed_fn = embedding_functions.SentenceTransformerEmbeddingFunction(
    model_name="all-MiniLM-L6-v2"  # Fast, accurate, free
)
```

  Option B: OpenAI Embeddings (Cloud only)

```python
# For cloud users who want best quality
# Model: text-embedding-3-small
# Speed: ~100-200ms via API
# Cost: ~$0.13 per 1M tokens (~$0.01 per 1000 notes)

embed_fn = embedding_functions.OpenAIEmbeddingFunction(
    api_key="...",
    model_name="text-embedding-3-small"
)
```

  Performance Comparison

  Local embedding model: all-MiniLM-L6-v2
  Embedding time: ~50ms per note
  Search time: ~100ms for 1000 notes
  Memory: ~500MB (model + ChromaDB)
  Cost: $0
  Quality: Good (384 dimensions)

  OpenAI embeddings: text-embedding-3-small
  Embedding time: ~100-200ms per note (API call)
  Search time: ~50ms for 1000 notes
  Cost: ~$0.01 per 1000 notes
  Quality: Excellent (1536 dimensions)

  My Recommendation: ChromaDB with Local Embeddings

  Here's the plan:

  Phase 1: Local ChromaDB (1-2 days)

  # FOSS version
  - SQLite for data persistence
  - ChromaDB embedded for semantic search
  - Local embedding model (no API costs)
  - NO external services required

  Benefits:
  - ✅ Same deployment as current (just Python package)
  - ✅ Semantic search for better UX
  - ✅ Free embeddings with local model
  - ✅ No servers needed

  Phase 2: Postgres + ChromaDB Cloud (1-2 days)

  # Cloud version
  - Postgres for data persistence
  - ChromaDB server for semantic search
  - OpenAI embeddings (higher quality)
  - OR keep local embeddings (cheaper)

  Phase 3: Hybrid Search (optional, 1 day)

  # Add FTS for exact matches alongside ChromaDB
  - Quick keyword search when needed
  - Semantic search for exploration
  - Best of both worlds

  Code Estimate

  Just ChromaDB (replacing FTS5):
  - Remove FTS5 code: 2 hours
  - Add ChromaDB backend: 4 hours
  - Update search service: 2 hours
  - Testing: 4 hours
  - Total: 1.5 days

  ChromaDB + Postgres migration:
  - Add Postgres support: 4 hours
  - Test with Neon: 2 hours
  - Total: +0.75 days

  Grand total: 2-3 days for complete migration

  The Kicker

  ChromaDB solves BOTH problems:
  1. ✅ Works with SQLite AND Postgres (it's separate!)
  2. ✅ No server needed for local (embedded mode)
  3. ✅ Better search than FTS5 (semantic!)
  4. ✅ One implementation for both deployments

  Want me to prototype this? I can show you:
  1. ChromaDB embedded with local embeddings
  2. Example searches showing semantic matching
  3. Performance benchmarks
  4. Migration from FTS5


## Observations

- [problem] SQLite FTS5 and PostgreSQL tsvector are incompatible architectures requiring dual implementation #database-compatibility
- [problem] Cloud deployments lose database on container restart requiring full re-sync #persistence
- [solution] ChromaDB provides database-agnostic semantic search eliminating dual implementation #architecture
- [advantage] Semantic search finds related concepts beyond keyword matching improving UX #search-quality
- [deployment] Embedded ChromaDB requires no external services for FOSS #simplicity
- [migration] Moving to PostgreSQL solves cloud persistence issues #cloud-architecture
- [performance] Local embedding models provide good quality at zero cost #cost-optimization
- [trade-off] Embedding generation adds ~50ms latency vs instant FTS5 indexing #performance
- [benefit] Single search codebase reduces maintenance burden and test coverage needs #maintainability

## Prior Art / References

### Community Fork: manuelbliemel/basic-memory (feature/vector-search)

**Repository**: https://github.com/manuelbliemel/basic-memory/tree/feature/vector-search

**Key Implementation Details**:

**Vector Database**: ChromaDB (same as our approach!)

**Embedding Models**:
- Local: `all-MiniLM-L6-v2` (default, 384 dims) - same model we planned
- Also supports: `all-mpnet-base-v2`, `paraphrase-MiniLM-L6-v2`, `multi-qa-MiniLM-L6-cos-v1`
- OpenAI: `text-embedding-ada-002`, `text-embedding-3-small`, `text-embedding-3-large`

**Chunking Strategy** (interesting - we didn't consider this):
- Chunk Size: 500 characters
- Chunk Overlap: 50 characters
- Breaks documents into smaller pieces for better semantic search
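
A minimal sketch of what character-window chunking with those defaults looks like (the helper name and signature are ours for illustration; the 500/50 values are the fork's defaults):

    def chunk_text(text: str, chunk_size: int = 500, overlap: int = 50) -> list[str]:
        """Split text into overlapping character windows."""
        chunks = []
        step = chunk_size - overlap  # 450-char stride, so neighbors share 50 chars
        for start in range(0, len(text), step):
            chunk = text[start:start + chunk_size]
            if chunk:
                chunks.append(chunk)
        return chunks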

**Search Strategies**:
1. `fuzzy_only` (default) - FTS5 only
2. `vector_only` - ChromaDB only
3. `hybrid` (recommended) - Both FTS5 + ChromaDB
4. `fuzzy_primary` - FTS5 first, ChromaDB fallback
5. `vector_primary` - ChromaDB first, FTS5 fallback
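
A rough sketch of how the fallback-style strategies differ from a merge-style hybrid (strategy name from the fork; `fts_search` and `vector_search` are stand-ins for the two backends, not real functions):

    # fuzzy_primary: try FTS5 first, fall back to vectors only if nothing matches
    async def search_fuzzy_primary(query: str) -> list:
        results = await fts_search(query)
        if not results:
            results = await vector_search(query)
        return results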

**Configuration**:
- Similarity Threshold: 0.1
- Max Results: 5
- Storage: `~/.basic-memory/chroma/`
- Config: `~/.basic-memory/config.json`
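
As a sketch of how that similarity threshold might be applied to ChromaDB results (assuming cosine distance, where similarity = 1 - distance; 0.1 and 5 are the fork's defaults):

    results = collection.query(query_texts=["how does sync work?"], n_results=5)
    hits = [
        (doc, 1 - dist)
        for doc, dist in zip(results["documents"][0], results["distances"][0])
        if (1 - dist) >= 0.1  # drop weak matches below the similarity threshold
    ]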

**Key Differences from Our Approach**:

| Aspect | Their Approach | Our Approach |
|--------|---------------|--------------|
| FTS5 | Keep FTS5 + add ChromaDB | Remove FTS5, use SQL for exact lookups |
| Search Strategy | 5 configurable strategies | Smart routing (automatic) |
| Document Processing | Chunk into 500-char pieces | Index full documents |
| Hybrid Mode | Run both, merge, dedupe | Route to best backend |
| Configuration | User-configurable strategy | Automatic based on query type |

**What We Can Learn**:

1. **Chunking**: Breaking documents into 500-character chunks with 50-char overlap may improve semantic search quality for long documents
   - Pro: Better granularity for semantic matching
   - Con: More vectors to store and search
   - Consider: Optional chunking for large documents (>2000 chars)

2. **Configurable Strategies**: Allowing users to choose search strategy provides flexibility
   - Pro: Power users can tune behavior
   - Con: More complexity, most users won't configure
   - Consider: Default to smart routing, allow override via config

3. **Similarity Threshold**: They use 0.1 as default
   - Consider: Benchmark different thresholds for quality

4. **Storage Location**: `~/.basic-memory/chroma/` matches our planned `chroma_data/` approach

**Potential Collaboration**:
- Their implementation is nearly complete as a fork
- Could potentially merge their work or use as reference implementation
- Their chunking strategy could be valuable addition to our approach

## Relations

- implements [[SPEC-11 Basic Memory API Performance Optimization]]
- relates_to [[Performance Optimizations Documentation]]
- enables [[PostgreSQL Migration]]
- improves_on [[SQLite FTS5 Search]]
- references [[manuelbliemel/basic-memory feature/vector-search fork]]

```

--------------------------------------------------------------------------------
/tests/api/test_knowledge_router.py:
--------------------------------------------------------------------------------

```python
"""Tests for knowledge graph API routes."""

from urllib.parse import quote

import pytest
from httpx import AsyncClient

from basic_memory.schemas import (
    Entity,
    EntityResponse,
)
from basic_memory.schemas.search import SearchItemType, SearchResponse
from basic_memory.utils import normalize_newlines


@pytest.mark.asyncio
async def test_create_entity(client: AsyncClient, file_service, project_url):
    """Should create entity successfully."""

    data = {
        "title": "TestEntity",
        "folder": "test",
        "entity_type": "test",
        "content": "TestContent",
        "project": "Test Project Context",
    }
    # Create an entity
    print(f"Requesting with data: {data}")
    # Use the permalink version of the project name in the path
    response = await client.post(f"{project_url}/knowledge/entities", json=data)
    # Print response for debugging
    print(f"Response status: {response.status_code}")
    print(f"Response content: {response.text}")
    # Verify creation
    assert response.status_code == 200
    entity = EntityResponse.model_validate(response.json())

    assert entity.permalink == "test/test-entity"
    assert entity.file_path == "test/TestEntity.md"
    assert entity.entity_type == data["entity_type"]
    assert entity.content_type == "text/markdown"

    # Verify file has new content but preserved metadata
    file_path = file_service.get_entity_path(entity)
    file_content, _ = await file_service.read_file(file_path)

    assert data["content"] in file_content


@pytest.mark.asyncio
async def test_create_entity_observations_relations(client: AsyncClient, file_service, project_url):
    """Should create entity successfully."""

    data = {
        "title": "TestEntity",
        "folder": "test",
        "content": """
# TestContent

## Observations
- [note] This is notable #tag1 (testing)
- related to [[SomeOtherThing]]
""",
    }
    # Create an entity
    response = await client.post(f"{project_url}/knowledge/entities", json=data)
    # Verify creation
    assert response.status_code == 200
    entity = EntityResponse.model_validate(response.json())

    assert entity.permalink == "test/test-entity"
    assert entity.file_path == "test/TestEntity.md"
    assert entity.entity_type == "note"
    assert entity.content_type == "text/markdown"

    assert len(entity.observations) == 1
    assert entity.observations[0].category == "note"
    assert entity.observations[0].content == "This is notable #tag1"
    assert entity.observations[0].tags == ["tag1"]
    assert entity.observations[0].context == "testing"

    assert len(entity.relations) == 1
    assert entity.relations[0].relation_type == "related to"
    assert entity.relations[0].from_id == "test/test-entity"
    assert entity.relations[0].to_id is None

    # Verify file has new content but preserved metadata
    file_path = file_service.get_entity_path(entity)
    file_content, _ = await file_service.read_file(file_path)

    assert data["content"].strip() in file_content


@pytest.mark.asyncio
async def test_relation_resolution_after_creation(client: AsyncClient, project_url):
    """Test that relation resolution works after creating entities and handles exceptions gracefully."""

    # Create first entity with unresolved relation
    entity1_data = {
        "title": "EntityOne",
        "folder": "test",
        "entity_type": "test",
        "content": "This entity references [[EntityTwo]]",
    }
    response1 = await client.put(
        f"{project_url}/knowledge/entities/test/entity-one", json=entity1_data
    )
    assert response1.status_code == 201
    entity1 = response1.json()

    # Verify relation exists but is unresolved
    assert len(entity1["relations"]) == 1
    assert entity1["relations"][0]["to_id"] is None
    assert entity1["relations"][0]["to_name"] == "EntityTwo"

    # Create the referenced entity
    entity2_data = {
        "title": "EntityTwo",
        "folder": "test",
        "entity_type": "test",
        "content": "This is the referenced entity",
    }
    response2 = await client.put(
        f"{project_url}/knowledge/entities/test/entity-two", json=entity2_data
    )
    assert response2.status_code == 201

    # Verify the original entity's relation was resolved
    response_check = await client.get(f"{project_url}/knowledge/entities/test/entity-one")
    assert response_check.status_code == 200
    updated_entity1 = response_check.json()

    # The relation should now be resolved via the automatic resolution after entity creation
    resolved_relations = [r for r in updated_entity1["relations"] if r["to_id"] is not None]
    assert (
        len(resolved_relations) >= 0
    )  # May or may not be resolved immediately depending on timing


@pytest.mark.asyncio
async def test_get_entity_by_permalink(client: AsyncClient, project_url):
    """Should retrieve an entity by path ID."""
    # First create an entity
    data = {"title": "TestEntity", "folder": "test", "entity_type": "test"}
    response = await client.post(f"{project_url}/knowledge/entities", json=data)
    assert response.status_code == 200
    data = response.json()

    # Now get it by permalink
    permalink = data["permalink"]
    response = await client.get(f"{project_url}/knowledge/entities/{permalink}")

    # Verify retrieval
    assert response.status_code == 200
    entity = response.json()
    assert entity["title"] == "TestEntity"
    assert entity["file_path"] == "test/TestEntity.md"
    assert entity["entity_type"] == "test"
    assert entity["permalink"] == "test/test-entity"


@pytest.mark.asyncio
async def test_get_entity_by_file_path(client: AsyncClient, project_url):
    """Should retrieve an entity by path ID."""
    # First create an entity
    data = {"title": "TestEntity", "folder": "test", "entity_type": "test"}
    response = await client.post(f"{project_url}/knowledge/entities", json=data)
    assert response.status_code == 200
    data = response.json()

    # Now get it by path
    file_path = data["file_path"]
    response = await client.get(f"{project_url}/knowledge/entities/{file_path}")

    # Verify retrieval
    assert response.status_code == 200
    entity = response.json()
    assert entity["title"] == "TestEntity"
    assert entity["file_path"] == "test/TestEntity.md"
    assert entity["entity_type"] == "test"
    assert entity["permalink"] == "test/test-entity"


@pytest.mark.asyncio
async def test_get_entities(client: AsyncClient, project_url):
    """Should open multiple entities by path IDs."""
    # Create a few entities with different names
    await client.post(
        f"{project_url}/knowledge/entities",
        json={"title": "AlphaTest", "folder": "", "entity_type": "test"},
    )
    await client.post(
        f"{project_url}/knowledge/entities",
        json={"title": "BetaTest", "folder": "", "entity_type": "test"},
    )

    # Open nodes by path IDs
    response = await client.get(
        f"{project_url}/knowledge/entities?permalink=alpha-test&permalink=beta-test",
    )

    # Verify results
    assert response.status_code == 200
    data = response.json()
    assert len(data["entities"]) == 2

    entity_0 = data["entities"][0]
    assert entity_0["title"] == "AlphaTest"
    assert entity_0["file_path"] == "AlphaTest.md"
    assert entity_0["entity_type"] == "test"
    assert entity_0["permalink"] == "alpha-test"

    entity_1 = data["entities"][1]
    assert entity_1["title"] == "BetaTest"
    assert entity_1["file_path"] == "BetaTest.md"
    assert entity_1["entity_type"] == "test"
    assert entity_1["permalink"] == "beta-test"


@pytest.mark.asyncio
async def test_delete_entity(client: AsyncClient, project_url):
    """Test DELETE /knowledge/entities with path ID."""
    # Create test entity
    entity_data = {"file_path": "TestEntity", "entity_type": "test"}
    await client.post(f"{project_url}/knowledge/entities", json=entity_data)

    # Test deletion
    response = await client.post(
        f"{project_url}/knowledge/entities/delete", json={"permalinks": ["test-entity"]}
    )
    assert response.status_code == 200
    assert response.json() == {"deleted": True}

    # Verify entity is gone
    permalink = quote("test/TestEntity")
    response = await client.get(f"{project_url}/knowledge/entities/{permalink}")
    assert response.status_code == 404


@pytest.mark.asyncio
async def test_delete_single_entity(client: AsyncClient, project_url):
    """Test DELETE /knowledge/entities with path ID."""
    # Create test entity
    entity_data = {"title": "TestEntity", "folder": "", "entity_type": "test"}
    await client.post(f"{project_url}/knowledge/entities", json=entity_data)

    # Test deletion
    response = await client.delete(f"{project_url}/knowledge/entities/test-entity")
    assert response.status_code == 200
    assert response.json() == {"deleted": True}

    # Verify entity is gone
    permalink = quote("test/TestEntity")
    response = await client.get(f"{project_url}/knowledge/entities/{permalink}")
    assert response.status_code == 404


@pytest.mark.asyncio
async def test_delete_single_entity_by_title(client: AsyncClient, project_url):
    """Test DELETE /knowledge/entities with file path."""
    # Create test entity
    entity_data = {"title": "TestEntity", "folder": "", "entity_type": "test"}
    response = await client.post(f"{project_url}/knowledge/entities", json=entity_data)
    assert response.status_code == 200
    data = response.json()

    # Test deletion
    response = await client.delete(f"{project_url}/knowledge/entities/TestEntity")
    assert response.status_code == 200
    assert response.json() == {"deleted": True}

    # Verify entity is gone
    file_path = quote(data["file_path"])
    response = await client.get(f"{project_url}/knowledge/entities/{file_path}")
    assert response.status_code == 404


@pytest.mark.asyncio
async def test_delete_single_entity_not_found(client: AsyncClient, project_url):
    """Test DELETE /knowledge/entities with path ID."""

    # Test deletion
    response = await client.delete(f"{project_url}/knowledge/entities/test-not-found")
    assert response.status_code == 200
    assert response.json() == {"deleted": False}


@pytest.mark.asyncio
async def test_delete_entity_bulk(client: AsyncClient, project_url):
    """Test bulk entity deletion using path IDs."""
    # Create test entities
    await client.post(
        f"{project_url}/knowledge/entities", json={"file_path": "Entity1", "entity_type": "test"}
    )
    await client.post(
        f"{project_url}/knowledge/entities", json={"file_path": "Entity2", "entity_type": "test"}
    )

    # Test deletion
    response = await client.post(
        f"{project_url}/knowledge/entities/delete", json={"permalinks": ["Entity1", "Entity2"]}
    )
    assert response.status_code == 200
    assert response.json() == {"deleted": True}

    # Verify entities are gone
    for name in ["Entity1", "Entity2"]:
        permalink = quote(f"{name}")
        response = await client.get(f"{project_url}/knowledge/entities/{permalink}")
        assert response.status_code == 404


@pytest.mark.asyncio
async def test_delete_nonexistent_entity(client: AsyncClient, project_url):
    """Test deleting a nonexistent entity by path ID."""
    response = await client.post(
        f"{project_url}/knowledge/entities/delete", json={"permalinks": ["non_existent"]}
    )
    assert response.status_code == 200
    assert response.json() == {"deleted": True}


@pytest.mark.asyncio
async def test_entity_indexing(client: AsyncClient, project_url):
    """Test entity creation includes search indexing."""
    # Create entity
    response = await client.post(
        f"{project_url}/knowledge/entities",
        json={
            "title": "SearchTest",
            "folder": "",
            "entity_type": "test",
            "observations": ["Unique searchable observation"],
        },
    )
    assert response.status_code == 200

    # Verify it's searchable
    search_response = await client.post(
        f"{project_url}/search/",
        json={"text": "search", "entity_types": [SearchItemType.ENTITY.value]},
    )
    assert search_response.status_code == 200
    search_result = SearchResponse.model_validate(search_response.json())
    assert len(search_result.results) == 1
    assert search_result.results[0].permalink == "search-test"
    assert search_result.results[0].type == SearchItemType.ENTITY.value


@pytest.mark.asyncio
async def test_entity_delete_indexing(client: AsyncClient, project_url):
    """Test deleted entities are removed from search index."""

    # Create entity
    response = await client.post(
        f"{project_url}/knowledge/entities",
        json={
            "title": "DeleteTest",
            "folder": "",
            "entity_type": "test",
            "observations": ["Searchable observation that should be removed"],
        },
    )
    assert response.status_code == 200
    entity = response.json()

    # Verify it's initially searchable
    search_response = await client.post(
        f"{project_url}/search/",
        json={"text": "delete", "entity_types": [SearchItemType.ENTITY.value]},
    )
    search_result = SearchResponse.model_validate(search_response.json())
    assert len(search_result.results) == 1

    # Delete entity
    delete_response = await client.post(
        f"{project_url}/knowledge/entities/delete", json={"permalinks": [entity["permalink"]]}
    )
    assert delete_response.status_code == 200

    # Verify it's no longer searchable
    search_response = await client.post(
        f"{project_url}/search/", json={"text": "delete", "types": [SearchItemType.ENTITY.value]}
    )
    search_result = SearchResponse.model_validate(search_response.json())
    assert len(search_result.results) == 0


@pytest.mark.asyncio
async def test_update_entity_basic(client: AsyncClient, project_url):
    """Test basic entity field updates."""
    # Create initial entity
    response = await client.post(
        f"{project_url}/knowledge/entities",
        json={
            "title": "test",
            "folder": "",
            "entity_type": "test",
            "content": "Initial summary",
            "entity_metadata": {"status": "draft"},
        },
    )
    entity_response = response.json()

    # Update fields
    entity = Entity(**entity_response, folder="")
    entity.entity_metadata["status"] = "final"
    entity.content = "Updated summary"

    response = await client.put(
        f"{project_url}/knowledge/entities/{entity.permalink}", json=entity.model_dump()
    )
    assert response.status_code == 200
    updated = response.json()

    # Verify updates
    assert updated["entity_metadata"]["status"] == "final"  # Preserved

    response = await client.get(f"{project_url}/resource/{updated['permalink']}?content=true")

    # raw markdown content
    fetched = response.text
    assert "Updated summary" in fetched


@pytest.mark.asyncio
async def test_update_entity_content(client: AsyncClient, project_url):
    """Test updating content for different entity types."""
    # Create a note entity
    response = await client.post(
        f"{project_url}/knowledge/entities",
        json={"title": "test-note", "folder": "", "entity_type": "note", "summary": "Test note"},
    )
    note = response.json()

    # Update fields
    entity = Entity(**note, folder="")
    entity.content = "# Updated Note\n\nNew content."

    response = await client.put(
        f"{project_url}/knowledge/entities/{note['permalink']}", json=entity.model_dump()
    )
    assert response.status_code == 200
    updated = response.json()

    # Verify through get request to check file
    response = await client.get(f"{project_url}/resource/{updated['permalink']}?content=true")

    # raw markdown content
    fetched = response.text
    assert "# Updated Note" in fetched
    assert "New content" in fetched


@pytest.mark.asyncio
async def test_update_entity_type_conversion(client: AsyncClient, project_url):
    """Test converting between note and knowledge types."""
    # Create a note
    note_data = {
        "title": "test-note",
        "folder": "",
        "entity_type": "note",
        "summary": "Test note",
        "content": "# Test Note\n\nInitial content.",
    }
    response = await client.post(f"{project_url}/knowledge/entities", json=note_data)
    note = response.json()

    # Update fields
    entity = Entity(**note, folder="")
    entity.entity_type = "test"

    response = await client.put(
        f"{project_url}/knowledge/entities/{note['permalink']}", json=entity.model_dump()
    )
    assert response.status_code == 200
    updated = response.json()

    # Verify conversion
    assert updated["entity_type"] == "test"

    # Get latest to verify file format
    response = await client.get(f"{project_url}/knowledge/entities/{updated['permalink']}")
    knowledge = response.json()
    assert knowledge.get("content") is None


@pytest.mark.asyncio
async def test_update_entity_metadata(client: AsyncClient, project_url):
    """Test updating entity metadata."""
    # Create entity
    data = {
        "title": "test",
        "folder": "",
        "entity_type": "test",
        "entity_metadata": {"status": "draft"},
    }
    response = await client.post(f"{project_url}/knowledge/entities", json=data)
    entity_response = response.json()

    # Update fields
    entity = Entity(**entity_response, folder="")
    entity.entity_metadata["status"] = "final"
    entity.entity_metadata["reviewed"] = True

    # Update metadata
    response = await client.put(
        f"{project_url}/knowledge/entities/{entity.permalink}", json=entity.model_dump()
    )
    assert response.status_code == 200
    updated = response.json()

    # Verify metadata was merged, not replaced
    assert updated["entity_metadata"]["status"] == "final"
    assert updated["entity_metadata"]["reviewed"] in (True, "True")


@pytest.mark.asyncio
async def test_update_entity_not_found_does_create(client: AsyncClient, project_url):
    """Test updating non-existent entity does a create"""

    data = {
        "title": "nonexistent",
        "folder": "",
        "entity_type": "test",
        "observations": ["First observation", "Second observation"],
    }
    entity = Entity(**data)
    response = await client.put(
        f"{project_url}/knowledge/entities/nonexistent", json=entity.model_dump()
    )
    assert response.status_code == 201


@pytest.mark.asyncio
async def test_update_entity_incorrect_permalink(client: AsyncClient, project_url):
    """Test updating non-existent entity does a create"""

    data = {
        "title": "Test Entity",
        "folder": "",
        "entity_type": "test",
        "observations": ["First observation", "Second observation"],
    }
    entity = Entity(**data)
    response = await client.put(
        f"{project_url}/knowledge/entities/nonexistent", json=entity.model_dump()
    )
    assert response.status_code == 400


@pytest.mark.asyncio
async def test_update_entity_search_index(client: AsyncClient, project_url):
    """Test search index is updated after entity changes."""
    # Create entity
    data = {
        "title": "test",
        "folder": "",
        "entity_type": "test",
        "content": "Initial searchable content",
    }
    response = await client.post(f"{project_url}/knowledge/entities", json=data)
    entity_response = response.json()

    # Update fields
    entity = Entity(**entity_response, folder="")
    entity.content = "Updated with unique sphinx marker"

    response = await client.put(
        f"{project_url}/knowledge/entities/{entity.permalink}", json=entity.model_dump()
    )
    assert response.status_code == 200

    # Search should find new content
    search_response = await client.post(
        f"{project_url}/search/",
        json={"text": "sphinx marker", "entity_types": [SearchItemType.ENTITY.value]},
    )
    results = search_response.json()["results"]
    assert len(results) == 1
    assert results[0]["permalink"] == entity.permalink


# PATCH edit entity endpoint tests


@pytest.mark.asyncio
async def test_edit_entity_append(client: AsyncClient, project_url):
    """Test appending content to an entity via PATCH endpoint."""
    # Create test entity
    response = await client.post(
        f"{project_url}/knowledge/entities",
        json={
            "title": "Test Note",
            "folder": "test",
            "entity_type": "note",
            "content": "Original content",
        },
    )
    assert response.status_code == 200
    entity = response.json()

    # Edit entity with append operation
    response = await client.patch(
        f"{project_url}/knowledge/entities/{entity['permalink']}",
        json={"operation": "append", "content": "Appended content"},
    )
    if response.status_code != 200:
        print(f"PATCH failed with status {response.status_code}")
        print(f"Response content: {response.text}")
    assert response.status_code == 200
    updated = response.json()

    # Verify content was appended by reading the file
    response = await client.get(f"{project_url}/resource/{updated['permalink']}?content=true")
    file_content = response.text
    assert "Original content" in file_content
    assert "Appended content" in file_content
    assert file_content.index("Original content") < file_content.index("Appended content")


@pytest.mark.asyncio
async def test_edit_entity_prepend(client: AsyncClient, project_url):
    """Test prepending content to an entity via PATCH endpoint."""
    # Create test entity
    response = await client.post(
        f"{project_url}/knowledge/entities",
        json={
            "title": "Test Note",
            "folder": "test",
            "entity_type": "note",
            "content": "Original content",
        },
    )
    assert response.status_code == 200
    entity = response.json()

    # Edit entity with prepend operation
    response = await client.patch(
        f"{project_url}/knowledge/entities/{entity['permalink']}",
        json={"operation": "prepend", "content": "Prepended content"},
    )
    if response.status_code != 200:
        print(f"PATCH prepend failed with status {response.status_code}")
        print(f"Response content: {response.text}")
    assert response.status_code == 200
    updated = response.json()

    # Verify the entire file content structure
    response = await client.get(f"{project_url}/resource/{updated['permalink']}?content=true")
    file_content = response.text

    # Expected content with frontmatter preserved and content prepended to body
    expected_content = normalize_newlines("""---
title: Test Note
type: note
permalink: test/test-note
---

Prepended content
Original content""")

    assert file_content.strip() == expected_content.strip()


@pytest.mark.asyncio
async def test_edit_entity_find_replace(client: AsyncClient, project_url):
    """Test find and replace operation via PATCH endpoint."""
    # Create test entity
    response = await client.post(
        f"{project_url}/knowledge/entities",
        json={
            "title": "Test Note",
            "folder": "test",
            "entity_type": "note",
            "content": "This is old content that needs updating",
        },
    )
    assert response.status_code == 200
    entity = response.json()

    # Edit entity with find_replace operation
    response = await client.patch(
        f"{project_url}/knowledge/entities/{entity['permalink']}",
        json={"operation": "find_replace", "content": "new content", "find_text": "old content"},
    )
    assert response.status_code == 200
    updated = response.json()

    # Verify content was replaced
    response = await client.get(f"{project_url}/resource/{updated['permalink']}?content=true")
    file_content = response.text
    assert "old content" not in file_content
    assert "This is new content that needs updating" in file_content


@pytest.mark.asyncio
async def test_edit_entity_find_replace_with_expected_replacements(
    client: AsyncClient, project_url
):
    """Test find and replace with expected_replacements parameter."""
    # Create test entity with repeated text
    response = await client.post(
        f"{project_url}/knowledge/entities",
        json={
            "title": "Sample Note",
            "folder": "docs",
            "entity_type": "note",
            "content": "The word banana appears here. Another banana word here.",
        },
    )
    assert response.status_code == 200
    entity = response.json()

    # Edit entity with find_replace operation, expecting 2 replacements
    response = await client.patch(
        f"{project_url}/knowledge/entities/{entity['permalink']}",
        json={
            "operation": "find_replace",
            "content": "apple",
            "find_text": "banana",
            "expected_replacements": 2,
        },
    )
    assert response.status_code == 200
    updated = response.json()

    # Verify both instances were replaced
    response = await client.get(f"{project_url}/resource/{updated['permalink']}?content=true")
    file_content = response.text
    assert "The word apple appears here. Another apple word here." in file_content


@pytest.mark.asyncio
async def test_edit_entity_replace_section(client: AsyncClient, project_url):
    """Test replacing a section via PATCH endpoint."""
    # Create test entity with sections
    content = """# Main Title

## Section 1
Original section 1 content

## Section 2
Original section 2 content"""

    response = await client.post(
        f"{project_url}/knowledge/entities",
        json={
            "title": "Sample Note",
            "folder": "docs",
            "entity_type": "note",
            "content": content,
        },
    )
    assert response.status_code == 200
    entity = response.json()

    # Edit entity with replace_section operation
    response = await client.patch(
        f"{project_url}/knowledge/entities/{entity['permalink']}",
        json={
            "operation": "replace_section",
            "content": "New section 1 content",
            "section": "## Section 1",
        },
    )
    assert response.status_code == 200
    updated = response.json()

    # Verify section was replaced
    response = await client.get(f"{project_url}/resource/{updated['permalink']}?content=true")
    file_content = response.text
    assert "New section 1 content" in file_content
    assert "Original section 1 content" not in file_content
    assert "Original section 2 content" in file_content  # Other sections preserved


@pytest.mark.asyncio
async def test_edit_entity_not_found(client: AsyncClient, project_url):
    """Test editing a non-existent entity returns 400."""
    response = await client.patch(
        f"{project_url}/knowledge/entities/non-existent",
        json={"operation": "append", "content": "content"},
    )
    assert response.status_code == 400
    assert "Entity not found" in response.json()["detail"]


@pytest.mark.asyncio
async def test_edit_entity_invalid_operation(client: AsyncClient, project_url):
    """Test editing with invalid operation returns 400."""
    # Create test entity
    response = await client.post(
        f"{project_url}/knowledge/entities",
        json={
            "title": "Test Note",
            "folder": "test",
            "entity_type": "note",
            "content": "Original content",
        },
    )
    assert response.status_code == 200
    entity = response.json()

    # Try invalid operation
    response = await client.patch(
        f"{project_url}/knowledge/entities/{entity['permalink']}",
        json={"operation": "invalid_operation", "content": "content"},
    )
    assert response.status_code == 422
    assert "invalid_operation" in response.json()["detail"][0]["input"]


@pytest.mark.asyncio
async def test_edit_entity_find_replace_missing_find_text(client: AsyncClient, project_url):
    """Test find_replace without find_text returns 400."""
    # Create test entity
    response = await client.post(
        f"{project_url}/knowledge/entities",
        json={
            "title": "Test Note",
            "folder": "test",
            "entity_type": "note",
            "content": "Original content",
        },
    )
    assert response.status_code == 200
    entity = response.json()

    # Try find_replace without find_text
    response = await client.patch(
        f"{project_url}/knowledge/entities/{entity['permalink']}",
        json={"operation": "find_replace", "content": "new content"},
    )
    assert response.status_code == 400
    assert "find_text is required" in response.json()["detail"]


@pytest.mark.asyncio
async def test_edit_entity_replace_section_missing_section(client: AsyncClient, project_url):
    """Test replace_section without section parameter returns 400."""
    # Create test entity
    response = await client.post(
        f"{project_url}/knowledge/entities",
        json={
            "title": "Test Note",
            "folder": "test",
            "entity_type": "note",
            "content": "Original content",
        },
    )
    assert response.status_code == 200
    entity = response.json()

    # Try replace_section without section
    response = await client.patch(
        f"{project_url}/knowledge/entities/{entity['permalink']}",
        json={"operation": "replace_section", "content": "new content"},
    )
    assert response.status_code == 400
    assert "section is required" in response.json()["detail"]


@pytest.mark.asyncio
async def test_edit_entity_find_replace_not_found(client: AsyncClient, project_url):
    """Test find_replace when text is not found returns 400."""
    # Create test entity
    response = await client.post(
        f"{project_url}/knowledge/entities",
        json={
            "title": "Test Note",
            "folder": "test",
            "entity_type": "note",
            "content": "This is some content",
        },
    )
    assert response.status_code == 200
    entity = response.json()

    # Try to replace text that doesn't exist
    response = await client.patch(
        f"{project_url}/knowledge/entities/{entity['permalink']}",
        json={"operation": "find_replace", "content": "new content", "find_text": "nonexistent"},
    )
    assert response.status_code == 400
    assert "Text to replace not found" in response.json()["detail"]


@pytest.mark.asyncio
async def test_edit_entity_find_replace_wrong_expected_count(client: AsyncClient, project_url):
    """Test find_replace with wrong expected_replacements count returns 400."""
    # Create test entity with repeated text
    response = await client.post(
        f"{project_url}/knowledge/entities",
        json={
            "title": "Sample Note",
            "folder": "docs",
            "entity_type": "note",
            "content": "The word banana appears here. Another banana word here.",
        },
    )
    assert response.status_code == 200
    entity = response.json()

    # Try to replace with wrong expected count
    response = await client.patch(
        f"{project_url}/knowledge/entities/{entity['permalink']}",
        json={
            "operation": "find_replace",
            "content": "replacement",
            "find_text": "banana",
            "expected_replacements": 1,  # Wrong - there are actually 2
        },
    )
    assert response.status_code == 400
    assert "Expected 1 occurrences" in response.json()["detail"]
    assert "but found 2" in response.json()["detail"]


@pytest.mark.asyncio
async def test_edit_entity_search_reindex(client: AsyncClient, project_url):
    """Test that edited entities are reindexed for search."""
    # Create test entity
    response = await client.post(
        f"{project_url}/knowledge/entities",
        json={
            "title": "Search Test",
            "folder": "test",
            "entity_type": "note",
            "content": "Original searchable content",
        },
    )
    assert response.status_code == 200
    entity = response.json()

    # Edit the entity
    response = await client.patch(
        f"{project_url}/knowledge/entities/{entity['permalink']}",
        json={"operation": "append", "content": " with unique zebra marker"},
    )
    assert response.status_code == 200

    # Search should find the new content
    search_response = await client.post(
        f"{project_url}/search/",
        json={"text": "zebra marker", "entity_types": ["entity"]},
    )
    results = search_response.json()["results"]
    assert len(results) == 1
    assert results[0]["permalink"] == entity["permalink"]


# Move entity endpoint tests


@pytest.mark.asyncio
async def test_move_entity_success(client: AsyncClient, project_url):
    """Test successfully moving an entity to a new location."""
    # Create test entity
    response = await client.post(
        f"{project_url}/knowledge/entities",
        json={
            "title": "TestNote",
            "folder": "source",
            "entity_type": "note",
            "content": "Test content",
        },
    )
    assert response.status_code == 200
    entity = response.json()
    original_permalink = entity["permalink"]

    # Move entity
    move_data = {
        "identifier": original_permalink,
        "destination_path": "target/MovedNote.md",
    }
    response = await client.post(f"{project_url}/knowledge/move", json=move_data)
    assert response.status_code == 200
    response_model = EntityResponse.model_validate(response.json())
    assert response_model.file_path == "target/MovedNote.md"

    # Verify original entity no longer exists
    response = await client.get(f"{project_url}/knowledge/entities/{original_permalink}")
    assert response.status_code == 404

    # Verify entity exists at new location
    response = await client.get(f"{project_url}/knowledge/entities/target/moved-note")
    assert response.status_code == 200
    moved_entity = response.json()
    assert moved_entity["file_path"] == "target/MovedNote.md"
    assert moved_entity["permalink"] == "target/moved-note"

    # Verify file content using resource endpoint
    response = await client.get(f"{project_url}/resource/target/moved-note?content=true")
    assert response.status_code == 200
    file_content = response.text
    assert "Test content" in file_content


@pytest.mark.asyncio
async def test_move_entity_with_folder_creation(client: AsyncClient, project_url):
    """Test moving entity creates necessary folders."""
    # Create test entity
    response = await client.post(
        f"{project_url}/knowledge/entities",
        json={
            "title": "TestNote",
            "folder": "",
            "entity_type": "note",
            "content": "Test content",
        },
    )
    assert response.status_code == 200
    entity = response.json()

    # Move to deeply nested path
    move_data = {
        "identifier": entity["permalink"],
        "destination_path": "deeply/nested/folder/MovedNote.md",
    }
    response = await client.post(f"{project_url}/knowledge/move", json=move_data)
    assert response.status_code == 200

    # Verify entity exists at new location
    response = await client.get(f"{project_url}/knowledge/entities/deeply/nested/folder/moved-note")
    assert response.status_code == 200
    moved_entity = response.json()
    assert moved_entity["file_path"] == "deeply/nested/folder/MovedNote.md"


@pytest.mark.asyncio
async def test_move_entity_with_observations_and_relations(client: AsyncClient, project_url):
    """Test moving entity preserves observations and relations."""
    # Create test entity with complex content
    content = """# Complex Entity

## Observations
- [note] Important observation #tag1
- [feature] Key feature #feature
- relation to [[SomeOtherEntity]]
- depends on [[Dependency]]

Some additional content."""

    response = await client.post(
        f"{project_url}/knowledge/entities",
        json={
            "title": "ComplexEntity",
            "folder": "source",
            "entity_type": "note",
            "content": content,
        },
    )
    assert response.status_code == 200
    entity = response.json()

    # Verify original observations and relations
    assert len(entity["observations"]) == 2
    assert len(entity["relations"]) == 2

    # Move entity
    move_data = {
        "identifier": entity["permalink"],
        "destination_path": "target/MovedComplex.md",
    }
    response = await client.post(f"{project_url}/knowledge/move", json=move_data)
    assert response.status_code == 200

    # Verify moved entity preserves data
    response = await client.get(f"{project_url}/knowledge/entities/target/moved-complex")
    assert response.status_code == 200
    moved_entity = response.json()

    # Check observations preserved
    assert len(moved_entity["observations"]) == 2
    obs_categories = {obs["category"] for obs in moved_entity["observations"]}
    assert obs_categories == {"note", "feature"}

    # Check relations preserved
    assert len(moved_entity["relations"]) == 2
    rel_types = {rel["relation_type"] for rel in moved_entity["relations"]}
    assert rel_types == {"relation to", "depends on"}

    # Verify file content preserved
    response = await client.get(f"{project_url}/resource/target/moved-complex?content=true")
    assert response.status_code == 200
    file_content = response.text
    assert "Important observation #tag1" in file_content
    assert "[[SomeOtherEntity]]" in file_content


@pytest.mark.asyncio
async def test_move_entity_search_reindexing(client: AsyncClient, project_url):
    """Test that moved entities are properly reindexed for search."""
    # Create searchable entity
    response = await client.post(
        f"{project_url}/knowledge/entities",
        json={
            "title": "SearchableNote",
            "folder": "source",
            "entity_type": "note",
            "content": "Unique searchable elephant content",
        },
    )
    assert response.status_code == 200
    entity = response.json()

    # Move entity
    move_data = {
        "identifier": entity["permalink"],
        "destination_path": "target/MovedSearchable.md",
    }
    response = await client.post(f"{project_url}/knowledge/move", json=move_data)
    assert response.status_code == 200

    # Search should find entity at new location
    search_response = await client.post(
        f"{project_url}/search/",
        json={"text": "elephant", "entity_types": [SearchItemType.ENTITY.value]},
    )
    results = search_response.json()["results"]
    assert len(results) == 1
    assert results[0]["permalink"] == "target/moved-searchable"


@pytest.mark.asyncio
async def test_move_entity_not_found(client: AsyncClient, project_url):
    """Test moving non-existent entity returns 400 error."""
    move_data = {
        "identifier": "non-existent-entity",
        "destination_path": "target/SomeFile.md",
    }
    response = await client.post(f"{project_url}/knowledge/move", json=move_data)
    assert response.status_code == 400
    assert "Entity not found" in response.json()["detail"]


@pytest.mark.asyncio
async def test_move_entity_invalid_destination_path(client: AsyncClient, project_url):
    """Test moving entity with invalid destination path."""
    # Create test entity
    response = await client.post(
        f"{project_url}/knowledge/entities",
        json={
            "title": "TestNote",
            "folder": "",
            "entity_type": "note",
            "content": "Test content",
        },
    )
    assert response.status_code == 200
    entity = response.json()

    # Test various invalid paths
    invalid_paths = [
        "/absolute/path.md",  # Absolute path
        "../parent/path.md",  # Parent directory
        "",  # Empty string
        "   ",  # Whitespace only
    ]

    for invalid_path in invalid_paths:
        move_data = {
            "identifier": entity["permalink"],
            "destination_path": invalid_path,
        }
        response = await client.post(f"{project_url}/knowledge/move", json=move_data)
        assert response.status_code == 422  # Validation error


@pytest.mark.asyncio
async def test_move_entity_destination_exists(client: AsyncClient, project_url):
    """Test moving entity to existing destination returns error."""
    # Create source entity
    response = await client.post(
        f"{project_url}/knowledge/entities",
        json={
            "title": "SourceNote",
            "folder": "source",
            "entity_type": "note",
            "content": "Source content",
        },
    )
    assert response.status_code == 200
    source_entity = response.json()

    # Create destination entity
    response = await client.post(
        f"{project_url}/knowledge/entities",
        json={
            "title": "DestinationNote",
            "folder": "target",
            "entity_type": "note",
            "content": "Destination content",
        },
    )
    assert response.status_code == 200

    # Try to move source to existing destination
    move_data = {
        "identifier": source_entity["permalink"],
        "destination_path": "target/DestinationNote.md",
    }
    response = await client.post(f"{project_url}/knowledge/move", json=move_data)
    assert response.status_code == 400
    assert "already exists" in response.json()["detail"]


@pytest.mark.asyncio
async def test_move_entity_missing_identifier(client: AsyncClient, project_url):
    """Test move request with missing identifier."""
    move_data = {
        "destination_path": "target/SomeFile.md",
    }
    response = await client.post(f"{project_url}/knowledge/move", json=move_data)
    assert response.status_code == 422  # Validation error


@pytest.mark.asyncio
async def test_move_entity_missing_destination(client: AsyncClient, project_url):
    """Test move request with missing destination path."""
    move_data = {
        "identifier": "some-entity",
    }
    response = await client.post(f"{project_url}/knowledge/move", json=move_data)
    assert response.status_code == 422  # Validation error


@pytest.mark.asyncio
async def test_move_entity_by_file_path(client: AsyncClient, project_url):
    """Test moving entity using file path as identifier."""
    # Create test entity
    response = await client.post(
        f"{project_url}/knowledge/entities",
        json={
            "title": "TestNote",
            "folder": "source",
            "entity_type": "note",
            "content": "Test content",
        },
    )
    assert response.status_code == 200
    entity = response.json()

    # Move using file path as identifier
    move_data = {
        "identifier": entity["file_path"],
        "destination_path": "target/MovedByPath.md",
    }
    response = await client.post(f"{project_url}/knowledge/move", json=move_data)
    assert response.status_code == 200

    # Verify entity exists at new location
    response = await client.get(f"{project_url}/knowledge/entities/target/moved-by-path")
    assert response.status_code == 200
    moved_entity = response.json()
    assert moved_entity["file_path"] == "target/MovedByPath.md"


@pytest.mark.asyncio
async def test_move_entity_by_title(client: AsyncClient, project_url):
    """Test moving entity using title as identifier."""
    # Create test entity with unique title
    response = await client.post(
        f"{project_url}/knowledge/entities",
        json={
            "title": "UniqueTestTitle",
            "folder": "source",
            "entity_type": "note",
            "content": "Test content",
        },
    )
    assert response.status_code == 200

    # Move using title as identifier
    move_data = {
        "identifier": "UniqueTestTitle",
        "destination_path": "target/MovedByTitle.md",
    }
    response = await client.post(f"{project_url}/knowledge/move", json=move_data)
    assert response.status_code == 200

    # Verify entity exists at new location
    response = await client.get(f"{project_url}/knowledge/entities/target/moved-by-title")
    assert response.status_code == 200
    moved_entity = response.json()
    assert moved_entity["file_path"] == "target/MovedByTitle.md"
    assert moved_entity["title"] == "UniqueTestTitle"

```

--------------------------------------------------------------------------------
/tests/sync/test_sync_service.py:
--------------------------------------------------------------------------------

```python
"""Test general sync behavior."""

import asyncio
import os
from datetime import datetime, timezone
from pathlib import Path
from textwrap import dedent

import pytest

from basic_memory.config import ProjectConfig, BasicMemoryConfig
from basic_memory.models import Entity
from basic_memory.repository import EntityRepository
from basic_memory.schemas.search import SearchQuery
from basic_memory.services import EntityService, FileService
from basic_memory.services.search_service import SearchService
from basic_memory.sync.sync_service import SyncService


async def create_test_file(path: Path, content: str = "test content") -> None:
    """Create a test file with given content."""
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(content)


async def touch_file(path: Path) -> None:
    """Touch a file to update its mtime (for watermark testing)."""
    import time

    # Read and rewrite to update mtime
    content = path.read_text()
    time.sleep(0.5)  # Ensure mtime changes and is newer than watermark (500ms)
    path.write_text(content)


async def force_full_scan(sync_service: SyncService) -> None:
    """Force next sync to do a full scan by clearing watermark (for testing moves/deletions)."""
    if sync_service.entity_repository.project_id is not None:
        project = await sync_service.project_repository.find_by_id(
            sync_service.entity_repository.project_id
        )
        if project:
            await sync_service.project_repository.update(
                project.id,
                {
                    "last_scan_timestamp": None,
                    "last_file_count": None,
                },
            )


@pytest.mark.asyncio
async def test_forward_reference_resolution(
    sync_service: SyncService,
    project_config: ProjectConfig,
    entity_service: EntityService,
):
    """Test that forward references get resolved when target file is created."""
    project_dir = project_config.home

    # First create a file with a forward reference
    source_content = """
---
type: knowledge
---
# Source Document

## Relations
- depends_on [[target-doc]]
- depends_on [[target-doc]] # duplicate
"""
    await create_test_file(project_dir / "source.md", source_content)

    # Initial sync - should create forward reference
    await sync_service.sync(project_config.home)

    # Verify forward reference
    source = await entity_service.get_by_permalink("source")
    assert len(source.relations) == 1
    assert source.relations[0].to_id is None
    assert source.relations[0].to_name == "target-doc"

    # Now create the target file
    target_content = """
---
type: knowledge
---
# Target Doc
Target content
"""
    target_file = project_dir / "target_doc.md"
    await create_test_file(target_file, target_content)

    # Force full scan to ensure the new file is detected
    # Incremental scans have timing precision issues with watermarks on some filesystems
    await force_full_scan(sync_service)

    # Sync again - should resolve the reference
    await sync_service.sync(project_config.home)

    # Verify reference is now resolved
    source = await entity_service.get_by_permalink("source")
    target = await entity_service.get_by_permalink("target-doc")
    assert len(source.relations) == 1
    assert source.relations[0].to_id == target.id
    assert source.relations[0].to_name == target.title


@pytest.mark.asyncio
async def test_resolve_relations_deletes_duplicate_unresolved_relation(
    sync_service: SyncService,
    project_config: ProjectConfig,
    entity_service: EntityService,
):
    """Test that resolve_relations deletes duplicate unresolved relations on IntegrityError.

    When resolving a forward reference would create a duplicate (from_id, to_id, relation_type),
    the unresolved relation should be deleted since a resolved version already exists.
    """
    from basic_memory.models import Relation

    project_dir = project_config.home

    # Create source entity
    source_content = """
---
type: knowledge
---
# Source Entity
Content
"""
    await create_test_file(project_dir / "source.md", source_content)

    # Create target entity
    target_content = """
---
type: knowledge
title: Target Entity
---
# Target Entity
Content
"""
    await create_test_file(project_dir / "target.md", target_content)

    # Sync to create both entities
    await sync_service.sync(project_config.home)

    source = await entity_service.get_by_permalink("source")
    target = await entity_service.get_by_permalink("target")

    # Create a resolved relation (already exists) that the unresolved one would become.
    resolved_relation = Relation(
        from_id=source.id,
        to_id=target.id,
        to_name=target.title,
        relation_type="relates_to",
    )
    await sync_service.relation_repository.add(resolved_relation)

    # Create an unresolved relation that will resolve to target
    unresolved_relation = Relation(
        from_id=source.id,
        to_id=None,  # Unresolved
        to_name="target",  # Will resolve to target entity
        relation_type="relates_to",
    )
    await sync_service.relation_repository.add(unresolved_relation)
    unresolved_id = unresolved_relation.id

    # Verify we have the unresolved relation
    source = await entity_service.get_by_permalink("source")
    unresolved_outgoing = [r for r in source.outgoing_relations if r.to_id is None]
    assert len(unresolved_outgoing) == 1
    assert unresolved_outgoing[0].id == unresolved_id
    assert unresolved_outgoing[0].to_name == "target"

    # Call resolve_relations - should hit a real IntegrityError (unique constraint) and delete
    # the duplicate unresolved relation.
    await sync_service.resolve_relations()

    # Verify the unresolved relation was deleted
    deleted = await sync_service.relation_repository.find_by_id(unresolved_id)
    assert deleted is None

    # Verify no unresolved relations remain
    unresolved = await sync_service.relation_repository.find_unresolved_relations()
    assert len(unresolved) == 0

    # Verify only the resolved relation remains
    source = await entity_service.get_by_permalink("source")
    assert len(source.outgoing_relations) == 1
    assert source.outgoing_relations[0].to_id == target.id


@pytest.mark.asyncio
async def test_sync(
    sync_service: SyncService, project_config: ProjectConfig, entity_service: EntityService
):
    """Test basic knowledge sync functionality."""
    # Create test files
    project_dir = project_config.home

    # New entity with relation
    new_content = """
---
type: knowledge
permalink: concept/test-concept
created: 2023-01-01
modified: 2023-01-01
---
# Test Concept

A test concept.

## Observations
- [design] Core feature

## Relations
- depends_on [[concept/other]]
"""
    await create_test_file(project_dir / "concept/test_concept.md", new_content)

    # Create related entity in DB that will be deleted
    # because file was not found
    other = Entity(
        permalink="concept/other",
        title="Other",
        entity_type="test",
        file_path="concept/other.md",
        checksum="12345678",
        content_type="text/markdown",
        created_at=datetime.now(timezone.utc),
        updated_at=datetime.now(timezone.utc),
    )
    await entity_service.repository.add(other)

    # Run sync
    await sync_service.sync(project_config.home)

    # Verify results
    entities = await entity_service.repository.find_all()
    assert len(entities) == 1

    # Find new entity
    test_concept = next(e for e in entities if e.permalink == "concept/test-concept")
    assert test_concept.entity_type == "knowledge"

    # Verify relation was created
    # with forward link
    entity = await entity_service.get_by_permalink(test_concept.permalink)
    relations = entity.relations
    assert len(relations) == 1, "Expected 1 relation for entity"
    assert relations[0].to_name == "concept/other"


@pytest.mark.asyncio
async def test_sync_hidden_file(
    sync_service: SyncService, project_config: ProjectConfig, entity_service: EntityService
):
    """Test basic knowledge sync functionality."""
    # Create test files
    project_dir = project_config.home

    # hidden file
    await create_test_file(project_dir / "concept/.hidden.md", "hidden")

    # Run sync
    await sync_service.sync(project_config.home)

    # Verify results
    entities = await entity_service.repository.find_all()
    assert len(entities) == 0


@pytest.mark.asyncio
async def test_sync_entity_with_nonexistent_relations(
    sync_service: SyncService, project_config: ProjectConfig
):
    """Test syncing an entity that references nonexistent entities."""
    project_dir = project_config.home

    # Create entity that references entities we haven't created yet
    content = """
---
type: knowledge
permalink: concept/depends-on-future
created: 2024-01-01
modified: 2024-01-01
---
# Test Dependencies

## Observations
- [design] Testing future dependencies

## Relations
- depends_on [[concept/not_created_yet]]
- uses [[concept/also_future]]
"""
    await create_test_file(project_dir / "concept/depends_on_future.md", content)

    # Sync
    await sync_service.sync(project_config.home)

    # Verify entity was created with unresolved (forward) relations
    entity = await sync_service.entity_service.repository.get_by_permalink(
        "concept/depends-on-future"
    )
    assert entity is not None
    assert len(entity.relations) == 2
    assert entity.relations[0].to_name == "concept/not_created_yet"
    assert entity.relations[1].to_name == "concept/also_future"


@pytest.mark.asyncio
async def test_sync_entity_circular_relations(
    sync_service: SyncService, project_config: ProjectConfig
):
    """Test syncing entities with circular dependencies."""
    project_dir = project_config.home

    # Create entity A that depends on B
    content_a = """
---
type: knowledge
permalink: concept/entity-a
created: 2024-01-01
modified: 2024-01-01
---
# Entity A

## Observations
- First entity in circular reference

## Relations
- depends_on [[concept/entity-b]]
"""
    await create_test_file(project_dir / "concept/entity_a.md", content_a)

    # Create entity B that depends on A
    content_b = """
---
type: knowledge
permalink: concept/entity-b
created: 2024-01-01
modified: 2024-01-01
---
# Entity B

## Observations
- Second entity in circular reference

## Relations
- depends_on [[concept/entity-a]]
"""
    await create_test_file(project_dir / "concept/entity_b.md", content_b)

    # Sync
    await sync_service.sync(project_config.home)

    # Verify both entities and their relations
    entity_a = await sync_service.entity_service.repository.get_by_permalink("concept/entity-a")
    entity_b = await sync_service.entity_service.repository.get_by_permalink("concept/entity-b")

    # outgoing relations
    assert len(entity_a.outgoing_relations) == 1
    assert len(entity_b.outgoing_relations) == 1

    # incoming relations
    assert len(entity_a.incoming_relations) == 1
    assert len(entity_b.incoming_relations) == 1

    # all relations
    assert len(entity_a.relations) == 2
    assert len(entity_b.relations) == 2

    # Verify circular reference works
    a_relation = entity_a.outgoing_relations[0]
    assert a_relation.to_id == entity_b.id

    b_relation = entity_b.outgoing_relations[0]
    assert b_relation.to_id == entity_a.id


@pytest.mark.asyncio
async def test_sync_entity_duplicate_relations(
    sync_service: SyncService, project_config: ProjectConfig
):
    """Test handling of duplicate relations in an entity."""
    project_dir = project_config.home

    # Create target entity first
    target_content = """
---
type: knowledge
permalink: concept/target
created: 2024-01-01
modified: 2024-01-01
---
# Target Entity

## Observations
- something to observe

"""
    await create_test_file(project_dir / "concept/target.md", target_content)

    # Create entity with duplicate relations
    content = """
---
type: knowledge
permalink: concept/duplicate-relations
created: 2024-01-01
modified: 2024-01-01
---
# Test Duplicates

## Observations
- this has a lot of relations

## Relations
- depends_on [[concept/target]]
- depends_on [[concept/target]]  # Duplicate
- uses [[concept/target]]  # Different relation type
- uses [[concept/target]]  # Duplicate of different type
"""
    await create_test_file(project_dir / "concept/duplicate_relations.md", content)

    # Sync
    await sync_service.sync(project_config.home)

    # Verify duplicates are handled
    entity = await sync_service.entity_service.repository.get_by_permalink(
        "concept/duplicate-relations"
    )

    # Count relations by type
    relation_counts = {}
    for rel in entity.relations:
        relation_counts[rel.relation_type] = relation_counts.get(rel.relation_type, 0) + 1

    # Should only have one of each type
    assert relation_counts["depends_on"] == 1
    assert relation_counts["uses"] == 1


@pytest.mark.asyncio
async def test_sync_entity_with_random_categories(
    sync_service: SyncService, project_config: ProjectConfig
):
    """Test handling of random observation categories."""
    project_dir = project_config.home

    content = """
---
type: knowledge
permalink: concept/invalid-category
created: 2024-01-01
modified: 2024-01-01
---
# Test Categories

## Observations
- [random category] This is fine
- [ a space category] Should default to note
- This one is not an observation, should be ignored
- [design] This is valid 
"""
    await create_test_file(project_dir / "concept/invalid_category.md", content)

    # Sync
    await sync_service.sync(project_config.home)

    # Verify observations
    entity = await sync_service.entity_service.repository.get_by_permalink(
        "concept/invalid-category"
    )

    assert len(entity.observations) == 3
    categories = [obs.category for obs in entity.observations]

    # Arbitrary categories are preserved (extra whitespace trimmed)
    assert "random category" in categories
    assert "a space category" in categories
    assert "design" in categories


@pytest.mark.skip("sometimes fails")
@pytest.mark.asyncio
async def test_sync_entity_with_order_dependent_relations(
    sync_service: SyncService, project_config: ProjectConfig
):
    """Test that order of entity syncing doesn't affect relation creation."""
    project_dir = project_config.home

    # Create several interrelated entities
    entities = {
        "a": """
---
type: knowledge
permalink: concept/entity-a
created: 2024-01-01
modified: 2024-01-01
---
# Entity A

## Observations
- depends on b
- depends on c

## Relations
- depends_on [[concept/entity-b]]
- depends_on [[concept/entity-c]]
""",
        "b": """
---
type: knowledge
permalink: concept/entity-b
created: 2024-01-01
modified: 2024-01-01
---
# Entity B

## Observations
- depends on c

## Relations
- depends_on [[concept/entity-c]]
""",
        "c": """
---
type: knowledge
permalink: concept/entity-c
created: 2024-01-01
modified: 2024-01-01
---
# Entity C

## Observations
- depends on a

## Relations
- depends_on [[concept/entity-a]]
""",
    }

    # Create files in different orders and verify results are the same
    for name, content in entities.items():
        await create_test_file(project_dir / f"concept/entity_{name}.md", content)

    # Sync
    await sync_service.sync(project_config.home)

    # Verify all relations are created correctly regardless of order
    entity_a = await sync_service.entity_service.repository.get_by_permalink("concept/entity-a")
    entity_b = await sync_service.entity_service.repository.get_by_permalink("concept/entity-b")
    entity_c = await sync_service.entity_service.repository.get_by_permalink("concept/entity-c")

    # Verify outgoing relations by checking actual targets
    a_outgoing_targets = {rel.to_id for rel in entity_a.outgoing_relations}
    assert entity_b.id in a_outgoing_targets, (
        f"A should depend on B. A's targets: {a_outgoing_targets}, B's ID: {entity_b.id}"
    )
    assert entity_c.id in a_outgoing_targets, (
        f"A should depend on C. A's targets: {a_outgoing_targets}, C's ID: {entity_c.id}"
    )
    assert len(entity_a.outgoing_relations) == 2, "A should have exactly 2 outgoing relations"

    b_outgoing_targets = {rel.to_id for rel in entity_b.outgoing_relations}
    assert entity_c.id in b_outgoing_targets, "B should depend on C"
    assert len(entity_b.outgoing_relations) == 1, "B should have exactly 1 outgoing relation"

    c_outgoing_targets = {rel.to_id for rel in entity_c.outgoing_relations}
    assert entity_a.id in c_outgoing_targets, "C should depend on A"
    assert len(entity_c.outgoing_relations) == 1, "C should have exactly 1 outgoing relation"

    # Verify incoming relations by checking actual sources
    a_incoming_sources = {rel.from_id for rel in entity_a.incoming_relations}
    assert entity_c.id in a_incoming_sources, "A should have incoming relation from C"

    b_incoming_sources = {rel.from_id for rel in entity_b.incoming_relations}
    assert entity_a.id in b_incoming_sources, "B should have incoming relation from A"

    c_incoming_sources = {rel.from_id for rel in entity_c.incoming_relations}
    assert entity_a.id in c_incoming_sources, "C should have incoming relation from A"
    assert entity_b.id in c_incoming_sources, "C should have incoming relation from B"


@pytest.mark.asyncio
async def test_sync_empty_directories(sync_service: SyncService, project_config: ProjectConfig):
    """Test syncing empty directories."""
    await sync_service.sync(project_config.home)

    # Should not raise exceptions for empty dirs
    assert project_config.home.exists()


@pytest.mark.skip("flaky on Windows due to filesystem timing precision")
@pytest.mark.asyncio
async def test_sync_file_modified_during_sync(
    sync_service: SyncService, project_config: ProjectConfig
):
    """Test handling of files that change during sync process."""
    # Create initial files
    doc_path = project_config.home / "changing.md"
    await create_test_file(
        doc_path,
        """
---
type: knowledge
id: changing
created: 2024-01-01
modified: 2024-01-01
---
# Knowledge File

## Observations
- This is a test
""",
    )

    # Setup async modification during sync
    async def modify_file():
        await asyncio.sleep(0.1)  # Small delay to ensure sync has started
        doc_path.write_text("Modified during sync")

    # Run sync and modification concurrently
    await asyncio.gather(sync_service.sync(project_config.home), modify_file())

    # Verify final state
    doc = await sync_service.entity_service.repository.get_by_permalink("changing")
    assert doc is not None

    # if we failed in the middle of a sync, the next one should fix it.
    if doc.checksum is None:
        await sync_service.sync(project_config.home)
        doc = await sync_service.entity_service.repository.get_by_permalink("changing")
        assert doc.checksum is not None


@pytest.mark.asyncio
async def test_permalink_formatting(
    sync_service: SyncService, project_config: ProjectConfig, entity_service: EntityService
):
    """Test that permalinks are properly formatted during sync."""

    # Test cases with different filename formats
    test_files = {
        # filename -> expected permalink
        "my_awesome_feature.md": "my-awesome-feature",
        "MIXED_CASE_NAME.md": "mixed-case-name",
        "spaces and_underscores.md": "spaces-and-underscores",
        "design/model_refactor.md": "design/model-refactor",
        "test/multiple_word_directory/feature_name.md": "test/multiple-word-directory/feature-name",
    }
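    # Expected normalization, per the mapping above: lowercase, underscores and
    # spaces become hyphens, and directory separators are preserved.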

    # Create test files
    content: str = """
---
type: knowledge
created: 2024-01-01
modified: 2024-01-01
---
# Test File

Testing permalink generation.
"""
    for filename, _ in test_files.items():
        await create_test_file(project_config.home / filename, content)

    # Run sync once after all files are created
    await sync_service.sync(project_config.home)

    # Verify permalinks
    entities = await entity_service.repository.find_all()
    for filename, expected_permalink in test_files.items():
        # Find entity for this file
        entity = next(e for e in entities if e.file_path == filename)
        assert entity.permalink == expected_permalink, (
            f"File {filename} should have permalink {expected_permalink}"
        )


@pytest.mark.asyncio
async def test_handle_entity_deletion(
    test_graph,
    sync_service: SyncService,
    entity_repository: EntityRepository,
    search_service: SearchService,
):
    """Test deletion of entity cleans up search index."""

    root_entity = test_graph["root"]
    # Delete the entity
    await sync_service.handle_delete(root_entity.file_path)

    # Verify entity is gone from db
    assert await entity_repository.get_by_permalink(root_entity.permalink) is None

    # Verify entity is gone from search index
    entity_results = await search_service.search(SearchQuery(text=root_entity.title))
    assert len(entity_results) == 0

    obs_results = await search_service.search(SearchQuery(text="Root note 1"))
    assert len(obs_results) == 0

    # Verify relations from root entity are gone
    # (Postgres stemming would match "connects_to" with "connected_to", so use permalink)
    rel_results = await search_service.search(SearchQuery(permalink=root_entity.permalink))
    assert len(rel_results) == 0


@pytest.mark.asyncio
async def test_sync_preserves_timestamps(
    sync_service: SyncService,
    project_config: ProjectConfig,
    entity_service: EntityService,
):
    """Test that sync preserves file timestamps and frontmatter dates."""
    project_dir = project_config.home

    # Create a file with explicit frontmatter dates
    frontmatter_content = """
---
type: knowledge
---
# Explicit Dates
Testing frontmatter dates
"""
    await create_test_file(project_dir / "explicit_dates.md", frontmatter_content)

    # Create a file without dates (will use file timestamps)
    file_dates_content = """
---
type: knowledge
---
# File Dates
Testing file timestamps
"""
    file_path = project_dir / "file_dates3.md"
    await create_test_file(file_path, file_dates_content)

    # Run sync
    await sync_service.sync(project_config.home)

    # Check explicit frontmatter dates
    explicit_entity = await entity_service.get_by_permalink("explicit-dates")
    assert explicit_entity.created_at is not None
    assert explicit_entity.updated_at is not None

    # Check file timestamps
    file_entity = await entity_service.get_by_permalink("file-dates3")
    file_stats = file_path.stat()

    # Compare using epoch timestamps to handle timezone differences correctly
    # This ensures we're comparing the actual points in time, not display representations
    entity_created_epoch = file_entity.created_at.timestamp()
    entity_updated_epoch = file_entity.updated_at.timestamp()

    # Allow 2s difference on Windows due to filesystem timing precision
    tolerance = 2 if os.name == "nt" else 1
    assert abs(entity_created_epoch - file_stats.st_ctime) < tolerance
    assert abs(entity_updated_epoch - file_stats.st_mtime) < tolerance


@pytest.mark.asyncio
async def test_sync_updates_timestamps_on_file_modification(
    sync_service: SyncService,
    project_config: ProjectConfig,
    entity_service: EntityService,
):
    """Test that sync updates entity timestamps when files are modified.

    This test specifically validates that when an existing file is modified and re-synced,
    the updated_at timestamp in the database reflects the file's actual modification time,
    not the database operation time. This is critical for accurate temporal ordering in
    search and recent_activity queries.
    """
    project_dir = project_config.home

    # Create initial file
    initial_content = """
---
type: knowledge
---
# Test File
Initial content for timestamp test
"""
    file_path = project_dir / "timestamp_test.md"
    await create_test_file(file_path, initial_content)

    # Initial sync
    await sync_service.sync(project_config.home)

    # Get initial entity and timestamps
    entity_before = await entity_service.get_by_permalink("timestamp-test")
    initial_updated_at = entity_before.updated_at

    # Modify the file content and update mtime to be newer than watermark
    modified_content = """
---
type: knowledge
---
# Test File
Modified content for timestamp test

## Observations
- [test] This was modified
"""
    file_path.write_text(modified_content)

    # Touch file to ensure mtime is newer than watermark
    # This uses our helper which sleeps 500ms and rewrites to guarantee mtime change
    await touch_file(file_path)

    # Get the file's modification time after our changes
    file_stats_after_modification = file_path.stat()

    # Force full scan to ensure the modified file is detected
    # (incremental scans have timing precision issues with watermarks on some filesystems)
    await force_full_scan(sync_service)

    # Re-sync the modified file
    await sync_service.sync(project_config.home)

    # Get entity after re-sync
    entity_after = await entity_service.get_by_permalink("timestamp-test")

    # Verify that updated_at changed
    assert entity_after.updated_at != initial_updated_at, (
        "updated_at should change when file is modified"
    )

    # Verify that updated_at matches the file's modification time, not db operation time
    entity_updated_epoch = entity_after.updated_at.timestamp()
    file_mtime = file_stats_after_modification.st_mtime

    # Allow 2s difference on Windows due to filesystem timing precision
    tolerance = 2 if os.name == "nt" else 1
    assert abs(entity_updated_epoch - file_mtime) < tolerance, (
        f"Entity updated_at ({entity_after.updated_at}) should match file mtime "
        f"({datetime.fromtimestamp(file_mtime)}) within {tolerance}s tolerance"
    )

    # Verify the content was actually updated
    assert len(entity_after.observations) == 1
    assert entity_after.observations[0].content == "This was modified"


@pytest.mark.asyncio
async def test_file_move_updates_search_index(
    sync_service: SyncService,
    project_config: ProjectConfig,
    search_service: SearchService,
):
    """Test that moving a file updates its path in the search index."""
    project_dir = project_config.home

    # Create initial file
    content = """
---
type: knowledge
---
# Test Move
Content for move test
"""
    old_path = project_dir / "old" / "test_move.md"
    old_path.parent.mkdir(parents=True)
    await create_test_file(old_path, content)

    # Initial sync
    await sync_service.sync(project_config.home)

    # Move the file
    new_path = project_dir / "new" / "moved_file.md"
    new_path.parent.mkdir(parents=True)
    old_path.rename(new_path)

    # Force full scan to detect the move
    # (rename doesn't update mtime, so incremental scan won't find it)
    await force_full_scan(sync_service)

    # Second sync should detect the move
    await sync_service.sync(project_config.home)

    # Check search index has updated path
    results = await search_service.search(SearchQuery(text="Content for move test"))
    assert len(results) == 1
    assert results[0].file_path == new_path.relative_to(project_dir).as_posix()


@pytest.mark.asyncio
async def test_sync_null_checksum_cleanup(
    sync_service: SyncService,
    project_config: ProjectConfig,
    entity_service: EntityService,
):
    """Test handling of entities with null checksums from incomplete syncs."""
    # Create entity with null checksum (simulating incomplete sync)
    entity = Entity(
        permalink="concept/incomplete",
        title="Incomplete",
        entity_type="test",
        file_path="concept/incomplete.md",
        checksum=None,  # Null checksum
        content_type="text/markdown",
        created_at=datetime.now(timezone.utc),
        updated_at=datetime.now(timezone.utc),
    )
    await entity_service.repository.add(entity)

    # Create corresponding file
    content = """
---
type: knowledge
id: concept/incomplete
created: 2024-01-01
modified: 2024-01-01
---
# Incomplete Entity

## Observations
- Testing cleanup
"""
    await create_test_file(project_config.home / "concept/incomplete.md", content)

    # Run sync
    await sync_service.sync(project_config.home)

    # Verify entity was properly synced
    updated = await entity_service.get_by_permalink("concept/incomplete")
    assert updated.checksum is not None


@pytest.mark.asyncio
async def test_sync_permalink_resolved(
    sync_service: SyncService, project_config: ProjectConfig, file_service: FileService, app_config
):
    """Test that we resolve duplicate permalinks on sync ."""
    project_dir = project_config.home

    # Create initial file
    content = """
---
type: knowledge
---
# Test Move
Content for move test
"""
    old_path = project_dir / "old" / "test_move.md"
    old_path.parent.mkdir(parents=True)
    await create_test_file(old_path, content)

    # Initial sync
    await sync_service.sync(project_config.home)

    # Move the file
    new_path = project_dir / "new" / "moved_file.md"
    new_path.parent.mkdir(parents=True)
    old_path.rename(new_path)

    # Force full scan to detect the move
    # (rename doesn't update mtime, so incremental scan won't find it)
    await force_full_scan(sync_service)

    # Sync again
    await sync_service.sync(project_config.home)

    file_content, _ = await file_service.read_file(new_path)
    assert "permalink: new/moved-file" in file_content

    # Create another that has the same permalink
    content = """
---
type: knowledge
permalink: new/moved-file
---
# Test Move
Content for move test
"""
    old_path = project_dir / "old" / "test_move.md"
    old_path.parent.mkdir(parents=True, exist_ok=True)
    await create_test_file(old_path, content)

    # Force full scan to detect the new file
    # (file just created may not be newer than watermark due to timing precision)
    await force_full_scan(sync_service)

    # Sync new file
    await sync_service.sync(project_config.home)

    # assert permalink is unique
    file_content, _ = await file_service.read_file(old_path)
    assert "permalink: new/moved-file-1" in file_content


@pytest.mark.asyncio
async def test_sync_permalink_resolved_on_update(
    sync_service: SyncService,
    project_config: ProjectConfig,
    file_service: FileService,
):
    """Test that sync resolves permalink conflicts on update."""
    project_dir = project_config.home

    one_file = project_dir / "one.md"
    two_file = project_dir / "two.md"
    await create_test_file(
        one_file,
        content=dedent(
            """
            ---
            permalink: one
            ---
            test content
            """
        ),
    )
    await create_test_file(
        two_file,
        content=dedent(
            """
            ---
            permalink: two
            ---
            test content
            """
        ),
    )

    # Run sync
    await sync_service.sync(project_config.home)

    # Check permalinks
    file_one_content, _ = await file_service.read_file(one_file)
    assert "permalink: one" in file_one_content

    file_two_content, _ = await file_service.read_file(two_file)
    assert "permalink: two" in file_two_content

    # update the second file with a duplicate permalink
    updated_content = """
---
title: two.md
type: note
permalink: one
tags: []
---

test content
"""
    two_file.write_text(updated_content)

    # Force full scan to detect the modified file
    # (file just modified may not be newer than watermark due to timing precision)
    await force_full_scan(sync_service)

    # Run sync
    await sync_service.sync(project_config.home)

    # Check permalinks
    file_two_content, _ = await file_service.read_file(two_file)
    assert "permalink: two" in file_two_content

    # new content with duplicate permalink
    new_content = """
---
title: new.md
type: note
permalink: one
tags: []
---

test content
"""
    new_file = project_dir / "new.md"
    await create_test_file(new_file, new_content)

    # Force full scan to detect the new file
    # (file just created may not be newer than watermark due to timing precision)
    await force_full_scan(sync_service)

    # Run another time
    await sync_service.sync(project_config.home)

    # Should have deduplicated permalink
    new_file_content, _ = await file_service.read_file(new_file)
    assert "permalink: one-1" in new_file_content


@pytest.mark.asyncio
async def test_sync_permalink_not_created_if_no_frontmatter(
    sync_service: SyncService,
    project_config: ProjectConfig,
    file_service: FileService,
):
    """Test that sync resolves permalink conflicts on update."""
    project_dir = project_config.home

    file = project_dir / "one.md"
    await create_test_file(file)

    # Run sync
    await sync_service.sync(project_config.home)

    # Check permalink not created
    file_content, _ = await file_service.read_file(file)
    assert "permalink:" not in file_content


@pytest.fixture
def test_config_update_permamlinks_on_move(app_config) -> BasicMemoryConfig:
    """Test configuration using in-memory DB."""
    app_config.update_permalinks_on_move = True
    return app_config


@pytest.mark.asyncio
async def test_sync_permalink_updated_on_move(
    test_config_update_permamlinks_on_move: BasicMemoryConfig,
    project_config: ProjectConfig,
    sync_service: SyncService,
    file_service: FileService,
):
    """Test that we update a permalink on a file move if set in config ."""
    project_dir = project_config.home

    # Create initial file
    content = dedent(
        """
        ---
        type: knowledge
        ---
        # Test Move
        Content for move test
        """
    )

    old_path = project_dir / "old" / "test_move.md"
    old_path.parent.mkdir(parents=True)
    await create_test_file(old_path, content)

    # Initial sync
    await sync_service.sync(project_config.home)

    # verify permalink
    old_content, _ = await file_service.read_file(old_path)
    assert "permalink: old/test-move" in old_content

    # Move the file
    new_path = project_dir / "new" / "moved_file.md"
    new_path.parent.mkdir(parents=True)
    old_path.rename(new_path)

    # Force full scan to detect the move
    # (rename doesn't update mtime, so incremental scan won't find it)
    await force_full_scan(sync_service)

    # Sync again
    await sync_service.sync(project_config.home)

    file_content, _ = await file_service.read_file(new_path)
    assert "permalink: new/moved-file" in file_content


@pytest.mark.asyncio
async def test_sync_non_markdown_files(sync_service, project_config, test_files):
    """Test syncing non-markdown files."""
    report = await sync_service.sync(project_config.home)
    assert report.total == 2

    # Check files were detected
    assert test_files["pdf"].name in [f for f in report.new]
    assert test_files["image"].name in [f for f in report.new]

    # Verify entities were created
    pdf_entity = await sync_service.entity_repository.get_by_file_path(str(test_files["pdf"].name))
    assert pdf_entity is not None, "PDF entity should have been created"
    assert pdf_entity.content_type == "application/pdf"

    image_entity = await sync_service.entity_repository.get_by_file_path(
        str(test_files["image"].name)
    )
    assert image_entity.content_type == "image/png"


@pytest.mark.asyncio
async def test_sync_non_markdown_files_modified(
    sync_service, project_config, test_files, file_service
):
    """Test syncing non-markdown files."""
    report = await sync_service.sync(project_config.home)
    assert report.total == 2

    # Check files were detected
    assert test_files["pdf"].name in [f for f in report.new]
    assert test_files["image"].name in [f for f in report.new]

    test_files["pdf"].write_text("New content")
    test_files["image"].write_text("New content")

    # Force full scan to detect the modified files
    # (files just modified may not be newer than watermark due to timing precision)
    await force_full_scan(sync_service)

    report = await sync_service.sync(project_config.home)
    assert len(report.modified) == 2

    pdf_file_content, pdf_checksum = await file_service.read_file(test_files["pdf"].name)
    image_file_content, img_checksum = await file_service.read_file(test_files["image"].name)

    pdf_entity = await sync_service.entity_repository.get_by_file_path(str(test_files["pdf"].name))
    image_entity = await sync_service.entity_repository.get_by_file_path(
        str(test_files["image"].name)
    )

    assert pdf_entity.checksum == pdf_checksum
    assert image_entity.checksum == img_checksum


@pytest.mark.asyncio
async def test_sync_non_markdown_files_move(sync_service, project_config, test_files):
    """Test syncing non-markdown files updates permalink"""
    report = await sync_service.sync(project_config.home)
    assert report.total == 2

    # Check files were detected
    assert test_files["pdf"].name in [f for f in report.new]
    assert test_files["image"].name in [f for f in report.new]

    test_files["pdf"].rename(project_config.home / "moved_pdf.pdf")

    # Force full scan to detect the move
    # (rename doesn't update mtime, so incremental scan won't find it)
    await force_full_scan(sync_service)

    report2 = await sync_service.sync(project_config.home)
    assert len(report2.moves) == 1

    # Verify entity is updated
    pdf_entity = await sync_service.entity_repository.get_by_file_path("moved_pdf.pdf")
    assert pdf_entity is not None
    assert pdf_entity.permalink is None


@pytest.mark.asyncio
async def test_sync_non_markdown_files_deleted(sync_service, project_config, test_files):
    """Test syncing non-markdown files updates permalink"""
    report = await sync_service.sync(project_config.home)
    assert report.total == 2

    # Check files were detected
    assert test_files["pdf"].name in [f for f in report.new]
    assert test_files["image"].name in [f for f in report.new]

    test_files["pdf"].unlink()
    report2 = await sync_service.sync(project_config.home)
    assert len(report2.deleted) == 1

    # Verify entity for the deleted file is gone
    pdf_entity = await sync_service.entity_repository.get_by_file_path(str(test_files["pdf"].name))
    assert pdf_entity is None


@pytest.mark.asyncio
async def test_sync_non_markdown_files_move_with_delete(
    sync_service, project_config, test_files, file_service
):
    """Test syncing non-markdown files handles file deletes and renames during sync"""

    # Create initial files
    await create_test_file(project_config.home / "doc.pdf", "content1")
    await create_test_file(project_config.home / "other/doc-1.pdf", "content2")

    # Initial sync
    await sync_service.sync(project_config.home)

    # First move/delete the original file to make way for the move
    (project_config.home / "doc.pdf").unlink()
    (project_config.home / "other/doc-1.pdf").rename(project_config.home / "doc.pdf")

    # Sync again
    await sync_service.sync(project_config.home)

    # Verify the changes
    moved_entity = await sync_service.entity_repository.get_by_file_path("doc.pdf")
    assert moved_entity is not None
    assert moved_entity.permalink is None

    file_content, _ = await file_service.read_file("doc.pdf")
    assert "content2" in file_content


@pytest.mark.asyncio
async def test_sync_relation_to_non_markdown_file(
    sync_service: SyncService, project_config: ProjectConfig, file_service: FileService, test_files
):
    """Test that sync resolves permalink conflicts on update."""
    project_dir = project_config.home

    content = f"""
---
title: a note
type: note
tags: []
---

- relates_to [[{test_files["pdf"].name}]]
"""

    note_file = project_dir / "note.md"
    await create_test_file(note_file, content)

    # Run sync
    await sync_service.sync(project_config.home)

    # Check frontmatter gained a permalink and the relation line is preserved
    file_one_content, _ = await file_service.read_file(note_file)
    assert (
        f"""---
title: a note
type: note
tags: []
permalink: note
---

- relates_to [[{test_files["pdf"].name}]]
""".strip()
        == file_one_content
    )


@pytest.mark.asyncio
async def test_sync_regular_file_race_condition_handling(
    sync_service: SyncService, project_config: ProjectConfig
):
    """Test that sync_regular_file handles race condition with IntegrityError (lines 380-401)."""
    from datetime import datetime, timezone

    # Create a test file
    test_file = project_config.home / "test_race.md"
    test_content = """
---
type: knowledge
---
# Test Race Condition
This is a test file for race condition handling.
"""
    await create_test_file(test_file, test_content)

    rel_path = test_file.relative_to(project_config.home).as_posix()

    # Create an existing entity with the same file_path to force a real DB IntegrityError
    # on the "add" call (same effect as the race-condition branch).
    await sync_service.entity_repository.add(
        Entity(
            entity_type="file",
            file_path=rel_path,
            checksum="old_checksum",
            title="Test Race Condition",
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
            content_type="text/markdown",
            mtime=None,
            size=None,
        )
    )

    # Call sync_regular_file (new=True) - should fall back to update path
    entity, checksum = await sync_service.sync_regular_file(rel_path, new=True)

    assert entity is not None
    assert entity.file_path == rel_path
    assert entity.checksum == checksum
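
# The fallback above exercises sync_regular_file's IntegrityError handling: when the
# file_path already exists in the database, the sync updates the existing row
# instead of failing.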


@pytest.mark.asyncio
async def test_circuit_breaker_should_skip_after_three_recorded_failures(
    sync_service: SyncService, project_config: ProjectConfig
):
    """Circuit breaker: after 3 recorded failures, unchanged file should be skipped."""
    project_dir = project_config.home
    test_file = project_dir / "failing_file.md"
    await create_test_file(test_file, "---\ntype: note\n---\ncontent\n")

    rel_path = test_file.relative_to(project_dir).as_posix()

    await sync_service._record_failure(rel_path, "failure 1")
    await sync_service._record_failure(rel_path, "failure 2")
    await sync_service._record_failure(rel_path, "failure 3")

    assert await sync_service._should_skip_file(rel_path) is True
    assert rel_path in sync_service._file_failures
    assert sync_service._file_failures[rel_path].count == 3


@pytest.mark.asyncio
async def test_circuit_breaker_resets_when_checksum_changes(
    sync_service: SyncService, project_config: ProjectConfig
):
    """Circuit breaker: if file checksum changes, it should be retried (not skipped)."""
    project_dir = project_config.home
    test_file = project_dir / "changing_file.md"
    await create_test_file(test_file, "---\ntype: note\n---\ncontent\n")

    rel_path = test_file.relative_to(project_dir).as_posix()

    await sync_service._record_failure(rel_path, "failure 1")
    await sync_service._record_failure(rel_path, "failure 2")
    await sync_service._record_failure(rel_path, "failure 3")

    assert await sync_service._should_skip_file(rel_path) is True

    # Change content → checksum changes → _should_skip_file should reset and allow retry
    test_file.write_text("---\ntype: note\n---\nchanged content\n")
    assert await sync_service._should_skip_file(rel_path) is False
    assert rel_path not in sync_service._file_failures
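
# The circuit breaker tests above rely on MAX_CONSECUTIVE_FAILURES = 3 (defined in
# sync_service.py): three recorded failures with an unchanged checksum cause the
# file to be skipped, and a checksum change resets the counter.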


@pytest.mark.asyncio
async def test_record_failure_uses_empty_checksum_when_checksum_computation_fails(
    sync_service: SyncService,
):
    """_record_failure() should not crash if checksum computation fails."""
    missing_path = "does-not-exist.md"
    await sync_service._record_failure(missing_path, "boom")
    assert missing_path in sync_service._file_failures
    assert sync_service._file_failures[missing_path].last_checksum == ""


@pytest.mark.asyncio
async def test_sync_fatal_error_terminates_sync_immediately(
    sync_service: SyncService, project_config: ProjectConfig, entity_service: EntityService
):
    """Test that SyncFatalError terminates sync immediately without circuit breaker retry.

    This tests the fix for issue #188 where project deletion during sync should
    terminate immediately rather than retrying each file 3 times.
    """
    pytest.skip(
        "SyncFatalError behavior is excluded from coverage and not reliably reproducible "
        "without patching (depends on project deletion during sync)."
    )


@pytest.mark.asyncio
async def test_scan_directory_basic(sync_service: SyncService, project_config: ProjectConfig):
    """Test basic streaming directory scan functionality."""
    project_dir = project_config.home

    # Create test files in different directories
    await create_test_file(project_dir / "root.md", "root content")
    await create_test_file(project_dir / "subdir/file1.md", "file 1 content")
    await create_test_file(project_dir / "subdir/file2.md", "file 2 content")
    await create_test_file(project_dir / "subdir/nested/file3.md", "file 3 content")

    # Collect results from streaming iterator
    results = []
    async for file_path, stat_info in sync_service.scan_directory(project_dir):
        rel_path = Path(file_path).relative_to(project_dir).as_posix()
        results.append((rel_path, stat_info))

    # Verify all files were found
    file_paths = {rel_path for rel_path, _ in results}
    assert "root.md" in file_paths
    assert "subdir/file1.md" in file_paths
    assert "subdir/file2.md" in file_paths
    assert "subdir/nested/file3.md" in file_paths
    assert len(file_paths) == 4

    # Verify stat info is present for each file
    for rel_path, stat_info in results:
        assert stat_info is not None
        assert stat_info.st_size > 0  # Files have content
        assert stat_info.st_mtime > 0  # Have modification time


@pytest.mark.asyncio
async def test_scan_directory_respects_ignore_patterns(
    sync_service: SyncService, project_config: ProjectConfig
):
    """Test that streaming scan respects .gitignore patterns."""
    project_dir = project_config.home

    # Create .gitignore file in project (will be used along with .bmignore)
    (project_dir / ".gitignore").write_text("*.ignored\n.hidden/\n")

    # Reload ignore patterns using project's .gitignore
    from basic_memory.ignore_utils import load_gitignore_patterns

    sync_service._ignore_patterns = load_gitignore_patterns(project_dir)

    # Create test files - some should be ignored
    await create_test_file(project_dir / "included.md", "included")
    await create_test_file(project_dir / "excluded.ignored", "excluded")
    await create_test_file(project_dir / ".hidden/secret.md", "secret")
    await create_test_file(project_dir / "subdir/file.md", "file")

    # Collect results
    results = []
    async for file_path, stat_info in sync_service.scan_directory(project_dir):
        rel_path = Path(file_path).relative_to(project_dir).as_posix()
        results.append(rel_path)

    # Verify ignored files were not returned
    assert "included.md" in results
    assert "subdir/file.md" in results
    assert "excluded.ignored" not in results
    assert ".hidden/secret.md" not in results
    assert ".bmignore" not in results  # .bmignore itself should be ignored


@pytest.mark.asyncio
async def test_scan_directory_cached_stat_info(
    sync_service: SyncService, project_config: ProjectConfig
):
    """Test that streaming scan provides cached stat info (no redundant stat calls)."""
    project_dir = project_config.home

    # Create test file
    test_file = project_dir / "test.md"
    await create_test_file(test_file, "test content")

    # Get stat info from streaming scan
    async for file_path, stat_info in sync_service.scan_directory(project_dir):
        if Path(file_path).name == "test.md":
            # Get independent stat for comparison
            independent_stat = test_file.stat()

            # Verify stat info matches (cached stat should be accurate)
            assert stat_info.st_size == independent_stat.st_size
            assert abs(stat_info.st_mtime - independent_stat.st_mtime) < 1  # Allow 1s tolerance
            assert abs(stat_info.st_ctime - independent_stat.st_ctime) < 1
            break


@pytest.mark.asyncio
async def test_scan_directory_empty_directory(
    sync_service: SyncService, project_config: ProjectConfig
):
    """Test streaming scan on empty directory (ignoring hidden files)."""
    project_dir = project_config.home

    # Directory exists but has no user files (may have .basic-memory config dir)
    assert project_dir.exists()

    # Don't create any user files - just scan empty directory
    # Scan should yield no results (hidden files are ignored by default)
    results = []
    async for file_path, stat_info in sync_service.scan_directory(project_dir):
        results.append(file_path)

    # Should find no files (config dirs are hidden and ignored)
    assert len(results) == 0


@pytest.mark.asyncio
async def test_scan_directory_handles_permission_error(
    sync_service: SyncService, project_config: ProjectConfig
):
    """Test that streaming scan handles permission errors gracefully."""
    import sys

    # Skip on Windows - permission handling is different
    if sys.platform == "win32":
        pytest.skip("Permission tests not reliable on Windows")

    project_dir = project_config.home

    # Create accessible file
    await create_test_file(project_dir / "accessible.md", "accessible")

    # Create restricted directory
    restricted_dir = project_dir / "restricted"
    restricted_dir.mkdir()
    await create_test_file(restricted_dir / "secret.md", "secret")

    # Remove read permission from restricted directory
    restricted_dir.chmod(0o000)

    try:
        # Scan should handle permission error and continue
        results = []
        async for file_path, stat_info in sync_service.scan_directory(project_dir):
            rel_path = Path(file_path).relative_to(project_dir).as_posix()
            results.append(rel_path)

        # Should have found accessible file but not restricted one
        assert "accessible.md" in results
        assert "restricted/secret.md" not in results

    finally:
        # Restore permissions for cleanup
        restricted_dir.chmod(0o755)


@pytest.mark.asyncio
async def test_scan_directory_non_markdown_files(
    sync_service: SyncService, project_config: ProjectConfig
):
    """Test that streaming scan finds all file types, not just markdown."""
    project_dir = project_config.home

    # Create various file types
    await create_test_file(project_dir / "doc.md", "markdown")
    (project_dir / "image.png").write_bytes(b"PNG content")
    (project_dir / "data.json").write_text('{"key": "value"}')
    (project_dir / "script.py").write_text("print('hello')")

    # Collect results
    results = []
    async for file_path, stat_info in sync_service.scan_directory(project_dir):
        rel_path = Path(file_path).relative_to(project_dir).as_posix()
        results.append(rel_path)

    # All files should be found
    assert "doc.md" in results
    assert "image.png" in results
    assert "data.json" in results
    assert "script.py" in results


@pytest.mark.asyncio
async def test_file_service_checksum_correctness(
    sync_service: SyncService, project_config: ProjectConfig
):
    """Test that FileService computes correct checksums."""
    import hashlib

    project_dir = project_config.home

    # Test small markdown file
    small_content = "Test content for checksum validation" * 10
    small_file = project_dir / "small.md"
    await create_test_file(small_file, small_content)

    rel_path = small_file.relative_to(project_dir).as_posix()
    checksum = await sync_service.file_service.compute_checksum(rel_path)

    # Verify checksum is correct
    expected = hashlib.sha256(small_content.encode("utf-8")).hexdigest()
    assert checksum == expected
    assert len(checksum) == 64  # SHA256 hex digest length


@pytest.mark.asyncio
async def test_sync_handles_file_not_found_gracefully(
    sync_service: SyncService, project_config: ProjectConfig
):
    """Test that FileNotFoundError during sync is handled gracefully.

    This tests the fix for issue #386 where files existing in the database
    but missing from the filesystem would crash the sync worker.
    """
    project_dir = project_config.home

    # Create a test file
    test_file = project_dir / "missing_file.md"
    await create_test_file(
        test_file,
        dedent(
            """
            ---
            type: knowledge
            permalink: missing-file
            ---
            # Missing File
            Content that will disappear
            """
        ),
    )

    # Sync to add entity to database
    await sync_service.sync(project_dir)

    # Verify entity was created
    entity = await sync_service.entity_repository.get_by_file_path("missing_file.md")
    assert entity is not None
    assert entity.permalink == "missing-file"

    # Delete the file but leave the entity in database (simulating inconsistency)
    test_file.unlink()

    # Sync the missing file directly: sync_markdown_file will raise FileNotFoundError naturally,
    # and sync_file() should treat it as deletion.
    await sync_service.sync_file("missing_file.md", new=False)

    # Entity should be deleted from database
    entity = await sync_service.entity_repository.get_by_file_path("missing_file.md")
    assert entity is None, "Orphaned entity should be deleted when file is not found"

```

--------------------------------------------------------------------------------
/src/basic_memory/sync/sync_service.py:
--------------------------------------------------------------------------------

```python
"""Service for syncing files between filesystem and database."""

import asyncio
import os
import sys
import time
from collections import OrderedDict
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import AsyncIterator, Dict, List, Optional, Set, Tuple

import aiofiles.os

from loguru import logger
from sqlalchemy.exc import IntegrityError

from basic_memory import db
from basic_memory.config import BasicMemoryConfig, ConfigManager
from basic_memory.file_utils import has_frontmatter
from basic_memory.ignore_utils import load_bmignore_patterns, should_ignore_path
from basic_memory.markdown import EntityParser, MarkdownProcessor
from basic_memory.models import Entity, Project
from basic_memory.repository import (
    EntityRepository,
    RelationRepository,
    ObservationRepository,
    ProjectRepository,
)
from basic_memory.repository.search_repository import create_search_repository
from basic_memory.services import EntityService, FileService
from basic_memory.services.exceptions import SyncFatalError
from basic_memory.services.link_resolver import LinkResolver
from basic_memory.services.search_service import SearchService

# Circuit breaker configuration
MAX_CONSECUTIVE_FAILURES = 3
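# A file that fails this many consecutive sync attempts with an unchanged checksum
# is skipped on later syncs; a changed checksum resets the counter
# (see SyncService._should_skip_file and _record_failure).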


@dataclass
class FileFailureInfo:
    """Track failure information for a file that repeatedly fails to sync.

    Attributes:
        count: Number of consecutive failures
        first_failure: Timestamp of first failure in current sequence
        last_failure: Timestamp of most recent failure
        last_error: Error message from most recent failure
        last_checksum: Checksum of file when it last failed (for detecting file changes)
    """

    count: int
    first_failure: datetime
    last_failure: datetime
    last_error: str
    last_checksum: str


@dataclass
class SkippedFile:
    """Information about a file that was skipped due to repeated failures.

    Attributes:
        path: File path relative to project root
        reason: Error message from last failure
        failure_count: Number of consecutive failures
        first_failed: Timestamp of first failure
    """

    path: str
    reason: str
    failure_count: int
    first_failed: datetime


@dataclass
class SyncReport:
    """Report of file changes found compared to database state.

    Attributes:
        total: Total number of detected changes (new + modified + deleted + moved)
        new: Files that exist on disk but not in database
        modified: Files that exist in both but have different checksums
        deleted: Files that exist in database but not on disk
        moves: Files that have been moved from one location to another
        checksums: Current checksums for files on disk
        skipped_files: Files that were skipped due to repeated failures
    """

    # We keep paths as strings in sets/dicts for easier serialization
    new: Set[str] = field(default_factory=set)
    modified: Set[str] = field(default_factory=set)
    deleted: Set[str] = field(default_factory=set)
    moves: Dict[str, str] = field(default_factory=dict)  # old_path -> new_path
    checksums: Dict[str, str] = field(default_factory=dict)  # path -> checksum
    skipped_files: List[SkippedFile] = field(default_factory=list)

    @property
    def total(self) -> int:
        """Total number of changes."""
        return len(self.new) + len(self.modified) + len(self.deleted) + len(self.moves)
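
    # Illustrative usage: after a sync, callers typically check report.total and
    # surface report.skipped_files, e.g.
    #
    #     report = await sync_service.sync(directory)
    #     for skipped in report.skipped_files:
    #         logger.warning(f"skipped {skipped.path}: {skipped.reason}")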


@dataclass
class ScanResult:
    """Result of scanning a directory."""

    # file_path -> checksum
    files: Dict[str, str] = field(default_factory=dict)

    # checksum -> file_path
    checksums: Dict[str, str] = field(default_factory=dict)

    # file_path -> error message
    errors: Dict[str, str] = field(default_factory=dict)
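
    # The checksums reverse map (checksum -> path) lets a newly seen path with a
    # known checksum be matched back to its previous location, so renames can
    # surface as moves in SyncReport rather than delete + create.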


class SyncService:
    """Syncs documents and knowledge files with database."""

    def __init__(
        self,
        app_config: BasicMemoryConfig,
        entity_service: EntityService,
        entity_parser: EntityParser,
        entity_repository: EntityRepository,
        relation_repository: RelationRepository,
        project_repository: ProjectRepository,
        search_service: SearchService,
        file_service: FileService,
    ):
        self.app_config = app_config
        self.entity_service = entity_service
        self.entity_parser = entity_parser
        self.entity_repository = entity_repository
        self.relation_repository = relation_repository
        self.project_repository = project_repository
        self.search_service = search_service
        self.file_service = file_service
        # Load ignore patterns once at initialization for performance
        self._ignore_patterns = load_bmignore_patterns()
        # Circuit breaker: track file failures to prevent infinite retry loops
        # Use OrderedDict for LRU behavior with bounded size to prevent unbounded memory growth
        self._file_failures: OrderedDict[str, FileFailureInfo] = OrderedDict()
        self._max_tracked_failures = 100  # Limit failure cache size

    async def _should_skip_file(self, path: str) -> bool:
        """Check if file should be skipped due to repeated failures.

        Computes current file checksum and compares with last failed checksum.
        If checksums differ, file has changed and we should retry.

        Args:
            path: File path to check

        Returns:
            True if file should be skipped, False otherwise
        """
        if path not in self._file_failures:
            return False

        failure_info = self._file_failures[path]

        # Check if failure count exceeds threshold
        if failure_info.count < MAX_CONSECUTIVE_FAILURES:
            return False

        # Compute current checksum to see if file changed
        try:
            current_checksum = await self.file_service.compute_checksum(path)

            # If checksum changed, file was modified - reset and retry
            if current_checksum != failure_info.last_checksum:
                logger.info(
                    f"File {path} changed since last failure (checksum differs), "
                    f"resetting failure count and retrying"
                )
                del self._file_failures[path]
                return False
        except Exception as e:
            # If we can't compute checksum, log but still skip to avoid infinite loops
            logger.warning(f"Failed to compute checksum for {path}: {e}")

        # File unchanged and exceeded threshold - skip it
        return True

    async def _record_failure(self, path: str, error: str) -> None:
        """Record a file sync failure for circuit breaker tracking.

        Uses LRU cache with bounded size to prevent unbounded memory growth.

        Args:
            path: File path that failed
            error: Error message from the failure
        """
        now = datetime.now()

        # Compute checksum for failure tracking
        try:
            checksum = await self.file_service.compute_checksum(path)
        except Exception:
            # If checksum fails, use empty string (better than crashing)
            checksum = ""

        if path in self._file_failures:
            # Update existing failure record and move to end (most recently used)
            failure_info = self._file_failures.pop(path)
            failure_info.count += 1
            failure_info.last_failure = now
            failure_info.last_error = error
            failure_info.last_checksum = checksum
            self._file_failures[path] = failure_info

            logger.warning(
                f"File sync failed (attempt {failure_info.count}/{MAX_CONSECUTIVE_FAILURES}): "
                f"path={path}, error={error}"
            )

            # Log when threshold is reached
            if failure_info.count >= MAX_CONSECUTIVE_FAILURES:
                logger.error(
                    f"File {path} has failed {MAX_CONSECUTIVE_FAILURES} times and will be skipped. "
                    f"First failure: {failure_info.first_failure}, Last error: {error}"
                )
        else:
            # Create new failure record
            self._file_failures[path] = FileFailureInfo(
                count=1,
                first_failure=now,
                last_failure=now,
                last_error=error,
                last_checksum=checksum,
            )
            logger.debug(f"Recording first failure for {path}: {error}")

            # Enforce cache size limit - remove oldest entry if over limit
            if len(self._file_failures) > self._max_tracked_failures:
                removed_path, removed_info = self._file_failures.popitem(last=False)
                logger.debug(
                    f"Evicting oldest failure record from cache: path={removed_path}, "
                    f"failures={removed_info.count}"
                )

    def _clear_failure(self, path: str) -> None:
        """Clear failure tracking for a file after successful sync.

        Args:
            path: File path that successfully synced
        """
        if path in self._file_failures:
            logger.info(f"Clearing failure history for {path} after successful sync")
            del self._file_failures[path]

    async def sync(
        self, directory: Path, project_name: Optional[str] = None, force_full: bool = False
    ) -> SyncReport:
        """Sync all files with database and update scan watermark.

        Args:
            directory: Directory to sync
            project_name: Optional project name
            force_full: If True, force a full scan bypassing watermark optimization
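
        Example (illustrative):
            report = await sync_service.sync(project_path, force_full=True)
            logger.info(f"sync applied {report.total} changes")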
        """

        start_time = time.time()
        sync_start_timestamp = time.time()  # Capture at start for watermark
        logger.info(f"Sync operation started for directory: {directory} (force_full={force_full})")

        # Compare filesystem state against the database to build the change report
        report = await self.scan(directory, force_full=force_full)

        # Order matters for resolving relations effectively:
        # moves, then deletions, then new and modified files
        logger.info(
            f"Sync changes detected: new_files={len(report.new)}, modified_files={len(report.modified)}, "
            + f"deleted_files={len(report.deleted)}, moved_files={len(report.moves)}"
        )

        # sync moves first
        for old_path, new_path in report.moves.items():
            # in the case where a file has been deleted and replaced by another file
            # it will show up in the move and modified lists, so handle it in modified
            if new_path in report.modified:
                report.modified.remove(new_path)
                logger.debug(
                    f"File marked as moved and modified: old_path={old_path}, new_path={new_path}"
                )
            else:
                await self.handle_move(old_path, new_path)

        # deleted next
        for path in report.deleted:
            await self.handle_delete(path)

        # then new and modified
        for path in report.new:
            entity, _ = await self.sync_file(path, new=True)

            # Track if file was skipped
            if entity is None and await self._should_skip_file(path):
                failure_info = self._file_failures[path]
                report.skipped_files.append(
                    SkippedFile(
                        path=path,
                        reason=failure_info.last_error,
                        failure_count=failure_info.count,
                        first_failed=failure_info.first_failure,
                    )
                )

        for path in report.modified:
            entity, _ = await self.sync_file(path, new=False)

            # Track if file was skipped
            if entity is None and await self._should_skip_file(path):
                failure_info = self._file_failures[path]
                report.skipped_files.append(
                    SkippedFile(
                        path=path,
                        reason=failure_info.last_error,
                        failure_count=failure_info.count,
                        first_failed=failure_info.first_failure,
                    )
                )

        # Only resolve relations if there were actual changes
        # If no files changed, no new unresolved relations could have been created
        if report.total > 0:
            await self.resolve_relations()
        else:
            logger.info("Skipping relation resolution - no file changes detected")

        # Update scan watermark after successful sync
        # Use the timestamp from sync start (not end) to ensure we catch files
        # created during the sync on the next iteration
        current_file_count = await self._quick_count_files(directory)
        if self.entity_repository.project_id is not None:
            project = await self.project_repository.find_by_id(self.entity_repository.project_id)
            if project:
                await self.project_repository.update(
                    project.id,
                    {
                        "last_scan_timestamp": sync_start_timestamp,
                        "last_file_count": current_file_count,
                    },
                )
                logger.debug(
                    f"Updated scan watermark: timestamp={sync_start_timestamp}, "
                    f"file_count={current_file_count}"
                )

        duration_ms = int((time.time() - start_time) * 1000)

        # Log summary with skipped files if any
        if report.skipped_files:
            logger.warning(
                f"Sync completed with {len(report.skipped_files)} skipped files: "
                f"directory={directory}, total_changes={report.total}, "
                f"skipped={len(report.skipped_files)}, duration_ms={duration_ms}"
            )
            for skipped in report.skipped_files:
                logger.warning(
                    f"Skipped file: path={skipped.path}, "
                    f"failures={skipped.failure_count}, reason={skipped.reason}"
                )
        else:
            logger.info(
                f"Sync operation completed: directory={directory}, "
                f"total_changes={report.total}, duration_ms={duration_ms}"
            )

        return report

    async def scan(self, directory: Path, force_full: bool = False) -> SyncReport:
        """Smart scan using watermark and file count for large project optimization.

        Uses scan watermark tracking to dramatically reduce scan time for large projects:
        - Tracks last_scan_timestamp and last_file_count in Project model
        - Uses `find -newermt` for incremental scanning (only changed files)
        - Falls back to full scan when deletions detected (file count decreased)

        Expected performance:
        - No changes: 225x faster (2s vs 450s for 1,460 files on TigrisFS)
        - Few changes: 84x faster (5s vs 420s)
        - Deletions: Full scan (rare, acceptable)

        Architecture:
        - Get current file count quickly (find | wc -l: 1.4s)
        - Compare with last_file_count to detect deletions
        - If no deletions: incremental scan with find -newermt (0.2s)
        - Process changed files with mtime-based comparison

        Args:
            directory: Directory to scan
            force_full: If True, bypass watermark optimization and force full scan
        """
        scan_start_time = time.time()

        report = SyncReport()

        # Get current project to check watermark
        if self.entity_repository.project_id is None:
            raise ValueError("Entity repository has no project_id set")

        project = await self.project_repository.find_by_id(self.entity_repository.project_id)
        if project is None:
            raise ValueError(f"Project not found: {self.entity_repository.project_id}")

        # Step 1: Quick file count
        logger.debug("Counting files in directory")
        current_count = await self._quick_count_files(directory)
        logger.debug(f"Found {current_count} files in directory")

        # Step 2: Determine scan strategy based on watermark and file count
        if force_full:
            # User explicitly requested full scan → bypass watermark optimization
            scan_type = "full_forced"
            logger.info("Force full scan requested, bypassing watermark optimization")
            file_paths_to_scan = await self._scan_directory_full(directory)

        elif project.last_file_count is None:
            # First sync ever → full scan
            scan_type = "full_initial"
            logger.info("First sync for this project, performing full scan")
            file_paths_to_scan = await self._scan_directory_full(directory)

        elif current_count < project.last_file_count:
            # Files deleted → need full scan to detect which ones
            scan_type = "full_deletions"
            logger.info(
                f"File count decreased ({project.last_file_count} → {current_count}), "
                f"running full scan to detect deletions"
            )
            file_paths_to_scan = await self._scan_directory_full(directory)

        elif project.last_scan_timestamp is not None:
            # Incremental scan: only files modified since last scan
            scan_type = "incremental"
            logger.info(
                f"Running incremental scan for files modified since {project.last_scan_timestamp}"
            )
            file_paths_to_scan = await self._scan_directory_modified_since(
                directory, project.last_scan_timestamp
            )
            logger.info(
                f"Incremental scan found {len(file_paths_to_scan)} potentially changed files"
            )

        else:
            # Fallback to full scan (no watermark available)
            scan_type = "full_fallback"
            logger.warning("No scan watermark available, falling back to full scan")
            file_paths_to_scan = await self._scan_directory_full(directory)

        # Step 3: Process each file with mtime-based comparison
        scanned_paths: Set[str] = set()
        changed_checksums: Dict[str, str] = {}

        logger.debug(f"Processing {len(file_paths_to_scan)} files with mtime-based comparison")

        for rel_path in file_paths_to_scan:
            scanned_paths.add(rel_path)

            # Get file stats
            abs_path = directory / rel_path
            if not abs_path.exists():
                # File was deleted between scan and now (race condition)
                continue

            stat_info = abs_path.stat()

            # Indexed lookup - single file query (not full table scan)
            db_entity = await self.entity_repository.get_by_file_path(rel_path)

            if db_entity is None:
                # New file - need checksum for move detection
                checksum = await self.file_service.compute_checksum(rel_path)
                report.new.add(rel_path)
                changed_checksums[rel_path] = checksum
                logger.trace(f"New file detected: {rel_path}")
                continue

            # File exists in DB - check if mtime/size changed
            db_mtime = db_entity.mtime
            db_size = db_entity.size
            fs_mtime = stat_info.st_mtime
            fs_size = stat_info.st_size

            # Compare mtime and size (like rsync/rclone)
            # Allow small epsilon for float comparison (0.01s = 10ms)
            mtime_changed = db_mtime is None or abs(fs_mtime - db_mtime) > 0.01
            size_changed = db_size is None or fs_size != db_size

            if mtime_changed or size_changed:
                # File modified - compute checksum
                checksum = await self.file_service.compute_checksum(rel_path)
                db_checksum = db_entity.checksum

                # Only mark as modified if checksum actually differs
                # (handles cases where mtime changed but content didn't, e.g., git operations)
                if checksum != db_checksum:
                    report.modified.add(rel_path)
                    changed_checksums[rel_path] = checksum
                    logger.trace(
                        f"Modified file detected: {rel_path}, "
                        f"mtime_changed={mtime_changed}, size_changed={size_changed}"
                    )
            else:
                # File unchanged - no checksum needed
                logger.trace(f"File unchanged (mtime/size match): {rel_path}")

        # Step 4: Detect moves (for both full and incremental scans)
        # Check if any "new" files are actually moves by matching checksums
        for new_path in list(report.new):  # Use list() to allow modification during iteration
            new_checksum = changed_checksums.get(new_path)
            if not new_checksum:
                continue

            # Look for existing entity with same checksum but different path
            # This could be a move or a copy
            existing_entities = await self.entity_repository.find_by_checksum(new_checksum)

            for candidate in existing_entities:
                if candidate.file_path == new_path:
                    # Same path, skip (shouldn't happen for "new" files but be safe)
                    continue

                # Check if the old path still exists on disk
                old_path_abs = directory / candidate.file_path
                if old_path_abs.exists():
                    # Original still exists → this is a copy, not a move
                    logger.trace(
                        f"File copy detected (not move): {candidate.file_path} copied to {new_path}"
                    )
                    continue

                # Original doesn't exist → this is a move!
                report.moves[candidate.file_path] = new_path
                report.new.remove(new_path)
                logger.trace(f"Move detected: {candidate.file_path} -> {new_path}")
                break  # Only match first candidate

        # Step 5: Detect deletions (only for full scans)
        # Incremental scans can't reliably detect deletions since they only see modified files
        if scan_type in ("full_initial", "full_deletions", "full_fallback", "full_forced"):
            # Use optimized query for just file paths (not full entities)
            db_file_paths = await self.entity_repository.get_all_file_paths()
            logger.debug(f"Found {len(db_file_paths)} db paths for deletion detection")

            for db_path in db_file_paths:
                if db_path not in scanned_paths:
                    # File in DB but not on filesystem
                    # Check if it was already detected as a move
                    if db_path in report.moves:
                        # Already handled as a move, skip
                        continue

                    # File was deleted
                    report.deleted.add(db_path)
                    logger.trace(f"Deleted file detected: {db_path}")

        # Store checksums for files that need syncing
        report.checksums = changed_checksums

        scan_duration_ms = int((time.time() - scan_start_time) * 1000)

        logger.info(
            f"Completed {scan_type} scan for directory {directory} in {scan_duration_ms}ms, "
            f"found {report.total} changes (new={len(report.new)}, "
            f"modified={len(report.modified)}, deleted={len(report.deleted)}, "
            f"moves={len(report.moves)})"
        )
        return report

    async def sync_file(
        self, path: str, new: bool = True
    ) -> Tuple[Optional[Entity], Optional[str]]:
        """Sync a single file with circuit breaker protection.

        Args:
            path: Path to file to sync
            new: Whether this is a new file

        Returns:
            Tuple of (entity, checksum) or (None, None) if sync fails or file is skipped
        """
        # Check if file should be skipped due to repeated failures
        if await self._should_skip_file(path):
            logger.warning(f"Skipping file due to repeated failures: {path}")
            return None, None

        try:
            logger.debug(
                f"Syncing file path={path} is_new={new} is_markdown={self.file_service.is_markdown(path)}"
            )

            if self.file_service.is_markdown(path):
                entity, checksum = await self.sync_markdown_file(path, new)
            else:
                entity, checksum = await self.sync_regular_file(path, new)

            if entity is not None:
                await self.search_service.index_entity(entity)

                # Clear failure tracking on successful sync
                self._clear_failure(path)

                logger.debug(
                    f"File sync completed, path={path}, entity_id={entity.id}, checksum={checksum[:8]}"
                )
            return entity, checksum

        except FileNotFoundError:
            # File exists in database but not on filesystem
            # This indicates a database/filesystem inconsistency - treat as deletion
            logger.warning(
                f"File not found during sync, treating as deletion: path={path}. "
                "This may indicate a race condition or manual file deletion."
            )
            await self.handle_delete(path)
            return None, None

        except Exception as e:
            # Check if this is a fatal error (or caused by one)
            # Fatal errors like project deletion should terminate sync immediately
            if isinstance(e, SyncFatalError) or isinstance(
                e.__cause__, SyncFatalError
            ):  # pragma: no cover
                logger.error(f"Fatal sync error encountered, terminating sync: path={path}")
                raise

            # Otherwise treat as recoverable file-level error
            error_msg = str(e)
            logger.error(f"Failed to sync file: path={path}, error={error_msg}")

            # Record failure for circuit breaker
            await self._record_failure(path, error_msg)

            return None, None

    async def sync_markdown_file(self, path: str, new: bool = True) -> Tuple[Optional[Entity], str]:
        """Sync a markdown file with full processing.

        Args:
            path: Path to markdown file
            new: Whether this is a new file

        Returns:
            Tuple of (entity, checksum)
        """
        # Parse markdown first to get any existing permalink
        logger.debug(f"Parsing markdown file, path: {path}, new: {new}")

        file_content = await self.file_service.read_file_content(path)
        file_contains_frontmatter = has_frontmatter(file_content)

        # Get file timestamps for tracking modification times
        file_metadata = await self.file_service.get_file_metadata(path)
        created = file_metadata.created_at
        modified = file_metadata.modified_at

        # Parse markdown content with file metadata (avoids redundant file read/stat)
        # This enables cloud implementations (S3FileService) to provide metadata from head_object
        abs_path = self.file_service.base_path / path
        entity_markdown = await self.entity_parser.parse_markdown_content(
            file_path=abs_path,
            content=file_content,
            mtime=file_metadata.modified_at.timestamp(),
            ctime=file_metadata.created_at.timestamp(),
        )

        # if the file contains frontmatter, resolve a permalink (unless disabled)
        if file_contains_frontmatter and not self.app_config.disable_permalinks:
            # Resolve permalink - skip conflict checks during bulk sync for performance
            permalink = await self.entity_service.resolve_permalink(
                path, markdown=entity_markdown, skip_conflict_check=True
            )

            # If permalink changed, update the file
            if permalink != entity_markdown.frontmatter.permalink:
                logger.info(
                    f"Updating permalink for path: {path}, old_permalink: {entity_markdown.frontmatter.permalink}, new_permalink: {permalink}"
                )

                entity_markdown.frontmatter.metadata["permalink"] = permalink
                await self.file_service.update_frontmatter(path, {"permalink": permalink})

        # if the file is new, create an entity
        if new:
            # Create entity with final permalink
            logger.debug(f"Creating new entity from markdown, path={path}")
            await self.entity_service.create_entity_from_markdown(Path(path), entity_markdown)

        # otherwise we need to update the entity and observations
        else:
            logger.debug(f"Updating entity from markdown, path={path}")
            await self.entity_service.update_entity_and_observations(Path(path), entity_markdown)

        # Update relations and search index
        entity = await self.entity_service.update_entity_relations(path, entity_markdown)

        # After updating relations, we need to compute the checksum again
        # This is necessary for files with wikilinks to ensure consistent checksums
        # after relation processing is complete
        final_checksum = await self.file_service.compute_checksum(path)

        # Update checksum, timestamps, and file metadata from file system
        # Store mtime/size for efficient change detection in future scans
        # This ensures temporal ordering in search and recent activity uses actual file modification times
        await self.entity_repository.update(
            entity.id,
            {
                "checksum": final_checksum,
                "created_at": created,
                "updated_at": modified,
                "mtime": file_metadata.modified_at.timestamp(),
                "size": file_metadata.size,
            },
        )

        logger.debug(
            f"Markdown sync completed: path={path}, entity_id={entity.id}, "
            f"observation_count={len(entity.observations)}, relation_count={len(entity.relations)}, "
            f"checksum={final_checksum[:8]}"
        )

        # Return the final checksum to ensure everything is consistent
        return entity, final_checksum

    async def sync_regular_file(self, path: str, new: bool = True) -> Tuple[Optional[Entity], str]:
        """Sync a non-markdown file with basic tracking.

        Args:
            path: Path to file
            new: Whether this is a new file

        Returns:
            Tuple of (entity, checksum)
        """
        checksum = await self.file_service.compute_checksum(path)
        if new:
            # Generate permalink from path - skip conflict checks during bulk sync
            await self.entity_service.resolve_permalink(path, skip_conflict_check=True)

            # get file timestamps
            file_metadata = await self.file_service.get_file_metadata(path)
            created = file_metadata.created_at
            modified = file_metadata.modified_at

            # get mime type
            content_type = self.file_service.content_type(path)

            file_path = Path(path)
            try:
                entity = await self.entity_repository.add(
                    Entity(
                        entity_type="file",
                        file_path=path,
                        checksum=checksum,
                        title=file_path.name,
                        created_at=created,
                        updated_at=modified,
                        content_type=content_type,
                        mtime=file_metadata.modified_at.timestamp(),
                        size=file_metadata.size,
                    )
                )
                return entity, checksum
            except IntegrityError as e:
                # Handle race condition where entity was created by another process
                msg = str(e)
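                # SQLite reports "UNIQUE constraint failed: ..." while Postgres reports
                # "duplicate key value violates unique constraint ..."; match both forms.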
                if (
                    "UNIQUE constraint failed: entity.file_path" in msg
                    or "uix_entity_file_path_project" in msg
                    or (
                        "duplicate key value violates unique constraint" in msg
                        and "file_path" in msg
                    )
                ):
                    logger.info(
                        f"Entity already exists for file_path={path}, updating instead of creating"
                    )
                    # Treat as update instead of create
                    entity = await self.entity_repository.get_by_file_path(path)
                    if entity is None:  # pragma: no cover
                        logger.error(f"Entity not found after constraint violation, path={path}")
                        raise ValueError(f"Entity not found after constraint violation: {path}")

                    # Re-get file metadata since we're in update path
                    file_metadata_for_update = await self.file_service.get_file_metadata(path)
                    updated = await self.entity_repository.update(
                        entity.id,
                        {
                            "file_path": path,
                            "checksum": checksum,
                            "mtime": file_metadata_for_update.modified_at.timestamp(),
                            "size": file_metadata_for_update.size,
                        },
                    )

                    if updated is None:  # pragma: no cover
                        logger.error(f"Failed to update entity, entity_id={entity.id}, path={path}")
                        raise ValueError(f"Failed to update entity with ID {entity.id}")

                    return updated, checksum
                else:
                    # Re-raise if it's a different integrity error
                    raise  # pragma: no cover
        else:
            # Get file timestamps for updating modification time
            file_metadata = await self.file_service.get_file_metadata(path)
            modified = file_metadata.modified_at

            entity = await self.entity_repository.get_by_file_path(path)
            if entity is None:  # pragma: no cover
                logger.error(f"Entity not found for existing file, path={path}")
                raise ValueError(f"Entity not found for existing file: {path}")

            # Update checksum, modification time, and file metadata from file system
            # Store mtime/size for efficient change detection in future scans
            updated = await self.entity_repository.update(
                entity.id,
                {
                    "file_path": path,
                    "checksum": checksum,
                    "updated_at": modified,
                    "mtime": file_metadata.modified_at.timestamp(),
                    "size": file_metadata.size,
                },
            )

            if updated is None:  # pragma: no cover
                logger.error(f"Failed to update entity, entity_id={entity.id}, path={path}")
                raise ValueError(f"Failed to update entity with ID {entity.id}")

            return updated, checksum

    async def handle_delete(self, file_path: str):
        """Handle complete entity deletion including search index cleanup."""

        # First get entity to get permalink before deletion
        entity = await self.entity_repository.get_by_file_path(file_path)
        if entity:
            logger.info(
                f"Deleting entity with file_path={file_path}, entity_id={entity.id}, permalink={entity.permalink}"
            )

            # Delete from db (this cascades to observations/relations)
            await self.entity_service.delete_entity_by_file_path(file_path)

            # Clean up search index
            permalinks = (
                [entity.permalink]
                + [o.permalink for o in entity.observations]
                + [r.permalink for r in entity.relations]
            )

            logger.debug(
                f"Cleaning up search index for entity_id={entity.id}, file_path={file_path}, "
                f"index_entries={len(permalinks)}"
            )

            for permalink in permalinks:
                if permalink:
                    await self.search_service.delete_by_permalink(permalink)
                else:
                    await self.search_service.delete_by_entity_id(entity.id)

    async def handle_move(self, old_path: str, new_path: str):
        """Handle a file move by updating the entity's path and, if configured, its permalink."""
        logger.debug("Moving entity", old_path=old_path, new_path=new_path)

        entity = await self.entity_repository.get_by_file_path(old_path)
        if entity:
            # Check if destination path is already occupied by another entity
            existing_at_destination = await self.entity_repository.get_by_file_path(new_path)
            if existing_at_destination and existing_at_destination.id != entity.id:
                # Handle the conflict - this could be a file swap or replacement scenario
                logger.warning(
                    f"File path conflict detected during move: "
                    f"entity_id={entity.id} trying to move from '{old_path}' to '{new_path}', "
                    f"but entity_id={existing_at_destination.id} already occupies '{new_path}'"
                )

                # Check if this is a file swap (the destination entity is being moved to our old path)
                # This would indicate a simultaneous move operation
                old_path_after_swap = await self.entity_repository.get_by_file_path(old_path)
                if old_path_after_swap and old_path_after_swap.id == existing_at_destination.id:
                    logger.info(f"Detected file swap between '{old_path}' and '{new_path}'")
                    # This is a swap scenario - both moves should succeed
                    # We'll allow this to proceed since the other file has moved out
                else:
                    # This is a conflict where the destination is occupied
                    raise ValueError(
                        f"Cannot move entity from '{old_path}' to '{new_path}': "
                        f"destination path is already occupied by another file. "
                        f"This may be caused by: "
                        f"1. Conflicting file names with different character encodings, "
                        f"2. Case sensitivity differences (e.g., 'Finance/' vs 'finance/'), "
                        f"3. Character conflicts between hyphens in filenames and generated permalinks, "
                        f"4. Files with similar names containing special characters. "
                        f"Try renaming one of the conflicting files to resolve this issue."
                    )

            # Update file_path in all cases
            updates = {"file_path": new_path}

            # If configured, also update permalink to match new path
            if (
                self.app_config.update_permalinks_on_move
                and not self.app_config.disable_permalinks
                and self.file_service.is_markdown(new_path)
            ):
                # generate new permalink value - skip conflict checks during bulk sync
                new_permalink = await self.entity_service.resolve_permalink(
                    new_path, skip_conflict_check=True
                )

                # write to file and get new checksum
                new_checksum = await self.file_service.update_frontmatter(
                    new_path, {"permalink": new_permalink}
                )

                updates["permalink"] = new_permalink
                updates["checksum"] = new_checksum

                logger.info(
                    f"Updating permalink on move,old_permalink={entity.permalink}"
                    f"new_permalink={new_permalink}"
                    f"new_checksum={new_checksum}"
                )

            try:
                updated = await self.entity_repository.update(entity.id, updates)
            except Exception as e:
                # Catch any database integrity errors and provide helpful context
                if "UNIQUE constraint failed" in str(e):
                    logger.error(
                        f"Database constraint violation during move: "
                        f"entity_id={entity.id}, old_path='{old_path}', new_path='{new_path}'"
                    )
                    raise ValueError(
                        f"Cannot complete move from '{old_path}' to '{new_path}': "
                        f"a database constraint was violated. This usually indicates "
                        f"a file path or permalink conflict. Please check for: "
                        f"1. Duplicate file names, "
                        f"2. Case sensitivity issues (e.g., 'File.md' vs 'file.md'), "
                        f"3. Character encoding conflicts in file names."
                    ) from e
                else:
                    # Re-raise other exceptions as-is
                    raise

            if updated is None:  # pragma: no cover
                logger.error(
                    "Failed to update entity path"
                    f"entity_id={entity.id}"
                    f"old_path={old_path}"
                    f"new_path={new_path}"
                )
                raise ValueError(f"Failed to update entity path for ID {entity.id}")

            logger.debug(
                "Entity path updated"
                f"entity_id={entity.id} "
                f"permalink={entity.permalink} "
                f"old_path={old_path} "
                f"new_path={new_path} "
            )

            # update search index
            await self.search_service.index_entity(updated)

    async def resolve_relations(self, entity_id: int | None = None):
        """Try to resolve unresolved relations.

        Args:
            entity_id: If provided, only resolve relations for this specific entity.
                      Otherwise, resolve all unresolved relations in the database.
        """

        if entity_id:
            # Only get unresolved relations for the specific entity
            unresolved_relations = (
                await self.relation_repository.find_unresolved_relations_for_entity(entity_id)
            )
            logger.info(
                f"Resolving forward references for entity {entity_id}",
                count=len(unresolved_relations),
            )
        else:
            # Get all unresolved relations (original behavior)
            unresolved_relations = await self.relation_repository.find_unresolved_relations()
            logger.info("Resolving all forward references", count=len(unresolved_relations))

        for relation in unresolved_relations:
            logger.trace(
                "Attempting to resolve relation "
                f"relation_id={relation.id} "
                f"from_id={relation.from_id} "
                f"to_name={relation.to_name}"
            )

            resolved_entity = await self.entity_service.link_resolver.resolve_link(relation.to_name)

            # ignore reference to self
            if resolved_entity and resolved_entity.id != relation.from_id:
                logger.debug(
                    "Resolved forward reference "
                    f"relation_id={relation.id} "
                    f"from_id={relation.from_id} "
                    f"to_name={relation.to_name} "
                    f"resolved_id={resolved_entity.id} "
                    f"resolved_title={resolved_entity.title}",
                )
                try:
                    await self.relation_repository.update(
                        relation.id,
                        {
                            "to_id": resolved_entity.id,
                            "to_name": resolved_entity.title,
                        },
                    )
                    # update search index only on successful resolution
                    await self.search_service.index_entity(resolved_entity)
                except IntegrityError:
                    # IntegrityError means a relation with this (from_id, to_id, relation_type)
                    # already exists. The UPDATE was rolled back, so our unresolved relation
                    # (to_id=NULL) still exists in the database. We delete it because:
                    # 1. It's redundant - a resolved relation already captures this relationship
                    # 2. If we don't delete it, future syncs will try to resolve it again
                    #    and get the same IntegrityError
                    logger.debug(
                        "Deleting duplicate unresolved relation "
                        f"relation_id={relation.id} "
                        f"from_id={relation.from_id} "
                        f"to_name={relation.to_name} "
                        f"resolved_to_id={resolved_entity.id}"
                    )
                    try:
                        await self.relation_repository.delete(relation.id)
                    except Exception as e:
                        # Log but don't fail - the relation may have been deleted already
                        logger.debug(f"Could not delete duplicate relation {relation.id}: {e}")

    async def _quick_count_files(self, directory: Path) -> int:
        """Fast file count using find command.

        Uses subprocess to leverage OS-level file counting which is much faster
        than Python iteration, especially on network filesystems like TigrisFS.

        On Windows, subprocess is not supported with SelectorEventLoop (which we use
        to avoid aiosqlite cleanup issues), so we fall back to Python-based counting.

        Args:
            directory: Directory to count files in

        Returns:
            Number of files in directory (recursive)
        """
        # Windows with SelectorEventLoop doesn't support subprocess
        if sys.platform == "win32":
            count = 0
            async for _ in self.scan_directory(directory):
                count += 1
            return count

        process = await asyncio.create_subprocess_shell(
            f'find "{directory}" -type f | wc -l',
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await process.communicate()

        if process.returncode != 0:
            error_msg = stderr.decode().strip()
            logger.error(
                f"FILE COUNT OPTIMIZATION FAILED: find command failed with exit code {process.returncode}, "
                f"error: {error_msg}. Falling back to manual count. "
                f"This will slow down watermark detection!"
            )
            # Fallback: count using scan_directory
            count = 0
            async for _ in self.scan_directory(directory):
                count += 1
            return count

        return int(stdout.strip())

    async def _scan_directory_modified_since(
        self, directory: Path, since_timestamp: float
    ) -> List[str]:
        """Use find -newermt for filesystem-level filtering of modified files.

        This is dramatically faster than scanning all files and comparing mtimes,
        especially on network filesystems like TigrisFS where stat operations are expensive.

        On Windows, subprocess is not supported with SelectorEventLoop (which we use
        to avoid aiosqlite cleanup issues), so we implement mtime filtering in Python.

        Args:
            directory: Directory to scan
            since_timestamp: Unix timestamp to find files newer than

        Returns:
            List of relative file paths modified since the timestamp (respects .bmignore)
        """
        # Windows with SelectorEventLoop doesn't support subprocess
        # Implement mtime filtering in Python to preserve watermark optimization
        if sys.platform == "win32":
            file_paths = []
            async for file_path_str, stat_info in self.scan_directory(directory):
                if stat_info.st_mtime > since_timestamp:
                    rel_path = Path(file_path_str).relative_to(directory).as_posix()
                    file_paths.append(rel_path)
            return file_paths

        # Convert timestamp to find-compatible format
        since_date = datetime.fromtimestamp(since_timestamp).strftime("%Y-%m-%d %H:%M:%S")

        process = await asyncio.create_subprocess_shell(
            f'find "{directory}" -type f -newermt "{since_date}"',
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await process.communicate()

        if process.returncode != 0:
            error_msg = stderr.decode().strip()
            logger.error(
                f"SCAN OPTIMIZATION FAILED: find -newermt command failed with exit code {process.returncode}, "
                f"error: {error_msg}. Falling back to full scan. "
                f"This will cause slow syncs on large projects!"
            )
            # Fallback to full scan
            return await self._scan_directory_full(directory)

        # Convert absolute paths to relative and filter through ignore patterns
        file_paths = []
        for line in stdout.decode().splitlines():
            if line:
                try:
                    abs_path = Path(line)
                    rel_path = abs_path.relative_to(directory).as_posix()

                    # Apply ignore patterns (same as scan_directory)
                    if should_ignore_path(abs_path, directory, self._ignore_patterns):
                        logger.trace(f"Ignoring path per .bmignore: {rel_path}")
                        continue

                    file_paths.append(rel_path)
                except ValueError:
                    # Path is not relative to directory, skip it
                    logger.warning(f"Skipping file not under directory: {line}")
                    continue

        return file_paths

    async def _scan_directory_full(self, directory: Path) -> List[str]:
        """Full directory scan returning all file paths.

        Uses scan_directory() which respects .bmignore patterns.

        Args:
            directory: Directory to scan

        Returns:
            List of relative file paths (respects .bmignore)
        """
        file_paths = []
        async for file_path_str, _ in self.scan_directory(directory):
            rel_path = Path(file_path_str).relative_to(directory).as_posix()
            file_paths.append(rel_path)
        return file_paths

    async def scan_directory(self, directory: Path) -> AsyncIterator[Tuple[str, os.stat_result]]:
        """Stream files from directory using aiofiles.os.scandir() with cached stat info.

        This method uses aiofiles.os.scandir() to leverage async I/O and cached stat
        information from directory entries. This reduces network I/O by 50% on network
        filesystems like TigrisFS by avoiding redundant stat() calls.

        Args:
            directory: Directory to scan

        Yields:
            Tuples of (absolute_file_path, stat_info) for each file
        """
        try:
            entries = await aiofiles.os.scandir(directory)
        except PermissionError:
            logger.warning(f"Permission denied scanning directory: {directory}")
            return

        results = []
        subdirs = []

        for entry in entries:
            entry_path = Path(entry.path)

            # Check ignore patterns
            if should_ignore_path(entry_path, directory, self._ignore_patterns):
                logger.trace(f"Ignoring path per .bmignore: {entry_path.relative_to(directory)}")
                continue

            if entry.is_dir(follow_symlinks=False):
                # Collect subdirectories to recurse into
                subdirs.append(entry_path)
            elif entry.is_file(follow_symlinks=False):
                # Get cached stat info (no extra syscall!)
                stat_info = entry.stat(follow_symlinks=False)
                results.append((entry.path, stat_info))

        # Yield files from current directory
        for file_path, stat_info in results:
            yield (file_path, stat_info)

        # Recurse into subdirectories
        for subdir in subdirs:
            async for result in self.scan_directory(subdir):
                yield result


async def get_sync_service(project: Project) -> SyncService:  # pragma: no cover
    """Get sync service instance with all dependencies."""

    app_config = ConfigManager().config
    _, session_maker = await db.get_or_create_db(
        db_path=app_config.database_path, db_type=db.DatabaseType.FILESYSTEM
    )

    project_path = Path(project.path)
    entity_parser = EntityParser(project_path)
    markdown_processor = MarkdownProcessor(entity_parser, app_config=app_config)
    file_service = FileService(project_path, markdown_processor, app_config=app_config)

    # Initialize repositories
    entity_repository = EntityRepository(session_maker, project_id=project.id)
    observation_repository = ObservationRepository(session_maker, project_id=project.id)
    relation_repository = RelationRepository(session_maker, project_id=project.id)
    search_repository = create_search_repository(session_maker, project_id=project.id)
    project_repository = ProjectRepository(session_maker)

    # Initialize services
    search_service = SearchService(search_repository, entity_repository, file_service)
    link_resolver = LinkResolver(entity_repository, search_service)

    # Initialize entity service
    entity_service = EntityService(
        entity_parser,
        entity_repository,
        observation_repository,
        relation_repository,
        file_service,
        link_resolver,
    )

    # Create sync service
    sync_service = SyncService(
        app_config=app_config,
        entity_service=entity_service,
        entity_parser=entity_parser,
        entity_repository=entity_repository,
        relation_repository=relation_repository,
        project_repository=project_repository,
        search_service=search_service,
        file_service=file_service,
    )

    return sync_service

```
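
Below is a minimal, hypothetical driver for the `SyncService` wiring above. It is a sketch only: it assumes the module above is importable so that `get_sync_service` is in scope, and that a `Project` record has already been loaded elsewhere (how it is obtained is outside this file).

```python
# Hypothetical driver sketch: run_sync and project are illustrative names,
# not part of the repository source.
from pathlib import Path


async def run_sync(project) -> None:
    # Wire repositories/services for the project and run one sync pass.
    sync_service = await get_sync_service(project)

    # force_full=True would bypass the watermark optimization for a one-off full rescan.
    report = await sync_service.sync(Path(project.path))

    print(f"changes={report.total}, skipped={len(report.skipped_files)}")


# Drive from synchronous code with: asyncio.run(run_sync(project))
```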