#
tokens: 48941/50000 17/348 files (page 7/17)
lines: off (toggle) GitHub
raw markdown copy
This is page 7 of 17. Use http://codebase.md/basicmachines-co/basic-memory?lines=false&page={x} to view the full context.

# Directory Structure

```
├── .claude
│   ├── agents
│   │   ├── python-developer.md
│   │   └── system-architect.md
│   └── commands
│       ├── release
│       │   ├── beta.md
│       │   ├── changelog.md
│       │   ├── release-check.md
│       │   └── release.md
│       ├── spec.md
│       └── test-live.md
├── .dockerignore
├── .github
│   ├── dependabot.yml
│   ├── ISSUE_TEMPLATE
│   │   ├── bug_report.md
│   │   ├── config.yml
│   │   ├── documentation.md
│   │   └── feature_request.md
│   └── workflows
│       ├── claude-code-review.yml
│       ├── claude-issue-triage.yml
│       ├── claude.yml
│       ├── dev-release.yml
│       ├── docker.yml
│       ├── pr-title.yml
│       ├── release.yml
│       └── test.yml
├── .gitignore
├── .python-version
├── CHANGELOG.md
├── CITATION.cff
├── CLA.md
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── docker-compose.yml
├── Dockerfile
├── docs
│   ├── ai-assistant-guide-extended.md
│   ├── character-handling.md
│   ├── cloud-cli.md
│   └── Docker.md
├── justfile
├── LICENSE
├── llms-install.md
├── pyproject.toml
├── README.md
├── SECURITY.md
├── smithery.yaml
├── specs
│   ├── SPEC-1 Specification-Driven Development Process.md
│   ├── SPEC-10 Unified Deployment Workflow and Event Tracking.md
│   ├── SPEC-11 Basic Memory API Performance Optimization.md
│   ├── SPEC-12 OpenTelemetry Observability.md
│   ├── SPEC-13 CLI Authentication with Subscription Validation.md
│   ├── SPEC-14 Cloud Git Versioning & GitHub Backup.md
│   ├── SPEC-14- Cloud Git Versioning & GitHub Backup.md
│   ├── SPEC-15 Configuration Persistence via Tigris for Cloud Tenants.md
│   ├── SPEC-16 MCP Cloud Service Consolidation.md
│   ├── SPEC-17 Semantic Search with ChromaDB.md
│   ├── SPEC-18 AI Memory Management Tool.md
│   ├── SPEC-19 Sync Performance and Memory Optimization.md
│   ├── SPEC-2 Slash Commands Reference.md
│   ├── SPEC-3 Agent Definitions.md
│   ├── SPEC-4 Notes Web UI Component Architecture.md
│   ├── SPEC-5 CLI Cloud Upload via WebDAV.md
│   ├── SPEC-6 Explicit Project Parameter Architecture.md
│   ├── SPEC-7 POC to spike Tigris Turso for local access to cloud data.md
│   ├── SPEC-8 TigrisFS Integration.md
│   ├── SPEC-9 Multi-Project Bidirectional Sync Architecture.md
│   ├── SPEC-9 Signed Header Tenant Information.md
│   └── SPEC-9-1 Follow-Ups- Conflict, Sync, and Observability.md
├── src
│   └── basic_memory
│       ├── __init__.py
│       ├── alembic
│       │   ├── alembic.ini
│       │   ├── env.py
│       │   ├── migrations.py
│       │   ├── script.py.mako
│       │   └── versions
│       │       ├── 3dae7c7b1564_initial_schema.py
│       │       ├── 502b60eaa905_remove_required_from_entity_permalink.py
│       │       ├── 5fe1ab1ccebe_add_projects_table.py
│       │       ├── 647e7a75e2cd_project_constraint_fix.py
│       │       ├── 9d9c1cb7d8f5_add_mtime_and_size_columns_to_entity_.py
│       │       ├── a1b2c3d4e5f6_fix_project_foreign_keys.py
│       │       ├── b3c3938bacdb_relation_to_name_unique_index.py
│       │       ├── cc7172b46608_update_search_index_schema.py
│       │       └── e7e1f4367280_add_scan_watermark_tracking_to_project.py
│       ├── api
│       │   ├── __init__.py
│       │   ├── app.py
│       │   ├── routers
│       │   │   ├── __init__.py
│       │   │   ├── directory_router.py
│       │   │   ├── importer_router.py
│       │   │   ├── knowledge_router.py
│       │   │   ├── management_router.py
│       │   │   ├── memory_router.py
│       │   │   ├── project_router.py
│       │   │   ├── prompt_router.py
│       │   │   ├── resource_router.py
│       │   │   ├── search_router.py
│       │   │   └── utils.py
│       │   └── template_loader.py
│       ├── cli
│       │   ├── __init__.py
│       │   ├── app.py
│       │   ├── auth.py
│       │   ├── commands
│       │   │   ├── __init__.py
│       │   │   ├── cloud
│       │   │   │   ├── __init__.py
│       │   │   │   ├── api_client.py
│       │   │   │   ├── bisync_commands.py
│       │   │   │   ├── cloud_utils.py
│       │   │   │   ├── core_commands.py
│       │   │   │   ├── mount_commands.py
│       │   │   │   ├── rclone_config.py
│       │   │   │   ├── rclone_installer.py
│       │   │   │   ├── upload_command.py
│       │   │   │   └── upload.py
│       │   │   ├── command_utils.py
│       │   │   ├── db.py
│       │   │   ├── import_chatgpt.py
│       │   │   ├── import_claude_conversations.py
│       │   │   ├── import_claude_projects.py
│       │   │   ├── import_memory_json.py
│       │   │   ├── mcp.py
│       │   │   ├── project.py
│       │   │   ├── status.py
│       │   │   ├── sync.py
│       │   │   └── tool.py
│       │   └── main.py
│       ├── config.py
│       ├── db.py
│       ├── deps.py
│       ├── file_utils.py
│       ├── ignore_utils.py
│       ├── importers
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── chatgpt_importer.py
│       │   ├── claude_conversations_importer.py
│       │   ├── claude_projects_importer.py
│       │   ├── memory_json_importer.py
│       │   └── utils.py
│       ├── markdown
│       │   ├── __init__.py
│       │   ├── entity_parser.py
│       │   ├── markdown_processor.py
│       │   ├── plugins.py
│       │   ├── schemas.py
│       │   └── utils.py
│       ├── mcp
│       │   ├── __init__.py
│       │   ├── async_client.py
│       │   ├── project_context.py
│       │   ├── prompts
│       │   │   ├── __init__.py
│       │   │   ├── ai_assistant_guide.py
│       │   │   ├── continue_conversation.py
│       │   │   ├── recent_activity.py
│       │   │   ├── search.py
│       │   │   └── utils.py
│       │   ├── resources
│       │   │   ├── ai_assistant_guide.md
│       │   │   └── project_info.py
│       │   ├── server.py
│       │   └── tools
│       │       ├── __init__.py
│       │       ├── build_context.py
│       │       ├── canvas.py
│       │       ├── chatgpt_tools.py
│       │       ├── delete_note.py
│       │       ├── edit_note.py
│       │       ├── list_directory.py
│       │       ├── move_note.py
│       │       ├── project_management.py
│       │       ├── read_content.py
│       │       ├── read_note.py
│       │       ├── recent_activity.py
│       │       ├── search.py
│       │       ├── utils.py
│       │       ├── view_note.py
│       │       └── write_note.py
│       ├── models
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── knowledge.py
│       │   ├── project.py
│       │   └── search.py
│       ├── repository
│       │   ├── __init__.py
│       │   ├── entity_repository.py
│       │   ├── observation_repository.py
│       │   ├── project_info_repository.py
│       │   ├── project_repository.py
│       │   ├── relation_repository.py
│       │   ├── repository.py
│       │   └── search_repository.py
│       ├── schemas
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── cloud.py
│       │   ├── delete.py
│       │   ├── directory.py
│       │   ├── importer.py
│       │   ├── memory.py
│       │   ├── project_info.py
│       │   ├── prompt.py
│       │   ├── request.py
│       │   ├── response.py
│       │   ├── search.py
│       │   └── sync_report.py
│       ├── services
│       │   ├── __init__.py
│       │   ├── context_service.py
│       │   ├── directory_service.py
│       │   ├── entity_service.py
│       │   ├── exceptions.py
│       │   ├── file_service.py
│       │   ├── initialization.py
│       │   ├── link_resolver.py
│       │   ├── project_service.py
│       │   ├── search_service.py
│       │   └── service.py
│       ├── sync
│       │   ├── __init__.py
│       │   ├── background_sync.py
│       │   ├── sync_service.py
│       │   └── watch_service.py
│       ├── templates
│       │   └── prompts
│       │       ├── continue_conversation.hbs
│       │       └── search.hbs
│       └── utils.py
├── test-int
│   ├── BENCHMARKS.md
│   ├── cli
│   │   ├── test_project_commands_integration.py
│   │   ├── test_sync_commands_integration.py
│   │   └── test_version_integration.py
│   ├── conftest.py
│   ├── mcp
│   │   ├── test_build_context_underscore.py
│   │   ├── test_build_context_validation.py
│   │   ├── test_chatgpt_tools_integration.py
│   │   ├── test_default_project_mode_integration.py
│   │   ├── test_delete_note_integration.py
│   │   ├── test_edit_note_integration.py
│   │   ├── test_list_directory_integration.py
│   │   ├── test_move_note_integration.py
│   │   ├── test_project_management_integration.py
│   │   ├── test_project_state_sync_integration.py
│   │   ├── test_read_content_integration.py
│   │   ├── test_read_note_integration.py
│   │   ├── test_search_integration.py
│   │   ├── test_single_project_mcp_integration.py
│   │   └── test_write_note_integration.py
│   ├── test_db_wal_mode.py
│   ├── test_disable_permalinks_integration.py
│   └── test_sync_performance_benchmark.py
├── tests
│   ├── __init__.py
│   ├── api
│   │   ├── conftest.py
│   │   ├── test_async_client.py
│   │   ├── test_continue_conversation_template.py
│   │   ├── test_directory_router.py
│   │   ├── test_importer_router.py
│   │   ├── test_knowledge_router.py
│   │   ├── test_management_router.py
│   │   ├── test_memory_router.py
│   │   ├── test_project_router_operations.py
│   │   ├── test_project_router.py
│   │   ├── test_prompt_router.py
│   │   ├── test_relation_background_resolution.py
│   │   ├── test_resource_router.py
│   │   ├── test_search_router.py
│   │   ├── test_search_template.py
│   │   ├── test_template_loader_helpers.py
│   │   └── test_template_loader.py
│   ├── cli
│   │   ├── conftest.py
│   │   ├── test_bisync_commands.py
│   │   ├── test_cli_tools.py
│   │   ├── test_cloud_authentication.py
│   │   ├── test_cloud_utils.py
│   │   ├── test_ignore_utils.py
│   │   ├── test_import_chatgpt.py
│   │   ├── test_import_claude_conversations.py
│   │   ├── test_import_claude_projects.py
│   │   ├── test_import_memory_json.py
│   │   └── test_upload.py
│   ├── conftest.py
│   ├── db
│   │   └── test_issue_254_foreign_key_constraints.py
│   ├── importers
│   │   ├── test_importer_base.py
│   │   └── test_importer_utils.py
│   ├── markdown
│   │   ├── __init__.py
│   │   ├── test_date_frontmatter_parsing.py
│   │   ├── test_entity_parser_error_handling.py
│   │   ├── test_entity_parser.py
│   │   ├── test_markdown_plugins.py
│   │   ├── test_markdown_processor.py
│   │   ├── test_observation_edge_cases.py
│   │   ├── test_parser_edge_cases.py
│   │   ├── test_relation_edge_cases.py
│   │   └── test_task_detection.py
│   ├── mcp
│   │   ├── conftest.py
│   │   ├── test_obsidian_yaml_formatting.py
│   │   ├── test_permalink_collision_file_overwrite.py
│   │   ├── test_prompts.py
│   │   ├── test_resources.py
│   │   ├── test_tool_build_context.py
│   │   ├── test_tool_canvas.py
│   │   ├── test_tool_delete_note.py
│   │   ├── test_tool_edit_note.py
│   │   ├── test_tool_list_directory.py
│   │   ├── test_tool_move_note.py
│   │   ├── test_tool_read_content.py
│   │   ├── test_tool_read_note.py
│   │   ├── test_tool_recent_activity.py
│   │   ├── test_tool_resource.py
│   │   ├── test_tool_search.py
│   │   ├── test_tool_utils.py
│   │   ├── test_tool_view_note.py
│   │   ├── test_tool_write_note.py
│   │   └── tools
│   │       └── test_chatgpt_tools.py
│   ├── Non-MarkdownFileSupport.pdf
│   ├── repository
│   │   ├── test_entity_repository_upsert.py
│   │   ├── test_entity_repository.py
│   │   ├── test_entity_upsert_issue_187.py
│   │   ├── test_observation_repository.py
│   │   ├── test_project_info_repository.py
│   │   ├── test_project_repository.py
│   │   ├── test_relation_repository.py
│   │   ├── test_repository.py
│   │   ├── test_search_repository_edit_bug_fix.py
│   │   └── test_search_repository.py
│   ├── schemas
│   │   ├── test_base_timeframe_minimum.py
│   │   ├── test_memory_serialization.py
│   │   ├── test_memory_url_validation.py
│   │   ├── test_memory_url.py
│   │   ├── test_schemas.py
│   │   └── test_search.py
│   ├── Screenshot.png
│   ├── services
│   │   ├── test_context_service.py
│   │   ├── test_directory_service.py
│   │   ├── test_entity_service_disable_permalinks.py
│   │   ├── test_entity_service.py
│   │   ├── test_file_service.py
│   │   ├── test_initialization.py
│   │   ├── test_link_resolver.py
│   │   ├── test_project_removal_bug.py
│   │   ├── test_project_service_operations.py
│   │   ├── test_project_service.py
│   │   └── test_search_service.py
│   ├── sync
│   │   ├── test_character_conflicts.py
│   │   ├── test_sync_service_incremental.py
│   │   ├── test_sync_service.py
│   │   ├── test_sync_wikilink_issue.py
│   │   ├── test_tmp_files.py
│   │   ├── test_watch_service_edge_cases.py
│   │   ├── test_watch_service_reload.py
│   │   └── test_watch_service.py
│   ├── test_config.py
│   ├── test_db_migration_deduplication.py
│   ├── test_deps.py
│   ├── test_production_cascade_delete.py
│   └── utils
│       ├── test_file_utils.py
│       ├── test_frontmatter_obsidian_compatible.py
│       ├── test_parse_tags.py
│       ├── test_permalink_formatting.py
│       ├── test_utf8_handling.py
│       └── test_validate_project_path.py
├── uv.lock
├── v0.15.0-RELEASE-DOCS.md
└── v15-docs
    ├── api-performance.md
    ├── background-relations.md
    ├── basic-memory-home.md
    ├── bug-fixes.md
    ├── chatgpt-integration.md
    ├── cloud-authentication.md
    ├── cloud-bisync.md
    ├── cloud-mode-usage.md
    ├── cloud-mount.md
    ├── default-project-mode.md
    ├── env-file-removal.md
    ├── env-var-overrides.md
    ├── explicit-project-parameter.md
    ├── gitignore-integration.md
    ├── project-root-env-var.md
    ├── README.md
    └── sqlite-performance.md
```

# Files

--------------------------------------------------------------------------------
/src/basic_memory/services/directory_service.py:
--------------------------------------------------------------------------------

```python
"""Directory service for managing file directories and tree structure."""

import fnmatch
import logging
import os
from typing import Dict, List, Optional, Sequence

from basic_memory.models import Entity
from basic_memory.repository import EntityRepository
from basic_memory.schemas.directory import DirectoryNode

logger = logging.getLogger(__name__)


class DirectoryService:
    """Service for building and querying directory trees over indexed files."""

    def __init__(self, entity_repository: EntityRepository):
        """Initialize the directory service.

        Args:
            entity_repository: Entity repository used to load indexed files.
        """
        self.entity_repository = entity_repository

    async def get_directory_tree(self) -> DirectoryNode:
        """Build a hierarchical directory tree from all indexed files.

        Returns:
            Root DirectoryNode ("/") whose children contain the full
            directory and file hierarchy.
        """
        # Get all files from DB (flat list) and assemble them under "/".
        entity_rows = await self.entity_repository.find_all()
        return self._build_directory_tree_from_entities(entity_rows, "/")

    async def get_directory_structure(self) -> DirectoryNode:
        """Build a hierarchical directory structure without file details.

        Optimized method for folder navigation that only returns directory nodes,
        no file metadata. Much faster than get_directory_tree() for large knowledge bases.

        Returns:
            DirectoryNode tree containing only folders (type="directory")
        """
        # Get unique directory paths without loading full entities.
        directories = await self.entity_repository.get_distinct_directories()

        root_node = DirectoryNode(name="Root", directory_path="/", type="directory")

        # Map of directory path -> node for O(1) parent lookup.
        dir_map: Dict[str, DirectoryNode] = {"/": root_node}

        # Materialize every path component of every directory as a node.
        for dir_path in directories:
            current_path = "/"
            for part in [p for p in dir_path.split("/") if p]:
                parent_path = current_path
                current_path = self._join_dir_path(current_path, part)
                self._ensure_directory_node(dir_map, parent_path, current_path, part)

        return root_node

    async def list_directory(
        self,
        dir_name: str = "/",
        depth: int = 1,
        file_name_glob: Optional[str] = None,
    ) -> List[DirectoryNode]:
        """List directory contents with filtering and depth control.

        Args:
            dir_name: Directory path to list (default: root "/")
            depth: Recursion depth (1 = immediate children only)
            file_name_glob: Glob pattern for filtering node names

        Returns:
            List of DirectoryNode objects matching the criteria
        """
        # Normalize: strip "./" prefix, ensure a leading "/", and drop any
        # trailing slashes (except for the root itself).
        if dir_name.startswith("./"):
            dir_name = dir_name[2:]
        if not dir_name.startswith("/"):
            dir_name = f"/{dir_name}"
        if dir_name != "/" and dir_name.endswith("/"):
            dir_name = dir_name.rstrip("/")

        # Optimize: query only entities under the target directory instead of
        # loading the entire tree.
        dir_prefix = dir_name.lstrip("/")
        entity_rows = await self.entity_repository.find_by_directory_prefix(dir_prefix)

        # Build a partial tree from only the relevant entities.
        root_tree = self._build_directory_tree_from_entities(entity_rows, dir_name)

        # Find the target directory node within the partial tree.
        target_node = self._find_directory_node(root_tree, dir_name)
        if not target_node:
            return []

        # Collect nodes with depth and glob filtering.
        result: List[DirectoryNode] = []
        self._collect_nodes_recursive(target_node, result, depth, file_name_glob, 0)
        return result

    @staticmethod
    def _join_dir_path(parent_path: str, name: str) -> str:
        """Append *name* to *parent_path* without doubling the root slash."""
        return f"{parent_path}{name}" if parent_path == "/" else f"{parent_path}/{name}"

    @staticmethod
    def _ensure_directory_node(
        dir_map: Dict[str, DirectoryNode],
        parent_path: str,
        dir_path: str,
        name: str,
    ) -> None:
        """Create the directory node at *dir_path* and link it to its parent, if missing.

        Orphan nodes are tolerated: when the parent path is not in the map
        (possible when the tree root is a subdirectory) the node is created
        but simply not attached to any parent.
        """
        if dir_path not in dir_map:
            dir_node = DirectoryNode(name=name, directory_path=dir_path, type="directory")
            dir_map[dir_path] = dir_node
            if parent_path in dir_map:
                dir_map[parent_path].children.append(dir_node)

    def _build_directory_tree_from_entities(
        self, entity_rows: Sequence[Entity], root_path: str
    ) -> DirectoryNode:
        """Build a directory tree from a subset of entities.

        Args:
            entity_rows: Sequence of entity objects to build tree from
            root_path: Root directory path for the tree

        Returns:
            DirectoryNode representing the tree root
        """
        root_node = DirectoryNode(name="Root", directory_path=root_path, type="directory")

        # Map of directory path -> node for O(1) parent lookup.
        dir_map: Dict[str, DirectoryNode] = {root_path: root_node}

        # First pass: create all directory nodes leading to each file.
        for file in entity_rows:
            parts = [p for p in file.file_path.split("/") if p]
            current_path = "/"
            for part in parts[:-1]:  # Skip the filename itself
                parent_path = current_path
                current_path = self._join_dir_path(current_path, part)
                self._ensure_directory_node(dir_map, parent_path, current_path, part)

        # Second pass: attach file nodes to their parent directories.
        for file in entity_rows:
            parent_dir = os.path.dirname(file.file_path)
            directory_path = "/" if parent_dir == "" else f"/{parent_dir}"

            file_node = DirectoryNode(
                name=os.path.basename(file.file_path),
                file_path=file.file_path,  # Original path from DB (no leading slash)
                directory_path=f"/{file.file_path}",  # Path with leading slash
                type="file",
                title=file.title,
                permalink=file.permalink,
                entity_id=file.id,
                entity_type=file.entity_type,
                content_type=file.content_type,
                updated_at=file.updated_at,
            )

            # Fall back to the tree root when the parent directory was not
            # materialized (should be rare); root_path is always in dir_map.
            parent = dir_map[directory_path] if directory_path in dir_map else dir_map[root_path]
            parent.children.append(file_node)

        return root_node

    def _find_directory_node(
        self, root: DirectoryNode, target_path: str
    ) -> Optional[DirectoryNode]:
        """Depth-first search for the directory node whose path is *target_path*."""
        if root.directory_path == target_path:
            return root

        for child in root.children:
            if child.type == "directory":
                found = self._find_directory_node(child, target_path)
                if found:
                    return found

        return None

    def _collect_nodes_recursive(
        self,
        node: DirectoryNode,
        result: List[DirectoryNode],
        max_depth: int,
        file_name_glob: Optional[str],
        current_depth: int,
    ) -> None:
        """Recursively collect child nodes with depth and glob filtering.

        NOTE(review): the glob is matched against directory names as well as
        file names, so a non-matching directory is pruned from recursion
        entirely — confirm this is the intended semantics for depth > 1.
        """
        if current_depth >= max_depth:
            return

        for child in node.children:
            # Apply glob filtering to every child (files and directories alike).
            if file_name_glob and not fnmatch.fnmatch(child.name, file_name_glob):
                continue

            result.append(child)

            # current_depth < max_depth is guaranteed by the guard above,
            # so recurse unconditionally into subdirectories.
            if child.type == "directory":
                self._collect_nodes_recursive(
                    child, result, max_depth, file_name_glob, current_depth + 1
                )

```

--------------------------------------------------------------------------------
/v15-docs/cloud-bisync.md:
--------------------------------------------------------------------------------

```markdown
# Cloud Bidirectional Sync (SPEC-9)

**Status**: New Feature
**PR**: #322
**Requires**: Active subscription, rclone installation

## What's New

v0.15.0 introduces **bidirectional cloud synchronization** using rclone bisync. Your local files sync automatically with the cloud, enabling multi-device workflows, backups, and collaboration.

## Quick Start

### One-Time Setup

```bash
# Install and configure cloud sync
bm cloud bisync-setup

# What it does:
# 1. Installs rclone
# 2. Gets tenant credentials
# 3. Configures rclone remote
# 4. Creates sync directory
# 5. Performs initial sync
```

### Regular Sync

```bash
# Recommended: Use standard sync command
bm sync                    # Syncs local → database
bm cloud bisync            # Syncs local ↔ cloud

# Or: Use watch mode (auto-sync every 60 seconds)
bm sync --watch
```

## How Bidirectional Sync Works

### Sync Architecture

```
Local Files          rclone bisync          Cloud Storage
~/basic-memory-      <─────────────>       s3://bucket/
cloud-sync/          (bidirectional)       tenant-id/
  ├── project-a/                              ├── project-a/
  ├── project-b/                              ├── project-b/
  └── notes/                                  └── notes/
```

### Sync Profiles

Three profiles optimize for different use cases:

| Profile | Conflicts | Max Deletes | Speed | Use Case |
|---------|-----------|-------------|-------|----------|
| **safe** | Keep both versions | 10 | Slower | Preserve all changes, manual conflict resolution |
| **balanced** | Use newer file | 25 | Medium | **Default** - auto-resolve most conflicts |
| **fast** | Use newer file | 50 | Fastest | Rapid iteration, trust newer versions |

### Conflict Resolution

**safe profile** (--conflict-resolve=none):
- Conflicting files saved as `file.conflict1`, `file.conflict2`
- Manual resolution required
- No data loss

**balanced/fast profiles** (--conflict-resolve=newer):
- Automatically uses the newer file
- Faster syncs
- Good for single-user workflows

## Commands

### bm cloud bisync-setup

One-time setup for cloud sync.

```bash
bm cloud bisync-setup

# Optional: Custom sync directory
bm cloud bisync-setup --dir ~/my-sync-folder
```

**What happens:**
1. Checks for/installs rclone
2. Generates scoped S3 credentials
3. Configures rclone remote
4. Creates local sync directory
5. Performs initial baseline sync (--resync)

**Configuration saved to:**
- `~/.basic-memory/config.json` - sync_dir path
- `~/.config/rclone/rclone.conf` - remote credentials
- `~/.basic-memory/bisync-state/{tenant_id}/` - sync state

### bm cloud bisync

Manual bidirectional sync.

```bash
# Basic sync (uses 'balanced' profile)
bm cloud bisync

# Choose sync profile
bm cloud bisync --profile safe
bm cloud bisync --profile balanced
bm cloud bisync --profile fast

# Dry run (preview changes)
bm cloud bisync --dry-run

# Force resync (rebuild baseline)
bm cloud bisync --resync

# Verbose output
bm cloud bisync --verbose
```

**Auto-registration:**
- Scans local directory for new projects
- Creates them on cloud before sync
- Ensures cloud knows about all local projects

### bm sync (Recommended)

The standard sync command now handles both local and cloud:

```bash
# One command for everything
bm sync                    # Local sync + cloud sync
bm sync --watch            # Continuous sync every 60s
```

## Sync Directory Structure

### Default Layout

```bash
~/basic-memory-cloud-sync/     # Configurable via --dir
├── project-a/                 # Auto-created local projects
│   ├── notes/
│   ├── ideas/
│   └── .bmignore              # Respected during sync
├── project-b/
│   └── documents/
└── .basic-memory/             # Metadata (ignored in sync)
```

### Important Paths

| Path | Purpose |
|------|---------|
| `~/basic-memory-cloud-sync/` | Default local sync directory |
| `~/basic-memory-cloud/` | Mount point (DO NOT use for bisync) |
| `~/.basic-memory/bisync-state/{tenant_id}/` | Sync state and history |
| `~/.basic-memory/.bmignore` | Patterns to exclude from sync |

**Critical:** Bisync and mount must use **different directories**

## File Filtering with .bmignore

### Default Patterns

Basic Memory respects `.bmignore` patterns (gitignore format):

```bash
# ~/.basic-memory/.bmignore (default)
.git
.DS_Store
node_modules
*.tmp
.env
__pycache__
.pytest_cache
.ruff_cache
.vscode
.idea
```

### How It Works

1. `.bmignore` patterns converted to rclone filter format
2. Auto-regenerated when `.bmignore` changes
3. Stored as `~/.basic-memory/.bmignore.rclone`
4. Applied to all bisync operations

### Custom Patterns

Edit `~/.basic-memory/.bmignore`:

```bash
# Your custom patterns
.git
*.log
temp/
*.backup
```

Next sync will use updated filters.

## Project Management

### Auto-Registration

Bisync automatically registers new local projects:

```bash
# You create a new project locally
mkdir ~/basic-memory-cloud-sync/new-project
echo "# Hello" > ~/basic-memory-cloud-sync/new-project/README.md

# Next sync auto-creates on cloud
bm cloud bisync
# → "Found 1 new local project, creating on cloud..."
# → "✓ Created project: new-project"
```

### Project Discovery

```bash
# List cloud projects
bm cloud status

# Shows:
# - Total projects
# - Last sync time
# - Storage used
```

### Cloud Mode

To work with cloud projects via CLI:

```bash
# Set cloud API URL
export BASIC_MEMORY_API_URL=https://api.basicmemory.cloud

# Or in config.json:
{
  "api_url": "https://api.basicmemory.cloud"
}

# Now CLI tools work against cloud
bm sync --project new-project        # Syncs cloud project
bm tools continue-conversation --project new-project
```

## Sync Workflow Examples

### Daily Workflow

```bash
# Morning: Start watch mode
bm sync --watch &

# Work in your sync directory
cd ~/basic-memory-cloud-sync/work-notes
vim ideas.md

# Changes auto-sync every 60s
# Watch output shows sync progress
```

### Multi-Device Workflow

**Device A:**
```bash
# Make changes
echo "# New Idea" > ~/basic-memory-cloud-sync/ideas/innovation.md

# Sync to cloud
bm cloud bisync
# → "✓ Sync completed - 1 file uploaded"
```

**Device B:**
```bash
# Pull changes from cloud
bm cloud bisync
# → "✓ Sync completed - 1 file downloaded"

# See the new file
cat ~/basic-memory-cloud-sync/ideas/innovation.md
# → "# New Idea"
```

### Conflict Scenario

**Using balanced profile (auto-resolve):**

```bash
# Both devices edit same file
# Device A: Updated at 10:00 AM
# Device B: Updated at 10:05 AM

# Device A syncs
bm cloud bisync
# → "✓ Sync completed"

# Device B syncs
bm cloud bisync
# → "Resolving conflict: using newer version"
# → "✓ Sync completed"
# → Device B's version (10:05) wins
```

**Using safe profile (manual resolution):**

```bash
bm cloud bisync --profile safe
# → "Conflict detected: ideas.md"
# → "Saved as: ideas.md.conflict1 and ideas.md.conflict2"
# → "Please resolve manually"

# Review both versions
diff ideas.md.conflict1 ideas.md.conflict2

# Merge and cleanup
vim ideas.md  # Merge manually
rm ideas.md.conflict*
```

## Monitoring and Status

### Check Sync Status

```bash
bm cloud status
```

**Shows:**
```
Cloud Bisync Status
┏━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Property            ┃ Value                      ┃
┡━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ Status              │ ✓ Initialized              │
│ Local Directory     │ ~/basic-memory-cloud-sync  │
│ Remote              │ s3://bucket/tenant-id      │
│ Last Sync           │ 2 minutes ago              │
│ Total Projects      │ 5                          │
└─────────────────────┴────────────────────────────┘
```

### Verify Integrity

```bash
bm cloud check
```

Compares local and cloud file hashes to detect:
- Corrupted files
- Missing files
- Sync drift

## Troubleshooting

### "First bisync requires --resync"

**Problem:** Initial sync not established

```bash
$ bm cloud bisync
Error: First bisync requires --resync to establish baseline
```

**Solution:**
```bash
bm cloud bisync --resync
```

### "Cannot use mount directory for bisync"

**Problem:** Trying to use mounted directory for sync

```bash
$ bm cloud bisync --dir ~/basic-memory-cloud
Error: Cannot use ~/basic-memory-cloud for bisync - it's the mount directory!
```

**Solution:** Use different directory
```bash
bm cloud bisync --dir ~/basic-memory-cloud-sync
```

### Sync Conflicts

**Problem:** Files modified on both sides

**Safe profile (manual):**
```bash
# Find conflict files
find ~/basic-memory-cloud-sync -name "*.conflict*"

# Review and merge
vimdiff file.conflict1 file.conflict2

# Keep desired version
mv file.conflict1 file
rm file.conflict2
```

**Balanced profile (auto):**
```bash
# Already resolved to newer version
# Check git history if needed
cd ~/basic-memory-cloud-sync
git log file.md
```

### Deleted Too Many Files

**Problem:** Exceeds max_delete threshold

```bash
$ bm cloud bisync
Error: Deletion exceeds safety limit (26 > 25)
```

**Solution:** Review deletions, then force if intentional
```bash
# Preview what would be deleted
bm cloud bisync --dry-run

# If intentional, use higher threshold profile
bm cloud bisync --profile fast  # max_delete=50

# Or resync to establish new baseline
bm cloud bisync --resync
```

### rclone Not Found

**Problem:** rclone not installed

```bash
$ bm cloud bisync
Error: rclone not found
```

**Solution:**
```bash
# Run setup again
bm cloud bisync-setup
# → Installs rclone automatically
```

## Configuration

### Bisync Config

Edit `~/.basic-memory/config.json`:

```json
{
  "bisync_config": {
    "sync_dir": "~/basic-memory-cloud-sync",
    "default_profile": "balanced",
    "auto_sync_interval": 60
  }
}
```

### rclone Config

Located at `~/.config/rclone/rclone.conf`:

```ini
[basic-memory-{tenant_id}]
type = s3
provider = AWS
env_auth = false
access_key_id = AKIA...
secret_access_key = ***
region = us-east-1
endpoint = https://fly.storage.tigris.dev
```

**Security:** This file contains credentials - keep private (mode 600)

## Performance Tips

1. **Use balanced profile**: Best trade-off for most users
2. **Enable watch mode**: `bm sync --watch` for auto-sync
3. **Optimize .bmignore**: Exclude build artifacts and temp files
4. **Batch changes**: Group related edits before sync
5. **Use fast profile**: For rapid iteration on solo projects

## Migration from WebDAV

If upgrading from v0.14.x WebDAV:

1. **Backup existing setup**
   ```bash
   cp -r ~/basic-memory ~/basic-memory.backup
   ```

2. **Run bisync setup**
   ```bash
   bm cloud bisync-setup
   ```

3. **Copy projects to sync directory**
   ```bash
   cp -r ~/basic-memory/* ~/basic-memory-cloud-sync/
   ```

4. **Initial sync**
   ```bash
   bm cloud bisync --resync
   ```

5. **Remove old WebDAV config** (if applicable)

## Security

- **Scoped credentials**: S3 credentials only access your tenant
- **Encrypted transport**: All traffic over HTTPS/TLS
- **No plain text secrets**: Credentials stored securely in rclone config
- **File permissions**: Config files restricted to user (600)
- **.bmignore**: Prevents syncing sensitive files

## See Also

- SPEC-9: Multi-Project Bidirectional Sync Architecture
- `cloud-authentication.md` - Required for cloud access
- `cloud-mount.md` - Alternative: mount cloud storage
- `env-file-removal.md` - Why .env files aren't synced
- `gitignore-integration.md` - File filtering patterns

```

--------------------------------------------------------------------------------
/v15-docs/api-performance.md:
--------------------------------------------------------------------------------

```markdown
# API Performance Optimizations (SPEC-11)

**Status**: Performance Enhancement
**PR**: #315
**Specification**: SPEC-11
**Impact**: Faster API responses, reduced database queries

## What Changed

v0.15.0 implements comprehensive API performance optimizations from SPEC-11, including query optimizations, reduced database round trips, and improved relation traversal.

## Key Optimizations

### 1. Query Optimization

**Before:**
```python
# Multiple separate queries
entity = await get_entity(id)              # Query 1
observations = await get_observations(id)  # Query 2
relations = await get_relations(id)        # Query 3
tags = await get_tags(id)                  # Query 4
```

**After:**
```python
# Single optimized query with joins
entity = await get_entity_with_details(id)
# → One query returns everything
```

**Result:** **75% fewer database queries**

### 2. Relation Traversal

**Before:**
```python
# Recursive queries for each relation
for relation in entity.relations:
    target = await get_entity(relation.target_id)  # N queries
```

**After:**
```python
# Batch load all related entities
related_ids = [r.target_id for r in entity.relations]
targets = await get_entities_batch(related_ids)  # 1 query
```

**Result:** **N+1 query problem eliminated**

### 3. Eager Loading

**Before:**
```python
# Lazy loading (multiple queries)
entity = await get_entity(id)
if need_relations:
    relations = await load_relations(id)
if need_observations:
    observations = await load_observations(id)
```

**After:**
```python
# Eager loading (one query)
entity = await get_entity(
    id,
    load_relations=True,
    load_observations=True
)  # All data in one query
```

**Result:** Configurable loading strategy

## Performance Impact

### API Response Times

**read_note endpoint:**
```
Before: 250ms average
After:  75ms average (3.3x faster)
```

**search_notes endpoint:**
```
Before: 450ms average
After:  150ms average (3x faster)
```

**build_context endpoint (depth=2):**
```
Before: 1200ms average
After:  320ms average (3.8x faster)
```

### Database Queries

**Typical MCP tool call:**
```
Before: 15-20 queries
After:  3-5 queries (75% reduction)
```

**Context building (10 entities):**
```
Before: 150+ queries (N+1 problem)
After:  8 queries (batch loading)
```

## Optimization Techniques

### 1. SELECT Optimization

**Specific column selection:**
```python
# Before: SELECT *
query = select(Entity)

# After: SELECT only needed columns
query = select(
    Entity.id,
    Entity.title,
    Entity.permalink,
    Entity.content
)
```

**Benefit:** Reduced data transfer

### 2. JOIN Optimization

**Efficient joins:**
```python
# Join related tables in one query
query = (
    select(Entity, Observation, Relation)
    .join(Observation, Entity.id == Observation.entity_id)
    .join(Relation, Entity.id == Relation.from_id)
)
```

**Benefit:** Single query vs multiple

### 3. Index Usage

**Optimized indexes:**
```sql
-- Ensure indexes on frequently queried columns
CREATE INDEX idx_entity_permalink ON entities(permalink);
CREATE INDEX idx_relation_from_id ON relations(from_id);
CREATE INDEX idx_relation_to_id ON relations(to_id);
CREATE INDEX idx_observation_entity_id ON observations(entity_id);
```

**Benefit:** Faster lookups

### 4. Query Caching

**Result caching:**
```python
# Note: functools.lru_cache does not work on async functions — it would
# cache the coroutine object, which can only be awaited once.
# Cache the awaited result instead (or use an async-aware cache library):
_entity_cache: dict[str, "Entity"] = {}

async def get_entity_cached(entity_id: str):
    if entity_id not in _entity_cache:
        _entity_cache[entity_id] = await get_entity(entity_id)
    return _entity_cache[entity_id]
```

**Benefit:** Avoid redundant queries

### 5. Batch Loading

**Load multiple entities:**
```python
# Before: Load one at a time
entities = []
for id in entity_ids:
    entity = await get_entity(id)  # N queries
    entities.append(entity)

# After: Batch load
query = select(Entity).where(Entity.id.in_(entity_ids))
entities = await db.execute(query)  # 1 query
```

**Benefit:** Eliminates N+1 problem

## API-Specific Optimizations

### read_note

**Optimizations:**
- Single query with joins
- Eager load observations and relations
- Efficient permalink lookup

```python
# Optimized query
query = (
    select(Entity)
    .options(
        selectinload(Entity.observations),
        selectinload(Entity.relations)
    )
    .where(Entity.permalink == permalink)
)
```

**Performance:**
- **Before:** 250ms (4 queries)
- **After:** 75ms (1 query)

### search_notes

**Optimizations:**
- Full-text search index
- Pagination optimization
- Result limiting

```python
# Optimized search
query = (
    select(Entity)
    .where(Entity.content.match(search_query))
    .limit(page_size)
    .offset(page * page_size)
)
```

**Performance:**
- **Before:** 450ms
- **After:** 150ms (3x faster)

### build_context

**Optimizations:**
- Batch relation traversal
- Depth-limited queries
- Circular reference detection

```python
# Optimized context building
async def build_context(url: str, depth: int = 2):
    # Start entity
    entity = await get_entity_by_url(url)

    # Batch load all relations (depth levels)
    related_ids = collect_related_ids(entity, depth)
    related = await get_entities_batch(related_ids)  # 1 query

    return build_graph(entity, related)
```

**Performance:**
- **Before:** 1200ms (150+ queries)
- **After:** 320ms (8 queries)

### recent_activity

**Optimizations:**
- Time-indexed queries
- Limit early in query
- Efficient sorting

```python
# Optimized recent query
query = (
    select(Entity)
    .where(Entity.updated_at >= timeframe_start)
    .order_by(Entity.updated_at.desc())
    .limit(max_results)
)
```

**Performance:**
- **Before:** 600ms
- **After:** 180ms (3.3x faster)

## Configuration

### Query Optimization Settings

No configuration needed - optimizations are automatic.

### Monitoring Query Performance

**Enable query logging:**
```bash
export BASIC_MEMORY_LOG_LEVEL=DEBUG
```

**Log output:**
```
[DEBUG] Query took 15ms: SELECT entity WHERE permalink=...
[DEBUG] Query took 3ms: SELECT observations WHERE entity_id IN (...)
```

### Profiling

```python
import time
from loguru import logger

async def profile_query(query_name: str):
    start = time.time()
    result = await execute_query()
    elapsed = (time.time() - start) * 1000
    logger.info(f"{query_name}: {elapsed:.2f}ms")
    return result
```

## Benchmarks

### Single Entity Retrieval

```
Operation: get_entity_with_details(id)

Before:
- Queries: 4 (entity, observations, relations, tags)
- Time: 45ms total

After:
- Queries: 1 (joined query)
- Time: 12ms total (3.8x faster)
```

### Search Operations

```
Operation: search_notes(query, limit=10)

Before:
- Queries: 1 search + 10 detail queries
- Time: 450ms total

After:
- Queries: 1 optimized search with joins
- Time: 150ms total (3x faster)
```

### Context Building

```
Operation: build_context(url, depth=2)

Scenario: 10 entities, 20 relations

Before:
- Queries: 1 root + 20 relations + 10 targets = 31 queries
- Time: 620ms

After:
- Queries: 1 root + 1 batch relations + 1 batch targets = 3 queries
- Time: 165ms (3.8x faster)
```

### Bulk Operations

```
Operation: Import 100 notes

Before:
- Queries: 100 inserts + 300 relation queries = 400 queries
- Time: 8.5 seconds

After:
- Queries: 1 bulk insert + 1 bulk relations = 2 queries
- Time: 2.1 seconds (4x faster)
```

## Best Practices

### 1. Use Batch Operations

```python
# ✓ Good: Batch load
entity_ids = [1, 2, 3, 4, 5]
entities = await get_entities_batch(entity_ids)

# ✗ Bad: Load one at a time
entities = []
for id in entity_ids:
    entity = await get_entity(id)
    entities.append(entity)
```

### 2. Specify Required Data

```python
# ✓ Good: Load what you need
entity = await get_entity(
    id,
    load_relations=True,
    load_observations=False  # Don't need these
)

# ✗ Bad: Load everything
entity = await get_entity_full(id)  # Loads unnecessary data
```

### 3. Use Pagination

```python
# ✓ Good: Paginate results
results = await search_notes(
    query="test",
    page=1,
    page_size=20
)

# ✗ Bad: Load all results
results = await search_notes(query="test")  # Could be thousands
```

### 4. Index Foreign Keys

```sql
-- ✓ Good: Indexed joins
CREATE INDEX idx_relation_from_id ON relations(from_id);

-- ✗ Bad: No index
-- Joins will be slow
```

### 5. Limit Depth

```python
# ✓ Good: Reasonable depth
context = await build_context(url, depth=2)

# ✗ Bad: Excessive depth
context = await build_context(url, depth=10)  # Exponential growth
```

## Troubleshooting

### Slow Queries

**Problem:** API responses still slow

**Debug:**
```bash
# Enable query logging
export BASIC_MEMORY_LOG_LEVEL=DEBUG

# Check for N+1 queries
# Look for repeated similar queries
```

**Solution:**
```python
# Use batch loading
ids = [1, 2, 3, 4, 5]
entities = await get_entities_batch(ids)  # Not in loop
```

### High Memory Usage

**Problem:** Large result sets consume memory

**Solution:**
```python
# Use streaming/pagination
async for batch in stream_entities(batch_size=100):
    process(batch)
```

### Database Locks

**Problem:** Concurrent queries blocking

**Solution:**
- Ensure WAL mode enabled (see `sqlite-performance.md`)
- Use read-only queries when possible
- Reduce transaction size

## Implementation Details

### Optimized Query Builder

```python
class OptimizedQueryBuilder:
    def __init__(self):
        self.query = select(Entity)
        self.joins = []
        self.options = []

    def with_observations(self):
        self.options.append(selectinload(Entity.observations))
        return self

    def with_relations(self):
        self.options.append(selectinload(Entity.relations))
        return self

    def build(self):
        if self.options:
            self.query = self.query.options(*self.options)
        return self.query
```

### Batch Loader

```python
class BatchEntityLoader:
    def __init__(self, batch_size: int = 100):
        self.batch_size = batch_size
        self.pending = []

    async def load(self, entity_id: str):
        self.pending.append(entity_id)

        if len(self.pending) >= self.batch_size:
            return await self._flush()

        return None

    async def _flush(self):
        if not self.pending:
            return []

        ids = self.pending
        self.pending = []

        # Single batch query
        query = select(Entity).where(Entity.id.in_(ids))
        result = await db.execute(query)
        return result.scalars().all()
```

### Query Cache

```python
from cachetools import TTLCache

class QueryCache:
    def __init__(self, maxsize: int = 1000, ttl: int = 300):
        self.cache = TTLCache(maxsize=maxsize, ttl=ttl)

    async def get_or_query(self, key: str, query_func):
        if key in self.cache:
            return self.cache[key]

        result = await query_func()
        self.cache[key] = result
        return result
```

## Migration from v0.14.x

### Automatic Optimization

**No action needed** - optimizations are automatic:

```bash
# Upgrade and restart
pip install --upgrade basic-memory
bm mcp

# Optimizations active immediately
```

### Verify Performance Improvement

**Before upgrade:**
```bash
time bm tools search --query "test"
# → 450ms
```

**After upgrade:**
```bash
time bm tools search --query "test"
# → 150ms (3x faster)
```

## See Also

- SPEC-11: API Performance Optimization specification
- `sqlite-performance.md` - Database-level optimizations
- `background-relations.md` - Background processing optimizations
- Database indexing guide
- Query optimization patterns

```

--------------------------------------------------------------------------------
/src/basic_memory/db.py:
--------------------------------------------------------------------------------

```python
import asyncio
import os
from contextlib import asynccontextmanager
from enum import Enum, auto
from pathlib import Path
from typing import AsyncGenerator, Optional

from basic_memory.config import BasicMemoryConfig, ConfigManager
from alembic import command
from alembic.config import Config

from loguru import logger
from sqlalchemy import text, event
from sqlalchemy.ext.asyncio import (
    create_async_engine,
    async_sessionmaker,
    AsyncSession,
    AsyncEngine,
    async_scoped_session,
)
from sqlalchemy.pool import NullPool

from basic_memory.repository.search_repository import SearchRepository

# Module level state.
# A single engine/session-maker pair is shared per process: get_or_create_db()
# lazily initializes it, shutdown_db() disposes and resets it.
_engine: Optional[AsyncEngine] = None
_session_maker: Optional[async_sessionmaker[AsyncSession]] = None
# Guards against re-running alembic migrations within the same process.
_migrations_completed: bool = False


class DatabaseType(Enum):
    """Supported SQLite database backends."""

    MEMORY = auto()
    FILESYSTEM = auto()

    @classmethod
    def get_db_url(cls, db_path: Path, db_type: "DatabaseType") -> str:
        """Build the SQLAlchemy connection URL for the given database type."""
        if db_type != cls.MEMORY:
            # On-disk database at the configured path
            return f"sqlite+aiosqlite:///{db_path}"  # pragma: no cover

        logger.info("Using in-memory SQLite database")
        return "sqlite+aiosqlite://"


def get_scoped_session_factory(
    session_maker: async_sessionmaker[AsyncSession],
) -> async_scoped_session:
    """Wrap *session_maker* in a factory whose sessions are scoped to the current asyncio task."""
    factory = async_scoped_session(session_maker, scopefunc=asyncio.current_task)
    return factory


@asynccontextmanager
async def scoped_session(
    session_maker: async_sessionmaker[AsyncSession],
) -> AsyncGenerator[AsyncSession, None]:
    """
    Yield a task-scoped session, committing on success and rolling back on error.

    Foreign-key enforcement is turned on per session (SQLite defaults to off).
    The session is always closed and removed from the scoped registry on exit.

    Args:
        session_maker: Session maker to create scoped sessions from
    """
    scoped_factory = get_scoped_session_factory(session_maker)
    db_session = scoped_factory()
    try:
        await db_session.execute(text("PRAGMA foreign_keys=ON"))
        yield db_session
        await db_session.commit()
    except Exception:
        await db_session.rollback()
        raise
    finally:
        await db_session.close()
        await scoped_factory.remove()


def _configure_sqlite_connection(dbapi_conn, enable_wal: bool = True) -> None:
    """Configure SQLite connection with WAL mode and optimizations.

    Args:
        dbapi_conn: Database API connection object
        enable_wal: Whether to enable WAL mode (should be False for in-memory databases)
    """
    cursor = dbapi_conn.cursor()
    try:
        # Enable WAL mode for better concurrency (not supported for in-memory databases)
        if enable_wal:
            cursor.execute("PRAGMA journal_mode=WAL")
        # Set busy timeout to handle locked databases
        cursor.execute("PRAGMA busy_timeout=10000")  # 10 seconds
        # Optimize for performance
        cursor.execute("PRAGMA synchronous=NORMAL")
        cursor.execute("PRAGMA cache_size=-64000")  # 64MB cache
        cursor.execute("PRAGMA temp_store=MEMORY")
        # Windows-specific optimizations
        if os.name == "nt":
            cursor.execute("PRAGMA locking_mode=NORMAL")  # Ensure normal locking on Windows
    except Exception as e:
        # Log but don't fail - some PRAGMAs may not be supported
        logger.warning(f"Failed to configure SQLite connection: {e}")
    finally:
        cursor.close()


def _create_engine_and_session(
    db_path: Path, db_type: DatabaseType = DatabaseType.FILESYSTEM
) -> tuple[AsyncEngine, async_sessionmaker[AsyncSession]]:
    """Internal helper: build an async SQLite engine and session maker."""
    db_url = DatabaseType.get_db_url(db_path, db_type)
    logger.debug(f"Creating engine for db_url: {db_url}")

    # check_same_thread=False lets the connection be used across asyncio tasks
    connect_args: dict[str, bool | float | None] = {"check_same_thread": False}
    engine_kwargs: dict = {"connect_args": connect_args}

    if os.name == "nt":  # Windows-specific reliability settings
        connect_args["timeout"] = 30.0  # longer wait for locked files on Windows
        connect_args["isolation_level"] = None  # autocommit mode
        # Use NullPool for Windows filesystem databases to avoid connection
        # pooling issues. Do NOT use NullPool for in-memory databases: they
        # need pooling to keep state alive between connections.
        if db_type == DatabaseType.FILESYSTEM:
            engine_kwargs["poolclass"] = NullPool
            engine_kwargs["echo"] = False

    engine = create_async_engine(db_url, **engine_kwargs)

    # WAL mode is not supported for in-memory databases
    enable_wal = db_type != DatabaseType.MEMORY

    @event.listens_for(engine.sync_engine, "connect")
    def enable_wal_mode(dbapi_conn, connection_record):
        """Apply SQLite PRAGMAs on every new connection."""
        _configure_sqlite_connection(dbapi_conn, enable_wal=enable_wal)

    return engine, async_sessionmaker(engine, expire_on_commit=False)


async def get_or_create_db(
    db_path: Path,
    db_type: DatabaseType = DatabaseType.FILESYSTEM,
    ensure_migrations: bool = True,
) -> tuple[AsyncEngine, async_sessionmaker[AsyncSession]]:  # pragma: no cover
    """Return the process-wide engine and session maker, creating them on first use.

    Args:
        db_path: Path to the SQLite database file.
        db_type: Backend type (filesystem or in-memory).
        ensure_migrations: When True, run pending migrations after first creation.
    """
    global _engine, _session_maker

    if _engine is None:
        _engine, _session_maker = _create_engine_and_session(db_path, db_type)

        # Run migrations automatically unless explicitly disabled
        if ensure_migrations:
            await run_migrations(ConfigManager().config, db_type)

    # Defensive checks for the type checker; these should never trigger since
    # we just created both objects above.
    if _engine is None:
        logger.error("Failed to create database engine", db_path=str(db_path))
        raise RuntimeError("Database engine initialization failed")

    if _session_maker is None:
        logger.error("Failed to create session maker", db_path=str(db_path))
        raise RuntimeError("Session maker initialization failed")

    return _engine, _session_maker


async def shutdown_db() -> None:  # pragma: no cover
    """Dispose of the engine and reset all module-level database state."""
    global _engine, _session_maker, _migrations_completed

    if _engine is None:
        return

    await _engine.dispose()
    _engine = None
    _session_maker = None
    _migrations_completed = False


@asynccontextmanager
async def engine_session_factory(
    db_path: Path,
    db_type: DatabaseType = DatabaseType.MEMORY,
) -> AsyncGenerator[tuple[AsyncEngine, async_sessionmaker[AsyncSession]], None]:
    """Create engine and session factory.

    Note: This is primarily used for testing where we want a fresh database
    for each test. For production use, use get_or_create_db() instead.

    Args:
        db_path: Path to the SQLite database file (unused for in-memory).
        db_type: Backend type; defaults to MEMORY for test isolation.

    Yields:
        Tuple of (engine, session maker). The engine is disposed and all
        module-level state is reset when the context exits.
    """
    global _engine, _session_maker, _migrations_completed

    # Reuse the shared construction helper so engine settings (Windows
    # workarounds, pooling rules, WAL listener) live in exactly one place
    # instead of being duplicated here and drifting out of sync.
    _engine, _session_maker = _create_engine_and_session(db_path, db_type)

    try:
        yield _engine, _session_maker
    finally:
        if _engine:
            await _engine.dispose()
            _engine = None
            _session_maker = None
            _migrations_completed = False


async def run_migrations(
    app_config: BasicMemoryConfig, database_type: DatabaseType = DatabaseType.FILESYSTEM, force: bool = False
):  # pragma: no cover
    """Run any pending alembic migrations.

    Idempotent per process: after a successful run the module-level
    ``_migrations_completed`` flag short-circuits later calls unless ``force``.

    Args:
        app_config: Active configuration; supplies the database path.
        database_type: Backend type used to build the SQLAlchemy URL.
        force: Re-run migrations even if already completed this session.

    Raises:
        Exception: Re-raises any error from alembic or search-index setup.
    """
    global _migrations_completed

    # Skip if migrations already completed unless forced
    if _migrations_completed and not force:
        logger.debug("Migrations already completed in this session, skipping")
        return

    logger.info("Running database migrations...")
    try:
        # Get the absolute path to the alembic directory relative to this file
        # so migrations work regardless of the process's working directory
        alembic_dir = Path(__file__).parent / "alembic"
        config = Config()

        # Set required Alembic config options programmatically
        # (no alembic.ini file is shipped with the package)
        config.set_main_option("script_location", str(alembic_dir))
        config.set_main_option(
            "file_template",
            "%%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s",
        )
        config.set_main_option("timezone", "UTC")
        config.set_main_option("revision_environment", "false")
        config.set_main_option(
            "sqlalchemy.url", DatabaseType.get_db_url(app_config.database_path, database_type)
        )

        command.upgrade(config, "head")
        logger.info("Migrations completed successfully")

        # Get session maker - ensure we don't trigger recursive migration calls
        if _session_maker is None:
            _, session_maker = _create_engine_and_session(app_config.database_path, database_type)
        else:
            session_maker = _session_maker

        # initialize the search Index schema
        # the project_id is not used for init_search_index, so we pass a dummy value
        await SearchRepository(session_maker, 1).init_search_index()

        # Mark migrations as completed
        _migrations_completed = True
    except Exception as e:  # pragma: no cover
        logger.error(f"Error running migrations: {e}")
        raise

```

--------------------------------------------------------------------------------
/tests/repository/test_observation_repository.py:
--------------------------------------------------------------------------------

```python
"""Tests for the ObservationRepository."""

from datetime import datetime, timezone

import pytest
import pytest_asyncio
import sqlalchemy
from sqlalchemy.ext.asyncio import async_sessionmaker

from basic_memory import db
from basic_memory.models import Entity, Observation, Project
from basic_memory.repository.observation_repository import ObservationRepository


@pytest_asyncio.fixture(scope="function")
async def repo(observation_repository):
    """Alias fixture exposing the ObservationRepository under a shorter name."""
    return observation_repository


@pytest_asyncio.fixture(scope="function")
async def sample_observation(repo, sample_entity: Entity):
    """Create a sample observation for testing"""
    return await repo.create(
        {
            "entity_id": sample_entity.id,
            "content": "Test observation",
            "context": "test-context",
        }
    )


@pytest.mark.asyncio
async def test_create_observation(
    observation_repository: ObservationRepository, sample_entity: Entity
):
    """Test creating a new observation"""
    created = await observation_repository.create(
        {
            "entity_id": sample_entity.id,
            "content": "Test content",
            "context": "test-context",
        }
    )

    assert created.id is not None  # Should be auto-generated
    assert created.entity_id == sample_entity.id
    assert created.content == "Test content"


@pytest.mark.asyncio
async def test_create_observation_entity_does_not_exist(
    observation_repository: ObservationRepository, sample_entity: Entity
):
    """Creating an observation for a nonexistent entity raises IntegrityError."""
    observation_data = {
        "entity_id": "does-not-exist",  # no such entity; FK constraint must reject it
        "content": "Test content",
        "context": "test-context",
    }
    with pytest.raises(sqlalchemy.exc.IntegrityError):
        await observation_repository.create(observation_data)


@pytest.mark.asyncio
async def test_find_by_entity(
    observation_repository: ObservationRepository,
    sample_observation: Observation,
    sample_entity: Entity,
):
    """Observations can be looked up by their owning entity's id."""
    results = await observation_repository.find_by_entity(sample_entity.id)

    assert len(results) == 1
    found = results[0]
    assert found.id == sample_observation.id
    assert found.content == sample_observation.content


@pytest.mark.asyncio
async def test_find_by_context(
    observation_repository: ObservationRepository, sample_observation: Observation
):
    """find_by_context returns the observations created with that context string."""
    found = await observation_repository.find_by_context("test-context")

    assert len(found) == 1
    first = found[0]
    assert first.id == sample_observation.id
    assert first.content == sample_observation.content


@pytest.mark.asyncio
async def test_delete_observations(session_maker: async_sessionmaker, repo, test_project: Project):
    """delete_by_fields(entity_id=...) removes every observation for that entity."""
    now = datetime.now(timezone.utc)
    async with db.scoped_session(session_maker) as session:
        entity = Entity(
            project_id=test_project.id,
            title="test_entity",
            entity_type="test",
            permalink="test/test-entity",
            file_path="test/test_entity.md",
            content_type="text/markdown",
            created_at=now,
            updated_at=now,
        )
        session.add(entity)
        await session.flush()  # assign entity.id before creating children

        session.add_all(
            [
                Observation(entity_id=entity.id, content="Test observation 1"),
                Observation(entity_id=entity.id, content="Test observation 2"),
            ]
        )

    # Deletion by entity_id reports success...
    assert await repo.delete_by_fields(entity_id=entity.id) is True

    # ...and no observations remain for the entity.
    assert len(await repo.find_by_entity(entity.id)) == 0


@pytest.mark.asyncio
async def test_delete_observation_by_id(
    session_maker: async_sessionmaker, repo, test_project: Project
):
    """delete(id) removes exactly that observation."""
    now = datetime.now(timezone.utc)
    async with db.scoped_session(session_maker) as session:
        entity = Entity(
            project_id=test_project.id,
            title="test_entity",
            entity_type="test",
            permalink="test/test-entity",
            file_path="test/test_entity.md",
            content_type="text/markdown",
            created_at=now,
            updated_at=now,
        )
        session.add(entity)
        await session.flush()  # assign entity.id before creating the child row

        obs = Observation(entity_id=entity.id, content="Test observation")
        session.add(obs)

    # Deletion by primary key reports success...
    assert await repo.delete(obs.id) is True

    # ...and the row can no longer be found.
    assert await repo.find_by_id(obs.id) is None


@pytest.mark.asyncio
async def test_delete_observation_by_content(
    session_maker: async_sessionmaker, repo, test_project: Project
):
    """delete_by_fields(content=...) removes only observations matching that content."""
    now = datetime.now(timezone.utc)
    async with db.scoped_session(session_maker) as session:
        entity = Entity(
            project_id=test_project.id,
            title="test_entity",
            entity_type="test",
            permalink="test/test-entity",
            file_path="test/test_entity.md",
            content_type="text/markdown",
            created_at=now,
            updated_at=now,
        )
        session.add(entity)
        await session.flush()  # assign entity.id before creating children

        session.add_all(
            [
                Observation(entity_id=entity.id, content="Delete this observation"),
                Observation(entity_id=entity.id, content="Keep this observation"),
            ]
        )

    # Delete only the matching observation.
    assert await repo.delete_by_fields(content="Delete this observation") is True

    # The non-matching observation survives.
    survivors = await repo.find_by_entity(entity.id)
    assert len(survivors) == 1
    assert survivors[0].content == "Keep this observation"


@pytest.mark.asyncio
async def test_find_by_category(session_maker: async_sessionmaker, repo, test_project: Project):
    """find_by_category filters observations by exact category value."""
    now = datetime.now(timezone.utc)
    async with db.scoped_session(session_maker) as session:
        entity = Entity(
            project_id=test_project.id,
            title="test_entity",
            entity_type="test",
            permalink="test/test-entity",
            file_path="test/test_entity.md",
            content_type="text/markdown",
            created_at=now,
            updated_at=now,
        )
        session.add(entity)
        await session.flush()  # assign entity.id before creating children

        session.add_all(
            [
                Observation(entity_id=entity.id, content="Tech observation", category="tech"),
                Observation(entity_id=entity.id, content="Design observation", category="design"),
                Observation(
                    entity_id=entity.id, content="Another tech observation", category="tech"
                ),
            ]
        )
        await session.commit()

    # Both "tech" observations come back, and nothing else.
    tech_results = await repo.find_by_category("tech")
    assert len(tech_results) == 2
    assert all(item.category == "tech" for item in tech_results)
    assert {item.content for item in tech_results} == {
        "Tech observation",
        "Another tech observation",
    }

    # The lone "design" observation is returned for its category.
    design_results = await repo.find_by_category("design")
    assert len(design_results) == 1
    assert design_results[0].category == "design"
    assert design_results[0].content == "Design observation"

    # An unknown category yields no results.
    assert len(await repo.find_by_category("missing")) == 0


@pytest.mark.asyncio
async def test_observation_categories(
    session_maker: async_sessionmaker, repo, test_project: Project
):
    """observation_categories returns the distinct set of category values."""
    now = datetime.now(timezone.utc)
    async with db.scoped_session(session_maker) as session:
        entity = Entity(
            project_id=test_project.id,
            title="test_entity",
            entity_type="test",
            permalink="test/test-entity",
            file_path="test/test_entity.md",
            content_type="text/markdown",
            created_at=now,
            updated_at=now,
        )
        session.add(entity)
        await session.flush()  # assign entity.id before creating children

        # "tech" appears twice on purpose to verify de-duplication.
        session.add_all(
            [
                Observation(entity_id=entity.id, content="First tech note", category="tech"),
                Observation(entity_id=entity.id, content="Second tech note", category="tech"),
                Observation(entity_id=entity.id, content="Design note", category="design"),
                Observation(entity_id=entity.id, content="Feature note", category="feature"),
            ]
        )
        await session.commit()

    categories = await repo.observation_categories()

    # Each category appears exactly once; ordering is not asserted here.
    assert set(categories) == {"tech", "design", "feature"}


@pytest.mark.asyncio
async def test_find_by_category_with_empty_db(repo):
    """Category queries against an empty database return empty results."""
    # No observations exist, so a category lookup finds nothing...
    assert len(await repo.find_by_category("tech")) == 0

    # ...and there are no distinct categories at all.
    assert len(await repo.observation_categories()) == 0


@pytest.mark.asyncio
async def test_find_by_category_case_sensitivity(
    session_maker: async_sessionmaker, repo, test_project: Project
):
    """Document that category matching is currently case-sensitive."""
    now = datetime.now(timezone.utc)
    async with db.scoped_session(session_maker) as session:
        entity = Entity(
            project_id=test_project.id,
            title="test_entity",
            entity_type="test",
            permalink="test/test-entity",
            file_path="test/test_entity.md",
            content_type="text/markdown",
            created_at=now,
            updated_at=now,
        )
        session.add(entity)
        await session.flush()  # assign entity.id before creating the child row

        session.add(
            Observation(entity_id=entity.id, content="Tech note", category="tech")
        )
        await session.commit()

    # Exact (lowercase) match succeeds.
    assert len(await repo.find_by_category("tech")) == 1

    # Different casing finds nothing — if case-insensitive search is ever
    # desired, the repository query must change and this test with it.
    assert len(await repo.find_by_category("TECH")) == 0

```

--------------------------------------------------------------------------------
/tests/mcp/test_tool_search.py:
--------------------------------------------------------------------------------

```python
"""Tests for search MCP tools."""

import pytest
from datetime import datetime, timedelta
from unittest.mock import patch

from basic_memory.mcp.tools import write_note
from basic_memory.mcp.tools.search import search_notes, _format_search_error_response
from basic_memory.schemas.search import SearchResponse


@pytest.mark.asyncio
async def test_search_text(client, test_project):
    """Full-text search finds a freshly written note."""
    assert await write_note.fn(
        project=test_project.name,
        title="Test Search Note",
        folder="test",
        content="# Test\nThis is a searchable test note",
        tags=["test", "search"],
    )

    response = await search_notes.fn(project=test_project.name, query="searchable")

    # An error is reported as a string; fail loudly with its content.
    if not isinstance(response, SearchResponse):
        pytest.fail(f"Search failed with error: {response}")

    permalinks = [row.permalink for row in response.results]
    assert len(permalinks) > 0
    assert "test/test-search-note" in permalinks


@pytest.mark.asyncio
async def test_search_title(client, test_project):
    """Title-only search finds a note by words from its title."""
    assert await write_note.fn(
        project=test_project.name,
        title="Test Search Note",
        folder="test",
        content="# Test\nThis is a searchable test note",
        tags=["test", "search"],
    )

    response = await search_notes.fn(
        project=test_project.name, query="Search Note", search_type="title"
    )

    # An error is reported as a string; fail loudly with its content.
    if isinstance(response, str):
        pytest.fail(f"Search failed with error: {response}")

    permalinks = [row.permalink for row in response.results]
    assert len(permalinks) > 0
    assert "test/test-search-note" in permalinks


@pytest.mark.asyncio
async def test_search_permalink(client, test_project):
    """Permalink search finds a note by its exact permalink."""
    assert await write_note.fn(
        project=test_project.name,
        title="Test Search Note",
        folder="test",
        content="# Test\nThis is a searchable test note",
        tags=["test", "search"],
    )

    response = await search_notes.fn(
        project=test_project.name, query="test/test-search-note", search_type="permalink"
    )

    # An error is reported as a string; fail loudly with its content.
    if not isinstance(response, SearchResponse):
        pytest.fail(f"Search failed with error: {response}")

    permalinks = [row.permalink for row in response.results]
    assert len(permalinks) > 0
    assert "test/test-search-note" in permalinks


@pytest.mark.asyncio
async def test_search_permalink_match(client, test_project):
    """Permalink search supports wildcard ('*') pattern matching."""
    assert await write_note.fn(
        project=test_project.name,
        title="Test Search Note",
        folder="test",
        content="# Test\nThis is a searchable test note",
        tags=["test", "search"],
    )

    response = await search_notes.fn(
        project=test_project.name, query="test/test-search-*", search_type="permalink"
    )

    # An error is reported as a string; fail loudly with its content.
    if not isinstance(response, SearchResponse):
        pytest.fail(f"Search failed with error: {response}")

    permalinks = [row.permalink for row in response.results]
    assert len(permalinks) > 0
    assert "test/test-search-note" in permalinks


@pytest.mark.asyncio
async def test_search_pagination(client, test_project):
    """page/page_size limit how many results come back per request."""
    assert await write_note.fn(
        project=test_project.name,
        title="Test Search Note",
        folder="test",
        content="# Test\nThis is a searchable test note",
        tags=["test", "search"],
    )

    response = await search_notes.fn(
        project=test_project.name, query="searchable", page=1, page_size=1
    )

    # An error is reported as a string; fail loudly with its content.
    if not isinstance(response, SearchResponse):
        pytest.fail(f"Search failed with error: {response}")

    # Exactly one result per page, and it is the note just written.
    permalinks = [row.permalink for row in response.results]
    assert len(permalinks) == 1
    assert "test/test-search-note" in permalinks


@pytest.mark.asyncio
async def test_search_with_type_filter(client, test_project):
    """Filtering by types=['note'] yields only entity results."""
    await write_note.fn(
        project=test_project.name,
        title="Entity Type Test",
        folder="test",
        content="# Test\nFiltered by type",
    )

    response = await search_notes.fn(project=test_project.name, query="type", types=["note"])

    # An error is reported as a string; fail loudly with its content.
    if not isinstance(response, SearchResponse):
        pytest.fail(f"Search failed with error: {response}")

    # Every result must be an entity when the type filter is applied.
    assert all(row.type == "entity" for row in response.results)


@pytest.mark.asyncio
async def test_search_with_entity_type_filter(client, test_project):
    """Filtering by entity_types=['entity'] yields only entity results."""
    await write_note.fn(
        project=test_project.name,
        title="Entity Type Test",
        folder="test",
        content="# Test\nFiltered by type",
    )

    response = await search_notes.fn(
        project=test_project.name, query="type", entity_types=["entity"]
    )

    # An error is reported as a string; fail loudly with its content.
    if not isinstance(response, SearchResponse):
        pytest.fail(f"Search failed with error: {response}")

    # Every result must be an entity when the entity-type filter is applied.
    assert all(row.type == "entity" for row in response.results)


@pytest.mark.asyncio
async def test_search_with_date_filter(client, test_project):
    """after_date restricts results to content newer than the given timestamp."""
    await write_note.fn(
        project=test_project.name,
        title="Recent Note",
        folder="test",
        content="# Test\nRecent content",
    )

    # A note written moments ago must match an "after one hour ago" filter.
    cutoff = datetime.now() - timedelta(hours=1)
    response = await search_notes.fn(
        project=test_project.name, query="recent", after_date=cutoff.isoformat()
    )

    # An error is reported as a string; fail loudly with its content.
    if not isinstance(response, SearchResponse):
        pytest.fail(f"Search failed with error: {response}")

    assert len(response.results) > 0


class TestSearchErrorFormatting:
    """Verify that search failures are rendered as helpful, actionable messages."""

    def test_format_search_error_fts5_syntax(self):
        """FTS5 syntax errors explain the bad query and suggest a cleaned version."""
        message = _format_search_error_response(
            "test-project", "syntax error in FTS5", "test query("
        )

        for fragment in (
            "# Search Failed - Invalid Syntax",
            "The search query 'test query(' contains invalid syntax",
            "Special characters",
            "test query",  # the query with special characters stripped
        ):
            assert fragment in message

    def test_format_search_error_no_results(self):
        """Empty result sets suggest broadening the query."""
        message = _format_search_error_response(
            "test-project", "no results found", "very specific query"
        )

        for fragment in (
            "# Search Complete - No Results Found",
            "No content found matching 'very specific query'",
            "Broaden your search",
            "very",  # the simplified/shortened query suggestion
        ):
            assert fragment in message

    def test_format_search_error_server_error(self):
        """Server-side failures advise retrying and checking project status."""
        message = _format_search_error_response(
            "test-project", "internal server error", "test query"
        )

        for fragment in (
            "# Search Failed - Server Error",
            "The search service encountered an error while processing 'test query'",
            "Try again",
            "Check project status",
        ):
            assert fragment in message

    def test_format_search_error_permission_denied(self):
        """Permission failures point the user at their project access."""
        message = _format_search_error_response("test-project", "permission denied", "test query")

        for fragment in (
            "# Search Failed - Access Error",
            "You don't have permission to search",
            "Check your project access",
        ):
            assert fragment in message

    def test_format_search_error_project_not_found(self):
        """Missing-project errors direct the user to list available projects."""
        message = _format_search_error_response(
            "test-project", "current project not found", "test query"
        )

        for fragment in (
            "# Search Failed - Project Not Found",
            "The current project is not accessible",
            "Check available projects",
        ):
            assert fragment in message

    def test_format_search_error_generic(self):
        """Unrecognized errors fall back to a generic message with troubleshooting."""
        message = _format_search_error_response("test-project", "unknown error", "test query")

        for fragment in (
            "# Search Failed",
            "Error searching for 'test query': unknown error",
            "## Troubleshooting steps:",
        ):
            assert fragment in message


class TestSearchToolErrorHandling:
    """Exercise the exception-handling paths inside the search tool."""

    @pytest.mark.asyncio
    async def test_search_notes_exception_handling(self):
        """A 'syntax error' exception is converted to the invalid-syntax message."""
        with patch("basic_memory.mcp.tools.search.get_active_project") as mock_get_project:
            mock_get_project.return_value.project_url = "http://test"
            with patch(
                "basic_memory.mcp.tools.search.call_post", side_effect=Exception("syntax error")
            ):
                result = await search_notes.fn(project="test-project", query="test query")

        assert isinstance(result, str)
        assert "# Search Failed - Invalid Syntax" in result

    @pytest.mark.asyncio
    async def test_search_notes_permission_error(self):
        """A 'permission denied' exception is converted to the access-error message."""
        with patch("basic_memory.mcp.tools.search.get_active_project") as mock_get_project:
            mock_get_project.return_value.project_url = "http://test"
            with patch(
                "basic_memory.mcp.tools.search.call_post",
                side_effect=Exception("permission denied"),
            ):
                result = await search_notes.fn(project="test-project", query="test query")

        assert isinstance(result, str)
        assert "# Search Failed - Access Error" in result

```

--------------------------------------------------------------------------------
/src/basic_memory/cli/commands/tool.py:
--------------------------------------------------------------------------------

```python
"""CLI tool commands for Basic Memory."""

import asyncio
import sys
from typing import Annotated, List, Optional

import typer
from loguru import logger
from rich import print as rprint

from basic_memory.cli.app import app
from basic_memory.config import ConfigManager

# Import prompts
from basic_memory.mcp.prompts.continue_conversation import (
    continue_conversation as mcp_continue_conversation,
)
from basic_memory.mcp.prompts.recent_activity import (
    recent_activity_prompt as recent_activity_prompt,
)
from basic_memory.mcp.tools import build_context as mcp_build_context
from basic_memory.mcp.tools import read_note as mcp_read_note
from basic_memory.mcp.tools import recent_activity as mcp_recent_activity
from basic_memory.mcp.tools import search_notes as mcp_search
from basic_memory.mcp.tools import write_note as mcp_write_note
from basic_memory.schemas.base import TimeFrame
from basic_memory.schemas.memory import MemoryUrl
from basic_memory.schemas.search import SearchItemType

tool_app = typer.Typer()
app.add_typer(tool_app, name="tool", help="Access to MCP tools via CLI")


@tool_app.command()
def write_note(
    title: Annotated[str, typer.Option(help="The title of the note")],
    folder: Annotated[str, typer.Option(help="The folder to create the note in")],
    project: Annotated[
        Optional[str],
        typer.Option(
            help="The project to write to. If not provided, the default project will be used."
        ),
    ] = None,
    content: Annotated[
        Optional[str],
        typer.Option(
            help="The content of the note. If not provided, content will be read from stdin. This allows piping content from other commands, e.g.: cat file.md | basic-memory tools write-note"
        ),
    ] = None,
    tags: Annotated[
        Optional[List[str]], typer.Option(help="A list of tags to apply to the note")
    ] = None,
):
    """Create or update a markdown note. Content can be provided as an argument or read from stdin.

    Content can be provided in two ways:
    1. Using the --content parameter
    2. Piping content through stdin (if --content is not provided)

    Examples:

    # Using content parameter
    basic-memory tools write-note --title "My Note" --folder "notes" --content "Note content"

    # Using stdin pipe
    echo "# My Note Content" | basic-memory tools write-note --title "My Note" --folder "notes"

    # Using heredoc
    cat << EOF | basic-memory tools write-note --title "My Note" --folder "notes"
    # My Document

    This is my document content.

    - Point 1
    - Point 2
    EOF

    # Reading from a file
    cat document.md | basic-memory tools write-note --title "Document" --folder "docs"
    """
    try:
        # Fall back to stdin when --content was not given (enables piping).
        if content is None:
            if sys.stdin.isatty():  # pragma: no cover
                # Interactive terminal with no pipe/redirect: nothing to read.
                typer.echo(
                    "No content provided. Please provide content via --content or by piping to stdin.",
                    err=True,
                )
                raise typer.Exit(1)
            content = sys.stdin.read()

        # Reject whitespace-only content from either source.
        if content is not None and not content.strip():
            typer.echo("Empty content provided. Please provide non-empty content.", err=True)
            raise typer.Exit(1)

        # Resolve the requested project against the config, or use the default.
        config_manager = ConfigManager()
        resolved_name = None
        if project is not None:
            resolved_name, _ = config_manager.get_project(project)
            if not resolved_name:
                typer.echo(f"No project found named: {project}", err=True)
                raise typer.Exit(1)
        project_name = resolved_name or config_manager.default_project

        rprint(asyncio.run(mcp_write_note.fn(title, content, folder, project_name, tags)))
    except Exception as e:  # pragma: no cover
        if isinstance(e, typer.Exit):
            raise
        typer.echo(f"Error during write_note: {e}", err=True)
        raise typer.Exit(1)


@tool_app.command()
def read_note(
    identifier: str,
    project: Annotated[
        Optional[str],
        typer.Option(
            help="The project to use for the note. If not provided, the default project will be used."
        ),
    ] = None,
    page: int = 1,
    page_size: int = 10,
):
    """Read a markdown note from the knowledge base."""

    # Resolve the requested project against the config, or use the default.
    config_manager = ConfigManager()
    resolved_name = None
    if project is not None:
        resolved_name, _ = config_manager.get_project(project)
        if not resolved_name:
            typer.echo(f"No project found named: {project}", err=True)
            raise typer.Exit(1)
    project_name = resolved_name or config_manager.default_project

    try:
        rprint(asyncio.run(mcp_read_note.fn(identifier, project_name, page, page_size)))
    except Exception as e:  # pragma: no cover
        if isinstance(e, typer.Exit):
            raise
        typer.echo(f"Error during read_note: {e}", err=True)
        raise typer.Exit(1)


@tool_app.command()
def build_context(
    url: MemoryUrl,
    project: Annotated[
        Optional[str],
        typer.Option(help="The project to use. If not provided, the default project will be used."),
    ] = None,
    depth: Optional[int] = 1,
    timeframe: Optional[TimeFrame] = "7d",
    page: int = 1,
    page_size: int = 10,
    max_related: int = 10,
):
    """Get context needed to continue a discussion."""

    # Resolve the requested project against the config, or use the default.
    config_manager = ConfigManager()
    resolved_name = None
    if project is not None:
        resolved_name, _ = config_manager.get_project(project)
        if not resolved_name:
            typer.echo(f"No project found named: {project}", err=True)
            raise typer.Exit(1)
    project_name = resolved_name or config_manager.default_project

    try:
        context = asyncio.run(
            mcp_build_context.fn(
                project=project_name,
                url=url,
                depth=depth,
                timeframe=timeframe,
                page=page,
                page_size=page_size,
                max_related=max_related,
            )
        )
        # Serialize via json for controlled output: drop None fields and
        # stringify any types json doesn't know natively.
        import json

        print(
            json.dumps(context.model_dump(exclude_none=True), indent=2, ensure_ascii=True, default=str)
        )
    except Exception as e:  # pragma: no cover
        if isinstance(e, typer.Exit):
            raise
        typer.echo(f"Error during build_context: {e}", err=True)
        raise typer.Exit(1)


@tool_app.command()
def recent_activity(
    type: Annotated[Optional[List[SearchItemType]], typer.Option()] = None,
    depth: Optional[int] = 1,
    timeframe: Optional[TimeFrame] = "7d",
):
    """Get recent activity across the knowledge base."""
    try:
        report = asyncio.run(
            mcp_recent_activity.fn(
                type=type,  # pyright: ignore [reportArgumentType]
                depth=depth,
                timeframe=timeframe,
            )
        )
        # The tool returns a pre-formatted string, so print it verbatim.
        print(report)
    except Exception as e:  # pragma: no cover
        if isinstance(e, typer.Exit):
            raise
        typer.echo(f"Error during recent_activity: {e}", err=True)
        raise typer.Exit(1)


@tool_app.command("search-notes")
def search_notes(
    query: str,
    permalink: Annotated[bool, typer.Option("--permalink", help="Search permalink values")] = False,
    title: Annotated[bool, typer.Option("--title", help="Search title values")] = False,
    project: Annotated[
        Optional[str],
        typer.Option(
            help="The project to use for the note. If not provided, the default project will be used."
        ),
    ] = None,
    after_date: Annotated[
        Optional[str],
        typer.Option("--after_date", help="Search results after date, eg. '2d', '1 week'"),
    ] = None,
    page: int = 1,
    page_size: int = 10,
):
    """Search across all content in the knowledge base."""

    # Resolve the requested project against the config, or use the default.
    config_manager = ConfigManager()
    project_name = None
    if project is not None:
        project_name, _ = config_manager.get_project(project)
        if not project_name:
            typer.echo(f"No project found named: {project}", err=True)
            raise typer.Exit(1)
    project_name = project_name or config_manager.default_project

    # --permalink and --title are mutually exclusive search modes.
    if permalink and title:  # pragma: no cover
        typer.echo("Use either --permalink or --title, not both. Exiting.", err=True)
        raise typer.Exit(1)

    # Pick exactly one search type. (The previous implementation assigned a
    # series of one-element tuples — each overwriting the last — so the value
    # was always a tuple like ("title",) or (None,), the "text" fallback never
    # applied, and a tuple instead of a string reached the search tool.)
    if permalink and "*" in query:
        search_type = "permalink_match"  # wildcard permalink pattern
    elif permalink:
        search_type = "permalink"
    elif title:
        search_type = "title"
    else:
        search_type = "text"  # default: full-text search

    try:
        results = asyncio.run(
            mcp_search.fn(
                query,
                project_name,
                search_type=search_type,
                page=page,
                after_date=after_date,
                page_size=page_size,
            )
        )
        # Serialize via json for controlled output: drop None fields and
        # stringify any types json doesn't know natively.
        import json

        results_dict = results.model_dump(exclude_none=True)
        print(json.dumps(results_dict, indent=2, ensure_ascii=True, default=str))
    except Exception as e:  # pragma: no cover
        if not isinstance(e, typer.Exit):
            logger.exception("Error during search", e)
            typer.echo(f"Error during search: {e}", err=True)
            raise typer.Exit(1)
        raise


@tool_app.command(name="continue-conversation")
def continue_conversation(
    topic: Annotated[Optional[str], typer.Option(help="Topic or keyword to search for")] = None,
    timeframe: Annotated[
        Optional[str], typer.Option(help="How far back to look for activity")
    ] = None,
):
    """Prompt to continue a previous conversation or work session.

    Runs the MCP continue-conversation prompt and prints the formatted
    session text to stdout.
    """
    try:
        # Prompt functions return formatted strings directly
        session = asyncio.run(mcp_continue_conversation.fn(topic=topic, timeframe=timeframe))  # type: ignore
        rprint(session)
    except Exception as e:  # pragma: no cover
        if not isinstance(e, typer.Exit):
            # logger.exception records the active traceback itself; passing the
            # exception as a positional arg was an unused format argument.
            logger.exception("Error continuing conversation")
            typer.echo(f"Error continuing conversation: {e}", err=True)
            raise typer.Exit(1)
        raise


# @tool_app.command(name="show-recent-activity")
# def show_recent_activity(
#     timeframe: Annotated[
#         str, typer.Option(help="How far back to look for activity")
#     ] = "7d",
# ):
#     """Prompt to show recent activity."""
#     try:
#         # Prompt functions return formatted strings directly
#         session = asyncio.run(recent_activity_prompt(timeframe=timeframe))
#         rprint(session)
#     except Exception as e:  # pragma: no cover
#         if not isinstance(e, typer.Exit):
#             logger.exception("Error continuing conversation", e)
#             typer.echo(f"Error continuing conversation: {e}", err=True)
#             raise typer.Exit(1)
#         raise

```

--------------------------------------------------------------------------------
/tests/repository/test_relation_repository.py:
--------------------------------------------------------------------------------

```python
"""Tests for the RelationRepository."""

from datetime import datetime, timezone

import pytest
import pytest_asyncio
import sqlalchemy

from basic_memory import db
from basic_memory.models import Entity, Relation, Project
from basic_memory.repository.relation_repository import RelationRepository


@pytest_asyncio.fixture
async def source_entity(session_maker, test_project: Project):
    """Create a source entity for testing relations."""
    src = Entity(
        project_id=test_project.id,
        title="test_source",
        entity_type="test",
        permalink="source/test-source",
        file_path="source/test_source.md",
        content_type="text/markdown",
        created_at=datetime.now(timezone.utc),
        updated_at=datetime.now(timezone.utc),
    )
    # Flush inside a scoped session so the entity receives a database id.
    async with db.scoped_session(session_maker) as session:
        session.add(src)
        await session.flush()
    return src


@pytest_asyncio.fixture
async def target_entity(session_maker, test_project: Project):
    """Create a target entity for testing relations."""
    tgt = Entity(
        project_id=test_project.id,
        title="test_target",
        entity_type="test",
        permalink="target/test-target",
        file_path="target/test_target.md",
        content_type="text/markdown",
        created_at=datetime.now(timezone.utc),
        updated_at=datetime.now(timezone.utc),
    )
    # Flush inside a scoped session so the entity receives a database id.
    async with db.scoped_session(session_maker) as session:
        session.add(tgt)
        await session.flush()
    return tgt


@pytest_asyncio.fixture
async def test_relations(session_maker, source_entity, target_entity):
    """Create test relations."""
    # Two relations of distinct types between the same pair of entities.
    relations = [
        Relation(
            from_id=source_entity.id,
            to_id=target_entity.id,
            to_name=target_entity.title,
            relation_type=rel_type,
        )
        for rel_type in ("connects_to", "depends_on")
    ]
    async with db.scoped_session(session_maker) as session:
        session.add_all(relations)
        await session.flush()
    return relations


@pytest_asyncio.fixture(scope="function")
async def related_entity(entity_repository):
    """Create a second entity for testing relations"""
    # Created through the repository (not raw ORM) to mirror production writes.
    return await entity_repository.create(
        {
            "title": "Related Entity",
            "entity_type": "test",
            "permalink": "test/related-entity",
            "file_path": "test/related_entity.md",
            "summary": "A related test entity",
            "content_type": "text/markdown",
            "created_at": datetime.now(timezone.utc),
            "updated_at": datetime.now(timezone.utc),
        }
    )


@pytest_asyncio.fixture(scope="function")
async def sample_relation(
    relation_repository: RelationRepository, sample_entity: Entity, related_entity: Entity
):
    """Create a sample relation for testing"""
    # Single relation from sample_entity to related_entity.
    return await relation_repository.create(
        {
            "from_id": sample_entity.id,
            "to_id": related_entity.id,
            "to_name": related_entity.title,
            "relation_type": "test_relation",
            "context": "test-context",
        }
    )


@pytest_asyncio.fixture(scope="function")
async def multiple_relations(
    relation_repository: RelationRepository, sample_entity: Entity, related_entity: Entity
):
    """Create multiple relations for testing"""
    # (from, to, type, context) rows; the last one points the opposite way.
    specs = [
        (sample_entity.id, related_entity.id, "relation_one", "context_one"),
        (sample_entity.id, related_entity.id, "relation_two", "context_two"),
        (related_entity.id, sample_entity.id, "relation_one", "context_three"),
    ]
    created = []
    for from_id, to_id, rel_type, ctx in specs:
        created.append(
            await relation_repository.create(
                {
                    "from_id": from_id,
                    "to_id": to_id,
                    "to_name": related_entity.title,
                    "relation_type": rel_type,
                    "context": ctx,
                }
            )
        )
    return created


@pytest.mark.asyncio
async def test_create_relation(
    relation_repository: RelationRepository, sample_entity: Entity, related_entity: Entity
):
    """Test creating a new relation"""
    created = await relation_repository.create(
        {
            "from_id": sample_entity.id,
            "to_id": related_entity.id,
            "to_name": related_entity.title,
            "relation_type": "test_relation",
            "context": "test-context",
        }
    )

    assert created.id is not None  # Should be auto-generated
    assert created.from_id == sample_entity.id
    assert created.to_id == related_entity.id
    assert created.relation_type == "test_relation"


@pytest.mark.asyncio
async def test_create_relation_entity_does_not_exist(
    relation_repository: RelationRepository, sample_entity: Entity, related_entity: Entity
):
    """Test that creating a relation from a nonexistent entity raises IntegrityError."""
    relation_data = {
        "from_id": "not_exist",  # no entity with this id -> FK violation
        "to_id": related_entity.id,
        "to_name": related_entity.title,
        "relation_type": "test_relation",
        "context": "test-context",
    }
    with pytest.raises(sqlalchemy.exc.IntegrityError):
        await relation_repository.create(relation_data)


@pytest.mark.asyncio
async def test_find_by_entities(
    relation_repository: RelationRepository,
    sample_relation: Relation,
    sample_entity: Entity,
    related_entity: Entity,
):
    """Test finding relations between specific entities"""
    found = await relation_repository.find_by_entities(sample_entity.id, related_entity.id)
    assert len(found) == 1
    only = found[0]
    assert only.id == sample_relation.id
    assert only.relation_type == sample_relation.relation_type


@pytest.mark.asyncio
async def test_find_relation(relation_repository: RelationRepository, sample_relation: Relation):
    """Test finding a single relation by its endpoints' permalinks and type."""
    relation = await relation_repository.find_relation(
        from_permalink=sample_relation.from_entity.permalink,
        to_permalink=sample_relation.to_entity.permalink,
        relation_type=sample_relation.relation_type,
    )
    assert relation.id == sample_relation.id


@pytest.mark.asyncio
async def test_find_by_type(relation_repository: RelationRepository, sample_relation: Relation):
    """Test finding relations by type"""
    matches = await relation_repository.find_by_type("test_relation")
    # Exactly the one seeded relation should match.
    assert len(matches) == 1
    assert matches[0].id == sample_relation.id


@pytest.mark.asyncio
async def test_find_unresolved_relations(
    relation_repository: RelationRepository, sample_entity: Entity, related_entity: Entity
):
    """Test that relations with a null target are reported as unresolved."""
    relation_data = {
        "from_id": sample_entity.id,
        "to_id": None,  # unresolved: target not linked to an entity yet
        "to_name": related_entity.title,
        "relation_type": "test_relation",
        "context": "test-context",
    }
    relation = await relation_repository.create(relation_data)

    assert relation.from_id == sample_entity.id
    assert relation.to_id is None

    unresolved = await relation_repository.find_unresolved_relations()
    assert len(unresolved) == 1
    assert unresolved[0].id == relation.id


@pytest.mark.asyncio
async def test_delete_by_fields_single_field(
    relation_repository: RelationRepository, multiple_relations: list[Relation]
):
    """Test deleting relations by a single field."""
    # Remove every relation whose type is 'relation_one'.
    deleted = await relation_repository.delete_by_fields(relation_type="relation_one")  # pyright: ignore [reportArgumentType]
    assert deleted is True

    # Nothing of the deleted type should remain...
    assert len(await relation_repository.find_by_type("relation_one")) == 0
    # ...while the other type is untouched.
    assert len(await relation_repository.find_by_type("relation_two")) == 1


@pytest.mark.asyncio
async def test_delete_by_fields_multiple_fields(
    relation_repository: RelationRepository,
    multiple_relations: list[Relation],
    sample_entity: Entity,
    related_entity: Entity,
):
    """Test deleting relations by multiple fields."""
    # Only the relation matching both from_id AND relation_type is removed.
    deleted = await relation_repository.delete_by_fields(
        from_id=sample_entity.id,  # pyright: ignore [reportArgumentType]
        relation_type="relation_one",  # pyright: ignore [reportArgumentType]
    )
    assert deleted is True

    survivors = await relation_repository.find_by_entities(sample_entity.id, related_entity.id)
    assert len(survivors) == 1  # Only relation_two should remain
    assert survivors[0].relation_type == "relation_two"


@pytest.mark.asyncio
async def test_delete_by_fields_no_match(
    relation_repository: RelationRepository, multiple_relations: list[Relation]
):
    """Test delete_by_fields when no relations match."""
    # A relation type that was never created should delete nothing.
    deleted = await relation_repository.delete_by_fields(
        relation_type="nonexistent_type"  # pyright: ignore [reportArgumentType]
    )
    assert deleted is False


@pytest.mark.asyncio
async def test_delete_by_fields_all_fields(
    relation_repository: RelationRepository,
    multiple_relations: list[Relation],
    sample_entity: Entity,
    related_entity: Entity,
):
    """Test deleting relation by matching all fields."""
    target = multiple_relations[0]

    # Match on every field of the first seeded relation.
    deleted = await relation_repository.delete_by_fields(
        from_id=target.from_id,  # pyright: ignore [reportArgumentType]
        to_id=target.to_id,  # pyright: ignore [reportArgumentType]
        relation_type=target.relation_type,  # pyright: ignore [reportArgumentType]
    )
    assert deleted is True

    # The reverse-direction relation of the same type should survive.
    remaining = await relation_repository.find_by_type(target.relation_type)
    assert len(remaining) == 1  # One other relation_one should remain


@pytest.mark.asyncio
async def test_delete_relation_by_id(relation_repository, test_relations):
    """Test deleting a relation by ID."""
    victim = test_relations[0]

    assert await relation_repository.delete(victim.id) is True

    # Looking the row up again should come back empty.
    query = relation_repository.select(Relation).filter(Relation.id == victim.id)
    assert await relation_repository.find_one(query) is None


@pytest.mark.asyncio
async def test_delete_relations_by_type(relation_repository, test_relations):
    """Test deleting relations by type."""
    assert await relation_repository.delete_by_fields(relation_type="connects_to") is True

    # The targeted type is gone...
    assert len(await relation_repository.find_by_type("connects_to")) == 0
    # ...and the sibling type is intact.
    assert len(await relation_repository.find_by_type("depends_on")) == 1


@pytest.mark.asyncio
async def test_delete_relations_by_entities(
    relation_repository, test_relations, source_entity, target_entity
):
    """Test deleting relations between specific entities."""
    deleted = await relation_repository.delete_by_fields(
        from_id=source_entity.id, to_id=target_entity.id
    )
    assert deleted is True

    # Both relations between the pair should now be gone.
    leftover = await relation_repository.find_by_entities(source_entity.id, target_entity.id)
    assert len(leftover) == 0


@pytest.mark.asyncio
async def test_delete_nonexistent_relation(relation_repository):
    """Test deleting a relation that doesn't exist."""
    # No rows match, so delete_by_fields reports that nothing was removed.
    assert await relation_repository.delete_by_fields(relation_type="nonexistent") is False

```

--------------------------------------------------------------------------------
/test-int/mcp/test_read_content_integration.py:
--------------------------------------------------------------------------------

```python
"""
Integration tests for read_content MCP tool.

Comprehensive tests covering text files, binary files, images, error cases,
and memory:// URL handling via the complete MCP client-server flow.
"""

import json
import pytest
from fastmcp import Client
from fastmcp.exceptions import ToolError


def parse_read_content_response(mcp_result):
    """Helper function to parse read_content MCP response."""
    items = mcp_result.content
    # Expect a single text content item wrapping a JSON payload.
    assert len(items) == 1
    assert items[0].type == "text"
    return json.loads(items[0].text)


@pytest.mark.asyncio
async def test_read_content_markdown_file(mcp_server, app, test_project):
    """Test reading a markdown file created by write_note."""

    async with Client(mcp_server) as client:
        # Create a note first so there is a file on disk to read.
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Content Test",
                "folder": "test",
                "content": "# Content Test\n\nThis is test content with **markdown**.",
                "tags": "test,content",
            },
        )

        # Fetch the raw file content back through read_content.
        raw = await client.call_tool(
            "read_content",
            {
                "project": test_project.name,
                "path": "test/Content Test.md",
            },
        )

        payload = parse_read_content_response(raw)

        assert payload["type"] == "text"
        assert payload["content_type"] == "text/markdown; charset=utf-8"
        assert payload["encoding"] == "utf-8"

        text = payload["text"]

        # Raw markdown should round-trip, frontmatter included
        # (tags come back as a YAML list).
        for fragment in (
            "# Content Test",
            "This is test content with **markdown**.",
            "tags:",
            "- test",
            "- content",
        ):
            assert fragment in text


@pytest.mark.asyncio
async def test_read_content_by_permalink(mcp_server, app, test_project):
    """Test reading content using permalink instead of file path."""

    async with Client(mcp_server) as client:
        # Create a note
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Permalink Test",
                "folder": "docs",
                "content": "# Permalink Test\n\nTesting permalink-based content reading.",
            },
        )

        # Address the note by its permalink (no .md extension).
        raw = await client.call_tool(
            "read_content",
            {
                "project": test_project.name,
                "path": "docs/permalink-test",
            },
        )

        text = parse_read_content_response(raw)["text"]

        assert "# Permalink Test" in text
        assert "Testing permalink-based content reading." in text


@pytest.mark.asyncio
async def test_read_content_memory_url(mcp_server, app, test_project):
    """Test reading content using memory:// URL format."""

    async with Client(mcp_server) as client:
        # Seed a note to read back.
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Memory URL Test",
                "folder": "test",
                "content": "# Memory URL Test\n\nTesting memory:// URL handling.",
                "tags": "memory,url",
            },
        )

        # Address the same note through the memory:// scheme.
        raw = await client.call_tool(
            "read_content",
            {
                "project": test_project.name,
                "path": "memory://test/memory-url-test",
            },
        )

        text = parse_read_content_response(raw)["text"]

        assert "# Memory URL Test" in text
        assert "Testing memory:// URL handling." in text


@pytest.mark.asyncio
async def test_read_content_unicode_file(mcp_server, app, test_project):
    """Test reading content with unicode characters and emojis."""

    async with Client(mcp_server) as client:
        # Body mixing emoji, card symbols, and CJK text.
        unicode_content = (
            "# Unicode Test 🚀\n\nThis note has emoji 🎉 and unicode ♠♣♥♦\n\n测试中文内容"
        )

        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Unicode Content Test",
                "folder": "test",
                "content": unicode_content,
                "tags": "unicode,emoji",
            },
        )

        raw = await client.call_tool(
            "read_content",
            {
                "project": test_project.name,
                "path": "test/Unicode Content Test.md",
            },
        )

        text = parse_read_content_response(raw)["text"]

        # All unicode content must survive the round trip untouched.
        for fragment in ("🚀", "🎉", "♠♣♥♦", "测试中文内容"):
            assert fragment in text


@pytest.mark.asyncio
async def test_read_content_complex_frontmatter(mcp_server, app, test_project):
    """Test reading content with complex frontmatter and markdown."""

    async with Client(mcp_server) as client:
        # Body exercising nested YAML frontmatter, observations, and relations.
        complex_content = """---
title: Complex Note
type: document
version: 1.0
author: Test Author
metadata:
  status: draft
  priority: high
---

# Complex Note

This note has complex frontmatter and various markdown elements.

## Observations
- [tech] Uses YAML frontmatter
- [design] Structured content format

## Relations
- related_to [[Other Note]]
- depends_on [[Framework]]

Regular markdown content continues here."""

        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Complex Note",
                "folder": "docs",
                "content": complex_content,
                "tags": "complex,frontmatter",
            },
        )

        raw = await client.call_tool(
            "read_content",
            {
                "project": test_project.name,
                "path": "docs/Complex Note.md",
            },
        )

        text = parse_read_content_response(raw)["text"]

        # Frontmatter fields and markdown structure should be preserved verbatim.
        for fragment in (
            "version: 1.0",
            "author: Test Author",
            "status: draft",
            "[tech] Uses YAML frontmatter",
            "[[Other Note]]",
        ):
            assert fragment in text


@pytest.mark.asyncio
async def test_read_content_missing_file(mcp_server, app, test_project):
    """Test reading a file that doesn't exist."""

    async with Client(mcp_server) as client:
        # pytest.raises is safer than try/except + `assert False`, which is
        # silently stripped under `python -O`.
        with pytest.raises(ToolError) as exc_info:
            await client.call_tool(
                "read_content",
                {
                    "project": test_project.name,
                    "path": "nonexistent/file.md",
                },
            )

        # Should get an appropriate error message
        error_msg = str(exc_info.value).lower()
        assert "not found" in error_msg or "does not exist" in error_msg


@pytest.mark.asyncio
async def test_read_content_empty_file(mcp_server, app, test_project):
    """Test reading an empty file."""

    async with Client(mcp_server) as client:
        # Write a note whose body is empty.
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Empty Test",
                "folder": "test",
                "content": "",  # Empty content
            },
        )

        raw = await client.call_tool(
            "read_content",
            {
                "project": test_project.name,
                "path": "test/Empty Test.md",
            },
        )

        text = parse_read_content_response(raw)["text"]

        # Frontmatter is generated even when the body is empty.
        assert "title: Empty Test" in text
        assert "permalink: test/empty-test" in text


@pytest.mark.asyncio
async def test_read_content_large_file(mcp_server, app, test_project):
    """Test reading a file with substantial content."""

    async with Client(mcp_server) as client:
        # Build a multi-section document from a repeated template.
        sections = [
            f"""
## Section {i + 1}

This is section {i + 1} with substantial content. Lorem ipsum dolor sit amet,
consectetur adipiscing elit. Sed do eiusmod tempor incididunt ut labore et
dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation.

- [note] This is observation {i + 1}
- related_to [[Section {i}]]

Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore
eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident.

"""
            for i in range(10)
        ]
        large_content = "# Large Content Test\n\n" + "".join(sections)

        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Large Content Note",
                "folder": "test",
                "content": large_content,
                "tags": "large,content,test",
            },
        )

        raw = await client.call_tool(
            "read_content",
            {
                "project": test_project.name,
                "path": "test/Large Content Note.md",
            },
        )

        text = parse_read_content_response(raw)["text"]

        # First and last generated sections (and the filler prose) are present.
        assert "Section 1" in text
        assert "Section 10" in text
        assert "Lorem ipsum" in text
        assert len(text) > 1000  # Should be substantial


@pytest.mark.asyncio
async def test_read_content_special_characters_in_filename(mcp_server, app, test_project):
    """Test reading files with special characters in the filename."""

    async with Client(mcp_server) as client:
        # Create notes with special characters in titles
        test_cases = [
            ("File with spaces", "test"),
            ("File-with-dashes", "test"),
            ("File_with_underscores", "test"),
            ("File (with parentheses)", "test"),
            ("File & Symbols!", "test"),
        ]

        for title, folder in test_cases:
            await client.call_tool(
                "write_note",
                {
                    "project": test_project.name,
                    "title": title,
                    "folder": folder,
                    "content": f"# {title}\n\nContent for {title}",
                },
            )

            # Read the content back using the exact filename
            read_result = await client.call_tool(
                "read_content",
                {
                    "project": test_project.name,
                    "path": f"{folder}/{title}.md",
                },
            )

            # Use the shared parse helper (consistent with the other tests in
            # this file) so we assert against the decoded file text rather than
            # the raw JSON envelope string.
            response_data = parse_read_content_response(read_result)
            content = response_data["text"]

            assert f"# {title}" in content
            assert f"Content for {title}" in content
```

--------------------------------------------------------------------------------
/tests/test_config.py:
--------------------------------------------------------------------------------

```python
"""Test configuration management."""

import tempfile
import pytest

from basic_memory.config import BasicMemoryConfig, ConfigManager
from pathlib import Path


class TestBasicMemoryConfig:
    """Test BasicMemoryConfig behavior with BASIC_MEMORY_HOME environment variable."""

    def test_default_behavior_without_basic_memory_home(self, config_home, monkeypatch):
        """Test that config uses default path when BASIC_MEMORY_HOME is not set."""
        # Ensure BASIC_MEMORY_HOME is not set
        monkeypatch.delenv("BASIC_MEMORY_HOME", raising=False)

        config = BasicMemoryConfig()

        # Should use the default path (home/basic-memory)
        expected_path = (config_home / "basic-memory").as_posix()
        assert config.projects["main"] == Path(expected_path).as_posix()

    def test_respects_basic_memory_home_environment_variable(self, config_home, monkeypatch):
        """Test that config respects BASIC_MEMORY_HOME environment variable."""
        custom_path = (config_home / "app" / "data").as_posix()
        monkeypatch.setenv("BASIC_MEMORY_HOME", custom_path)

        config = BasicMemoryConfig()

        # Should use the custom path from environment variable
        assert config.projects["main"] == custom_path

    def test_model_post_init_respects_basic_memory_home(self, config_home, monkeypatch):
        """Test that model_post_init creates main project with BASIC_MEMORY_HOME when missing."""
        custom_path = str(config_home / "custom" / "memory" / "path")
        monkeypatch.setenv("BASIC_MEMORY_HOME", custom_path)

        # Create config without main project
        other_path = str(config_home / "some" / "path")
        config = BasicMemoryConfig(projects={"other": other_path})

        # model_post_init should have added main project with BASIC_MEMORY_HOME
        assert "main" in config.projects
        assert config.projects["main"] == Path(custom_path).as_posix()

    def test_model_post_init_fallback_without_basic_memory_home(self, config_home, monkeypatch):
        """Test that model_post_init falls back to default when BASIC_MEMORY_HOME is not set."""
        # Ensure BASIC_MEMORY_HOME is not set
        monkeypatch.delenv("BASIC_MEMORY_HOME", raising=False)

        # Create config without main project
        other_path = (config_home / "some" / "path").as_posix()
        config = BasicMemoryConfig(projects={"other": other_path})

        # model_post_init should have added main project with default path
        expected_path = (config_home / "basic-memory").as_posix()
        assert "main" in config.projects
        assert config.projects["main"] == Path(expected_path).as_posix()

    def test_basic_memory_home_with_relative_path(self, config_home, monkeypatch):
        """Test that BASIC_MEMORY_HOME works with relative paths."""
        relative_path = "relative/memory/path"
        monkeypatch.setenv("BASIC_MEMORY_HOME", relative_path)

        config = BasicMemoryConfig()

        # Should use the exact value from environment variable
        # (no normalization or absolutization is applied)
        assert config.projects["main"] == relative_path

    def test_basic_memory_home_overrides_existing_main_project(self, config_home, monkeypatch):
        """Test that BASIC_MEMORY_HOME is not used when a map is passed in the constructor."""
        custom_path = str(config_home / "override" / "memory" / "path")
        monkeypatch.setenv("BASIC_MEMORY_HOME", custom_path)

        # Try to create config with a different main project path
        original_path = str(config_home / "original" / "path")
        config = BasicMemoryConfig(projects={"main": original_path})

        # An explicitly passed projects mapping wins: the default_factory (and
        # therefore BASIC_MEMORY_HOME) is only consulted when no value is given.
        assert config.projects["main"] == original_path


class TestConfigManager:
    """Test ConfigManager functionality.

    Covers default-project selection, permalink-based project lookup,
    project removal, and config-directory resolution.
    """

    @pytest.fixture
    def temp_config_manager(self):
        """Create a ConfigManager with temporary config file.

        Yields a manager whose config paths are redirected into a temp
        directory and pre-populated with three projects, so tests never
        read or write the user's real configuration.
        """
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_path = Path(temp_dir)

            # Create a test ConfigManager instance
            config_manager = ConfigManager()
            # Override config paths to use temp directory.
            config_manager.config_dir = temp_path / "basic-memory"
            # Use "config.json" for consistency with ConfigManager's real
            # file name (see test_config_manager_respects_custom_config_dir).
            config_manager.config_file = config_manager.config_dir / "config.json"
            config_manager.config_dir.mkdir(parents=True, exist_ok=True)

            # Create initial config with test projects
            test_config = BasicMemoryConfig(
                default_project="main",
                projects={
                    "main": str(temp_path / "main"),
                    "test-project": str(temp_path / "test"),
                    "special-chars": str(
                        temp_path / "special"
                    ),  # This will be the config key for "Special/Chars"
                },
            )
            config_manager.save_config(test_config)

            yield config_manager

    def test_set_default_project_with_exact_name_match(self, temp_config_manager):
        """Test set_default_project when project name matches config key exactly."""
        config_manager = temp_config_manager

        # Set default to a project that exists with exact name match
        config_manager.set_default_project("test-project")

        # Verify the config was updated
        config = config_manager.load_config()
        assert config.default_project == "test-project"

    def test_set_default_project_with_permalink_lookup(self, temp_config_manager):
        """Test set_default_project when input needs permalink normalization."""
        config_manager = temp_config_manager

        # Simulate a project that was created with special characters.
        # The config key would be the permalink, but the user might type the
        # original (un-normalized) name.
        config = config_manager.load_config()
        config.projects["special-chars-project"] = str(Path("/tmp/special"))
        config_manager.save_config(config)

        # Now test setting default using a name that will normalize to the config key
        config_manager.set_default_project(
            "Special Chars Project"
        )  # This should normalize to "special-chars-project"

        # Verify the config was updated with the correct config key
        updated_config = config_manager.load_config()
        assert updated_config.default_project == "special-chars-project"

    def test_set_default_project_uses_canonical_name(self, temp_config_manager):
        """Test that set_default_project uses the canonical config key, not user input."""
        config_manager = temp_config_manager

        # Add a project with a config key that differs from user input
        config = config_manager.load_config()
        config.projects["my-test-project"] = str(Path("/tmp/mytest"))
        config_manager.save_config(config)

        # Set default using input that will match but is different from config key
        config_manager.set_default_project("My Test Project")  # Should find "my-test-project"

        # Verify that the canonical config key is used, not the user input
        updated_config = config_manager.load_config()
        assert updated_config.default_project == "my-test-project"
        # Should NOT be the user input
        assert updated_config.default_project != "My Test Project"

    def test_set_default_project_nonexistent_project(self, temp_config_manager):
        """Test set_default_project raises ValueError for nonexistent project."""
        config_manager = temp_config_manager

        with pytest.raises(ValueError, match="Project 'nonexistent' not found"):
            config_manager.set_default_project("nonexistent")

    def test_disable_permalinks_flag_default(self):
        """Test that disable_permalinks flag defaults to False."""
        config = BasicMemoryConfig()
        assert config.disable_permalinks is False

    def test_disable_permalinks_flag_can_be_enabled(self):
        """Test that disable_permalinks flag can be set to True."""
        config = BasicMemoryConfig(disable_permalinks=True)
        assert config.disable_permalinks is True

    def test_config_manager_respects_custom_config_dir(self, monkeypatch):
        """Test that ConfigManager respects BASIC_MEMORY_CONFIG_DIR environment variable."""
        with tempfile.TemporaryDirectory() as temp_dir:
            custom_config_dir = Path(temp_dir) / "custom" / "config"
            monkeypatch.setenv("BASIC_MEMORY_CONFIG_DIR", str(custom_config_dir))

            config_manager = ConfigManager()

            # Verify config_dir is set to the custom path
            assert config_manager.config_dir == custom_config_dir
            # Verify config_file is in the custom directory
            assert config_manager.config_file == custom_config_dir / "config.json"
            # Verify the directory was created
            assert config_manager.config_dir.exists()

    def test_config_manager_default_without_custom_config_dir(self, config_home, monkeypatch):
        """Test that ConfigManager uses default location when BASIC_MEMORY_CONFIG_DIR is not set."""
        monkeypatch.delenv("BASIC_MEMORY_CONFIG_DIR", raising=False)

        config_manager = ConfigManager()

        # Should use default location under the (fixture-controlled) home dir
        assert config_manager.config_dir == config_home / ".basic-memory"
        assert config_manager.config_file == config_home / ".basic-memory" / "config.json"

    def test_remove_project_with_exact_name_match(self, temp_config_manager):
        """Test remove_project when project name matches config key exactly."""
        config_manager = temp_config_manager

        # Verify project exists
        config = config_manager.load_config()
        assert "test-project" in config.projects

        # Remove the project with exact name match
        config_manager.remove_project("test-project")

        # Verify the project was removed
        config = config_manager.load_config()
        assert "test-project" not in config.projects

    def test_remove_project_with_permalink_lookup(self, temp_config_manager):
        """Test remove_project when input needs permalink normalization."""
        config_manager = temp_config_manager

        # Add a project with normalized key
        config = config_manager.load_config()
        config.projects["special-chars-project"] = str(Path("/tmp/special"))
        config_manager.save_config(config)

        # Remove using a name that will normalize to the config key
        config_manager.remove_project(
            "Special Chars Project"
        )  # This should normalize to "special-chars-project"

        # Verify the project was removed using the correct config key
        updated_config = config_manager.load_config()
        assert "special-chars-project" not in updated_config.projects

    def test_remove_project_uses_canonical_name(self, temp_config_manager):
        """Test that remove_project uses the canonical config key, not user input."""
        config_manager = temp_config_manager

        # Add a project with a config key that differs from user input
        config = config_manager.load_config()
        config.projects["my-test-project"] = str(Path("/tmp/mytest"))
        config_manager.save_config(config)

        # Remove using input that will match but is different from config key
        config_manager.remove_project("My Test Project")  # Should find "my-test-project"

        # Verify that the canonical config key was removed
        updated_config = config_manager.load_config()
        assert "my-test-project" not in updated_config.projects

    def test_remove_project_nonexistent_project(self, temp_config_manager):
        """Test remove_project raises ValueError for nonexistent project."""
        config_manager = temp_config_manager

        with pytest.raises(ValueError, match="Project 'nonexistent' not found"):
            config_manager.remove_project("nonexistent")

    def test_remove_project_cannot_remove_default(self, temp_config_manager):
        """Test remove_project raises ValueError when trying to remove default project."""
        config_manager = temp_config_manager

        # Try to remove the default project
        with pytest.raises(ValueError, match="Cannot remove the default project"):
            config_manager.remove_project("main")
```

--------------------------------------------------------------------------------
/v15-docs/chatgpt-integration.md:
--------------------------------------------------------------------------------

```markdown
# ChatGPT MCP Integration

**Status**: New Feature
**PR**: #305
**File**: `mcp/tools/chatgpt_tools.py`
**Mode**: Remote MCP only

## What's New

v0.15.0 introduces ChatGPT-specific MCP tools that expose Basic Memory's search and fetch functionality using OpenAI's required tool schema and response format.

## Requirements

### ChatGPT Plus/Pro Subscription

**Required:** ChatGPT Plus or Pro subscription
- Free tier does NOT support MCP
- Plus and Pro tiers both include MCP support

**Pricing:**
- ChatGPT Plus: $20/month
- ChatGPT Pro: $200/month (includes advanced features)

### Developer Mode

**Required:** ChatGPT Developer Mode
- Access to MCP server configuration
- Ability to add custom MCP servers

**Enable Developer Mode:**
1. Open ChatGPT settings
2. Navigate to "Advanced" or "Developer" settings
3. Enable "Developer Mode"
4. Restart ChatGPT

### Remote MCP Configuration

**Important:** ChatGPT only supports **remote MCP servers**
- Cannot use local MCP (like Claude Desktop)
- Requires publicly accessible MCP server
- Basic Memory must be deployed and reachable

## How It Works

### ChatGPT-Specific Format

OpenAI requires MCP responses in a specific format:

**Standard MCP (Claude, etc.):**
```json
{
  "results": [...],
  "total": 10
}
```

**ChatGPT MCP:**
```json
[
  {
    "type": "text",
    "text": "{\"results\": [...], \"total\": 10}"
  }
]
```

**Key difference:** ChatGPT expects content wrapped in `[{"type": "text", "text": "..."}]` array

### Adapter Architecture

```
ChatGPT Request
    ↓
ChatGPT MCP Tools (chatgpt_tools.py)
    ↓
Standard Basic Memory Tools (search_notes, read_note)
    ↓
Format for ChatGPT
    ↓
[{"type": "text", "text": "{...json...}"}]
    ↓
ChatGPT Response
```

## Available Tools

### 1. search

Search across the knowledge base.

**Tool Definition:**
```json
{
  "name": "search",
  "description": "Search for content across the knowledge base",
  "inputSchema": {
    "type": "object",
    "properties": {
      "query": {
        "type": "string",
        "description": "Search query"
      }
    },
    "required": ["query"]
  }
}
```

**Example Request:**
```json
{
  "query": "authentication system"
}
```

**Example Response:**
```json
[
  {
    "type": "text",
    "text": "{\"results\": [{\"id\": \"auth-design\", \"title\": \"Authentication Design\", \"url\": \"auth-design\"}], \"total_count\": 1, \"query\": \"authentication system\"}"
  }
]
```

**Parsed JSON:**
```json
{
  "results": [
    {
      "id": "auth-design",
      "title": "Authentication Design",
      "url": "auth-design"
    }
  ],
  "total_count": 1,
  "query": "authentication system"
}
```

### 2. fetch

Fetch full contents of a document.

**Tool Definition:**
```json
{
  "name": "fetch",
  "description": "Fetch the full contents of a search result document",
  "inputSchema": {
    "type": "object",
    "properties": {
      "id": {
        "type": "string",
        "description": "Document identifier"
      }
    },
    "required": ["id"]
  }
}
```

**Example Request:**
```json
{
  "id": "auth-design"
}
```

**Example Response:**
```json
[
  {
    "type": "text",
    "text": "{\"id\": \"auth-design\", \"title\": \"Authentication Design\", \"text\": \"# Authentication Design\\n\\n...\", \"url\": \"auth-design\", \"metadata\": {\"format\": \"markdown\"}}"
  }
]
```

**Parsed JSON:**
```json
{
  "id": "auth-design",
  "title": "Authentication Design",
  "text": "# Authentication Design\n\n...",
  "url": "auth-design",
  "metadata": {
    "format": "markdown"
  }
}
```

## Configuration

### Remote MCP Server Setup

**Option 1: Deploy to Cloud**

```bash
# Deploy Basic Memory to cloud provider
# Ensure publicly accessible

# Example: Deploy to Fly.io
fly deploy

# Get URL
export MCP_SERVER_URL=https://your-app.fly.dev
```

**Option 2: Use ngrok for Testing**

```bash
# Start Basic Memory locally
bm mcp --port 8000

# Expose via ngrok
ngrok http 8000

# Get public URL
# → https://abc123.ngrok.io
```

### ChatGPT MCP Configuration

**In ChatGPT Developer Mode:**

```json
{
  "mcpServers": {
    "basic-memory": {
      "url": "https://your-server.com/mcp",
      "apiKey": "your-api-key-if-needed"
    }
  }
}
```

**Environment Variables (if using auth):**
```bash
export BASIC_MEMORY_API_KEY=your-secret-key
```

## Usage Examples

### Search Workflow

**User asks ChatGPT:**
> "Search my knowledge base for authentication notes"

**ChatGPT internally calls:**
```json
{
  "tool": "search",
  "arguments": {
    "query": "authentication notes"
  }
}
```

**Basic Memory responds:**
```json
[{
  "type": "text",
  "text": "{\"results\": [{\"id\": \"auth-design\", \"title\": \"Auth Design\", \"url\": \"auth-design\"}, {\"id\": \"oauth-setup\", \"title\": \"OAuth Setup\", \"url\": \"oauth-setup\"}], \"total_count\": 2, \"query\": \"authentication notes\"}"
}]
```

**ChatGPT displays:**
> I found 2 documents about authentication:
> 1. Auth Design
> 2. OAuth Setup

### Fetch Workflow

**User asks ChatGPT:**
> "Show me the Auth Design document"

**ChatGPT internally calls:**
```json
{
  "tool": "fetch",
  "arguments": {
    "id": "auth-design"
  }
}
```

**Basic Memory responds:**
```json
[{
  "type": "text",
  "text": "{\"id\": \"auth-design\", \"title\": \"Auth Design\", \"text\": \"# Auth Design\\n\\n## Overview\\n...full content...\", \"url\": \"auth-design\", \"metadata\": {\"format\": \"markdown\"}}"
}]
```

**ChatGPT displays:**
> Here's the Auth Design document:
>
> # Auth Design
>
> ## Overview
> ...

## Response Schema

### Search Response

```typescript
{
  results: Array<{
    id: string,        // Document permalink
    title: string,     // Document title
    url: string        // Document URL/permalink
  }>,
  total_count: number, // Total results found
  query: string        // Original query echoed back
}
```

### Fetch Response

```typescript
{
  id: string,          // Document identifier
  title: string,       // Document title
  text: string,        // Full markdown content
  url: string,         // Document URL/permalink
  metadata: {
    format: string     // "markdown"
  }
}
```

### Error Response

```typescript
{
  results: [],          // Empty for search
  error: string,        // Error type
  error_message: string // Error details (search errors may use `error_details` instead)
}
```

## Differences from Standard Tools

### ChatGPT Tools vs Standard MCP Tools

| Feature | ChatGPT Tools | Standard Tools |
|---------|---------------|----------------|
| **Tool Names** | `search`, `fetch` | `search_notes`, `read_note` |
| **Response Format** | `[{"type": "text", "text": "..."}]` | Direct JSON |
| **Parameters** | Minimal (query, id) | Rich (project, page, filters) |
| **Project Selection** | Automatic | Explicit or default_project_mode |
| **Pagination** | Fixed (10 results) | Configurable |
| **Error Handling** | JSON error objects | Direct error messages |

### Automatic Defaults

ChatGPT tools use sensible defaults:

```python
# search tool defaults
page = 1
page_size = 10
search_type = "text"
project = None  # Auto-resolved

# fetch tool defaults
page = 1
page_size = 10
project = None  # Auto-resolved
```

## Project Resolution

### Automatic Project Selection

ChatGPT tools use automatic project resolution:

1. **CLI constraint** (if `--project` flag used)
2. **default_project_mode** (if enabled in config)
3. **Error** if no project can be resolved

**Recommended Setup:**
```json
// ~/.basic-memory/config.json
{
  "default_project": "main",
  "default_project_mode": true
}
```

This ensures ChatGPT tools work without explicit project parameters.

## Error Handling

### Search Errors

```json
[{
  "type": "text",
  "text": "{\"results\": [], \"error\": \"Search failed\", \"error_details\": \"Project not found\"}"
}]
```

### Fetch Errors

```json
[{
  "type": "text",
  "text": "{\"id\": \"missing-doc\", \"title\": \"Fetch Error\", \"text\": \"Failed to fetch document: Not found\", \"url\": \"missing-doc\", \"metadata\": {\"error\": \"Fetch failed\"}}"
}]
```

### Common Errors

**No project found:**
```json
{
  "error": "Project required",
  "error_message": "No project specified and default_project_mode not enabled"
}
```

**Document not found:**
```json
{
  "id": "doc-123",
  "title": "Document Not Found",
  "text": "# Note Not Found\n\nThe requested document 'doc-123' could not be found",
  "metadata": {"error": "Document not found"}
}
```

## Deployment Patterns

### Production Deployment

**1. Deploy to Cloud:**
```bash
# Docker deployment
docker build -t basic-memory .
docker run -p 8000:8000 \
  -e BASIC_MEMORY_API_URL=https://api.basicmemory.cloud \
  basic-memory mcp --port 8000

# Or use managed hosting
fly deploy
```

**2. Configure ChatGPT:**
```json
{
  "mcpServers": {
    "basic-memory": {
      "url": "https://your-app.fly.dev/mcp"
    }
  }
}
```

**3. Enable default_project_mode:**
```json
{
  "default_project_mode": true,
  "default_project": "main"
}
```

### Development/Testing

**1. Use ngrok:**
```bash
# Terminal 1: Start MCP server
bm mcp --port 8000

# Terminal 2: Expose with ngrok
ngrok http 8000
# → https://abc123.ngrok.io
```

**2. Configure ChatGPT:**
```json
{
  "mcpServers": {
    "basic-memory-dev": {
      "url": "https://abc123.ngrok.io/mcp"
    }
  }
}
```

## Limitations

### ChatGPT-Specific Constraints

1. **Remote only** - Cannot use local MCP server
2. **No streaming** - Results returned all at once
3. **Fixed pagination** - 10 results per search
4. **Simplified parameters** - Cannot specify advanced filters
5. **No project selection** - Must use default_project_mode
6. **Subscription required** - ChatGPT Plus/Pro only

### Workarounds

**For more results:**
- Refine search query
- Use fetch to get full documents
- Run multiple, more specific searches

**For project selection:**
- Enable default_project_mode
- Or deploy separate instances per project

**For advanced features:**
- Use Claude Desktop with full MCP tools
- Or use Basic Memory CLI directly

## Troubleshooting

### ChatGPT Can't Connect

**Problem:** ChatGPT shows "MCP server unavailable"

**Solutions:**
1. Verify server is publicly accessible
   ```bash
   curl https://your-server.com/mcp/health
   ```

2. Check firewall/security groups
3. Verify HTTPS (not HTTP)
4. Check API key if using auth

### No Results Returned

**Problem:** Search returns empty results

**Solutions:**
1. Check default_project_mode enabled
   ```json
   {"default_project_mode": true}
   ```

2. Verify data is synced
   ```bash
   bm sync --project main
   ```

3. Test search locally
   ```bash
   bm tools search --query "test"
   ```

### Format Errors

**Problem:** ChatGPT shows parsing errors

**Check response format:**
```python
# Must be wrapped array
[{"type": "text", "text": "{...json...}"}]

# NOT direct JSON
{"results": [...]}
```

### Developer Mode Not Available

**Problem:** Can't find Developer Mode in ChatGPT

**Solution:**
- Ensure ChatGPT Plus/Pro subscription
- Check for feature rollout (may not be available in all regions)
- Contact OpenAI support

## Best Practices

### 1. Enable default_project_mode

```json
{
  "default_project_mode": true,
  "default_project": "main"
}
```

### 2. Use Cloud Deployment

Don't rely on ngrok for production:
```bash
# Production deployment
fly deploy
# or
railway up
# or
vercel deploy
```

### 3. Monitor Usage

```bash
# Enable logging
export BASIC_MEMORY_LOG_LEVEL=INFO

# Monitor requests
tail -f /var/log/basic-memory/mcp.log
```

### 4. Secure Your Server

```bash
# Use API key authentication
export BASIC_MEMORY_API_KEY=secret

# Restrict CORS
export BASIC_MEMORY_ALLOWED_ORIGINS=https://chatgpt.com
```

### 5. Test Locally First

```bash
# Test with curl
curl -X POST https://your-server.com/mcp/tools/search \
  -H "Content-Type: application/json" \
  -d '{"query": "test"}'
```

## Comparison with Claude Desktop

| Feature | ChatGPT | Claude Desktop |
|---------|---------|----------------|
| **MCP Mode** | Remote only | Local or Remote |
| **Tools** | 2 (search, fetch) | 17+ (full suite) |
| **Response Format** | OpenAI-specific | Standard MCP |
| **Project Support** | Default only | Full multi-project |
| **Subscription** | Plus/Pro required | Free (Claude) |
| **Configuration** | Developer mode | Config file |
| **Performance** | Network latency | Local (instant) |

**Recommendation:** Use Claude Desktop for full features, ChatGPT for convenience

## See Also

- ChatGPT MCP documentation: https://platform.openai.com/docs/mcp
- `default-project-mode.md` - Required for ChatGPT tools
- `cloud-mode-usage.md` - Deploying MCP to cloud
- Standard MCP tools documentation

```

--------------------------------------------------------------------------------
/tests/sync/test_character_conflicts.py:
--------------------------------------------------------------------------------

```python
"""Test character-related sync conflicts and permalink generation."""

from pathlib import Path
from textwrap import dedent

import pytest
from sqlalchemy.exc import IntegrityError

from basic_memory.config import ProjectConfig
from basic_memory.repository import EntityRepository
from basic_memory.sync.sync_service import SyncService
from basic_memory.utils import (
    generate_permalink,
    normalize_file_path_for_comparison,
    detect_potential_file_conflicts,
)


async def create_test_file(path: Path, content: str = "test content") -> None:
    """Write *content* to *path*, creating any missing parent directories."""
    destination = Path(path)
    destination.parent.mkdir(parents=True, exist_ok=True)
    destination.write_text(content)


class TestUtilityFunctions:
    """Test utility functions for file path normalization and conflict detection."""

    def test_normalize_file_path_for_comparison(self):
        """Test file path normalization for conflict detection."""
        cases = [
            ("Finance/Investment.md", "finance/investment.md"),  # mixed case
            ("FINANCE/INVESTMENT.MD", "finance/investment.md"),  # all caps
            ("Finance\\Investment.md", "finance/investment.md"),  # backslash separators
            ("Finance//Investment.md", "finance/investment.md"),  # doubled slashes
        ]
        for raw, expected in cases:
            assert normalize_file_path_for_comparison(raw) == expected

    def test_detect_potential_file_conflicts(self):
        """Test the enhanced conflict detection function."""
        existing_paths = [
            "Finance/Investment.md",
            "finance/Investment.md",
            "docs/my-feature.md",
            "docs/my feature.md",
        ]

        # Case-insensitive comparison should flag both Finance variants.
        hits = detect_potential_file_conflicts("FINANCE/INVESTMENT.md", existing_paths)
        assert "Finance/Investment.md" in hits
        assert "finance/Investment.md" in hits

        # Underscore, hyphen, and space all normalize to the same permalink.
        hits = detect_potential_file_conflicts("docs/my_feature.md", existing_paths)
        assert "docs/my-feature.md" in hits
        assert "docs/my feature.md" in hits


class TestPermalinkGeneration:
    """Test permalink generation with various character scenarios."""

    def test_hyphen_handling(self):
        """Hyphens in filenames are preserved; spaces collapse to hyphens."""
        # Existing hyphens pass through unchanged.
        assert generate_permalink("docs/my-feature.md") == "docs/my-feature"
        assert generate_permalink("docs/basic-memory bug.md") == "docs/basic-memory-bug"

        # Spaces become hyphens.
        assert generate_permalink("docs/my feature.md") == "docs/my-feature"

        # A mix of both.
        assert generate_permalink("docs/my-old feature.md") == "docs/my-old-feature"

    def test_forward_slash_handling(self):
        """Directory separators are kept as path segments in the permalink."""
        assert generate_permalink("Finance/Investment.md") == "finance/investment"

        # Spaces inside directory names are hyphenated too.
        assert generate_permalink("My Finance/Investment.md") == "my-finance/investment"

    def test_case_sensitivity_normalization(self):
        """Differently-cased spellings of the same path yield one permalink."""
        for variant in (
            "Finance/Investment.md",
            "finance/Investment.md",
            "FINANCE/INVESTMENT.md",
        ):
            assert generate_permalink(variant) == "finance/investment"

    def test_unicode_character_handling(self):
        """International characters are transliterated or preserved sensibly."""
        # Italian title mentioned in user feedback: accents folded to ASCII.
        assert (
            generate_permalink("Finance/Punti Chiave di Peter Lynch.md")
            == "finance/punti-chiave-di-peter-lynch"
        )

        # Chinese characters are preserved verbatim.
        assert generate_permalink("中文/测试文档.md") == "中文/测试文档"

        # Mixed accented Latin characters are folded (é→e, ü→u).
        assert generate_permalink("docs/Café München.md") == "docs/cafe-munchen"

    def test_special_punctuation(self):
        """Apostrophes vanish; other punctuation turns into hyphens."""
        assert generate_permalink("Peter's Guide.md") == "peters-guide"
        assert generate_permalink("Q&A Session.md") == "q-a-session"


@pytest.mark.asyncio
class TestSyncConflictHandling:
    """Test sync service handling of file path and permalink conflicts.

    These tests exercise real filesystem state plus the sync service, so
    outcomes can depend on the host OS (see the case-sensitivity test).
    """

    async def test_file_path_conflict_detection(
        self,
        sync_service: SyncService,
        project_config: ProjectConfig,
        entity_repository: EntityRepository,
    ):
        """Test that file path conflicts are detected during move operations."""
        project_dir = project_config.home

        # Create two files
        content1 = dedent("""
        ---
        type: knowledge
        ---
        # Document One
        This is the first document.
        """)

        content2 = dedent("""
        ---
        type: knowledge
        ---
        # Document Two  
        This is the second document.
        """)

        await create_test_file(project_dir / "doc1.md", content1)
        await create_test_file(project_dir / "doc2.md", content2)

        # Initial sync
        await sync_service.sync(project_config.home)

        # Verify both entities exist
        entities = await entity_repository.find_all()
        assert len(entities) == 2

        # Now simulate a move where doc1.md tries to move to doc2.md's location
        # This should be handled gracefully, not throw an IntegrityError

        # First, get the entities
        entity1 = await entity_repository.get_by_file_path("doc1.md")
        entity2 = await entity_repository.get_by_file_path("doc2.md")

        assert entity1 is not None
        assert entity2 is not None

        # Simulate the conflict scenario
        with pytest.raises(Exception) as exc_info:
            # This should detect the conflict and handle it gracefully
            await sync_service.handle_move("doc1.md", "doc2.md")

        # The exception should be a meaningful error, not an IntegrityError
        # (a raw IntegrityError would mean the DB constraint fired before the
        # sync service detected the conflict itself).
        assert not isinstance(exc_info.value, IntegrityError)

    async def test_hyphen_filename_conflict(
        self,
        sync_service: SyncService,
        project_config: ProjectConfig,
        entity_repository: EntityRepository,
    ):
        """Test conflict when filename with hyphens conflicts with generated permalink."""
        project_dir = project_config.home

        # Create file with spaces (will generate permalink with hyphens)
        content1 = dedent("""
        ---
        type: knowledge  
        ---
        # Basic Memory Bug
        This file has spaces in the name.
        """)

        # Create file with hyphens (already has hyphens in filename)
        content2 = dedent("""
        ---
        type: knowledge
        ---
        # Basic Memory Bug Report
        This file has hyphens in the name.
        """)

        # "basic memory bug.md" and "basic-memory-bug.md" both normalize to
        # the permalink "basic-memory-bug", which is the collision under test.
        await create_test_file(project_dir / "basic memory bug.md", content1)
        await create_test_file(project_dir / "basic-memory-bug.md", content2)

        # Sync should handle this without conflict
        await sync_service.sync(project_config.home)

        # Verify both entities were created with unique permalinks
        entities = await entity_repository.find_all()
        assert len(entities) == 2

        # Check that permalinks are unique
        permalinks = [entity.permalink for entity in entities if entity.permalink]
        assert len(set(permalinks)) == len(permalinks), "Permalinks should be unique"

    async def test_case_sensitivity_conflict(
        self,
        sync_service: SyncService,
        project_config: ProjectConfig,
        entity_repository: EntityRepository,
    ):
        """Test conflict handling when case differences cause issues."""
        import platform

        project_dir = project_config.home

        # Create directory structure that might cause case conflicts.
        # On case-insensitive filesystems these two mkdir calls hit the
        # same directory.
        (project_dir / "Finance").mkdir(parents=True, exist_ok=True)
        (project_dir / "finance").mkdir(parents=True, exist_ok=True)

        content1 = dedent("""
        ---
        type: knowledge
        ---
        # Investment Guide
        Upper case directory.
        """)

        content2 = dedent("""
        ---
        type: knowledge
        ---
        # Investment Tips
        Lower case directory.
        """)

        await create_test_file(project_dir / "Finance" / "investment.md", content1)
        await create_test_file(project_dir / "finance" / "investment.md", content2)

        # Sync should handle case differences properly
        await sync_service.sync(project_config.home)

        # Verify entities were created
        entities = await entity_repository.find_all()

        # On case-insensitive file systems (macOS, Windows), only one entity will be created
        # On case-sensitive file systems (Linux), two entities will be created
        if platform.system() in ["Darwin", "Windows"]:
            # Case-insensitive file systems
            assert len(entities) >= 1
            # Only one of the paths will exist
            file_paths = [entity.file_path for entity in entities]
            assert any(
                path in ["Finance/investment.md", "finance/investment.md"] for path in file_paths
            )
        else:
            # Case-sensitive file systems (Linux)
            assert len(entities) >= 2
            # Check that file paths are preserved correctly
            file_paths = [entity.file_path for entity in entities]
            assert "Finance/investment.md" in file_paths
            assert "finance/investment.md" in file_paths

    async def test_move_conflict_resolution(
        self,
        sync_service: SyncService,
        project_config: ProjectConfig,
        entity_repository: EntityRepository,
    ):
        """Test that move conflicts are resolved with proper error handling."""
        project_dir = project_config.home

        # Create three files in a scenario that could cause move conflicts
        await create_test_file(project_dir / "file-a.md", "# File A")
        await create_test_file(project_dir / "file-b.md", "# File B")
        await create_test_file(project_dir / "temp.md", "# Temp File")

        # Initial sync
        await sync_service.sync(project_config.home)

        # Simulate a complex move scenario where files swap locations
        # This is the kind of scenario that caused the original bug

        # Get the entities
        entity_a = await entity_repository.get_by_file_path("file-a.md")
        entity_b = await entity_repository.get_by_file_path("file-b.md")
        entity_temp = await entity_repository.get_by_file_path("temp.md")

        assert all([entity_a, entity_b, entity_temp])

        # Try to move file-a to file-b's location (should detect conflict).
        # Both outcomes are acceptable: either the service resolves the
        # conflict (state stays consistent) or it raises a meaningful error.
        try:
            await sync_service.handle_move("file-a.md", "file-b.md")
            # If this doesn't raise an exception, the conflict was resolved

            # Verify the state is consistent
            updated_entities = await entity_repository.find_all()
            file_paths = [entity.file_path for entity in updated_entities]

            # Should not have duplicate file paths
            assert len(file_paths) == len(set(file_paths)), "File paths should be unique"

        except Exception as e:
            # If an exception is raised, it should be a meaningful error
            assert "conflict" in str(e).lower() or "already exists" in str(e).lower()
            assert not isinstance(e, IntegrityError), "Should not be a raw IntegrityError"


@pytest.mark.asyncio
class TestEnhancedErrorMessages:
    """Test that error messages provide helpful guidance for character conflicts."""

    async def test_helpful_error_for_hyphen_conflict(
        self,
        sync_service: SyncService,
        project_config: ProjectConfig,
    ):
        """Test that hyphen conflicts generate helpful error messages."""
        # Placeholder — to be implemented after the error handling is enhanced.

    async def test_helpful_error_for_case_conflict(
        self,
        sync_service: SyncService,
        project_config: ProjectConfig,
    ):
        """Test that case sensitivity conflicts generate helpful error messages."""
        # Placeholder — to be implemented after the error handling is enhanced.

```

--------------------------------------------------------------------------------
/test-int/test_sync_performance_benchmark.py:
--------------------------------------------------------------------------------

```python
"""
Performance benchmark tests for sync operations.

These tests measure baseline performance for indexing operations to track
improvements from optimizations. Tests are marked with @pytest.mark.benchmark
and can be run separately.

Usage:
    # Run all benchmarks
    pytest test-int/test_sync_performance_benchmark.py -v

    # Run specific benchmark
    pytest test-int/test_sync_performance_benchmark.py::test_benchmark_sync_100_files -v
"""

import asyncio
import time
from pathlib import Path
from textwrap import dedent

import pytest

from basic_memory.config import BasicMemoryConfig, ProjectConfig
from basic_memory.sync.sync_service import get_sync_service


async def create_benchmark_file(path: Path, file_num: int, total_files: int) -> None:
    """Create a realistic test markdown file with observations and relations.

    Args:
        path: Path to create the file at
        file_num: Current file number (for unique content)
        total_files: Total number of files being created (for relation targets)
    """
    # Vary per-file complexity: 3-10 observations, most files link to others.
    num_observations = min(3 + (file_num % 5), 10)

    # Build relation lines (some targets are forward references).
    relation_lines = []
    if file_num < (total_files - 1):
        for offset in range(min(1 + (file_num % 3), 3)):
            target_num = (file_num + offset + 1) % total_files
            relation_lines.append(f"- relates_to [[test-file-{target_num:04d}]]")

    newline = chr(10)
    observations_block = newline.join(
        [
            f"- [category-{i % 5}] Observation {i} for file {file_num} with some content #tag{i}"
            for i in range(num_observations)
        ]
    )
    relations_block = (
        newline.join(relation_lines) if relation_lines else "- No relations for this file"
    )

    content = dedent(f"""
        ---
        type: note
        tags: [benchmark, test, category-{file_num % 10}]
        ---
        # Test File {file_num:04d}

        This is benchmark test file {file_num} of {total_files}.
        It contains realistic markdown content to simulate actual usage.

        ## Observations
        {observations_block}

        ## Relations
        {relations_block}

        ## Additional Content

        This section contains additional prose to simulate real documents.
        Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed do eiusmod
        tempor incididunt ut labore et dolore magna aliqua.

        ### Subsection

        More content here to make the file realistic. This helps test the
        full indexing pipeline including content extraction and search indexing.
    """).strip()

    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(content, encoding="utf-8")


async def generate_benchmark_files(project_dir: Path, num_files: int) -> None:
    """Generate benchmark test files.

    Args:
        project_dir: Directory to create files in
        num_files: Number of files to generate
    """
    print(f"\nGenerating {num_files} test files...")
    started = time.time()

    # Write files concurrently, 100 at a time, to keep generation fast.
    batch_size = 100
    for start_index in range(0, num_files, batch_size):
        end_index = min(start_index + batch_size, num_files)
        await asyncio.gather(
            *(
                create_benchmark_file(
                    project_dir / f"category-{i % 10}" / f"test-file-{i:04d}.md", i, num_files
                )
                for i in range(start_index, end_index)
            )
        )
        print(f"  Created files {start_index}-{end_index} ({end_index}/{num_files})")

    duration = time.time() - started
    print(f"  File generation completed in {duration:.2f}s ({num_files / duration:.1f} files/sec)")


def get_db_size(db_path: Path) -> tuple[int, str]:
    """Get database file size.

    Bug fix: the original mutated ``size_bytes`` while formatting, so for any
    file >= 1 KB it returned the scaled display value (e.g. ``2.0`` for a
    2048-byte file) instead of the raw byte count — corrupting callers that
    compute growth deltas in bytes. The raw size and the display value are
    now tracked separately.

    Args:
        db_path: Path to the database file (missing file reports 0 bytes)

    Returns:
        Tuple of (size_bytes, formatted_size)
    """
    if not db_path.exists():
        return 0, "0 B"

    size_bytes = db_path.stat().st_size

    # Format with binary units, without touching the byte count we return.
    value = float(size_bytes)
    for unit in ["B", "KB", "MB", "GB"]:
        if value < 1024.0:
            return size_bytes, f"{value:.2f} {unit}"
        value /= 1024.0

    return size_bytes, f"{value:.2f} TB"


async def run_sync_benchmark(
    project_config: ProjectConfig, app_config: BasicMemoryConfig, num_files: int, test_name: str
) -> dict:
    """Run a sync benchmark and collect metrics.

    Generates `num_files` markdown files, syncs them through a freshly
    constructed SyncService, and prints timing plus database-growth figures.

    Args:
        project_config: Project configuration
        app_config: App configuration
        num_files: Number of files to benchmark
        test_name: Name of the test for reporting

    Returns:
        Dictionary with benchmark results
    """
    project_dir = project_config.home
    db_path = app_config.database_path

    print(f"\n{'=' * 70}")
    print(f"BENCHMARK: {test_name}")
    print(f"{'=' * 70}")

    # Generate test files
    await generate_benchmark_files(project_dir, num_files)

    # Get initial DB size (baseline for the growth delta reported below)
    # NOTE(review): get_db_size appears to return a scaled display value
    # rather than raw bytes for files >= 1 KB — verify the growth math.
    initial_db_size, initial_db_formatted = get_db_size(db_path)
    print(f"\nInitial database size: {initial_db_formatted}")

    # Create sync service
    # Imports kept local so module import stays light for non-benchmark runs.
    from basic_memory.repository import ProjectRepository
    from basic_memory import db

    _, session_maker = await db.get_or_create_db(
        db_path=app_config.database_path,
        db_type=db.DatabaseType.FILESYSTEM,
    )
    project_repository = ProjectRepository(session_maker)

    # Get or create project — reuse the first existing project if any,
    # otherwise register one based on project_config.
    projects = await project_repository.find_all()
    if projects:
        project = projects[0]
    else:
        project = await project_repository.create(
            {
                "name": project_config.name,
                "path": str(project_config.home),
                "is_active": True,
                "is_default": True,
            }
        )

    sync_service = await get_sync_service(project)

    # Initialize search index (required for FTS5 table)
    await sync_service.search_service.init_search_index()

    # Run sync and measure wall-clock time
    print(f"\nStarting sync of {num_files} files...")
    sync_start = time.time()

    report = await sync_service.sync(project_dir, project_name=project.name)

    sync_duration = time.time() - sync_start

    # Get final DB size and the growth attributable to this sync
    final_db_size, final_db_formatted = get_db_size(db_path)
    db_growth = final_db_size - initial_db_size
    db_growth_formatted = f"{db_growth / 1024 / 1024:.2f} MB"

    # Calculate throughput metrics (guarded against division by zero)
    files_per_sec = num_files / sync_duration if sync_duration > 0 else 0
    ms_per_file = (sync_duration * 1000) / num_files if num_files > 0 else 0

    # Print results
    print(f"\n{'-' * 70}")
    print("RESULTS:")
    print(f"{'-' * 70}")
    print(f"Files processed:      {num_files}")
    print(f"  New:                {len(report.new)}")
    print(f"  Modified:           {len(report.modified)}")
    print(f"  Deleted:            {len(report.deleted)}")
    print(f"  Moved:              {len(report.moves)}")
    print("\nPerformance:")
    print(f"  Total time:         {sync_duration:.2f}s")
    print(f"  Files/sec:          {files_per_sec:.1f}")
    print(f"  ms/file:            {ms_per_file:.1f}")
    print("\nDatabase:")
    print(f"  Initial size:       {initial_db_formatted}")
    print(f"  Final size:         {final_db_formatted}")
    print(f"  Growth:             {db_growth_formatted}")
    print(f"  Growth per file:    {(db_growth / num_files / 1024):.2f} KB")
    print(f"{'=' * 70}\n")

    # Structured metrics mirrored from the printed report, for assertions.
    return {
        "test_name": test_name,
        "num_files": num_files,
        "sync_duration_sec": sync_duration,
        "files_per_sec": files_per_sec,
        "ms_per_file": ms_per_file,
        "new_files": len(report.new),
        "modified_files": len(report.modified),
        "deleted_files": len(report.deleted),
        "moved_files": len(report.moves),
        "initial_db_size": initial_db_size,
        "final_db_size": final_db_size,
        "db_growth_bytes": db_growth,
        "db_growth_per_file_bytes": db_growth / num_files if num_files > 0 else 0,
    }


@pytest.mark.benchmark
@pytest.mark.asyncio
async def test_benchmark_sync_100_files(app_config, project_config, config_manager):
    """Benchmark: Sync 100 files (small repository)."""
    metrics = await run_sync_benchmark(
        project_config, app_config, num_files=100, test_name="Sync 100 files (small repository)"
    )

    # Sanity checks only: counts may exceed 100 if the OS drops extra
    # files (.DS_Store, etc.) into the tree before the sync runs.
    assert metrics["new_files"] >= 100
    assert metrics["sync_duration_sec"] > 0
    assert metrics["files_per_sec"] > 0


@pytest.mark.benchmark
@pytest.mark.asyncio
async def test_benchmark_sync_500_files(app_config, project_config, config_manager):
    """Benchmark: Sync 500 files (medium repository)."""
    metrics = await run_sync_benchmark(
        project_config, app_config, num_files=500, test_name="Sync 500 files (medium repository)"
    )

    # Sanity checks only: counts may exceed 500 because the OS can add
    # its own files (.DS_Store, etc.) to the tree.
    assert metrics["new_files"] >= 500
    assert metrics["sync_duration_sec"] > 0
    assert metrics["files_per_sec"] > 0


@pytest.mark.benchmark
@pytest.mark.asyncio
@pytest.mark.slow
async def test_benchmark_sync_1000_files(app_config, project_config, config_manager):
    """Benchmark: Sync 1000 files (large repository).

    This test is marked as 'slow' and can be skipped in regular test runs:
        pytest -m "not slow"
    """
    metrics = await run_sync_benchmark(
        project_config, app_config, num_files=1000, test_name="Sync 1000 files (large repository)"
    )

    # Sanity checks only: counts may exceed 1000 because the OS can add
    # its own files (.DS_Store, etc.) to the tree.
    assert metrics["new_files"] >= 1000
    assert metrics["sync_duration_sec"] > 0
    assert metrics["files_per_sec"] > 0


@pytest.mark.benchmark
@pytest.mark.asyncio
async def test_benchmark_resync_no_changes(app_config, project_config, config_manager):
    """Benchmark: Re-sync with no changes (should be fast).

    This tests the performance of scanning files when nothing has changed,
    which is important for cloud restarts.

    NOTE(review): the project/sync-service setup below duplicates the body of
    run_sync_benchmark — consider extracting a shared helper.
    """
    project_dir = project_config.home
    num_files = 100

    # First sync
    print(f"\nFirst sync of {num_files} files...")
    await generate_benchmark_files(project_dir, num_files)

    # Local imports keep module import light for non-benchmark runs.
    from basic_memory.repository import ProjectRepository
    from basic_memory import db

    _, session_maker = await db.get_or_create_db(
        db_path=app_config.database_path,
        db_type=db.DatabaseType.FILESYSTEM,
    )
    project_repository = ProjectRepository(session_maker)
    # Reuse the first existing project if any; otherwise create one.
    projects = await project_repository.find_all()
    if projects:
        project = projects[0]
    else:
        project = await project_repository.create(
            {
                "name": project_config.name,
                "path": str(project_config.home),
                "is_active": True,
                "is_default": True,
            }
        )

    sync_service = await get_sync_service(project)

    # Initialize search index
    await sync_service.search_service.init_search_index()

    await sync_service.sync(project_dir, project_name=project.name)

    # Second sync (no changes) — only the re-sync is timed.
    print("\nRe-sync with no changes...")
    resync_start = time.time()
    report = await sync_service.sync(project_dir, project_name=project.name)
    resync_duration = time.time() - resync_start

    print(f"\n{'-' * 70}")
    print("RE-SYNC RESULTS (no changes):")
    print(f"{'-' * 70}")
    print(f"Files scanned:        {num_files}")
    print(f"Changes detected:     {report.total}")
    print(f"  New:                {len(report.new)}")
    print(f"  Modified:           {len(report.modified)}")
    print(f"  Deleted:            {len(report.deleted)}")
    print(f"  Moved:              {len(report.moves)}")
    print(f"Duration:             {resync_duration:.2f}s")
    print(f"Files/sec:            {num_files / resync_duration:.1f}")

    # Debug: show the first few offending entries so failures are diagnosable
    if report.total > 0:
        print("\n⚠️  UNEXPECTED CHANGES DETECTED:")
        if report.new:
            print(f"  New files ({len(report.new)}): {list(report.new)[:5]}")
        if report.modified:
            print(f"  Modified files ({len(report.modified)}): {list(report.modified)[:5]}")
        if report.deleted:
            print(f"  Deleted files ({len(report.deleted)}): {list(report.deleted)[:5]}")
        if report.moves:
            print(f"  Moved files ({len(report.moves)}): {dict(list(report.moves.items())[:5])}")

    print(f"{'=' * 70}\n")

    # Should be no changes at all on an untouched tree
    assert report.total == 0, (
        f"Expected no changes but got {report.total}: new={len(report.new)}, modified={len(report.modified)}, deleted={len(report.deleted)}, moves={len(report.moves)}"
    )
    assert len(report.new) == 0
    assert len(report.modified) == 0
    assert len(report.deleted) == 0

```

--------------------------------------------------------------------------------
/tests/services/test_context_service.py:
--------------------------------------------------------------------------------

```python
"""Tests for context service."""

from datetime import datetime, timedelta, UTC

import pytest
import pytest_asyncio

from basic_memory.repository.search_repository import SearchIndexRow
from basic_memory.schemas.memory import memory_url, memory_url_path
from basic_memory.schemas.search import SearchItemType
from basic_memory.services.context_service import ContextService
from basic_memory.models.knowledge import Entity, Relation
from basic_memory.models.project import Project


@pytest_asyncio.fixture
async def context_service(search_repository, entity_repository, observation_repository):
    """Provide a ContextService wired to the test repositories."""
    service = ContextService(search_repository, entity_repository, observation_repository)
    return service


@pytest.mark.asyncio
async def test_find_connected_depth_limit(context_service, test_graph):
    """Test depth limiting works.

    Our traversal path is:
    - Depth 0: Root
    - Depth 1: Relations + directly connected entities (Connected1, Connected2)
    - Depth 2: Relations + next level entities (Deep)

    Cleanup: removed a commented-out shallow-traversal block and a stray
    debug print left over from development.
    """
    type_id_pairs = [("entity", test_graph["root"].id)]

    # Traverse deep enough (depth 3) to reach the Deep entity through the
    # directly connected nodes.
    deep_results = await context_service.find_related(type_id_pairs, max_depth=3, max_results=100)
    deep_entities = {(r.id, r.type) for r in deep_results if r.type == "entity"}

    # Should now include Deep entity
    assert (test_graph["deep"].id, "entity") in deep_entities


@pytest.mark.asyncio
async def test_find_connected_timeframe(
    context_service, test_graph, search_repository, entity_repository
):
    """Test timeframe filtering.

    This tests how traversal is affected by the item dates.
    When we filter by date, items are only included if:
    1. They match the timeframe
    2. There is a valid path to them through other items in the timeframe
    """
    now = datetime.now(UTC)
    old_date = now - timedelta(days=10)  # outside the 7-day window used below
    recent_date = now - timedelta(days=1)  # inside the window

    # Update entity table timestamps directly
    # Root entity uses old date
    root_entity = test_graph["root"]
    await entity_repository.update(root_entity.id, {"created_at": old_date, "updated_at": old_date})

    # Connected entity uses recent date
    connected_entity = test_graph["connected1"]
    await entity_repository.update(
        connected_entity.id, {"created_at": recent_date, "updated_at": recent_date}
    )

    # Also update search_index for test consistency
    # Root entity row: dated OLD, so it falls outside the timeframe.
    await search_repository.index_item(
        SearchIndexRow(
            project_id=entity_repository.project_id,
            id=test_graph["root"].id,
            title=test_graph["root"].title,
            content_snippet="Root content",
            permalink=test_graph["root"].permalink,
            file_path=test_graph["root"].file_path,
            type=SearchItemType.ENTITY,
            metadata={"created_at": old_date.isoformat()},
            created_at=old_date.isoformat(),
            updated_at=old_date.isoformat(),
        )
    )
    # Relation row (root -> connected1): also OLD — this is the only path to
    # connected1, so filtering it out makes connected1 unreachable.
    await search_repository.index_item(
        SearchIndexRow(
            project_id=entity_repository.project_id,
            id=test_graph["relations"][0].id,
            title="Root Entity → Connected Entity 1",
            content_snippet="",
            permalink=f"{test_graph['root'].permalink}/connects_to/{test_graph['connected1'].permalink}",
            file_path=test_graph["root"].file_path,
            type=SearchItemType.RELATION,
            from_id=test_graph["root"].id,
            to_id=test_graph["connected1"].id,
            relation_type="connects_to",
            metadata={"created_at": old_date.isoformat()},
            created_at=old_date.isoformat(),
            updated_at=old_date.isoformat(),
        )
    )
    # Connected1 row: RECENT, inside the timeframe but unreachable.
    await search_repository.index_item(
        SearchIndexRow(
            project_id=entity_repository.project_id,
            id=test_graph["connected1"].id,
            title=test_graph["connected1"].title,
            content_snippet="Connected 1 content",
            permalink=test_graph["connected1"].permalink,
            file_path=test_graph["connected1"].file_path,
            type=SearchItemType.ENTITY,
            metadata={"created_at": recent_date.isoformat()},
            created_at=recent_date.isoformat(),
            updated_at=recent_date.isoformat(),
        )
    )

    type_id_pairs = [("entity", test_graph["root"].id)]

    # Search with a 7-day cutoff
    since_date = now - timedelta(days=7)
    results = await context_service.find_related(type_id_pairs, since=since_date)

    # Only connected1 is recent, but we can't get to it
    # because its connecting relation is too old and is filtered out
    # (we can only reach connected1 through a relation starting from root)
    entity_ids = {r.id for r in results if r.type == "entity"}
    assert len(entity_ids) == 0  # No accessible entities within timeframe


@pytest.mark.asyncio
async def test_build_context(context_service, test_graph):
    """Test exact permalink lookup."""
    url = memory_url.validate_strings("memory://test/root")
    context_result = await context_service.build_context(url)

    # Metadata should describe a single primary hit with default depth
    metadata = context_result.metadata
    assert metadata.uri == memory_url_path(url)
    assert metadata.depth == 1
    assert metadata.primary_count == 1
    assert metadata.related_count > 0
    assert metadata.generated_at is not None

    # Exactly one result item for an exact permalink match
    assert len(context_result.results) == 1
    context_item = context_result.results[0]

    # Primary result is the root entity itself
    primary = context_item.primary_result
    assert primary.id == test_graph["root"].id
    assert primary.type == "entity"
    assert primary.title == "Root"
    assert primary.permalink == "test/root"
    assert primary.file_path == "test/Root.md"
    assert primary.created_at is not None

    # Related results must include both the relation and the connected entity
    related = context_item.related_results
    assert len(related) > 0

    relation = next((r for r in related if r.type == "relation"), None)
    assert relation is not None
    assert relation.relation_type == "connects_to"
    assert relation.from_id == test_graph["root"].id
    assert relation.to_id == test_graph["connected1"].id

    related_entity = next((r for r in related if r.type == "entity"), None)
    assert related_entity is not None
    assert related_entity.id == test_graph["connected1"].id
    assert related_entity.title == test_graph["connected1"].title
    assert related_entity.permalink == test_graph["connected1"].permalink


@pytest.mark.asyncio
async def test_build_context_with_observations(context_service, test_graph):
    """Test context building with observations."""
    # The test_graph fixture already attaches observations to the root
    # entity, so no extra setup is needed here.
    url = memory_url.validate_strings("memory://test/root")
    context_result = await context_service.build_context(url, include_observations=True)

    # Observation totals should appear in the metadata
    assert context_result.metadata.total_observations > 0
    assert len(context_result.results) == 1

    # Observations must be attached to the single result item
    context_item = context_result.results[0]
    observations = context_item.observations
    assert len(observations) > 0

    # Every observation belongs to root and uses a fixture-defined category
    for obs in observations:
        assert obs.type == "observation"
        assert obs.category in ["note", "tech"]  # Categories from test_graph fixture
        assert obs.entity_id == test_graph["root"].id

    # At least one "note" observation carries the expected content
    note_obs = next((o for o in observations if o.category == "note"), None)
    assert note_obs is not None
    assert "Root note" in note_obs.content


@pytest.mark.asyncio
async def test_build_context_not_found(context_service):
    """Test handling non-existent permalinks."""
    result = await context_service.build_context("memory://does/not/exist")

    # Unknown permalinks produce an empty result set, not an error
    assert len(result.results) == 0
    assert result.metadata.primary_count == 0
    assert result.metadata.related_count == 0


@pytest.mark.asyncio
async def test_context_metadata(context_service, test_graph):
    """Test metadata is correctly populated."""
    context = await context_service.build_context("memory://test/root", depth=2)

    # The requested uri/depth should be echoed back alongside counts
    meta = context.metadata
    assert meta.uri == "test/root"
    assert meta.depth == 2
    assert meta.generated_at is not None
    assert meta.primary_count > 0


@pytest.mark.asyncio
async def test_project_isolation_in_find_related(session_maker):
    """Test that find_related respects project boundaries and doesn't leak data.

    Builds two projects sharing one database, links two entities inside
    project1, and verifies traversal from each project never crosses into
    the other.
    """
    from basic_memory.repository.entity_repository import EntityRepository
    from basic_memory.repository.observation_repository import ObservationRepository
    from basic_memory.repository.search_repository import SearchRepository
    from basic_memory import db

    # Create database session
    async with db.scoped_session(session_maker) as db_session:
        # Create two separate projects in the same database
        project1 = Project(name="project1", path="/test1")
        project2 = Project(name="project2", path="/test2")
        db_session.add(project1)
        db_session.add(project2)
        await db_session.flush()  # flush to obtain generated project IDs

        # Create entities in project1
        entity1_p1 = Entity(
            title="Entity1_P1",
            entity_type="document",
            content_type="text/markdown",
            project_id=project1.id,
            permalink="project1/entity1",
            file_path="project1/entity1.md",
            created_at=datetime.now(UTC),
            updated_at=datetime.now(UTC),
        )
        entity2_p1 = Entity(
            title="Entity2_P1",
            entity_type="document",
            content_type="text/markdown",
            project_id=project1.id,
            permalink="project1/entity2",
            file_path="project1/entity2.md",
            created_at=datetime.now(UTC),
            updated_at=datetime.now(UTC),
        )

        # Create entities in project2 (deliberately left with no relations)
        entity1_p2 = Entity(
            title="Entity1_P2",
            entity_type="document",
            content_type="text/markdown",
            project_id=project2.id,
            permalink="project2/entity1",
            file_path="project2/entity1.md",
            created_at=datetime.now(UTC),
            updated_at=datetime.now(UTC),
        )

        db_session.add_all([entity1_p1, entity2_p1, entity1_p2])
        await db_session.flush()  # flush to obtain generated entity IDs

        # Create relation in project1 (between entities of project1)
        relation_p1 = Relation(
            from_id=entity1_p1.id,
            to_id=entity2_p1.id,
            to_name="Entity2_P1",
            relation_type="connects_to",
        )
        db_session.add(relation_p1)
        await db_session.commit()

        # Create repositories for project1 (each scoped by project_id)
        search_repo_p1 = SearchRepository(session_maker, project1.id)
        entity_repo_p1 = EntityRepository(session_maker, project1.id)
        obs_repo_p1 = ObservationRepository(session_maker, project1.id)
        context_service_p1 = ContextService(search_repo_p1, entity_repo_p1, obs_repo_p1)

        # Create repositories for project2
        search_repo_p2 = SearchRepository(session_maker, project2.id)
        entity_repo_p2 = EntityRepository(session_maker, project2.id)
        obs_repo_p2 = ObservationRepository(session_maker, project2.id)
        context_service_p2 = ContextService(search_repo_p2, entity_repo_p2, obs_repo_p2)

        # Test: find_related for project1 should only return project1 entities
        type_id_pairs_p1 = [("entity", entity1_p1.id)]
        related_p1 = await context_service_p1.find_related(type_id_pairs_p1, max_depth=2)

        # Verify only project1 entities are returned
        related_entity_ids = [r.id for r in related_p1 if r.type == "entity"]
        assert entity2_p1.id in related_entity_ids  # Should find connected entity2 in project1
        assert entity1_p2.id not in related_entity_ids  # Should NOT find entity from project2

        # Test: find_related for project2 should return empty (no relations)
        type_id_pairs_p2 = [("entity", entity1_p2.id)]
        related_p2 = await context_service_p2.find_related(type_id_pairs_p2, max_depth=2)

        # Project2 has no relations, so should return empty
        assert len(related_p2) == 0

        # Double-check: verify entities exist in their respective projects
        assert entity1_p1.project_id == project1.id
        assert entity2_p1.project_id == project1.id
        assert entity1_p2.project_id == project2.id
```

--------------------------------------------------------------------------------
/src/basic_memory/repository/entity_repository.py:
--------------------------------------------------------------------------------

```python
"""Repository for managing entities in the knowledge graph."""

from pathlib import Path
from typing import List, Optional, Sequence, Union

from loguru import logger
from sqlalchemy import select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from sqlalchemy.orm import selectinload
from sqlalchemy.orm.interfaces import LoaderOption

from basic_memory import db
from basic_memory.models.knowledge import Entity, Observation, Relation
from basic_memory.repository.repository import Repository


class EntityRepository(Repository[Entity]):
    """Repository for Entity model, scoped to a single project.

    Every query issued through this repository is filtered by the project_id
    supplied at construction time (applied by the base Repository's select
    helpers / `_add_project_filter`).

    Note: All file paths are stored as strings in the database. Convert Path objects
    to strings before passing to repository methods.
    """

    def __init__(self, session_maker: async_sessionmaker[AsyncSession], project_id: int):
        """Initialize with session maker and project_id filter.

        Args:
            session_maker: SQLAlchemy session maker
            project_id: Project ID to filter all operations by
        """
        super().__init__(session_maker, Entity, project_id=project_id)

    async def get_by_permalink(self, permalink: str) -> Optional[Entity]:
        """Get a single entity by permalink, or None if not found.

        Args:
            permalink: Unique identifier for the entity
        """
        query = self.select().where(Entity.permalink == permalink).options(*self.get_load_options())
        return await self.find_one(query)

    async def get_by_title(self, title: str) -> Sequence[Entity]:
        """Get all entities with the given title.

        Titles are not unique, so multiple entities may be returned.

        Args:
            title: Title of the entities to find
        """
        query = self.select().where(Entity.title == title).options(*self.get_load_options())
        result = await self.execute_query(query)
        return list(result.scalars().all())

    async def get_by_file_path(self, file_path: Union[Path, str]) -> Optional[Entity]:
        """Get entity by file_path, or None if not found.

        Args:
            file_path: Path to the entity file (normalized to a POSIX string internally)
        """
        query = (
            self.select()
            .where(Entity.file_path == Path(file_path).as_posix())
            .options(*self.get_load_options())
        )
        return await self.find_one(query)

    async def find_by_checksum(self, checksum: str) -> Sequence[Entity]:
        """Find entities with the given checksum.

        Used for move detection - finds entities that may have been moved to a new path.
        Multiple entities may have the same checksum if files were copied.

        Args:
            checksum: File content checksum to search for

        Returns:
            Sequence of entities with matching checksum (may be empty)
        """
        query = self.select().where(Entity.checksum == checksum)
        # Don't load relationships for move detection - we only need file_path and checksum
        result = await self.execute_query(query, use_query_options=False)
        return list(result.scalars().all())

    async def delete_by_file_path(self, file_path: Union[Path, str]) -> bool:
        """Delete entity with the provided file_path.

        Args:
            file_path: Path to the entity file (normalized to a POSIX string internally)

        Returns:
            True if an entity was deleted, False otherwise.
        """
        return await self.delete_by_fields(file_path=Path(file_path).as_posix())

    def get_load_options(self) -> List[LoaderOption]:
        """Get SQLAlchemy loader options for eager loading relationships."""
        return [
            selectinload(Entity.observations).selectinload(Observation.entity),
            # Load from_relations and both entities for each relation
            selectinload(Entity.outgoing_relations).selectinload(Relation.from_entity),
            selectinload(Entity.outgoing_relations).selectinload(Relation.to_entity),
            # Load to_relations and both entities for each relation
            selectinload(Entity.incoming_relations).selectinload(Relation.from_entity),
            selectinload(Entity.incoming_relations).selectinload(Relation.to_entity),
        ]

    async def find_by_permalinks(self, permalinks: List[str]) -> Sequence[Entity]:
        """Find multiple entities by their permalink.

        Args:
            permalinks: List of permalink strings to find

        Returns:
            Entities whose permalink is in the given list (may be empty).
        """
        # Handle empty input explicitly - avoids issuing an IN () query
        if not permalinks:
            return []

        # Use existing select pattern
        query = (
            self.select().options(*self.get_load_options()).where(Entity.permalink.in_(permalinks))
        )

        result = await self.execute_query(query)
        return list(result.scalars().all())

    async def upsert_entity(self, entity: Entity) -> Entity:
        """Insert or update entity using simple try/catch with database-level conflict resolution.

        Handles file_path race conditions by checking for existing entity on IntegrityError.
        For permalink conflicts, generates a unique permalink with numeric suffix.

        Args:
            entity: The entity to insert or update

        Returns:
            The inserted or updated entity, with relationships eagerly loaded.

        Raises:
            SyncFatalError: If the entity's project no longer exists in the database.
        """
        async with db.scoped_session(self.session_maker) as session:
            # Set project_id if applicable and not already set
            self._set_project_id_if_needed(entity)

            # Try simple insert first
            try:
                session.add(entity)
                await session.flush()

                # Return with relationships loaded
                query = (
                    self.select()
                    .where(Entity.file_path == entity.file_path)
                    .options(*self.get_load_options())
                )
                result = await session.execute(query)
                found = result.scalar_one_or_none()
                if not found:  # pragma: no cover
                    raise RuntimeError(
                        f"Failed to retrieve entity after insert: {entity.file_path}"
                    )
                return found

            except IntegrityError as e:
                # Check if this is a FOREIGN KEY constraint failure
                error_str = str(e)
                if "FOREIGN KEY constraint failed" in error_str:
                    # Import locally to avoid circular dependency (repository -> services -> repository)
                    from basic_memory.services.exceptions import SyncFatalError

                    # Project doesn't exist in database - this is a fatal sync error.
                    # scoped_session rolls back on exception, so no explicit rollback here.
                    raise SyncFatalError(
                        f"Cannot sync file '{entity.file_path}': "
                        f"project_id={entity.project_id} does not exist in database. "
                        f"The project may have been deleted. This sync will be terminated."
                    ) from e

                await session.rollback()

                # Re-query after rollback to get a fresh, attached entity
                existing_result = await session.execute(
                    select(Entity)
                    .where(
                        Entity.file_path == entity.file_path, Entity.project_id == entity.project_id
                    )
                    .options(*self.get_load_options())
                )
                existing_entity = existing_result.scalar_one_or_none()

                if existing_entity:
                    # File path conflict - update the existing entity
                    logger.debug(
                        f"Resolving file_path conflict for {entity.file_path}, "
                        f"entity_id={existing_entity.id}, observations={len(entity.observations)}"
                    )
                    # Use merge to avoid session state conflicts
                    # Set the ID to update existing entity
                    entity.id = existing_entity.id

                    # Ensure observations reference the correct entity_id
                    for obs in entity.observations:
                        obs.entity_id = existing_entity.id
                        # Clear any existing ID to force INSERT as new observation
                        obs.id = None

                    # Merge the entity which will update the existing one
                    merged_entity = await session.merge(entity)

                    await session.commit()

                    # Re-query to get proper relationships loaded
                    final_result = await session.execute(
                        select(Entity)
                        .where(Entity.id == merged_entity.id)
                        .options(*self.get_load_options())
                    )
                    return final_result.scalar_one()

                else:
                    # No file_path conflict - must be permalink conflict
                    # Generate unique permalink and retry
                    entity = await self._handle_permalink_conflict(entity, session)
                    return entity

    async def get_all_file_paths(self) -> List[str]:
        """Get all file paths for this project - optimized for deletion detection.

        Returns only file_path strings without loading entities or relationships.
        Used by streaming sync to detect deleted files efficiently.

        Returns:
            List of file_path strings for all entities in the project
        """
        query = select(Entity.file_path)
        query = self._add_project_filter(query)

        result = await self.execute_query(query, use_query_options=False)
        return list(result.scalars().all())

    async def get_distinct_directories(self) -> List[str]:
        """Extract unique directory paths from file_path column.

        Optimized method for getting directory structure without loading full entities
        or relationships. Returns a sorted list of unique directory paths.

        Returns:
            List of unique directory paths (e.g., ["notes", "notes/meetings", "specs"])
        """
        # Query only file_path column, no entity objects or relationships
        query = select(Entity.file_path).distinct()
        query = self._add_project_filter(query)

        # Execute with use_query_options=False to skip eager loading
        result = await self.execute_query(query, use_query_options=False)
        file_paths = list(result.scalars().all())

        # Parse file paths to extract unique directories
        directories = set()
        for file_path in file_paths:
            parts = [p for p in file_path.split("/") if p]
            # Add all parent directories (exclude filename which is the last part)
            for i in range(len(parts) - 1):
                dir_path = "/".join(parts[: i + 1])
                directories.add(dir_path)

        return sorted(directories)

    async def find_by_directory_prefix(self, directory_prefix: str) -> Sequence[Entity]:
        """Find entities whose file_path starts with the given directory prefix.

        Optimized method for listing directory contents without loading all entities.
        Uses SQL LIKE pattern matching to filter entities by directory path.

        Args:
            directory_prefix: Directory path prefix (e.g., "docs", "docs/guides")
                             Empty string returns all entities (root directory)

        Returns:
            Sequence of entities in the specified directory and subdirectories
        """
        # Build SQL LIKE pattern
        if directory_prefix == "" or directory_prefix == "/":
            # Root directory - return all entities
            return await self.find_all()

        # Remove leading/trailing slashes for consistency
        directory_prefix = directory_prefix.strip("/")

        # Query entities with file_path starting with prefix
        # Pattern matches "prefix/" to ensure we get files IN the directory,
        # not just files whose names start with the prefix
        pattern = f"{directory_prefix}/%"

        query = self.select().where(Entity.file_path.like(pattern))

        # Skip eager loading - we only need basic entity fields for directory trees
        result = await self.execute_query(query, use_query_options=False)
        return list(result.scalars().all())

    async def _handle_permalink_conflict(self, entity: Entity, session: AsyncSession) -> Entity:
        """Handle permalink conflicts by generating a unique permalink.

        Appends an incrementing numeric suffix ("-1", "-2", ...) to the base
        permalink until an unused one is found within the entity's project.
        """
        base_permalink = entity.permalink
        suffix = 1

        # Find a unique permalink
        while True:
            test_permalink = f"{base_permalink}-{suffix}"
            existing = await session.execute(
                select(Entity).where(
                    Entity.permalink == test_permalink, Entity.project_id == entity.project_id
                )
            )
            if existing.scalar_one_or_none() is None:
                # Found unique permalink
                entity.permalink = test_permalink
                break
            suffix += 1

        # Insert with unique permalink
        session.add(entity)
        await session.flush()
        return entity

```

--------------------------------------------------------------------------------
/src/basic_memory/deps.py:
--------------------------------------------------------------------------------

```python
"""Dependency injection functions for basic-memory services."""

from typing import Annotated
from loguru import logger

from fastapi import Depends, HTTPException, Path, status, Request
from sqlalchemy.ext.asyncio import (
    AsyncSession,
    AsyncEngine,
    async_sessionmaker,
)
import pathlib

from basic_memory import db
from basic_memory.config import ProjectConfig, BasicMemoryConfig, ConfigManager
from basic_memory.importers import (
    ChatGPTImporter,
    ClaudeConversationsImporter,
    ClaudeProjectsImporter,
    MemoryJsonImporter,
)
from basic_memory.markdown import EntityParser
from basic_memory.markdown.markdown_processor import MarkdownProcessor
from basic_memory.repository.entity_repository import EntityRepository
from basic_memory.repository.observation_repository import ObservationRepository
from basic_memory.repository.project_repository import ProjectRepository
from basic_memory.repository.relation_repository import RelationRepository
from basic_memory.repository.search_repository import SearchRepository
from basic_memory.services import EntityService, ProjectService
from basic_memory.services.context_service import ContextService
from basic_memory.services.directory_service import DirectoryService
from basic_memory.services.file_service import FileService
from basic_memory.services.link_resolver import LinkResolver
from basic_memory.services.search_service import SearchService
from basic_memory.sync import SyncService
from basic_memory.utils import generate_permalink


def get_app_config() -> BasicMemoryConfig:  # pragma: no cover
    """Load the application-wide BasicMemory configuration."""
    return ConfigManager().config


AppConfigDep = Annotated[BasicMemoryConfig, Depends(get_app_config)]  # pragma: no cover


## project


async def get_project_config(
    project: "ProjectPathDep", project_repository: "ProjectRepositoryDep"
) -> ProjectConfig:  # pragma: no cover
    """Resolve the ProjectConfig for the project named in the URL path.

    Args:
        project: Project name extracted from the URL path
        project_repository: Repository for project operations

    Returns:
        The resolved project config

    Raises:
        HTTPException: 404 if no project matches the name's permalink
    """
    # Convert project name to permalink for lookup
    project_permalink = generate_permalink(str(project))
    project_obj = await project_repository.get_by_permalink(project_permalink)
    if project_obj:
        return ProjectConfig(name=project_obj.name, home=pathlib.Path(project_obj.path))

    # Not found
    raise HTTPException(  # pragma: no cover
        status_code=status.HTTP_404_NOT_FOUND, detail=f"Project '{project}' not found."
    )


ProjectConfigDep = Annotated[ProjectConfig, Depends(get_project_config)]  # pragma: no cover

## sqlalchemy


async def get_engine_factory(
    request: Request,
) -> tuple[AsyncEngine, async_sessionmaker[AsyncSession]]:  # pragma: no cover
    """Return the (engine, session_maker) pair for this request.

    API requests reuse the cached connections stored on app.state for optimal
    performance; non-API contexts (CLI) fall back to a direct database
    connection.
    """
    has_cached_state = (
        hasattr(request, "app")
        and hasattr(request.app.state, "engine")
        and hasattr(request.app.state, "session_maker")
    )
    if has_cached_state:
        return request.app.state.engine, request.app.state.session_maker

    # Fallback for non-API contexts (CLI)
    logger.debug("Using fallback database connection for non-API context")
    app_config = get_app_config()
    engine, session_maker = await db.get_or_create_db(app_config.database_path)
    return engine, session_maker


EngineFactoryDep = Annotated[
    tuple[AsyncEngine, async_sessionmaker[AsyncSession]], Depends(get_engine_factory)
]


async def get_session_maker(engine_factory: EngineFactoryDep) -> async_sessionmaker[AsyncSession]:
    """Extract the session maker from the cached (engine, session_maker) pair."""
    return engine_factory[1]


SessionMakerDep = Annotated[async_sessionmaker, Depends(get_session_maker)]


## repositories


async def get_project_repository(
    session_maker: SessionMakerDep,
) -> ProjectRepository:
    """Build a ProjectRepository bound to the shared session maker."""
    repository = ProjectRepository(session_maker)
    return repository


ProjectRepositoryDep = Annotated[ProjectRepository, Depends(get_project_repository)]
ProjectPathDep = Annotated[str, Path()]  # Use Path dependency to extract from URL


async def get_project_id(
    project_repository: ProjectRepositoryDep,
    project: ProjectPathDep,
) -> int:
    """Resolve the database ID of the project named in the URL path.

    Lookup is by permalink first, then by raw name as a fallback.

    Args:
        project_repository: Repository for project operations
        project: Project name extracted from the URL path

    Returns:
        The resolved project ID

    Raises:
        HTTPException: 404 if no project matches by permalink or name
    """
    # Convert project name to permalink for lookup
    project_permalink = generate_permalink(str(project))
    project_obj = await project_repository.get_by_permalink(project_permalink)
    if project_obj:
        return project_obj.id

    # Try by name if permalink lookup fails
    project_obj = await project_repository.get_by_name(str(project))  # pragma: no cover
    if project_obj:  # pragma: no cover
        return project_obj.id

    # Not found
    raise HTTPException(  # pragma: no cover
        status_code=status.HTTP_404_NOT_FOUND, detail=f"Project '{project}' not found."
    )


"""
The project_id dependency is used in the following:
- EntityRepository
- ObservationRepository
- RelationRepository
- SearchRepository
- ProjectInfoRepository
"""
ProjectIdDep = Annotated[int, Depends(get_project_id)]


async def get_entity_repository(
    session_maker: SessionMakerDep,
    project_id: ProjectIdDep,
) -> EntityRepository:
    """Build an EntityRepository scoped to the current project."""
    repository = EntityRepository(session_maker, project_id=project_id)
    return repository


EntityRepositoryDep = Annotated[EntityRepository, Depends(get_entity_repository)]


async def get_observation_repository(
    session_maker: SessionMakerDep,
    project_id: ProjectIdDep,
) -> ObservationRepository:
    """Build an ObservationRepository scoped to the current project."""
    repository = ObservationRepository(session_maker, project_id=project_id)
    return repository


ObservationRepositoryDep = Annotated[ObservationRepository, Depends(get_observation_repository)]


async def get_relation_repository(
    session_maker: SessionMakerDep,
    project_id: ProjectIdDep,
) -> RelationRepository:
    """Build a RelationRepository scoped to the current project."""
    repository = RelationRepository(session_maker, project_id=project_id)
    return repository


RelationRepositoryDep = Annotated[RelationRepository, Depends(get_relation_repository)]


async def get_search_repository(
    session_maker: SessionMakerDep,
    project_id: ProjectIdDep,
) -> SearchRepository:
    """Build a SearchRepository scoped to the current project."""
    repository = SearchRepository(session_maker, project_id=project_id)
    return repository


SearchRepositoryDep = Annotated[SearchRepository, Depends(get_search_repository)]


# ProjectInfoRepository is deprecated and will be removed in a future version.
# Use ProjectRepository instead, which has the same functionality plus more project-specific operations.

## services


async def get_entity_parser(project_config: ProjectConfigDep) -> EntityParser:
    """Build an EntityParser rooted at the project's home directory."""
    return EntityParser(project_config.home)


EntityParserDep = Annotated["EntityParser", Depends(get_entity_parser)]


async def get_markdown_processor(entity_parser: EntityParserDep) -> MarkdownProcessor:
    """Build a MarkdownProcessor around the project's entity parser."""
    return MarkdownProcessor(entity_parser)


MarkdownProcessorDep = Annotated[MarkdownProcessor, Depends(get_markdown_processor)]


async def get_file_service(
    project_config: ProjectConfigDep, markdown_processor: MarkdownProcessorDep
) -> FileService:
    """Build a FileService for the project's home directory."""
    logger.debug(
        f"Creating FileService for project: {project_config.name}, base_path: {project_config.home}"
    )
    service = FileService(project_config.home, markdown_processor)
    logger.debug(f"Created FileService for project: {service} ")
    return service


FileServiceDep = Annotated[FileService, Depends(get_file_service)]


async def get_entity_service(
    entity_repository: EntityRepositoryDep,
    observation_repository: ObservationRepositoryDep,
    relation_repository: RelationRepositoryDep,
    entity_parser: EntityParserDep,
    file_service: FileServiceDep,
    link_resolver: "LinkResolverDep",
    app_config: AppConfigDep,
) -> EntityService:
    """Assemble an EntityService from its repositories, parser, and services."""
    service = EntityService(
        app_config=app_config,
        entity_repository=entity_repository,
        observation_repository=observation_repository,
        relation_repository=relation_repository,
        entity_parser=entity_parser,
        file_service=file_service,
        link_resolver=link_resolver,
    )
    return service


EntityServiceDep = Annotated[EntityService, Depends(get_entity_service)]


async def get_search_service(
    search_repository: SearchRepositoryDep,
    entity_repository: EntityRepositoryDep,
    file_service: FileServiceDep,
) -> SearchService:
    """Assemble a SearchService from its repositories and file service."""
    service = SearchService(search_repository, entity_repository, file_service)
    return service


SearchServiceDep = Annotated[SearchService, Depends(get_search_service)]


async def get_link_resolver(
    entity_repository: EntityRepositoryDep, search_service: SearchServiceDep
) -> LinkResolver:
    """Build a LinkResolver backed by the entity repository and search service."""
    resolver = LinkResolver(entity_repository=entity_repository, search_service=search_service)
    return resolver


LinkResolverDep = Annotated[LinkResolver, Depends(get_link_resolver)]


async def get_context_service(
    search_repository: SearchRepositoryDep,
    entity_repository: EntityRepositoryDep,
    observation_repository: ObservationRepositoryDep,
) -> ContextService:
    """Assemble a ContextService from its project-scoped repositories."""
    service = ContextService(
        search_repository=search_repository,
        entity_repository=entity_repository,
        observation_repository=observation_repository,
    )
    return service


ContextServiceDep = Annotated[ContextService, Depends(get_context_service)]


async def get_sync_service(
    app_config: AppConfigDep,
    entity_service: EntityServiceDep,
    entity_parser: EntityParserDep,
    entity_repository: EntityRepositoryDep,
    relation_repository: RelationRepositoryDep,
    project_repository: ProjectRepositoryDep,
    search_service: SearchServiceDep,
    file_service: FileServiceDep,
) -> SyncService:  # pragma: no cover
    """Assemble a SyncService wired with all project-scoped dependencies.

    Returns:
        SyncService ready to synchronize the current project's files.
    """
    return SyncService(
        app_config=app_config,
        entity_service=entity_service,
        entity_parser=entity_parser,
        entity_repository=entity_repository,
        relation_repository=relation_repository,
        project_repository=project_repository,
        search_service=search_service,
        file_service=file_service,
    )


SyncServiceDep = Annotated[SyncService, Depends(get_sync_service)]


async def get_project_service(
    project_repository: ProjectRepositoryDep,
) -> ProjectService:
    """Build a ProjectService on top of the project repository."""
    service = ProjectService(repository=project_repository)
    return service


ProjectServiceDep = Annotated[ProjectService, Depends(get_project_service)]


async def get_directory_service(
    entity_repository: EntityRepositoryDep,
) -> DirectoryService:
    """Build a DirectoryService over the project's entity repository."""
    service = DirectoryService(
        entity_repository=entity_repository,
    )
    return service


DirectoryServiceDep = Annotated[DirectoryService, Depends(get_directory_service)]


# Import


async def get_chatgpt_importer(
    project_config: ProjectConfigDep, markdown_processor: MarkdownProcessorDep
) -> ChatGPTImporter:
    """Build a ChatGPTImporter that writes into the project's home directory."""
    importer = ChatGPTImporter(project_config.home, markdown_processor)
    return importer


ChatGPTImporterDep = Annotated[ChatGPTImporter, Depends(get_chatgpt_importer)]


async def get_claude_conversations_importer(
    project_config: ProjectConfigDep, markdown_processor: MarkdownProcessorDep
) -> ClaudeConversationsImporter:
    """Create ClaudeConversationsImporter with dependencies."""
    return ClaudeConversationsImporter(project_config.home, markdown_processor)


ClaudeConversationsImporterDep = Annotated[
    ClaudeConversationsImporter, Depends(get_claude_conversations_importer)
]


async def get_claude_projects_importer(
    project_config: ProjectConfigDep, markdown_processor: MarkdownProcessorDep
) -> ClaudeProjectsImporter:
    """Create ClaudeProjectsImporter with dependencies."""
    return ClaudeProjectsImporter(project_config.home, markdown_processor)


ClaudeProjectsImporterDep = Annotated[ClaudeProjectsImporter, Depends(get_claude_projects_importer)]


async def get_memory_json_importer(
    project_config: ProjectConfigDep, markdown_processor: MarkdownProcessorDep
) -> MemoryJsonImporter:
    """Create MemoryJsonImporter with dependencies."""
    return MemoryJsonImporter(project_config.home, markdown_processor)


MemoryJsonImporterDep = Annotated[MemoryJsonImporter, Depends(get_memory_json_importer)]

```

--------------------------------------------------------------------------------
/src/basic_memory/services/search_service.py:
--------------------------------------------------------------------------------

```python
"""Service for search operations."""

import ast
from datetime import datetime
from typing import List, Optional, Set

from dateparser import parse
from fastapi import BackgroundTasks
from loguru import logger
from sqlalchemy import text

from basic_memory.models import Entity
from basic_memory.repository import EntityRepository
from basic_memory.repository.search_repository import SearchRepository, SearchIndexRow
from basic_memory.schemas.search import SearchQuery, SearchItemType
from basic_memory.services import FileService


class SearchService:
    """Service for search operations.

    Supports three primary search modes:
    1. Exact permalink lookup
    2. Pattern matching with * (e.g., 'specs/*')
    3. Full-text search across title/content
    """

    def __init__(
        self,
        search_repository: SearchRepository,
        entity_repository: EntityRepository,
        file_service: FileService,
    ) -> None:
        """Store collaborator services used for indexing and querying."""
        self.repository = search_repository
        self.entity_repository = entity_repository
        self.file_service = file_service

    async def init_search_index(self) -> None:
        """Create FTS5 virtual table if it doesn't exist."""
        await self.repository.init_search_index()

    async def reindex_all(self, background_tasks: Optional[BackgroundTasks] = None) -> None:
        """Reindex all content from database.

        Drops and recreates the search_index table, then re-indexes every
        entity found by the entity repository. When ``background_tasks`` is
        given, each entity's indexing is deferred to it (see index_entity).
        """

        logger.info("Starting full reindex")
        # Clear and recreate search index
        await self.repository.execute_query(text("DROP TABLE IF EXISTS search_index"), params={})
        await self.init_search_index()

        # Reindex all entities
        logger.debug("Indexing entities")
        entities = await self.entity_repository.find_all()
        for entity in entities:
            await self.index_entity(entity, background_tasks)

        logger.info("Reindex complete")

    async def search(
        self, query: SearchQuery, limit: int = 10, offset: int = 0
    ) -> List[SearchIndexRow]:
        """Search across all indexed content.

        Supports three modes:
        1. Exact permalink: finds direct matches for a specific path
        2. Pattern match: handles * wildcards in paths
        3. Text search: full-text search across title/content

        Returns an empty list immediately when the query has no criteria.
        """
        if query.no_criteria():
            logger.debug("no criteria passed to query")
            return []

        logger.trace(f"Searching with query: {query}")

        # after_date may arrive as a datetime or as a string; strings are
        # parsed with dateparser.parse (returns None if unparseable).
        after_date = (
            (
                query.after_date
                if isinstance(query.after_date, datetime)
                else parse(query.after_date)
            )
            if query.after_date
            else None
        )

        # search
        results = await self.repository.search(
            search_text=query.text,
            permalink=query.permalink,
            permalink_match=query.permalink_match,
            title=query.title,
            types=query.types,
            search_item_types=query.entity_types,
            after_date=after_date,
            limit=limit,
            offset=offset,
        )

        return results

    @staticmethod
    def _generate_variants(text: str) -> Set[str]:
        """Generate text variants for better fuzzy matching.

        Creates variations of the text to improve match chances:
        - Original form
        - Lowercase form
        - Path segments (for permalinks)
        - Common word boundaries

        NOTE: the ``text`` parameter shadows the module-level sqlalchemy
        ``text`` import inside this method; it is a plain str here.
        """
        variants = {text, text.lower()}

        # Add path segments
        if "/" in text:
            variants.update(p.strip() for p in text.split("/") if p.strip())

        # Add word boundaries
        variants.update(w.strip() for w in text.lower().split() if w.strip())

        # Trigrams disabled: They create massive search index bloat, increasing DB size significantly
        # and slowing down indexing performance. FTS5 search works well without them.
        # See: https://github.com/basicmachines-co/basic-memory/issues/351
        # variants.update(text[i : i + 3].lower() for i in range(len(text) - 2))

        return variants

    def _extract_entity_tags(self, entity: Entity) -> List[str]:
        """Extract tags from entity metadata for search indexing.

        Handles multiple tag formats:
        - List format: ["tag1", "tag2"]
        - String format: "['tag1', 'tag2']" or "[tag1, tag2]"
        - Empty: [] or "[]"

        Returns a list of tag strings for search indexing.
        """
        if not entity.entity_metadata or "tags" not in entity.entity_metadata:
            return []

        tags = entity.entity_metadata["tags"]

        # Handle list format (preferred)
        if isinstance(tags, list):
            return [str(tag) for tag in tags if tag]

        # Handle string format (legacy)
        if isinstance(tags, str):
            try:
                # Parse string representation of list
                parsed_tags = ast.literal_eval(tags)
                if isinstance(parsed_tags, list):
                    return [str(tag) for tag in parsed_tags if tag]
            except (ValueError, SyntaxError):
                # If parsing fails, treat as single tag
                return [tags] if tags.strip() else []

        return []  # pragma: no cover

    async def index_entity(
        self,
        entity: Entity,
        background_tasks: Optional[BackgroundTasks] = None,
    ) -> None:
        """Index an entity, deferring to background_tasks when provided.

        With background_tasks, the work is scheduled via FastAPI's
        BackgroundTasks (runs after the response is sent); otherwise the
        entity is indexed inline before returning.
        """
        if background_tasks:
            background_tasks.add_task(self.index_entity_data, entity)
        else:
            await self.index_entity_data(entity)

    async def index_entity_data(
        self,
        entity: Entity,
    ) -> None:
        """Delete any existing index rows for the entity, then reindex it.

        Markdown entities get full content/observation/relation indexing;
        non-markdown entities get a single metadata-only row.
        """
        # delete all search index data associated with entity
        await self.repository.delete_by_entity_id(entity_id=entity.id)

        # reindex
        # (conditional expression used as a statement: dispatch on file type)
        await self.index_entity_markdown(
            entity
        ) if entity.is_markdown else await self.index_entity_file(entity)

    async def index_entity_file(
        self,
        entity: Entity,
    ) -> None:
        """Index a non-markdown entity as a single row with no content."""
        # Index entity file with no content
        await self.repository.index_item(
            SearchIndexRow(
                id=entity.id,
                entity_id=entity.id,
                type=SearchItemType.ENTITY.value,
                title=entity.title,
                file_path=entity.file_path,
                metadata={
                    "entity_type": entity.entity_type,
                },
                created_at=entity.created_at,
                updated_at=entity.updated_at,
                project_id=entity.project_id,
            )
        )

    async def index_entity_markdown(
        self,
        entity: Entity,
    ) -> None:
        """Index an entity and all its observations and relations.

        Indexing structure:
        1. Entities
           - permalink: direct from entity (e.g., "specs/search")
           - file_path: physical file location
           - project_id: project context for isolation

        2. Observations
           - permalink: entity permalink + /observations/id (e.g., "specs/search/observations/123")
           - file_path: parent entity's file (where observation is defined)
           - project_id: inherited from parent entity

        3. Relations (only index outgoing relations defined in this file)
           - permalink: from_entity/relation_type/to_entity (e.g., "specs/search/implements/features/search-ui")
           - file_path: source entity's file (where relation is defined)
           - project_id: inherited from source entity

        Each type gets its own row in the search index with appropriate metadata.
        The project_id is automatically added by the repository when indexing.
        """

        # Collect all search index rows to batch insert at the end
        rows_to_index = []

        content_stems = []
        content_snippet = ""
        title_variants = self._generate_variants(entity.title)
        content_stems.extend(title_variants)

        content = await self.file_service.read_entity_content(entity)
        if content:
            content_stems.append(content)
            # Snippet is the first 250 characters of the raw file content.
            content_snippet = f"{content[:250]}"

        if entity.permalink:
            content_stems.extend(self._generate_variants(entity.permalink))

        content_stems.extend(self._generate_variants(entity.file_path))

        # Add entity tags from frontmatter to search content
        entity_tags = self._extract_entity_tags(entity)
        if entity_tags:
            content_stems.extend(entity_tags)

        entity_content_stems = "\n".join(p for p in content_stems if p and p.strip())

        # Add entity row
        rows_to_index.append(
            SearchIndexRow(
                id=entity.id,
                type=SearchItemType.ENTITY.value,
                title=entity.title,
                content_stems=entity_content_stems,
                content_snippet=content_snippet,
                permalink=entity.permalink,
                file_path=entity.file_path,
                entity_id=entity.id,
                metadata={
                    "entity_type": entity.entity_type,
                },
                created_at=entity.created_at,
                updated_at=entity.updated_at,
                project_id=entity.project_id,
            )
        )

        # Add observation rows
        for obs in entity.observations:
            # Index with parent entity's file path since that's where it's defined
            obs_content_stems = "\n".join(
                p for p in self._generate_variants(obs.content) if p and p.strip()
            )
            rows_to_index.append(
                SearchIndexRow(
                    id=obs.id,
                    type=SearchItemType.OBSERVATION.value,
                    title=f"{obs.category}: {obs.content[:100]}...",
                    content_stems=obs_content_stems,
                    content_snippet=obs.content,
                    permalink=obs.permalink,
                    file_path=entity.file_path,
                    category=obs.category,
                    entity_id=entity.id,
                    metadata={
                        "tags": obs.tags,
                    },
                    # Observations reuse the parent entity's timestamps.
                    created_at=entity.created_at,
                    updated_at=entity.updated_at,
                    project_id=entity.project_id,
                )
            )

        # Add relation rows (only outgoing relations defined in this file)
        for rel in entity.outgoing_relations:
            # Create descriptive title showing the relationship
            relation_title = (
                f"{rel.from_entity.title} → {rel.to_entity.title}"
                if rel.to_entity
                else f"{rel.from_entity.title}"
            )

            rel_content_stems = "\n".join(
                p for p in self._generate_variants(relation_title) if p and p.strip()
            )
            rows_to_index.append(
                SearchIndexRow(
                    id=rel.id,
                    title=relation_title,
                    permalink=rel.permalink,
                    content_stems=rel_content_stems,
                    file_path=entity.file_path,
                    type=SearchItemType.RELATION.value,
                    entity_id=entity.id,
                    from_id=rel.from_id,
                    to_id=rel.to_id,
                    relation_type=rel.relation_type,
                    created_at=entity.created_at,
                    updated_at=entity.updated_at,
                    project_id=entity.project_id,
                )
            )

        # Batch insert all rows at once
        await self.repository.bulk_index_items(rows_to_index)

    async def delete_by_permalink(self, permalink: str) -> None:
        """Delete an item from the search index by its permalink."""
        await self.repository.delete_by_permalink(permalink)

    async def delete_by_entity_id(self, entity_id: int) -> None:
        """Delete all search index rows belonging to the given entity id."""
        await self.repository.delete_by_entity_id(entity_id)

    async def handle_delete(self, entity: Entity) -> None:
        """Handle complete entity deletion from search index including observations and relations.

        This replicates the logic from sync_service.handle_delete() to properly clean up
        all search index entries for an entity and its related data.
        """
        logger.debug(
            f"Cleaning up search index for entity_id={entity.id}, file_path={entity.file_path}, "
            f"observations={len(entity.observations)}, relations={len(entity.outgoing_relations)}"
        )

        # Clean up search index - same logic as sync_service.handle_delete()
        permalinks = (
            [entity.permalink]
            + [o.permalink for o in entity.observations]
            + [r.permalink for r in entity.outgoing_relations]
        )

        logger.debug(
            f"Deleting search index entries for entity_id={entity.id}, "
            f"index_entries={len(permalinks)}"
        )

        # NOTE(review): the else branch runs once per *missing* permalink, so
        # delete_by_entity_id may be invoked multiple times for one entity —
        # presumably idempotent at the repository level; verify.
        for permalink in permalinks:
            if permalink:
                await self.delete_by_permalink(permalink)
            else:
                await self.delete_by_entity_id(entity.id)

```
Page 7/17FirstPrevNextLast