This is page 15 of 17. Use http://codebase.md/basicmachines-co/basic-memory?page={x} to view the full context.
# Directory Structure
```
├── .claude
│   ├── agents
│   │   ├── python-developer.md
│   │   └── system-architect.md
│   └── commands
│       ├── release
│       │   ├── beta.md
│       │   ├── changelog.md
│       │   ├── release-check.md
│       │   └── release.md
│       ├── spec.md
│       └── test-live.md
├── .dockerignore
├── .github
│   ├── dependabot.yml
│   ├── ISSUE_TEMPLATE
│   │   ├── bug_report.md
│   │   ├── config.yml
│   │   ├── documentation.md
│   │   └── feature_request.md
│   └── workflows
│       ├── claude-code-review.yml
│       ├── claude-issue-triage.yml
│       ├── claude.yml
│       ├── dev-release.yml
│       ├── docker.yml
│       ├── pr-title.yml
│       ├── release.yml
│       └── test.yml
├── .gitignore
├── .python-version
├── CHANGELOG.md
├── CITATION.cff
├── CLA.md
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── docker-compose.yml
├── Dockerfile
├── docs
│   ├── ai-assistant-guide-extended.md
│   ├── character-handling.md
│   ├── cloud-cli.md
│   └── Docker.md
├── justfile
├── LICENSE
├── llms-install.md
├── pyproject.toml
├── README.md
├── SECURITY.md
├── smithery.yaml
├── specs
│   ├── SPEC-1 Specification-Driven Development Process.md
│   ├── SPEC-10 Unified Deployment Workflow and Event Tracking.md
│   ├── SPEC-11 Basic Memory API Performance Optimization.md
│   ├── SPEC-12 OpenTelemetry Observability.md
│   ├── SPEC-13 CLI Authentication with Subscription Validation.md
│   ├── SPEC-14 Cloud Git Versioning & GitHub Backup.md
│   ├── SPEC-14- Cloud Git Versioning & GitHub Backup.md
│   ├── SPEC-15 Configuration Persistence via Tigris for Cloud Tenants.md
│   ├── SPEC-16 MCP Cloud Service Consolidation.md
│   ├── SPEC-17 Semantic Search with ChromaDB.md
│   ├── SPEC-18 AI Memory Management Tool.md
│   ├── SPEC-19 Sync Performance and Memory Optimization.md
│   ├── SPEC-2 Slash Commands Reference.md
│   ├── SPEC-3 Agent Definitions.md
│   ├── SPEC-4 Notes Web UI Component Architecture.md
│   ├── SPEC-5 CLI Cloud Upload via WebDAV.md
│   ├── SPEC-6 Explicit Project Parameter Architecture.md
│   ├── SPEC-7 POC to spike Tigris Turso for local access to cloud data.md
│   ├── SPEC-8 TigrisFS Integration.md
│   ├── SPEC-9 Multi-Project Bidirectional Sync Architecture.md
│   ├── SPEC-9 Signed Header Tenant Information.md
│   └── SPEC-9-1 Follow-Ups- Conflict, Sync, and Observability.md
├── src
│   └── basic_memory
│       ├── __init__.py
│       ├── alembic
│       │   ├── alembic.ini
│       │   ├── env.py
│       │   ├── migrations.py
│       │   ├── script.py.mako
│       │   └── versions
│       │       ├── 3dae7c7b1564_initial_schema.py
│       │       ├── 502b60eaa905_remove_required_from_entity_permalink.py
│       │       ├── 5fe1ab1ccebe_add_projects_table.py
│       │       ├── 647e7a75e2cd_project_constraint_fix.py
│       │       ├── 9d9c1cb7d8f5_add_mtime_and_size_columns_to_entity_.py
│       │       ├── a1b2c3d4e5f6_fix_project_foreign_keys.py
│       │       ├── b3c3938bacdb_relation_to_name_unique_index.py
│       │       ├── cc7172b46608_update_search_index_schema.py
│       │       └── e7e1f4367280_add_scan_watermark_tracking_to_project.py
│       ├── api
│       │   ├── __init__.py
│       │   ├── app.py
│       │   ├── routers
│       │   │   ├── __init__.py
│       │   │   ├── directory_router.py
│       │   │   ├── importer_router.py
│       │   │   ├── knowledge_router.py
│       │   │   ├── management_router.py
│       │   │   ├── memory_router.py
│       │   │   ├── project_router.py
│       │   │   ├── prompt_router.py
│       │   │   ├── resource_router.py
│       │   │   ├── search_router.py
│       │   │   └── utils.py
│       │   └── template_loader.py
│       ├── cli
│       │   ├── __init__.py
│       │   ├── app.py
│       │   ├── auth.py
│       │   ├── commands
│       │   │   ├── __init__.py
│       │   │   ├── cloud
│       │   │   │   ├── __init__.py
│       │   │   │   ├── api_client.py
│       │   │   │   ├── bisync_commands.py
│       │   │   │   ├── cloud_utils.py
│       │   │   │   ├── core_commands.py
│       │   │   │   ├── mount_commands.py
│       │   │   │   ├── rclone_config.py
│       │   │   │   ├── rclone_installer.py
│       │   │   │   ├── upload_command.py
│       │   │   │   └── upload.py
│       │   │   ├── command_utils.py
│       │   │   ├── db.py
│       │   │   ├── import_chatgpt.py
│       │   │   ├── import_claude_conversations.py
│       │   │   ├── import_claude_projects.py
│       │   │   ├── import_memory_json.py
│       │   │   ├── mcp.py
│       │   │   ├── project.py
│       │   │   ├── status.py
│       │   │   ├── sync.py
│       │   │   └── tool.py
│       │   └── main.py
│       ├── config.py
│       ├── db.py
│       ├── deps.py
│       ├── file_utils.py
│       ├── ignore_utils.py
│       ├── importers
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── chatgpt_importer.py
│       │   ├── claude_conversations_importer.py
│       │   ├── claude_projects_importer.py
│       │   ├── memory_json_importer.py
│       │   └── utils.py
│       ├── markdown
│       │   ├── __init__.py
│       │   ├── entity_parser.py
│       │   ├── markdown_processor.py
│       │   ├── plugins.py
│       │   ├── schemas.py
│       │   └── utils.py
│       ├── mcp
│       │   ├── __init__.py
│       │   ├── async_client.py
│       │   ├── project_context.py
│       │   ├── prompts
│       │   │   ├── __init__.py
│       │   │   ├── ai_assistant_guide.py
│       │   │   ├── continue_conversation.py
│       │   │   ├── recent_activity.py
│       │   │   ├── search.py
│       │   │   └── utils.py
│       │   ├── resources
│       │   │   ├── ai_assistant_guide.md
│       │   │   └── project_info.py
│       │   ├── server.py
│       │   └── tools
│       │       ├── __init__.py
│       │       ├── build_context.py
│       │       ├── canvas.py
│       │       ├── chatgpt_tools.py
│       │       ├── delete_note.py
│       │       ├── edit_note.py
│       │       ├── list_directory.py
│       │       ├── move_note.py
│       │       ├── project_management.py
│       │       ├── read_content.py
│       │       ├── read_note.py
│       │       ├── recent_activity.py
│       │       ├── search.py
│       │       ├── utils.py
│       │       ├── view_note.py
│       │       └── write_note.py
│       ├── models
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── knowledge.py
│       │   ├── project.py
│       │   └── search.py
│       ├── repository
│       │   ├── __init__.py
│       │   ├── entity_repository.py
│       │   ├── observation_repository.py
│       │   ├── project_info_repository.py
│       │   ├── project_repository.py
│       │   ├── relation_repository.py
│       │   ├── repository.py
│       │   └── search_repository.py
│       ├── schemas
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── cloud.py
│       │   ├── delete.py
│       │   ├── directory.py
│       │   ├── importer.py
│       │   ├── memory.py
│       │   ├── project_info.py
│       │   ├── prompt.py
│       │   ├── request.py
│       │   ├── response.py
│       │   ├── search.py
│       │   └── sync_report.py
│       ├── services
│       │   ├── __init__.py
│       │   ├── context_service.py
│       │   ├── directory_service.py
│       │   ├── entity_service.py
│       │   ├── exceptions.py
│       │   ├── file_service.py
│       │   ├── initialization.py
│       │   ├── link_resolver.py
│       │   ├── project_service.py
│       │   ├── search_service.py
│       │   └── service.py
│       ├── sync
│       │   ├── __init__.py
│       │   ├── background_sync.py
│       │   ├── sync_service.py
│       │   └── watch_service.py
│       ├── templates
│       │   └── prompts
│       │       ├── continue_conversation.hbs
│       │       └── search.hbs
│       └── utils.py
├── test-int
│   ├── BENCHMARKS.md
│   ├── cli
│   │   ├── test_project_commands_integration.py
│   │   ├── test_sync_commands_integration.py
│   │   └── test_version_integration.py
│   ├── conftest.py
│   ├── mcp
│   │   ├── test_build_context_underscore.py
│   │   ├── test_build_context_validation.py
│   │   ├── test_chatgpt_tools_integration.py
│   │   ├── test_default_project_mode_integration.py
│   │   ├── test_delete_note_integration.py
│   │   ├── test_edit_note_integration.py
│   │   ├── test_list_directory_integration.py
│   │   ├── test_move_note_integration.py
│   │   ├── test_project_management_integration.py
│   │   ├── test_project_state_sync_integration.py
│   │   ├── test_read_content_integration.py
│   │   ├── test_read_note_integration.py
│   │   ├── test_search_integration.py
│   │   ├── test_single_project_mcp_integration.py
│   │   └── test_write_note_integration.py
│   ├── test_db_wal_mode.py
│   ├── test_disable_permalinks_integration.py
│   └── test_sync_performance_benchmark.py
├── tests
│   ├── __init__.py
│   ├── api
│   │   ├── conftest.py
│   │   ├── test_async_client.py
│   │   ├── test_continue_conversation_template.py
│   │   ├── test_directory_router.py
│   │   ├── test_importer_router.py
│   │   ├── test_knowledge_router.py
│   │   ├── test_management_router.py
│   │   ├── test_memory_router.py
│   │   ├── test_project_router_operations.py
│   │   ├── test_project_router.py
│   │   ├── test_prompt_router.py
│   │   ├── test_relation_background_resolution.py
│   │   ├── test_resource_router.py
│   │   ├── test_search_router.py
│   │   ├── test_search_template.py
│   │   ├── test_template_loader_helpers.py
│   │   └── test_template_loader.py
│   ├── cli
│   │   ├── conftest.py
│   │   ├── test_bisync_commands.py
│   │   ├── test_cli_tools.py
│   │   ├── test_cloud_authentication.py
│   │   ├── test_cloud_utils.py
│   │   ├── test_ignore_utils.py
│   │   ├── test_import_chatgpt.py
│   │   ├── test_import_claude_conversations.py
│   │   ├── test_import_claude_projects.py
│   │   ├── test_import_memory_json.py
│   │   └── test_upload.py
│   ├── conftest.py
│   ├── db
│   │   └── test_issue_254_foreign_key_constraints.py
│   ├── importers
│   │   ├── test_importer_base.py
│   │   └── test_importer_utils.py
│   ├── markdown
│   │   ├── __init__.py
│   │   ├── test_date_frontmatter_parsing.py
│   │   ├── test_entity_parser_error_handling.py
│   │   ├── test_entity_parser.py
│   │   ├── test_markdown_plugins.py
│   │   ├── test_markdown_processor.py
│   │   ├── test_observation_edge_cases.py
│   │   ├── test_parser_edge_cases.py
│   │   ├── test_relation_edge_cases.py
│   │   └── test_task_detection.py
│   ├── mcp
│   │   ├── conftest.py
│   │   ├── test_obsidian_yaml_formatting.py
│   │   ├── test_permalink_collision_file_overwrite.py
│   │   ├── test_prompts.py
│   │   ├── test_resources.py
│   │   ├── test_tool_build_context.py
│   │   ├── test_tool_canvas.py
│   │   ├── test_tool_delete_note.py
│   │   ├── test_tool_edit_note.py
│   │   ├── test_tool_list_directory.py
│   │   ├── test_tool_move_note.py
│   │   ├── test_tool_read_content.py
│   │   ├── test_tool_read_note.py
│   │   ├── test_tool_recent_activity.py
│   │   ├── test_tool_resource.py
│   │   ├── test_tool_search.py
│   │   ├── test_tool_utils.py
│   │   ├── test_tool_view_note.py
│   │   ├── test_tool_write_note.py
│   │   └── tools
│   │       └── test_chatgpt_tools.py
│   ├── Non-MarkdownFileSupport.pdf
│   ├── repository
│   │   ├── test_entity_repository_upsert.py
│   │   ├── test_entity_repository.py
│   │   ├── test_entity_upsert_issue_187.py
│   │   ├── test_observation_repository.py
│   │   ├── test_project_info_repository.py
│   │   ├── test_project_repository.py
│   │   ├── test_relation_repository.py
│   │   ├── test_repository.py
│   │   ├── test_search_repository_edit_bug_fix.py
│   │   └── test_search_repository.py
│   ├── schemas
│   │   ├── test_base_timeframe_minimum.py
│   │   ├── test_memory_serialization.py
│   │   ├── test_memory_url_validation.py
│   │   ├── test_memory_url.py
│   │   ├── test_schemas.py
│   │   └── test_search.py
│   ├── Screenshot.png
│   ├── services
│   │   ├── test_context_service.py
│   │   ├── test_directory_service.py
│   │   ├── test_entity_service_disable_permalinks.py
│   │   ├── test_entity_service.py
│   │   ├── test_file_service.py
│   │   ├── test_initialization.py
│   │   ├── test_link_resolver.py
│   │   ├── test_project_removal_bug.py
│   │   ├── test_project_service_operations.py
│   │   ├── test_project_service.py
│   │   └── test_search_service.py
│   ├── sync
│   │   ├── test_character_conflicts.py
│   │   ├── test_sync_service_incremental.py
│   │   ├── test_sync_service.py
│   │   ├── test_sync_wikilink_issue.py
│   │   ├── test_tmp_files.py
│   │   ├── test_watch_service_edge_cases.py
│   │   ├── test_watch_service_reload.py
│   │   └── test_watch_service.py
│   ├── test_config.py
│   ├── test_db_migration_deduplication.py
│   ├── test_deps.py
│   ├── test_production_cascade_delete.py
│   └── utils
│       ├── test_file_utils.py
│       ├── test_frontmatter_obsidian_compatible.py
│       ├── test_parse_tags.py
│       ├── test_permalink_formatting.py
│       ├── test_utf8_handling.py
│       └── test_validate_project_path.py
├── uv.lock
├── v0.15.0-RELEASE-DOCS.md
└── v15-docs
    ├── api-performance.md
    ├── background-relations.md
    ├── basic-memory-home.md
    ├── bug-fixes.md
    ├── chatgpt-integration.md
    ├── cloud-authentication.md
    ├── cloud-bisync.md
    ├── cloud-mode-usage.md
    ├── cloud-mount.md
    ├── default-project-mode.md
    ├── env-file-removal.md
    ├── env-var-overrides.md
    ├── explicit-project-parameter.md
    ├── gitignore-integration.md
    ├── project-root-env-var.md
    ├── README.md
    └── sqlite-performance.md
```
# Files
--------------------------------------------------------------------------------
/src/basic_memory/sync/sync_service.py:
--------------------------------------------------------------------------------
```python
"""Service for syncing files between filesystem and database."""
import asyncio
import os
import time
from collections import OrderedDict
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import AsyncIterator, Dict, List, Optional, Set, Tuple
import aiofiles.os
import logfire
from loguru import logger
from sqlalchemy.exc import IntegrityError
from basic_memory import db
from basic_memory.config import BasicMemoryConfig, ConfigManager
from basic_memory.file_utils import has_frontmatter
from basic_memory.ignore_utils import load_bmignore_patterns, should_ignore_path
from basic_memory.markdown import EntityParser, MarkdownProcessor
from basic_memory.models import Entity, Project
from basic_memory.repository import (
    EntityRepository,
    RelationRepository,
    ObservationRepository,
    ProjectRepository,
)
from basic_memory.repository.search_repository import SearchRepository
from basic_memory.services import EntityService, FileService
from basic_memory.services.exceptions import SyncFatalError
from basic_memory.services.link_resolver import LinkResolver
from basic_memory.services.search_service import SearchService
# Circuit breaker configuration
# After this many consecutive failures for the same *unchanged* file, the sync
# loop stops retrying it (see SyncService._should_skip_file); the counter is
# reset when the file's checksum changes.
MAX_CONSECUTIVE_FAILURES: int = 3
@dataclass
class FileFailureInfo:
    """Track failure information for a file that repeatedly fails to sync.

    Used by the sync circuit breaker: once ``count`` reaches
    MAX_CONSECUTIVE_FAILURES for a file whose checksum has not changed,
    the file is skipped instead of retried.

    Attributes:
        count: Number of consecutive failures
        first_failure: Timestamp of first failure in current sequence
        last_failure: Timestamp of most recent failure
        last_error: Error message from most recent failure
        last_checksum: Checksum of file when it last failed (for detecting file changes)
    """

    count: int  # consecutive failures for this path
    first_failure: datetime  # naive local time (set via datetime.now())
    last_failure: datetime  # naive local time (set via datetime.now())
    last_error: str
    last_checksum: str  # "" when the checksum itself could not be computed
@dataclass
class SkippedFile:
    """Information about a file that was skipped due to repeated failures.

    These entries are surfaced to callers via ``SyncReport.skipped_files``.

    Attributes:
        path: File path relative to project root
        reason: Error message from last failure
        failure_count: Number of consecutive failures
        first_failed: Timestamp of first failure
    """

    path: str
    reason: str  # copied from FileFailureInfo.last_error
    failure_count: int
    first_failed: datetime
@dataclass
class SyncReport:
    """Summary of filesystem changes relative to the database state.

    Attributes:
        total: Total number of detected changes (property, computed)
        new: Files present on disk but absent from the database
        modified: Files present in both with differing checksums
        deleted: Files present in the database but missing on disk
        moves: Files relocated from one path to another
        checksums: Current on-disk checksums, keyed by path
        skipped_files: Files skipped by the failure circuit breaker
    """

    # Paths are stored as plain strings (not Path objects) so the report
    # serializes easily.
    new: Set[str] = field(default_factory=set)
    modified: Set[str] = field(default_factory=set)
    deleted: Set[str] = field(default_factory=set)
    moves: Dict[str, str] = field(default_factory=dict)  # old_path -> new_path
    checksums: Dict[str, str] = field(default_factory=dict)  # path -> checksum
    skipped_files: List[SkippedFile] = field(default_factory=list)

    @property
    def total(self) -> int:
        """Total number of changes."""
        return sum(len(bucket) for bucket in (self.new, self.modified, self.deleted, self.moves))
@dataclass
class ScanResult:
    """Result of scanning a directory.

    Holds the forward (path -> checksum) and reverse (checksum -> path)
    mappings produced by a scan, plus any per-file errors encountered.
    The reverse mapping presumably supports move detection (same checksum
    seen at a new path) — confirm against the scan implementation.
    """

    # file_path -> checksum
    files: Dict[str, str] = field(default_factory=dict)
    # checksum -> file_path
    checksums: Dict[str, str] = field(default_factory=dict)
    # file_path -> error message
    errors: Dict[str, str] = field(default_factory=dict)
class SyncService:
    """Syncs documents and knowledge files with database."""
    def __init__(
        self,
        app_config: BasicMemoryConfig,
        entity_service: EntityService,
        entity_parser: EntityParser,
        entity_repository: EntityRepository,
        relation_repository: RelationRepository,
        project_repository: ProjectRepository,
        search_service: SearchService,
        file_service: FileService,
    ) -> None:
        """Initialize the sync service with its collaborators.

        Args:
            app_config: Application-wide configuration.
            entity_service: Service for creating/updating entities.
            entity_parser: Parses markdown files into entity structures.
            entity_repository: DB access for entities (also carries project_id).
            relation_repository: DB access for relations.
            project_repository: DB access for projects (scan watermark storage).
            search_service: Maintains the search index.
            file_service: File I/O and checksum computation.
        """
        self.app_config = app_config
        self.entity_service = entity_service
        self.entity_parser = entity_parser
        self.entity_repository = entity_repository
        self.relation_repository = relation_repository
        self.project_repository = project_repository
        self.search_service = search_service
        self.file_service = file_service
        # Load ignore patterns once at initialization for performance
        self._ignore_patterns = load_bmignore_patterns()
        # Circuit breaker: track file failures to prevent infinite retry loops
        # Use OrderedDict for LRU behavior with bounded size to prevent unbounded memory growth
        self._file_failures: OrderedDict[str, FileFailureInfo] = OrderedDict()
        self._max_tracked_failures: int = 100  # Limit failure cache size
    async def _should_skip_file(self, path: str) -> bool:
        """Decide whether the circuit breaker should skip this file.

        A file is skipped only when it has failed MAX_CONSECUTIVE_FAILURES
        times *and* its current checksum matches the checksum recorded at the
        last failure. A differing checksum means the file was edited, so its
        failure record is dropped and the file is retried.

        Args:
            path: File path to check

        Returns:
            True if file should be skipped, False otherwise
        """
        info = self._file_failures.get(path)
        # No record, or still under the failure threshold: keep syncing it.
        if info is None or info.count < MAX_CONSECUTIVE_FAILURES:
            return False
        try:
            checksum_now = await self.file_service.compute_checksum(path)
        except Exception as e:
            # If we can't compute checksum, log but still skip to avoid infinite loops
            logger.warning(f"Failed to compute checksum for {path}: {e}")
            return True
        if checksum_now != info.last_checksum:
            # File was modified since the last failure - reset and retry.
            logger.info(
                f"File {path} changed since last failure (checksum differs), "
                f"resetting failure count and retrying"
            )
            del self._file_failures[path]
            return False
        # Unchanged file that already exceeded the threshold: skip it.
        return True
    async def _record_failure(self, path: str, error: str) -> None:
        """Record a file sync failure for circuit breaker tracking.

        Uses an LRU cache with bounded size to prevent unbounded memory
        growth. The file's checksum is captured alongside the failure so a
        later edit to the file (checksum change) can reset the count.

        Args:
            path: File path that failed
            error: Error message from the failure
        """
        now = datetime.now()
        # Compute checksum for failure tracking; fall back to "" if that
        # fails too (better than crashing the sync loop).
        try:
            checksum = await self.file_service.compute_checksum(path)
        except Exception:
            checksum = ""
        failure_info = self._file_failures.pop(path, None)
        if failure_info is not None:
            # Update existing record; re-inserting moves it to the MRU end.
            failure_info.count += 1
            failure_info.last_failure = now
            failure_info.last_error = error
            failure_info.last_checksum = checksum
            self._file_failures[path] = failure_info
            logger.warning(
                f"File sync failed (attempt {failure_info.count}/{MAX_CONSECUTIVE_FAILURES}): "
                f"path={path}, error={error}"
            )
            # Log when threshold is reached
            if failure_info.count >= MAX_CONSECUTIVE_FAILURES:
                logger.error(
                    f"File {path} has failed {MAX_CONSECUTIVE_FAILURES} times and will be skipped. "
                    f"First failure: {failure_info.first_failure}, Last error: {error}"
                )
                # Record metric for file being blocked by circuit breaker
                logfire.metric_counter("sync.circuit_breaker.blocked_files").add(1)
        else:
            # Create new failure record
            self._file_failures[path] = FileFailureInfo(
                count=1,
                first_failure=now,
                last_failure=now,
                last_error=error,
                last_checksum=checksum,
            )
            logger.debug(f"Recording first failure for {path}: {error}")
            # Enforce cache size limit - remove oldest entry if over limit
            if len(self._file_failures) > self._max_tracked_failures:
                removed_path, removed_info = self._file_failures.popitem(last=False)
                logger.debug(
                    f"Evicting oldest failure record from cache: path={removed_path}, "
                    f"failures={removed_info.count}"
                )
        # Bug fix: previously only repeat failures incremented this counter,
        # making first failures invisible in metrics. Count every failure.
        logfire.metric_counter("sync.circuit_breaker.failures").add(1)
    def _clear_failure(self, path: str) -> None:
        """Drop any circuit-breaker state for a file that synced successfully.

        Args:
            path: File path that successfully synced
        """
        # pop() with a default avoids a separate membership test.
        if self._file_failures.pop(path, None) is not None:
            logger.info(f"Clearing failure history for {path} after successful sync")
    @logfire.instrument()
    async def sync(self, directory: Path, project_name: Optional[str] = None) -> SyncReport:
        """Sync all files with database and update scan watermark.

        Processes changes in dependency order (moves, deletes, new, modified)
        so relations resolve effectively, then persists the scan watermark.

        Args:
            directory: Project root directory to sync.
            project_name: Optional project name (unused here; kept for
                backward compatibility with existing callers).

        Returns:
            SyncReport describing detected changes and any skipped files.
        """
        start_time = time.time()
        # Watermark uses the *start* timestamp (not the end) so files created
        # while this sync runs are caught on the next iteration.
        sync_start_timestamp = start_time
        logger.info(f"Sync operation started for directory: {directory}")
        # initial paths from db to sync: path -> checksum
        report = await self.scan(directory)
        # order of sync matters to resolve relations effectively
        logger.info(
            f"Sync changes detected: new_files={len(report.new)}, modified_files={len(report.modified)}, "
            f"deleted_files={len(report.deleted)}, moved_files={len(report.moves)}"
        )
        # sync moves first
        with logfire.span("process_moves", move_count=len(report.moves)):
            for old_path, new_path in report.moves.items():
                # A file that was deleted and replaced by another file appears
                # in both the move and modified lists; handle it as modified.
                if new_path in report.modified:
                    report.modified.remove(new_path)
                    logger.debug(
                        f"File marked as moved and modified: old_path={old_path}, new_path={new_path}"
                    )
                else:
                    await self.handle_move(old_path, new_path)
        # deleted next
        with logfire.span("process_deletes", delete_count=len(report.deleted)):
            for path in report.deleted:
                await self.handle_delete(path)
        # then new and modified
        with logfire.span("process_new_files", new_count=len(report.new)):
            for path in report.new:
                entity, _ = await self.sync_file(path, new=True)
                if entity is None:
                    await self._track_skipped_file(path, report)
        with logfire.span("process_modified_files", modified_count=len(report.modified)):
            for path in report.modified:
                entity, _ = await self.sync_file(path, new=False)
                if entity is None:
                    await self._track_skipped_file(path, report)
        # Only resolve relations if there were actual changes: if no files
        # changed, no new unresolved relations could have been created.
        with logfire.span("resolve_relations"):
            if report.total > 0:
                await self.resolve_relations()
            else:
                logger.info("Skipping relation resolution - no file changes detected")
        await self._update_scan_watermark(directory, sync_start_timestamp)
        duration_ms = int((time.time() - start_time) * 1000)
        self._record_sync_metrics(report, duration_ms)
        self._log_sync_summary(report, directory, duration_ms)
        return report

    async def _track_skipped_file(self, path: str, report: SyncReport) -> None:
        """Append *path* to report.skipped_files if the circuit breaker skipped it."""
        if not await self._should_skip_file(path):
            return
        failure_info = self._file_failures[path]
        report.skipped_files.append(
            SkippedFile(
                path=path,
                reason=failure_info.last_error,
                failure_count=failure_info.count,
                first_failed=failure_info.first_failure,
            )
        )

    async def _update_scan_watermark(self, directory: Path, sync_start_timestamp: float) -> None:
        """Persist the scan watermark (start timestamp + file count) on the project."""
        current_file_count = await self._quick_count_files(directory)
        if self.entity_repository.project_id is None:
            return
        project = await self.project_repository.find_by_id(self.entity_repository.project_id)
        if project:
            await self.project_repository.update(
                project.id,
                {
                    "last_scan_timestamp": sync_start_timestamp,
                    "last_file_count": current_file_count,
                },
            )
            logger.debug(
                f"Updated scan watermark: timestamp={sync_start_timestamp}, "
                f"file_count={current_file_count}"
            )

    def _record_sync_metrics(self, report: SyncReport, duration_ms: int) -> None:
        """Emit logfire metrics for a completed sync run."""
        logfire.metric_histogram("sync.duration", unit="ms").record(duration_ms)
        logfire.metric_counter("sync.files.new").add(len(report.new))
        logfire.metric_counter("sync.files.modified").add(len(report.modified))
        logfire.metric_counter("sync.files.deleted").add(len(report.deleted))
        logfire.metric_counter("sync.files.moved").add(len(report.moves))
        if report.skipped_files:
            logfire.metric_counter("sync.files.skipped").add(len(report.skipped_files))

    def _log_sync_summary(self, report: SyncReport, directory: Path, duration_ms: int) -> None:
        """Log the sync outcome, warning about each skipped file if any."""
        if report.skipped_files:
            logger.warning(
                f"Sync completed with {len(report.skipped_files)} skipped files: "
                f"directory={directory}, total_changes={report.total}, "
                f"skipped={len(report.skipped_files)}, duration_ms={duration_ms}"
            )
            for skipped in report.skipped_files:
                logger.warning(
                    f"Skipped file: path={skipped.path}, "
                    f"failures={skipped.failure_count}, reason={skipped.reason}"
                )
        else:
            logger.info(
                f"Sync operation completed: directory={directory}, "
                f"total_changes={report.total}, duration_ms={duration_ms}"
            )
    @logfire.instrument()
    async def scan(self, directory):
        """Smart scan using watermark and file count for large project optimization.
        Uses scan watermark tracking to dramatically reduce scan time for large projects:
        - Tracks last_scan_timestamp and last_file_count in Project model
        - Uses `find -newermt` for incremental scanning (only changed files)
        - Falls back to full scan when deletions detected (file count decreased)
        Expected performance:
        - No changes: 225x faster (2s vs 450s for 1,460 files on TigrisFS)
        - Few changes: 84x faster (5s vs 420s)
        - Deletions: Full scan (rare, acceptable)
        Architecture:
        - Get current file count quickly (find | wc -l: 1.4s)
        - Compare with last_file_count to detect deletions
        - If no deletions: incremental scan with find -newermt (0.2s)
        - Process changed files with mtime-based comparison
        """
        scan_start_time = time.time()
        report = SyncReport()
        # Get current project to check watermark
        if self.entity_repository.project_id is None:
            raise ValueError("Entity repository has no project_id set")
        project = await self.project_repository.find_by_id(self.entity_repository.project_id)
        if project is None:
            raise ValueError(f"Project not found: {self.entity_repository.project_id}")
        # Step 1: Quick file count
        logger.debug("Counting files in directory")
        current_count = await self._quick_count_files(directory)
        logger.debug(f"Found {current_count} files in directory")
        # Step 2: Determine scan strategy based on watermark and file count
        if project.last_file_count is None:
            # First sync ever → full scan
            scan_type = "full_initial"
            logger.info("First sync for this project, performing full scan")
            file_paths_to_scan = await self._scan_directory_full(directory)
        elif current_count < project.last_file_count:
            # Files deleted → need full scan to detect which ones
            scan_type = "full_deletions"
            logger.info(
                f"File count decreased ({project.last_file_count} → {current_count}), "
                f"running full scan to detect deletions"
            )
            file_paths_to_scan = await self._scan_directory_full(directory)
        elif project.last_scan_timestamp is not None:
            # Incremental scan: only files modified since last scan
            scan_type = "incremental"
            logger.info(
                f"Running incremental scan for files modified since {project.last_scan_timestamp}"
            )
            file_paths_to_scan = await self._scan_directory_modified_since(
                directory, project.last_scan_timestamp
            )
            logger.info(
                f"Incremental scan found {len(file_paths_to_scan)} potentially changed files"
            )
        else:
            # Fallback to full scan (no watermark available)
            scan_type = "full_fallback"
            logger.warning("No scan watermark available, falling back to full scan")
            file_paths_to_scan = await self._scan_directory_full(directory)
        # Record scan type metric
        logfire.metric_counter(f"sync.scan.{scan_type}").add(1)
        logfire.metric_histogram("sync.scan.files_scanned", unit="files").record(
            len(file_paths_to_scan)
        )
        # Step 3: Process each file with mtime-based comparison
        scanned_paths: Set[str] = set()
        changed_checksums: Dict[str, str] = {}
        logger.debug(f"Processing {len(file_paths_to_scan)} files with mtime-based comparison")
        for rel_path in file_paths_to_scan:
            scanned_paths.add(rel_path)
            # Get file stats
            abs_path = directory / rel_path
            if not abs_path.exists():
                # File was deleted between scan and now (race condition)
                continue
            stat_info = abs_path.stat()
            # Indexed lookup - single file query (not full table scan)
            db_entity = await self.entity_repository.get_by_file_path(rel_path)
            if db_entity is None:
                # New file - need checksum for move detection
                checksum = await self.file_service.compute_checksum(rel_path)
                report.new.add(rel_path)
                changed_checksums[rel_path] = checksum
                logger.trace(f"New file detected: {rel_path}")
                continue
            # File exists in DB - check if mtime/size changed
            db_mtime = db_entity.mtime
            db_size = db_entity.size
            fs_mtime = stat_info.st_mtime
            fs_size = stat_info.st_size
            # Compare mtime and size (like rsync/rclone)
            # Allow small epsilon for float comparison (0.01s = 10ms)
            mtime_changed = db_mtime is None or abs(fs_mtime - db_mtime) > 0.01
            size_changed = db_size is None or fs_size != db_size
            if mtime_changed or size_changed:
                # File modified - compute checksum
                checksum = await self.file_service.compute_checksum(rel_path)
                db_checksum = db_entity.checksum
                # Only mark as modified if checksum actually differs
                # (handles cases where mtime changed but content didn't, e.g., git operations)
                if checksum != db_checksum:
                    report.modified.add(rel_path)
                    changed_checksums[rel_path] = checksum
                    logger.trace(
                        f"Modified file detected: {rel_path}, "
                        f"mtime_changed={mtime_changed}, size_changed={size_changed}"
                    )
            else:
                # File unchanged - no checksum needed
                logger.trace(f"File unchanged (mtime/size match): {rel_path}")
        # Step 4: Detect moves (for both full and incremental scans)
        # Check if any "new" files are actually moves by matching checksums
        for new_path in list(report.new):  # Use list() to allow modification during iteration
            new_checksum = changed_checksums.get(new_path)
            if not new_checksum:
                continue
            # Look for existing entity with same checksum but different path
            # This could be a move or a copy
            existing_entities = await self.entity_repository.find_by_checksum(new_checksum)
            for candidate in existing_entities:
                if candidate.file_path == new_path:
                    # Same path, skip (shouldn't happen for "new" files but be safe)
                    continue
                # Check if the old path still exists on disk
                old_path_abs = directory / candidate.file_path
                if old_path_abs.exists():
                    # Original still exists → this is a copy, not a move
                    logger.trace(
                        f"File copy detected (not move): {candidate.file_path} copied to {new_path}"
                    )
                    continue
                # Original doesn't exist → this is a move!
                report.moves[candidate.file_path] = new_path
                report.new.remove(new_path)
                logger.trace(f"Move detected: {candidate.file_path} -> {new_path}")
                break  # Only match first candidate
        # Step 5: Detect deletions (only for full scans)
        # Incremental scans can't reliably detect deletions since they only see modified files
        if scan_type in ("full_initial", "full_deletions", "full_fallback"):
            # Use optimized query for just file paths (not full entities)
            db_file_paths = await self.entity_repository.get_all_file_paths()
            logger.debug(f"Found {len(db_file_paths)} db paths for deletion detection")
            for db_path in db_file_paths:
                if db_path not in scanned_paths:
                    # File in DB but not on filesystem
                    # Check if it was already detected as a move
                    if db_path in report.moves:
                        # Already handled as a move, skip
                        continue
                    # File was deleted
                    report.deleted.add(db_path)
                    logger.trace(f"Deleted file detected: {db_path}")
        # Store checksums for files that need syncing
        report.checksums = changed_checksums
        scan_duration_ms = int((time.time() - scan_start_time) * 1000)
        logfire.metric_histogram("sync.scan.duration", unit="ms").record(scan_duration_ms)
        logger.info(
            f"Completed {scan_type} scan for directory {directory} in {scan_duration_ms}ms, "
            f"found {report.total} changes (new={len(report.new)}, "
            f"modified={len(report.modified)}, deleted={len(report.deleted)}, "
            f"moves={len(report.moves)})"
        )
        return report
    @logfire.instrument()
    async def sync_file(
        self, path: str, new: bool = True
    ) -> Tuple[Optional[Entity], Optional[str]]:
        """Sync one file, guarded by the failure circuit breaker.

        Args:
            path: Path to file to sync
            new: Whether this is a new file

        Returns:
            Tuple of (entity, checksum), or (None, None) when the file is
            skipped or a recoverable error occurs.
        """
        # Circuit breaker: refuse files that have already failed repeatedly.
        if await self._should_skip_file(path):
            logger.warning(f"Skipping file due to repeated failures: {path}")
            return None, None

        try:
            logger.debug(
                f"Syncing file path={path} is_new={new} is_markdown={self.file_service.is_markdown(path)}"
            )
            # Dispatch to the markdown or plain-file handler.
            handler = (
                self.sync_markdown_file
                if self.file_service.is_markdown(path)
                else self.sync_regular_file
            )
            entity, checksum = await handler(path, new)
            if entity is not None:
                await self.search_service.index_entity(entity)
                # A successful sync resets the circuit breaker for this path.
                self._clear_failure(path)
                logger.debug(
                    f"File sync completed, path={path}, entity_id={entity.id}, checksum={checksum[:8]}"
                )
            return entity, checksum
        except Exception as e:
            # Fatal errors (e.g. project deletion) must abort the whole sync run,
            # whether raised directly or wrapped as a cause.
            if isinstance(e, SyncFatalError) or isinstance(e.__cause__, SyncFatalError):
                logger.error(f"Fatal sync error encountered, terminating sync: path={path}")
                raise
            # Recoverable file-level error: record it for the breaker and move on.
            error_msg = str(e)
            logger.error(f"Failed to sync file: path={path}, error={error_msg}")
            await self._record_failure(path, error_msg)
            return None, None
    @logfire.instrument()
    async def sync_markdown_file(self, path: str, new: bool = True) -> Tuple[Optional[Entity], str]:
        """Sync a markdown file with full processing.

        Parses the file, resolves/repairs its permalink (writing frontmatter
        back to disk when it changes), creates or updates the entity with its
        observations and relations, and records mtime/size for future
        incremental scans.

        Args:
            path: Path to markdown file
            new: Whether this is a new file

        Returns:
            Tuple of (entity, checksum)
        """
        # Parse markdown first to get any existing permalink
        logger.debug(f"Parsing markdown file, path: {path}, new: {new}")
        file_content = await self.file_service.read_file_content(path)
        file_contains_frontmatter = has_frontmatter(file_content)
        # Get file timestamps so search/recent-activity reflect real modification times
        file_stats = self.file_service.file_stats(path)
        created = datetime.fromtimestamp(file_stats.st_ctime).astimezone()
        modified = datetime.fromtimestamp(file_stats.st_mtime).astimezone()
        # entity markdown always contains front matter, so it can be used to create/update the entity
        entity_markdown = await self.entity_parser.parse_file(path)
        # if the file contains frontmatter, resolve a permalink (unless disabled)
        if file_contains_frontmatter and not self.app_config.disable_permalinks:
            # Resolve permalink - skip conflict checks during bulk sync for performance
            permalink = await self.entity_service.resolve_permalink(
                path, markdown=entity_markdown, skip_conflict_check=True
            )
            # If the resolved permalink differs, rewrite the file's frontmatter on disk
            if permalink != entity_markdown.frontmatter.permalink:
                logger.info(
                    f"Updating permalink for path: {path}, old_permalink: {entity_markdown.frontmatter.permalink}, new_permalink: {permalink}"
                )
                entity_markdown.frontmatter.metadata["permalink"] = permalink
                await self.file_service.update_frontmatter(path, {"permalink": permalink})
        # if the file is new, create an entity
        if new:
            # Create entity with final permalink
            logger.debug(f"Creating new entity from markdown, path={path}")
            await self.entity_service.create_entity_from_markdown(Path(path), entity_markdown)
        # otherwise we need to update the entity and observations
        else:
            logger.debug(f"Updating entity from markdown, path={path}")
            await self.entity_service.update_entity_and_observations(Path(path), entity_markdown)
        # Update relations and search index
        entity = await self.entity_service.update_entity_relations(path, entity_markdown)
        # Recompute the checksum AFTER relation processing: files with wikilinks
        # may be rewritten above, so only this checksum is stable/consistent.
        final_checksum = await self.file_service.compute_checksum(path)
        # Update checksum, timestamps, and file metadata from file system
        # Store mtime/size for efficient change detection in future scans
        # This ensures temporal ordering in search and recent activity uses actual file modification times
        await self.entity_repository.update(
            entity.id,
            {
                "checksum": final_checksum,
                "created_at": created,
                "updated_at": modified,
                "mtime": file_stats.st_mtime,
                "size": file_stats.st_size,
            },
        )
        logger.debug(
            f"Markdown sync completed: path={path}, entity_id={entity.id}, "
            f"observation_count={len(entity.observations)}, relation_count={len(entity.relations)}, "
            f"checksum={final_checksum[:8]}"
        )
        # Return the final checksum to ensure everything is consistent
        return entity, final_checksum
    @logfire.instrument()
    async def sync_regular_file(self, path: str, new: bool = True) -> Tuple[Optional[Entity], str]:
        """Sync a non-markdown file with basic tracking.

        No content parsing is performed; the file is tracked by checksum,
        timestamps, content type, and mtime/size for incremental scans.

        Args:
            path: Path to file
            new: Whether this is a new file

        Returns:
            Tuple of (entity, checksum)

        Raises:
            ValueError: If the entity cannot be found or updated in the DB.
        """
        checksum = await self.file_service.compute_checksum(path)
        if new:
            # Generate permalink from path - skip conflict checks during bulk sync
            await self.entity_service.resolve_permalink(path, skip_conflict_check=True)
            # get file timestamps
            file_stats = self.file_service.file_stats(path)
            created = datetime.fromtimestamp(file_stats.st_ctime).astimezone()
            modified = datetime.fromtimestamp(file_stats.st_mtime).astimezone()
            # get mime type
            content_type = self.file_service.content_type(path)
            file_path = Path(path)
            try:
                entity = await self.entity_repository.add(
                    Entity(
                        entity_type="file",
                        file_path=path,
                        checksum=checksum,
                        title=file_path.name,
                        created_at=created,
                        updated_at=modified,
                        content_type=content_type,
                        # mtime/size enable cheap change detection in later scans
                        mtime=file_stats.st_mtime,
                        size=file_stats.st_size,
                    )
                )
                return entity, checksum
            except IntegrityError as e:
                # Handle race condition where entity was created by another process
                # between our existence check and the insert above.
                if "UNIQUE constraint failed: entity.file_path" in str(e):
                    logger.info(
                        f"Entity already exists for file_path={path}, updating instead of creating"
                    )
                    # Treat as update instead of create
                    entity = await self.entity_repository.get_by_file_path(path)
                    if entity is None:  # pragma: no cover
                        logger.error(f"Entity not found after constraint violation, path={path}")
                        raise ValueError(f"Entity not found after constraint violation: {path}")
                    # Re-get file stats since we're in update path
                    file_stats_for_update = self.file_service.file_stats(path)
                    updated = await self.entity_repository.update(
                        entity.id,
                        {
                            "file_path": path,
                            "checksum": checksum,
                            "mtime": file_stats_for_update.st_mtime,
                            "size": file_stats_for_update.st_size,
                        },
                    )
                    if updated is None:  # pragma: no cover
                        logger.error(f"Failed to update entity, entity_id={entity.id}, path={path}")
                        raise ValueError(f"Failed to update entity with ID {entity.id}")
                    return updated, checksum
                else:
                    # Re-raise if it's a different integrity error
                    raise
        else:
            # Get file timestamps for updating modification time
            file_stats = self.file_service.file_stats(path)
            modified = datetime.fromtimestamp(file_stats.st_mtime).astimezone()
            entity = await self.entity_repository.get_by_file_path(path)
            if entity is None:  # pragma: no cover
                logger.error(f"Entity not found for existing file, path={path}")
                raise ValueError(f"Entity not found for existing file: {path}")
            # Update checksum, modification time, and file metadata from file system
            # Store mtime/size for efficient change detection in future scans
            updated = await self.entity_repository.update(
                entity.id,
                {
                    "file_path": path,
                    "checksum": checksum,
                    "updated_at": modified,
                    "mtime": file_stats.st_mtime,
                    "size": file_stats.st_size,
                },
            )
            if updated is None:  # pragma: no cover
                logger.error(f"Failed to update entity, entity_id={entity.id}, path={path}")
                raise ValueError(f"Failed to update entity with ID {entity.id}")
            return updated, checksum
    @logfire.instrument()
    async def handle_delete(self, file_path: str):
        """Handle complete entity deletion including search index cleanup.

        Deletes the entity row (which cascades to observations/relations) and
        then removes every related entry from the search index.

        Args:
            file_path: Project-relative path of the deleted file.
        """
        # Fetch the entity first: its permalinks are needed for index cleanup
        # and are gone once the row is deleted.
        entity = await self.entity_repository.get_by_file_path(file_path)
        if entity:
            logger.info(
                f"Deleting entity with file_path={file_path}, entity_id={entity.id}, permalink={entity.permalink}"
            )
            # Delete from db (this cascades to observations/relations)
            await self.entity_service.delete_entity_by_file_path(file_path)
            # Clean up search index
            permalinks = (
                [entity.permalink]
                + [o.permalink for o in entity.observations]
                + [r.permalink for r in entity.relations]
            )
            logger.debug(
                f"Cleaning up search index for entity_id={entity.id}, file_path={file_path}, "
                f"index_entries={len(permalinks)}"
            )
            # Entries without a permalink are indexed by entity id; a single
            # delete_by_entity_id covers all of them, so issue it at most once
            # instead of repeating it per missing permalink.
            deleted_by_entity_id = False
            for permalink in permalinks:
                if permalink:
                    await self.search_service.delete_by_permalink(permalink)
                elif not deleted_by_entity_id:
                    await self.search_service.delete_by_entity_id(entity.id)
                    deleted_by_entity_id = True
    @logfire.instrument()
    async def handle_move(self, old_path: str, new_path: str):
        """Handle a file move by updating the entity's path and search index.

        Detects conflicts where the destination is already occupied, allows
        legitimate file swaps to proceed, optionally rewrites the permalink to
        match the new path, and re-indexes the entity afterwards.

        Args:
            old_path: Previous project-relative file path.
            new_path: New project-relative file path.

        Raises:
            ValueError: If the destination is occupied by another entity or a
                database constraint is violated during the update.
        """
        logger.debug("Moving entity", old_path=old_path, new_path=new_path)
        entity = await self.entity_repository.get_by_file_path(old_path)
        if entity:
            # Check if destination path is already occupied by another entity
            existing_at_destination = await self.entity_repository.get_by_file_path(new_path)
            if existing_at_destination and existing_at_destination.id != entity.id:
                # Handle the conflict - this could be a file swap or replacement scenario
                logger.warning(
                    f"File path conflict detected during move: "
                    f"entity_id={entity.id} trying to move from '{old_path}' to '{new_path}', "
                    f"but entity_id={existing_at_destination.id} already occupies '{new_path}'"
                )
                # Check if this is a file swap (the destination entity is being moved to our old path)
                # This would indicate a simultaneous move operation.
                # NOTE(review): this re-queries old_path, which normally still returns the
                # same `entity` fetched above; it only differs if a concurrent move already
                # placed the destination entity at old_path — confirm swap semantics.
                old_path_after_swap = await self.entity_repository.get_by_file_path(old_path)
                if old_path_after_swap and old_path_after_swap.id == existing_at_destination.id:
                    logger.info(f"Detected file swap between '{old_path}' and '{new_path}'")
                    # This is a swap scenario - both moves should succeed
                    # We'll allow this to proceed since the other file has moved out
                else:
                    # This is a conflict where the destination is occupied
                    raise ValueError(
                        f"Cannot move entity from '{old_path}' to '{new_path}': "
                        f"destination path is already occupied by another file. "
                        f"This may be caused by: "
                        f"1. Conflicting file names with different character encodings, "
                        f"2. Case sensitivity differences (e.g., 'Finance/' vs 'finance/'), "
                        f"3. Character conflicts between hyphens in filenames and generated permalinks, "
                        f"4. Files with similar names containing special characters. "
                        f"Try renaming one of the conflicting files to resolve this issue."
                    )
            # Update file_path in all cases
            updates: Dict[str, str] = {"file_path": new_path}
            # If configured, also update permalink to match new path
            if (
                self.app_config.update_permalinks_on_move
                and not self.app_config.disable_permalinks
                and self.file_service.is_markdown(new_path)
            ):
                # generate new permalink value - skip conflict checks during bulk sync
                new_permalink = await self.entity_service.resolve_permalink(
                    new_path, skip_conflict_check=True
                )
                # write to file and get new checksum
                new_checksum = await self.file_service.update_frontmatter(
                    new_path, {"permalink": new_permalink}
                )
                updates["permalink"] = new_permalink
                updates["checksum"] = new_checksum
                # Fixed log formatting: the previous concatenated f-strings had no
                # separators, producing "move,old_permalink=Xnew_permalink=Y...".
                logger.info(
                    f"Updating permalink on move, old_permalink={entity.permalink}, "
                    f"new_permalink={new_permalink}, "
                    f"new_checksum={new_checksum}"
                )
            try:
                updated = await self.entity_repository.update(entity.id, updates)
            except Exception as e:
                # Catch any database integrity errors and provide helpful context
                if "UNIQUE constraint failed" in str(e):
                    logger.error(
                        f"Database constraint violation during move: "
                        f"entity_id={entity.id}, old_path='{old_path}', new_path='{new_path}'"
                    )
                    raise ValueError(
                        f"Cannot complete move from '{old_path}' to '{new_path}': "
                        f"a database constraint was violated. This usually indicates "
                        f"a file path or permalink conflict. Please check for: "
                        f"1. Duplicate file names, "
                        f"2. Case sensitivity issues (e.g., 'File.md' vs 'file.md'), "
                        f"3. Character encoding conflicts in file names."
                    ) from e
                else:
                    # Re-raise other exceptions as-is
                    raise
            if updated is None:  # pragma: no cover
                logger.error(
                    f"Failed to update entity path: "
                    f"entity_id={entity.id}, "
                    f"old_path={old_path}, "
                    f"new_path={new_path}"
                )
                raise ValueError(f"Failed to update entity path for ID {entity.id}")
            logger.debug(
                f"Entity path updated: "
                f"entity_id={entity.id} "
                f"permalink={entity.permalink} "
                f"old_path={old_path} "
                f"new_path={new_path}"
            )
            # update search index
            await self.search_service.index_entity(updated)
    @logfire.instrument()
    async def resolve_relations(self, entity_id: int | None = None):
        """Try to resolve unresolved relations.

        Args:
            entity_id: If provided, only resolve relations for this specific entity.
                      Otherwise, resolve all unresolved relations in the database.
        """
        # Fetch the set of pending forward references, scoped if requested.
        if entity_id:
            pending = await self.relation_repository.find_unresolved_relations_for_entity(
                entity_id
            )
            logger.info(
                f"Resolving forward references for entity {entity_id}",
                count=len(pending),
            )
        else:
            pending = await self.relation_repository.find_unresolved_relations()
            logger.info("Resolving all forward references", count=len(pending))

        for rel in pending:
            logger.trace(
                "Attempting to resolve relation "
                f"relation_id={rel.id} "
                f"from_id={rel.from_id} "
                f"to_name={rel.to_name}"
            )
            target = await self.entity_service.link_resolver.resolve_link(rel.to_name)
            # Skip links that still can't be resolved and self-references.
            if not target or target.id == rel.from_id:
                continue
            logger.debug(
                "Resolved forward reference "
                f"relation_id={rel.id} "
                f"from_id={rel.from_id} "
                f"to_name={rel.to_name} "
                f"resolved_id={target.id} "
                f"resolved_title={target.title}",
            )
            try:
                await self.relation_repository.update(
                    rel.id,
                    {"to_id": target.id, "to_name": target.title},
                )
            except IntegrityError:  # pragma: no cover
                # An equivalent relation already exists; nothing to do.
                logger.debug(
                    "Ignoring duplicate relation "
                    f"relation_id={rel.id} "
                    f"from_id={rel.from_id} "
                    f"to_name={rel.to_name}"
                )
            # Keep the search index in step with the resolved target.
            await self.search_service.index_entity(target)
    async def _quick_count_files(self, directory: Path) -> int:
        """Fast file count using the ``find`` command.

        Delegates traversal to the OS, which is much faster than Python
        iteration, especially on network filesystems like TigrisFS.

        Args:
            directory: Directory to count files in

        Returns:
            Number of files in directory (recursive)
        """
        # Use exec (not shell) so a directory path containing quotes, spaces,
        # or `$` cannot break the command line or inject shell syntax; count
        # the output lines in Python instead of piping through `wc -l`.
        process = await asyncio.create_subprocess_exec(
            "find",
            str(directory),
            "-type",
            "f",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await process.communicate()
        if process.returncode != 0:
            error_msg = stderr.decode().strip()
            logger.error(
                f"FILE COUNT OPTIMIZATION FAILED: find command failed with exit code {process.returncode}, "
                f"error: {error_msg}. Falling back to manual count. "
                f"This will slow down watermark detection!"
            )
            # Track optimization failures for visibility
            logfire.metric_counter("sync.scan.file_count_failure").add(1)
            # Fallback: count using scan_directory
            count = 0
            async for _ in self.scan_directory(directory):
                count += 1
            return count
        # One path per line, mirroring what `wc -l` counted before.
        return sum(1 for line in stdout.splitlines() if line)
    async def _scan_directory_modified_since(
        self, directory: Path, since_timestamp: float
    ) -> List[str]:
        """Use ``find -newermt`` for filesystem-level filtering of modified files.

        This is dramatically faster than scanning all files and comparing mtimes,
        especially on network filesystems like TigrisFS where stat operations are
        expensive.

        Args:
            directory: Directory to scan
            since_timestamp: Unix timestamp to find files newer than

        Returns:
            List of relative file paths modified since the timestamp (respects .bmignore)
        """
        # Convert timestamp to find-compatible format
        since_date = datetime.fromtimestamp(since_timestamp).strftime("%Y-%m-%d %H:%M:%S")
        # Use exec (not shell) so the directory path cannot break the command
        # line or inject shell syntax (quotes, `$`, backticks, etc.).
        process = await asyncio.create_subprocess_exec(
            "find",
            str(directory),
            "-type",
            "f",
            "-newermt",
            since_date,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await process.communicate()
        if process.returncode != 0:
            error_msg = stderr.decode().strip()
            logger.error(
                f"SCAN OPTIMIZATION FAILED: find -newermt command failed with exit code {process.returncode}, "
                f"error: {error_msg}. Falling back to full scan. "
                f"This will cause slow syncs on large projects!"
            )
            # Track optimization failures for visibility
            logfire.metric_counter("sync.scan.optimization_failure").add(1)
            # Fallback to full scan
            return await self._scan_directory_full(directory)
        # Convert absolute paths to relative and filter through ignore patterns
        file_paths = []
        for line in stdout.decode().splitlines():
            if line:
                try:
                    abs_path = Path(line)
                    rel_path = abs_path.relative_to(directory).as_posix()
                    # Apply ignore patterns (same as scan_directory)
                    if should_ignore_path(abs_path, directory, self._ignore_patterns):
                        logger.trace(f"Ignoring path per .bmignore: {rel_path}")
                        continue
                    file_paths.append(rel_path)
                except ValueError:
                    # Path is not relative to directory, skip it
                    logger.warning(f"Skipping file not under directory: {line}")
                    continue
        return file_paths
    async def _scan_directory_full(self, directory: Path) -> List[str]:
        """Full directory scan returning all file paths.

        Delegates to scan_directory(), which honors .bmignore patterns.

        Args:
            directory: Directory to scan

        Returns:
            List of relative file paths (respects .bmignore)
        """
        # Relativize each absolute path yielded by the streaming scan;
        # the cached stat info is not needed here.
        return [
            Path(abs_path).relative_to(directory).as_posix()
            async for abs_path, _stat in self.scan_directory(directory)
        ]
    async def scan_directory(self, directory: Path) -> AsyncIterator[Tuple[str, os.stat_result]]:
        """Stream files from directory using aiofiles.os.scandir() with cached stat info.

        aiofiles.os.scandir() provides async I/O plus stat information cached on
        each directory entry, which halves network I/O on filesystems like
        TigrisFS by avoiding a second stat() call per file.

        Args:
            directory: Directory to scan

        Yields:
            Tuples of (absolute_file_path, stat_info) for each file
        """
        try:
            entries = await aiofiles.os.scandir(directory)
        except PermissionError:
            logger.warning(f"Permission denied scanning directory: {directory}")
            return

        pending_dirs: List[Path] = []
        for entry in entries:
            entry_path = Path(entry.path)
            # Honor .bmignore before touching the entry any further.
            if should_ignore_path(entry_path, directory, self._ignore_patterns):
                logger.trace(f"Ignoring path per .bmignore: {entry_path.relative_to(directory)}")
                continue
            if entry.is_dir(follow_symlinks=False):
                # Defer recursion until this directory's files are emitted,
                # preserving files-before-subdirectories ordering.
                pending_dirs.append(entry_path)
            elif entry.is_file(follow_symlinks=False):
                # entry.stat() reuses the stat cached by scandir - no extra syscall.
                yield (entry.path, entry.stat(follow_symlinks=False))

        # Recurse into subdirectories after the current level.
        for pending in pending_dirs:
            async for item in self.scan_directory(pending):
                yield item
async def get_sync_service(project: Project) -> SyncService:  # pragma: no cover
    """Build a fully wired SyncService for *project*.

    Creates the database session, the repositories scoped to the project's
    id, and the supporting services that SyncService depends on.
    """
    app_config = ConfigManager().config
    _, session_maker = await db.get_or_create_db(
        db_path=app_config.database_path, db_type=db.DatabaseType.FILESYSTEM
    )

    base_path = Path(project.path)
    parser = EntityParser(base_path)
    file_service = FileService(base_path, MarkdownProcessor(parser))

    # Repositories, all scoped to this project's id
    entity_repo = EntityRepository(session_maker, project_id=project.id)
    observation_repo = ObservationRepository(session_maker, project_id=project.id)
    relation_repo = RelationRepository(session_maker, project_id=project.id)
    search_repo = SearchRepository(session_maker, project_id=project.id)
    project_repo = ProjectRepository(session_maker)

    # Services layered on top of the repositories
    search_service = SearchService(search_repo, entity_repo, file_service)
    link_resolver = LinkResolver(entity_repo, search_service)
    entity_service = EntityService(
        parser,
        entity_repo,
        observation_repo,
        relation_repo,
        file_service,
        link_resolver,
    )

    return SyncService(
        app_config=app_config,
        entity_service=entity_service,
        entity_parser=parser,
        entity_repository=entity_repo,
        relation_repository=relation_repo,
        project_repository=project_repo,
        search_service=search_service,
        file_service=file_service,
    )
```
--------------------------------------------------------------------------------
/tests/services/test_project_service.py:
--------------------------------------------------------------------------------
```python
"""Tests for ProjectService."""
import os
import shutil
import tempfile
from pathlib import Path

import pytest

from basic_memory.config import ConfigManager
from basic_memory.schemas import (
    ProjectInfoResponse,
    ProjectStatistics,
    ActivityMetrics,
    SystemStatus,
)
from basic_memory.services.project_service import ProjectService
def test_projects_property(project_service: ProjectService):
    """The projects property returns a non-empty dict of configured projects."""
    configured = project_service.projects

    # Must be a mapping with at least one entry (the test config defines one)
    assert isinstance(configured, dict)
    assert len(configured) > 0
def test_default_project_property(project_service: ProjectService):
    """The default_project property yields a non-empty string."""
    name = project_service.default_project

    # A default must always be configured
    assert isinstance(name, str)
    assert name
def test_current_project_property(project_service: ProjectService):
    """Test the current_project property.

    current_project should mirror the BASIC_MEMORY_PROJECT environment
    variable when set, and fall back to default_project otherwise.
    """
    # Save original environment so the test leaves no trace
    original_env = os.environ.get("BASIC_MEMORY_PROJECT")
    try:
        # With the env var absent, current_project falls back to the default
        if "BASIC_MEMORY_PROJECT" in os.environ:
            del os.environ["BASIC_MEMORY_PROJECT"]
        assert project_service.current_project == project_service.default_project

        # With the env var set, current_project reflects its value
        os.environ["BASIC_MEMORY_PROJECT"] = "test-project"
        assert project_service.current_project == "test-project"
    finally:
        # Restore original environment
        if original_env is not None:
            os.environ["BASIC_MEMORY_PROJECT"] = original_env
        elif "BASIC_MEMORY_PROJECT" in os.environ:
            del os.environ["BASIC_MEMORY_PROJECT"]
@pytest.mark.asyncio
async def test_project_operations_sync_methods(
    app_config, project_service: ProjectService, config_manager: ConfigManager
):
    """Test adding, switching, and removing a project using ConfigManager directly.

    This test uses the ConfigManager directly instead of the async methods.
    """
    # Generate a unique project name for testing
    test_project_name = f"test-project-{os.urandom(4).hex()}"
    with tempfile.TemporaryDirectory() as temp_dir:
        test_root = Path(temp_dir)
        test_project_path = (test_root / "test-project").as_posix()
        # Make sure the test directory exists
        os.makedirs(test_project_path, exist_ok=True)
        try:
            # Test adding a project (using ConfigManager directly)
            config_manager.add_project(test_project_name, test_project_path)
            # Verify it was added
            assert test_project_name in project_service.projects
            assert project_service.projects[test_project_name] == test_project_path
            # Test setting as default
            original_default = project_service.default_project
            config_manager.set_default_project(test_project_name)
            assert project_service.default_project == test_project_name
            # Restore original default
            if original_default:
                config_manager.set_default_project(original_default)
            # Test removing the project
            config_manager.remove_project(test_project_name)
            assert test_project_name not in project_service.projects
        except Exception:
            # Clean up in case of error, then re-raise preserving the
            # original traceback (bare raise instead of `raise e`)
            if test_project_name in project_service.projects:
                try:
                    config_manager.remove_project(test_project_name)
                except Exception:
                    pass
            raise
@pytest.mark.asyncio
async def test_get_system_status(project_service: ProjectService):
    """System status exposes version, database path, and database size."""
    status = project_service.get_system_status()

    # A valid SystemStatus with all core fields populated
    assert isinstance(status, SystemStatus)
    for field_value in (status.version, status.database_path, status.database_size):
        assert field_value
@pytest.mark.asyncio
async def test_get_statistics(project_service: ProjectService, test_graph, test_project):
    """Statistics for a populated project report entities and entity types."""
    stats = await project_service.get_statistics(test_project.id)

    # Populated test graph: at least one entity, with the "test" type present
    assert isinstance(stats, ProjectStatistics)
    assert stats.total_entities > 0
    assert "test" in stats.entity_types
@pytest.mark.asyncio
async def test_get_activity_metrics(project_service: ProjectService, test_graph, test_project):
    """Activity metrics report recently created and updated items."""
    activity = await project_service.get_activity_metrics(test_project.id)

    # Populated test graph implies non-empty recent activity lists
    assert isinstance(activity, ActivityMetrics)
    assert len(activity.recently_created) > 0
    assert len(activity.recently_updated) > 0
@pytest.mark.asyncio
async def test_get_project_info(project_service: ProjectService, test_graph, test_project):
    """Full project info aggregates name, path, defaults, stats, and status."""
    info = await project_service.get_project_info(test_project.name)

    assert isinstance(info, ProjectInfoResponse)

    # Scalar fields are populated
    assert info.project_name
    assert info.project_path
    assert info.default_project

    # Nested structures carry the expected types
    assert isinstance(info.available_projects, dict)
    assert isinstance(info.statistics, ProjectStatistics)
    assert isinstance(info.activity, ActivityMetrics)
    assert isinstance(info.system, SystemStatus)
@pytest.mark.asyncio
async def test_add_project_async(project_service: ProjectService):
    """Test adding a project with the updated async method.

    add_project must register the project in both the config (projects
    mapping) and the database; remove_project must remove it from both.
    """
    test_project_name = f"test-async-project-{os.urandom(4).hex()}"
    with tempfile.TemporaryDirectory() as temp_dir:
        test_root = Path(temp_dir)
        test_project_path = (test_root / "test-async-project").as_posix()
        # Make sure the test directory exists
        os.makedirs(test_project_path, exist_ok=True)
        try:
            # Test adding a project
            await project_service.add_project(test_project_name, test_project_path)
            # Verify it was added to config
            assert test_project_name in project_service.projects
            assert project_service.projects[test_project_name] == test_project_path
            # Verify it was added to the database
            project = await project_service.repository.get_by_name(test_project_name)
            assert project is not None
            assert project.name == test_project_name
            assert project.path == test_project_path
        finally:
            # Clean up (also exercises remove_project on both stores)
            if test_project_name in project_service.projects:
                await project_service.remove_project(test_project_name)
            # Ensure it was removed from both config and DB
            assert test_project_name not in project_service.projects
            project = await project_service.repository.get_by_name(test_project_name)
            assert project is None
@pytest.mark.asyncio
async def test_set_default_project_async(project_service: ProjectService):
    """Test setting a project as default with the updated async method.

    set_default_project must update the config default, set is_default on
    the database row, and clear it on the previous default.
    """
    # First add a test project
    test_project_name = f"test-default-project-{os.urandom(4).hex()}"
    with tempfile.TemporaryDirectory() as temp_dir:
        test_root = Path(temp_dir)
        test_project_path = str(test_root / "test-default-project")
        # Make sure the test directory exists
        os.makedirs(test_project_path, exist_ok=True)
        original_default = project_service.default_project
        try:
            # Add the test project
            await project_service.add_project(test_project_name, test_project_path)
            # Set as default
            await project_service.set_default_project(test_project_name)
            # Verify it's set as default in config
            assert project_service.default_project == test_project_name
            # Verify it's set as default in database
            project = await project_service.repository.get_by_name(test_project_name)
            assert project is not None
            assert project.is_default is True
            # Make sure old default is no longer default
            old_default_project = await project_service.repository.get_by_name(original_default)
            if old_default_project:
                assert old_default_project.is_default is not True
        finally:
            # Restore original default
            if original_default:
                await project_service.set_default_project(original_default)
            # Clean up test project
            if test_project_name in project_service.projects:
                await project_service.remove_project(test_project_name)
@pytest.mark.asyncio
async def test_get_project_method(project_service: ProjectService):
    """get_project returns None for unknown names and the record for known ones."""
    project_name = f"test-get-project-{os.urandom(4).hex()}"
    with tempfile.TemporaryDirectory() as temp_dir:
        project_path = (Path(temp_dir) / "test-get-project").as_posix()
        os.makedirs(project_path, exist_ok=True)
        try:
            # Unknown project -> None
            assert await project_service.get_project("non-existent-project") is None

            # Known project -> record with matching name and path
            await project_service.add_project(project_name, project_path)
            found = await project_service.get_project(project_name)
            assert found is not None
            assert found.name == project_name
            assert found.path == project_path
        finally:
            # Clean up the test project
            if project_name in project_service.projects:
                await project_service.remove_project(project_name)
@pytest.mark.asyncio
async def test_set_default_project_config_db_mismatch(
    project_service: ProjectService, config_manager: ConfigManager
):
    """Test set_default_project when project exists in config but not in database.

    The config default must still be updated even though no matching
    database row exists (the mismatch is only logged).
    """
    test_project_name = f"test-mismatch-project-{os.urandom(4).hex()}"
    with tempfile.TemporaryDirectory() as temp_dir:
        test_root = Path(temp_dir)
        test_project_path = str(test_root / "test-mismatch-project")
        # Make sure the test directory exists
        os.makedirs(test_project_path, exist_ok=True)
        original_default = project_service.default_project
        try:
            # Add project to config only (not to database)
            config_manager.add_project(test_project_name, test_project_path)
            # Verify it's in config but not in database
            assert test_project_name in project_service.projects
            db_project = await project_service.repository.get_by_name(test_project_name)
            assert db_project is None
            # Try to set as default - should log the config/DB mismatch rather than raise
            await project_service.set_default_project(test_project_name)
            # Should still update config despite database mismatch
            assert project_service.default_project == test_project_name
        finally:
            # Restore original default
            if original_default:
                config_manager.set_default_project(original_default)
            # Clean up
            if test_project_name in project_service.projects:
                config_manager.remove_project(test_project_name)
@pytest.mark.asyncio
async def test_add_project_with_set_default_true(project_service: ProjectService):
    """Test adding a project with set_default=True enforces single default.

    The new project must become the default in both config and database,
    and exactly one project may carry is_default=True afterwards.
    """
    test_project_name = f"test-default-true-{os.urandom(4).hex()}"
    with tempfile.TemporaryDirectory() as temp_dir:
        test_root = Path(temp_dir)
        test_project_path = str(test_root / "test-default-true")
        # Make sure the test directory exists
        os.makedirs(test_project_path, exist_ok=True)
        original_default = project_service.default_project
        try:
            # Get original default project from database
            original_default_project = await project_service.repository.get_by_name(
                original_default
            )
            # Add project with set_default=True
            await project_service.add_project(
                test_project_name, test_project_path, set_default=True
            )
            # Verify new project is set as default in both config and database
            assert project_service.default_project == test_project_name
            new_project = await project_service.repository.get_by_name(test_project_name)
            assert new_project is not None
            assert new_project.is_default is True
            # Verify original default is no longer default in database
            if original_default_project:
                refreshed_original = await project_service.repository.get_by_name(original_default)
                assert refreshed_original.is_default is not True
            # Verify only one project has is_default=True
            all_projects = await project_service.repository.find_all()
            default_projects = [p for p in all_projects if p.is_default is True]
            assert len(default_projects) == 1
            assert default_projects[0].name == test_project_name
        finally:
            # Restore original default
            if original_default:
                await project_service.set_default_project(original_default)
            # Clean up test project
            if test_project_name in project_service.projects:
                await project_service.remove_project(test_project_name)
@pytest.mark.asyncio
async def test_add_project_with_set_default_false(project_service: ProjectService):
    """Test adding a project with set_default=False doesn't change defaults.

    Neither the config default nor the database is_default flags may
    change when set_default is explicitly False.
    """
    test_project_name = f"test-default-false-{os.urandom(4).hex()}"
    with tempfile.TemporaryDirectory() as temp_dir:
        test_root = Path(temp_dir)
        test_project_path = str(test_root / "test-default-false")
        # Make sure the test directory exists
        os.makedirs(test_project_path, exist_ok=True)
        original_default = project_service.default_project
        try:
            # Add project with set_default=False (explicit)
            await project_service.add_project(
                test_project_name, test_project_path, set_default=False
            )
            # Verify default project hasn't changed
            assert project_service.default_project == original_default
            # Verify new project is NOT set as default
            new_project = await project_service.repository.get_by_name(test_project_name)
            assert new_project is not None
            assert new_project.is_default is not True
            # Verify original default is still default
            original_default_project = await project_service.repository.get_by_name(
                original_default
            )
            if original_default_project:
                assert original_default_project.is_default is True
        finally:
            # Clean up test project
            if test_project_name in project_service.projects:
                await project_service.remove_project(test_project_name)
@pytest.mark.asyncio
async def test_add_project_default_parameter_omitted(project_service: ProjectService):
    """Omitting set_default when adding a project behaves like set_default=False."""
    project_name = f"test-default-omitted-{os.urandom(4).hex()}"
    with tempfile.TemporaryDirectory() as temp_dir:
        project_path = str(Path(temp_dir) / "test-default-omitted")
        os.makedirs(project_path, exist_ok=True)

        previous_default = project_service.default_project
        try:
            # No set_default argument: the configured default must not change
            await project_service.add_project(project_name, project_path)
            assert project_service.default_project == previous_default

            # The new project must not be flagged as default in the database
            added = await project_service.repository.get_by_name(project_name)
            assert added is not None
            assert added.is_default is not True
        finally:
            # Clean up the test project
            if project_name in project_service.projects:
                await project_service.remove_project(project_name)
@pytest.mark.asyncio
async def test_ensure_single_default_project_enforcement_logic(project_service: ProjectService):
    """_ensure_single_default_project runs cleanly and leaves one default project."""
    # The enforcement hook must exist and be callable
    enforcement = getattr(project_service, "_ensure_single_default_project", None)
    assert enforcement is not None
    assert callable(enforcement)

    # Running enforcement should succeed without error
    await project_service._ensure_single_default_project()

    # Afterwards exactly one project carries is_default=True
    defaults = [p for p in await project_service.repository.find_all() if p.is_default is True]
    assert len(defaults) == 1  # Should have exactly one default
@pytest.mark.asyncio
async def test_synchronize_projects_calls_ensure_single_default(project_service: ProjectService):
    """Test that synchronize_projects calls _ensure_single_default_project.

    A project present only in config gets written to the database, and at
    most one project ends up flagged as default afterwards.
    """
    test_project_name = f"test-sync-default-{os.urandom(4).hex()}"
    with tempfile.TemporaryDirectory() as temp_dir:
        test_root = Path(temp_dir)
        test_project_path = str(test_root / "test-sync-default")
        # Make sure the test directory exists
        os.makedirs(test_project_path, exist_ok=True)
        config_manager = ConfigManager()
        try:
            # Add project to config only (simulating unsynchronized state)
            config_manager.add_project(test_project_name, test_project_path)
            # Verify it's in config but not in database
            assert test_project_name in project_service.projects
            db_project = await project_service.repository.get_by_name(test_project_name)
            assert db_project is None
            # Call synchronize_projects (this should call _ensure_single_default_project)
            await project_service.synchronize_projects()
            # Verify project is now in database
            db_project = await project_service.repository.get_by_name(test_project_name)
            assert db_project is not None
            # Verify default project enforcement was applied
            all_projects = await project_service.repository.find_all()
            default_projects = [p for p in all_projects if p.is_default is True]
            assert len(default_projects) <= 1  # Should be exactly 1 or 0
        finally:
            # Clean up test project
            if test_project_name in project_service.projects:
                await project_service.remove_project(test_project_name)
@pytest.mark.asyncio
async def test_synchronize_projects_normalizes_project_names(project_service: ProjectService):
    """Test that synchronize_projects normalizes project names in config to match database format.

    "Test Project With Spaces" written directly into config must become
    "test-project-with-spaces" in both config and database after sync.
    """
    # Use a project name that needs normalization (uppercase, spaces)
    unnormalized_name = "Test Project With Spaces"
    expected_normalized_name = "test-project-with-spaces"
    with tempfile.TemporaryDirectory() as temp_dir:
        test_root = Path(temp_dir)
        test_project_path = str(test_root / "test-project-spaces")
        # Make sure the test directory exists
        os.makedirs(test_project_path, exist_ok=True)
        config_manager = ConfigManager()
        try:
            # Manually add the unnormalized project name to config
            # Add project with unnormalized name directly to config
            config = config_manager.load_config()
            config.projects[unnormalized_name] = test_project_path
            config_manager.save_config(config)
            # Verify the unnormalized name is in config
            assert unnormalized_name in project_service.projects
            assert project_service.projects[unnormalized_name] == test_project_path
            # Call synchronize_projects - this should normalize the project name
            await project_service.synchronize_projects()
            # Verify the config was updated with normalized name
            assert expected_normalized_name in project_service.projects
            assert unnormalized_name not in project_service.projects
            assert project_service.projects[expected_normalized_name] == test_project_path
            # Verify the project was added to database with normalized name
            db_project = await project_service.repository.get_by_name(expected_normalized_name)
            assert db_project is not None
            assert db_project.name == expected_normalized_name
            assert db_project.path == test_project_path
            assert db_project.permalink == expected_normalized_name
            # Verify the unnormalized name is not in database
            unnormalized_db_project = await project_service.repository.get_by_name(
                unnormalized_name
            )
            assert unnormalized_db_project is None
        finally:
            # Clean up - remove any test projects from both config and database
            current_projects = project_service.projects.copy()
            for name in [unnormalized_name, expected_normalized_name]:
                if name in current_projects:
                    try:
                        await project_service.remove_project(name)
                    except Exception:
                        # Try to clean up manually if remove_project fails
                        try:
                            config_manager.remove_project(name)
                        except Exception:
                            pass
                        # Remove from database
                        db_project = await project_service.repository.get_by_name(name)
                        if db_project:
                            await project_service.repository.delete(db_project.id)
@pytest.mark.asyncio
async def test_move_project(project_service: ProjectService):
    """Test moving a project to a new location.

    move_project must update the path in both config and database and
    create the destination directory.
    """
    test_project_name = f"test-move-project-{os.urandom(4).hex()}"
    with tempfile.TemporaryDirectory() as temp_dir:
        test_root = Path(temp_dir)
        old_path = (test_root / "old-location").as_posix()
        new_path = (test_root / "new-location").as_posix()
        # Create old directory
        os.makedirs(old_path, exist_ok=True)
        try:
            # Add project with initial path
            await project_service.add_project(test_project_name, old_path)
            # Verify initial state
            assert test_project_name in project_service.projects
            assert project_service.projects[test_project_name] == old_path
            project = await project_service.repository.get_by_name(test_project_name)
            assert project is not None
            assert project.path == old_path
            # Move project to new location
            await project_service.move_project(test_project_name, new_path)
            # Verify config was updated
            assert project_service.projects[test_project_name] == new_path
            # Verify database was updated
            updated_project = await project_service.repository.get_by_name(test_project_name)
            assert updated_project is not None
            assert updated_project.path == new_path
            # Verify new directory was created
            assert os.path.exists(new_path)
        finally:
            # Clean up
            if test_project_name in project_service.projects:
                await project_service.remove_project(test_project_name)
@pytest.mark.asyncio
async def test_move_project_nonexistent(project_service: ProjectService):
    """Moving an unknown project raises ValueError mentioning configuration."""
    with tempfile.TemporaryDirectory() as temp_dir:
        destination = str(Path(temp_dir) / "new-location")

        with pytest.raises(ValueError, match="not found in configuration"):
            await project_service.move_project("nonexistent-project", destination)
@pytest.mark.asyncio
async def test_move_project_db_mismatch(project_service: ProjectService):
    """Test moving a project that exists in config but not in database.

    The move must fail with ValueError and roll the config entry back to
    its original path.
    """
    test_project_name = f"test-move-mismatch-{os.urandom(4).hex()}"
    with tempfile.TemporaryDirectory() as temp_dir:
        test_root = Path(temp_dir)
        old_path = (test_root / "old-location").as_posix()
        new_path = (test_root / "new-location").as_posix()
        # Create directories
        os.makedirs(old_path, exist_ok=True)
        config_manager = project_service.config_manager
        try:
            # Add project to config only (not to database)
            config_manager.add_project(test_project_name, old_path)
            # Verify it's in config but not in database
            assert test_project_name in project_service.projects
            db_project = await project_service.repository.get_by_name(test_project_name)
            assert db_project is None
            # Try to move project - should fail and restore config
            with pytest.raises(ValueError, match="not found in database"):
                await project_service.move_project(test_project_name, new_path)
            # Verify config was restored to original path
            assert project_service.projects[test_project_name] == old_path
        finally:
            # Clean up
            if test_project_name in project_service.projects:
                config_manager.remove_project(test_project_name)
@pytest.mark.asyncio
async def test_move_project_expands_path(project_service: ProjectService):
    """Test that move_project expands ~ and relative paths.

    A relative destination must be stored as an absolute path in both the
    config and the database.
    """
    test_project_name = f"test-move-expand-{os.urandom(4).hex()}"
    with tempfile.TemporaryDirectory() as temp_dir:
        test_root = Path(temp_dir)
        old_path = (test_root / "old-location").as_posix()
        # Create old directory
        os.makedirs(old_path, exist_ok=True)

        # The relative path resolves against the current working directory,
        # so the directory move_project creates there must be removed after
        # the test (the original version leaked it into the CWD).
        relative_new_path = "./new-location"
        expected_absolute_path = Path(os.path.abspath(relative_new_path)).as_posix()
        try:
            # Add project with initial path
            await project_service.add_project(test_project_name, old_path)
            # Move project using relative path
            await project_service.move_project(test_project_name, relative_new_path)
            # Verify the path was expanded to absolute
            assert project_service.projects[test_project_name] == expected_absolute_path
            updated_project = await project_service.repository.get_by_name(test_project_name)
            assert updated_project is not None
            assert updated_project.path == expected_absolute_path
        finally:
            # Clean up the project and the directory created in the CWD
            if test_project_name in project_service.projects:
                await project_service.remove_project(test_project_name)
            shutil.rmtree(expected_absolute_path, ignore_errors=True)
@pytest.mark.asyncio
async def test_synchronize_projects_handles_case_sensitivity_bug(project_service: ProjectService):
    """Test that synchronize_projects fixes the case sensitivity bug (Personal vs personal).

    A config entry saved as "Personal" must be normalized to "personal" so
    that config and database agree on the lookup key.
    """
    with tempfile.TemporaryDirectory() as temp_dir:
        # Simulate the exact bug scenario: config has "Personal" but database expects "personal"
        config_name = "Personal"
        normalized_name = "personal"
        test_root = Path(temp_dir)
        test_project_path = str(test_root / "personal-project")
        # Make sure the test directory exists
        os.makedirs(test_project_path, exist_ok=True)
        config_manager = ConfigManager()
        try:
            # Add project with uppercase name to config (simulating the bug scenario)
            config = config_manager.load_config()
            config.projects[config_name] = test_project_path
            config_manager.save_config(config)
            # Verify the uppercase name is in config
            assert config_name in project_service.projects
            assert project_service.projects[config_name] == test_project_path
            # Call synchronize_projects - this should fix the case sensitivity issue
            await project_service.synchronize_projects()
            # Verify the config was updated to use normalized case
            assert normalized_name in project_service.projects
            assert config_name not in project_service.projects
            assert project_service.projects[normalized_name] == test_project_path
            # Verify the project exists in database with correct normalized name
            db_project = await project_service.repository.get_by_name(normalized_name)
            assert db_project is not None
            assert db_project.name == normalized_name
            assert db_project.path == test_project_path
            # Verify we can now switch to this project without case sensitivity errors
            # (This would have failed before the fix with "Personal" != "personal")
            project_lookup = await project_service.get_project(normalized_name)
            assert project_lookup is not None
            assert project_lookup.name == normalized_name
        finally:
            # Clean up
            for name in [config_name, normalized_name]:
                if name in project_service.projects:
                    try:
                        await project_service.remove_project(name)
                    except Exception:
                        # Manual cleanup if needed
                        try:
                            config_manager.remove_project(name)
                        except Exception:
                            pass
                        db_project = await project_service.repository.get_by_name(name)
                        if db_project:
                            await project_service.repository.delete(db_project.id)
@pytest.mark.skipif(os.name == "nt", reason="Project root constraints only tested on POSIX systems")
@pytest.mark.asyncio
async def test_add_project_with_project_root_sanitizes_paths(
    project_service: ProjectService, config_manager: ConfigManager, monkeypatch
):
    """Test that BASIC_MEMORY_PROJECT_ROOT uses sanitized project name, ignoring user path.
    When project_root is set (cloud mode), the system should:
    1. Ignore the user's provided path completely
    2. Use the sanitized project name as the directory name
    3. Create a flat structure: /app/data/test-bisync instead of /app/data/documents/test bisync
    This prevents the bisync auto-discovery bug where nested paths caused duplicate project creation.
    """
    with tempfile.TemporaryDirectory() as temp_dir:
        # Set up project root environment
        project_root_path = Path(temp_dir) / "app" / "data"
        project_root_path.mkdir(parents=True, exist_ok=True)
        monkeypatch.setenv("BASIC_MEMORY_PROJECT_ROOT", str(project_root_path))
        # Invalidate config cache so it picks up the new env var
        # (the config module caches the loaded config at module level,
        # so without this reset the env var would be ignored)
        from basic_memory import config as config_module
        config_module._CONFIG_CACHE = None
        test_cases = [
            # (project_name, user_path, expected_sanitized_name)
            # User path is IGNORED - only project name matters
            ("test", "anything/path", "test"),
            (
                "Test BiSync",
                "~/Documents/Test BiSync",
                "test-bi-sync",
            ),  # BiSync -> bi-sync (dash preserved)
            ("My Project", "/tmp/whatever", "my-project"),
            ("UPPERCASE", "~", "uppercase"),
            ("With Spaces", "~/Documents/With Spaces", "with-spaces"),
        ]
        for i, (project_name, user_path, expected_sanitized) in enumerate(test_cases):
            test_project_name = f"{project_name}-{i}"  # Make unique
            expected_final_segment = f"{expected_sanitized}-{i}"
            try:
                # Add the project - user_path should be ignored
                await project_service.add_project(test_project_name, user_path)
                # Verify the path uses sanitized project name, not user path
                assert test_project_name in project_service.projects
                actual_path = project_service.projects[test_project_name]
                # The path should be under project_root (resolve both to handle macOS /private/var)
                assert (
                    Path(actual_path).resolve().is_relative_to(Path(project_root_path).resolve())
                ), f"Path {actual_path} should be under {project_root_path}"
                # Verify the final path segment is the sanitized project name
                path_parts = Path(actual_path).parts
                final_segment = path_parts[-1]
                assert final_segment == expected_final_segment, (
                    f"Expected path segment '{expected_final_segment}', got '{final_segment}'"
                )
                # Clean up
                await project_service.remove_project(test_project_name)
            except ValueError as e:
                # A ValueError here would mean sanitization rejected a valid name
                pytest.fail(f"Unexpected ValueError for project {test_project_name}: {e}")
@pytest.mark.skipif(os.name == "nt", reason="Project root constraints only tested on POSIX systems")
@pytest.mark.asyncio
async def test_add_project_with_project_root_rejects_escape_attempts(
    project_service: ProjectService, config_manager: ConfigManager, monkeypatch
):
    """Test that BASIC_MEMORY_PROJECT_ROOT rejects paths that try to escape the project root."""
    with tempfile.TemporaryDirectory() as temp_dir:
        # Point BASIC_MEMORY_PROJECT_ROOT at a sandbox directory
        root = Path(temp_dir) / "app" / "data"
        root.mkdir(parents=True, exist_ok=True)
        # A sibling directory outside the root that must remain unreachable
        (Path(temp_dir) / "outside").mkdir(parents=True, exist_ok=True)
        monkeypatch.setenv("BASIC_MEMORY_PROJECT_ROOT", str(root))
        # Reset the module-level config cache so the env var takes effect
        from basic_memory import config as config_module

        config_module._CONFIG_CACHE = None
        # Each traversal attempt should be neutralized by sanitization,
        # ending up as a path contained in the project root
        traversal_attempts = [
            "../../../etc/passwd",
            "../../.env",
            "../../../home/user/.ssh/id_rsa",
        ]
        for index, attempt in enumerate(traversal_attempts):
            name = f"project-root-attack-test-{index}"
            try:
                await project_service.add_project(name, attempt)
                # The stored path must resolve inside the root
                # (resolve() handles macOS /private/var symlinks)
                stored = project_service.projects[name]
                assert Path(stored).resolve().is_relative_to(root.resolve()), (
                    f"Sanitized path {stored} should be under {root}"
                )
                await project_service.remove_project(name)
            except ValueError:
                # Rejecting the path outright is also acceptable for security
                pass
@pytest.mark.skipif(os.name == "nt", reason="Project root constraints only tested on POSIX systems")
@pytest.mark.asyncio
async def test_add_project_without_project_root_allows_arbitrary_paths(
    project_service: ProjectService, config_manager: ConfigManager, monkeypatch
):
    """Test that without BASIC_MEMORY_PROJECT_ROOT set, arbitrary paths are allowed."""
    with tempfile.TemporaryDirectory() as temp_dir:
        # Ensure project_root is not set. raising=False makes this a no-op when
        # the variable is absent, replacing the LBYL `in os.environ` pre-check.
        monkeypatch.delenv("BASIC_MEMORY_PROJECT_ROOT", raising=False)
        # Force reload config without project_root
        from basic_memory.services import project_service as ps_module

        monkeypatch.setattr(ps_module, "config", config_manager.load_config())
        # Create a test directory in an arbitrary location
        test_dir = Path(temp_dir) / "arbitrary-location"
        test_dir.mkdir(parents=True, exist_ok=True)
        test_project_name = "no-project-root-test"
        try:
            # Without project_root, we should be able to use arbitrary absolute paths
            await project_service.add_project(test_project_name, str(test_dir))
            # Verify the path was accepted as-is (no sanitization or re-rooting)
            assert test_project_name in project_service.projects
            actual_path = project_service.projects[test_project_name]
            assert actual_path == str(test_dir)
        finally:
            # Clean up so later tests see an unmodified project list
            if test_project_name in project_service.projects:
                await project_service.remove_project(test_project_name)
@pytest.mark.skip(
    reason="Obsolete: project_root mode now uses sanitized project name, not user path. See test_add_project_with_project_root_sanitizes_paths instead."
)
@pytest.mark.skipif(os.name == "nt", reason="Project root constraints only tested on POSIX systems")
@pytest.mark.asyncio
async def test_add_project_with_project_root_normalizes_case(
    project_service: ProjectService, config_manager: ConfigManager, monkeypatch
):
    """Test that BASIC_MEMORY_PROJECT_ROOT normalizes paths to lowercase.
    NOTE: This test is obsolete. After fixing the bisync duplicate project bug,
    project_root mode now ignores the user's path and uses the sanitized project name instead.
    Kept (skipped) to document the previous path-normalization behavior; safe to
    delete once that history is no longer useful.
    """
    with tempfile.TemporaryDirectory() as temp_dir:
        # Set up project root environment
        project_root_path = Path(temp_dir) / "app" / "data"
        project_root_path.mkdir(parents=True, exist_ok=True)
        monkeypatch.setenv("BASIC_MEMORY_PROJECT_ROOT", str(project_root_path))
        # Invalidate config cache so it picks up the new env var
        from basic_memory import config as config_module
        config_module._CONFIG_CACHE = None
        test_cases = [
            # (input_path, expected_normalized_path)
            ("Documents/my-project", str(project_root_path / "documents" / "my-project")),
            ("UPPERCASE/PATH", str(project_root_path / "uppercase" / "path")),
            ("MixedCase/Path", str(project_root_path / "mixedcase" / "path")),
            ("documents/Test-TWO", str(project_root_path / "documents" / "test-two")),
        ]
        for i, (input_path, expected_path) in enumerate(test_cases):
            test_project_name = f"case-normalize-test-{i}"
            try:
                # Add the project
                await project_service.add_project(test_project_name, input_path)
                # Verify the path was normalized to lowercase (resolve both to handle macOS /private/var)
                assert test_project_name in project_service.projects
                actual_path = project_service.projects[test_project_name]
                assert Path(actual_path).resolve() == Path(expected_path).resolve(), (
                    f"Expected path {expected_path} but got {actual_path} for input {input_path}"
                )
                # Clean up
                await project_service.remove_project(test_project_name)
            except ValueError as e:
                pytest.fail(f"Unexpected ValueError for input path {input_path}: {e}")
@pytest.mark.skip(
    reason="Obsolete: project_root mode now uses sanitized project name, not user path."
)
@pytest.mark.skipif(os.name == "nt", reason="Project root constraints only tested on POSIX systems")
@pytest.mark.asyncio
async def test_add_project_with_project_root_detects_case_collisions(
    project_service: ProjectService, config_manager: ConfigManager, monkeypatch
):
    """Test that BASIC_MEMORY_PROJECT_ROOT detects case-insensitive path collisions.
    NOTE: This test is obsolete. After fixing the bisync duplicate project bug,
    project_root mode now ignores the user's path and uses the sanitized project name instead.
    Kept (skipped) to document the previous collision-detection behavior; safe to
    delete once that history is no longer useful.
    """
    with tempfile.TemporaryDirectory() as temp_dir:
        # Set up project root environment
        project_root_path = Path(temp_dir) / "app" / "data"
        project_root_path.mkdir(parents=True, exist_ok=True)
        monkeypatch.setenv("BASIC_MEMORY_PROJECT_ROOT", str(project_root_path))
        # Invalidate config cache so it picks up the new env var
        from basic_memory import config as config_module
        config_module._CONFIG_CACHE = None
        # First, create a project with lowercase path
        first_project = "documents-project"
        await project_service.add_project(first_project, "documents/basic-memory")
        # Verify it was created with normalized lowercase path (resolve to handle macOS /private/var)
        assert first_project in project_service.projects
        first_path = project_service.projects[first_project]
        assert (
            Path(first_path).resolve()
            == (project_root_path / "documents" / "basic-memory").resolve()
        )
        # Now try to create a project with the same path but different case
        # This should be normalized to the same lowercase path and not cause a collision
        # since both will be normalized to the same path
        second_project = "documents-project-2"
        try:
            # This should succeed because both get normalized to the same lowercase path
            await project_service.add_project(second_project, "documents/basic-memory")
            # If we get here, both should have the exact same path
            second_path = project_service.projects[second_project]
            assert second_path == first_path
            # Clean up second project
            await project_service.remove_project(second_project)
        except ValueError:
            # This is expected if there's already a project with this exact path
            pass
        # Clean up
        await project_service.remove_project(first_project)
@pytest.mark.asyncio
async def test_add_project_rejects_nested_child_path(project_service: ProjectService):
    """Test that adding a project nested under an existing project fails."""
    parent_name = f"parent-project-{os.urandom(4).hex()}"
    # Isolated temp directory so we do not collide with fixture-managed paths
    with tempfile.TemporaryDirectory() as temp_dir:
        base = Path(temp_dir)
        parent_dir = (base / "parent").as_posix()
        os.makedirs(parent_dir, exist_ok=True)
        try:
            # Register the parent project first
            await project_service.add_project(parent_name, parent_dir)
            # A project whose path lives inside the parent must be refused
            child_name = f"child-project-{os.urandom(4).hex()}"
            child_dir = (base / "parent" / "child").as_posix()
            with pytest.raises(ValueError, match="nested within existing project"):
                await project_service.add_project(child_name, child_dir)
        finally:
            # Clean up the parent registration
            if parent_name in project_service.projects:
                await project_service.remove_project(parent_name)
@pytest.mark.asyncio
async def test_add_project_rejects_parent_path_over_existing_child(project_service: ProjectService):
    """Test that adding a parent project over an existing nested project fails."""
    child_name = f"child-project-{os.urandom(4).hex()}"
    # Isolated temp directory so we do not collide with fixture-managed paths
    with tempfile.TemporaryDirectory() as temp_dir:
        base = Path(temp_dir)
        child_dir = (base / "parent" / "child").as_posix()
        os.makedirs(child_dir, exist_ok=True)
        try:
            # Register the nested project first
            await project_service.add_project(child_name, child_dir)
            # A project whose path contains the existing child must be refused
            parent_name = f"parent-project-{os.urandom(4).hex()}"
            parent_dir = (base / "parent").as_posix()
            with pytest.raises(ValueError, match="is nested within this path"):
                await project_service.add_project(parent_name, parent_dir)
        finally:
            # Clean up the child registration
            if child_name in project_service.projects:
                await project_service.remove_project(child_name)
@pytest.mark.asyncio
async def test_add_project_allows_sibling_paths(project_service: ProjectService):
    """Test that adding sibling projects (same level, different directories) succeeds."""
    first_name = f"sibling-project-1-{os.urandom(4).hex()}"
    second_name = f"sibling-project-2-{os.urandom(4).hex()}"
    # Isolated temp directory so we do not collide with fixture-managed paths
    with tempfile.TemporaryDirectory() as temp_dir:
        base = Path(temp_dir)
        first_dir = (base / "sibling1").as_posix()
        second_dir = (base / "sibling2").as_posix()
        os.makedirs(first_dir, exist_ok=True)
        os.makedirs(second_dir, exist_ok=True)
        try:
            # Siblings share a parent but neither contains the other,
            # so both registrations should succeed
            await project_service.add_project(first_name, first_dir)
            await project_service.add_project(second_name, second_dir)
            assert first_name in project_service.projects
            assert second_name in project_service.projects
        finally:
            # Remove whichever projects were actually registered
            for name in (first_name, second_name):
                if name in project_service.projects:
                    await project_service.remove_project(name)
@pytest.mark.asyncio
async def test_add_project_rejects_deeply_nested_path(project_service: ProjectService):
    """Test that deeply nested paths are also rejected."""
    root_name = f"root-project-{os.urandom(4).hex()}"
    # Isolated temp directory so we do not collide with fixture-managed paths
    with tempfile.TemporaryDirectory() as temp_dir:
        base = Path(temp_dir)
        root_dir = (base / "root").as_posix()
        os.makedirs(root_dir, exist_ok=True)
        try:
            # Register the root project first
            await project_service.add_project(root_name, root_dir)
            # Even a path several levels below an existing project is rejected
            nested_name = f"nested-project-{os.urandom(4).hex()}"
            nested_dir = (base / "root" / "level1" / "level2" / "level3").as_posix()
            with pytest.raises(ValueError, match="nested within existing project"):
                await project_service.add_project(nested_name, nested_dir)
        finally:
            # Clean up the root registration
            if root_name in project_service.projects:
                await project_service.remove_project(root_name)
@pytest.mark.skipif(os.name == "nt", reason="Project root constraints only tested on POSIX systems")
@pytest.mark.asyncio
async def test_add_project_nested_validation_with_project_root(
    project_service: ProjectService, config_manager: ConfigManager, monkeypatch
):
    """Test that nested path validation works with BASIC_MEMORY_PROJECT_ROOT set."""
    # Use a completely separate temp directory to avoid fixture conflicts
    with tempfile.TemporaryDirectory() as temp_dir:
        project_root_path = Path(temp_dir) / "app" / "data"
        project_root_path.mkdir(parents=True, exist_ok=True)
        monkeypatch.setenv("BASIC_MEMORY_PROJECT_ROOT", str(project_root_path))
        # Invalidate config cache
        # (config caches at module level; clearing forces re-read of the env var)
        from basic_memory import config as config_module
        config_module._CONFIG_CACHE = None
        parent_project_name = f"cloud-parent-{os.urandom(4).hex()}"
        child_project_name = f"cloud-child-{os.urandom(4).hex()}"
        try:
            # Add parent project - user path is ignored, uses sanitized project name
            await project_service.add_project(parent_project_name, "parent-folder")
            # Verify it was created using sanitized project name, not user path
            assert parent_project_name in project_service.projects
            parent_actual_path = project_service.projects[parent_project_name]
            # Path should use sanitized project name (cloud-parent-xxx -> cloud-parent-xxx)
            # NOT the user-provided path "parent-folder"
            assert parent_project_name.lower() in parent_actual_path.lower()
            # Resolve both to handle macOS /private/var vs /var
            assert (
                Path(parent_actual_path).resolve().is_relative_to(Path(project_root_path).resolve())
            )
            # Nested projects should still be prevented, even with user path ignored
            # Since paths use project names, this won't actually be nested
            # But we can test that two projects can coexist
            await project_service.add_project(child_project_name, "parent-folder/child-folder")
            # Both should exist with their own paths
            assert child_project_name in project_service.projects
            child_actual_path = project_service.projects[child_project_name]
            assert child_project_name.lower() in child_actual_path.lower()
            # Clean up child
            await project_service.remove_project(child_project_name)
        finally:
            # Clean up (child is removed above on the success path;
            # the parent is always removed here)
            if parent_project_name in project_service.projects:
                await project_service.remove_project(parent_project_name)
@pytest.mark.asyncio
async def test_synchronize_projects_removes_db_only_projects(project_service: ProjectService):
    """Test that synchronize_projects removes projects that exist in DB but not in config.
    This is a regression test for issue #193 where deleted projects would be re-added
    to config during synchronization, causing them to reappear after deletion.
    Config is the source of truth - if a project is deleted from config, it should be
    removed from the database during synchronization.
    """
    test_project_name = f"test-db-only-{os.urandom(4).hex()}"
    with tempfile.TemporaryDirectory() as temp_dir:
        test_root = Path(temp_dir)
        test_project_path = str(test_root / "test-db-only")
        # Make sure the test directory exists
        os.makedirs(test_project_path, exist_ok=True)
        try:
            # Add project to database only (not to config) - simulating orphaned DB entry
            # Minimal row resembling what the service would create; permalink is
            # derived here as lowercase with dashes - confirm against add_project
            project_data = {
                "name": test_project_name,
                "path": test_project_path,
                "permalink": test_project_name.lower().replace(" ", "-"),
                "is_active": True,
            }
            await project_service.repository.create(project_data)
            # Verify it exists in DB but not in config
            db_project = await project_service.repository.get_by_name(test_project_name)
            assert db_project is not None
            assert test_project_name not in project_service.projects
            # Call synchronize_projects - this should remove the orphaned DB entry
            # because config is the source of truth
            await project_service.synchronize_projects()
            # Verify project was removed from database
            db_project_after = await project_service.repository.get_by_name(test_project_name)
            assert db_project_after is None, (
                "Project should be removed from DB when not in config (config is source of truth)"
            )
            # Verify it's still not in config
            assert test_project_name not in project_service.projects
        finally:
            # Clean up if needed (only reached if synchronize did not delete the row)
            db_project = await project_service.repository.get_by_name(test_project_name)
            if db_project:
                await project_service.repository.delete(db_project.id)
@pytest.mark.asyncio
async def test_remove_project_with_delete_notes_false(project_service: ProjectService):
    """Test that remove_project with delete_notes=False keeps directory intact.

    Removing a project should delete its config/DB records while leaving the
    note files on disk untouched (the default, non-destructive behavior).
    """
    test_project_name = f"test-remove-keep-{os.urandom(4).hex()}"
    with tempfile.TemporaryDirectory() as temp_dir:
        test_root = Path(temp_dir)
        test_project_path = test_root / "test-project"
        test_project_path.mkdir()
        test_file = test_project_path / "test.md"
        test_file.write_text("# Test Note")
        # Add project and confirm both the registration and the files exist
        await project_service.add_project(test_project_name, str(test_project_path))
        assert test_project_name in project_service.projects
        assert test_project_path.exists()
        assert test_file.exists()
        # Remove project without deleting notes (default behavior)
        await project_service.remove_project(test_project_name, delete_notes=False)
        # Project is gone from config and database...
        assert test_project_name not in project_service.projects
        db_project = await project_service.repository.get_by_name(test_project_name)
        assert db_project is None
        # ...but the directory and files are preserved
        assert test_project_path.exists()
        assert test_file.exists()
        # No explicit cleanup needed: TemporaryDirectory removes everything
        # (the previous try/finally wrapped only a `pass` and was dead code)
@pytest.mark.asyncio
async def test_remove_project_with_delete_notes_true(project_service: ProjectService):
    """Test that remove_project with delete_notes=True deletes directory.

    With delete_notes=True the project's directory and all note files are
    removed from disk along with the config/DB records.
    """
    test_project_name = f"test-remove-delete-{os.urandom(4).hex()}"
    with tempfile.TemporaryDirectory() as temp_dir:
        test_root = Path(temp_dir)
        test_project_path = test_root / "test-project"
        test_project_path.mkdir()
        test_file = test_project_path / "test.md"
        test_file.write_text("# Test Note")
        # Add project and confirm both the registration and the files exist
        await project_service.add_project(test_project_name, str(test_project_path))
        assert test_project_name in project_service.projects
        assert test_project_path.exists()
        assert test_file.exists()
        # Remove project with delete_notes=True
        await project_service.remove_project(test_project_name, delete_notes=True)
        # Project is gone from config and database...
        assert test_project_name not in project_service.projects
        db_project = await project_service.repository.get_by_name(test_project_name)
        assert db_project is None
        # ...and the directory (including its files) was deleted
        assert not test_project_path.exists()
        # No explicit cleanup needed: TemporaryDirectory removes everything
        # (the previous try/finally wrapped only a `pass` and was dead code)
@pytest.mark.asyncio
async def test_remove_project_delete_notes_missing_directory(project_service: ProjectService):
    """Test that remove_project with delete_notes=True handles missing directory gracefully."""
    test_project_name = f"test-remove-missing-{os.urandom(4).hex()}"
    # Point at a path that does not exist; use the system temp dir rather than
    # a hardcoded /tmp so the test is portable
    test_project_path = str(
        Path(tempfile.gettempdir()) / f"nonexistent-directory-{os.urandom(8).hex()}"
    )
    try:
        # Add project pointing to non-existent path
        await project_service.add_project(test_project_name, test_project_path)
        # Verify project exists in config/db
        assert test_project_name in project_service.projects
        db_project = await project_service.repository.get_by_name(test_project_name)
        assert db_project is not None
        # Remove project with delete_notes=True (should not fail even if dir doesn't exist)
        await project_service.remove_project(test_project_name, delete_notes=True)
        # Verify project is removed from config/db
        assert test_project_name not in project_service.projects
        db_project = await project_service.repository.get_by_name(test_project_name)
        assert db_project is None
    finally:
        # Best-effort cleanup if the removal above did not complete
        if test_project_name in project_service.projects:
            try:
                project_service.config_manager.remove_project(test_project_name)
            except Exception:
                pass
            # Also drop any orphaned DB row so later tests start clean
            # (the config-only cleanup previously left the DB entry behind)
            db_project = await project_service.repository.get_by_name(test_project_name)
            if db_project:
                await project_service.repository.delete(db_project.id)
```
--------------------------------------------------------------------------------
/tests/services/test_entity_service.py:
--------------------------------------------------------------------------------
```python
"""Tests for EntityService."""
from pathlib import Path
from textwrap import dedent
import pytest
import yaml
from basic_memory.config import ProjectConfig, BasicMemoryConfig
from basic_memory.markdown import EntityParser
from basic_memory.models import Entity as EntityModel
from basic_memory.repository import EntityRepository
from basic_memory.schemas import Entity as EntitySchema
from basic_memory.services import FileService
from basic_memory.services.entity_service import EntityService
from basic_memory.services.exceptions import EntityCreationError, EntityNotFoundError
from basic_memory.services.search_service import SearchService
from basic_memory.utils import generate_permalink
@pytest.mark.asyncio
async def test_create_entity(entity_service: EntityService, file_service: FileService):
    """Test successful entity creation."""
    schema = EntitySchema(
        title="Test Entity",
        folder="",
        entity_type="test",
    )
    created = await entity_service.create_entity(schema)
    # The returned model mirrors the schema's derived fields
    assert isinstance(created, EntityModel)
    assert created.permalink == schema.permalink
    assert created.file_path == schema.file_path
    assert created.entity_type == "test"
    assert created.created_at is not None
    assert len(created.relations) == 0
    # Round-trip: the entity is retrievable by its permalink
    fetched = await entity_service.get_by_permalink(schema.permalink)
    assert fetched.title == "Test Entity"
    assert fetched.entity_type == "test"
    assert fetched.created_at is not None
    # A markdown file was written with matching frontmatter
    path = file_service.get_entity_path(created)
    assert await file_service.exists(path)
    content, _ = await file_service.read_file(path)
    _, frontmatter, _ = content.split("---", 2)
    meta = yaml.safe_load(frontmatter)
    assert meta["permalink"] == created.permalink
    assert meta["type"] == created.entity_type
@pytest.mark.asyncio
async def test_create_entity_file_exists(entity_service: EntityService, file_service: FileService):
    """Test that creating an entity whose file already exists raises an error."""
    schema = EntitySchema(
        title="Test Entity",
        folder="",
        entity_type="test",
        content="first",
    )
    created = await entity_service.create_entity(schema)
    # The first creation writes the markdown file (frontmatter + content)
    path = file_service.get_entity_path(created)
    assert await file_service.exists(path)
    content, _ = await file_service.read_file(path)
    assert (
        "---\ntitle: Test Entity\ntype: test\npermalink: test-entity\n---\n\nfirst" == content
    )
    # A second entity mapping to the same file must be rejected
    duplicate = EntitySchema(
        title="Test Entity",
        folder="",
        entity_type="test",
        content="second",
    )
    with pytest.raises(EntityCreationError):
        await entity_service.create_entity(duplicate)
@pytest.mark.asyncio
async def test_create_entity_unique_permalink(
    project_config,
    entity_service: EntityService,
    file_service: FileService,
    entity_repository: EntityRepository,
):
    """Test that re-creating an entity whose permalink is taken gets a -1 suffix."""
    entity_data = EntitySchema(
        title="Test Entity",
        folder="test",
        entity_type="test",
    )
    entity = await entity_service.create_entity(entity_data)
    # default permalink
    assert entity.permalink == generate_permalink(entity.file_path)
    # move file
    # Rename on disk and update the DB row so the original file path is free
    # while the original permalink remains taken - forcing a permalink conflict
    # on the next create
    file_path = file_service.get_entity_path(entity)
    file_path.rename(project_config.home / "new_path.md")
    await entity_repository.update(entity.id, {"file_path": "new_path.md"})
    # create again
    entity2 = await entity_service.create_entity(entity_data)
    # The conflicting permalink is de-duplicated with a numeric suffix
    assert entity2.permalink == f"{entity.permalink}-1"
    file_path = file_service.get_entity_path(entity2)
    file_content, _ = await file_service.read_file(file_path)
    _, frontmatter, doc_content = file_content.split("---", 2)
    metadata = yaml.safe_load(frontmatter)
    # Verify frontmatter contents
    assert metadata["permalink"] == entity2.permalink
@pytest.mark.asyncio
async def test_get_by_permalink(entity_service: EntityService):
    """Test finding entity by type and name combination."""
    first_schema = EntitySchema(
        title="TestEntity1",
        folder="test",
        entity_type="test",
    )
    first = await entity_service.create_entity(first_schema)
    second_schema = EntitySchema(
        title="TestEntity2",
        folder="test",
        entity_type="test",
    )
    second = await entity_service.create_entity(second_schema)
    # Each permalink resolves to its own entity
    found = await entity_service.get_by_permalink(first_schema.permalink)
    assert found is not None
    assert found.id == first.id
    assert found.entity_type == first.entity_type
    found = await entity_service.get_by_permalink(second_schema.permalink)
    assert found is not None
    assert found.id == second.id
    assert found.entity_type == second.entity_type
    # An unknown permalink raises instead of returning None
    with pytest.raises(EntityNotFoundError):
        await entity_service.get_by_permalink("nonexistent/test_entity")
@pytest.mark.asyncio
async def test_get_entity_success(entity_service: EntityService):
    """Test successful entity retrieval."""
    schema = EntitySchema(
        title="TestEntity",
        folder="test",
        entity_type="test",
    )
    await entity_service.create_entity(schema)
    # Fetch by permalink and verify the model round-trips
    fetched = await entity_service.get_by_permalink(schema.permalink)
    assert isinstance(fetched, EntityModel)
    assert fetched.title == "TestEntity"
    assert fetched.entity_type == "test"
@pytest.mark.asyncio
async def test_delete_entity_success(entity_service: EntityService):
    """Deleting by permalink removes the entity."""
    schema = EntitySchema(
        title="TestEntity",
        folder="test",
        entity_type="test",
    )
    await entity_service.create_entity(schema)
    # Deletion reports success...
    assert await entity_service.delete_entity(schema.permalink) is True
    # ...and the entity is no longer retrievable afterwards.
    with pytest.raises(EntityNotFoundError):
        await entity_service.get_by_permalink(schema.permalink)
@pytest.mark.asyncio
async def test_delete_entity_by_id(entity_service: EntityService):
    """Test successful entity deletion using the numeric database id."""
    entity_data = EntitySchema(
        title="TestEntity",
        folder="test",
        entity_type="test",
    )
    created = await entity_service.create_entity(entity_data)
    # Act using the entity's database id (not the permalink)
    result = await entity_service.delete_entity(created.id)
    # Assert: deletion succeeded and the permalink no longer resolves
    assert result is True
    with pytest.raises(EntityNotFoundError):
        await entity_service.get_by_permalink(entity_data.permalink)
@pytest.mark.asyncio
async def test_get_entity_by_permalink_not_found(entity_service: EntityService):
    """Looking up a permalink that was never created raises EntityNotFoundError."""
    missing_permalink = "test/non_existent"
    with pytest.raises(EntityNotFoundError):
        await entity_service.get_by_permalink(missing_permalink)
@pytest.mark.asyncio
async def test_delete_nonexistent_entity(entity_service: EntityService):
    """Deleting an unknown permalink is a no-op that still reports success."""
    result = await entity_service.delete_entity("test/non_existent")
    assert result is True
@pytest.mark.asyncio
async def test_create_entity_with_special_chars(entity_service: EntityService):
    """Titles containing punctuation and symbols are preserved verbatim."""
    title = "TestEntity_$pecial chars & symbols!"  # Note: Using valid path characters
    schema = EntitySchema(
        title=title,
        folder="test",
        entity_type="test",
    )
    created = await entity_service.create_entity(schema)
    assert created.title == title
    # The entity also round-trips through a permalink lookup.
    await entity_service.get_by_permalink(schema.permalink)
@pytest.mark.asyncio
async def test_get_entities_by_permalinks(entity_service: EntityService):
    """Multiple entities can be fetched in one call by their permalinks."""
    schemas = [
        EntitySchema(
            title="Entity1",
            folder="test",
            entity_type="test",
        ),
        EntitySchema(
            title="Entity2",
            folder="test",
            entity_type="test",
        ),
    ]
    for schema in schemas:
        await entity_service.create_entity(schema)
    # Fetch both entities in a single batch lookup.
    found = await entity_service.get_entities_by_permalinks([s.permalink for s in schemas])
    assert len(found) == 2
    assert {e.title for e in found} == {"Entity1", "Entity2"}
@pytest.mark.asyncio
async def test_get_entities_empty_input(entity_service: EntityService):
    """An empty permalink list yields an empty result."""
    result = await entity_service.get_entities_by_permalinks([])
    assert len(result) == 0
@pytest.mark.asyncio
async def test_get_entities_some_not_found(entity_service: EntityService):
    """Unknown permalinks are silently skipped; known ones are returned."""
    schema = EntitySchema(
        title="Entity1",
        folder="test",
        entity_type="test",
    )
    await entity_service.create_entity(schema)
    # One permalink exists, the other does not.
    found = await entity_service.get_entities_by_permalinks(
        [schema.permalink, "type1/non_existent"]
    )
    assert len(found) == 1
    assert found[0].title == "Entity1"
@pytest.mark.asyncio
async def test_get_entity_path(entity_service: EntityService):
    """The file service maps an entity to base_path / file_path."""
    entity = EntityModel(
        permalink="test-entity",
        file_path="test-entity.md",
        entity_type="test",
    )
    resolved = entity_service.file_service.get_entity_path(entity)
    # `base_path / name` already yields a Path, so compare directly.
    expected = entity_service.file_service.base_path / "test-entity.md"
    assert resolved == expected
@pytest.mark.asyncio
async def test_update_note_entity_content(entity_service: EntityService, file_service: FileService):
    """Updating a note's body keeps its frontmatter metadata intact."""
    # Create a draft note to update.
    schema = EntitySchema(
        title="test",
        folder="test",
        entity_type="note",
        entity_metadata={"status": "draft"},
    )
    note = await entity_service.create_entity(schema)
    assert note.entity_metadata.get("status") == "draft"
    # Replace the body with content that contains a wiki-link relation.
    schema.content = """
# Updated [[Content]]
- references [[new content]]
- [note] This is new content.
"""
    updated = await entity_service.update_entity(note, schema)
    # The file should contain the new body...
    path = file_service.get_entity_path(updated)
    text, _ = await file_service.read_file(path)
    assert "# Updated [[Content]]" in text
    assert "- references [[new content]]" in text
    assert "- [note] This is new content" in text
    # ...while the original frontmatter metadata is preserved.
    _, frontmatter, _ = text.split("---", 2)
    parsed = yaml.safe_load(frontmatter)
    assert parsed.get("status") == "draft"
@pytest.mark.asyncio
async def test_create_or_update_new(entity_service: EntityService, file_service: FileService):
    """create_or_update should create when no matching entity exists yet."""
    schema = EntitySchema(
        title="test",
        folder="test",
        entity_type="test",
        entity_metadata={"status": "draft"},
    )
    result, was_created = await entity_service.create_or_update_entity(schema)
    assert result.title == "test"
    assert was_created is True
@pytest.mark.asyncio
async def test_create_or_update_existing(entity_service: EntityService, file_service: FileService):
    """create_or_update should update in place when the entity already exists."""
    existing = await entity_service.create_entity(
        EntitySchema(
            title="test",
            folder="test",
            entity_type="test",
            content="Test entity",
            entity_metadata={"status": "final"},
        )
    )
    existing.content = "Updated content"
    # Second call must take the update path, preserving metadata.
    result, was_created = await entity_service.create_or_update_entity(existing)
    assert result.title == "test"
    assert result.entity_metadata["status"] == "final"
    assert was_created is False
@pytest.mark.asyncio
async def test_create_with_content(entity_service: EntityService, file_service: FileService):
    """Creating an entity from content that already carries frontmatter.

    The user-supplied permalink is honored, and observations/relations are
    parsed out of the markdown body.
    """
    # contains frontmatter
    content = dedent(
        """
        ---
        permalink: git-workflow-guide
        ---
        # Git Workflow Guide
                
        A guide to our [[Git]] workflow. This uses some ideas from [[Trunk Based Development]].
        
        ## Best Practices
        Use branches effectively:
        - [design] Keep feature branches short-lived #git #workflow (Reduces merge conflicts)
        - implements [[Branch Strategy]] (Our standard workflow)
        
        ## Common Commands
        See the [[Git Cheat Sheet]] for reference.
        """
    )
    # Create test entity
    entity, created = await entity_service.create_or_update_entity(
        EntitySchema(
            title="Git Workflow Guide",
            folder="test",
            entity_type="test",
            content=content,
        )
    )
    assert created is True
    assert entity.title == "Git Workflow Guide"
    assert entity.entity_type == "test"
    # permalink comes from the content's frontmatter, not folder/title
    assert entity.permalink == "git-workflow-guide"
    assert entity.file_path == "test/Git Workflow Guide.md"
    # one observation parsed from the "- [design] ..." line
    assert len(entity.observations) == 1
    assert entity.observations[0].category == "design"
    assert entity.observations[0].content == "Keep feature branches short-lived #git #workflow"
    assert set(entity.observations[0].tags) == {"git", "workflow"}
    assert entity.observations[0].context == "Reduces merge conflicts"
    # four relations: three [[wiki-links]] plus one explicit "implements"
    assert len(entity.relations) == 4
    assert entity.relations[0].relation_type == "links to"
    assert entity.relations[0].to_name == "Git"
    assert entity.relations[1].relation_type == "links to"
    assert entity.relations[1].to_name == "Trunk Based Development"
    assert entity.relations[2].relation_type == "implements"
    assert entity.relations[2].to_name == "Branch Strategy"
    assert entity.relations[2].context == "Our standard workflow"
    assert entity.relations[3].relation_type == "links to"
    assert entity.relations[3].to_name == "Git Cheat Sheet"
    # Verify file has new content but preserved metadata
    file_path = file_service.get_entity_path(entity)
    file_content, _ = await file_service.read_file(file_path)
    # assert file
    # note the permalink value is corrected
    expected = dedent("""
        ---
        title: Git Workflow Guide
        type: test
        permalink: git-workflow-guide
        ---
        
        # Git Workflow Guide
                
        A guide to our [[Git]] workflow. This uses some ideas from [[Trunk Based Development]].
        
        ## Best Practices
        Use branches effectively:
        - [design] Keep feature branches short-lived #git #workflow (Reduces merge conflicts)
        - implements [[Branch Strategy]] (Our standard workflow)
        
        ## Common Commands
        See the [[Git Cheat Sheet]] for reference.
        """).strip()
    assert expected == file_content
@pytest.mark.asyncio
async def test_update_with_content(entity_service: EntityService, file_service: FileService):
    """Updating an existing entity with frontmatter-bearing content.

    The update replaces the body, adopts the custom permalink from the
    incoming frontmatter, and re-parses observations and relations.
    """
    content = """# Git Workflow Guide"""
    # Create test entity
    entity, created = await entity_service.create_or_update_entity(
        EntitySchema(
            title="Git Workflow Guide",
            entity_type="test",
            folder="test",
            content=content,
        )
    )
    assert created is True
    assert entity.title == "Git Workflow Guide"
    assert len(entity.observations) == 0
    assert len(entity.relations) == 0
    # Verify file has new content but preserved metadata
    file_path = file_service.get_entity_path(entity)
    file_content, _ = await file_service.read_file(file_path)
    # assert content is in file (permalink is derived from folder/title here)
    assert (
        dedent(
            """
            ---
            title: Git Workflow Guide
            type: test
            permalink: test/git-workflow-guide
            ---
            
            # Git Workflow Guide
            """
        ).strip()
        == file_content
    )
    # now update the content, this time supplying a custom permalink
    update_content = dedent(
        """
        ---
        title: Git Workflow Guide
        type: test
        permalink: git-workflow-guide
        ---
        
        # Git Workflow Guide
        
        A guide to our [[Git]] workflow. This uses some ideas from [[Trunk Based Development]].
        
        ## Best Practices
        Use branches effectively:
        - [design] Keep feature branches short-lived #git #workflow (Reduces merge conflicts)
        - implements [[Branch Strategy]] (Our standard workflow)
        
        ## Common Commands
        See the [[Git Cheat Sheet]] for reference.
        """
    ).strip()
    # update entity
    entity, created = await entity_service.create_or_update_entity(
        EntitySchema(
            title="Git Workflow Guide",
            folder="test",
            entity_type="test",
            content=update_content,
        )
    )
    assert created is False
    assert entity.title == "Git Workflow Guide"
    # assert custom permalink value from the new frontmatter
    assert entity.permalink == "git-workflow-guide"
    # observation parsed from the "- [design] ..." line
    assert len(entity.observations) == 1
    assert entity.observations[0].category == "design"
    assert entity.observations[0].content == "Keep feature branches short-lived #git #workflow"
    assert set(entity.observations[0].tags) == {"git", "workflow"}
    assert entity.observations[0].context == "Reduces merge conflicts"
    # four relations: three [[wiki-links]] plus one explicit "implements"
    assert len(entity.relations) == 4
    assert entity.relations[0].relation_type == "links to"
    assert entity.relations[0].to_name == "Git"
    assert entity.relations[1].relation_type == "links to"
    assert entity.relations[1].to_name == "Trunk Based Development"
    assert entity.relations[2].relation_type == "implements"
    assert entity.relations[2].to_name == "Branch Strategy"
    assert entity.relations[2].context == "Our standard workflow"
    assert entity.relations[3].relation_type == "links to"
    assert entity.relations[3].to_name == "Git Cheat Sheet"
    # Verify file has new content but preserved metadata
    file_path = file_service.get_entity_path(entity)
    file_content, _ = await file_service.read_file(file_path)
    # assert content is in file
    assert update_content.strip() == file_content
@pytest.mark.asyncio
async def test_create_with_no_frontmatter(
    project_config: ProjectConfig,
    entity_parser: EntityParser,
    entity_service: EntityService,
    file_service: FileService,
):
    """A markdown file without frontmatter becomes a note with no permalink."""
    body = "# Git Workflow Guide"
    relative_path = Path("test/Git Workflow Guide.md")
    absolute_path = project_config.home / relative_path
    # Write a bare markdown file, parse it, then index it as an entity.
    await file_service.write_file(absolute_path, body)
    parsed = await entity_parser.parse_file(absolute_path)
    created = await entity_service.create_entity_from_markdown(relative_path, parsed)
    assert created.file_path == relative_path.as_posix()
    assert created.title == "Git Workflow Guide"
    assert created.entity_type == "note"
    assert created.permalink is None
    # The file on disk keeps exactly its original body (no frontmatter injected).
    file_content, _ = await file_service.read_file(created.file_path)
    assert file_content == "# Git Workflow Guide"
@pytest.mark.asyncio
async def test_edit_entity_append(entity_service: EntityService, file_service: FileService):
    """Appending adds content after the existing body."""
    note = await entity_service.create_entity(
        EntitySchema(
            title="Test Note",
            folder="test",
            entity_type="note",
            content="Original content",
        )
    )
    edited = await entity_service.edit_entity(
        identifier=note.permalink, operation="append", content="Appended content"
    )
    # Both pieces are present, with the appended text after the original.
    text, _ = await file_service.read_file(file_service.get_entity_path(edited))
    assert "Original content" in text
    assert "Appended content" in text
    assert text.index("Original content") < text.index("Appended content")
@pytest.mark.asyncio
async def test_edit_entity_prepend(entity_service: EntityService, file_service: FileService):
    """Prepending adds content before the existing body."""
    note = await entity_service.create_entity(
        EntitySchema(
            title="Test Note",
            folder="test",
            entity_type="note",
            content="Original content",
        )
    )
    edited = await entity_service.edit_entity(
        identifier=note.permalink, operation="prepend", content="Prepended content"
    )
    # Both pieces are present, with the prepended text before the original.
    text, _ = await file_service.read_file(file_service.get_entity_path(edited))
    assert "Original content" in text
    assert "Prepended content" in text
    assert text.index("Prepended content") < text.index("Original content")
@pytest.mark.asyncio
async def test_edit_entity_find_replace(entity_service: EntityService, file_service: FileService):
    """find_replace swaps matching text in the entity body."""
    note = await entity_service.create_entity(
        EntitySchema(
            title="Test Note",
            folder="test",
            entity_type="note",
            content="This is old content that needs updating",
        )
    )
    edited = await entity_service.edit_entity(
        identifier=note.permalink,
        operation="find_replace",
        content="new content",
        find_text="old content",
    )
    # The old text is gone and the replacement is spliced into place.
    text, _ = await file_service.read_file(file_service.get_entity_path(edited))
    assert "old content" not in text
    assert "This is new content that needs updating" in text
@pytest.mark.asyncio
async def test_edit_entity_replace_section(
    entity_service: EntityService, file_service: FileService
):
    """replace_section swaps one section's body and leaves the rest alone."""
    body = dedent("""
        # Main Title
        
        ## Section 1
        Original section 1 content
        
        ## Section 2
        Original section 2 content
        """).strip()
    note = await entity_service.create_entity(
        EntitySchema(
            title="Sample Note",
            folder="docs",
            entity_type="note",
            content=body,
        )
    )
    edited = await entity_service.edit_entity(
        identifier=note.permalink,
        operation="replace_section",
        content="New section 1 content",
        section="## Section 1",
    )
    # Section 1 is rewritten; Section 2 is untouched.
    text, _ = await file_service.read_file(file_service.get_entity_path(edited))
    assert "New section 1 content" in text
    assert "Original section 1 content" not in text
    assert "Original section 2 content" in text  # Other sections preserved
@pytest.mark.asyncio
async def test_edit_entity_replace_section_create_new(
    entity_service: EntityService, file_service: FileService
):
    """replace_section creates the section when the header is absent."""
    note = await entity_service.create_entity(
        EntitySchema(
            title="Test Note",
            folder="test",
            entity_type="note",
            content="# Main Title\n\nSome content",
        )
    )
    edited = await entity_service.edit_entity(
        identifier=note.permalink,
        operation="replace_section",
        content="New section content",
        section="## New Section",
    )
    # Both the new header and its body appear in the file.
    text, _ = await file_service.read_file(file_service.get_entity_path(edited))
    assert "## New Section" in text
    assert "New section content" in text
@pytest.mark.asyncio
async def test_edit_entity_not_found(entity_service: EntityService):
    """Editing an identifier that resolves to nothing raises EntityNotFoundError."""
    missing = "non-existent"
    with pytest.raises(EntityNotFoundError):
        await entity_service.edit_entity(identifier=missing, operation="append", content="content")
@pytest.mark.asyncio
async def test_edit_entity_invalid_operation(entity_service: EntityService):
    """An unknown edit operation is rejected with ValueError."""
    note = await entity_service.create_entity(
        EntitySchema(
            title="Test Note",
            folder="test",
            entity_type="note",
            content="Original content",
        )
    )
    with pytest.raises(ValueError, match="Unsupported operation"):
        await entity_service.edit_entity(
            identifier=note.permalink, operation="invalid_operation", content="content"
        )
@pytest.mark.asyncio
async def test_edit_entity_find_replace_missing_find_text(entity_service: EntityService):
    """find_replace requires the find_text argument."""
    note = await entity_service.create_entity(
        EntitySchema(
            title="Test Note",
            folder="test",
            entity_type="note",
            content="Original content",
        )
    )
    with pytest.raises(ValueError, match="find_text is required"):
        await entity_service.edit_entity(
            identifier=note.permalink, operation="find_replace", content="new content"
        )
@pytest.mark.asyncio
async def test_edit_entity_replace_section_missing_section(entity_service: EntityService):
    """replace_section requires the section argument."""
    note = await entity_service.create_entity(
        EntitySchema(
            title="Test Note",
            folder="test",
            entity_type="note",
            content="Original content",
        )
    )
    with pytest.raises(ValueError, match="section is required"):
        await entity_service.edit_entity(
            identifier=note.permalink, operation="replace_section", content="new content"
        )
@pytest.mark.asyncio
async def test_edit_entity_with_observations_and_relations(
    entity_service: EntityService, file_service: FileService
):
    """Appending observation/relation markup re-parses the entity's graph data."""
    initial = dedent("""
        # Test Note
        
        - [note] This is an observation
        - links to [[Other Entity]]
        
        Original content
        """).strip()
    note = await entity_service.create_entity(
        EntitySchema(
            title="Sample Note",
            folder="docs",
            entity_type="note",
            content=initial,
        )
    )
    # One observation and one relation come from the initial content.
    assert len(note.observations) == 1
    assert len(note.relations) == 1
    # Append markup that introduces one more of each.
    edited = await entity_service.edit_entity(
        identifier=note.permalink,
        operation="append",
        content="\n- [category] New observation\n- relates to [[New Entity]]",
    )
    assert len(edited.observations) == 2
    assert len(edited.relations) == 2
    # The appended observation was parsed with its category and content.
    added_observation = next(o for o in edited.observations if o.category == "category")
    assert added_observation.content == "New observation"
    # The appended relation was parsed with its type and target.
    added_relation = next(r for r in edited.relations if r.to_name == "New Entity")
    assert added_relation.relation_type == "relates to"
@pytest.mark.asyncio
async def test_create_entity_from_markdown_with_upsert(
    entity_service: EntityService, file_service: FileService
):
    """create_entity_from_markdown resolves conflicts via a single UPSERT."""
    from basic_memory.markdown.schemas import (
        EntityFrontmatter,
        EntityMarkdown as RealEntityMarkdown,
    )
    from datetime import datetime, timezone

    file_path = Path("test/upsert-test.md")
    markdown = RealEntityMarkdown(
        frontmatter=EntityFrontmatter(metadata={"title": "UPSERT Test", "type": "test"}),
        observations=[],
        relations=[],
        created=datetime.now(timezone.utc),
        modified=datetime.now(timezone.utc),
    )
    # The upsert path should succeed without any conflict handling here.
    result = await entity_service.create_entity_from_markdown(file_path, markdown)
    assert result is not None
    assert result.title == "UPSERT Test"
    assert result.file_path == file_path.as_posix()
    # Checksum stays None until the sync completes.
    assert result.checksum is None
@pytest.mark.asyncio
async def test_create_entity_from_markdown_error_handling(
    entity_service: EntityService, file_service: FileService
):
    """Repository failures surface as EntityCreationError."""
    from unittest.mock import patch
    from basic_memory.services.exceptions import EntityCreationError
    from basic_memory.markdown.schemas import (
        EntityFrontmatter,
        EntityMarkdown as RealEntityMarkdown,
    )
    from datetime import datetime, timezone

    markdown = RealEntityMarkdown(
        frontmatter=EntityFrontmatter(metadata={"title": "Error Test", "type": "test"}),
        observations=[],
        relations=[],
        created=datetime.now(timezone.utc),
        modified=datetime.now(timezone.utc),
    )

    async def failing_upsert(*args, **kwargs):
        # Simulate a low-level database failure.
        raise Exception("Database connection failed")

    # The service should wrap the raw exception in its own error type.
    with patch.object(entity_service.repository, "upsert_entity", side_effect=failing_upsert):
        with pytest.raises(EntityCreationError, match="Failed to create entity"):
            await entity_service.create_entity_from_markdown(Path("test/error-test.md"), markdown)
# Edge case tests for find_replace operation


@pytest.mark.asyncio
async def test_edit_entity_find_replace_not_found(entity_service: EntityService):
    """find_replace raises when the target text is absent."""
    note = await entity_service.create_entity(
        EntitySchema(
            title="Test Note",
            folder="test",
            entity_type="note",
            content="This is some content",
        )
    )
    with pytest.raises(ValueError, match="Text to replace not found: 'nonexistent'"):
        await entity_service.edit_entity(
            identifier=note.permalink,
            operation="find_replace",
            content="new content",
            find_text="nonexistent",
        )
@pytest.mark.asyncio
async def test_edit_entity_find_replace_multiple_occurrences_expected_one(
    entity_service: EntityService,
):
    """find_replace fails when more matches exist than expected_replacements."""
    # "banana" appears twice; "test" is avoided since it occurs in frontmatter.
    note = await entity_service.create_entity(
        EntitySchema(
            title="Sample Note",
            folder="docs",
            entity_type="note",
            content="The word banana appears here. Another banana word here.",
        )
    )
    with pytest.raises(ValueError, match="Expected 1 occurrences of 'banana', but found 2"):
        await entity_service.edit_entity(
            identifier=note.permalink,
            operation="find_replace",
            content="replacement",
            find_text="banana",
            expected_replacements=1,
        )
@pytest.mark.asyncio
async def test_edit_entity_find_replace_multiple_occurrences_success(
    entity_service: EntityService, file_service: FileService
):
    """find_replace replaces every match when expected_replacements agrees."""
    # "banana" appears twice; "test" is avoided since it occurs in frontmatter.
    note = await entity_service.create_entity(
        EntitySchema(
            title="Sample Note",
            folder="docs",
            entity_type="note",
            content="The word banana appears here. Another banana word here.",
        )
    )
    edited = await entity_service.edit_entity(
        identifier=note.permalink,
        operation="find_replace",
        content="apple",
        find_text="banana",
        expected_replacements=2,
    )
    # Both occurrences were rewritten.
    text, _ = await file_service.read_file(file_service.get_entity_path(edited))
    assert "The word apple appears here. Another apple word here." in text
@pytest.mark.asyncio
async def test_edit_entity_find_replace_empty_find_text(entity_service: EntityService):
    """Whitespace-only find_text is rejected."""
    note = await entity_service.create_entity(
        EntitySchema(
            title="Test Note",
            folder="test",
            entity_type="note",
            content="Some content",
        )
    )
    with pytest.raises(ValueError, match="find_text cannot be empty or whitespace only"):
        await entity_service.edit_entity(
            identifier=note.permalink,
            operation="find_replace",
            content="new content",
            find_text="   ",  # whitespace only
        )
@pytest.mark.asyncio
async def test_edit_entity_find_replace_multiline(
    entity_service: EntityService, file_service: FileService
):
    """find_replace handles targets that span multiple lines."""
    original = dedent("""
        # Title
        
        This is a paragraph
        that spans multiple lines
        and needs replacement.
        
        Other content.
        """).strip()
    note = await entity_service.create_entity(
        EntitySchema(
            title="Sample Note",
            folder="docs",
            entity_type="note",
            content=original,
        )
    )
    target = "This is a paragraph\nthat spans multiple lines\nand needs replacement."
    replacement = "This is new content\nthat replaces the old paragraph."
    edited = await entity_service.edit_entity(
        identifier=note.permalink, operation="find_replace", content=replacement, find_text=target
    )
    # The whole multi-line span was swapped; surrounding text survives.
    text, _ = await file_service.read_file(file_service.get_entity_path(edited))
    assert "This is new content\nthat replaces the old paragraph." in text
    assert "Other content." in text  # Make sure rest is preserved
# Edge case tests for replace_section operation


@pytest.mark.asyncio
async def test_edit_entity_replace_section_multiple_sections_error(entity_service: EntityService):
    """replace_section refuses to act when the header is ambiguous."""
    body = dedent("""
        # Main Title
        
        ## Section 1
        First instance content
        
        ## Section 2
        Some content
        
        ## Section 1
        Second instance content
        """).strip()
    note = await entity_service.create_entity(
        EntitySchema(
            title="Sample Note",
            folder="docs",
            entity_type="note",
            content=body,
        )
    )
    # "## Section 1" appears twice, so the edit is ambiguous and must fail.
    with pytest.raises(ValueError, match="Multiple sections found with header '## Section 1'"):
        await entity_service.edit_entity(
            identifier=note.permalink,
            operation="replace_section",
            content="New content",
            section="## Section 1",
        )
@pytest.mark.asyncio
async def test_edit_entity_replace_section_empty_section(entity_service: EntityService):
    """A whitespace-only section header is rejected."""
    note = await entity_service.create_entity(
        EntitySchema(
            title="Test Note",
            folder="test",
            entity_type="note",
            content="Some content",
        )
    )
    with pytest.raises(ValueError, match="section cannot be empty or whitespace only"):
        await entity_service.edit_entity(
            identifier=note.permalink,
            operation="replace_section",
            content="new content",
            section="   ",  # whitespace only
        )
@pytest.mark.asyncio
async def test_edit_entity_replace_section_header_variations(
    entity_service: EntityService, file_service: FileService
):
    """The section argument matches a header even without the '##' prefix."""
    # Document with a plain section plus a deeper subsection
    # (content avoids the word "test" so frontmatter stays clean).
    markdown = dedent("""
        # Main Title
        
        ## Section Name
        Original content
        
        ### Subsection
        Sub content
        """).strip()
    note = await entity_service.create_entity(
        EntitySchema(
            title="Sample Note",
            folder="docs",
            entity_type="note",
            content=markdown,
        )
    )
    # Refer to the section by bare name; the service should still locate it.
    result = await entity_service.edit_entity(
        identifier=note.permalink,
        operation="replace_section",
        content="New section content",
        section="Section Name",  # No ## prefix
    )
    # Confirm the replacement landed and nothing else was disturbed.
    path = file_service.get_entity_path(result)
    text, _ = await file_service.read_file(path)
    assert "New section content" in text
    assert "Original content" not in text
    assert "### Subsection" in text  # Subsection preserved
@pytest.mark.asyncio
async def test_edit_entity_replace_section_at_end_of_document(
    entity_service: EntityService, file_service: FileService
):
    """Replacing the final section works even without a trailing newline."""
    # The last section ends the document with no newline after it.
    markdown = dedent("""
        # Main Title
        
        ## First Section
        First content
        
        ## Last Section
        Last section content""").strip()  # No trailing newline
    note = await entity_service.create_entity(
        EntitySchema(
            title="Sample Note",
            folder="docs",
            entity_type="note",
            content=markdown,
        )
    )
    # Swap out the trailing section.
    result = await entity_service.edit_entity(
        identifier=note.permalink,
        operation="replace_section",
        content="New last section content",
        section="## Last Section",
    )
    # The tail section changed; the earlier section is untouched.
    path = file_service.get_entity_path(result)
    text, _ = await file_service.read_file(path)
    assert "New last section content" in text
    assert "Last section content" not in text
    assert "First content" in text  # Previous section preserved
@pytest.mark.asyncio
async def test_edit_entity_replace_section_with_subsections(
    entity_service: EntityService, file_service: FileService
):
    """replace_section stops at the next header, so subsections survive."""
    # Parent section with two child subsections, then a sibling section.
    markdown = dedent("""
        # Main Title
        
        ## Parent Section
        Parent content
        
        ### Child Section 1
        Child 1 content
        
        ### Child Section 2  
        Child 2 content
        
        ## Another Section
        Other content
        """).strip()
    note = await entity_service.create_entity(
        EntitySchema(
            title="Sample Note",
            folder="docs",
            entity_type="note",
            content=markdown,
        )
    )
    # Only the parent's immediate content should be replaced.
    result = await entity_service.edit_entity(
        identifier=note.permalink,
        operation="replace_section",
        content="New parent content",
        section="## Parent Section",
    )
    path = file_service.get_entity_path(result)
    text, _ = await file_service.read_file(path)
    assert "New parent content" in text
    assert "Parent content" not in text  # Original content replaced
    assert "Child 1 content" in text  # Child sections preserved
    assert "Child 2 content" in text  # Child sections preserved
    assert "## Another Section" in text  # Next section preserved
    assert "Other content" in text
# Move entity tests
@pytest.mark.asyncio
async def test_move_entity_success(
    entity_service: EntityService,
    file_service: FileService,
    project_config: ProjectConfig,
):
    """Happy-path move: the file relocates and the database row follows."""
    note = await entity_service.create_entity(
        EntitySchema(
            title="Test Note",
            folder="original",
            entity_type="note",
            content="Original content",
        )
    )
    # The source file must exist before we attempt the move.
    source_path = file_service.get_entity_path(note)
    assert await file_service.exists(source_path)
    # Permalink rewriting disabled for this scenario.
    config = BasicMemoryConfig(update_permalinks_on_move=False)
    assert note.permalink == "original/test-note"
    await entity_service.move_entity(
        identifier=note.permalink,
        destination_path="moved/test-note.md",
        project_config=project_config,
        app_config=config,
    )
    # Source is gone; destination exists on disk.
    assert not await file_service.exists(source_path)
    destination = project_config.home / "moved/test-note.md"
    assert destination.exists()
    # The entity record now points at the new location.
    refreshed = await entity_service.get_by_permalink(note.permalink)
    assert refreshed.file_path == "moved/test-note.md"
    # The body text survived the move.
    moved_text, _ = await file_service.read_file("moved/test-note.md")
    assert "Original content" in moved_text
@pytest.mark.asyncio
async def test_move_entity_with_permalink_update(
    entity_service: EntityService,
    file_service: FileService,
    project_config: ProjectConfig,
):
    """When enabled, a move rewrites the permalink and the file frontmatter."""
    note = await entity_service.create_entity(
        EntitySchema(
            title="Test Note",
            folder="original",
            entity_type="note",
            content="Original content",
        )
    )
    permalink_before = note.permalink
    # Permalink rewriting enabled for this scenario.
    config = BasicMemoryConfig(update_permalinks_on_move=True)
    await entity_service.move_entity(
        identifier=note.permalink,
        destination_path="moved/test-note.md",
        project_config=project_config,
        app_config=config,
    )
    # Resolve via the new file path, since the permalink has changed.
    relocated = await entity_service.link_resolver.resolve_link("moved/test-note.md")
    assert relocated is not None
    assert relocated.file_path == "moved/test-note.md"
    assert relocated.permalink != permalink_before
    # Frontmatter on disk carries the fresh permalink.
    text, _ = await file_service.read_file("moved/test-note.md")
    assert relocated.permalink in text
@pytest.mark.asyncio
async def test_move_entity_creates_destination_directory(
    entity_service: EntityService,
    file_service: FileService,
    project_config: ProjectConfig,
):
    """Intermediate folders are created on demand during a move."""
    note = await entity_service.create_entity(
        EntitySchema(
            title="Test Note",
            folder="original",
            entity_type="note",
            content="Original content",
        )
    )
    config = BasicMemoryConfig(update_permalinks_on_move=False)
    # Target a path several directories deep, none of which exist yet.
    await entity_service.move_entity(
        identifier=note.permalink,
        destination_path="deeply/nested/folders/test-note.md",
        project_config=project_config,
        app_config=config,
    )
    # Both the file and its parent chain were created.
    target = project_config.home / "deeply/nested/folders/test-note.md"
    assert target.exists()
    assert target.parent.exists()
@pytest.mark.asyncio
async def test_move_entity_not_found(
    entity_service: EntityService,
    project_config: ProjectConfig,
):
    """Moving an unknown identifier raises EntityNotFoundError."""
    config = BasicMemoryConfig(update_permalinks_on_move=False)
    with pytest.raises(EntityNotFoundError, match="Entity not found: non-existent"):
        await entity_service.move_entity(
            identifier="non-existent",
            destination_path="new/path.md",
            project_config=project_config,
            app_config=config,
        )
@pytest.mark.asyncio
async def test_move_entity_source_file_missing(
    entity_service: EntityService,
    file_service: FileService,
    project_config: ProjectConfig,
):
    """A move fails cleanly when the source file has vanished from disk."""
    note = await entity_service.create_entity(
        EntitySchema(
            title="Test Note",
            folder="test",
            entity_type="note",
            content="Original content",
        )
    )
    # Remove the file out from under the database record, as external
    # deletion or corruption would.
    file_service.get_entity_path(note).unlink()
    config = BasicMemoryConfig(update_permalinks_on_move=False)
    with pytest.raises(ValueError, match="Source file not found:"):
        await entity_service.move_entity(
            identifier=note.permalink,
            destination_path="new/path.md",
            project_config=project_config,
            app_config=config,
        )
@pytest.mark.asyncio
async def test_move_entity_destination_exists(
    entity_service: EntityService,
    file_service: FileService,
    project_config: ProjectConfig,
):
    """Moving onto another entity's file is refused."""
    first = await entity_service.create_entity(
        EntitySchema(
            title="Test Note 1",
            folder="test",
            entity_type="note",
            content="Content 1",
        )
    )
    second = await entity_service.create_entity(
        EntitySchema(
            title="Test Note 2",
            folder="test",
            entity_type="note",
            content="Content 2",
        )
    )
    config = BasicMemoryConfig(update_permalinks_on_move=False)
    # The first entity may not displace the second's file.
    with pytest.raises(ValueError, match="Destination already exists:"):
        await entity_service.move_entity(
            identifier=first.permalink,
            destination_path=second.file_path,
            project_config=project_config,
            app_config=config,
        )
@pytest.mark.asyncio
async def test_move_entity_invalid_destination_path(
    entity_service: EntityService,
    project_config: ProjectConfig,
):
    """Absolute and empty destination paths are both rejected."""
    note = await entity_service.create_entity(
        EntitySchema(
            title="Test Note",
            folder="test",
            entity_type="note",
            content="Original content",
        )
    )
    config = BasicMemoryConfig(update_permalinks_on_move=False)
    # An absolute path would escape the project root.
    with pytest.raises(ValueError, match="Invalid destination path:"):
        await entity_service.move_entity(
            identifier=note.permalink,
            destination_path="/absolute/path.md",
            project_config=project_config,
            app_config=config,
        )
    # An empty destination is meaningless.
    with pytest.raises(ValueError, match="Invalid destination path:"):
        await entity_service.move_entity(
            identifier=note.permalink,
            destination_path="",
            project_config=project_config,
            app_config=config,
        )
@pytest.mark.asyncio
async def test_move_entity_by_title(
    entity_service: EntityService,
    file_service: FileService,
    project_config: ProjectConfig,
):
    """Test moving entity by title instead of permalink.

    The identifier is resolved by title, so the move succeeds even when the
    caller does not know the permalink.
    """
    # Create test entity
    entity = await entity_service.create_entity(
        EntitySchema(
            title="Test Note",
            folder="original",
            entity_type="note",
            content="Original content",
        )
    )
    # Build the config locally like the sibling tests do. (Previously this
    # test accepted the app_config fixture and immediately shadowed it.)
    app_config = BasicMemoryConfig(update_permalinks_on_move=False)
    # Move by title
    await entity_service.move_entity(
        identifier="Test Note",  # Use title instead of permalink
        destination_path="moved/test-note.md",
        project_config=project_config,
        app_config=app_config,
    )
    # Verify the original location no longer exists
    old_path = project_config.home / entity.file_path
    assert not old_path.exists()
    # Verify new file exists
    new_path = project_config.home / "moved/test-note.md"
    assert new_path.exists()
@pytest.mark.asyncio
async def test_move_entity_preserves_observations_and_relations(
    entity_service: EntityService,
    file_service: FileService,
    project_config: ProjectConfig,
):
    """Observations and relations survive a move, both in DB and on disk."""
    # One observation and one wikilink relation in the markdown body.
    markdown = dedent("""
        # Test Note
        
        - [note] This is an observation #test
        - links to [[Other Entity]]
        
        Original content
        """).strip()
    note = await entity_service.create_entity(
        EntitySchema(
            title="Test Note",
            folder="original",
            entity_type="note",
            content=markdown,
        )
    )
    # Sanity-check the parsed structure before moving.
    assert len(note.observations) == 1
    assert len(note.relations) == 1
    config = BasicMemoryConfig(update_permalinks_on_move=False)
    await entity_service.move_entity(
        identifier=note.permalink,
        destination_path="moved/test-note.md",
        project_config=project_config,
        app_config=config,
    )
    relocated = await entity_service.link_resolver.resolve_link("moved/test-note.md")
    # Database-side structure is intact.
    assert len(relocated.observations) == 1
    assert relocated.observations[0].content == "This is an observation #test"
    assert len(relocated.relations) == 1
    assert relocated.relations[0].to_name == "Other Entity"
    # File-side markdown still carries both lines.
    text, _ = await file_service.read_file("moved/test-note.md")
    assert "- [note] This is an observation #test" in text
    assert "- links to [[Other Entity]]" in text
@pytest.mark.asyncio
async def test_move_entity_rollback_on_database_failure(
    entity_service: EntityService,
    file_service: FileService,
    project_config: ProjectConfig,
    entity_repository: EntityRepository,
):
    """If the DB update fails mid-move, filesystem changes are rolled back."""
    note = await entity_service.create_entity(
        EntitySchema(
            title="Test Note",
            folder="original",
            entity_type="note",
            content="Original content",
        )
    )
    source_path = file_service.get_entity_path(note)
    assert await file_service.exists(source_path)
    config = BasicMemoryConfig(update_permalinks_on_move=False)
    # Patch the repository so the update step reports failure (returns None).
    real_update = entity_repository.update

    async def broken_update(*args, **kwargs):
        return None  # Simulate failure

    entity_repository.update = broken_update
    try:
        with pytest.raises(ValueError, match="Move failed:"):
            await entity_service.move_entity(
                identifier=note.permalink,
                destination_path="moved/test-note.md",
                project_config=project_config,
                app_config=config,
            )
        # Rollback: the original file is back in place...
        assert await file_service.exists(source_path)
        # ...and the half-written destination has been removed.
        assert not (project_config.home / "moved/test-note.md").exists()
    finally:
        # Always undo the monkey-patch so later tests see the real method.
        entity_repository.update = real_update
@pytest.mark.asyncio
async def test_move_entity_with_complex_observations(
    entity_service: EntityService,
    file_service: FileService,
    project_config: ProjectConfig,
):
    """Tags, contexts, and mixed relation types all survive a move."""
    markdown = dedent("""
        # Complex Note
        
        - [design] Keep feature branches short-lived #git #workflow (Reduces merge conflicts)
        - [tech] Using SQLite for storage #implementation (Fast and reliable)
        - implements [[Branch Strategy]] (Our standard workflow)
        
        Complex content with [[Multiple]] [[Links]].
        """).strip()
    note = await entity_service.create_entity(
        EntitySchema(
            title="Complex Note",
            folder="docs",
            entity_type="note",
            content=markdown,
        )
    )
    # Parsed structure: two observations; one explicit relation plus two wikilinks.
    assert len(note.observations) == 2
    assert len(note.relations) == 3  # 1 explicit + 2 wikilinks
    config = BasicMemoryConfig(update_permalinks_on_move=False)
    await entity_service.move_entity(
        identifier=note.permalink,
        destination_path="moved/complex-note.md",
        project_config=project_config,
        app_config=config,
    )
    relocated = await entity_service.link_resolver.resolve_link("moved/complex-note.md")
    # Observation metadata (tags and context) is intact.
    design_obs = next(obs for obs in relocated.observations if obs.category == "design")
    assert "git" in design_obs.tags
    assert "workflow" in design_obs.tags
    assert design_obs.context == "Reduces merge conflicts"
    tech_obs = next(obs for obs in relocated.observations if obs.category == "tech")
    assert "implementation" in tech_obs.tags
    assert tech_obs.context == "Fast and reliable"
    # Relation types and targets are all still present.
    relation_types = {rel.relation_type for rel in relocated.relations}
    assert "implements" in relation_types
    assert "links to" in relation_types
    relation_targets = {rel.to_name for rel in relocated.relations}
    assert "Branch Strategy" in relation_targets
    assert "Multiple" in relation_targets
    assert "Links" in relation_targets
@pytest.mark.asyncio
async def test_move_entity_with_null_permalink_generates_permalink(
    entity_service: EntityService,
    project_config: ProjectConfig,
    entity_repository: EntityRepository,
):
    """Moving an entity whose permalink is NULL generates one automatically.

    Regression test for issue #155: entities left with null permalinks by the
    database migration failed validation when moved. The fix generates a
    permalink during the move regardless of the update_permalinks_on_move
    setting.
    """
    from datetime import datetime, timezone

    # Insert a row directly, bypassing the service, to mimic migrated data
    # that never received a permalink.
    row = {
        "title": "Test Entity",
        "file_path": "test/null-permalink-entity.md",
        "entity_type": "note",
        "content_type": "text/markdown",
        "permalink": None,  # This is the key - null permalink from migration
        "created_at": datetime.now(timezone.utc),
        "updated_at": datetime.now(timezone.utc),
    }
    stored = await entity_repository.create(row)
    assert stored.permalink is None
    # Write the backing file by hand since we skipped the service layer.
    disk_path = project_config.home / stored.file_path
    disk_path.parent.mkdir(parents=True, exist_ok=True)
    disk_path.write_text("# Test Entity\n\nContent here.")
    # The default setting — the one that previously triggered the bug.
    config = BasicMemoryConfig(update_permalinks_on_move=False)
    # Identify by title: there is no permalink to use.
    moved = await entity_service.move_entity(
        identifier=stored.title,
        destination_path="moved/test-entity.md",
        project_config=project_config,
        app_config=config,
    )
    # The move succeeded and produced a non-empty permalink.
    assert moved is not None
    assert moved.file_path == "moved/test-entity.md"
    assert moved.permalink is not None
    assert moved.permalink != ""
    # The result validates as an EntityResponse without errors.
    from basic_memory.schemas.response import EntityResponse

    response = EntityResponse.model_validate(moved)
    assert response.permalink == moved.permalink
    # The file itself was relocated.
    assert not (project_config.home / "test/null-permalink-entity.md").exists()
    assert (project_config.home / "moved/test-entity.md").exists()
@pytest.mark.asyncio
async def test_create_or_update_entity_fuzzy_search_bug(
    entity_service: EntityService,
    file_service: FileService,
    project_config: ProjectConfig,
    search_service: SearchService,
):
    """Similar titles must not be fuzzy-matched onto the wrong entity.

    Regression test for a critical bug: creating "Node C" overwrote
    "Node A.md" because link_resolver.resolve_link() fell back to fuzzy
    search, which matched "edge-cases/Node C.md" to the existing
    "edge-cases/Node A.md" (shared words: "edge-cases", "Node").

    Expected: "Node C" becomes a brand-new entity with its own file.
    Buggy behavior: the existing "Node A" entity was updated in place.
    """
    # Step 1: create and index "Node A". Indexing matters — the fuzzy-search
    # fallback only fires once the entity is searchable.
    created_a, is_new_a = await entity_service.create_or_update_entity(
        EntitySchema(
            title="Node A",
            folder="edge-cases",
            entity_type="note",
            content="# Node A\n\nOriginal content for Node A",
        )
    )
    assert is_new_a is True, "Node A should be created as new entity"
    assert created_a.title == "Node A"
    assert created_a.file_path == "edge-cases/Node A.md"
    await search_service.index_entity(created_a)
    # Node A's file exists with the expected body.
    file_a = project_config.home / "edge-cases" / "Node A.md"
    assert file_a.exists(), "Node A.md file should exist"
    content_a = file_a.read_text()
    assert "Node A" in content_a
    assert "Original content for Node A" in content_a
    # Step 2: create "Node B" to mirror the live-test scenario.
    created_b, is_new_b = await entity_service.create_or_update_entity(
        EntitySchema(
            title="Node B",
            folder="edge-cases",
            entity_type="note",
            content="# Node B\n\nContent for Node B",
        )
    )
    assert is_new_b is True
    await search_service.index_entity(created_b)
    # Step 3: create "Node C" — the point where the bug used to fire.
    created_c, is_new_c = await entity_service.create_or_update_entity(
        EntitySchema(
            title="Node C",
            folder="edge-cases",
            entity_type="note",
            content="# Node C\n\nContent for Node C",
        )
    )
    # Node C must be a distinct new entity, not an update of Node A.
    assert is_new_c is True, "Node C should be created as NEW entity, not update existing"
    assert created_c.title == "Node C", "Created entity should have title 'Node C'"
    assert created_c.file_path == "edge-cases/Node C.md", "Should create Node C.md file"
    assert created_c.id != created_a.id, "Node C should have different ID than Node A"
    file_c = project_config.home / "edge-cases" / "Node C.md"
    assert file_c.exists(), "Node C.md file should exist as separate file"
    # Node A's file must be untouched by the creation of Node C.
    content_a_after = file_a.read_text()
    assert "title: Node A" in content_a_after, "Node A.md should still have Node A title"
    assert "Original content for Node A" in content_a_after, (
        "Node A.md should NOT be overwritten with Node C content"
    )
    assert "Content for Node C" not in content_a_after, (
        "Node A.md should not contain Node C content"
    )
    # Node C's file carries only Node C's content.
    content_c = file_c.read_text()
    assert "title: Node C" in content_c, "Node C.md should have Node C title"
    assert "Content for Node C" in content_c, "Node C.md should have Node C content"
    assert "Original content for Node A" not in content_c, (
        "Node C.md should not contain Node A content"
    )
```