#
tokens: 49167/50000 13/347 files (page 9/17)
lines: off (toggle) GitHub
raw markdown copy
This is page 9 of 17. Use http://codebase.md/basicmachines-co/basic-memory?lines=false&page={x} to view the full context.

# Directory Structure

```
├── .claude
│   ├── agents
│   │   ├── python-developer.md
│   │   └── system-architect.md
│   └── commands
│       ├── release
│       │   ├── beta.md
│       │   ├── changelog.md
│       │   ├── release-check.md
│       │   └── release.md
│       ├── spec.md
│       └── test-live.md
├── .dockerignore
├── .github
│   ├── dependabot.yml
│   ├── ISSUE_TEMPLATE
│   │   ├── bug_report.md
│   │   ├── config.yml
│   │   ├── documentation.md
│   │   └── feature_request.md
│   └── workflows
│       ├── claude-code-review.yml
│       ├── claude-issue-triage.yml
│       ├── claude.yml
│       ├── dev-release.yml
│       ├── docker.yml
│       ├── pr-title.yml
│       ├── release.yml
│       └── test.yml
├── .gitignore
├── .python-version
├── CHANGELOG.md
├── CITATION.cff
├── CLA.md
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── docker-compose.yml
├── Dockerfile
├── docs
│   ├── ai-assistant-guide-extended.md
│   ├── character-handling.md
│   ├── cloud-cli.md
│   └── Docker.md
├── justfile
├── LICENSE
├── llms-install.md
├── pyproject.toml
├── README.md
├── SECURITY.md
├── smithery.yaml
├── specs
│   ├── SPEC-1 Specification-Driven Development Process.md
│   ├── SPEC-10 Unified Deployment Workflow and Event Tracking.md
│   ├── SPEC-11 Basic Memory API Performance Optimization.md
│   ├── SPEC-12 OpenTelemetry Observability.md
│   ├── SPEC-13 CLI Authentication with Subscription Validation.md
│   ├── SPEC-14 Cloud Git Versioning & GitHub Backup.md
│   ├── SPEC-14- Cloud Git Versioning & GitHub Backup.md
│   ├── SPEC-15 Configuration Persistence via Tigris for Cloud Tenants.md
│   ├── SPEC-16 MCP Cloud Service Consolidation.md
│   ├── SPEC-17 Semantic Search with ChromaDB.md
│   ├── SPEC-18 AI Memory Management Tool.md
│   ├── SPEC-19 Sync Performance and Memory Optimization.md
│   ├── SPEC-2 Slash Commands Reference.md
│   ├── SPEC-20 Simplified Project-Scoped Rclone Sync.md
│   ├── SPEC-3 Agent Definitions.md
│   ├── SPEC-4 Notes Web UI Component Architecture.md
│   ├── SPEC-5 CLI Cloud Upload via WebDAV.md
│   ├── SPEC-6 Explicit Project Parameter Architecture.md
│   ├── SPEC-7 POC to spike Tigris Turso for local access to cloud data.md
│   ├── SPEC-8 TigrisFS Integration.md
│   ├── SPEC-9 Multi-Project Bidirectional Sync Architecture.md
│   ├── SPEC-9 Signed Header Tenant Information.md
│   └── SPEC-9-1 Follow-Ups- Conflict, Sync, and Observability.md
├── src
│   └── basic_memory
│       ├── __init__.py
│       ├── alembic
│       │   ├── alembic.ini
│       │   ├── env.py
│       │   ├── migrations.py
│       │   ├── script.py.mako
│       │   └── versions
│       │       ├── 3dae7c7b1564_initial_schema.py
│       │       ├── 502b60eaa905_remove_required_from_entity_permalink.py
│       │       ├── 5fe1ab1ccebe_add_projects_table.py
│       │       ├── 647e7a75e2cd_project_constraint_fix.py
│       │       ├── 9d9c1cb7d8f5_add_mtime_and_size_columns_to_entity_.py
│       │       ├── a1b2c3d4e5f6_fix_project_foreign_keys.py
│       │       ├── b3c3938bacdb_relation_to_name_unique_index.py
│       │       ├── cc7172b46608_update_search_index_schema.py
│       │       └── e7e1f4367280_add_scan_watermark_tracking_to_project.py
│       ├── api
│       │   ├── __init__.py
│       │   ├── app.py
│       │   ├── routers
│       │   │   ├── __init__.py
│       │   │   ├── directory_router.py
│       │   │   ├── importer_router.py
│       │   │   ├── knowledge_router.py
│       │   │   ├── management_router.py
│       │   │   ├── memory_router.py
│       │   │   ├── project_router.py
│       │   │   ├── prompt_router.py
│       │   │   ├── resource_router.py
│       │   │   ├── search_router.py
│       │   │   └── utils.py
│       │   └── template_loader.py
│       ├── cli
│       │   ├── __init__.py
│       │   ├── app.py
│       │   ├── auth.py
│       │   ├── commands
│       │   │   ├── __init__.py
│       │   │   ├── cloud
│       │   │   │   ├── __init__.py
│       │   │   │   ├── api_client.py
│       │   │   │   ├── bisync_commands.py
│       │   │   │   ├── cloud_utils.py
│       │   │   │   ├── core_commands.py
│       │   │   │   ├── rclone_commands.py
│       │   │   │   ├── rclone_config.py
│       │   │   │   ├── rclone_installer.py
│       │   │   │   ├── upload_command.py
│       │   │   │   └── upload.py
│       │   │   ├── command_utils.py
│       │   │   ├── db.py
│       │   │   ├── import_chatgpt.py
│       │   │   ├── import_claude_conversations.py
│       │   │   ├── import_claude_projects.py
│       │   │   ├── import_memory_json.py
│       │   │   ├── mcp.py
│       │   │   ├── project.py
│       │   │   ├── status.py
│       │   │   └── tool.py
│       │   └── main.py
│       ├── config.py
│       ├── db.py
│       ├── deps.py
│       ├── file_utils.py
│       ├── ignore_utils.py
│       ├── importers
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── chatgpt_importer.py
│       │   ├── claude_conversations_importer.py
│       │   ├── claude_projects_importer.py
│       │   ├── memory_json_importer.py
│       │   └── utils.py
│       ├── markdown
│       │   ├── __init__.py
│       │   ├── entity_parser.py
│       │   ├── markdown_processor.py
│       │   ├── plugins.py
│       │   ├── schemas.py
│       │   └── utils.py
│       ├── mcp
│       │   ├── __init__.py
│       │   ├── async_client.py
│       │   ├── project_context.py
│       │   ├── prompts
│       │   │   ├── __init__.py
│       │   │   ├── ai_assistant_guide.py
│       │   │   ├── continue_conversation.py
│       │   │   ├── recent_activity.py
│       │   │   ├── search.py
│       │   │   └── utils.py
│       │   ├── resources
│       │   │   ├── ai_assistant_guide.md
│       │   │   └── project_info.py
│       │   ├── server.py
│       │   └── tools
│       │       ├── __init__.py
│       │       ├── build_context.py
│       │       ├── canvas.py
│       │       ├── chatgpt_tools.py
│       │       ├── delete_note.py
│       │       ├── edit_note.py
│       │       ├── list_directory.py
│       │       ├── move_note.py
│       │       ├── project_management.py
│       │       ├── read_content.py
│       │       ├── read_note.py
│       │       ├── recent_activity.py
│       │       ├── search.py
│       │       ├── utils.py
│       │       ├── view_note.py
│       │       └── write_note.py
│       ├── models
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── knowledge.py
│       │   ├── project.py
│       │   └── search.py
│       ├── repository
│       │   ├── __init__.py
│       │   ├── entity_repository.py
│       │   ├── observation_repository.py
│       │   ├── project_info_repository.py
│       │   ├── project_repository.py
│       │   ├── relation_repository.py
│       │   ├── repository.py
│       │   └── search_repository.py
│       ├── schemas
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── cloud.py
│       │   ├── delete.py
│       │   ├── directory.py
│       │   ├── importer.py
│       │   ├── memory.py
│       │   ├── project_info.py
│       │   ├── prompt.py
│       │   ├── request.py
│       │   ├── response.py
│       │   ├── search.py
│       │   └── sync_report.py
│       ├── services
│       │   ├── __init__.py
│       │   ├── context_service.py
│       │   ├── directory_service.py
│       │   ├── entity_service.py
│       │   ├── exceptions.py
│       │   ├── file_service.py
│       │   ├── initialization.py
│       │   ├── link_resolver.py
│       │   ├── project_service.py
│       │   ├── search_service.py
│       │   └── service.py
│       ├── sync
│       │   ├── __init__.py
│       │   ├── background_sync.py
│       │   ├── sync_service.py
│       │   └── watch_service.py
│       ├── templates
│       │   └── prompts
│       │       ├── continue_conversation.hbs
│       │       └── search.hbs
│       └── utils.py
├── test-int
│   ├── BENCHMARKS.md
│   ├── cli
│   │   ├── test_project_commands_integration.py
│   │   └── test_version_integration.py
│   ├── conftest.py
│   ├── mcp
│   │   ├── test_build_context_underscore.py
│   │   ├── test_build_context_validation.py
│   │   ├── test_chatgpt_tools_integration.py
│   │   ├── test_default_project_mode_integration.py
│   │   ├── test_delete_note_integration.py
│   │   ├── test_edit_note_integration.py
│   │   ├── test_list_directory_integration.py
│   │   ├── test_move_note_integration.py
│   │   ├── test_project_management_integration.py
│   │   ├── test_project_state_sync_integration.py
│   │   ├── test_read_content_integration.py
│   │   ├── test_read_note_integration.py
│   │   ├── test_search_integration.py
│   │   ├── test_single_project_mcp_integration.py
│   │   └── test_write_note_integration.py
│   ├── test_db_wal_mode.py
│   ├── test_disable_permalinks_integration.py
│   └── test_sync_performance_benchmark.py
├── tests
│   ├── __init__.py
│   ├── api
│   │   ├── conftest.py
│   │   ├── test_async_client.py
│   │   ├── test_continue_conversation_template.py
│   │   ├── test_directory_router.py
│   │   ├── test_importer_router.py
│   │   ├── test_knowledge_router.py
│   │   ├── test_management_router.py
│   │   ├── test_memory_router.py
│   │   ├── test_project_router_operations.py
│   │   ├── test_project_router.py
│   │   ├── test_prompt_router.py
│   │   ├── test_relation_background_resolution.py
│   │   ├── test_resource_router.py
│   │   ├── test_search_router.py
│   │   ├── test_search_template.py
│   │   ├── test_template_loader_helpers.py
│   │   └── test_template_loader.py
│   ├── cli
│   │   ├── conftest.py
│   │   ├── test_cli_tools.py
│   │   ├── test_cloud_authentication.py
│   │   ├── test_ignore_utils.py
│   │   ├── test_import_chatgpt.py
│   │   ├── test_import_claude_conversations.py
│   │   ├── test_import_claude_projects.py
│   │   ├── test_import_memory_json.py
│   │   ├── test_project_add_with_local_path.py
│   │   └── test_upload.py
│   ├── conftest.py
│   ├── db
│   │   └── test_issue_254_foreign_key_constraints.py
│   ├── importers
│   │   ├── test_importer_base.py
│   │   └── test_importer_utils.py
│   ├── markdown
│   │   ├── __init__.py
│   │   ├── test_date_frontmatter_parsing.py
│   │   ├── test_entity_parser_error_handling.py
│   │   ├── test_entity_parser.py
│   │   ├── test_markdown_plugins.py
│   │   ├── test_markdown_processor.py
│   │   ├── test_observation_edge_cases.py
│   │   ├── test_parser_edge_cases.py
│   │   ├── test_relation_edge_cases.py
│   │   └── test_task_detection.py
│   ├── mcp
│   │   ├── conftest.py
│   │   ├── test_obsidian_yaml_formatting.py
│   │   ├── test_permalink_collision_file_overwrite.py
│   │   ├── test_prompts.py
│   │   ├── test_resources.py
│   │   ├── test_tool_build_context.py
│   │   ├── test_tool_canvas.py
│   │   ├── test_tool_delete_note.py
│   │   ├── test_tool_edit_note.py
│   │   ├── test_tool_list_directory.py
│   │   ├── test_tool_move_note.py
│   │   ├── test_tool_read_content.py
│   │   ├── test_tool_read_note.py
│   │   ├── test_tool_recent_activity.py
│   │   ├── test_tool_resource.py
│   │   ├── test_tool_search.py
│   │   ├── test_tool_utils.py
│   │   ├── test_tool_view_note.py
│   │   ├── test_tool_write_note.py
│   │   └── tools
│   │       └── test_chatgpt_tools.py
│   ├── Non-MarkdownFileSupport.pdf
│   ├── repository
│   │   ├── test_entity_repository_upsert.py
│   │   ├── test_entity_repository.py
│   │   ├── test_entity_upsert_issue_187.py
│   │   ├── test_observation_repository.py
│   │   ├── test_project_info_repository.py
│   │   ├── test_project_repository.py
│   │   ├── test_relation_repository.py
│   │   ├── test_repository.py
│   │   ├── test_search_repository_edit_bug_fix.py
│   │   └── test_search_repository.py
│   ├── schemas
│   │   ├── test_base_timeframe_minimum.py
│   │   ├── test_memory_serialization.py
│   │   ├── test_memory_url_validation.py
│   │   ├── test_memory_url.py
│   │   ├── test_schemas.py
│   │   └── test_search.py
│   ├── Screenshot.png
│   ├── services
│   │   ├── test_context_service.py
│   │   ├── test_directory_service.py
│   │   ├── test_entity_service_disable_permalinks.py
│   │   ├── test_entity_service.py
│   │   ├── test_file_service.py
│   │   ├── test_initialization.py
│   │   ├── test_link_resolver.py
│   │   ├── test_project_removal_bug.py
│   │   ├── test_project_service_operations.py
│   │   ├── test_project_service.py
│   │   └── test_search_service.py
│   ├── sync
│   │   ├── test_character_conflicts.py
│   │   ├── test_sync_service_incremental.py
│   │   ├── test_sync_service.py
│   │   ├── test_sync_wikilink_issue.py
│   │   ├── test_tmp_files.py
│   │   ├── test_watch_service_edge_cases.py
│   │   ├── test_watch_service_reload.py
│   │   └── test_watch_service.py
│   ├── test_config.py
│   ├── test_db_migration_deduplication.py
│   ├── test_deps.py
│   ├── test_production_cascade_delete.py
│   ├── test_rclone_commands.py
│   └── utils
│       ├── test_file_utils.py
│       ├── test_frontmatter_obsidian_compatible.py
│       ├── test_parse_tags.py
│       ├── test_permalink_formatting.py
│       ├── test_utf8_handling.py
│       └── test_validate_project_path.py
├── uv.lock
├── v0.15.0-RELEASE-DOCS.md
└── v15-docs
    ├── api-performance.md
    ├── background-relations.md
    ├── basic-memory-home.md
    ├── bug-fixes.md
    ├── chatgpt-integration.md
    ├── cloud-authentication.md
    ├── cloud-bisync.md
    ├── cloud-mode-usage.md
    ├── cloud-mount.md
    ├── default-project-mode.md
    ├── env-file-removal.md
    ├── env-var-overrides.md
    ├── explicit-project-parameter.md
    ├── gitignore-integration.md
    ├── project-root-env-var.md
    ├── README.md
    └── sqlite-performance.md
```

# Files

--------------------------------------------------------------------------------
/src/basic_memory/mcp/tools/edit_note.py:
--------------------------------------------------------------------------------

```python
"""Edit note tool for Basic Memory MCP server."""

from typing import Optional

from loguru import logger
from fastmcp import Context

from basic_memory.mcp.async_client import get_client
from basic_memory.mcp.project_context import get_active_project, add_project_metadata
from basic_memory.mcp.server import mcp
from basic_memory.mcp.tools.utils import call_patch
from basic_memory.schemas import EntityResponse


def _format_error_response(
    error_message: str,
    operation: str,
    identifier: str,
    find_text: Optional[str] = None,
    expected_replacements: int = 1,
    project: Optional[str] = None,
) -> str:
    """Format helpful error responses for edit_note failures that guide the AI to retry successfully."""

    # Entity not found errors
    if "Entity not found" in error_message or "entity not found" in error_message.lower():
        return f"""# Edit Failed - Note Not Found

The note with identifier '{identifier}' could not be found. Edit operations require an exact match (no fuzzy matching).

## Suggestions to try:
1. **Search for the note first**: Use `search_notes("{project or "project-name"}", "{identifier.split("/")[-1]}")` to find similar notes with exact identifiers
2. **Try different exact identifier formats**:
   - If you used a permalink like "folder/note-title", try the exact title: "{identifier.split("/")[-1].replace("-", " ").title()}"
   - If you used a title, try the exact permalink format: "{identifier.lower().replace(" ", "-")}"
   - Use `read_note("{project or "project-name"}", "{identifier}")` first to verify the note exists and get the exact identifier

## Alternative approach:
Use `write_note("{project or "project-name"}", "title", "content", "folder")` to create the note first, then edit it."""

    # Find/replace specific errors
    if operation == "find_replace":
        if "Text to replace not found" in error_message:
            return f"""# Edit Failed - Text Not Found

The text '{find_text}' was not found in the note '{identifier}'.

## Suggestions to try:
1. **Read the note first**: Use `read_note("{project or "project-name"}", "{identifier}")` to see the current content
2. **Check for exact matches**: The search is case-sensitive and must match exactly
3. **Try a broader search**: Search for just part of the text you want to replace
4. **Use expected_replacements=0**: If you want to verify the text doesn't exist

## Alternative approaches:
- Use `append` or `prepend` to add new content instead
- Use `replace_section` if you're trying to update a specific section"""

        if "Expected" in error_message and "occurrences" in error_message:
            # Extract the actual count from error message if possible
            import re

            match = re.search(r"found (\d+)", error_message)
            actual_count = match.group(1) if match else "a different number of"

            return f"""# Edit Failed - Wrong Replacement Count

Expected {expected_replacements} occurrences of '{find_text}' but found {actual_count}.

## How to fix:
1. **Read the note first**: Use `read_note("{project or "project-name"}", "{identifier}")` to see how many times '{find_text}' appears
2. **Update expected_replacements**: Set expected_replacements={actual_count} in your edit_note call
3. **Be more specific**: If you only want to replace some occurrences, make your find_text more specific

## Example:
```
edit_note("{project or "project-name"}", "{identifier}", "find_replace", "new_text", find_text="{find_text}", expected_replacements={actual_count})
```"""

    # Section replacement errors
    if operation == "replace_section" and "Multiple sections" in error_message:
        return f"""# Edit Failed - Duplicate Section Headers

Multiple sections found with the same header in note '{identifier}'.

## How to fix:
1. **Read the note first**: Use `read_note("{project or "project-name"}", "{identifier}")` to see the document structure
2. **Make headers unique**: Add more specific text to distinguish sections
3. **Use append instead**: Add content at the end rather than replacing a specific section

## Alternative approach:
Use `find_replace` to update specific text within the duplicate sections."""

    # Generic server/request errors
    if (
        "Invalid request" in error_message or "malformed" in error_message.lower()
    ):  # pragma: no cover
        return f"""# Edit Failed - Request Error

There was a problem with the edit request to note '{identifier}': {error_message}.

## Common causes and fixes:
1. **Note doesn't exist**: Use `search_notes("{project or "project-name"}", "query")` or `read_note("{project or "project-name"}", "{identifier}")` to verify the note exists
2. **Invalid identifier format**: Try different identifier formats (title vs permalink)
3. **Empty or invalid content**: Check that your content is properly formatted
4. **Server error**: Try the operation again, or use `read_note()` first to verify the note state

## Troubleshooting steps:
1. Verify the note exists: `read_note("{project or "project-name"}", "{identifier}")`
2. If not found, search for it: `search_notes("{project or "project-name"}", "{identifier.split("/")[-1]}")`
3. Try again with the correct identifier from the search results"""

    # Fallback for other errors
    return f"""# Edit Failed

Error editing note '{identifier}': {error_message}

## General troubleshooting:
1. **Verify the note exists**: Use `read_note("{project or "project-name"}", "{identifier}")` to check
2. **Check your parameters**: Ensure all required parameters are provided correctly
3. **Read the note content first**: Use `read_note("{project or "project-name"}", "{identifier}")` to understand the current structure
4. **Try a simpler operation**: Start with `append` if other operations fail

## Need help?
- Use `search_notes("{project or "project-name"}", "query")` to find notes
- Use `read_note("{project or "project-name"}", "identifier")` to examine content before editing
- Check that identifiers, section headers, and find_text match exactly"""


@mcp.tool(
    description="Edit an existing markdown note using various operations like append, prepend, find_replace, or replace_section.",
)
async def edit_note(
    identifier: str,
    operation: str,
    content: str,
    project: Optional[str] = None,
    section: Optional[str] = None,
    find_text: Optional[str] = None,
    expected_replacements: int = 1,
    context: Context | None = None,
) -> str:
    """Edit an existing markdown note in the knowledge base.

    Makes targeted changes to existing notes without rewriting the entire content.

    Project Resolution:
    Server resolves projects in this order: Single Project Mode → project parameter → default project.
    If project unknown, use list_memory_projects() or recent_activity() first.

    Args:
        identifier: The exact title, permalink, or memory:// URL of the note to edit.
                   Must be an exact match - fuzzy matching is not supported for edit operations.
                   Use search_notes() or read_note() first to find the correct identifier if uncertain.
        operation: The editing operation to perform:
                  - "append": Add content to the end of the note
                  - "prepend": Add content to the beginning of the note
                  - "find_replace": Replace occurrences of find_text with content
                  - "replace_section": Replace content under a specific markdown header
        content: The content to add or use for replacement
        project: Project name to edit in. Optional - server will resolve using hierarchy.
                If unknown, use list_memory_projects() to discover available projects.
        section: For replace_section operation - the markdown header to replace content under (e.g., "## Notes", "### Implementation")
        find_text: For find_replace operation - the text to find and replace
        expected_replacements: For find_replace operation - the expected number of replacements (validation will fail if actual doesn't match)
        context: Optional FastMCP context for performance caching.

    Returns:
        A markdown formatted summary of the edit operation and resulting semantic content,
        including operation details, file path, observations, relations, and project metadata.

    Examples:
        # Add new content to end of note
        edit_note("project-planning", "append", "\\n## New Requirements\\n- Feature X\\n- Feature Y", project="my-project")

        # Add timestamp at beginning (frontmatter-aware)
        edit_note("meeting-notes", "prepend", "## 2025-05-25 Update\\n- Progress update...\\n\\n", project="work-docs")

        # Update version number (single occurrence)
        edit_note("config-spec", "find_replace", "v0.13.0", find_text="v0.12.0", project="api-project")

        # Update version in multiple places with validation
        edit_note("api-docs", "find_replace", "v2.1.0", find_text="v2.0.0", expected_replacements=3, project="docs-project")

        # Replace implementation section
        edit_note("api-spec", "replace_section", "New implementation approach...\\n", section="## Implementation", project="specs")

        # Replace subsection with more specific header
        edit_note("docs/setup", "replace_section", "Updated install steps\\n", section="### Installation", project="docs")

        # Using different identifier formats (must be exact matches)
        edit_note("Meeting Notes", "append", "\\n- Follow up on action items", project="work-project")  # exact title
        edit_note("docs/meeting-notes", "append", "\\n- Follow up tasks", project="work-project")       # exact permalink

        # If uncertain about identifier, search first:
        # search_notes("work-project", "meeting")  # Find available notes
        # edit_note("docs/meeting-notes-2025", "append", "content", project="work-project")  # Use exact result

        # Update status across document (expecting exactly 2 occurrences)
        edit_note("status-report", "find_replace", "In Progress", find_text="Not Started", expected_replacements=2, project="reports")

    Raises:
        HTTPError: If project doesn't exist or is inaccessible
        ValueError: If operation is invalid or required parameters are missing
        SecurityError: If identifier attempts path traversal

    Note:
        Edit operations require exact identifier matches. If unsure, use read_note() or
        search_notes() first to find the correct identifier. The tool provides detailed
        error messages with suggestions if operations fail.
    """
    async with get_client() as client:
        active_project = await get_active_project(client, project, context)
        project_url = active_project.project_url

        logger.info("MCP tool call", tool="edit_note", identifier=identifier, operation=operation)

        # Validate operation before touching the network.
        valid_operations = ["append", "prepend", "find_replace", "replace_section"]
        if operation not in valid_operations:
            raise ValueError(
                f"Invalid operation '{operation}'. Must be one of: {', '.join(valid_operations)}"
            )

        # Validate operation-specific required parameters up front so the
        # server never receives an incomplete request.
        if operation == "find_replace" and not find_text:
            raise ValueError("find_text parameter is required for find_replace operation")
        if operation == "replace_section" and not section:
            raise ValueError("section parameter is required for replace_section operation")

        # Use the PATCH endpoint to edit the entity
        try:
            # Build the PATCH payload; optional fields are only included when set.
            edit_data = {
                "operation": operation,
                "content": content,
            }
            if section:
                edit_data["section"] = section
            if find_text:
                edit_data["find_text"] = find_text
            if expected_replacements != 1:  # Only send if different from default
                # Sent as a string so the payload stays dict[str, str]; the
                # server coerces it back to an int.
                edit_data["expected_replacements"] = str(expected_replacements)

            # Call the PATCH endpoint
            url = f"{project_url}/knowledge/entities/{identifier}"
            response = await call_patch(client, url, json=edit_data)
            result = EntityResponse.model_validate(response.json())

            # Format summary header.
            summary = [
                f"# Edited note ({operation})",
                f"project: {active_project.name}",
                f"file_path: {result.file_path}",
                f"permalink: {result.permalink}",
                f"checksum: {result.checksum[:8] if result.checksum else 'unknown'}",
            ]

            # Operation-specific detail line.
            if operation == "append":
                lines_added = len(content.split("\n"))
                summary.append(f"operation: Added {lines_added} lines to end of note")
            elif operation == "prepend":
                lines_added = len(content.split("\n"))
                summary.append(f"operation: Added {lines_added} lines to beginning of note")
            elif operation == "find_replace":
                # We don't have the original content here, so we can't report
                # an exact replacement count; the server already validated it.
                summary.append("operation: Find and replace operation completed")
            elif operation == "replace_section":
                summary.append(f"operation: Replaced content under section '{section}'")

            # Count observations by category (reuse logic from write_note).
            if result.observations:
                categories: dict[str, int] = {}
                for obs in result.observations:
                    categories[obs.category] = categories.get(obs.category, 0) + 1

                # Fix: use a real newline (was the two-character literal "\n"),
                # so the header renders as markdown instead of showing "\n".
                summary.append("\n## Observations")
                for category, count in sorted(categories.items()):
                    summary.append(f"- {category}: {count}")

            # Count resolved/unresolved relations.
            if result.relations:
                unresolved = sum(1 for r in result.relations if not r.to_id)
                resolved = len(result.relations) - unresolved

                summary.append("\n## Relations")
                summary.append(f"- Resolved: {resolved}")
                if unresolved:
                    summary.append(f"- Unresolved: {unresolved}")

            logger.info(
                "MCP tool response",
                tool="edit_note",
                operation=operation,
                project=active_project.name,
                permalink=result.permalink,
                observations_count=len(result.observations),
                relations_count=len(result.relations),
                status_code=response.status_code,
            )

            # Distinct name: `result` above holds the EntityResponse.
            summary_text = "\n".join(summary)
            return add_project_metadata(summary_text, active_project.name)

        except Exception as e:
            logger.error(f"Error editing note: {e}")
            return _format_error_response(
                str(e), operation, identifier, find_text, expected_replacements, active_project.name
            )

```

--------------------------------------------------------------------------------
/specs/SPEC-7 POC to spike Tigris Turso for local access to cloud data.md:
--------------------------------------------------------------------------------

```markdown
---
title: 'SPEC-7: POC to spike Tigris/Turso for local access to cloud data'
type: spec
permalink: specs/spec-7-poc-tigris-turso-local-access-cloud-data
tags:
- poc
- tigris
- turso
- cloud-storage
- architecture
- proof-of-concept
---

# SPEC-7: POC to spike Tigris/Turso for local access to cloud data

> **Status Update**: ✅ **Phase 1 COMPLETE** (September 20, 2025)
> TigrisFS mounting validated successfully in containerized environments. Container startup, filesystem mounting, and Fly.io integration all working correctly. Ready for Phase 2 (Turso database integration).
> See: [`SPEC-7-PHASE-1-RESULTS.md`](./SPEC-7-PHASE-1-RESULTS.md)

## Why

Current basic-memory-cloud architecture uses Fly volumes for tenant file storage, which creates several limitations:

We could enable a revolutionary user experience: **local editing (or at least view access) of cloud-stored files** while maintaining Basic Memory's existing filesystem assumptions.

1. **Storage Scalability**: Fly volumes require pre-provisioning and don't auto-scale with usage
2. **Single Instance**: Volumes can only be mounted to one fly machine instance
3. **Cost Model**: Volume pricing vs object storage pricing may be less favorable at scale
4. **Local Development**: No way for users to mount their cloud tenant files locally for real-time editing
5. **Multi-Region**: Volumes are region-locked, limiting global deployment flexibility
6. **Backup/Disaster Recovery**: Object storage provides better durability and replication options

Basic Memory requires POSIX filesystem semantics but could benefit from object storage durability and accessibility. By combining:
- **Tigris object storage and TigrisFS** for file persistence in bucket storage via a POSIX filesystem on the tenant instance
- **Turso/libSQL** for SQLite indexing (replacing local .db files). SQLite on NFS volumes is discouraged.

## What

This specification defines a proof-of-concept to validate the technical feasibility of the Tigris/Turso architecture for basic-memory-cloud tenants.

**Affected Areas:**
- **Storage Architecture**: Replace Fly volumes with Tigris object storage
- **Database Architecture**: Replace local SQLite with Turso remote database
- **Container Setup**: Add TigrisFS mounting in tenant containers
- **Local Development**: Enable local mounting of cloud tenant data
- **Basic Memory Core**: Validate unchanged operation over mounted filesystems

**Key Components:**
- **Tigris Storage**: Globally caching S3-compatible object storage via Fly.io integration
- **TigrisFS**: Purpose-built FUSE filesystem with intelligent caching
- **Turso Database**: Hosted libSQL for SQLite replacement
- **Single-Tenant Model**: One bucket + one database per tenant (simplified isolation)

## Architectural Overview & Key Insights

### TigrisFS

Unlike standard S3 mounting approaches, **TigrisFS is a purpose-built FUSE filesystem** optimized for object storage with several critical advantages:

1. **Eliminates Fly Volume Limitations**
   - No single-machine attachment constraints
   - No pre-provisioning of storage capacity
   - Enables horizontal scaling and zero-downtime deployments
   - Automatic global CDN caching at Fly.io edge locations

2. **Intelligent Caching Architecture**
   - 1-4GB+ configurable memory cache for read/write operations
   - Write-back caching for improved performance
   - Metadata cache to reduce API calls
   - "Close to Redis speed" for small object retrieval

3. **Cost-Effective Model**
   - Pay only for storage used and transferred
   - No wasted capacity from over-provisioning
   - Automatic global replication included
   - S3 durability with CDN performance

### API-Driven Architecture Eliminates File Watching Concerns

**Critical Insight**: All file access (reads/writes) in basic-memory-cloud goes through the API layer:
- **MCP Tools → API**: All Basic Memory operations use FastAPI endpoints
- **Web App → API**: Frontend uses API for all data modifications
- **File watching is NOT required** for cloud operations, unlike local BM which uses the WatchService to monitor file changes.

This means:
- **Cloud Operations**: Manual sync after API writes is sufficient
- **Local Development**: File watching only matters for local editing experience
- **Performance Risk**: Dramatically reduced since we're not dependent on inotify over network filesystems

### Realistic Local Access Expectations

**Baseline Functionality (Guaranteed):**
- Read-only mounting for browsing cloud files
- Easy download/upload of entire projects
- File copying via standard filesystem operations

**Stretch Goal (Test in POC):**
- Live editing with eventual consistency (1-5 second delays acceptable)
- Automatic sync for local changes
- Not required for core functionality - pure upside if it works

### Production Deployment Advantages

1. **Multi-Region Deployment**: Tigris handles global replication automatically
2. **Zero-Downtime Updates**: No volume detach/attach during deployments
3. **Tenant Migrations**: Simply update credentials, no data movement
4. **Disaster Recovery**: Built into S3 durability model (99.999999999% durability)
5. **Auto-Scaling**: Storage scales with usage, no capacity planning needed


## How (High Level)

### POC Approach: Server-First Validation

**Rationale**: Start with server-side TigrisFS mounting because:
- Local access is meaningless if cloud containers can't mount TigrisFS reliably
- Container startup and API performance are critical path blockers
- TigrisFS compatibility with Basic Memory operations must be proven first
- Each phase gates the next - no point testing local access if server-side fails

### Phase 1: Server-Side TigrisFS Validation (Critical Foundation) ✅ COMPLETE
- [x] Set up Tigris bucket with test data via Fly.io integration
- [x] Create container image with TigrisFS support and dependencies
- [x] Test TigrisFS mounting in containerized environment
- [x] Run Basic Memory API operations over mounted TigrisFS
- [x] Validate all filesystem operations work correctly
- [x] Measure container startup time and resource usage

**Production Validation Results**: Container successfully deployed and operated for 42+ minutes serving real MCP requests with repository queries, knowledge graph navigation, and full Basic Memory API functionality over TigrisFS-mounted storage.

### Phase 2: Database Migration to Turso
- [ ] Set up Turso account and test database
- [ ] Modify Basic Memory to accept external DATABASE_URL
- [ ] Test all MCP tools with remote SQLite via Turso
- [ ] Validate performance and functionality parity
- [ ] Test API write → manual sync workflow in container

### Phase 3: Production Container Integration
- [ ] Implement tenant-specific credential management for buckets
- [x] Test container startup with automatic TigrisFS mounting
- [ ] Validate isolation between tenant containers
- [ ] Test API operations under realistic load
- [ ] Measure performance vs current Fly volume setup

### Phase 4: Local Access Validation (Bonus Feature)
- [ ] Test local TigrisFS mounting of tenant data
- [ ] Validate read-only access for browsing/downloading
- [ ] Test file copying and upload workflows
- [ ] Measure latency impact on user experience
- [ ] Test live editing if file watching works (stretch goal)

### Architecture Overview
```
Local Development:
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│ Local TigrisFS  │───▶│ Tigris Bucket   │◀───│ Tenant Container│
│ Mount           │    │ (Global CDN)    │    │ TigrisFS mount  │
└─────────────────┘    └─────────────────┘    └─────────────────┘
         │                                              │
         ▼                                              ▼
┌─────────────────┐                            ┌─────────────────┐
│ Basic Memory    │                            │ Basic Memory    │
│ (local files)   │                            │ API + mounted   │
└─────────────────┘                            └─────────────────┘
         │                                              │
         ▼                                              ▼
┌─────────────────┐                            ┌─────────────────┐
│ Turso Database  │◀───────────────────────────│ Turso Database  │
│ (shared index)  │                            │ (shared index)  │
└─────────────────┘                            └─────────────────┘

Flow: API writes → Manual sync → Index update
Local: File watching (if available) → Auto sync
```

## How to Evaluate

### Success Criteria
- [x] **Filesystem Compatibility**: Basic Memory operates without modification over TigrisFS-mounted storage
- [x] **Performance Acceptable**: API-driven operations perform within acceptable latency (target: <500ms for typical operations)
- [ ] **Database Functionality**: All Basic Memory features work with Turso remote SQLite
- [x] **Container Reliability**: Tenant containers start successfully with automatic TigrisFS mounting
- [ ] **Local Access Baseline**: Users can mount cloud files locally for read-only browsing and file copying
- [x] **Data Isolation**: Tenant data remains properly isolated using bucket/database separation
- [ ] **Local Access Stretch**: Live editing with eventual sync (1-5 second delays acceptable)

### Testing Procedure

#### Phase 1: Server-Side Foundation Testing
1. **Container TigrisFS Test**:
   ```dockerfile
   # Test container with TigrisFS mounting
   FROM python:3.12
   RUN apt-get update && apt-get install -y tigrisfs

   # Test startup script
   #!/bin/bash
   tigrisfs --memory-limit 2048 $TIGRIS_BUCKET /app/data --daemon
   cd /app/data && basic-memory sync
   basic-memory-api --data-dir /app/data
   ```

2. **API Operations Validation**:
   ```bash
   # Test all MCP operations over TigrisFS
   curl -X POST /api/write_note -d '{"title":"test","content":"content"}'
   curl -X GET /api/read_note/test
   curl -X GET /api/search_notes?q=content
   # Measure: response times, error rates, data consistency
   ```

#### Phase 2: Database Integration Testing
3. **Turso Integration Test**:
   ```bash
   # Configure Turso connection in container
   export DATABASE_URL="libsql://test-db.turso.io?authToken=..."

   # Test all MCP tools with remote database
   basic-memory tools # Test each tool functionality
   # Test API write → manual sync workflow
   ```

#### Phase 3: Production Readiness Testing
4. **Performance Benchmarking**:
   - Container startup time with TigrisFS mounting
   - API operation response times (target: <500ms for typical operations)
   - Search query performance with Turso (target: comparable to local SQLite)
   - TigrisFS cache hit rates and memory usage
   - Concurrent tenant isolation

#### Phase 4: Local Access Testing (If Phase 1-3 Succeed)
5. **Local Access Validation**:
   ```bash
   # Test read-only access
   tigrisfs tenant-bucket ~/local-tenant
   ls -la ~/local-tenant  # Browse files
   cp ~/local-tenant/notes/* ~/backup/  # Copy files

   # Test file watching (stretch goal)
   echo "test" > ~/local-tenant/test.md
   # Check if changes sync to cloud
   ```

### Go/No-Go Criteria by Phase
- **Phase 1**: Container must start successfully and serve API requests over TigrisFS
- **Phase 2**: All MCP tools must work with Turso with <2x latency increase
- **Phase 3**: Performance must be within 50% of current Fly volume setup
- **Phase 4**: Local mounting must work reliably for read-only access

### Risk Assessment
**Moderate Risk Items (Mitigated by API-First Architecture)**:
- [ ] TigrisFS performance for local access may have higher latency than local filesystem
- [ ] File watching (`inotify`) over FUSE may be unreliable for local development
- [ ] Network interruptions could cause filesystem errors during local editing
- [ ] Write-back caching could cause data loss if container crashes during flush

**Low Risk Items (API-First Eliminates)**:
- [ ] ~~Real-time file watching~~ - Not required for cloud operations
- [ ] ~~Concurrent write consistency~~ - Single-tenant model with API coordination
- [ ] ~~S3 rate limits~~ - TigrisFS intelligent caching handles this

**Mitigation Strategies**:
- **Performance**: Comprehensive benchmarking with realistic workloads
- **Reliability**: Graceful degradation to read-only local access if live editing fails
- **Data Safety**: Regular sync intervals and write-through mode for critical operations
- **Fallback**: Keep Fly volumes as backup deployment option

### Metrics to Track
- **API Latency**: Response times for MCP tools and web operations
- **Cache Effectiveness**: TigrisFS cache hit rates and memory usage
- **Local Access Performance**: File browsing and copying speeds
- **Reliability**: Success rate of mount operations and data consistency
- **Cost**: Storage usage, API calls, and network transfer costs vs current volumes

## Notes

### Key Architectural Decisions
- **Single tenant per bucket/database**: Simplifies isolation and credential management
- **Maintain POSIX compatibility**: Preserve Basic Memory's existing filesystem assumptions
- **TigrisFS over rclone**: Purpose-built for object storage with intelligent caching
- **Turso for SQLite**: Leverages specialized remote SQLite expertise
- **API-first approach**: Eliminates file watching dependency for cloud operations

### Alternative Approaches Considered
- **S3-native storage backend**: Would require Basic Memory architecture changes
- **Hybrid approach**: Local files + cloud sync (adds complexity)
- **Standard rclone mounting**: Less optimized than TigrisFS for object storage workloads
- **Keep Fly volumes**: Maintains current limitations but proven reliability

### Integration Points
- [ ] Fly.io Tigris integration for bucket provisioning
- [ ] Turso account setup and database provisioning
- [ ] Container image modifications for TigrisFS support
- [ ] Credential management for tenant isolation
- [ ] API modification for manual sync triggers
- [ ] Local client setup documentation for TigrisFS mounting

## Observations

- [architecture] Tigris/Turso split cleanly separates file storage from indexing concerns #storage-separation
- [breakthrough] API-first architecture eliminates file watching dependency for cloud operations #api-first-advantage
- [user-experience] Local mounting of cloud files could be revolutionary for knowledge management #local-cloud-hybrid
- [compatibility] Maintaining POSIX filesystem assumptions preserves Basic Memory's local/cloud compatibility #architecture-preservation
- [simplification] Single tenant per bucket eliminates complex multi-tenancy in storage layer #tenant-isolation
- [performance] TigrisFS intelligent caching could provide near-local performance for common operations #tigrisfs-advantage
- [deployment] Zero-downtime updates become trivial without volume constraints #deployment-simplification
- [benefit] Object storage pricing model could be more favorable than volume pricing #cost-optimization
- [innovation] Read-only local access alone would address major SaaS limitation #competitive-advantage
- [risk-mitigation] API-driven sync reduces performance requirements vs real-time file watching #risk-reduction

## Relations

- implements [[SPEC-6 Explicit Project Parameter Architecture]]
- requires [[Fly.io Tigris Integration]]
- enables [[Local Cloud File Access]]
- alternative_to [[Fly Volume Storage]]

## Links
- https://fly.io/hello/tigris
- https://fly.io/docs/tigris/
- https://www.tigrisdata.com/docs/sdks/fly/data-migration-with-flyctl/
- https://www.tigrisdata.com/docs/training/tigrisfs/
- https://www.tigrisdata.com/blog/tigris-filesystem/
- https://www.tigrisdata.com/docs/quickstarts/rclone/

```

--------------------------------------------------------------------------------
/src/basic_memory/repository/repository.py:
--------------------------------------------------------------------------------

```python
"""Base repository implementation."""

from typing import Type, Optional, Any, Sequence, TypeVar, List, Dict

from loguru import logger
from sqlalchemy import (
    select,
    func,
    Select,
    Executable,
    inspect,
    Result,
    and_,
    delete,
)
from sqlalchemy.exc import NoResultFound
from sqlalchemy.ext.asyncio import async_sessionmaker, AsyncSession
from sqlalchemy.orm.interfaces import LoaderOption
from sqlalchemy.sql.elements import ColumnElement

from basic_memory import db
from basic_memory.models import Base

# Module-level type variable bound to Base. NOTE(review): the class below uses
# PEP 695 syntax (`Repository[T: Base]`), whose own type parameter shadows this
# name inside the class body — confirm this module-level T is still imported
# elsewhere before removing it.
T = TypeVar("T", bound=Base)


class Repository[T: Base]:
    """Base repository implementation with generic CRUD operations.

    Wraps a single SQLAlchemy model class with async create/read/update/delete
    helpers. When the model has a ``project_id`` column and the repository was
    constructed with a ``project_id``, SELECT queries and bulk deletes are
    automatically scoped to that project (see ``_add_project_filter``).
    """

    def __init__(
        self,
        session_maker: async_sessionmaker[AsyncSession],
        Model: Type[T],
        project_id: Optional[int] = None,
    ):
        """Initialize the repository.

        Args:
            session_maker: Factory producing async database sessions.
            Model: SQLAlchemy model class this repository manages.
            project_id: Optional project scope; when set and the model has a
                ``project_id`` column, queries are filtered to this project.
        """
        self.session_maker = session_maker
        self.project_id = project_id
        if Model:
            self.Model = Model
            # Cache mapper metadata once so each call avoids re-inspection.
            self.mapper = inspect(self.Model).mapper
            self.primary_key: ColumnElement[Any] = self.mapper.primary_key[0]
            self.valid_columns = [column.key for column in self.mapper.columns]
            # Check if this model has a project_id column
            self.has_project_id = "project_id" in self.valid_columns

    def _set_project_id_if_needed(self, model: T) -> None:
        """Set project_id on model if needed and available.

        Only assigns when the model supports ``project_id``, this repository
        is project-scoped, and the model does not already carry a value.
        """
        if (
            self.has_project_id
            and self.project_id is not None
            and getattr(model, "project_id", None) is None
        ):
            setattr(model, "project_id", self.project_id)

    def get_model_data(self, entity_data: Dict[str, Any]) -> Dict[str, Any]:
        """Filter a raw dict down to the model's mapped columns.

        NOTE: entries whose value is ``None`` are dropped, so this cannot be
        used to explicitly null out a column.
        """
        model_data = {
            k: v for k, v in entity_data.items() if k in self.valid_columns and v is not None
        }
        return model_data

    def _add_project_filter(self, query: Select) -> Select:
        """Add project_id filter to query if applicable.

        Args:
            query: The SQLAlchemy query to modify

        Returns:
            Updated query with project filter if applicable
        """
        if self.has_project_id and self.project_id is not None:
            query = query.filter(getattr(self.Model, "project_id") == self.project_id)
        return query

    async def select_by_id(self, session: AsyncSession, entity_id: int) -> Optional[T]:
        """Select an entity by ID using an existing session.

        Applies eager-load options and the project filter; returns ``None``
        when no row matches.
        """
        query = (
            select(self.Model)
            .filter(self.primary_key == entity_id)
            .options(*self.get_load_options())
        )
        # Add project filter if applicable
        query = self._add_project_filter(query)

        result = await session.execute(query)
        return result.scalars().one_or_none()

    async def select_by_ids(self, session: AsyncSession, ids: List[int]) -> Sequence[T]:
        """Select multiple entities by IDs using an existing session.

        Rows missing from the database are silently omitted from the result.
        """
        query = (
            select(self.Model).where(self.primary_key.in_(ids)).options(*self.get_load_options())
        )
        # Add project filter if applicable
        query = self._add_project_filter(query)

        result = await session.execute(query)
        return result.scalars().all()

    async def add(self, model: T) -> T:
        """
        Add a model to the repository. This will also add related objects
        :param model: the model to add
        :return: the added model instance
        """
        async with db.scoped_session(self.session_maker) as session:
            # Set project_id if applicable and not already set
            self._set_project_id_if_needed(model)

            session.add(model)
            await session.flush()

            # Query within same session so eager-load options are applied
            # to the instance that is returned to the caller.
            found = await self.select_by_id(session, model.id)  # pyright: ignore [reportAttributeAccessIssue]
            if found is None:  # pragma: no cover
                logger.error(
                    "Failed to retrieve model after add",
                    model_type=self.Model.__name__,
                    model_id=model.id,  # pyright: ignore
                )
                raise ValueError(
                    f"Can't find {self.Model.__name__} with ID {model.id} after session.add"  # pyright: ignore
                )
            return found

    async def add_all(self, models: List[T]) -> Sequence[T]:
        """
        Add a list of models to the repository. This will also add related objects
        :param models: the models to add
        :return: the added models instances
        """
        async with db.scoped_session(self.session_maker) as session:
            # set the project id if not present in models
            for model in models:
                self._set_project_id_if_needed(model)

            session.add_all(models)
            await session.flush()

            # Query within same session
            return await self.select_by_ids(session, [m.id for m in models])  # pyright: ignore [reportAttributeAccessIssue]

    def select(self, *entities: Any) -> Select:
        """Create a new SELECT statement.

        Returns:
            A SQLAlchemy Select object configured with the provided entities
            or this repository's model if no entities provided.
        """
        if not entities:
            entities = (self.Model,)
        query = select(*entities)

        # Add project filter if applicable
        return self._add_project_filter(query)

    async def find_all(
        self, skip: int = 0, limit: Optional[int] = None, use_load_options: bool = True
    ) -> Sequence[T]:
        """Fetch records from the database with pagination.

        Args:
            skip: Number of records to skip
            limit: Maximum number of records to return
            use_load_options: Whether to apply eager loading options (default: True)
        """
        logger.debug(f"Finding all {self.Model.__name__} (skip={skip}, limit={limit})")

        async with db.scoped_session(self.session_maker) as session:
            query = select(self.Model).offset(skip)

            # Only apply load options if requested
            if use_load_options:
                query = query.options(*self.get_load_options())

            # Add project filter if applicable
            query = self._add_project_filter(query)

            # NOTE: truthiness check means limit=0 behaves like "no limit".
            if limit:
                query = query.limit(limit)

            result = await session.execute(query)

            items = result.scalars().all()
            logger.debug(f"Found {len(items)} {self.Model.__name__} records")
            return items

    async def find_by_id(self, entity_id: int) -> Optional[T]:
        """Fetch an entity by its unique identifier."""
        logger.debug(f"Finding {self.Model.__name__} by ID: {entity_id}")

        async with db.scoped_session(self.session_maker) as session:
            return await self.select_by_id(session, entity_id)

    async def find_by_ids(self, ids: List[int]) -> Sequence[T]:
        """Fetch multiple entities by their identifiers in a single query."""
        logger.debug(f"Finding {self.Model.__name__} by IDs: {ids}")

        async with db.scoped_session(self.session_maker) as session:
            return await self.select_by_ids(session, ids)

    async def find_one(self, query: Select[tuple[T]]) -> Optional[T]:
        """Execute a query and retrieve a single record.

        Args:
            query: A SELECT expected to match at most one row.

        Returns:
            The matched entity, or ``None`` when no row matches.
        """
        # add in load options
        query = query.options(*self.get_load_options())
        result = await self.execute_query(query)
        entity = result.scalars().one_or_none()

        if entity:
            logger.trace(f"Found {self.Model.__name__}: {getattr(entity, 'id', None)}")
        else:
            logger.trace(f"No {self.Model.__name__} found")
        return entity

    async def create(self, data: dict) -> T:
        """Create a new record from a model instance.

        Only keys matching mapped columns are used; ``project_id`` is injected
        when this repository is project-scoped and the caller did not supply one.

        Raises:
            ValueError: If the row cannot be re-read after the flush.
        """
        logger.debug(f"Creating {self.Model.__name__} from entity_data: {data}")
        async with db.scoped_session(self.session_maker) as session:
            # Only include valid columns that are provided in entity_data
            model_data = self.get_model_data(data)

            # Add project_id if applicable and not already provided
            if (
                self.has_project_id
                and self.project_id is not None
                and "project_id" not in model_data
            ):
                model_data["project_id"] = self.project_id

            model = self.Model(**model_data)
            session.add(model)
            await session.flush()

            # Re-read within the same session so eager-load options apply.
            return_instance = await self.select_by_id(session, model.id)  # pyright: ignore [reportAttributeAccessIssue]
            if return_instance is None:  # pragma: no cover
                logger.error(
                    "Failed to retrieve model after create",
                    model_type=self.Model.__name__,
                    model_id=model.id,  # pyright: ignore
                )
                raise ValueError(
                    f"Can't find {self.Model.__name__} with ID {model.id} after session.add"  # pyright: ignore
                )
            return return_instance

    async def create_all(self, data_list: List[dict]) -> Sequence[T]:
        """Create multiple records in a single transaction."""
        logger.debug(f"Bulk creating {len(data_list)} {self.Model.__name__} instances")

        async with db.scoped_session(self.session_maker) as session:
            # Only include valid columns that are provided in entity_data
            model_list = []
            for d in data_list:
                model_data = self.get_model_data(d)

                # Add project_id if applicable and not already provided
                if (
                    self.has_project_id
                    and self.project_id is not None
                    and "project_id" not in model_data
                ):
                    model_data["project_id"] = self.project_id  # pragma: no cover

                model_list.append(self.Model(**model_data))

            session.add_all(model_list)
            await session.flush()

            return await self.select_by_ids(session, [model.id for model in model_list])  # pyright: ignore [reportAttributeAccessIssue]

    async def update(self, entity_id: int, entity_data: dict | T) -> Optional[T]:
        """Update an entity with the given data.

        Accepts either a dict (only mapped-column keys applied) or a model
        instance (all column values copied over).

        Returns:
            The refreshed entity, or ``None`` when no row matches.
        """
        logger.debug(f"Updating {self.Model.__name__} {entity_id} with data: {entity_data}")
        async with db.scoped_session(self.session_maker) as session:
            try:
                # NOTE(review): unlike select_by_id, this lookup does not apply
                # the project filter — confirm cross-project updates are intended.
                result = await session.execute(
                    select(self.Model).filter(self.primary_key == entity_id)
                )
                entity = result.scalars().one()

                if isinstance(entity_data, dict):
                    for key, value in entity_data.items():
                        if key in self.valid_columns:
                            setattr(entity, key, value)

                elif isinstance(entity_data, self.Model):
                    for column in self.Model.__table__.columns.keys():
                        setattr(entity, column, getattr(entity_data, column))

                await session.flush()  # Make sure changes are flushed
                await session.refresh(entity)  # Refresh

                logger.debug(f"Updated {self.Model.__name__}: {entity_id}")
                return await self.select_by_id(session, entity.id)  # pyright: ignore [reportAttributeAccessIssue]

            except NoResultFound:
                logger.debug(f"No {self.Model.__name__} found to update: {entity_id}")
                return None

    async def delete(self, entity_id: int) -> bool:
        """Delete an entity from the database.

        Returns:
            True when a row was deleted, False when no row matched.
        """
        logger.debug(f"Deleting {self.Model.__name__}: {entity_id}")
        async with db.scoped_session(self.session_maker) as session:
            try:
                # NOTE(review): lookup is by primary key only, without the
                # project filter applied elsewhere — confirm this is intended.
                result = await session.execute(
                    select(self.Model).filter(self.primary_key == entity_id)
                )
                entity = result.scalars().one()
                await session.delete(entity)

                logger.debug(f"Deleted {self.Model.__name__}: {entity_id}")
                return True
            except NoResultFound:
                logger.debug(f"No {self.Model.__name__} found to delete: {entity_id}")
                return False

    async def delete_by_ids(self, ids: List[int]) -> int:
        """Delete records matching given IDs.

        Returns:
            Number of rows deleted.
        """
        logger.debug(f"Deleting {self.Model.__name__} by ids: {ids}")
        async with db.scoped_session(self.session_maker) as session:
            conditions = [self.primary_key.in_(ids)]

            # Add project_id filter if applicable
            if self.has_project_id and self.project_id is not None:  # pragma: no cover
                conditions.append(getattr(self.Model, "project_id") == self.project_id)

            query = delete(self.Model).where(and_(*conditions))
            result = await session.execute(query)
            logger.debug(f"Deleted {result.rowcount} records")
            return result.rowcount

    async def delete_by_fields(self, **filters: Any) -> bool:
        """Delete records matching given field values.

        Args:
            **filters: Column-name/value pairs combined with AND.

        Returns:
            True when at least one row was deleted.
        """
        logger.debug(f"Deleting {self.Model.__name__} by fields: {filters}")
        async with db.scoped_session(self.session_maker) as session:
            conditions = [getattr(self.Model, field) == value for field, value in filters.items()]

            # Add project_id filter if applicable
            if self.has_project_id and self.project_id is not None:
                conditions.append(getattr(self.Model, "project_id") == self.project_id)

            query = delete(self.Model).where(and_(*conditions))
            result = await session.execute(query)
            deleted = result.rowcount > 0
            logger.debug(f"Deleted {result.rowcount} records")
            return deleted

    async def count(self, query: Executable | None = None) -> int:
        """Count entities in the database table.

        Args:
            query: Optional custom count query; when omitted, counts all rows
                of this repository's model (project-filtered when applicable).
        """
        async with db.scoped_session(self.session_maker) as session:
            if query is None:
                query = select(func.count()).select_from(self.Model)
                # Add project filter if applicable
                if (
                    isinstance(query, Select)
                    and self.has_project_id
                    and self.project_id is not None
                ):
                    query = query.where(
                        getattr(self.Model, "project_id") == self.project_id
                    )  # pragma: no cover

            result = await session.execute(query)
            scalar = result.scalar()
            count = scalar if scalar is not None else 0
            logger.debug(f"Counted {count} {self.Model.__name__} records")
            return count

    async def execute_query(
        self,
        query: Executable,
        params: Optional[Dict[str, Any]] = None,
        use_query_options: bool = True,
    ) -> Result[Any]:
        """Execute a query asynchronously.

        Args:
            query: Any executable SQLAlchemy statement.
            params: Optional bound parameters for the statement.
            use_query_options: When True, attach this repository's eager-load
                options to the query before execution.
        """

        query = query.options(*self.get_load_options()) if use_query_options else query
        logger.trace(f"Executing query: {query}, params: {params}")
        async with db.scoped_session(self.session_maker) as session:
            result = await session.execute(query, params)
            return result

    def get_load_options(self) -> List[LoaderOption]:
        """Get list of loader options for eager loading relationships.
        Override in subclasses to specify what to load."""
        return []

```

--------------------------------------------------------------------------------
/src/basic_memory/services/file_service.py:
--------------------------------------------------------------------------------

```python
"""Service for file operations with checksum tracking."""

import asyncio
import hashlib
import mimetypes
from os import stat_result
from pathlib import Path
from typing import Any, Dict, Tuple, Union

import aiofiles
import yaml

from basic_memory import file_utils
from basic_memory.file_utils import FileError, ParseError
from basic_memory.markdown.markdown_processor import MarkdownProcessor
from basic_memory.models import Entity as EntityModel
from basic_memory.schemas import Entity as EntitySchema
from basic_memory.services.exceptions import FileOperationError
from basic_memory.utils import FilePath
from loguru import logger


class FileService:
    """Service for handling file operations with concurrency control.

    All paths are handled as Path objects internally. Strings are converted to
    Path objects when passed in. Relative paths are assumed to be relative to
    base_path.

    Features:
    - True async I/O with aiofiles (non-blocking)
    - Built-in concurrency limits (semaphore)
    - Consistent file writing with checksums
    - Frontmatter management
    - Atomic operations
    - Error handling
    """

    def __init__(
        self,
        base_path: Path,
        markdown_processor: MarkdownProcessor,
        max_concurrent_files: int = 10,
    ):
        self.base_path = base_path.resolve()  # Get absolute path
        self.markdown_processor = markdown_processor
        # Semaphore to limit concurrent file operations
        # Prevents OOM on large projects by processing files in batches
        self._file_semaphore = asyncio.Semaphore(max_concurrent_files)

    def _resolve_path(self, path: FilePath) -> Path:
        """Resolve a Path or string to an absolute filesystem path.

        Strings are joined onto base_path (note: an absolute string replaces
        base_path entirely, per pathlib join semantics). Relative Path objects
        are resolved against base_path; absolute Paths are returned unchanged.

        Args:
            path: Path to resolve (Path or string)

        Returns:
            Absolute Path
        """
        path_obj = self.base_path / path if isinstance(path, str) else path
        return path_obj if path_obj.is_absolute() else self.base_path / path_obj

    def get_entity_path(self, entity: Union[EntityModel, EntitySchema]) -> Path:
        """Generate absolute filesystem path for entity.

        Args:
            entity: Entity model or schema with file_path attribute

        Returns:
            Absolute Path to the entity file
        """
        return self.base_path / entity.file_path

    async def read_entity_content(self, entity: EntityModel) -> str:
        """Get entity's content without frontmatter or structured sections.

        Used to index for search. Returns raw content without frontmatter,
        observations, or relations.

        Args:
            entity: Entity to read content for

        Returns:
            Raw content string without metadata sections
        """
        logger.debug(f"Reading entity content, entity_id={entity.id}, permalink={entity.permalink}")

        file_path = self.get_entity_path(entity)
        markdown = await self.markdown_processor.read_file(file_path)
        return markdown.content or ""

    async def delete_entity_file(self, entity: EntityModel) -> None:
        """Delete entity file from filesystem.

        Args:
            entity: Entity model whose file should be deleted

        Raises:
            FileOperationError: If deletion fails
        """
        path = self.get_entity_path(entity)
        await self.delete_file(path)

    async def exists(self, path: FilePath) -> bool:
        """Check if file exists at the provided path.

        If path is relative, it is assumed to be relative to base_path.

        Args:
            path: Path to check (Path or string)

        Returns:
            True if file exists, False otherwise

        Raises:
            FileOperationError: If check fails
        """
        try:
            full_path = self._resolve_path(path)
            logger.debug(f"Checking file existence: path={full_path}")
            return full_path.exists()
        except Exception as e:
            logger.error("Failed to check file existence", path=str(path), error=str(e))
            raise FileOperationError(f"Failed to check file existence: {e}")

    async def ensure_directory(self, path: FilePath) -> None:
        """Ensure directory exists, creating if necessary.

        Uses semaphore to control concurrency for directory creation operations.

        Args:
            path: Directory path to ensure (Path or string)

        Raises:
            FileOperationError: If directory creation fails
        """
        try:
            full_path = self._resolve_path(path)

            # Use semaphore for concurrency control
            async with self._file_semaphore:
                # Run blocking mkdir in thread pool; get_running_loop is safe
                # here because we are inside a coroutine
                loop = asyncio.get_running_loop()
                await loop.run_in_executor(
                    None, lambda: full_path.mkdir(parents=True, exist_ok=True)
                )
        except Exception as e:  # pragma: no cover
            logger.error("Failed to create directory", path=str(path), error=str(e))
            raise FileOperationError(f"Failed to create directory {path}: {e}")

    async def write_file(self, path: FilePath, content: str) -> str:
        """Write content to file and return checksum.

        Handles both absolute and relative paths. Relative paths are resolved
        against base_path.

        Args:
            path: Where to write (Path or string)
            content: Content to write

        Returns:
            Checksum of written content

        Raises:
            FileOperationError: If write fails
        """
        full_path = self._resolve_path(path)

        try:
            # Ensure parent directory exists
            await self.ensure_directory(full_path.parent)

            # Write content atomically
            logger.info(
                "Writing file: "
                f"path={full_path}, "
                f"content_length={len(content)}, "
                f"is_markdown={full_path.suffix.lower() == '.md'}"
            )

            await file_utils.write_file_atomic(full_path, content)

            # Compute and return checksum
            checksum = await file_utils.compute_checksum(content)
            logger.debug(f"File write completed path={full_path}, {checksum=}")
            return checksum

        except Exception as e:
            logger.exception("File write error", path=str(full_path), error=str(e))
            raise FileOperationError(f"Failed to write file: {e}")

    async def read_file_content(self, path: FilePath) -> str:
        """Read file content using true async I/O with aiofiles.

        Handles both absolute and relative paths. Relative paths are resolved
        against base_path.

        Args:
            path: Path to read (Path or string)

        Returns:
            File content as string

        Raises:
            FileOperationError: If read fails
        """
        full_path = self._resolve_path(path)

        try:
            logger.debug("Reading file content", operation="read_file_content", path=str(full_path))
            async with aiofiles.open(full_path, mode="r", encoding="utf-8") as f:
                content = await f.read()

            logger.debug(
                "File read completed",
                path=str(full_path),
                content_length=len(content),
            )
            return content

        except Exception as e:
            logger.exception("File read error", path=str(full_path), error=str(e))
            raise FileOperationError(f"Failed to read file: {e}")

    async def read_file(self, path: FilePath) -> Tuple[str, str]:
        """Read file and compute checksum using true async I/O.

        Uses aiofiles for non-blocking file reads.

        Handles both absolute and relative paths. Relative paths are resolved
        against base_path.

        Args:
            path: Path to read (Path or string)

        Returns:
            Tuple of (content, checksum)

        Raises:
            FileOperationError: If read fails
        """
        full_path = self._resolve_path(path)

        try:
            logger.debug("Reading file", operation="read_file", path=str(full_path))

            # Use aiofiles for non-blocking read
            async with aiofiles.open(full_path, mode="r", encoding="utf-8") as f:
                content = await f.read()

            checksum = await file_utils.compute_checksum(content)

            logger.debug(
                "File read completed",
                path=str(full_path),
                checksum=checksum,
                content_length=len(content),
            )
            return content, checksum

        except Exception as e:
            logger.exception("File read error", path=str(full_path), error=str(e))
            raise FileOperationError(f"Failed to read file: {e}")

    async def delete_file(self, path: FilePath) -> None:
        """Delete file if it exists.

        Handles both absolute and relative paths. Relative paths are resolved
        against base_path.

        Args:
            path: Path to delete (Path or string)
        """
        full_path = self._resolve_path(path)
        full_path.unlink(missing_ok=True)

    async def update_frontmatter(self, path: FilePath, updates: Dict[str, Any]) -> str:
        """Update frontmatter fields in a file while preserving all content.

        Only modifies the frontmatter section, leaving all content untouched.
        Creates frontmatter section if none exists.
        Returns checksum of updated file.

        Uses aiofiles for true async I/O (non-blocking).

        Args:
            path: Path to markdown file (Path or string)
            updates: Dict of frontmatter fields to update

        Returns:
            Checksum of updated file

        Raises:
            FileOperationError: If file operations fail
            ParseError: If frontmatter parsing fails
        """
        full_path = self._resolve_path(path)

        try:
            # Read current content using aiofiles
            async with aiofiles.open(full_path, mode="r", encoding="utf-8") as f:
                content = await f.read()

            # Parse current frontmatter with proper error handling for malformed YAML
            current_fm = {}
            if file_utils.has_frontmatter(content):
                try:
                    current_fm = file_utils.parse_frontmatter(content)
                    content = file_utils.remove_frontmatter(content)
                except (ParseError, yaml.YAMLError) as e:
                    # Log warning and treat as plain markdown without frontmatter
                    logger.warning(
                        f"Failed to parse YAML frontmatter in {full_path}: {e}. "
                        "Treating file as plain markdown without frontmatter."
                    )
                    # Keep full content, treat as having no frontmatter
                    current_fm = {}

            # Update frontmatter
            new_fm = {**current_fm, **updates}

            # Write new file with updated frontmatter
            yaml_fm = yaml.dump(new_fm, sort_keys=False, allow_unicode=True)
            final_content = f"---\n{yaml_fm}---\n\n{content.strip()}"

            logger.debug(
                "Updating frontmatter", path=str(full_path), update_keys=list(updates.keys())
            )

            await file_utils.write_file_atomic(full_path, final_content)
            return await file_utils.compute_checksum(final_content)

        except Exception as e:
            # Only log real errors (not YAML parsing, which is handled above)
            if not isinstance(e, (ParseError, yaml.YAMLError)):
                logger.error(
                    "Failed to update frontmatter",
                    path=str(full_path),
                    error=str(e),
                )
            raise FileOperationError(f"Failed to update frontmatter: {e}")

    async def compute_checksum(self, path: FilePath) -> str:
        """Compute checksum for a file using true async I/O.

        Uses aiofiles for non-blocking I/O with 64KB chunked reading.
        Semaphore limits concurrent file operations to prevent OOM.
        Memory usage is constant regardless of file size.

        Args:
            path: Path to the file (Path or string)

        Returns:
            SHA256 checksum hex string

        Raises:
            FileError: If checksum computation fails
        """
        full_path = self._resolve_path(path)

        # Semaphore controls concurrency - max N files processed at once
        async with self._file_semaphore:
            try:
                hasher = hashlib.sha256()
                chunk_size = 65536  # 64KB chunks

                # async I/O with aiofiles
                async with aiofiles.open(full_path, mode="rb") as f:
                    while chunk := await f.read(chunk_size):
                        hasher.update(chunk)

                return hasher.hexdigest()

            except Exception as e:  # pragma: no cover
                logger.error("Failed to compute checksum", path=str(full_path), error=str(e))
                raise FileError(f"Failed to compute checksum for {path}: {e}")

    def file_stats(self, path: FilePath) -> stat_result:
        """Return file stats for a given path.

        Args:
            path: Path to the file (Path or string)

        Returns:
            File statistics
        """
        # get file timestamps
        return self._resolve_path(path).stat()

    def content_type(self, path: FilePath) -> str:
        """Return content_type for a given path.

        Args:
            path: Path to the file (Path or string)

        Returns:
            MIME type of the file
        """
        full_path = self._resolve_path(path)
        # Guess MIME type from the file name
        mime_type, _ = mimetypes.guess_type(full_path.name)

        # .canvas files are json
        if full_path.suffix == ".canvas":
            mime_type = "application/json"

        content_type = mime_type or "text/plain"
        return content_type

    def is_markdown(self, path: FilePath) -> bool:
        """Check if a file is a markdown file.

        Args:
            path: Path to the file (Path or string)

        Returns:
            True if the file is a markdown file, False otherwise
        """
        return self.content_type(path) == "text/markdown"

```

--------------------------------------------------------------------------------
/src/basic_memory/services/context_service.py:
--------------------------------------------------------------------------------

```python
"""Service for building rich context from the knowledge graph."""

from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import List, Optional, Tuple

from loguru import logger
from sqlalchemy import text

from basic_memory.repository.entity_repository import EntityRepository
from basic_memory.repository.observation_repository import ObservationRepository
from basic_memory.repository.search_repository import SearchRepository, SearchIndexRow
from basic_memory.schemas.memory import MemoryUrl, memory_url_path
from basic_memory.schemas.search import SearchItemType
from basic_memory.utils import generate_permalink


@dataclass
class ContextResultRow:
    """A single flat row in a context result.

    Represents an entity, relation, or observation discovered during context
    building; optional fields are populated depending on the row's type.
    """

    type: str  # "entity", "relation", or "observation"
    id: int
    title: str
    permalink: str
    file_path: str
    depth: int  # traversal depth at which this row was found (0 = seed)
    root_id: int  # id of the primary (seed) entity this row descends from
    created_at: datetime
    from_id: Optional[int] = None  # relation rows only: source entity id
    to_id: Optional[int] = None  # relation rows only: target entity id
    relation_type: Optional[str] = None  # relation rows only
    content: Optional[str] = None  # observation rows only
    category: Optional[str] = None  # observation rows only
    entity_id: Optional[int] = None  # observation rows only: owning entity


@dataclass
class ContextResultItem:
    """A hierarchical result containing a primary item with its observations and related items."""

    # The seed search hit (or traversal row) this item is built around
    primary_result: ContextResultRow | SearchIndexRow
    # Observations attached to the primary entity (entities only)
    observations: List[ContextResultRow] = field(default_factory=list)
    # Rows connected to the primary result through graph traversal
    related_results: List[ContextResultRow] = field(default_factory=list)


@dataclass
class ContextMetadata:
    """Metadata about a context result."""

    uri: Optional[str] = None  # normalized memory:// path, when one was given
    types: Optional[List[SearchItemType]] = None  # type filter used, if any
    depth: int = 1  # requested traversal depth
    timeframe: Optional[str] = None  # ISO timestamp lower bound, if filtered
    generated_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    primary_count: int = 0  # number of primary (seed) results
    related_count: int = 0  # number of rows found via traversal
    total_observations: int = 0
    total_relations: int = 0


@dataclass
class ContextResult:
    """Complete context result with metadata."""

    # One item per primary result, each bundling observations and related rows
    results: List[ContextResultItem] = field(default_factory=list)
    # Summary information about how this context was built
    metadata: ContextMetadata = field(default_factory=ContextMetadata)


class ContextService:
    """Service for building rich context from memory:// URIs.

    Handles three types of context building:
    1. Direct permalink lookup - exact match on path
    2. Pattern matching - using * wildcards
    3. Special modes via params (e.g., 'related')
    """

    def __init__(
        self,
        search_repository: SearchRepository,
        entity_repository: EntityRepository,
        observation_repository: ObservationRepository,
    ):
        self.search_repository = search_repository
        self.entity_repository = entity_repository
        self.observation_repository = observation_repository

    async def build_context(
        self,
        memory_url: Optional[MemoryUrl] = None,
        types: Optional[List[SearchItemType]] = None,
        depth: int = 1,
        since: Optional[datetime] = None,
        limit=10,
        offset=0,
        max_related: int = 10,
        include_observations: bool = True,
    ) -> ContextResult:
        """Build rich context from a memory:// URI.

        Resolves the URI (exact or wildcard), finds related items through
        graph traversal, attaches observations to entities, and returns a
        structured ContextResult with metadata.
        """
        logger.debug(
            f"Building context for URI: '{memory_url}' depth: '{depth}' since: '{since}' limit: '{limit}' offset: '{offset}'  max_related: '{max_related}'"
        )

        normalized_path: Optional[str] = None
        if memory_url:
            path = memory_url_path(memory_url)
            # Check for wildcards before normalization
            has_wildcard = "*" in path

            if has_wildcard:
                # For wildcard patterns, normalize each segment separately to preserve the *
                parts = path.split("*")
                normalized_parts = [
                    generate_permalink(part, split_extension=False) if part else ""
                    for part in parts
                ]
                normalized_path = "*".join(normalized_parts)
                logger.debug(f"Pattern search for '{normalized_path}'")
                primary = await self.search_repository.search(
                    permalink_match=normalized_path, limit=limit, offset=offset
                )
            else:
                # For exact paths, normalize the whole thing
                normalized_path = generate_permalink(path, split_extension=False)
                logger.debug(f"Direct lookup for '{normalized_path}'")
                primary = await self.search_repository.search(
                    permalink=normalized_path, limit=limit, offset=offset
                )
        else:
            logger.debug(f"Build context for '{types}'")
            primary = await self.search_repository.search(
                search_item_types=types, after_date=since, limit=limit, offset=offset
            )

        # Get type_id pairs for traversal

        type_id_pairs = [(r.type, r.id) for r in primary] if primary else []
        logger.debug(f"found primary type_id_pairs: {len(type_id_pairs)}")

        # Find related content
        related = await self.find_related(
            type_id_pairs, max_depth=depth, since=since, max_results=max_related
        )
        logger.debug(f"Found {len(related)} related results")

        # Collect entity IDs from primary and related results
        entity_ids = []
        for result in primary:
            if result.type == SearchItemType.ENTITY.value:
                entity_ids.append(result.id)

        for result in related:
            if result.type == SearchItemType.ENTITY.value:
                entity_ids.append(result.id)

        # Fetch observations for all entities if requested
        observations_by_entity = {}
        if include_observations and entity_ids:
            # Use our observation repository to get observations for all entities at once
            observations_by_entity = await self.observation_repository.find_by_entities(entity_ids)
            logger.debug(f"Found observations for {len(observations_by_entity)} entities")

        # Create metadata dataclass
        metadata = ContextMetadata(
            uri=normalized_path if memory_url else None,
            types=types,
            depth=depth,
            timeframe=since.isoformat() if since else None,
            primary_count=len(primary),
            related_count=len(related),
            total_observations=sum(len(obs) for obs in observations_by_entity.values()),
            # Compare against .value: row types are plain strings (consistent
            # with the entity checks above); comparing to the enum member
            # directly would always be False for a non-str enum.
            total_relations=sum(1 for r in related if r.type == SearchItemType.RELATION.value),
        )

        # Build context results list directly with ContextResultItem objects
        context_results = []

        # For each primary result
        for primary_item in primary:
            # Find all related items with this primary item as root
            related_to_primary = [r for r in related if r.root_id == primary_item.id]

            # Get observations for this item if it's an entity
            item_observations = []
            if primary_item.type == SearchItemType.ENTITY.value and include_observations:
                # Convert Observation models to ContextResultRows
                for obs in observations_by_entity.get(primary_item.id, []):
                    item_observations.append(
                        ContextResultRow(
                            type="observation",
                            id=obs.id,
                            title=f"{obs.category}: {obs.content[:50]}...",
                            permalink=generate_permalink(
                                f"{primary_item.permalink}/observations/{obs.category}/{obs.content}"
                            ),
                            file_path=primary_item.file_path,
                            content=obs.content,
                            category=obs.category,
                            entity_id=primary_item.id,
                            depth=0,
                            root_id=primary_item.id,
                            created_at=primary_item.created_at,  # created_at time from entity
                        )
                    )

            # Create ContextResultItem directly
            context_item = ContextResultItem(
                primary_result=primary_item,
                observations=item_observations,
                related_results=related_to_primary,
            )

            context_results.append(context_item)

        # Return the structured ContextResult
        return ContextResult(results=context_results, metadata=metadata)

    async def find_related(
        self,
        type_id_pairs: List[Tuple[str, int]],
        max_depth: int = 1,
        since: Optional[datetime] = None,
        max_results: int = 10,
    ) -> List[ContextResultRow]:
        """Find items connected through relations.

        Uses recursive CTE to find:
        - Connected entities
        - Relations that connect them

        Note on depth:
        Each traversal step requires two depth levels - one to find the relation,
        and another to follow that relation to an entity. So a max_depth of 4 allows
        traversal through two entities (relation->entity->relation->entity), while reaching
        an entity three steps away requires max_depth=6 (relation->entity->relation->entity->relation->entity).
        """
        max_depth = max_depth * 2

        if not type_id_pairs:
            return []

        # Extract entity IDs from type_id_pairs for the optimized query
        entity_ids = [i for t, i in type_id_pairs if t == "entity"]

        if not entity_ids:
            logger.debug("No entity IDs found in type_id_pairs")
            return []

        logger.debug(
            f"Finding connected items for {len(entity_ids)} entities with depth {max_depth}"
        )

        # Build the VALUES clause for entity IDs.
        # NOTE: interpolated values are internal integer IDs and type strings
        # produced by our own queries, not user input, so f-string SQL here is
        # not an injection vector.
        entity_id_values = ", ".join([str(i) for i in entity_ids])

        # For compatibility with the old query, we still need this for filtering
        values = ", ".join([f"('{t}', {i})" for t, i in type_id_pairs])

        # Parameters for bindings - include project_id for security filtering
        params = {
            "max_depth": max_depth,
            "max_results": max_results,
            "project_id": self.search_repository.project_id,
        }

        # Build date and timeframe filters conditionally based on since parameter
        if since:
            params["since_date"] = since.isoformat()  # pyright: ignore
            date_filter = "AND e.created_at >= :since_date"
            relation_date_filter = "AND e_from.created_at >= :since_date"
            timeframe_condition = "AND eg.relation_date >= :since_date"
        else:
            date_filter = ""
            relation_date_filter = ""
            timeframe_condition = ""

        # Add project filtering for security - ensure all entities and relations belong to the same project
        project_filter = "AND e.project_id = :project_id"
        relation_project_filter = "AND e_from.project_id = :project_id"

        # Use a CTE that operates directly on entity and relation tables
        # This avoids the overhead of the search_index virtual table
        query = text(f"""
        WITH RECURSIVE entity_graph AS (
            -- Base case: seed entities
            SELECT 
                e.id,
                'entity' as type,
                e.title, 
                e.permalink,
                e.file_path,
                NULL as from_id,
                NULL as to_id,
                NULL as relation_type,
                NULL as content,
                NULL as category,
                NULL as entity_id,
                0 as depth,
                e.id as root_id,
                e.created_at,
                e.created_at as relation_date,
                0 as is_incoming
            FROM entity e
            WHERE e.id IN ({entity_id_values})
            {date_filter}
            {project_filter}

            UNION ALL

            -- Get relations from current entities
            SELECT
                r.id,
                'relation' as type,
                r.relation_type || ': ' || r.to_name as title,
                -- Relation model doesn't have permalink column - we'll generate it at runtime
                '' as permalink,
                e_from.file_path,
                r.from_id,
                r.to_id,
                r.relation_type,
                NULL as content,
                NULL as category,
                NULL as entity_id,
                eg.depth + 1,
                eg.root_id,
                e_from.created_at, -- Use the from_entity's created_at since relation has no timestamp
                e_from.created_at as relation_date,
                CASE WHEN r.from_id = eg.id THEN 0 ELSE 1 END as is_incoming
            FROM entity_graph eg
            JOIN relation r ON (
                eg.type = 'entity' AND
                (r.from_id = eg.id OR r.to_id = eg.id)
            )
            JOIN entity e_from ON (
                r.from_id = e_from.id
                {relation_date_filter}
                {relation_project_filter}
            )
            LEFT JOIN entity e_to ON (r.to_id = e_to.id)
            WHERE eg.depth < :max_depth
            -- Ensure to_entity (if exists) also belongs to same project
            AND (r.to_id IS NULL OR e_to.project_id = :project_id)

            UNION ALL

            -- Get entities connected by relations
            SELECT
                e.id,
                'entity' as type,
                e.title,
                CASE 
                    WHEN e.permalink IS NULL THEN '' 
                    ELSE e.permalink 
                END as permalink,
                e.file_path,
                NULL as from_id,
                NULL as to_id,
                NULL as relation_type,
                NULL as content,
                NULL as category,
                NULL as entity_id,
                eg.depth + 1,
                eg.root_id,
                e.created_at,
                eg.relation_date,
                eg.is_incoming
            FROM entity_graph eg
            JOIN entity e ON (
                eg.type = 'relation' AND
                e.id = CASE 
                    WHEN eg.is_incoming = 0 THEN eg.to_id
                    ELSE eg.from_id
                END
                {date_filter}
                {project_filter}
            )
            WHERE eg.depth < :max_depth
            -- Only include entities connected by relations within timeframe if specified
            {timeframe_condition}
        )
        SELECT DISTINCT 
            type,
            id,
            title,
            permalink,
            file_path,
            from_id,
            to_id,
            relation_type,
            content,
            category,
            entity_id,
            MIN(depth) as depth,
            root_id,
            created_at
        FROM entity_graph
        WHERE (type, id) NOT IN ({values})
        GROUP BY
            type, id
        ORDER BY depth, type, id
        LIMIT :max_results
       """)

        result = await self.search_repository.execute_query(query, params=params)
        rows = result.all()

        context_rows = [
            ContextResultRow(
                type=row.type,
                id=row.id,
                title=row.title,
                permalink=row.permalink,
                file_path=row.file_path,
                from_id=row.from_id,
                to_id=row.to_id,
                relation_type=row.relation_type,
                content=row.content,
                category=row.category,
                entity_id=row.entity_id,
                depth=row.depth,
                root_id=row.root_id,
                created_at=row.created_at,
            )
            for row in rows
        ]
        return context_rows

```

--------------------------------------------------------------------------------
/tests/mcp/test_tool_edit_note.py:
--------------------------------------------------------------------------------

```python
"""Tests for the edit_note MCP tool."""

import pytest

from basic_memory.mcp.tools.edit_note import edit_note
from basic_memory.mcp.tools.write_note import write_note


@pytest.mark.asyncio
async def test_edit_note_append_operation(client, test_project):
    """Test appending content to an existing note."""
    # Set up a note to edit
    await write_note.fn(
        project=test_project.name,
        title="Test Note",
        folder="test",
        content="# Test Note\nOriginal content here.",
    )

    # Perform the append edit
    result = await edit_note.fn(
        project=test_project.name,
        identifier="test/test-note",
        operation="append",
        content="\n## New Section\nAppended content here.",
    )

    assert isinstance(result, str)
    expected_fragments = [
        "Edited note (append)",
        f"project: {test_project.name}",
        "file_path: test/Test Note.md",
        "permalink: test/test-note",
        "Added 3 lines to end of note",
        f"[Session: Using project '{test_project.name}']",
    ]
    for fragment in expected_fragments:
        assert fragment in result


@pytest.mark.asyncio
async def test_edit_note_prepend_operation(client, test_project):
    """Test prepending content to an existing note."""
    # Seed a note whose top we will prepend to.
    await write_note.fn(
        project=test_project.name,
        title="Meeting Notes",
        folder="meetings",
        content="# Meeting Notes\nExisting content.",
    )

    summary = await edit_note.fn(
        project=test_project.name,
        identifier="meetings/meeting-notes",
        operation="prepend",
        content="## 2025-05-25 Update\nNew meeting notes.\n",
    )

    assert isinstance(summary, str)
    # The summary should name the operation, file, permalink, and session project.
    for fragment in (
        "Edited note (prepend)",
        f"project: {test_project.name}",
        "file_path: meetings/Meeting Notes.md",
        "permalink: meetings/meeting-notes",
        "Added 3 lines to beginning of note",
        f"[Session: Using project '{test_project.name}']",
    ):
        assert fragment in summary


@pytest.mark.asyncio
async def test_edit_note_find_replace_operation(client, test_project):
    """Test find and replace operation."""
    # The version string appears twice in the body, so two replacements are expected.
    await write_note.fn(
        project=test_project.name,
        title="Config Document",
        folder="config",
        content="# Configuration\nVersion: v0.12.0\nSettings for v0.12.0 release.",
    )

    summary = await edit_note.fn(
        project=test_project.name,
        identifier="config/config-document",
        operation="find_replace",
        content="v0.13.0",
        find_text="v0.12.0",
        expected_replacements=2,
    )

    assert isinstance(summary, str)
    # The summary should confirm the operation and identify the edited file.
    for fragment in (
        "Edited note (find_replace)",
        f"project: {test_project.name}",
        "file_path: config/Config Document.md",
        "operation: Find and replace operation completed",
        f"[Session: Using project '{test_project.name}']",
    ):
        assert fragment in summary


@pytest.mark.asyncio
async def test_edit_note_replace_section_operation(client, test_project):
    """Test replacing content under a specific section."""
    # A note with three sections; only "## Implementation" should be replaced.
    await write_note.fn(
        project=test_project.name,
        title="API Specification",
        folder="specs",
        content="# API Spec\n\n## Overview\nAPI overview here.\n\n## Implementation\nOld implementation details.\n\n## Testing\nTest info here.",
    )

    summary = await edit_note.fn(
        project=test_project.name,
        identifier="specs/api-specification",
        operation="replace_section",
        content="New implementation approach using FastAPI.\nImproved error handling.\n",
        section="## Implementation",
    )

    assert isinstance(summary, str)
    # The summary should name the operation, file, replaced section, and project.
    for fragment in (
        "Edited note (replace_section)",
        f"project: {test_project.name}",
        "file_path: specs/API Specification.md",
        "Replaced content under section '## Implementation'",
        f"[Session: Using project '{test_project.name}']",
    ):
        assert fragment in summary


@pytest.mark.asyncio
async def test_edit_note_nonexistent_note(client, test_project):
    """Test editing a note that doesn't exist - should return helpful guidance."""
    # No note is created first, so the edit must fail gracefully (no exception).
    outcome = await edit_note.fn(
        project=test_project.name,
        identifier="nonexistent/note",
        operation="append",
        content="Some content",
    )

    assert isinstance(outcome, str)
    # The failure message should point the user at recovery tools.
    for hint in ("# Edit Failed", "search_notes", "read_note"):
        assert hint in outcome


@pytest.mark.asyncio
async def test_edit_note_invalid_operation(client, test_project):
    """Test using an invalid operation."""
    # A target note must exist so the failure is about the operation, not the note.
    await write_note.fn(
        project=test_project.name,
        title="Test Note",
        folder="test",
        content="# Test\nContent here.",
    )

    # An unknown operation name should be rejected with a ValueError.
    with pytest.raises(ValueError, match="Invalid operation 'invalid_op'"):
        await edit_note.fn(
            project=test_project.name,
            identifier="test/test-note",
            operation="invalid_op",
            content="Some content",
        )


@pytest.mark.asyncio
async def test_edit_note_find_replace_missing_find_text(client, test_project):
    """Test find_replace operation without find_text parameter."""
    # Create a note so the validation error is about the missing parameter.
    await write_note.fn(
        project=test_project.name,
        title="Test Note",
        folder="test",
        content="# Test\nContent here.",
    )

    # Omitting find_text makes a find_replace request ill-formed, so it raises.
    with pytest.raises(
        ValueError, match="find_text parameter is required for find_replace operation"
    ):
        await edit_note.fn(
            project=test_project.name,
            identifier="test/test-note",
            operation="find_replace",
            content="replacement",
        )


@pytest.mark.asyncio
async def test_edit_note_replace_section_missing_section(client, test_project):
    """Test replace_section operation without section parameter."""
    # Create a note so the validation error is about the missing parameter.
    await write_note.fn(
        project=test_project.name,
        title="Test Note",
        folder="test",
        content="# Test\nContent here.",
    )

    # Omitting section makes a replace_section request ill-formed, so it raises.
    with pytest.raises(
        ValueError, match="section parameter is required for replace_section operation"
    ):
        await edit_note.fn(
            project=test_project.name,
            identifier="test/test-note",
            operation="replace_section",
            content="new content",
        )


@pytest.mark.asyncio
async def test_edit_note_replace_section_nonexistent_section(client, test_project):
    """Test replacing a section that doesn't exist - should append it."""
    # The note only contains "## Existing Section"; the target section is absent.
    await write_note.fn(
        project=test_project.name,
        title="Document",
        folder="docs",
        content="# Document\n\n## Existing Section\nSome content here.",
    )

    # A missing section is appended rather than treated as an error.
    summary = await edit_note.fn(
        project=test_project.name,
        identifier="docs/document",
        operation="replace_section",
        content="New section content here.\n",
        section="## New Section",
    )

    assert isinstance(summary, str)
    for fragment in (
        "Edited note (replace_section)",
        f"project: {test_project.name}",
        "file_path: docs/Document.md",
        f"[Session: Using project '{test_project.name}']",
    ):
        assert fragment in summary


@pytest.mark.asyncio
async def test_edit_note_with_observations_and_relations(client, test_project):
    """Test editing a note that contains observations and relations."""
    # Seed a note that already carries semantic markup (observations/relations).
    await write_note.fn(
        project=test_project.name,
        title="Feature Spec",
        folder="features",
        content="# Feature Spec\n\n- [design] Initial design thoughts #architecture\n- implements [[Base System]]\n\nOriginal content.",
    )

    # Appending more semantic content should be reflected in the edit summary.
    summary = await edit_note.fn(
        project=test_project.name,
        identifier="features/feature-spec",
        operation="append",
        content="\n## Updates\n\n- [implementation] Added new feature #development\n- relates_to [[User Guide]]",
    )

    assert isinstance(summary, str)
    for fragment in ("Edited note (append)", "## Observations", "## Relations"):
        assert fragment in summary


@pytest.mark.asyncio
async def test_edit_note_identifier_variations(client, test_project):
    """Test that various identifier formats work."""
    # One note, addressed three different ways below.
    await write_note.fn(
        project=test_project.name,
        title="Test Document",
        folder="docs",
        content="# Test Document\nOriginal content.",
    )

    # permalink, bare title, and folder/title should all resolve to the same note
    for identifier in ("docs/test-document", "Test Document", "docs/Test Document"):
        summary = await edit_note.fn(
            project=test_project.name,
            identifier=identifier,
            operation="append",
            content=f"\n## Update via {identifier}",
        )

        assert isinstance(summary, str)
        assert "Edited note (append)" in summary
        assert f"project: {test_project.name}" in summary
        assert "file_path: docs/Test Document.md" in summary


@pytest.mark.asyncio
async def test_edit_note_find_replace_no_matches(client, test_project):
    """Test find_replace when the find_text doesn't exist - should return error."""
    await write_note.fn(
        project=test_project.name,
        title="Test Note",
        folder="test",
        content="# Test Note\nSome content here.",
    )

    # With the default expected_replacements=1, a zero-hit search must fail.
    outcome = await edit_note.fn(
        project=test_project.name,
        identifier="test/test-note",
        operation="find_replace",
        content="replacement",
        find_text="nonexistent_text",
    )

    assert isinstance(outcome, str)
    # The error should explain itself and suggest recovery steps.
    for fragment in ("# Edit Failed - Text Not Found", "read_note", "Alternative approaches"):
        assert fragment in outcome


@pytest.mark.asyncio
async def test_edit_note_empty_content_operations(client, test_project):
    """Test operations with empty content."""
    await write_note.fn(
        project=test_project.name,
        title="Test Note",
        folder="test",
        content="# Test Note\nOriginal content.",
    )

    # Appending an empty string is a no-op edit but must still succeed.
    outcome = await edit_note.fn(
        project=test_project.name, identifier="test/test-note", operation="append", content=""
    )

    assert isinstance(outcome, str)
    assert "Edited note (append)" in outcome


@pytest.mark.asyncio
async def test_edit_note_find_replace_wrong_count(client, test_project):
    """Test find_replace when replacement count doesn't match expected."""
    # The version string occurs twice in the note body.
    await write_note.fn(
        project=test_project.name,
        title="Config Document",
        folder="config",
        content="# Configuration\nVersion: v0.12.0\nSettings for v0.12.0 release.",
    )

    # Expecting a single occurrence is wrong, so the edit must be rejected.
    outcome = await edit_note.fn(
        project=test_project.name,
        identifier="config/config-document",
        operation="find_replace",
        content="v0.13.0",
        find_text="v0.12.0",
        expected_replacements=1,  # Wrong! There are actually 2 occurrences
    )

    assert isinstance(outcome, str)
    # The error should report the mismatch and propose the exact corrected call.
    for fragment in (
        "# Edit Failed - Wrong Replacement Count",
        "Expected 1 occurrences",
        "but found 2",
        "Update expected_replacements",
        "expected_replacements=2",
    ):
        assert fragment in outcome


@pytest.mark.asyncio
async def test_edit_note_replace_section_multiple_sections(client, test_project):
    """Test replace_section with multiple sections having same header - should return helpful error."""
    # "## Section 1" appears twice, making the replacement target ambiguous.
    await write_note.fn(
        project=test_project.name,
        title="Sample Note",
        folder="docs",
        content="# Main Title\n\n## Section 1\nFirst instance\n\n## Section 2\nSome content\n\n## Section 1\nSecond instance",
    )

    outcome = await edit_note.fn(
        project=test_project.name,
        identifier="docs/sample-note",
        operation="replace_section",
        content="New content",
        section="## Section 1",
    )

    assert isinstance(outcome, str)
    # The error should explain the ambiguity and how to resolve it.
    for fragment in (
        "# Edit Failed - Duplicate Section Headers",
        "Multiple sections found",
        "read_note",
        "Make headers unique",
    ):
        assert fragment in outcome


@pytest.mark.asyncio
async def test_edit_note_find_replace_empty_find_text(client, test_project):
    """Test find_replace with empty/whitespace find_text - should return helpful error."""
    await write_note.fn(
        project=test_project.name,
        title="Test Note",
        folder="test",
        content="# Test Note\nSome content here.",
    )

    # Whitespace-only find_text should be caught by service-level validation.
    outcome = await edit_note.fn(
        project=test_project.name,
        identifier="test/test-note",
        operation="find_replace",
        content="replacement",
        find_text="   ",  # whitespace only
    )

    assert isinstance(outcome, str)
    # The response is an error document with guidance rather than an exception.
    assert "# Edit Failed" in outcome


@pytest.mark.asyncio
async def test_edit_note_preserves_permalink_when_frontmatter_missing(client, test_project):
    """Test that editing a note preserves the permalink when frontmatter doesn't contain one.

    This is a regression test for issue #170 where edit_note would fail with a validation error
    because the permalink was being set to None when the markdown file didn't have a permalink
    in its frontmatter.
    """
    await write_note.fn(
        project=test_project.name,
        title="Test Note",
        folder="test",
        content="# Test Note\nOriginal content here.",
    )

    # First edit: confirms the note exists and reports its permalink.
    first_summary = await edit_note.fn(
        project=test_project.name,
        identifier="test/test-note",
        operation="append",
        content="\nFirst edit.",
    )

    assert isinstance(first_summary, str)
    assert "permalink: test/test-note" in first_summary

    # Second edit: must keep the permalink even if the file's frontmatter
    # lacks one, and must succeed without validation errors.
    second_summary = await edit_note.fn(
        project=test_project.name,
        identifier="test/test-note",
        operation="append",
        content="\nSecond edit.",
    )

    assert isinstance(second_summary, str)
    for fragment in (
        "Edited note (append)",
        f"project: {test_project.name}",
        "permalink: test/test-note",
        f"[Session: Using project '{test_project.name}']",
    ):
        assert fragment in second_summary

```

--------------------------------------------------------------------------------
/test-int/mcp/test_search_integration.py:
--------------------------------------------------------------------------------

```python
"""
Integration tests for search_notes MCP tool.

Comprehensive tests covering search functionality using the complete
MCP client-server flow with real databases.
"""

import pytest
from fastmcp import Client


@pytest.mark.asyncio
async def test_search_basic_text_search(mcp_server, app, test_project):
    """Test basic text search functionality."""

    async with Client(mcp_server) as client:
        # Two Python-related notes and one JavaScript note to search across.
        for title, content, tags in [
            (
                "Python Programming Guide",
                "# Python Programming Guide\n\nThis guide covers Python basics and advanced topics.",
                "python,programming",
            ),
            (
                "Flask Web Development",
                "# Flask Web Development\n\nBuilding web applications with Python Flask framework.",
                "python,flask,web",
            ),
            (
                "JavaScript Basics",
                "# JavaScript Basics\n\nIntroduction to JavaScript programming language.",
                "javascript,programming",
            ),
        ]:
            await client.call_tool(
                "write_note",
                {
                    "project": test_project.name,
                    "title": title,
                    "folder": "docs",
                    "content": content,
                    "tags": tags,
                },
            )

        # A query for "Python" should match both Python notes but not the JS one.
        search_result = await client.call_tool(
            "search_notes",
            {
                "project": test_project.name,
                "query": "Python",
            },
        )

        assert len(search_result.content) == 1
        assert search_result.content[0].type == "text"

        # The text payload is a serialized SearchResponse.
        payload = search_result.content[0].text
        assert "Python Programming Guide" in payload
        assert "Flask Web Development" in payload
        assert "JavaScript Basics" not in payload


@pytest.mark.asyncio
async def test_search_boolean_operators(mcp_server, app, test_project):
    """Test boolean search operators (AND, OR, NOT)."""

    async with Client(mcp_server) as client:
        # Three tutorials covering different framework/language combinations.
        for title, content, tags in [
            (
                "Python Flask Tutorial",
                "# Python Flask Tutorial\n\nLearn Python web development with Flask.",
                "python,flask,tutorial",
            ),
            (
                "Python Django Guide",
                "# Python Django Guide\n\nBuilding web apps with Python Django framework.",
                "python,django,web",
            ),
            (
                "React JavaScript",
                "# React JavaScript\n\nBuilding frontend applications with React.",
                "javascript,react,frontend",
            ),
        ]:
            await client.call_tool(
                "write_note",
                {
                    "project": test_project.name,
                    "title": title,
                    "folder": "tutorials",
                    "content": content,
                    "tags": tags,
                },
            )

        # Each case: (query, titles that must appear, titles that must not).
        cases = [
            (
                "Python AND Flask",
                ["Python Flask Tutorial"],
                ["Python Django Guide", "React JavaScript"],
            ),
            (
                "Flask OR Django",
                ["Python Flask Tutorial", "Python Django Guide"],
                ["React JavaScript"],
            ),
            (
                "Python NOT Django",
                ["Python Flask Tutorial"],
                ["Python Django Guide"],
            ),
        ]
        for query, included, excluded in cases:
            search_result = await client.call_tool(
                "search_notes",
                {
                    "project": test_project.name,
                    "query": query,
                },
            )

            payload = search_result.content[0].text
            for title in included:
                assert title in payload
            for title in excluded:
                assert title not in payload


@pytest.mark.asyncio
async def test_search_title_only(mcp_server, app, test_project):
    """Test searching in titles only."""

    async with Client(mcp_server) as client:
        # Only the first note has "Database" in its title; the second mentions
        # databases in its body, so a title-scoped search must skip it.
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Database Design",
                "folder": "docs",
                "content": "# Database Design\n\nThis covers SQL and database concepts.",
                "tags": "database,sql",
            },
        )
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Web Development",
                "folder": "docs",
                "content": "# Web Development\n\nDatabase integration in web applications.",
                "tags": "web,development",
            },
        )

        response = await client.call_tool(
            "search_notes",
            {
                "project": test_project.name,
                "query": "Database",
                "search_type": "title",
            },
        )

        payload = response.content[0].text
        assert "Database Design" in payload
        assert "Web Development" not in payload  # Has "database" in content but not title


@pytest.mark.asyncio
async def test_search_permalink_exact(mcp_server, app, test_project):
    """Test exact permalink search."""

    async with Client(mcp_server) as client:
        # Two notes in different folders, so each gets a distinct permalink.
        for title, folder, content, tags in [
            (
                "API Documentation",
                "api",
                "# API Documentation\n\nComplete API reference guide.",
                "api,docs",
            ),
            (
                "API Testing",
                "testing",
                "# API Testing\n\nHow to test REST APIs.",
                "api,testing",
            ),
        ]:
            await client.call_tool(
                "write_note",
                {
                    "project": test_project.name,
                    "title": title,
                    "folder": folder,
                    "content": content,
                    "tags": tags,
                },
            )

        # An exact permalink query should match exactly one note.
        response = await client.call_tool(
            "search_notes",
            {
                "project": test_project.name,
                "query": "api/api-documentation",
                "search_type": "permalink",
            },
        )

        payload = response.content[0].text
        assert "API Documentation" in payload
        assert "API Testing" not in payload


@pytest.mark.asyncio
async def test_search_permalink_pattern(mcp_server, app, test_project):
    """Test permalink pattern search with wildcards."""

    async with Client(mcp_server) as client:
        # Two notes under meetings/ and one under projects/.
        for title, folder, content, tags in [
            (
                "Meeting Notes January",
                "meetings",
                "# Meeting Notes January\n\nJanuary team meeting notes.",
                "meetings,january",
            ),
            (
                "Meeting Notes February",
                "meetings",
                "# Meeting Notes February\n\nFebruary team meeting notes.",
                "meetings,february",
            ),
            (
                "Project Notes",
                "projects",
                "# Project Notes\n\nGeneral project documentation.",
                "projects,notes",
            ),
        ]:
            await client.call_tool(
                "write_note",
                {
                    "project": test_project.name,
                    "title": title,
                    "folder": folder,
                    "content": content,
                    "tags": tags,
                },
            )

        # The wildcard pattern should match everything in the meetings folder only.
        response = await client.call_tool(
            "search_notes",
            {
                "project": test_project.name,
                "query": "meetings/*",
                "search_type": "permalink",
            },
        )

        payload = response.content[0].text
        assert "Meeting Notes January" in payload
        assert "Meeting Notes February" in payload
        assert "Project Notes" not in payload


@pytest.mark.asyncio
async def test_search_entity_type_filter(mcp_server, app, test_project):
    """Test filtering search results by entity type."""

    async with Client(mcp_server) as client:
        # This note carries observations and relations alongside regular prose.
        content_with_observations = """# Development Process

This describes our development workflow.

## Observations
- [process] We use Git for version control
- [tool] We use VS Code as our editor

## Relations
- uses [[Git]]
- part_of [[Development Workflow]]

Regular content about development practices."""

        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Development Process",
                "folder": "processes",
                "content": content_with_observations,
                "tags": "development,process",
            },
        )

        # Restricting entity_types to "entity" should still surface the note
        # itself while filtering out observation/relation rows.
        response = await client.call_tool(
            "search_notes",
            {
                "project": test_project.name,
                "query": "development",
                "entity_types": ["entity"],
            },
        )

        payload = response.content[0].text
        assert "Development Process" in payload


@pytest.mark.asyncio
async def test_search_pagination(mcp_server, app, test_project):
    """Test search result pagination."""

    async with Client(mcp_server) as client:
        # Seed enough matching notes that "test" spans multiple result pages.
        for i in range(15):
            await client.call_tool(
                "write_note",
                {
                    "project": test_project.name,
                    "title": f"Test Note {i + 1:02d}",
                    "folder": "test",
                    "content": f"# Test Note {i + 1:02d}\n\nThis is test content for pagination testing.",
                    "tags": "test,pagination",
                },
            )

        # Request the first two pages of five results each and verify the
        # pagination metadata echoed back in the response body.
        for page in (1, 2):
            response = await client.call_tool(
                "search_notes",
                {
                    "project": test_project.name,
                    "query": "test",
                    "page": page,
                    "page_size": 5,
                },
            )

            payload = response.content[0].text
            assert f'"current_page":{page}' in payload
            if page == 1:
                assert '"page_size":5' in payload


@pytest.mark.asyncio
async def test_search_no_results(mcp_server, app, test_project):
    """Test search with no matching results."""

    async with Client(mcp_server) as client:
        # One note exists, but the query below won't match it.
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Sample Note",
                "folder": "test",
                "content": "# Sample Note\n\nThis is a sample note for testing.",
                "tags": "sample,test",
            },
        )

        response = await client.call_tool(
            "search_notes",
            {
                "project": test_project.name,
                "query": "nonexistent",
            },
        )

        payload = response.content[0].text
        # Accept either spaced or compact JSON serialization of the empty list.
        assert '"results": []' in payload or '"results":[]' in payload


@pytest.mark.asyncio
async def test_search_complex_boolean_query(mcp_server, app, test_project):
    """Test complex boolean queries with grouping."""

    async with Client(mcp_server) as client:
        # Two "web" tutorials (Python and JavaScript) and one non-web Python note.
        for title, content, tags in [
            (
                "Python Web Development",
                "# Python Web Development\n\nLearn Python for web development using Flask and Django.",
                "python,web,development",
            ),
            (
                "Python Data Science",
                "# Python Data Science\n\nData analysis and machine learning with Python.",
                "python,data,science",
            ),
            (
                "JavaScript Web Development",
                "# JavaScript Web Development\n\nBuilding web applications with JavaScript and React.",
                "javascript,web,development",
            ),
        ]:
            await client.call_tool(
                "write_note",
                {
                    "project": test_project.name,
                    "title": title,
                    "folder": "tutorials",
                    "content": content,
                    "tags": tags,
                },
            )

        # Grouped boolean query: (Python OR JavaScript) AND web
        response = await client.call_tool(
            "search_notes",
            {
                "project": test_project.name,
                "query": "(Python OR JavaScript) AND web",
            },
        )

        payload = response.content[0].text
        assert "Python Web Development" in payload
        assert "JavaScript Web Development" in payload
        assert "Python Data Science" not in payload  # Has Python but not web


@pytest.mark.asyncio
async def test_search_case_insensitive(mcp_server, app, test_project):
    """Test that search is case insensitive."""

    async with Client(mcp_server) as client:
        # One note whose title and body mix cases deliberately.
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Machine Learning Guide",
                "folder": "guides",
                "content": "# Machine Learning Guide\n\nIntroduction to MACHINE LEARNING concepts.",
                "tags": "ML,AI",
            },
        )

        # Every case variant of the same words should hit the note.
        for term in ("machine", "MACHINE", "Machine", "learning", "LEARNING"):
            response = await client.call_tool(
                "search_notes",
                {
                    "project": test_project.name,
                    "query": term,
                },
            )

            payload = response.content[0].text
            assert "Machine Learning Guide" in payload, f"Failed for search term: {term}"

```

--------------------------------------------------------------------------------
/tests/api/test_importer_router.py:
--------------------------------------------------------------------------------

```python
"""Tests for importer API routes."""

import json
from pathlib import Path

import pytest
from httpx import AsyncClient

from basic_memory.schemas.importer import (
    ChatImportResult,
    EntityImportResult,
    ProjectImportResult,
)


@pytest.fixture
def chatgpt_json_content():
    """Sample ChatGPT conversation data for testing."""
    # Mirrors the shape of a ChatGPT data export: a list of conversations,
    # each holding a "mapping" of message nodes linked via parent/children ids.
    return [
        {
            "title": "Test Conversation",
            "create_time": 1736616594.24054,  # Example timestamp
            "update_time": 1736616603.164995,
            "mapping": {
                # Synthetic root node: carries no message, only anchors the tree.
                "root": {"id": "root", "message": None, "parent": None, "children": ["msg1"]},
                # User message, child of root.
                "msg1": {
                    "id": "msg1",
                    "message": {
                        "id": "msg1",
                        "author": {"role": "user", "name": None, "metadata": {}},
                        "create_time": 1736616594.24054,
                        "content": {
                            "content_type": "text",
                            "parts": ["Hello, this is a test message"],
                        },
                        "status": "finished_successfully",
                        "metadata": {},
                    },
                    "parent": "root",
                    "children": ["msg2"],
                },
                # Assistant reply, child of msg1; leaf node (no children).
                "msg2": {
                    "id": "msg2",
                    "message": {
                        "id": "msg2",
                        "author": {"role": "assistant", "name": None, "metadata": {}},
                        "create_time": 1736616603.164995,
                        "content": {"content_type": "text", "parts": ["This is a test response"]},
                        "status": "finished_successfully",
                        "metadata": {},
                    },
                    "parent": "msg1",
                    "children": [],
                },
            },
        }
    ]


@pytest.fixture
def claude_conversations_json_content():
    """Sample Claude conversations data for testing.

    Matches the Claude data-export format: each conversation carries a uuid,
    name, ISO-8601 timestamps, and an ordered list of chat messages
    alternating between "human" and "assistant" senders.
    """
    return [
        {
            "uuid": "test-uuid",
            "name": "Test Conversation",
            "created_at": "2025-01-05T20:55:32.499880+00:00",
            "updated_at": "2025-01-05T20:56:39.477600+00:00",
            "chat_messages": [
                {
                    "uuid": "msg-1",
                    "text": "Hello, this is a test",
                    "sender": "human",
                    "created_at": "2025-01-05T20:55:32.499880+00:00",
                    "content": [{"type": "text", "text": "Hello, this is a test"}],
                },
                {
                    "uuid": "msg-2",
                    "text": "Response to test",
                    "sender": "assistant",
                    "created_at": "2025-01-05T20:55:40.123456+00:00",
                    "content": [{"type": "text", "text": "Response to test"}],
                },
            ],
        }
    ]


@pytest.fixture
def claude_projects_json_content():
    """Sample Claude projects data for testing.

    Matches the Claude project-export format: a project with a prompt
    template plus a list of docs. The importer is expected to emit one
    prompt file and one markdown file per doc.
    """
    return [
        {
            "uuid": "test-uuid",
            "name": "Test Project",
            "created_at": "2025-01-05T20:55:32.499880+00:00",
            "updated_at": "2025-01-05T20:56:39.477600+00:00",
            "prompt_template": "# Test Prompt\n\nThis is a test prompt.",
            "docs": [
                {
                    "uuid": "doc-uuid-1",
                    "filename": "Test Document",
                    "content": "# Test Document\n\nThis is test content.",
                    "created_at": "2025-01-05T20:56:39.477600+00:00",
                },
                {
                    "uuid": "doc-uuid-2",
                    "filename": "Another Document",
                    "content": "# Another Document\n\nMore test content.",
                    "created_at": "2025-01-05T20:56:39.477600+00:00",
                },
            ],
        }
    ]


@pytest.fixture
def memory_json_content():
    """Sample memory.json data for testing.

    Mirrors the MCP memory server's format: a flat stream of records where
    each record is either an entity (with observations) or a relation
    between two entities. Tests serialize this list as newline-delimited
    JSON (one record per line).
    """
    return [
        {
            "type": "entity",
            "name": "test_entity",
            "entityType": "test",
            "observations": ["Test observation 1", "Test observation 2"],
        },
        {
            "type": "relation",
            "from": "test_entity",
            "to": "related_entity",
            "relationType": "test_relation",
        },
    ]


async def create_test_upload_file(tmp_path, content):
    """Serialize *content* as JSON into tmp_path/test_import.json and return the path."""
    target = tmp_path / "test_import.json"
    target.write_text(json.dumps(content), encoding="utf-8")
    return target


@pytest.mark.asyncio
async def test_import_chatgpt(
    project_config, client: AsyncClient, tmp_path, chatgpt_json_content, file_service, project_url
):
    """Importing a ChatGPT export creates a markdown note per conversation."""
    # Write the sample export to disk, then upload its bytes as multipart form data.
    upload_path = await create_test_upload_file(tmp_path, chatgpt_json_content)
    response = await client.post(
        f"{project_url}/import/chatgpt",
        files={"file": ("conversations.json", upload_path.read_bytes(), "application/json")},
        data={"folder": "test_chatgpt"},
    )

    # The API should report one conversation containing two messages.
    assert response.status_code == 200
    result = ChatImportResult.model_validate(response.json())
    assert result.success is True
    assert result.conversations == 1
    assert result.messages == 2

    # A markdown file named from the conversation date and title should exist.
    conv_path = Path("test_chatgpt") / "20250111-Test_Conversation.md"
    assert await file_service.exists(conv_path)

    content, _ = await file_service.read_file(conv_path)
    assert "# Test Conversation" in content
    assert "Hello, this is a test message" in content
    assert "This is a test response" in content


@pytest.mark.asyncio
async def test_import_chatgpt_invalid_file(client: AsyncClient, tmp_path, project_url):
    """Uploading a non-JSON body to the ChatGPT importer yields a 500 error."""
    # The file content is plain text, not JSON.
    bad_file = tmp_path / "invalid.json"
    bad_file.write_text("This is not JSON")

    response = await client.post(
        f"{project_url}/import/chatgpt",
        files={"file": ("invalid.json", bad_file.read_bytes(), "application/json")},
        data={"folder": "test_chatgpt"},
    )

    # The parse failure surfaces as a 500 with a detail message.
    assert response.status_code == 500
    assert "Import failed" in response.json()["detail"]


@pytest.mark.asyncio
async def test_import_claude_conversations(
    client: AsyncClient, tmp_path, claude_conversations_json_content, file_service, project_url
):
    """Importing a Claude conversations export creates one note per conversation."""
    # Persist the sample export and upload its bytes via multipart form.
    upload_path = await create_test_upload_file(tmp_path, claude_conversations_json_content)
    response = await client.post(
        f"{project_url}/import/claude/conversations",
        files={"file": ("conversations.json", upload_path.read_bytes(), "application/json")},
        data={"folder": "test_claude_conversations"},
    )

    # One conversation with two messages should be reported.
    assert response.status_code == 200
    result = ChatImportResult.model_validate(response.json())
    assert result.success is True
    assert result.conversations == 1
    assert result.messages == 2

    # The note is named from the conversation's created_at date and title.
    conv_path = Path("test_claude_conversations") / "20250105-Test_Conversation.md"
    assert await file_service.exists(conv_path)

    content, _ = await file_service.read_file(conv_path)
    assert "# Test Conversation" in content
    assert "Hello, this is a test" in content
    assert "Response to test" in content


@pytest.mark.asyncio
async def test_import_claude_conversations_invalid_file(client: AsyncClient, tmp_path, project_url):
    """Uploading a non-JSON body to the Claude conversations importer yields a 500."""
    # Plain-text payload that cannot be parsed as JSON.
    bad_file = tmp_path / "invalid.json"
    bad_file.write_text("This is not JSON")

    response = await client.post(
        f"{project_url}/import/claude/conversations",
        files={"file": ("invalid.json", bad_file.read_bytes(), "application/json")},
        data={"folder": "test_claude_conversations"},
    )

    # The parse failure surfaces as a 500 with a detail message.
    assert response.status_code == 500
    assert "Import failed" in response.json()["detail"]


@pytest.mark.asyncio
async def test_import_claude_projects(
    client: AsyncClient, tmp_path, claude_projects_json_content, file_service, project_url
):
    """Importing a Claude projects export writes a prompt file plus one file per doc."""
    # Persist the sample export and upload its bytes via multipart form.
    upload_path = await create_test_upload_file(tmp_path, claude_projects_json_content)
    response = await client.post(
        f"{project_url}/import/claude/projects",
        files={"file": ("projects.json", upload_path.read_bytes(), "application/json")},
        data={"folder": "test_claude_projects"},
    )

    # Two documents and one prompt template should be reported.
    assert response.status_code == 200
    result = ProjectImportResult.model_validate(response.json())
    assert result.success is True
    assert result.documents == 2
    assert result.prompts == 1

    # Files land under <folder>/<project name>/, docs inside a "docs" subfolder.
    project_dir = Path("test_claude_projects") / "Test_Project"
    assert await file_service.exists(project_dir / "prompt-template.md")
    assert await file_service.exists(project_dir / "docs" / "Test_Document.md")
    assert await file_service.exists(project_dir / "docs" / "Another_Document.md")

    # Spot-check the written content.
    prompt_content, _ = await file_service.read_file(project_dir / "prompt-template.md")
    assert "# Test Prompt" in prompt_content

    doc_content, _ = await file_service.read_file(project_dir / "docs" / "Test_Document.md")
    assert "# Test Document" in doc_content
    assert "This is test content" in doc_content


@pytest.mark.asyncio
async def test_import_claude_projects_invalid_file(client: AsyncClient, tmp_path, project_url):
    """Uploading a non-JSON body to the Claude projects importer yields a 500."""
    # Plain-text payload that cannot be parsed as JSON.
    bad_file = tmp_path / "invalid.json"
    bad_file.write_text("This is not JSON")

    response = await client.post(
        f"{project_url}/import/claude/projects",
        files={"file": ("invalid.json", bad_file.read_bytes(), "application/json")},
        data={"folder": "test_claude_projects"},
    )

    # The parse failure surfaces as a 500 with a detail message.
    assert response.status_code == 500
    assert "Import failed" in response.json()["detail"]


@pytest.mark.asyncio
async def test_import_memory_json(
    client: AsyncClient, tmp_path, memory_json_content, file_service, project_url
):
    """Importing memory.json creates entity notes with observations and relations."""
    # memory.json is newline-delimited JSON: one record per line.
    json_file = tmp_path / "memory.json"
    json_file.write_text(
        "".join(json.dumps(entity) + "\n" for entity in memory_json_content),
        encoding="utf-8",
    )

    response = await client.post(
        f"{project_url}/import/memory-json",
        files={"file": ("memory.json", json_file.read_bytes(), "application/json")},
        data={"folder": "test_memory_json"},
    )

    # One entity and one relation should be reported.
    assert response.status_code == 200
    result = EntityImportResult.model_validate(response.json())
    assert result.success is True
    assert result.entities == 1
    assert result.relations == 1

    # Entities are written under <folder>/<entityType>/<name>.md.
    entity_path = Path("test_memory_json") / "test" / "test_entity.md"
    assert await file_service.exists(entity_path)

    # The note should include the observations and the wiki-linked relation.
    content, _ = await file_service.read_file(entity_path)
    assert "Test observation 1" in content
    assert "Test observation 2" in content
    assert "test_relation [[related_entity]]" in content


@pytest.mark.asyncio
async def test_import_memory_json_without_folder(
    client: AsyncClient, tmp_path, memory_json_content, file_service, project_url
):
    """Importing memory.json without a folder falls back to the default location."""
    # memory.json is newline-delimited JSON: one record per line.
    json_file = tmp_path / "memory.json"
    json_file.write_text(
        "".join(json.dumps(entity) + "\n" for entity in memory_json_content),
        encoding="utf-8",
    )

    # No folder form field is supplied with the upload.
    response = await client.post(
        f"{project_url}/import/memory-json",
        files={"file": ("memory.json", json_file.read_bytes(), "application/json")},
    )

    # One entity and one relation should still be imported.
    assert response.status_code == 200
    result = EntityImportResult.model_validate(response.json())
    assert result.success is True
    assert result.entities == 1
    assert result.relations == 1

    # Without a folder, entities land under the default "conversations" directory.
    entity_path = Path("conversations") / "test" / "test_entity.md"
    assert await file_service.exists(entity_path)


@pytest.mark.asyncio
async def test_import_memory_json_invalid_file(client: AsyncClient, tmp_path, project_url):
    """Uploading a non-JSON body to the memory.json importer yields a 500."""
    # Plain-text payload that cannot be parsed as JSON.
    bad_file = tmp_path / "invalid.json"
    bad_file.write_text("This is not JSON")

    response = await client.post(
        f"{project_url}/import/memory-json",
        files={"file": ("invalid.json", bad_file.read_bytes(), "application/json")},
        data={"destination_folder": "test_memory_json"},
    )

    # The parse failure surfaces as a 500 with a detail message.
    assert response.status_code == 500
    assert "Import failed" in response.json()["detail"]


@pytest.mark.asyncio
async def test_import_missing_file(client: AsyncClient, tmp_path, project_url):
    """A request with no uploaded file is rejected before the importer runs."""
    # Send only form data; omit the multipart file entirely.
    response = await client.post(f"{project_url}/import/chatgpt", data={"folder": "test_folder"})

    # Framework-level validation rejects the request.
    assert response.status_code in [400, 422]  # Either bad request or unprocessable entity


@pytest.mark.asyncio
async def test_import_empty_file(client: AsyncClient, tmp_path, project_url):
    """An empty upload fails JSON parsing and yields a 500."""
    # Zero-byte file: not valid JSON.
    empty_file = tmp_path / "empty.json"
    empty_file.write_text("")

    response = await client.post(
        f"{project_url}/import/chatgpt",
        files={"file": ("empty.json", empty_file.read_bytes(), "application/json")},
        data={"folder": "test_chatgpt"},
    )

    # The parse failure surfaces as a 500 with a detail message.
    assert response.status_code == 500
    assert "Import failed" in response.json()["detail"]


@pytest.mark.asyncio
async def test_import_malformed_json(client: AsyncClient, tmp_path, project_url):
    """Every importer endpoint rejects structurally broken JSON with a 500."""
    # JSON that opens an object but never closes it.
    bad_file = tmp_path / "malformed.json"
    bad_file.write_text('{"incomplete": "json"')  # Missing closing brace
    payload = bad_file.read_bytes()

    # Each importer endpoint paired with its expected form fields.
    endpoints = [
        (f"{project_url}/import/chatgpt", {"folder": "test"}),
        (f"{project_url}/import/claude/conversations", {"folder": "test"}),
        (f"{project_url}/import/claude/projects", {"base_folder": "test"}),
        (f"{project_url}/import/memory-json", {"destination_folder": "test"}),
    ]

    for endpoint, data in endpoints:
        response = await client.post(
            endpoint,
            files={"file": ("malformed.json", payload, "application/json")},
            data=data,
        )

        # All endpoints should fail identically on unparseable input.
        assert response.status_code == 500
        assert "Import failed" in response.json()["detail"]

```

--------------------------------------------------------------------------------
/src/basic_memory/mcp/tools/search.py:
--------------------------------------------------------------------------------

```python
"""Search tools for Basic Memory MCP server."""

from textwrap import dedent
from typing import List, Optional

from loguru import logger
from fastmcp import Context

from basic_memory.mcp.async_client import get_client
from basic_memory.mcp.project_context import get_active_project
from basic_memory.mcp.server import mcp
from basic_memory.mcp.tools.utils import call_post
from basic_memory.schemas.search import SearchItemType, SearchQuery, SearchResponse


def _format_search_error_response(
    project: str, error_message: str, query: str, search_type: str = "text"
) -> str:
    """Format helpful error responses for search failures that guide users to successful searches."""

    # FTS5 syntax errors
    if "syntax error" in error_message.lower() or "fts5" in error_message.lower():
        clean_query = (
            query.replace('"', "")
            .replace("(", "")
            .replace(")", "")
            .replace("+", "")
            .replace("*", "")
        )
        return dedent(f"""
            # Search Failed - Invalid Syntax

            The search query '{query}' contains invalid syntax that the search engine cannot process.

            ## Common syntax issues:
            1. **Special characters**: Characters like `+`, `*`, `"`, `(`, `)` have special meaning in search
            2. **Unmatched quotes**: Make sure quotes are properly paired
            3. **Invalid operators**: Check AND, OR, NOT operators are used correctly

            ## How to fix:
            1. **Simplify your search**: Try using simple words instead: `{clean_query}`
            2. **Remove special characters**: Use alphanumeric characters and spaces
            3. **Use basic boolean operators**: `word1 AND word2`, `word1 OR word2`, `word1 NOT word2`

            ## Examples of valid searches:
            - Simple text: `project planning`
            - Boolean AND: `project AND planning`
            - Boolean OR: `meeting OR discussion`
            - Boolean NOT: `project NOT archived`
            - Grouped: `(project OR planning) AND notes`
            - Exact phrases: `"weekly standup meeting"`
            - Content-specific: `tag:example` or `category:observation`

            ## Try again with:
            ```
            search_notes("{project}","{clean_query}")
            ```

            ## Alternative search strategies:
            - Break into simpler terms: `search_notes("{project}", "{" ".join(clean_query.split()[:2])}")`
            - Try different search types: `search_notes("{project}","{clean_query}", search_type="title")`
            - Use filtering: `search_notes("{project}","{clean_query}", types=["entity"])`
            """).strip()

    # Project not found errors (check before general "not found")
    if "project not found" in error_message.lower():
        return dedent(f"""
            # Search Failed - Project Not Found

            The current project is not accessible or doesn't exist: {error_message}

            ## How to resolve:
            1. **Check available projects**: `list_projects()`
            3. **Verify project setup**: Ensure your project is properly configured

            ## Current session info:
            - See available projects: `list_projects()`
            """).strip()

    # No results found
    if "no results" in error_message.lower() or "not found" in error_message.lower():
        simplified_query = (
            " ".join(query.split()[:2])
            if len(query.split()) > 2
            else query.split()[0]
            if query.split()
            else "notes"
        )
        return dedent(f"""
            # Search Complete - No Results Found

            No content found matching '{query}' in the current project.

            ## Search strategy suggestions:
            1. **Broaden your search**: Try fewer or more general terms
               - Instead of: `{query}`
               - Try: `{simplified_query}`

            2. **Check spelling and try variations**:
               - Verify terms are spelled correctly
               - Try synonyms or related terms

            3. **Use different search approaches**:
               - **Text search**: `search_notes("{project}","{query}", search_type="text")` (searches full content)
               - **Title search**: `search_notes("{project}","{query}", search_type="title")` (searches only titles)
               - **Permalink search**: `search_notes("{project}","{query}", search_type="permalink")` (searches file paths)

            4. **Try boolean operators for broader results**:
               - OR search: `search_notes("{project}","{" OR ".join(query.split()[:3])}")`
               - Remove restrictive terms: Focus on the most important keywords

            5. **Use filtering to narrow scope**:
               - By content type: `search_notes("{project}","{query}", types=["entity"])`
               - By recent content: `search_notes("{project}","{query}", after_date="1 week")`
               - By entity type: `search_notes("{project}","{query}", entity_types=["observation"])`

            6. **Try advanced search patterns**:
               - Tag search: `search_notes("{project}","tag:your-tag")`
               - Category search: `search_notes("{project}","category:observation")`
               - Pattern matching: `search_notes("{project}","*{query}*", search_type="permalink")`

            ## Explore what content exists:
            - **Recent activity**: `recent_activity(timeframe="7d")` - See what's been updated recently
            - **List directories**: `list_directory("{project}","/")` - Browse all content
            - **Browse by folder**: `list_directory("{project}","/notes")` or `list_directory("/docs")`
            """).strip()

    # Server/API errors
    if "server error" in error_message.lower() or "internal" in error_message.lower():
        return dedent(f"""
            # Search Failed - Server Error

            The search service encountered an error while processing '{query}': {error_message}

            ## Immediate steps:
            1. **Try again**: The error might be temporary
            2. **Simplify the query**: Use simpler search terms
            3. **Check project status**: Ensure your project is properly synced

            ## Alternative approaches:
            - Browse files directly: `list_directory("{project}","/")`
            - Check recent activity: `recent_activity(timeframe="7d")`
            - Try a different search type: `search_notes("{project}","{query}", search_type="title")`

            ## If the problem persists:
            The search index might need to be rebuilt. Send a message to [email protected] or check the project sync status.
            """).strip()

    # Permission/access errors
    if (
        "permission" in error_message.lower()
        or "access" in error_message.lower()
        or "forbidden" in error_message.lower()
    ):
        return f"""# Search Failed - Access Error

You don't have permission to search in the current project: {error_message}

## How to resolve:
1. **Check your project access**: Verify you have read permissions for this project
2. **Switch projects**: Try searching in a different project you have access to
3. **Check authentication**: You might need to re-authenticate

## Alternative actions:
- List available projects: `list_projects()`"""

    # Generic fallback
    return f"""# Search Failed

Error searching for '{query}': {error_message}

## Troubleshooting steps:
1. **Simplify your query**: Try basic words without special characters
2. **Check search syntax**: Ensure boolean operators are correctly formatted
3. **Verify project access**: Make sure you can access the current project
4. **Test with simple search**: Try `search_notes("test")` to verify search is working

## Alternative search approaches:
- **Different search types**: 
  - Title only: `search_notes("{project}","{query}", search_type="title")`
  - Permalink patterns: `search_notes("{project}","{query}*", search_type="permalink")`
- **With filters**: `search_notes("{project}","{query}", types=["entity"])`
- **Recent content**: `search_notes("{project}","{query}", after_date="1 week")`
- **Boolean variations**: `search_notes("{project}","{" OR ".join(query.split()[:2])}")`

## Explore your content:
- **Browse files**: `list_directory("{project}","/")` - See all available content
- **Recent activity**: `recent_activity(timeframe="7d")` - Check what's been updated
- **All projects**: `list_projects()` 

## Search syntax reference:
- **Basic**: `keyword` or `multiple words`
- **Boolean**: `term1 AND term2`, `term1 OR term2`, `term1 NOT term2`
- **Phrases**: `"exact phrase"`
- **Grouping**: `(term1 OR term2) AND term3`
- **Patterns**: `tag:example`, `category:observation`"""


@mcp.tool(
    description="Search across all content in the knowledge base with advanced syntax support.",
)
async def search_notes(
    query: str,
    project: Optional[str] = None,
    page: int = 1,
    page_size: int = 10,
    search_type: str = "text",
    types: Optional[List[str]] = None,
    entity_types: Optional[List[str]] = None,
    after_date: Optional[str] = None,
    context: Context | None = None,
) -> SearchResponse | str:
    """Search the knowledge base using full-text, title, or permalink matching.

    Runs the query against the resolved project and returns paginated results.
    The query language supports FTS5-style boolean operators, quoted phrases,
    grouping, and wildcard permalink patterns, and results can additionally be
    filtered by content type, entity type, and date.

    Project resolution:
        The server picks the project in this order: Single Project Mode, then
        the ``project`` argument, then the default project. If the project
        name is unknown, call ``list_memory_projects()`` or
        ``recent_activity()`` first.

    Query syntax:
        - Plain terms: ``project planning`` (implicit AND)
        - Boolean: ``a AND b``, ``a OR b``, ``a NOT b``, ``(a OR b) AND c``
        - Exact phrase: ``"weekly standup meeting"``
        - Content patterns: ``tag:example``, ``category:observation``
        - Permalink wildcards (with ``search_type="permalink"``): ``docs/meeting-*``

    Args:
        query: Search query string using the syntax described above.
        project: Optional project name; resolved by the server when omitted.
        page: 1-based page number of results to return (default 1).
        page_size: Number of results per page (default 10).
        search_type: One of "text" (default), "title", or "permalink".
        types: Optional list of note types to include (e.g. ["note", "person"]).
        entity_types: Optional entity-type filter (e.g. ["entity", "observation"]).
        after_date: Optional recency filter ("2024-01-01", "1 week", "2d").
        context: Optional FastMCP context for performance caching.

    Returns:
        SearchResponse with results and pagination info on success, or a
        markdown help string with guidance when the search fails.

    Examples:
        results = await search_notes("project AND planning")
        results = await search_notes("(bug OR issue) AND NOT resolved", types=["entity"])
        results = await search_notes("docs/meeting-*", search_type="permalink")
        results = await search_notes("Machine Learning", search_type="title")
        results = await search_notes("bug report", after_date="1 week")
        results = await search_notes("project planning", project="my-project")
    """
    # Route the raw query string to the SearchQuery field matching search_type.
    search_query = SearchQuery()
    if search_type == "title":
        search_query.title = query
    elif search_type == "permalink":
        # A wildcard switches permalink lookup from exact match to pattern match.
        if "*" in query:
            search_query.permalink_match = query
        else:
            search_query.permalink = query
    else:
        # "text" (the default) and any unrecognized type use full-text search.
        search_query.text = query

    # Attach whichever optional filters were supplied.
    if entity_types:
        search_query.entity_types = [SearchItemType(t) for t in entity_types]
    if types:
        search_query.types = types
    if after_date:
        search_query.after_date = after_date

    async with get_client() as client:
        active_project = await get_active_project(client, project, context)

        logger.info(f"Searching for {search_query} in project {active_project.name}")

        try:
            response = await call_post(
                client,
                f"{active_project.project_url}/search/",
                json=search_query.model_dump(),
                params={"page": page, "page_size": page_size},
            )
            result = SearchResponse.model_validate(response.json())

            if not result.results:
                # An empty result set is not an error; log it and return as-is
                # so the caller can decide whether to broaden the search.
                logger.info(
                    f"Search returned no results for query: {query} in project {active_project.name}"
                )

            return result

        except Exception as e:
            logger.error(f"Search failed for query '{query}': {e}, project: {active_project.name}")
            # Return actionable markdown guidance instead of a raw traceback.
            return _format_search_error_response(active_project.name, str(e), query, search_type)

```

--------------------------------------------------------------------------------
/tests/test_config.py:
--------------------------------------------------------------------------------

```python
"""Test configuration management."""

import tempfile
import pytest
from datetime import datetime

from basic_memory.config import BasicMemoryConfig, CloudProjectConfig, ConfigManager
from pathlib import Path


class TestBasicMemoryConfig:
    """Test BasicMemoryConfig behavior with BASIC_MEMORY_HOME environment variable."""

    def test_default_behavior_without_basic_memory_home(self, config_home, monkeypatch):
        """Test that config uses default path when BASIC_MEMORY_HOME is not set."""
        # Ensure BASIC_MEMORY_HOME is not set
        monkeypatch.delenv("BASIC_MEMORY_HOME", raising=False)

        config = BasicMemoryConfig()

        # Should use the default path (home/basic-memory)
        expected_path = (config_home / "basic-memory").as_posix()
        assert config.projects["main"] == Path(expected_path).as_posix()

    def test_respects_basic_memory_home_environment_variable(self, config_home, monkeypatch):
        """Test that config respects BASIC_MEMORY_HOME environment variable."""
        custom_path = (config_home / "app" / "data").as_posix()
        monkeypatch.setenv("BASIC_MEMORY_HOME", custom_path)

        config = BasicMemoryConfig()

        # Should use the custom path from environment variable
        assert config.projects["main"] == custom_path

    def test_model_post_init_respects_basic_memory_home(self, config_home, monkeypatch):
        """Test that model_post_init creates main project with BASIC_MEMORY_HOME when missing."""
        custom_path = str(config_home / "custom" / "memory" / "path")
        monkeypatch.setenv("BASIC_MEMORY_HOME", custom_path)

        # Create config without main project
        other_path = str(config_home / "some" / "path")
        config = BasicMemoryConfig(projects={"other": other_path})

        # model_post_init should have added main project with BASIC_MEMORY_HOME
        assert "main" in config.projects
        assert config.projects["main"] == Path(custom_path).as_posix()

    def test_model_post_init_fallback_without_basic_memory_home(self, config_home, monkeypatch):
        """Test that model_post_init falls back to default when BASIC_MEMORY_HOME is not set."""
        # Ensure BASIC_MEMORY_HOME is not set
        monkeypatch.delenv("BASIC_MEMORY_HOME", raising=False)

        # Create config without main project
        other_path = (config_home / "some" / "path").as_posix()
        config = BasicMemoryConfig(projects={"other": other_path})

        # model_post_init should have added main project with default path
        expected_path = (config_home / "basic-memory").as_posix()
        assert "main" in config.projects
        assert config.projects["main"] == Path(expected_path).as_posix()

    def test_basic_memory_home_with_relative_path(self, config_home, monkeypatch):
        """Test that BASIC_MEMORY_HOME works with relative paths."""
        relative_path = "relative/memory/path"
        monkeypatch.setenv("BASIC_MEMORY_HOME", relative_path)

        config = BasicMemoryConfig()

        # Should use the exact value from environment variable
        assert config.projects["main"] == relative_path

    def test_basic_memory_home_overrides_existing_main_project(self, config_home, monkeypatch):
        """Test that BASIC_MEMORY_HOME is not used when a map is passed in the constructor."""
        custom_path = str(config_home / "override" / "memory" / "path")
        monkeypatch.setenv("BASIC_MEMORY_HOME", custom_path)

        # Try to create config with a different main project path
        original_path = str(config_home / "original" / "path")
        config = BasicMemoryConfig(projects={"main": original_path})

        # An explicitly supplied "main" entry wins over BASIC_MEMORY_HOME:
        # the env var is only consulted when "main" has to be filled in.
        # NOTE(review): the test name ("overrides") is historical and does not
        # match the behavior actually asserted here.
        assert config.projects["main"] == original_path


class TestConfigManager:
    """Test ConfigManager functionality: project CRUD, permalink lookup, cloud projects."""

    @pytest.fixture
    def temp_config_manager(self):
        """Create a ConfigManager backed by a throwaway temporary directory."""
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_path = Path(temp_dir)

            # Create a test ConfigManager instance and point it at the temp dir.
            config_manager = ConfigManager()
            config_manager.config_dir = temp_path / "basic-memory"
            # Use "config.json" for consistency with ConfigManager's real config
            # file name (the content saved is JSON, not YAML).
            config_manager.config_file = config_manager.config_dir / "config.json"
            config_manager.config_dir.mkdir(parents=True, exist_ok=True)

            # Create initial config with test projects
            test_config = BasicMemoryConfig(
                default_project="main",
                projects={
                    "main": str(temp_path / "main"),
                    "test-project": str(temp_path / "test"),
                    "special-chars": str(
                        temp_path / "special"
                    ),  # This will be the config key for "Special/Chars"
                },
            )
            config_manager.save_config(test_config)

            yield config_manager

    def test_set_default_project_with_exact_name_match(self, temp_config_manager):
        """Test set_default_project when project name matches config key exactly."""
        config_manager = temp_config_manager

        # Set default to a project that exists with exact name match
        config_manager.set_default_project("test-project")

        # Verify the config was updated
        config = config_manager.load_config()
        assert config.default_project == "test-project"

    def test_set_default_project_with_permalink_lookup(self, temp_config_manager):
        """Test set_default_project when input needs permalink normalization."""
        config_manager = temp_config_manager

        # Simulate a project that was created with special characters.
        # The config key would be the permalink, but the user might type the
        # original (human-readable) name.
        config = config_manager.load_config()
        config.projects["special-chars-project"] = str(Path("/tmp/special"))
        config_manager.save_config(config)

        # Setting default with a name that normalizes to the config key
        config_manager.set_default_project(
            "Special Chars Project"
        )  # This should normalize to "special-chars-project"

        # Verify the config was updated with the correct config key
        updated_config = config_manager.load_config()
        assert updated_config.default_project == "special-chars-project"

    def test_set_default_project_uses_canonical_name(self, temp_config_manager):
        """Test that set_default_project uses the canonical config key, not user input."""
        config_manager = temp_config_manager

        # Add a project with a config key that differs from user input
        config = config_manager.load_config()
        config.projects["my-test-project"] = str(Path("/tmp/mytest"))
        config_manager.save_config(config)

        # Set default using input that will match but is different from config key
        config_manager.set_default_project("My Test Project")  # Should find "my-test-project"

        # Verify that the canonical config key is used, not the user input
        updated_config = config_manager.load_config()
        assert updated_config.default_project == "my-test-project"
        # Should NOT be the user input
        assert updated_config.default_project != "My Test Project"

    def test_set_default_project_nonexistent_project(self, temp_config_manager):
        """Test set_default_project raises ValueError for nonexistent project."""
        config_manager = temp_config_manager

        with pytest.raises(ValueError, match="Project 'nonexistent' not found"):
            config_manager.set_default_project("nonexistent")

    def test_disable_permalinks_flag_default(self):
        """Test that disable_permalinks flag defaults to False."""
        config = BasicMemoryConfig()
        assert config.disable_permalinks is False

    def test_disable_permalinks_flag_can_be_enabled(self):
        """Test that disable_permalinks flag can be set to True."""
        config = BasicMemoryConfig(disable_permalinks=True)
        assert config.disable_permalinks is True

    def test_config_manager_respects_custom_config_dir(self, monkeypatch):
        """Test that ConfigManager respects BASIC_MEMORY_CONFIG_DIR environment variable."""
        with tempfile.TemporaryDirectory() as temp_dir:
            custom_config_dir = Path(temp_dir) / "custom" / "config"
            monkeypatch.setenv("BASIC_MEMORY_CONFIG_DIR", str(custom_config_dir))

            config_manager = ConfigManager()

            # Verify config_dir is set to the custom path
            assert config_manager.config_dir == custom_config_dir
            # Verify config_file is in the custom directory
            assert config_manager.config_file == custom_config_dir / "config.json"
            # Verify the directory was created
            assert config_manager.config_dir.exists()

    def test_config_manager_default_without_custom_config_dir(self, config_home, monkeypatch):
        """Test that ConfigManager uses default location when BASIC_MEMORY_CONFIG_DIR is not set."""
        monkeypatch.delenv("BASIC_MEMORY_CONFIG_DIR", raising=False)

        config_manager = ConfigManager()

        # Should use default location
        assert config_manager.config_dir == config_home / ".basic-memory"
        assert config_manager.config_file == config_home / ".basic-memory" / "config.json"

    def test_remove_project_with_exact_name_match(self, temp_config_manager):
        """Test remove_project when project name matches config key exactly."""
        config_manager = temp_config_manager

        # Verify project exists
        config = config_manager.load_config()
        assert "test-project" in config.projects

        # Remove the project with exact name match
        config_manager.remove_project("test-project")

        # Verify the project was removed
        config = config_manager.load_config()
        assert "test-project" not in config.projects

    def test_remove_project_with_permalink_lookup(self, temp_config_manager):
        """Test remove_project when input needs permalink normalization."""
        config_manager = temp_config_manager

        # Add a project with normalized key
        config = config_manager.load_config()
        config.projects["special-chars-project"] = str(Path("/tmp/special"))
        config_manager.save_config(config)

        # Remove using a name that will normalize to the config key
        config_manager.remove_project(
            "Special Chars Project"
        )  # This should normalize to "special-chars-project"

        # Verify the project was removed using the correct config key
        updated_config = config_manager.load_config()
        assert "special-chars-project" not in updated_config.projects

    def test_remove_project_uses_canonical_name(self, temp_config_manager):
        """Test that remove_project uses the canonical config key, not user input."""
        config_manager = temp_config_manager

        # Add a project with a config key that differs from user input
        config = config_manager.load_config()
        config.projects["my-test-project"] = str(Path("/tmp/mytest"))
        config_manager.save_config(config)

        # Remove using input that will match but is different from config key
        config_manager.remove_project("My Test Project")  # Should find "my-test-project"

        # Verify that the canonical config key was removed
        updated_config = config_manager.load_config()
        assert "my-test-project" not in updated_config.projects

    def test_remove_project_nonexistent_project(self, temp_config_manager):
        """Test remove_project raises ValueError for nonexistent project."""
        config_manager = temp_config_manager

        with pytest.raises(ValueError, match="Project 'nonexistent' not found"):
            config_manager.remove_project("nonexistent")

    def test_remove_project_cannot_remove_default(self, temp_config_manager):
        """Test remove_project raises ValueError when trying to remove default project."""
        config_manager = temp_config_manager

        # Try to remove the default project
        with pytest.raises(ValueError, match="Cannot remove the default project"):
            config_manager.remove_project("main")

    def test_config_with_cloud_projects_empty_by_default(self, temp_config_manager):
        """Test that cloud_projects field exists and defaults to empty dict."""
        config_manager = temp_config_manager
        config = config_manager.load_config()

        assert hasattr(config, "cloud_projects")
        assert config.cloud_projects == {}

    def test_save_and_load_config_with_cloud_projects(self):
        """Test that config with cloud_projects can be saved and loaded."""
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_path = Path(temp_dir)

            config_manager = ConfigManager()
            config_manager.config_dir = temp_path / "basic-memory"
            config_manager.config_file = config_manager.config_dir / "config.json"
            config_manager.config_dir.mkdir(parents=True, exist_ok=True)

            # Create config with cloud_projects
            now = datetime.now()
            test_config = BasicMemoryConfig(
                projects={"main": str(temp_path / "main")},
                cloud_projects={
                    "research": CloudProjectConfig(
                        local_path=str(temp_path / "research-local"),
                        last_sync=now,
                        bisync_initialized=True,
                    )
                },
            )
            config_manager.save_config(test_config)

            # Load and verify all CloudProjectConfig fields round-trip
            loaded_config = config_manager.load_config()
            assert "research" in loaded_config.cloud_projects
            assert loaded_config.cloud_projects["research"].local_path == str(
                temp_path / "research-local"
            )
            assert loaded_config.cloud_projects["research"].bisync_initialized is True
            assert loaded_config.cloud_projects["research"].last_sync == now

    def test_add_cloud_project_to_existing_config(self):
        """Test adding cloud projects to an existing config file."""
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_path = Path(temp_dir)

            config_manager = ConfigManager()
            config_manager.config_dir = temp_path / "basic-memory"
            config_manager.config_file = config_manager.config_dir / "config.json"
            config_manager.config_dir.mkdir(parents=True, exist_ok=True)

            # Create initial config without cloud projects
            initial_config = BasicMemoryConfig(projects={"main": str(temp_path / "main")})
            config_manager.save_config(initial_config)

            # Load, modify, and save
            config = config_manager.load_config()
            assert config.cloud_projects == {}

            config.cloud_projects["work"] = CloudProjectConfig(
                local_path=str(temp_path / "work-local")
            )
            config_manager.save_config(config)

            # Reload and verify persistence (bisync_initialized defaults to False)
            reloaded_config = config_manager.load_config()
            assert "work" in reloaded_config.cloud_projects
            assert reloaded_config.cloud_projects["work"].local_path == str(
                temp_path / "work-local"
            )
            assert reloaded_config.cloud_projects["work"].bisync_initialized is False

    def test_backward_compatibility_loading_config_without_cloud_projects(self):
        """Test that old config files without cloud_projects field can be loaded."""
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_path = Path(temp_dir)

            config_manager = ConfigManager()
            config_manager.config_dir = temp_path / "basic-memory"
            config_manager.config_file = config_manager.config_dir / "config.json"
            config_manager.config_dir.mkdir(parents=True, exist_ok=True)

            # Manually write old-style config without cloud_projects
            import json

            old_config_data = {
                "env": "dev",
                "projects": {"main": str(temp_path / "main")},
                "default_project": "main",
                "log_level": "INFO",
            }
            config_manager.config_file.write_text(json.dumps(old_config_data, indent=2))

            # Should load successfully with cloud_projects defaulting to empty dict
            config = config_manager.load_config()
            assert config.cloud_projects == {}
            assert config.projects == {"main": str(temp_path / "main")}

```

--------------------------------------------------------------------------------
/tests/sync/test_watch_service.py:
--------------------------------------------------------------------------------

```python
"""Tests for watch service."""

import asyncio
import json
from pathlib import Path

import pytest
from watchfiles import Change

from basic_memory.models.project import Project
from basic_memory.sync.watch_service import WatchService, WatchServiceState


async def create_test_file(path: Path, content: str = "test content") -> None:
    """Write *content* to *path*, creating any missing parent directories first."""
    parent_dir = path.parent
    parent_dir.mkdir(parents=True, exist_ok=True)
    path.write_text(content)


@pytest.fixture
def watch_service(sync_service, file_service, project_config):
    """WatchService wired to the test sync service, file service, and project config."""
    return WatchService(sync_service, file_service, project_config)


def test_watch_service_init(watch_service, project_config):
    """Constructing the service creates the directory that holds its status file."""
    assert watch_service.status_path.parent.exists()


def test_state_add_event():
    """Events are appended to state, returned to the caller, and capped at 100."""
    state = WatchServiceState()
    event = state.add_event(path="test.md", action="new", status="success", checksum="abcd1234")

    # The returned event is stored as the sole history entry, fields intact.
    assert state.recent_events == [event]
    assert (event.path, event.action, event.checksum) == ("test.md", "new", "abcd1234")

    # Flooding the state trims the history to the 100 most recent events.
    for index in range(110):
        state.add_event(f"test{index}.md", "new", "success")
    assert len(state.recent_events) == 100


def test_state_record_error():
    """Recording an error bumps the counter, sets last_error, and logs a sync/error event."""
    state = WatchServiceState()
    state.record_error("test error")

    assert state.error_count == 1
    assert state.last_error is not None

    (error_event,) = state.recent_events
    assert error_event.action == "sync"
    assert error_event.status == "error"
    assert error_event.error == "test error"


@pytest.mark.asyncio
async def test_write_status(watch_service):
    """write_status persists the current state as JSON to the status file."""
    await watch_service.write_status()

    assert watch_service.status_path.exists()
    data = json.loads(watch_service.status_path.read_text(encoding="utf-8"))
    # A freshly constructed service has not started watching and has no errors.
    assert not data["running"]
    assert data["error_count"] == 0


@pytest.mark.asyncio
async def test_handle_file_add(watch_service, project_config, test_project, entity_repository):
    """Test handling new file creation."""
    project_dir = project_config.home

    # empty dir is ignored: directory events must not be treated as files
    empty_dir = project_dir / "empty_dir"
    empty_dir.mkdir()

    # Setup changes (include the directory to prove it is skipped)
    new_file = project_dir / "new_note.md"
    changes = {(Change.added, str(empty_dir)), (Change.added, str(new_file))}

    # Create the file
    content = """---
type: knowledge
---
# New Note
Test content
"""
    await create_test_file(new_file, content)

    # Handle changes
    await watch_service.handle_changes(test_project, changes)

    # Verify the entity was synced into the repository
    entity = await entity_repository.get_by_file_path("new_note.md")
    assert entity is not None
    assert entity.title == "new_note"

    # Check a single "new" event was recorded for the file
    events = [e for e in watch_service.state.recent_events if e.action == "new"]
    assert len(events) == 1
    assert events[0].path == "new_note.md"
    assert events[0].status == "success"


@pytest.mark.asyncio
async def test_handle_file_modify(watch_service, project_config, sync_service, test_project):
    """Test handling file modifications."""
    project_dir = project_config.home

    # empty dir is ignored: directory events must not be treated as files
    empty_dir = project_dir / "empty_dir"
    empty_dir.mkdir()

    # Create initial file
    test_file = project_dir / "test_note.md"
    initial_content = """---
type: knowledge
---
# Test Note
Initial content
"""
    await create_test_file(test_file, initial_content)

    # Initial sync so the file is already known before the modify event
    await sync_service.sync(project_dir)

    # Modify file
    modified_content = """---
type: knowledge
---
# Test Note
Modified content
"""
    await create_test_file(test_file, modified_content)

    # Setup changes (include the directory to prove it is skipped)
    changes = {(Change.modified, str(empty_dir)), (Change.modified, str(test_file))}

    # Handle changes
    await watch_service.handle_changes(test_project, changes)

    # Verify the entity still resolves by path
    entity = await sync_service.entity_repository.get_by_file_path("test_note.md")
    assert entity is not None

    # Check a single "modified" event was recorded for the file
    events = [e for e in watch_service.state.recent_events if e.action == "modified"]
    assert len(events) == 1
    assert events[0].path == "test_note.md"
    assert events[0].status == "success"


@pytest.mark.asyncio
async def test_handle_file_delete(watch_service, project_config, test_project, sync_service):
    """Test handling file deletion."""
    project_dir = project_config.home

    # Create initial file
    test_file = project_dir / "to_delete.md"
    content = """---
type: knowledge
---
# Delete Test
Test content
"""
    await create_test_file(test_file, content)

    # Initial sync so the entity exists before the delete event
    await sync_service.sync(project_dir)

    # Delete file from disk
    test_file.unlink()

    # Setup changes
    changes = {(Change.deleted, str(test_file))}

    # Handle changes
    await watch_service.handle_changes(test_project, changes)

    # Verify the entity was removed from the repository
    entity = await sync_service.entity_repository.get_by_file_path("to_delete.md")
    assert entity is None

    # Check a single "deleted" event was recorded
    events = [e for e in watch_service.state.recent_events if e.action == "deleted"]
    assert len(events) == 1
    assert events[0].path == "to_delete.md"
    assert events[0].status == "success"


@pytest.mark.asyncio
async def test_handle_file_move(watch_service, project_config, test_project, sync_service):
    """Test handling file moves (delete + add pair resolved to a single move)."""
    project_dir = project_config.home

    # Create initial file
    old_path = project_dir / "old" / "test_move.md"
    content = """---
type: knowledge
---
# Move Test
Test content
"""
    await create_test_file(old_path, content)

    # Initial sync so we can compare entity identity after the move
    await sync_service.sync(project_dir)
    initial_entity = await sync_service.entity_repository.get_by_file_path("old/test_move.md")

    # Move file
    new_path = project_dir / "new" / "moved_file.md"
    new_path.parent.mkdir(parents=True)
    old_path.rename(new_path)

    # Setup changes: watchfiles reports a move as deleted + added
    changes = {(Change.deleted, str(old_path)), (Change.added, str(new_path))}

    # Handle changes
    await watch_service.handle_changes(test_project, changes)

    # Verify the entity identity is preserved across the move
    moved_entity = await sync_service.entity_repository.get_by_file_path("new/moved_file.md")
    assert moved_entity is not None
    assert moved_entity.id == initial_entity.id  # Same entity, new path

    # Original path should no longer exist
    old_entity = await sync_service.entity_repository.get_by_file_path("old/test_move.md")
    assert old_entity is None

    # Check a single "moved" event was recorded with the "old -> new" path format
    events = [e for e in watch_service.state.recent_events if e.action == "moved"]
    assert len(events) == 1
    assert events[0].path == "old/test_move.md -> new/moved_file.md"
    assert events[0].status == "success"


@pytest.mark.asyncio
async def test_handle_concurrent_changes(watch_service, project_config, test_project, sync_service):
    """Test handling multiple file changes happening close together."""
    project_dir = project_config.home

    # Create multiple files with small delays to simulate concurrent changes
    async def create_files():
        # Create first file
        file1 = project_dir / "note1.md"
        await create_test_file(file1, "First note")
        await asyncio.sleep(0.1)

        # Create second file
        file2 = project_dir / "note2.md"
        await create_test_file(file2, "Second note")
        await asyncio.sleep(0.1)

        # Modify first file
        await create_test_file(file1, "Modified first note")

        return file1, file2

    # Create files and collect changes
    file1, file2 = await create_files()

    # Setup combined changes: file1 has both an add and a modify event
    changes = {
        (Change.added, str(file1)),
        (Change.modified, str(file1)),
        (Change.added, str(file2)),
    }

    # Handle changes
    await watch_service.handle_changes(test_project, changes)

    # Verify both files were processed
    entity1 = await sync_service.entity_repository.get_by_file_path("note1.md")
    entity2 = await sync_service.entity_repository.get_by_file_path("note2.md")

    assert entity1 is not None
    assert entity2 is not None

    # Overlapping events for the same file are coalesced: the add wins and
    # no separate "modified" event is emitted for file1.
    events = watch_service.state.recent_events
    actions = [e.action for e in events]
    assert "new" in actions
    assert "modified" not in actions  # only process file once


@pytest.mark.asyncio
async def test_handle_rapid_move(watch_service, project_config, test_project, sync_service):
    """Test handling rapid move operations (original -> temp -> final)."""
    project_dir = project_config.home

    # Create initial file
    original_path = project_dir / "original.md"
    content = """---
type: knowledge
---
# Move Test
Test content for rapid moves
"""
    await create_test_file(original_path, content)
    await sync_service.sync(project_dir)

    # Perform rapid moves through an intermediate path
    temp_path = project_dir / "temp.md"
    final_path = project_dir / "final.md"

    original_path.rename(temp_path)
    await asyncio.sleep(0.1)
    temp_path.rename(final_path)

    # Setup changes that might come in various orders: the temp path appears
    # as both added and deleted within the same batch
    changes = {
        (Change.deleted, str(original_path)),
        (Change.added, str(temp_path)),
        (Change.deleted, str(temp_path)),
        (Change.added, str(final_path)),
    }

    # Handle changes
    await watch_service.handle_changes(test_project, changes)

    # Verify final state: only the final path is tracked
    final_entity = await sync_service.entity_repository.get_by_file_path("final.md")
    assert final_entity is not None

    # Intermediate paths should not exist
    original_entity = await sync_service.entity_repository.get_by_file_path("original.md")
    temp_entity = await sync_service.entity_repository.get_by_file_path("temp.md")
    assert original_entity is None
    assert temp_entity is None


@pytest.mark.asyncio
async def test_handle_delete_then_add(watch_service, project_config, test_project, sync_service):
    """Test a delete and an add for the same path arriving in one batch: delete wins."""
    project_dir = project_config.home

    # Create initial file (note: it was never synced, so no entity exists yet)
    original_path = project_dir / "original.md"
    content = """---
type: knowledge
---
# Move Test
Test content for rapid moves
"""
    await create_test_file(original_path, content)

    # Setup changes that might come in various orders
    changes = {
        (Change.deleted, str(original_path)),
        (Change.added, str(original_path)),
    }

    # Handle changes
    await watch_service.handle_changes(test_project, changes)

    # Verify final state
    original_entity = await sync_service.entity_repository.get_by_file_path("original.md")
    assert original_entity is None  # delete event is handled


@pytest.mark.asyncio
async def test_handle_directory_rename(watch_service, project_config, test_project, sync_service):
    """Test handling directory rename operations - regression test for the bug where directories
    were being processed as files, causing errors."""
    from unittest.mock import AsyncMock

    project_dir = project_config.home

    # Create a directory with a file inside
    old_dir_path = project_dir / "old_dir"
    old_dir_path.mkdir(parents=True, exist_ok=True)

    file_in_dir = old_dir_path / "test_file.md"
    content = """---
type: knowledge
---
# Test File
This is a test file in a directory
"""
    await create_test_file(file_in_dir, content)

    # Initial sync to add the file to the database
    await sync_service.sync(project_dir)

    # Rename the directory
    new_dir_path = project_dir / "new_dir"
    old_dir_path.rename(new_dir_path)

    # Setup changes that simulate directory rename
    # When a directory is renamed, watchfiles reports it as deleted and added
    changes = {
        (Change.deleted, str(old_dir_path)),
        (Change.added, str(new_dir_path)),
    }

    # Wrap sync_file in a mock (delegating to the original) so calls can be inspected
    original_sync_file = sync_service.sync_file
    mock_sync_file = AsyncMock(side_effect=original_sync_file)
    sync_service.sync_file = mock_sync_file

    # Handle changes - this should not throw an exception
    await watch_service.handle_changes(test_project, changes)

    # Check if our mock was called with any directory paths
    for call in mock_sync_file.call_args_list:
        args, kwargs = call
        path = args[0]
        full_path = project_dir / path
        assert not full_path.is_dir(), f"sync_file should not be called with directory path: {path}"

    # The file path should be untouched since we're ignoring directory events
    # We'd need a separate event for the file itself to be updated
    old_entity = await sync_service.entity_repository.get_by_file_path("old_dir/test_file.md")

    # The original entity should still exist since we only renamed the directory
    # but didn't process updates to the file itself
    assert old_entity is not None


def test_is_project_path(watch_service, tmp_path):
    """Test the is_project_path method to ensure it correctly identifies paths within a project."""
    # Create a project at a specific path
    project_path = tmp_path / "project"
    project_path.mkdir(parents=True, exist_ok=True)

    # Create a file inside the project
    file_in_project = project_path / "subdirectory" / "file.md"
    file_in_project.parent.mkdir(parents=True, exist_ok=True)
    file_in_project.touch()

    # Create a file outside the project
    file_outside_project = tmp_path / "outside" / "file.md"
    file_outside_project.parent.mkdir(parents=True, exist_ok=True)
    file_outside_project.touch()

    # Create Project object with our path
    project = Project(id=1, name="test", path=str(project_path), permalink="test")

    # Test a file inside the project
    assert watch_service.is_project_path(project, file_in_project) is True

    # Test a file outside the project
    assert watch_service.is_project_path(project, file_outside_project) is False

    # The project root itself is not considered "within" the project
    assert watch_service.is_project_path(project, project_path) is False


@pytest.mark.asyncio
async def test_handle_changes_skips_deleted_project(
    watch_service, project_config, test_project, sync_service, project_service, tmp_path
) -> None:
    """Test that handle_changes skips processing changes for projects that have been deleted.

    This is a regression test for issue #193 where deleted projects were being recreated
    by background sync because the directory still existed on disk.
    """
    from textwrap import dedent

    project_dir = project_config.home

    # Create a test file in the project
    test_file = project_dir / "test_note.md"
    content = dedent("""
        ---
        type: knowledge
        ---
        # Test Note
        Test content
    """).strip()
    await create_test_file(test_file, content)

    # Initial sync to create the entity
    await sync_service.sync(project_dir)

    # Verify entity was created
    entity_before = await sync_service.entity_repository.get_by_file_path("test_note.md")
    assert entity_before is not None

    # Create a second project directly in the database and set it as default
    # so we can remove the first one (cannot remove default project)
    other_project_path = str(tmp_path.parent / "other-project-for-test")
    project_data = {
        "name": "other-project",
        "path": other_project_path,
        "permalink": "other-project",
        "is_active": True,
    }
    other_project = await project_service.repository.create(project_data)
    await project_service.repository.set_as_default(other_project.id)

    # Also add to config
    # NOTE(review): DB and config file are updated separately here — assumes
    # project_service reads both; confirm there is no single "add project" API.
    config = project_service.config_manager.load_config()
    config.projects["other-project"] = other_project_path
    config.default_project = "other-project"
    project_service.config_manager.save_config(config)

    # Remove the test project from configuration (simulating project deletion)
    # This should prevent background sync from processing changes
    await project_service.remove_project(test_project.name)

    # Simulate file changes after project deletion
    # These changes should be ignored by the watch service
    modified_content = dedent("""
        ---
        type: knowledge
        ---
        # Test Note
        Modified content after project deletion
    """).strip()
    await create_test_file(test_file, modified_content)

    # watchfiles-style (Change, path) event tuples
    changes = {(Change.modified, str(test_file))}

    # Handle changes - should skip processing since project is deleted
    await watch_service.handle_changes(test_project, changes)

    # Verify that the entity was NOT re-created or updated
    # Since the project was deleted, the database should still have the old state
    # or the entity should be gone entirely if cleanup happened
    entity_after = await sync_service.entity_repository.get_by_file_path("test_note.md")

    # The entity might be deleted or unchanged, but it should not be updated with new content
    if entity_after is not None:
        # If the entity still exists, it should have the old content, not the new content
        # (checksum comparison detects any content-level update)
        assert entity_after.checksum == entity_before.checksum, (
            "Entity should not be updated for deleted project"
        )

```

--------------------------------------------------------------------------------
/tests/repository/test_entity_repository_upsert.py:
--------------------------------------------------------------------------------

```python
"""Tests for the entity repository UPSERT functionality."""

import pytest
from datetime import datetime, timezone

from basic_memory.models.knowledge import Entity
from basic_memory.repository.entity_repository import EntityRepository
from basic_memory.repository.project_repository import ProjectRepository
from basic_memory.services.exceptions import SyncFatalError


@pytest.mark.asyncio
async def test_upsert_entity_new_entity(entity_repository: EntityRepository):
    """Test upserting a completely new entity."""
    now = datetime.now(timezone.utc)
    fresh = Entity(
        project_id=entity_repository.project_id,
        title="Test Entity",
        entity_type="note",
        permalink="test/test-entity",
        file_path="test/test-entity.md",
        content_type="text/markdown",
        created_at=now,
        updated_at=now,
    )

    saved = await entity_repository.upsert_entity(fresh)

    # A brand-new entity gets an id and keeps its identifying fields intact
    assert saved.id is not None
    assert saved.title == "Test Entity"
    assert saved.permalink == "test/test-entity"
    assert saved.file_path == "test/test-entity.md"


@pytest.mark.asyncio
async def test_upsert_entity_same_file_update(entity_repository: EntityRepository):
    """Upserting again with the same file_path updates the existing row in place."""

    def make_entity(title: str) -> Entity:
        # Identical permalink/file_path on both versions; only the title differs.
        now = datetime.now(timezone.utc)
        return Entity(
            project_id=entity_repository.project_id,
            title=title,
            entity_type="note",
            permalink="test/test-entity",
            file_path="test/test-entity.md",
            content_type="text/markdown",
            created_at=now,
            updated_at=now,
        )

    first = await entity_repository.upsert_entity(make_entity("Original Title"))

    second = await entity_repository.upsert_entity(make_entity("Updated Title"))

    # Same file_path + permalink -> the original row is updated, not duplicated
    assert second.id == first.id
    assert second.title == "Updated Title"
    assert second.permalink == "test/test-entity"
    assert second.file_path == "test/test-entity.md"


@pytest.mark.asyncio
async def test_upsert_entity_permalink_conflict_different_file(entity_repository: EntityRepository):
    """A permalink collision from a *different* file yields a suffixed permalink."""

    def build(title: str, file_path: str) -> Entity:
        now = datetime.now(timezone.utc)
        return Entity(
            project_id=entity_repository.project_id,
            title=title,
            entity_type="note",
            permalink="test/shared-permalink",
            file_path=file_path,
            content_type="text/markdown",
            created_at=now,
            updated_at=now,
        )

    first = await entity_repository.upsert_entity(build("First Entity", "test/first-file.md"))

    # Same permalink but a different file: must not clobber the first entity
    second = await entity_repository.upsert_entity(build("Second Entity", "test/second-file.md"))

    # A new row is created and the permalink is de-duplicated with a suffix
    assert second.id != first.id
    assert second.title == "Second Entity"
    assert second.permalink == "test/shared-permalink-1"
    assert second.file_path == "test/second-file.md"

    # The first entity keeps its original permalink untouched
    untouched = await entity_repository.get_by_permalink("test/shared-permalink")
    assert untouched is not None
    assert untouched.id == first.id
    assert untouched.title == "First Entity"


@pytest.mark.asyncio
async def test_upsert_entity_multiple_permalink_conflicts(entity_repository: EntityRepository):
    """Repeated collisions on one permalink produce sequential suffixes."""
    base = "test/conflict"

    stored = []
    for idx in range(1, 4):
        now = datetime.now(timezone.utc)
        candidate = Entity(
            project_id=entity_repository.project_id,
            title=f"Entity {idx}",
            entity_type="note",
            permalink=base,  # every entity requests the same permalink
            file_path=f"test/file-{idx}.md",  # but each lives in its own file
            content_type="text/markdown",
            created_at=now,
            updated_at=now,
        )
        stored.append(await entity_repository.upsert_entity(candidate))

    # Permalinks were de-duplicated: base, base-1, base-2
    assert {e.permalink for e in stored} == {"test/conflict", "test/conflict-1", "test/conflict-2"}

    # Three distinct rows were created
    assert len({e.id for e in stored}) == 3


@pytest.mark.asyncio
async def test_upsert_entity_race_condition_file_path(entity_repository: EntityRepository):
    """Test that upsert handles file_path conflicts using ON CONFLICT DO UPDATE.

    SQLite's ON CONFLICT resolves the race at the database level, so a second
    upsert targeting the same file_path must update the existing row rather
    than insert a duplicate or require an application-level check.
    """
    now = datetime.now(timezone.utc)
    existing = await entity_repository.upsert_entity(
        Entity(
            project_id=entity_repository.project_id,
            title="Original Entity",
            entity_type="note",
            permalink="test/original",
            file_path="test/race-file.md",
            content_type="text/markdown",
            created_at=now,
            updated_at=now,
        )
    )

    # Simulate a concurrent writer hitting the same file with new metadata
    overwritten = await entity_repository.upsert_entity(
        Entity(
            project_id=entity_repository.project_id,
            title="Race Condition Test",
            entity_type="note",
            permalink="test/race-entity",
            file_path="test/race-file.md",  # collides with the existing row
            content_type="text/markdown",
            created_at=now,
            updated_at=now,
        )
    )

    # The existing row was updated in place (same id), fields refreshed
    assert overwritten.id == existing.id
    assert overwritten.title == "Race Condition Test"
    assert overwritten.file_path == "test/race-file.md"
    assert overwritten.permalink == "test/race-entity"


@pytest.mark.asyncio
async def test_upsert_entity_gap_in_suffixes(entity_repository: EntityRepository):
    """Upsert fills the first available gap in existing permalink suffixes."""
    base = "test/gap"

    # Seed rows holding "test/gap", "test/gap-1" and "test/gap-3",
    # deliberately leaving "-2" free.
    taken = [base, f"{base}-1", f"{base}-3"]
    for idx, permalink in enumerate(taken, start=1):
        now = datetime.now(timezone.utc)
        seeded = Entity(
            project_id=entity_repository.project_id,
            title=f"Entity {idx}",
            entity_type="note",
            permalink=permalink,
            file_path=f"test/gap-file-{idx}.md",
            content_type="text/markdown",
            created_at=now,
            updated_at=now,
        )
        # Direct add so the exact permalinks above are stored verbatim
        await entity_repository.add(seeded)

    # This upsert conflicts with "test/gap" and should land in the gap
    now = datetime.now(timezone.utc)
    filler = Entity(
        project_id=entity_repository.project_id,
        title="Gap Filler",
        entity_type="note",
        permalink=base,
        file_path="test/gap-new-file.md",
        content_type="text/markdown",
        created_at=now,
        updated_at=now,
    )
    stored = await entity_repository.upsert_entity(filler)

    # The implementation scans for the first free suffix, so "-2" is reused
    assert stored.permalink == "test/gap-2"
    assert stored.title == "Gap Filler"


@pytest.mark.asyncio
async def test_upsert_entity_project_scoping_isolation(session_maker):
    """Test that upsert_entity properly scopes entities by project_id.

    Regression coverage for issue #167:
    1. Entities with identical permalinks/file_paths may coexist in different projects.
    2. Upsert operations scope their queries by project_id.
    3. No "multiple rows" errors occur when similar entities exist across projects.
    """
    project_repository = ProjectRepository(session_maker)

    async def make_project(name: str, description: str, path: str):
        # Minimal non-default, active project record
        return await project_repository.create(
            {
                "name": name,
                "description": description,
                "path": path,
                "is_active": True,
                "is_default": False,
            }
        )

    project1 = await make_project("project-1", "First test project", "/tmp/project1")
    project2 = await make_project("project-2", "Second test project", "/tmp/project2")

    # One entity repository per project
    repo1 = EntityRepository(session_maker, project_id=project1.id)
    repo2 = EntityRepository(session_maker, project_id=project2.id)

    def shared_entity(project_id: int, title: str) -> Entity:
        # Same permalink and file_path in every version; project/title vary
        now = datetime.now(timezone.utc)
        return Entity(
            project_id=project_id,
            title=title,
            entity_type="note",
            permalink="docs/shared-name",
            file_path="docs/shared-name.md",
            content_type="text/markdown",
            created_at=now,
            updated_at=now,
        )

    # Identical permalink/file_path in two projects: both upserts must succeed
    # without "multiple rows" errors
    result1 = await repo1.upsert_entity(shared_entity(project1.id, "Shared Entity"))
    result2 = await repo2.upsert_entity(shared_entity(project2.id, "Shared Entity"))

    assert result1.id is not None
    assert result2.id is not None
    assert result1.id != result2.id  # distinct rows
    assert result1.project_id == project1.id
    assert result2.project_id == project2.id
    assert result1.permalink == "docs/shared-name"
    assert result2.permalink == "docs/shared-name"

    # Updates stay scoped to their own project as well
    updated1 = await repo1.upsert_entity(shared_entity(project1.id, "Updated Shared Entity"))
    updated2 = await repo2.upsert_entity(shared_entity(project2.id, "Also Updated Shared Entity"))

    # Existing rows were updated in place (same ids)
    assert updated1.id == result1.id
    assert updated2.id == result2.id
    assert updated1.title == "Updated Shared Entity"
    assert updated2.title == "Also Updated Shared Entity"

    # Lookups in one project never leak results from the other
    found_in_project1 = await repo1.get_by_permalink("docs/shared-name")
    found_in_project2 = await repo2.get_by_permalink("docs/shared-name")

    assert found_in_project1 is not None
    assert found_in_project2 is not None
    assert found_in_project1.id == updated1.id
    assert found_in_project2.id == updated2.id
    assert found_in_project1.title == "Updated Shared Entity"
    assert found_in_project2.title == "Also Updated Shared Entity"


@pytest.mark.asyncio
async def test_upsert_entity_permalink_conflict_within_project_only(session_maker):
    """Test that permalink conflicts only occur within the same project.

    Entities with identical permalinks in *different* projects must keep their
    permalinks untouched; only a collision inside a single project should
    trigger suffix-based conflict resolution.
    """
    project_repository = ProjectRepository(session_maker)

    async def make_project(name: str, description: str, path: str):
        # Minimal non-default, active project record
        return await project_repository.create(
            {
                "name": name,
                "description": description,
                "path": path,
                "is_active": True,
                "is_default": False,
            }
        )

    project1 = await make_project(
        "conflict-project-1", "First conflict test project", "/tmp/conflict1"
    )
    project2 = await make_project(
        "conflict-project-2", "Second conflict test project", "/tmp/conflict2"
    )

    repo1 = EntityRepository(session_maker, project_id=project1.id)
    repo2 = EntityRepository(session_maker, project_id=project2.id)

    def conflicting(project_id: int, title: str, file_path: str) -> Entity:
        # Every entity requests the same permalink; project/file vary
        now = datetime.now(timezone.utc)
        return Entity(
            project_id=project_id,
            title=title,
            entity_type="note",
            permalink="test/conflict-permalink",
            file_path=file_path,
            content_type="text/markdown",
            created_at=now,
            updated_at=now,
        )

    # First claim on the permalink, in project1
    result1 = await repo1.upsert_entity(
        conflicting(project1.id, "Original Entity", "test/original.md")
    )
    assert result1.permalink == "test/conflict-permalink"

    # Same permalink in project2: cross-project, so no suffix is added
    result2 = await repo2.upsert_entity(
        conflicting(project2.id, "Cross-Project Entity", "test/cross-project.md")
    )
    assert result2.permalink == "test/conflict-permalink"

    # Same permalink again in project1: a real conflict, gets a suffix
    result3 = await repo1.upsert_entity(
        conflicting(project1.id, "Conflict Entity", "test/conflict.md")
    )
    assert result3.permalink == "test/conflict-permalink-1"

    # All three rows exist and belong to the expected projects
    assert result1.id != result2.id != result3.id
    assert result1.project_id == project1.id
    assert result2.project_id == project2.id
    assert result3.project_id == project1.id


@pytest.mark.asyncio
async def test_upsert_entity_with_invalid_project_id(entity_repository: EntityRepository):
    """Test that upserting with non-existent project_id raises clear error.

    Regression coverage for issue #188: a project deleted during sync must
    surface as a SyncFatalError with an explanatory message instead of a raw
    FOREIGN KEY constraint violation.
    """
    now = datetime.now(timezone.utc)
    orphan = Entity(
        title="Test Entity",
        entity_type="note",
        file_path="test.md",
        permalink="test",
        project_id=99999,  # no project with this id exists
        content_type="text/markdown",
        created_at=now,
        updated_at=now,
    )

    with pytest.raises(SyncFatalError) as exc_info:
        await entity_repository.upsert_entity(orphan)

    # The message should make both the root cause and the consequence explicit
    message = str(exc_info.value)
    assert "project_id=99999 does not exist" in message
    assert "project may have been deleted" in message.lower()
    assert "sync will be terminated" in message.lower()

```

--------------------------------------------------------------------------------
/src/basic_memory/mcp/tools/utils.py:
--------------------------------------------------------------------------------

```python
"""Utility functions for making HTTP requests in Basic Memory MCP tools.

These functions provide a consistent interface for making HTTP requests
to the Basic Memory API, with improved error handling and logging.
"""

import typing
from typing import Optional

from httpx import Response, URL, AsyncClient, HTTPStatusError
from httpx._client import UseClientDefault, USE_CLIENT_DEFAULT
from httpx._types import (
    RequestContent,
    RequestData,
    RequestFiles,
    QueryParamTypes,
    HeaderTypes,
    CookieTypes,
    AuthTypes,
    TimeoutTypes,
    RequestExtensions,
)
from loguru import logger
from mcp.server.fastmcp.exceptions import ToolError


def get_error_message(
    status_code: int, url: URL | str, method: str, msg: Optional[str] = None
) -> str:
    """Get a friendly error message based on the HTTP status code.

    Args:
        status_code: The HTTP status code
        url: The URL that was requested
        method: The HTTP method used
        msg: Optional server-supplied detail; when provided it is appended
            to the generated message so callers see the most specific text

    Returns:
        A user-friendly error message
    """
    # Extract path from URL for cleaner error messages
    if isinstance(url, str):
        path = url.split("/")[-1]
    else:
        path = str(url).split("/")[-1] if url else "resource"

    # Client errors (400-499)
    if status_code == 400:
        message = f"Invalid request: The request to '{path}' was malformed or invalid"
    elif status_code == 401:  # pragma: no cover
        message = f"Authentication required: You need to authenticate to access '{path}'"
    elif status_code == 403:  # pragma: no cover
        message = f"Access denied: You don't have permission to access '{path}'"
    elif status_code == 404:
        message = f"Resource not found: '{path}' doesn't exist or has been moved"
    elif status_code == 409:  # pragma: no cover
        message = f"Conflict: The request for '{path}' conflicts with the current state"
    elif status_code == 429:  # pragma: no cover
        message = "Too many requests: Please slow down and try again later"
    elif 400 <= status_code < 500:  # pragma: no cover
        message = f"Client error ({status_code}): The request for '{path}' could not be completed"

    # Server errors (500-599)
    elif status_code == 500:
        message = f"Internal server error: Something went wrong processing '{path}'"
    elif status_code == 503:  # pragma: no cover
        message = (
            f"Service unavailable: The server is currently unable to handle requests for '{path}'"
        )
    elif 500 <= status_code < 600:  # pragma: no cover
        message = f"Server error ({status_code}): The server encountered an error handling '{path}'"

    # Fallback for any other status code
    else:  # pragma: no cover
        message = f"HTTP error {status_code}: {method} request to '{path}' failed"

    # Previously `msg` was accepted but silently ignored; append it so
    # server-provided detail is not lost (backward compatible: default None)
    if msg:
        message = f"{message}: {msg}"
    return message


async def call_get(
    client: AsyncClient,
    url: URL | str,
    *,
    params: QueryParamTypes | None = None,
    headers: HeaderTypes | None = None,
    cookies: CookieTypes | None = None,
    auth: AuthTypes | UseClientDefault | None = USE_CLIENT_DEFAULT,
    follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
    timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
    extensions: RequestExtensions | None = None,
) -> Response:
    """Make a GET request and handle errors appropriately.

    Args:
        client: The HTTPX AsyncClient to use
        url: The URL to request
        params: Query parameters
        headers: HTTP headers
        cookies: HTTP cookies
        auth: Authentication
        follow_redirects: Whether to follow redirects
        timeout: Request timeout
        extensions: HTTPX extensions

    Returns:
        The HTTP response

    Raises:
        ToolError: If the request fails with an appropriate error message
    """
    logger.debug(f"Calling GET '{url}' params: '{params}'")
    error_message = None

    try:
        response = await client.get(
            url,
            params=params,
            headers=headers,
            cookies=cookies,
            auth=auth,
            follow_redirects=follow_redirects,
            timeout=timeout,
            extensions=extensions,
        )

        if response.is_success:
            return response

        # Handle different status codes differently
        status_code = response.status_code

        # Prefer the API's own "detail" message; the body may not be JSON at
        # all (e.g. a proxy error page), so guard the parse like call_patch does
        try:
            response_data = response.json()
            if isinstance(response_data, dict) and "detail" in response_data:
                error_message = response_data["detail"]
            else:
                # Fixed: previously labeled this as a "PUT" request
                error_message = get_error_message(status_code, url, "GET")
        except Exception:  # pragma: no cover
            error_message = get_error_message(status_code, url, "GET")

        # Log at appropriate level based on status code
        if 400 <= status_code < 500:
            # Client errors: log as info except for 429 (Too Many Requests)
            if status_code == 429:  # pragma: no cover
                logger.warning(f"Rate limit exceeded: GET {url}: {error_message}")
            else:
                logger.info(f"Client error: GET {url}: {error_message}")
        else:  # pragma: no cover
            # Server errors: log as error
            logger.error(f"Server error: GET {url}: {error_message}")

        # Raise a tool error with the friendly message
        response.raise_for_status()  # Will always raise since we're in the error case
        return response  # This line will never execute, but it satisfies the type checker  # pragma: no cover

    except HTTPStatusError as e:
        raise ToolError(error_message) from e


async def call_put(
    client: AsyncClient,
    url: URL | str,
    *,
    content: RequestContent | None = None,
    data: RequestData | None = None,
    files: RequestFiles | None = None,
    json: typing.Any | None = None,
    params: QueryParamTypes | None = None,
    headers: HeaderTypes | None = None,
    cookies: CookieTypes | None = None,
    auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT,
    follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
    timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
    extensions: RequestExtensions | None = None,
) -> Response:
    """Make a PUT request and handle errors appropriately.

    Args:
        client: The HTTPX AsyncClient to use
        url: The URL to request
        content: Request content
        data: Form data
        files: Files to upload
        json: JSON data
        params: Query parameters
        headers: HTTP headers
        cookies: HTTP cookies
        auth: Authentication
        follow_redirects: Whether to follow redirects
        timeout: Request timeout
        extensions: HTTPX extensions

    Returns:
        The HTTP response

    Raises:
        ToolError: If the request fails with an appropriate error message
    """
    logger.debug(f"Calling PUT '{url}'")
    error_message = None

    try:
        response = await client.put(
            url,
            content=content,
            data=data,
            files=files,
            json=json,
            params=params,
            headers=headers,
            cookies=cookies,
            auth=auth,
            follow_redirects=follow_redirects,
            timeout=timeout,
            extensions=extensions,
        )

        if response.is_success:
            return response

        # Handle different status codes differently
        status_code = response.status_code

        # Prefer the API's own "detail" message; the body may not be JSON at
        # all (e.g. a proxy error page), so guard the parse like call_patch does
        try:
            response_data = response.json()
            if isinstance(response_data, dict) and "detail" in response_data:
                error_message = response_data["detail"]  # pragma: no cover
            else:
                error_message = get_error_message(status_code, url, "PUT")
        except Exception:  # pragma: no cover
            error_message = get_error_message(status_code, url, "PUT")

        # Log at appropriate level based on status code
        if 400 <= status_code < 500:
            # Client errors: log as info except for 429 (Too Many Requests)
            if status_code == 429:  # pragma: no cover
                logger.warning(f"Rate limit exceeded: PUT {url}: {error_message}")
            else:
                logger.info(f"Client error: PUT {url}: {error_message}")
        else:  # pragma: no cover
            # Server errors: log as error
            logger.error(f"Server error: PUT {url}: {error_message}")

        # Raise a tool error with the friendly message
        response.raise_for_status()  # Will always raise since we're in the error case
        return response  # This line will never execute, but it satisfies the type checker  # pragma: no cover

    except HTTPStatusError as e:
        raise ToolError(error_message) from e


async def call_patch(
    client: AsyncClient,
    url: URL | str,
    *,
    content: RequestContent | None = None,
    data: RequestData | None = None,
    files: RequestFiles | None = None,
    json: typing.Any | None = None,
    params: QueryParamTypes | None = None,
    headers: HeaderTypes | None = None,
    cookies: CookieTypes | None = None,
    auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT,
    follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
    timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
    extensions: RequestExtensions | None = None,
) -> Response:
    """Make a PATCH request and handle errors appropriately.

    Args:
        client: The HTTPX AsyncClient to use
        url: The URL to request
        content: Request content
        data: Form data
        files: Files to upload
        json: JSON data
        params: Query parameters
        headers: HTTP headers
        cookies: HTTP cookies
        auth: Authentication
        follow_redirects: Whether to follow redirects
        timeout: Request timeout
        extensions: HTTPX extensions

    Returns:
        The HTTP response

    Raises:
        ToolError: If the request fails with an appropriate error message
    """
    logger.debug(f"Calling PATCH '{url}'")

    try:
        response = await client.patch(
            url,
            content=content,
            data=data,
            files=files,
            json=json,
            params=params,
            headers=headers,
            cookies=cookies,
            auth=auth,
            follow_redirects=follow_redirects,
            timeout=timeout,
            extensions=extensions,
        )

        if response.is_success:
            return response

        # Handle different status codes differently
        status_code = response.status_code

        # Try to extract specific error message from response body
        # (the body may not be JSON, e.g. an HTML proxy error page)
        try:
            response_data = response.json()
            if isinstance(response_data, dict) and "detail" in response_data:
                error_message = response_data["detail"]
            else:
                error_message = get_error_message(status_code, url, "PATCH")  # pragma: no cover
        except Exception:  # pragma: no cover
            error_message = get_error_message(status_code, url, "PATCH")  # pragma: no cover

        # Log at appropriate level based on status code
        if 400 <= status_code < 500:
            # Client errors: log as info except for 429 (Too Many Requests)
            if status_code == 429:  # pragma: no cover
                logger.warning(f"Rate limit exceeded: PATCH {url}: {error_message}")
            else:
                logger.info(f"Client error: PATCH {url}: {error_message}")
        else:  # pragma: no cover
            # Server errors: log as error
            logger.error(f"Server error: PATCH {url}: {error_message}")  # pragma: no cover

        # Raise a tool error with the friendly message
        response.raise_for_status()  # Will always raise since we're in the error case
        return response  # This line will never execute, but it satisfies the type checker  # pragma: no cover

    except HTTPStatusError as e:
        status_code = e.response.status_code

        # Try to extract specific error message from response body
        # (defensively recomputed here so this handler also covers errors
        # raised by the client call itself, before error_message is assigned)
        try:
            response_data = e.response.json()
            if isinstance(response_data, dict) and "detail" in response_data:
                error_message = response_data["detail"]
            else:
                error_message = get_error_message(status_code, url, "PATCH")  # pragma: no cover
        except Exception:  # pragma: no cover
            error_message = get_error_message(status_code, url, "PATCH")  # pragma: no cover

        raise ToolError(error_message) from e


async def call_post(
    client: AsyncClient,
    url: URL | str,
    *,
    content: RequestContent | None = None,
    data: RequestData | None = None,
    files: RequestFiles | None = None,
    json: typing.Any | None = None,
    params: QueryParamTypes | None = None,
    headers: HeaderTypes | None = None,
    cookies: CookieTypes | None = None,
    auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT,
    follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
    timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
    extensions: RequestExtensions | None = None,
) -> Response:
    """Make a POST request and handle errors appropriately.

    Args:
        client: The HTTPX AsyncClient to use
        url: The URL to request
        content: Request content
        data: Form data
        files: Files to upload
        json: JSON data
        params: Query parameters
        headers: HTTP headers
        cookies: HTTP cookies
        auth: Authentication
        follow_redirects: Whether to follow redirects
        timeout: Request timeout
        extensions: HTTPX extensions

    Returns:
        The HTTP response

    Raises:
        ToolError: If the request fails with an appropriate error message
    """
    logger.debug(f"Calling POST '{url}'")
    error_message = None

    try:
        response = await client.post(
            url=url,
            content=content,
            data=data,
            files=files,
            json=json,
            params=params,
            headers=headers,
            cookies=cookies,
            auth=auth,
            follow_redirects=follow_redirects,
            timeout=timeout,
            extensions=extensions,
        )
        # Log the status only; calling response.json() unconditionally would
        # raise JSONDecodeError on empty (e.g. 204) or non-JSON responses.
        logger.debug(f"POST '{url}' returned status {response.status_code}")

        if response.is_success:
            return response

        # Handle different status codes differently
        status_code = response.status_code

        # Try to extract a specific error message from the response body.
        # Guarded because error bodies are not guaranteed to be JSON
        # (e.g. an HTML page from a proxy/gateway).
        try:
            response_data = response.json()
            if isinstance(response_data, dict) and "detail" in response_data:
                error_message = response_data["detail"]
            else:
                error_message = get_error_message(status_code, url, "POST")
        except Exception:  # pragma: no cover
            error_message = get_error_message(status_code, url, "POST")

        # Log at appropriate level based on status code
        if 400 <= status_code < 500:
            # Client errors: log as info except for 429 (Too Many Requests)
            if status_code == 429:  # pragma: no cover
                logger.warning(f"Rate limit exceeded: POST {url}: {error_message}")
            else:  # pragma: no cover
                logger.info(f"Client error: POST {url}: {error_message}")
        else:
            # Server errors: log as error
            logger.error(f"Server error: POST {url}: {error_message}")

        # Raise a tool error with the friendly message
        response.raise_for_status()  # Will always raise since we're in the error case
        return response  # This line will never execute, but it satisfies the type checker  # pragma: no cover

    except HTTPStatusError as e:
        # error_message is guaranteed to be set above before raise_for_status();
        # fall back to a generic message defensively in case it is not.
        if error_message is None:  # pragma: no cover
            error_message = get_error_message(e.response.status_code, url, "POST")
        raise ToolError(error_message) from e


async def call_delete(
    client: AsyncClient,
    url: URL | str,
    *,
    params: QueryParamTypes | None = None,
    headers: HeaderTypes | None = None,
    cookies: CookieTypes | None = None,
    auth: AuthTypes | UseClientDefault = USE_CLIENT_DEFAULT,
    follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
    timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
    extensions: RequestExtensions | None = None,
) -> Response:
    """Make a DELETE request and handle errors appropriately.

    Args:
        client: The HTTPX AsyncClient to use
        url: The URL to request
        params: Query parameters
        headers: HTTP headers
        cookies: HTTP cookies
        auth: Authentication
        follow_redirects: Whether to follow redirects
        timeout: Request timeout
        extensions: HTTPX extensions

    Returns:
        The HTTP response

    Raises:
        ToolError: If the request fails with an appropriate error message
    """
    logger.debug(f"Calling DELETE '{url}'")
    error_message = None

    try:
        response = await client.delete(
            url=url,
            params=params,
            headers=headers,
            cookies=cookies,
            auth=auth,
            follow_redirects=follow_redirects,
            timeout=timeout,
            extensions=extensions,
        )

        if response.is_success:
            return response

        # Handle different status codes differently
        status_code = response.status_code

        # Try to extract a specific error message from the response body.
        # Guarded because error bodies are not guaranteed to be JSON
        # (e.g. an HTML page from a proxy/gateway).
        try:
            response_data = response.json()
            if isinstance(response_data, dict) and "detail" in response_data:
                error_message = response_data["detail"]  # pragma: no cover
            else:
                error_message = get_error_message(status_code, url, "DELETE")
        except Exception:  # pragma: no cover
            error_message = get_error_message(status_code, url, "DELETE")

        # Log at appropriate level based on status code
        if 400 <= status_code < 500:
            # Client errors: log as info except for 429 (Too Many Requests)
            if status_code == 429:  # pragma: no cover
                logger.warning(f"Rate limit exceeded: DELETE {url}: {error_message}")
            else:
                logger.info(f"Client error: DELETE {url}: {error_message}")
        else:  # pragma: no cover
            # Server errors: log as error
            logger.error(f"Server error: DELETE {url}: {error_message}")

        # Raise a tool error with the friendly message
        response.raise_for_status()  # Will always raise since we're in the error case
        return response  # This line will never execute, but it satisfies the type checker  # pragma: no cover

    except HTTPStatusError as e:
        # error_message is guaranteed to be set above before raise_for_status();
        # fall back to a generic message defensively in case it is not.
        if error_message is None:  # pragma: no cover
            error_message = get_error_message(e.response.status_code, url, "DELETE")
        raise ToolError(error_message) from e

```
Page 9/17FirstPrevNextLast