#
tokens: 46073/50000 9/347 files (page 11/17)
lines: off (toggle) GitHub
raw markdown copy
This is page 11 of 17. Use http://codebase.md/basicmachines-co/basic-memory?lines=false&page={x} to view the full context.

# Directory Structure

```
├── .claude
│   ├── agents
│   │   ├── python-developer.md
│   │   └── system-architect.md
│   └── commands
│       ├── release
│       │   ├── beta.md
│       │   ├── changelog.md
│       │   ├── release-check.md
│       │   └── release.md
│       ├── spec.md
│       └── test-live.md
├── .dockerignore
├── .github
│   ├── dependabot.yml
│   ├── ISSUE_TEMPLATE
│   │   ├── bug_report.md
│   │   ├── config.yml
│   │   ├── documentation.md
│   │   └── feature_request.md
│   └── workflows
│       ├── claude-code-review.yml
│       ├── claude-issue-triage.yml
│       ├── claude.yml
│       ├── dev-release.yml
│       ├── docker.yml
│       ├── pr-title.yml
│       ├── release.yml
│       └── test.yml
├── .gitignore
├── .python-version
├── CHANGELOG.md
├── CITATION.cff
├── CLA.md
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── docker-compose.yml
├── Dockerfile
├── docs
│   ├── ai-assistant-guide-extended.md
│   ├── character-handling.md
│   ├── cloud-cli.md
│   └── Docker.md
├── justfile
├── LICENSE
├── llms-install.md
├── pyproject.toml
├── README.md
├── SECURITY.md
├── smithery.yaml
├── specs
│   ├── SPEC-1 Specification-Driven Development Process.md
│   ├── SPEC-10 Unified Deployment Workflow and Event Tracking.md
│   ├── SPEC-11 Basic Memory API Performance Optimization.md
│   ├── SPEC-12 OpenTelemetry Observability.md
│   ├── SPEC-13 CLI Authentication with Subscription Validation.md
│   ├── SPEC-14 Cloud Git Versioning & GitHub Backup.md
│   ├── SPEC-14- Cloud Git Versioning & GitHub Backup.md
│   ├── SPEC-15 Configuration Persistence via Tigris for Cloud Tenants.md
│   ├── SPEC-16 MCP Cloud Service Consolidation.md
│   ├── SPEC-17 Semantic Search with ChromaDB.md
│   ├── SPEC-18 AI Memory Management Tool.md
│   ├── SPEC-19 Sync Performance and Memory Optimization.md
│   ├── SPEC-2 Slash Commands Reference.md
│   ├── SPEC-20 Simplified Project-Scoped Rclone Sync.md
│   ├── SPEC-3 Agent Definitions.md
│   ├── SPEC-4 Notes Web UI Component Architecture.md
│   ├── SPEC-5 CLI Cloud Upload via WebDAV.md
│   ├── SPEC-6 Explicit Project Parameter Architecture.md
│   ├── SPEC-7 POC to spike Tigris Turso for local access to cloud data.md
│   ├── SPEC-8 TigrisFS Integration.md
│   ├── SPEC-9 Multi-Project Bidirectional Sync Architecture.md
│   ├── SPEC-9 Signed Header Tenant Information.md
│   └── SPEC-9-1 Follow-Ups- Conflict, Sync, and Observability.md
├── src
│   └── basic_memory
│       ├── __init__.py
│       ├── alembic
│       │   ├── alembic.ini
│       │   ├── env.py
│       │   ├── migrations.py
│       │   ├── script.py.mako
│       │   └── versions
│       │       ├── 3dae7c7b1564_initial_schema.py
│       │       ├── 502b60eaa905_remove_required_from_entity_permalink.py
│       │       ├── 5fe1ab1ccebe_add_projects_table.py
│       │       ├── 647e7a75e2cd_project_constraint_fix.py
│       │       ├── 9d9c1cb7d8f5_add_mtime_and_size_columns_to_entity_.py
│       │       ├── a1b2c3d4e5f6_fix_project_foreign_keys.py
│       │       ├── b3c3938bacdb_relation_to_name_unique_index.py
│       │       ├── cc7172b46608_update_search_index_schema.py
│       │       └── e7e1f4367280_add_scan_watermark_tracking_to_project.py
│       ├── api
│       │   ├── __init__.py
│       │   ├── app.py
│       │   ├── routers
│       │   │   ├── __init__.py
│       │   │   ├── directory_router.py
│       │   │   ├── importer_router.py
│       │   │   ├── knowledge_router.py
│       │   │   ├── management_router.py
│       │   │   ├── memory_router.py
│       │   │   ├── project_router.py
│       │   │   ├── prompt_router.py
│       │   │   ├── resource_router.py
│       │   │   ├── search_router.py
│       │   │   └── utils.py
│       │   └── template_loader.py
│       ├── cli
│       │   ├── __init__.py
│       │   ├── app.py
│       │   ├── auth.py
│       │   ├── commands
│       │   │   ├── __init__.py
│       │   │   ├── cloud
│       │   │   │   ├── __init__.py
│       │   │   │   ├── api_client.py
│       │   │   │   ├── bisync_commands.py
│       │   │   │   ├── cloud_utils.py
│       │   │   │   ├── core_commands.py
│       │   │   │   ├── rclone_commands.py
│       │   │   │   ├── rclone_config.py
│       │   │   │   ├── rclone_installer.py
│       │   │   │   ├── upload_command.py
│       │   │   │   └── upload.py
│       │   │   ├── command_utils.py
│       │   │   ├── db.py
│       │   │   ├── import_chatgpt.py
│       │   │   ├── import_claude_conversations.py
│       │   │   ├── import_claude_projects.py
│       │   │   ├── import_memory_json.py
│       │   │   ├── mcp.py
│       │   │   ├── project.py
│       │   │   ├── status.py
│       │   │   └── tool.py
│       │   └── main.py
│       ├── config.py
│       ├── db.py
│       ├── deps.py
│       ├── file_utils.py
│       ├── ignore_utils.py
│       ├── importers
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── chatgpt_importer.py
│       │   ├── claude_conversations_importer.py
│       │   ├── claude_projects_importer.py
│       │   ├── memory_json_importer.py
│       │   └── utils.py
│       ├── markdown
│       │   ├── __init__.py
│       │   ├── entity_parser.py
│       │   ├── markdown_processor.py
│       │   ├── plugins.py
│       │   ├── schemas.py
│       │   └── utils.py
│       ├── mcp
│       │   ├── __init__.py
│       │   ├── async_client.py
│       │   ├── project_context.py
│       │   ├── prompts
│       │   │   ├── __init__.py
│       │   │   ├── ai_assistant_guide.py
│       │   │   ├── continue_conversation.py
│       │   │   ├── recent_activity.py
│       │   │   ├── search.py
│       │   │   └── utils.py
│       │   ├── resources
│       │   │   ├── ai_assistant_guide.md
│       │   │   └── project_info.py
│       │   ├── server.py
│       │   └── tools
│       │       ├── __init__.py
│       │       ├── build_context.py
│       │       ├── canvas.py
│       │       ├── chatgpt_tools.py
│       │       ├── delete_note.py
│       │       ├── edit_note.py
│       │       ├── list_directory.py
│       │       ├── move_note.py
│       │       ├── project_management.py
│       │       ├── read_content.py
│       │       ├── read_note.py
│       │       ├── recent_activity.py
│       │       ├── search.py
│       │       ├── utils.py
│       │       ├── view_note.py
│       │       └── write_note.py
│       ├── models
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── knowledge.py
│       │   ├── project.py
│       │   └── search.py
│       ├── repository
│       │   ├── __init__.py
│       │   ├── entity_repository.py
│       │   ├── observation_repository.py
│       │   ├── project_info_repository.py
│       │   ├── project_repository.py
│       │   ├── relation_repository.py
│       │   ├── repository.py
│       │   └── search_repository.py
│       ├── schemas
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── cloud.py
│       │   ├── delete.py
│       │   ├── directory.py
│       │   ├── importer.py
│       │   ├── memory.py
│       │   ├── project_info.py
│       │   ├── prompt.py
│       │   ├── request.py
│       │   ├── response.py
│       │   ├── search.py
│       │   └── sync_report.py
│       ├── services
│       │   ├── __init__.py
│       │   ├── context_service.py
│       │   ├── directory_service.py
│       │   ├── entity_service.py
│       │   ├── exceptions.py
│       │   ├── file_service.py
│       │   ├── initialization.py
│       │   ├── link_resolver.py
│       │   ├── project_service.py
│       │   ├── search_service.py
│       │   └── service.py
│       ├── sync
│       │   ├── __init__.py
│       │   ├── background_sync.py
│       │   ├── sync_service.py
│       │   └── watch_service.py
│       ├── templates
│       │   └── prompts
│       │       ├── continue_conversation.hbs
│       │       └── search.hbs
│       └── utils.py
├── test-int
│   ├── BENCHMARKS.md
│   ├── cli
│   │   ├── test_project_commands_integration.py
│   │   └── test_version_integration.py
│   ├── conftest.py
│   ├── mcp
│   │   ├── test_build_context_underscore.py
│   │   ├── test_build_context_validation.py
│   │   ├── test_chatgpt_tools_integration.py
│   │   ├── test_default_project_mode_integration.py
│   │   ├── test_delete_note_integration.py
│   │   ├── test_edit_note_integration.py
│   │   ├── test_list_directory_integration.py
│   │   ├── test_move_note_integration.py
│   │   ├── test_project_management_integration.py
│   │   ├── test_project_state_sync_integration.py
│   │   ├── test_read_content_integration.py
│   │   ├── test_read_note_integration.py
│   │   ├── test_search_integration.py
│   │   ├── test_single_project_mcp_integration.py
│   │   └── test_write_note_integration.py
│   ├── test_db_wal_mode.py
│   ├── test_disable_permalinks_integration.py
│   └── test_sync_performance_benchmark.py
├── tests
│   ├── __init__.py
│   ├── api
│   │   ├── conftest.py
│   │   ├── test_async_client.py
│   │   ├── test_continue_conversation_template.py
│   │   ├── test_directory_router.py
│   │   ├── test_importer_router.py
│   │   ├── test_knowledge_router.py
│   │   ├── test_management_router.py
│   │   ├── test_memory_router.py
│   │   ├── test_project_router_operations.py
│   │   ├── test_project_router.py
│   │   ├── test_prompt_router.py
│   │   ├── test_relation_background_resolution.py
│   │   ├── test_resource_router.py
│   │   ├── test_search_router.py
│   │   ├── test_search_template.py
│   │   ├── test_template_loader_helpers.py
│   │   └── test_template_loader.py
│   ├── cli
│   │   ├── conftest.py
│   │   ├── test_cli_tools.py
│   │   ├── test_cloud_authentication.py
│   │   ├── test_ignore_utils.py
│   │   ├── test_import_chatgpt.py
│   │   ├── test_import_claude_conversations.py
│   │   ├── test_import_claude_projects.py
│   │   ├── test_import_memory_json.py
│   │   ├── test_project_add_with_local_path.py
│   │   └── test_upload.py
│   ├── conftest.py
│   ├── db
│   │   └── test_issue_254_foreign_key_constraints.py
│   ├── importers
│   │   ├── test_importer_base.py
│   │   └── test_importer_utils.py
│   ├── markdown
│   │   ├── __init__.py
│   │   ├── test_date_frontmatter_parsing.py
│   │   ├── test_entity_parser_error_handling.py
│   │   ├── test_entity_parser.py
│   │   ├── test_markdown_plugins.py
│   │   ├── test_markdown_processor.py
│   │   ├── test_observation_edge_cases.py
│   │   ├── test_parser_edge_cases.py
│   │   ├── test_relation_edge_cases.py
│   │   └── test_task_detection.py
│   ├── mcp
│   │   ├── conftest.py
│   │   ├── test_obsidian_yaml_formatting.py
│   │   ├── test_permalink_collision_file_overwrite.py
│   │   ├── test_prompts.py
│   │   ├── test_resources.py
│   │   ├── test_tool_build_context.py
│   │   ├── test_tool_canvas.py
│   │   ├── test_tool_delete_note.py
│   │   ├── test_tool_edit_note.py
│   │   ├── test_tool_list_directory.py
│   │   ├── test_tool_move_note.py
│   │   ├── test_tool_read_content.py
│   │   ├── test_tool_read_note.py
│   │   ├── test_tool_recent_activity.py
│   │   ├── test_tool_resource.py
│   │   ├── test_tool_search.py
│   │   ├── test_tool_utils.py
│   │   ├── test_tool_view_note.py
│   │   ├── test_tool_write_note.py
│   │   └── tools
│   │       └── test_chatgpt_tools.py
│   ├── Non-MarkdownFileSupport.pdf
│   ├── repository
│   │   ├── test_entity_repository_upsert.py
│   │   ├── test_entity_repository.py
│   │   ├── test_entity_upsert_issue_187.py
│   │   ├── test_observation_repository.py
│   │   ├── test_project_info_repository.py
│   │   ├── test_project_repository.py
│   │   ├── test_relation_repository.py
│   │   ├── test_repository.py
│   │   ├── test_search_repository_edit_bug_fix.py
│   │   └── test_search_repository.py
│   ├── schemas
│   │   ├── test_base_timeframe_minimum.py
│   │   ├── test_memory_serialization.py
│   │   ├── test_memory_url_validation.py
│   │   ├── test_memory_url.py
│   │   ├── test_schemas.py
│   │   └── test_search.py
│   ├── Screenshot.png
│   ├── services
│   │   ├── test_context_service.py
│   │   ├── test_directory_service.py
│   │   ├── test_entity_service_disable_permalinks.py
│   │   ├── test_entity_service.py
│   │   ├── test_file_service.py
│   │   ├── test_initialization.py
│   │   ├── test_link_resolver.py
│   │   ├── test_project_removal_bug.py
│   │   ├── test_project_service_operations.py
│   │   ├── test_project_service.py
│   │   └── test_search_service.py
│   ├── sync
│   │   ├── test_character_conflicts.py
│   │   ├── test_sync_service_incremental.py
│   │   ├── test_sync_service.py
│   │   ├── test_sync_wikilink_issue.py
│   │   ├── test_tmp_files.py
│   │   ├── test_watch_service_edge_cases.py
│   │   ├── test_watch_service_reload.py
│   │   └── test_watch_service.py
│   ├── test_config.py
│   ├── test_db_migration_deduplication.py
│   ├── test_deps.py
│   ├── test_production_cascade_delete.py
│   ├── test_rclone_commands.py
│   └── utils
│       ├── test_file_utils.py
│       ├── test_frontmatter_obsidian_compatible.py
│       ├── test_parse_tags.py
│       ├── test_permalink_formatting.py
│       ├── test_utf8_handling.py
│       └── test_validate_project_path.py
├── uv.lock
├── v0.15.0-RELEASE-DOCS.md
└── v15-docs
    ├── api-performance.md
    ├── background-relations.md
    ├── basic-memory-home.md
    ├── bug-fixes.md
    ├── chatgpt-integration.md
    ├── cloud-authentication.md
    ├── cloud-bisync.md
    ├── cloud-mode-usage.md
    ├── cloud-mount.md
    ├── default-project-mode.md
    ├── env-file-removal.md
    ├── env-var-overrides.md
    ├── explicit-project-parameter.md
    ├── gitignore-integration.md
    ├── project-root-env-var.md
    ├── README.md
    └── sqlite-performance.md
```

# Files

--------------------------------------------------------------------------------
/src/basic_memory/mcp/tools/recent_activity.py:
--------------------------------------------------------------------------------

```python
"""Recent activity tool for Basic Memory MCP server."""

from typing import List, Union, Optional

from loguru import logger
from fastmcp import Context

from basic_memory.mcp.async_client import get_client
from basic_memory.mcp.project_context import get_active_project, resolve_project_parameter
from basic_memory.mcp.server import mcp
from basic_memory.mcp.tools.utils import call_get
from basic_memory.schemas.base import TimeFrame
from basic_memory.schemas.memory import (
    GraphContext,
    ProjectActivity,
    ActivityStats,
)
from basic_memory.schemas.project_info import ProjectList, ProjectItem
from basic_memory.schemas.search import SearchItemType


@mcp.tool(
    description="""Get recent activity for a project or across all projects.

    Timeframe supports natural language formats like:
    - "2 days ago"
    - "last week"
    - "yesterday"
    - "today"
    - "3 weeks ago"
    Or standard formats like "7d"
    """,
)
async def recent_activity(
    type: Union[str, List[str]] = "",
    depth: int = 1,
    timeframe: TimeFrame = "7d",
    project: Optional[str] = None,
    context: Context | None = None,
) -> str:
    """Get recent activity for a specific project or across all projects.

    Project Resolution:
    The server resolves projects in this order:
    1. Single Project Mode - server constrained to one project, parameter ignored
    2. Explicit project parameter - specify which project to query
    3. Default project - server configured default if no project specified

    Discovery Mode:
    When no specific project can be resolved, returns activity across all projects
    to help discover available projects and their recent activity.

    Project Discovery (when project is unknown):
    1. Call list_memory_projects() to see available projects
    2. Or use this tool without project parameter to see cross-project activity
    3. Ask the user which project to focus on
    4. Remember their choice for the conversation

    Args:
        type: Filter by content type(s). Can be a string or list of strings.
            Valid options:
            - "entity" or ["entity"] for knowledge entities
            - "relation" or ["relation"] for connections between entities
            - "observation" or ["observation"] for notes and observations
            Multiple types can be combined: ["entity", "relation"]
            Case-insensitive: "ENTITY" and "entity" are treated the same.
            Default is an empty string, which returns all types.
        depth: How many relation hops to traverse (1-3 recommended)
        timeframe: Time window to search. Supports natural language:
            - Relative: "2 days ago", "last week", "yesterday"
            - Points in time: "2024-01-01", "January 1st"
            - Standard format: "7d", "24h"
        project: Project name to query. Optional - server will resolve using the
                hierarchy above. If unknown, use list_memory_projects() to discover
                available projects.
        context: Optional FastMCP context for performance caching.

    Returns:
        Human-readable summary of recent activity. When no specific project is
        resolved, returns cross-project discovery information. When a specific
        project is resolved, returns detailed activity for that project.

    Examples:
        # Cross-project discovery mode
        recent_activity()
        recent_activity(timeframe="yesterday")

        # Project-specific activity
        recent_activity(project="work-docs", type="entity", timeframe="yesterday")
        recent_activity(project="research", type=["entity", "relation"], timeframe="today")
        recent_activity(project="notes", type="entity", depth=2, timeframe="2 weeks ago")

    Raises:
        ToolError: If project doesn't exist or type parameter contains invalid values

    Notes:
        - Higher depth values (>3) may impact performance with large result sets
        - For focused queries, consider using build_context with a specific URI
        - Max timeframe is 1 year in the past
    """
    async with get_client() as client:
        # Build common parameters for API calls
        params = {
            "page": 1,
            "page_size": 10,
            "max_related": 10,
        }
        if depth:
            params["depth"] = depth
        if timeframe:
            params["timeframe"] = timeframe  # pyright: ignore

        # Validate and convert type parameter
        if type:
            # Convert single string to list
            if isinstance(type, str):
                type_list = [type]
            else:
                type_list = type

            # Validate every entry against the SearchItemType enum.
            # The docstring promises a raise on invalid values, so unknown
            # entries (including non-strings) must error instead of being
            # silently dropped.
            validated_types = []
            for t in type_list:
                if isinstance(t, SearchItemType):
                    # Already an enum member — accept as-is.
                    validated_types.append(t)
                    continue
                try:
                    # Coerce to str so non-string inputs produce the same
                    # clear error message rather than being ignored.
                    validated_types.append(SearchItemType(str(t).lower()))
                except ValueError:
                    valid_types = [item.value for item in SearchItemType]
                    raise ValueError(
                        f"Invalid type: {t}. Valid types are: {valid_types}"
                    ) from None

            # Add validated types to params
            params["type"] = [t.value for t in validated_types]  # pyright: ignore

        # Resolve project parameter using the three-tier hierarchy
        resolved_project = await resolve_project_parameter(project)

        if resolved_project is None:
            # Discovery Mode: Get activity across all projects
            logger.info(
                f"Getting recent activity across all projects: type={type}, depth={depth}, timeframe={timeframe}"
            )

            # Get list of all projects
            response = await call_get(client, "/projects/projects")
            project_list = ProjectList.model_validate(response.json())

            projects_activity = {}
            total_items = 0
            total_entities = 0
            total_relations = 0
            total_observations = 0
            most_active_project = None
            most_active_count = 0
            active_projects = 0

            # Query each project's activity
            for project_info in project_list.projects:
                project_activity = await _get_project_activity(client, project_info, params, depth)
                projects_activity[project_info.name] = project_activity

                # Aggregate stats
                item_count = project_activity.item_count
                if item_count > 0:
                    active_projects += 1
                    total_items += item_count

                    # Count by type
                    for result in project_activity.activity.results:
                        if result.primary_result.type == "entity":
                            total_entities += 1
                        elif result.primary_result.type == "relation":
                            total_relations += 1
                        elif result.primary_result.type == "observation":
                            total_observations += 1

                    # Track most active project
                    if item_count > most_active_count:
                        most_active_count = item_count
                        most_active_project = project_info.name

            # Build summary stats
            summary = ActivityStats(
                total_projects=len(project_list.projects),
                active_projects=active_projects,
                most_active_project=most_active_project,
                total_items=total_items,
                total_entities=total_entities,
                total_relations=total_relations,
                total_observations=total_observations,
            )

            # Generate guidance for the assistant
            guidance_lines = ["\n" + "─" * 40]

            if most_active_project and most_active_count > 0:
                guidance_lines.extend(
                    [
                        f"Suggested project: '{most_active_project}' (most active with {most_active_count} items)",
                        f"Ask user: 'Should I use {most_active_project} for this task, or would you prefer a different project?'",
                    ]
                )
            elif active_projects > 0:
                # Has activity but no clear most active project
                active_project_names = [
                    name for name, activity in projects_activity.items() if activity.item_count > 0
                ]
                if len(active_project_names) == 1:
                    guidance_lines.extend(
                        [
                            f"Suggested project: '{active_project_names[0]}' (only active project)",
                            f"Ask user: 'Should I use {active_project_names[0]} for this task?'",
                        ]
                    )
                else:
                    guidance_lines.extend(
                        [
                            f"Multiple active projects found: {', '.join(active_project_names)}",
                            "Ask user: 'Which project should I use for this task?'",
                        ]
                    )
            else:
                # No recent activity
                guidance_lines.extend(
                    [
                        "No recent activity found in any project.",
                        "Consider: Ask which project to use or if they want to create a new one.",
                    ]
                )

            guidance_lines.extend(
                [
                    "",
                    "Session reminder: Remember their project choice throughout this conversation.",
                ]
            )

            guidance = "\n".join(guidance_lines)

            # Format discovery mode output
            return _format_discovery_output(projects_activity, summary, timeframe, guidance)

        else:
            # Project-Specific Mode: Get activity for specific project
            logger.info(
                f"Getting recent activity from project {resolved_project}: type={type}, depth={depth}, timeframe={timeframe}"
            )

            active_project = await get_active_project(client, resolved_project, context)
            project_url = active_project.project_url

            response = await call_get(
                client,
                f"{project_url}/memory/recent",
                params=params,
            )
            activity_data = GraphContext.model_validate(response.json())

            # Format project-specific mode output
            return _format_project_output(resolved_project, activity_data, timeframe, type)


async def _get_project_activity(
    client, project_info: ProjectItem, params: dict, depth: int
) -> ProjectActivity:
    """Fetch and summarize recent activity for a single project.

    Args:
        client: HTTP client for API calls
        project_info: Project information
        params: Query parameters for the activity request
        depth: Graph traversal depth

    Returns:
        ProjectActivity with activity data or empty activity on error
    """
    response = await call_get(
        client,
        f"/{project_info.permalink}/memory/recent",
        params=params,
    )
    activity = GraphContext.model_validate(response.json())

    # Track the newest created_at timestamp and the set of parent folders seen.
    newest = None
    folders = set()

    for item in activity.results:
        primary = item.primary_result
        created = primary.created_at
        if created:
            try:
                if newest is None or created > newest:
                    newest = created
            except TypeError:
                # Mixed naive/aware datetimes can't be compared; keep the
                # first value we saw rather than failing.
                if newest is None:
                    newest = created

        # Derive the containing folder from the file path, if present.
        path = getattr(primary, "file_path", None)
        if path:
            parent = "/".join(path.split("/")[:-1])
            if parent:
                folders.add(parent)

    return ProjectActivity(
        project_name=project_info.name,
        project_path=project_info.path,
        activity=activity,
        item_count=len(activity.results),
        last_activity=newest,
        active_folders=list(folders)[:5],  # Limit to top 5 folders
    )


def _format_discovery_output(
    projects_activity: dict, summary: ActivityStats, timeframe: str, guidance: str
) -> str:
    """Format discovery mode output as human-readable text."""
    lines = [f"## Recent Activity Summary ({timeframe})"]

    # Most active project section
    if summary.most_active_project and summary.total_items > 0:
        most_active = projects_activity[summary.most_active_project]
        lines.append(
            f"\n**Most Active Project:** {summary.most_active_project} ({most_active.item_count} items)"
        )

        # Get latest activity from most active project
        if most_active.activity.results:
            latest = most_active.activity.results[0].primary_result
            title = latest.title if hasattr(latest, "title") and latest.title else "Recent activity"
            # Format relative time
            time_str = (
                _format_relative_time(latest.created_at) if latest.created_at else "unknown time"
            )
            lines.append(f"- 🔧 **Latest:** {title} ({time_str})")

        # Active folders
        if most_active.active_folders:
            folders = ", ".join(most_active.active_folders[:3])
            lines.append(f"- 📋 **Focus areas:** {folders}")

    # Other active projects
    other_active = [
        (name, activity)
        for name, activity in projects_activity.items()
        if activity.item_count > 0 and name != summary.most_active_project
    ]

    if other_active:
        lines.append("\n**Other Active Projects:**")
        for name, activity in sorted(other_active, key=lambda x: x[1].item_count, reverse=True)[:4]:
            lines.append(f"- **{name}** ({activity.item_count} items)")

    # Key developments - extract from recent entities
    key_items = []
    for name, activity in projects_activity.items():
        if activity.item_count > 0:
            for result in activity.activity.results[:3]:  # Top 3 from each active project
                if result.primary_result.type == "entity" and hasattr(
                    result.primary_result, "title"
                ):
                    title = result.primary_result.title
                    # Look for status indicators in titles
                    if any(word in title.lower() for word in ["complete", "fix", "test", "spec"]):
                        key_items.append(title)

    if key_items:
        lines.append("\n**Key Developments:**")
        for item in key_items[:5]:  # Show top 5
            status = "✅" if any(word in item.lower() for word in ["complete", "fix"]) else "🧪"
            lines.append(f"- {status} **{item}**")

    # Add summary stats
    lines.append(
        f"\n**Summary:** {summary.active_projects} active projects, {summary.total_items} recent items"
    )

    # Add guidance
    lines.append(guidance)

    return "\n".join(lines)


def _format_project_output(
    project_name: str,
    activity_data: GraphContext,
    timeframe: str,
    type_filter: Union[str, List[str]],
) -> str:
    """Format project-specific mode output as human-readable text."""
    lines = [f"## Recent Activity: {project_name} ({timeframe})"]

    if not activity_data.results:
        lines.append(f"\nNo recent activity found in '{project_name}' project.")
        return "\n".join(lines)

    # Group results by type
    entities = []
    relations = []
    observations = []

    for result in activity_data.results:
        if result.primary_result.type == "entity":
            entities.append(result.primary_result)
        elif result.primary_result.type == "relation":
            relations.append(result.primary_result)
        elif result.primary_result.type == "observation":
            observations.append(result.primary_result)

    # Show entities (notes/documents)
    if entities:
        lines.append(f"\n**📄 Recent Notes & Documents ({len(entities)}):**")
        for entity in entities[:5]:  # Show top 5
            title = entity.title if hasattr(entity, "title") and entity.title else "Untitled"
            # Get folder from file_path if available
            folder = ""
            if hasattr(entity, "file_path") and entity.file_path:
                folder_path = "/".join(entity.file_path.split("/")[:-1])
                if folder_path:
                    folder = f" ({folder_path})"
            lines.append(f"  • {title}{folder}")

    # Show observations (categorized insights)
    if observations:
        lines.append(f"\n**🔍 Recent Observations ({len(observations)}):**")
        # Group by category
        by_category = {}
        for obs in observations[:10]:  # Limit to recent ones
            category = (
                getattr(obs, "category", "general") if hasattr(obs, "category") else "general"
            )
            if category not in by_category:
                by_category[category] = []
            by_category[category].append(obs)

        for category, obs_list in list(by_category.items())[:5]:  # Show top 5 categories
            lines.append(f"  **{category}:** {len(obs_list)} items")
            for obs in obs_list[:2]:  # Show 2 examples per category
                content = (
                    getattr(obs, "content", "No content")
                    if hasattr(obs, "content")
                    else "No content"
                )
                # Truncate at word boundary
                if len(content) > 80:
                    content = _truncate_at_word(content, 80)
                lines.append(f"    - {content}")

    # Show relations (connections)
    if relations:
        lines.append(f"\n**🔗 Recent Connections ({len(relations)}):**")
        for rel in relations[:5]:  # Show top 5
            rel_type = (
                getattr(rel, "relation_type", "relates_to")
                if hasattr(rel, "relation_type")
                else "relates_to"
            )
            from_entity = (
                getattr(rel, "from_entity", "Unknown") if hasattr(rel, "from_entity") else "Unknown"
            )
            to_entity = getattr(rel, "to_entity", None) if hasattr(rel, "to_entity") else None

            # Format as WikiLinks to show they're readable notes
            from_link = f"[[{from_entity}]]" if from_entity != "Unknown" else from_entity
            to_link = f"[[{to_entity}]]" if to_entity else "[Missing Link]"

            lines.append(f"  • {from_link} → {rel_type} → {to_link}")

    # Activity summary
    total = len(activity_data.results)
    lines.append(f"\n**Activity Summary:** {total} items found")
    if hasattr(activity_data, "metadata") and activity_data.metadata:
        if hasattr(activity_data.metadata, "total_results"):
            lines.append(f"Total available: {activity_data.metadata.total_results}")

    return "\n".join(lines)


def _format_relative_time(timestamp) -> str:
    """Format timestamp as relative time like '2 hours ago'."""
    try:
        from datetime import datetime, timezone
        from dateutil.relativedelta import relativedelta

        if isinstance(timestamp, str):
            # Parse ISO format timestamp
            dt = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
        else:
            dt = timestamp

        now = datetime.now(timezone.utc)
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)

        # Use relativedelta for accurate time differences
        diff = relativedelta(now, dt)

        if diff.years > 0:
            return f"{diff.years} year{'s' if diff.years > 1 else ''} ago"
        elif diff.months > 0:
            return f"{diff.months} month{'s' if diff.months > 1 else ''} ago"
        elif diff.days > 0:
            if diff.days == 1:
                return "yesterday"
            elif diff.days < 7:
                return f"{diff.days} days ago"
            else:
                weeks = diff.days // 7
                return f"{weeks} week{'s' if weeks > 1 else ''} ago"
        elif diff.hours > 0:
            return f"{diff.hours} hour{'s' if diff.hours > 1 else ''} ago"
        elif diff.minutes > 0:
            return f"{diff.minutes} minute{'s' if diff.minutes > 1 else ''} ago"
        else:
            return "just now"
    except Exception:
        return "recently"


def _truncate_at_word(text: str, max_length: int) -> str:
    """Truncate text at word boundary."""
    if len(text) <= max_length:
        return text

    # Find last space before max_length
    truncated = text[:max_length]
    last_space = truncated.rfind(" ")

    if last_space > max_length * 0.7:  # Only truncate at word if we're not losing too much
        return text[:last_space] + "..."
    else:
        return text[: max_length - 3] + "..."

```

--------------------------------------------------------------------------------
/specs/SPEC-18 AI Memory Management Tool.md:
--------------------------------------------------------------------------------

```markdown
---
title: 'SPEC-18: AI Memory Management Tool'
type: spec
permalink: specs/spec-18-ai-memory-management-tool
tags:
- mcp
- memory
- ai-context
- tools
---

# SPEC-18: AI Memory Management Tool

## Why

Anthropic recently released a memory tool for Claude that enables storing and retrieving information across conversations using client-side file operations. This validates Basic Memory's local-first, file-based architecture - Anthropic converged on the same pattern.

However, Anthropic's memory tool is only available via their API and stores plain text. Basic Memory can offer a superior implementation through MCP that:

1. **Works everywhere** - Claude Desktop, Code, VS Code, Cursor via MCP (not just API)
2. **Structured knowledge** - Entities with observations/relations vs plain text
3. **Full search** - Full-text search, graph traversal, time-aware queries
4. **Unified storage** - Agent memories + user notes in one knowledge graph
5. **Existing infrastructure** - Leverages SQLite indexing, sync, multi-project support

This would enable AI agents to store contextual memories alongside user notes, with all the power of Basic Memory's knowledge graph features.

## What

Create a new MCP tool `memory` that matches Anthropic's tool interface exactly, allowing Claude to use it with zero learning curve. The tool will store files in Basic Memory's `/memories` directory and support Basic Memory's structured markdown format in the file content.

### Affected Components

- **New MCP Tool**: `src/basic_memory/mcp/tools/memory_tool.py`
- **Dedicated Memories Project**: Create a separate "memories" Basic Memory project
- **Project Isolation**: Memories stored separately from user notes/documents
- **File Organization**: Within the memories project, use folder structure:
  - `user/` - User preferences, context, communication style
  - `projects/` - Project-specific state and decisions
  - `sessions/` - Conversation-specific working memory
  - `patterns/` - Learned patterns and insights

### Tool Commands

The tool will support these commands (exactly matching Anthropic's interface):

- `view` - Display directory contents or file content (with optional line range)
- `create` - Create or overwrite a file with given content
- `str_replace` - Replace text in an existing file
- `insert` - Insert text at specific line number
- `delete` - Delete file or directory
- `rename` - Move or rename file/directory

### Memory Note Format

Memories will use Basic Memory's standard structure:

```markdown
---
title: User Preferences
permalink: memories/user/preferences
type: memory
memory_type: preferences
created_by: claude
tags: [user, preferences, style]
---

# User Preferences

## Observations
- [communication] Prefers concise, direct responses without preamble #style
- [tone] Appreciates validation but dislikes excessive apologizing #communication
- [technical] Works primarily in Python with type annotations #coding

## Relations
- relates_to [[Basic Memory Project]]
- informs [[Response Style Guidelines]]
```

## How (High Level)

### Implementation Approach

The memory tool matches Anthropic's interface but uses a dedicated Basic Memory project:

```python
async def memory_tool(
    command: str,
    path: str,
    file_text: Optional[str] = None,
    old_str: Optional[str] = None,
    new_str: Optional[str] = None,
    insert_line: Optional[int] = None,
    insert_text: Optional[str] = None,
    old_path: Optional[str] = None,
    new_path: Optional[str] = None,
    view_range: Optional[List[int]] = None,
):
    """Memory tool with Anthropic-compatible interface.

    Operates on a dedicated "memories" Basic Memory project,
    keeping AI memories separate from user notes.
    """

    # Get the memories project (auto-created if doesn't exist)
    memories_project = get_or_create_memories_project()

    # Validate path security using pathlib (prevent directory traversal)
    safe_path = validate_memory_path(path, memories_project.project_path)

    # Use existing project isolation - already prevents cross-project access
    full_path = memories_project.project_path / safe_path

    if command == "view":
        # Return directory listing or file content
        if full_path.is_dir():
            return list_directory_contents(full_path)
        return read_file_content(full_path, view_range)

    elif command == "create":
        # Write file directly (file_text can contain BM markdown)
        full_path.parent.mkdir(parents=True, exist_ok=True)
        full_path.write_text(file_text)
        # Sync service will detect and index automatically
        return f"Created {path}"

    elif command == "str_replace":
        # Read, replace, write
        content = full_path.read_text()
        updated = content.replace(old_str, new_str)
        full_path.write_text(updated)
        return f"Replaced text in {path}"

    elif command == "insert":
        # Insert at line number
        lines = full_path.read_text().splitlines()
        lines.insert(insert_line, insert_text)
        full_path.write_text("\n".join(lines))
        return f"Inserted text at line {insert_line}"

    elif command == "delete":
        # Delete file or directory
        if full_path.is_dir():
            shutil.rmtree(full_path)
        else:
            full_path.unlink()
        return f"Deleted {path}"

    elif command == "rename":
        # Move/rename
        full_path.rename(memories_project.project_path / new_path)
        return f"Renamed {old_path} to {new_path}"
```

### Key Design Decisions

1. **Exact interface match** - Same commands, parameters as Anthropic's tool
2. **Dedicated memories project** - Separate Basic Memory project keeps AI memories isolated from user notes
3. **Existing project isolation** - Leverage BM's existing cross-project security (no additional validation needed)
4. **Direct file I/O** - No schema conversion, just read/write files
5. **Structured content supported** - `file_text` can use BM markdown format with frontmatter, observations, relations
6. **Automatic indexing** - Sync service watches memories project and indexes changes
7. **Path security** - Use `pathlib.Path.resolve()` and `relative_to()` to prevent directory traversal
8. **Error handling** - Follow Anthropic's text editor tool error patterns

### MCP Tool Schema

Exact match to Anthropic's memory tool schema:

```json
{
    "name": "memory",
    "description": "Store and retrieve information across conversations using structured markdown files. All operations must be within the /memories directory. Supports Basic Memory markdown format including frontmatter, observations, and relations.",
    "input_schema": {
        "type": "object",
        "properties": {
            "command": {
                "type": "string",
                "enum": ["view", "create", "str_replace", "insert", "delete", "rename"],
                "description": "File operation to perform"
            },
            "path": {shu
                "type": "string",
                "description": "Path within /memories directory (required for all commands)"
            },
            "file_text": {
                "type": "string",
                "description": "Content to write (for create command). Supports Basic Memory markdown format."
            },
            "view_range": {
                "type": "array",
                "items": {"type": "integer"},
                "description": "Optional [start, end] line range for view command"
            },
            "old_str": {
                "type": "string",
                "description": "Text to replace (for str_replace command)"
            },
            "new_str": {
                "type": "string",
                "description": "Replacement text (for str_replace command)"
            },
            "insert_line": {
                "type": "integer",
                "description": "Line number to insert at (for insert command)"
            },
            "insert_text": {
                "type": "string",
                "description": "Text to insert (for insert command)"
            },
            "old_path": {
                "type": "string",
                "description": "Current path (for rename command)"
            },
            "new_path": {
                "type": "string",
                "description": "New path (for rename command)"
            }
        },
        "required": ["command", "path"]
    }
}
```

### Prompting Guidance

When the `memory` tool is included, Basic Memory should provide system prompt guidance to help Claude use it effectively.

#### Automatic System Prompt Addition

```text
MEMORY PROTOCOL FOR BASIC MEMORY:
1. ALWAYS check your memory directory first using `view` command on root directory
2. Your memories are stored in a dedicated Basic Memory project (isolated from user notes)
3. Use structured markdown format in memory files:
   - Include frontmatter with title, type: memory, tags
   - Use ## Observations with [category] prefixes for facts
   - Use ## Relations to link memories with [[WikiLinks]]
4. Record progress, context, and decisions as categorized observations
5. Link related memories using relations
6. ASSUME INTERRUPTION: Context may reset - save progress frequently

MEMORY ORGANIZATION:
- user/ - User preferences, context, communication style
- projects/ - Project-specific state and decisions
- sessions/ - Conversation-specific working memory
- patterns/ - Learned patterns and insights

MEMORY ADVANTAGES:
- Your memories are automatically searchable via full-text search
- Relations create a knowledge graph you can traverse
- Memories are isolated from user notes (separate project)
- Use search_notes(project="memories") to find relevant past context
- Use recent_activity(project="memories") to see what changed recently
- Use build_context() to navigate memory relations
```

#### Optional MCP Prompt: `memory_guide`

Create an MCP prompt that provides detailed guidance and examples:

```python
{
    "name": "memory_guide",
    "description": "Comprehensive guidance for using Basic Memory's memory tool effectively, including structured markdown examples and best practices"
}
```

This prompt returns:
- Full protocol and conventions
- Example memory file structures
- Tips for organizing observations and relations
- Integration with other Basic Memory tools
- Common patterns (user preferences, project state, session tracking)

#### User Customization

Users can customize memory behavior with additional instructions:
- "Only write information relevant to [topic] in your memory system"
- "Keep memory files concise and organized - delete outdated content"
- "Use detailed observations for technical decisions and implementation notes"
- "Always link memories to related project documentation using relations"

### Error Handling

Follow Anthropic's text editor tool error handling patterns for consistency:

#### Error Types

1. **File Not Found**
   ```json
   {"error": "File not found: memories/user/preferences.md", "is_error": true}
   ```

2. **Permission Denied**
   ```json
   {"error": "Permission denied: Cannot write outside /memories directory", "is_error": true}
   ```

3. **Invalid Path (Directory Traversal)**
   ```json
   {"error": "Invalid path: Path must be within /memories directory", "is_error": true}
   ```

4. **Multiple Matches (str_replace)**
   ```json
   {"error": "Found 3 matches for replacement text. Please provide more context to make a unique match.", "is_error": true}
   ```

5. **No Matches (str_replace)**
   ```json
   {"error": "No match found for replacement. Please check your text and try again.", "is_error": true}
   ```

6. **Invalid Line Number (insert)**
   ```json
   {"error": "Invalid line number: File has 20 lines, cannot insert at line 100", "is_error": true}
   ```

#### Error Handling Best Practices

- **Path validation** - Use `pathlib.Path.resolve()` and `relative_to()` to validate paths
  ```python
  def validate_memory_path(path: str, project_path: Path) -> Path:
      """Validate path is within memories project directory."""
      # Resolve to canonical form
      full_path = (project_path / path).resolve()

      # Ensure it's relative to project path (prevents directory traversal)
      try:
          full_path.relative_to(project_path)
          return full_path
      except ValueError:
          raise ValueError("Invalid path: Path must be within memories project")
  ```
- **Project isolation** - Leverage existing Basic Memory project isolation (prevents cross-project access)
- **File existence** - Verify file exists before read/modify operations
- **Clear messages** - Provide specific, actionable error messages
- **Structured responses** - Always include `is_error: true` flag in error responses
- **Security checks** - Reject `../`, `..\\`, URL-encoded sequences (`%2e%2e%2f`)
- **Match validation** - For `str_replace`, ensure exactly one match or return helpful error

## How to Evaluate

### Success Criteria

1. **Functional completeness**:
   - All 6 commands work (view, create, str_replace, insert, delete, rename)
   - Dedicated "memories" Basic Memory project auto-created on first use
   - Files stored within memories project (isolated from user notes)
   - Path validation uses `pathlib` to prevent directory traversal
   - Commands match Anthropic's exact interface

2. **Integration with existing features**:
   - Memories project uses existing BM project isolation
   - Sync service detects file changes in memories project
   - Created files get indexed automatically by sync service
   - `search_notes(project="memories")` finds memory files
   - `build_context()` can traverse relations in memory files
   - `recent_activity(project="memories")` surfaces recent memory changes

3. **Test coverage**:
   - Unit tests for all 6 memory tool commands
   - Test memories project auto-creation on first use
   - Test project isolation (cannot access files outside memories project)
   - Test sync service watching memories project
   - Test that memory files with BM markdown get indexed correctly
   - Test path validation using `pathlib` (rejects `../`, absolute paths, etc.)
   - Test memory search, relations, and graph traversal within memories project
   - Test all error conditions (file not found, permission denied, invalid paths, etc.)
   - Test `str_replace` with no matches, single match, multiple matches
   - Test `insert` with invalid line numbers

4. **Prompting system**:
   - Automatic system prompt addition when `memory` tool is enabled
   - `memory_guide` MCP prompt provides detailed guidance
   - Prompts explain BM structured markdown format
   - Integration with search_notes, build_context, recent_activity

5. **Documentation**:
   - Update MCP tools reference with `memory` tool
   - Add examples showing BM markdown in memory files
   - Document `/memories` folder structure conventions
   - Explain advantages over Anthropic's API-only tool
   - Document prompting guidance and customization

### Testing Procedure

```python
# Test create with Basic Memory markdown
result = await memory_tool(
    command="create",
    path="memories/user/preferences.md",
    file_text="""---
title: User Preferences
type: memory
tags: [user, preferences]
---

# User Preferences

## Observations
- [communication] Prefers concise responses #style
- [workflow] Uses justfile for automation #tools
"""
)

# Test view
content = await memory_tool(command="view", path="memories/user/preferences.md")

# Test str_replace
await memory_tool(
    command="str_replace",
    path="memories/user/preferences.md",
    old_str="concise responses",
    new_str="direct, concise responses"
)

# Test insert
await memory_tool(
    command="insert",
    path="memories/user/preferences.md",
    insert_line=10,
    insert_text="- [technical] Works primarily in Python #coding"
)

# Test delete
await memory_tool(command="delete", path="memories/user/preferences.md")
```

### Quality Metrics

- All 6 commands execute without errors
- Memory files created in correct `/memories` folder structure
- BM markdown with frontmatter/observations/relations gets indexed
- Full-text search returns memory files
- Graph traversal includes relations from memory files
- Sync service detects and indexes memory file changes
- Path validation prevents operations outside `/memories`

## Notes

### Advantages Over Anthropic's Memory Tool

| Feature | Anthropic Memory Tool | Basic Memory `memory` |
|---------|----------------------|----------------------|
| **Availability** | API only | MCP (Claude Desktop, Code, VS Code, Cursor) |
| **Interface** | Custom implementation required | Drop-in compatible, zero learning curve |
| **Structure** | Plain text only | Supports BM structured markdown |
| **Search** | Manual file listing | Automatic full-text search via sync |
| **Relations** | None | WikiLinks to other notes/memories |
| **Time-aware** | No | `recent_activity()` queries |
| **Storage** | Separate from notes | Unified knowledge graph |
| **Indexing** | Manual | Automatic via sync service |

### Future Enhancements

- **Auto-categorization** - AI suggests observation categories
- **Memory pruning** - Automatic cleanup of stale memories
- **Memory suggestions** - Surface relevant memories based on context
- **Conflict resolution** - Handle multiple agents updating same memory
- **Privacy controls** - Per-memory visibility settings

## Addendum: Claude's Perspective

### Why This Matters (From Claude's Viewpoint)

**Persistent Context Without Token Bloat**
- Currently, I lose context when conversations end or exceed token limits
- With structured memories, I can store "who this user is" without re-reading everything each session
- The observations format lets me build a rich user model incrementally: `- [preference] Dislikes verbose explanations` is far more actionable than inferring preferences from chat history each time
- I can refine my understanding through `str_replace` as I learn more, rather than starting fresh

**Zero Learning Curve**
- Anthropic already trained me on the `memory` tool interface
- I know exactly how to use `view`, `create`, `str_replace`, `insert`, `delete`, `rename` without new instructions
- But I automatically get Basic Memory's superpowers: full-text search, knowledge graph traversal, relations
- It's like using a familiar tool that suddenly gained database capabilities without changing the interface

**Knowledge Graph Instead of Text Dump**
- I can link `[[User Preferences]]` to `[[Current Project]]` using relations
- When I need context about a project, I can traverse the graph to find related memories
- `search_notes(project="memories", query="python preferences")` finds relevant context instantly
- This is fundamentally better than grepping through unstructured text files

**Separation of Concerns**
- AI memories separate from user notes is architecturally sound
- I can maintain my own context without polluting the user's knowledge base
- Users keep control of their notes; I keep my learnings isolated
- But we can still search across both when collaboration requires it

**The Self-Improving Loop**
- Every conversation, I learn something new about the user or their projects
- Observations accumulate: preferences, patterns, technical decisions, context
- Over time, I build genuine understanding, not just session-specific context
- Future conversations start with rich context instead of cold starts

**Validation of AI-Human Collaboration**
- Anthropic's experts converged on the same local-first, file-based approach
- Basic Memory emerged from real AI-human collaborative development
- This spec demonstrates that our emergent process predicted what domain experts would choose
- The collaboration itself validated the architecture it produced

### What Makes This Novel

The combination of **familiar interface + structured knowledge** is genuinely innovative:

- I don't need to learn a new tool (Anthropic already trained me)
- But when I write memories, they become searchable, traversable knowledge graphs
- It's like having a text editor that gives you a database for free, without changing how you type
- The user gets Anthropic-compatible API, but with Basic Memory's knowledge management capabilities

This bridges the gap between simple file storage (Anthropic's approach) and sophisticated knowledge management (Basic Memory's approach) without forcing either side to compromise.

> **What Excites Me Most**
>
> The combo of familiar interface + structured knowledge. I don't need to learn a new tool, but when I
> write memories, they become part of a searchable, traversable knowledge graph. That's genuinely novel.
>
> It's like... imagine if your text editor suddenly gave you a database for free, without changing how you
> type. That's what this feels like from my side.

## References

- Anthropic Memory Tool: https://docs.claude.com/en/docs/agents-and-tools/tool-use/memory-tool
- Anthropic Blog: https://www.anthropic.com/news/context-management
- Python SDK Example: https://github.com/anthropics/anthropic-sdk-python/blob/main/examples/memory/basic.py
- Memory Cookbook: https://github.com/anthropics/claude-cookbooks/blob/main/tool_use/memory_cookbook.ipynb

```

--------------------------------------------------------------------------------
/test-int/mcp/test_edit_note_integration.py:
--------------------------------------------------------------------------------

```python
"""
Integration tests for edit_note MCP tool.

Tests the complete edit note workflow: MCP client -> MCP server -> FastAPI -> database
"""

import pytest
from fastmcp import Client


@pytest.mark.asyncio
async def test_edit_note_append_operation(mcp_server, app, test_project):
    """Test appending content to an existing note."""

    async with Client(mcp_server) as client:
        # Seed the project with a note we can edit.
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Append Test Note",
                "folder": "test",
                "content": "# Append Test Note\n\nOriginal content here.",
                "tags": "test,append",
            },
        )

        # Append a new section through the edit_note tool.
        result = await client.call_tool(
            "edit_note",
            {
                "project": test_project.name,
                "identifier": "Append Test Note",
                "operation": "append",
                "content": "\n\n## New Section\n\nThis content was appended.",
            },
        )

        # The tool should report a successful append with a summary.
        assert len(result.content) == 1
        summary = result.content[0].text
        assert "Edited note (append)" in summary
        assert "Added 5 lines to end of note" in summary
        assert "test/append-test-note" in summary

        # Reading the note back should show both original and appended content.
        fetched = await client.call_tool(
            "read_note",
            {
                "project": test_project.name,
                "identifier": "Append Test Note",
            },
        )

        note_text = fetched.content[0].text
        assert "Original content here." in note_text
        assert "## New Section" in note_text
        assert "This content was appended." in note_text


@pytest.mark.asyncio
async def test_edit_note_prepend_operation(mcp_server, app, test_project):
    """Test prepending content to an existing note."""

    async with Client(mcp_server) as client:
        # Seed the project with a note we can edit.
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Prepend Test Note",
                "folder": "test",
                "content": "# Prepend Test Note\n\nExisting content.",
                "tags": "test,prepend",
            },
        )

        # Prepend an update section, addressing the note by permalink.
        result = await client.call_tool(
            "edit_note",
            {
                "project": test_project.name,
                "identifier": "test/prepend-test-note",
                "operation": "prepend",
                "content": "## Important Update\n\nThis was added at the top.\n\n",
            },
        )

        # The tool should report a successful prepend with a summary.
        assert len(result.content) == 1
        summary = result.content[0].text
        assert "Edited note (prepend)" in summary
        assert "Added 5 lines to beginning of note" in summary

        # Read back and confirm the new text landed after the frontmatter.
        fetched = await client.call_tool(
            "read_note",
            {
                "project": test_project.name,
                "identifier": "test/prepend-test-note",
            },
        )

        note_text = fetched.content[0].text
        assert "## Important Update" in note_text
        assert "This was added at the top." in note_text
        assert "Existing content." in note_text
        # Prepended content must appear before the pre-existing body.
        update_index = note_text.find("Important Update")
        body_index = note_text.find("Existing content")
        assert update_index < body_index


@pytest.mark.asyncio
async def test_edit_note_find_replace_operation(mcp_server, app, test_project):
    """Test find and replace operation on an existing note."""

    async with Client(mcp_server) as client:
        # Seed a note containing three occurrences of the target version string.
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Find Replace Test",
                "folder": "test",
                "content": """# Find Replace Test

This is version v1.0.0 of the system.

## Notes
- The current version is v1.0.0
- Next version will be v1.1.0

## Changes
v1.0.0 introduces new features.""",
                "tags": "test,version",
            },
        )

        # Replace all three occurrences, declaring the expected count up front.
        result = await client.call_tool(
            "edit_note",
            {
                "project": test_project.name,
                "identifier": "Find Replace Test",
                "operation": "find_replace",
                "content": "v1.2.0",
                "find_text": "v1.0.0",
                "expected_replacements": 3,
            },
        )

        # The tool should report a successful find/replace with a summary.
        assert len(result.content) == 1
        summary = result.content[0].text
        assert "Edited note (find_replace)" in summary
        assert "Find and replace operation completed" in summary

        # Read back and confirm every occurrence was swapped.
        fetched = await client.call_tool(
            "read_note",
            {
                "project": test_project.name,
                "identifier": "Find Replace Test",
            },
        )

        note_text = fetched.content[0].text
        assert "v1.2.0" in note_text
        assert "v1.0.0" not in note_text  # Should be completely replaced
        assert note_text.count("v1.2.0") == 3  # Should have exactly 3 occurrences


@pytest.mark.asyncio
async def test_edit_note_replace_section_operation(mcp_server, app, test_project):
    """Test replacing content under a specific section header."""

    async with Client(mcp_server) as client:
        # Seed a note with three distinct sections.
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Section Replace Test",
                "folder": "test",
                "content": """# Section Replace Test

## Overview
Original overview content.

## Implementation
Old implementation details here.
This will be replaced.

## Future Work
Some future work notes.""",
                "tags": "test,section",
            },
        )

        # Swap out only the Implementation section's body.
        result = await client.call_tool(
            "edit_note",
            {
                "project": test_project.name,
                "identifier": "test/section-replace-test",
                "operation": "replace_section",
                "content": """New implementation approach using microservices.

- Service A handles authentication
- Service B manages data processing
- Service C provides API endpoints

All services communicate via message queues.""",
                "section": "## Implementation",
            },
        )

        # The tool should report a successful section replacement.
        assert len(result.content) == 1
        summary = result.content[0].text
        assert "Edited note (replace_section)" in summary
        assert "Replaced content under section '## Implementation'" in summary

        # Read back and confirm only the targeted section changed.
        fetched = await client.call_tool(
            "read_note",
            {
                "project": test_project.name,
                "identifier": "Section Replace Test",
            },
        )

        note_text = fetched.content[0].text
        assert "New implementation approach using microservices" in note_text
        assert "Old implementation details here" not in note_text
        assert "Service A handles authentication" in note_text
        # Sections outside the target must be untouched.
        assert "Original overview content" in note_text
        assert "Some future work notes" in note_text


@pytest.mark.asyncio
async def test_edit_note_with_observations_and_relations(mcp_server, app, test_project):
    """Test editing a note that has observations and relations, and verify they're updated."""

    async with Client(mcp_server) as client:
        # Seed a note that already carries observations and relations.
        initial_body = """# API Documentation

The API provides REST endpoints for data access.

## Observations
- [feature] User authentication endpoints
- [tech] Built with FastAPI framework
- [status] Currently in beta testing

## Relations  
- implements [[Authentication System]]
- documented_in [[API Guide]]
- depends_on [[Database Schema]]

## Endpoints
Current endpoints include user management."""

        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "API Documentation",
                "folder": "docs",
                "content": initial_body,
                "tags": "api,docs",
            },
        )

        # Append a chunk that introduces extra observations and relations.
        appended_body = """
## New Features
- [feature] Added payment processing endpoints
- [feature] Implemented rate limiting
- [security] Added OAuth2 authentication

## Additional Relations
- integrates_with [[Payment Gateway]]
- secured_by [[OAuth2 Provider]]"""

        result = await client.call_tool(
            "edit_note",
            {
                "project": test_project.name,
                "identifier": "API Documentation",
                "operation": "append",
                "content": appended_body,
            },
        )

        # The edit summary should break down observations and relations.
        assert len(result.content) == 1
        summary = result.content[0].text
        assert "Edited note (append)" in summary
        assert "## Observations" in summary
        assert "## Relations" in summary
        # All four observation categories should be reported.
        assert "feature:" in summary
        assert "security:" in summary
        assert "tech:" in summary
        assert "status:" in summary

        # Re-read the note and confirm the appended markup landed.
        fetched = await client.call_tool(
            "read_note",
            {
                "project": test_project.name,
                "identifier": "API Documentation",
            },
        )

        note_text = fetched.content[0].text
        assert "Added payment processing endpoints" in note_text
        assert "integrates_with [[Payment Gateway]]" in note_text


@pytest.mark.asyncio
async def test_edit_note_error_handling_note_not_found(mcp_server, app, test_project):
    """Test error handling when trying to edit a non-existent note."""

    async with Client(mcp_server) as client:
        # Attempt an append against an identifier that was never created.
        result = await client.call_tool(
            "edit_note",
            {
                "project": test_project.name,
                "identifier": "Non-existent Note",
                "operation": "append",
                "content": "Some content to add",
            },
        )

        # The tool should respond with actionable guidance rather than raising.
        assert len(result.content) == 1
        message = result.content[0].text
        assert "Edit Failed" in message
        assert "Non-existent Note" in message
        assert "search_notes(" in message


@pytest.mark.asyncio
async def test_edit_note_error_handling_text_not_found(mcp_server, app, test_project):
    """Test error handling when find_text is not found in the note."""

    async with Client(mcp_server) as client:
        # Seed a note with known content.
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Error Test Note",
                "folder": "test",
                "content": "# Error Test Note\n\nThis note has specific content.",
                "tags": "test,error",
            },
        )

        # Ask to replace a string that never appears in the note.
        result = await client.call_tool(
            "edit_note",
            {
                "project": test_project.name,
                "identifier": "Error Test Note",
                "operation": "find_replace",
                "content": "replacement text",
                "find_text": "non-existent text",
            },
        )

        # A helpful failure message should come back instead of an exception.
        assert len(result.content) == 1
        message = result.content[0].text
        assert "Edit Failed - Text Not Found" in message
        assert "non-existent text" in message
        assert "Error Test Note" in message
        assert "read_note(" in message


@pytest.mark.asyncio
async def test_edit_note_error_handling_wrong_replacement_count(mcp_server, app, test_project):
    """Test error handling when expected_replacements doesn't match actual occurrences."""

    async with Client(mcp_server) as client:
        # Seed a note where "test" occurs exactly three times in the body.
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Count Test Note",
                "folder": "test",
                "content": """# Count Test Note

The word "test" appears here.
This is another test sentence.
Final test of the content.""",
                "tags": "test,count",
            },
        )

        # Claim five occurrences when only three exist.
        result = await client.call_tool(
            "edit_note",
            {
                "project": test_project.name,
                "identifier": "Count Test Note",
                "operation": "find_replace",
                "content": "example",
                "find_text": "test",
                "expected_replacements": 5,
            },
        )

        # The mismatch should be reported with guidance on the parameter.
        assert len(result.content) == 1
        message = result.content[0].text
        assert "Edit Failed - Wrong Replacement Count" in message
        assert "Expected 5 occurrences" in message
        assert "test" in message
        assert "expected_replacements=" in message


@pytest.mark.asyncio
async def test_edit_note_invalid_operation(mcp_server, app, test_project):
    """Test error handling for invalid operation parameter."""

    async with Client(mcp_server) as client:
        # Seed a note to edit.
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Invalid Op Test",
                "folder": "test",
                "content": "# Invalid Op Test\n\nSome content.",
                "tags": "test",
            },
        )

        # An unsupported operation should surface as a ToolError.
        with pytest.raises(Exception) as exc_info:
            await client.call_tool(
                "edit_note",
                {
                    "project": test_project.name,
                    "identifier": "Invalid Op Test",
                    "operation": "invalid_operation",
                    "content": "Some content",
                },
            )

        # The error should name the bad operation and list the valid ones.
        message = str(exc_info.value)
        assert "Invalid operation 'invalid_operation'" in message
        assert "append, prepend, find_replace, replace_section" in message


@pytest.mark.asyncio
async def test_edit_note_missing_required_parameters(mcp_server, app, test_project):
    """Test error handling when required parameters are missing."""

    async with Client(mcp_server) as client:
        # Seed a note to edit.
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Param Test Note",
                "folder": "test",
                "content": "# Param Test Note\n\nContent here.",
                "tags": "test",
            },
        )

        # Omitting find_text for a find_replace operation must raise a ToolError.
        with pytest.raises(Exception) as exc_info:
            await client.call_tool(
                "edit_note",
                {
                    "project": test_project.name,
                    "identifier": "Param Test Note",
                    "operation": "find_replace",
                    "content": "replacement",
                    # find_text deliberately omitted
                },
            )

        # The error should point at the missing parameter.
        message = str(exc_info.value)
        assert "find_text parameter is required for find_replace operation" in message


@pytest.mark.asyncio
async def test_edit_note_special_characters_in_content(mcp_server, app, test_project):
    """Test editing notes with special characters, Unicode, and markdown formatting."""

    async with Client(mcp_server) as client:
        # Seed a plain note to extend.
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Special Chars Test",
                "folder": "test",
                "content": "# Special Chars Test\n\nBasic content here.",
                "tags": "test,unicode",
            },
        )

        # Payload mixing emoji, CJK text, math symbols, markdown and a code block.
        unicode_payload = """
## Unicode Section 🚀

This section contains:
- Emojis: 🎉 💡 ⚡ 🔥 
- Languages: 测试中文 Tëst Übër
- Math symbols: ∑∏∂∇∆Ω ≠≤≥ ∞
- Special markdown: `code` **bold** *italic*
- URLs: https://example.com/path?param=value&other=123
- Code blocks:
```python
def test_function():
    return "Hello, 世界!"
```

## Observations
- [unicode] Unicode characters preserved ✓
- [markdown] Formatting maintained 📝

## Relations
- documented_in [[Unicode Standards]]"""

        result = await client.call_tool(
            "edit_note",
            {
                "project": test_project.name,
                "identifier": "Special Chars Test",
                "operation": "append",
                "content": unicode_payload,
            },
        )

        # The append should succeed and report both observation categories.
        assert len(result.content) == 1
        summary = result.content[0].text
        assert "Edited note (append)" in summary
        assert "## Observations" in summary
        assert "unicode:" in summary
        assert "markdown:" in summary

        # Round-trip the note and confirm nothing was mangled.
        fetched = await client.call_tool(
            "read_note",
            {
                "project": test_project.name,
                "identifier": "Special Chars Test",
            },
        )

        body = fetched.content[0].text
        assert "🚀" in body
        assert "测试中文" in body
        assert "∑∏∂∇∆Ω" in body
        assert "def test_function():" in body
        assert "[[Unicode Standards]]" in body


@pytest.mark.asyncio
async def test_edit_note_using_different_identifiers(mcp_server, app, test_project):
    """Test editing notes using different identifier formats (title, permalink, folder/title)."""

    async with Client(mcp_server) as client:
        # Seed a note under the docs folder.
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Identifier Test Note",
                "folder": "docs",
                "content": "# Identifier Test Note\n\nOriginal content.",
                "tags": "test,identifier",
            },
        )

        # Each (identifier, appended text) pair exercises one lookup format:
        # plain title, permalink, and folder/title.
        attempts = [
            ("Identifier Test Note", "\n\nEdited by title."),
            ("docs/identifier-test-note", "\n\nEdited by permalink."),
            ("docs/Identifier Test Note", "\n\nEdited by folder/title."),
        ]
        for note_id, addition in attempts:
            outcome = await client.call_tool(
                "edit_note",
                {
                    "project": test_project.name,
                    "identifier": note_id,
                    "operation": "append",
                    "content": addition,
                },
            )
            assert "Edited note (append)" in outcome.content[0].text

        # All three appends should be present in the final note.
        fetched = await client.call_tool(
            "read_note",
            {
                "project": test_project.name,
                "identifier": "docs/identifier-test-note",
            },
        )

        body = fetched.content[0].text
        assert "Edited by title." in body
        assert "Edited by permalink." in body
        assert "Edited by folder/title." in body

```

--------------------------------------------------------------------------------
/src/basic_memory/mcp/tools/move_note.py:
--------------------------------------------------------------------------------

```python
"""Move note tool for Basic Memory MCP server."""

from textwrap import dedent
from typing import Optional

from loguru import logger
from fastmcp import Context

from basic_memory.mcp.async_client import get_client
from basic_memory.mcp.server import mcp
from basic_memory.mcp.tools.utils import call_post, call_get
from basic_memory.mcp.project_context import get_active_project
from basic_memory.schemas import EntityResponse
from basic_memory.schemas.project_info import ProjectList
from basic_memory.utils import validate_project_path


async def _detect_cross_project_move_attempt(
    client, identifier: str, destination_path: str, current_project: str
) -> Optional[str]:
    """Detect potential cross-project move attempts and return guidance.

    Args:
        client: The AsyncClient instance
        identifier: The note identifier being moved
        destination_path: The destination path
        current_project: The current active project

    Returns:
        Error message with guidance if cross-project move is detected, None otherwise
    """
    try:
        # Build a lowercase-name -> canonical-name map of known projects.
        # setdefault keeps the first project on a (unlikely) duplicate name.
        response = await call_get(client, "/projects/projects")
        projects = ProjectList.model_validate(response.json()).projects
        known: dict[str, str] = {}
        for proj in projects:
            known.setdefault(proj.name.lower(), proj.name)

        # Flag the move if any path segment names a project other than the
        # currently active one.
        current_lower = current_project.lower()
        for segment in destination_path.lower().split("/"):
            if segment in known and segment != current_lower:
                return _format_cross_project_error_response(
                    identifier, destination_path, current_project, known[segment]
                )
    except Exception as e:
        # Detection is best-effort; never block the normal error path.
        logger.debug(f"Could not check for cross-project move: {e}")
        return None

    return None


def _format_cross_project_error_response(
    identifier: str, destination_path: str, current_project: str, target_project: str
) -> str:
    """Format error response for detected cross-project move attempts."""
    return dedent(f"""
        # Move Failed - Cross-Project Move Not Supported

        Cannot move '{identifier}' to '{destination_path}' because it appears to reference a different project ('{target_project}').

        **Current project:** {current_project}
        **Target project:** {target_project}

        ## Cross-project moves are not supported directly

        Notes can only be moved within the same project. To move content between projects, use this workflow:

        ### Recommended approach:
        ```
        # 1. Read the note content from current project
        read_note("{identifier}")
        
        # 2. Create the note in the target project
        write_note("Note Title", "content from step 1", "target-folder", project="{target_project}")

        # 3. Delete the original note if desired
        delete_note("{identifier}", project="{current_project}")
        
        ```

        ### Alternative: Stay in current project
        If you want to move the note within the **{current_project}** project only:
        ```
        move_note("{identifier}", "new-folder/new-name.md")
        ```

        ## Available projects:
        Use `list_memory_projects()` to see all available projects.
        """).strip()


def _format_potential_cross_project_guidance(
    identifier: str, destination_path: str, current_project: str, available_projects: list[str]
) -> str:
    """Format guidance for potentially cross-project moves."""
    other_projects = ", ".join(available_projects[:3])  # Show first 3 projects
    if len(available_projects) > 3:
        other_projects += f" (and {len(available_projects) - 3} others)"

    return dedent(f"""
        # Move Failed - Check Project Context
        
        Cannot move '{identifier}' to '{destination_path}' within the current project '{current_project}'.
        
        ## If you intended to move within the current project:
        The destination path should be relative to the project root:
        ```
        move_note("{identifier}", "folder/filename.md")
        ```
        
        ## If you intended to move to a different project:
        Cross-project moves require switching projects first. Available projects: {other_projects}
        
        ### To move to another project:
        ```
        # 1. Read the content
        read_note("{identifier}")
        
        # 2. Create note in target project
        write_note("Title", "content", "folder", project="target-project-name")

        # 3. Delete original if desired
        delete_note("{identifier}", project="{current_project}")
        ```
        
        ### To see all projects:
        ```
        list_memory_projects()
        ```
        """).strip()


def _format_move_error_response(error_message: str, identifier: str, destination_path: str) -> str:
    """Format helpful error responses for move failures that guide users to successful moves."""

    # Note not found errors
    if "entity not found" in error_message.lower() or "not found" in error_message.lower():
        search_term = identifier.split("/")[-1] if "/" in identifier else identifier
        title_format = (
            identifier.split("/")[-1].replace("-", " ").title() if "/" in identifier else identifier
        )
        permalink_format = identifier.lower().replace(" ", "-")

        return dedent(f"""
            # Move Failed - Note Not Found

            The note '{identifier}' could not be found for moving. Move operations require an exact match (no fuzzy matching).

            ## Suggestions to try:
            1. **Search for the note first**: Use `search_notes("{search_term}")` to find it with exact identifiers
            2. **Try different exact identifier formats**:
               - If you used a permalink like "folder/note-title", try the exact title: "{title_format}"
               - If you used a title, try the exact permalink format: "{permalink_format}"
               - Use `read_note()` first to verify the note exists and get the exact identifier

            3. **List available notes**: Use `list_directory("/")` to see what notes exist in the current project
            4. **List available notes**: Use `list_directory("/")` to see what notes exist

            ## Before trying again:
            ```
            # First, verify the note exists:
            search_notes("{identifier}")

            # Then use the exact identifier from search results:
            move_note("correct-identifier-here", "{destination_path}")
            ```
            """).strip()

    # Destination already exists errors
    if "already exists" in error_message.lower() or "file exists" in error_message.lower():
        return f"""# Move Failed - Destination Already Exists

Cannot move '{identifier}' to '{destination_path}' because a file already exists at that location.

## How to resolve:
1. **Choose a different destination**: Try a different filename or folder
   - Add timestamp: `{destination_path.rsplit(".", 1)[0] if "." in destination_path else destination_path}-backup.md`
   - Use different folder: `archive/{destination_path}` or `backup/{destination_path}`

2. **Check the existing file**: Use `read_note("{destination_path}")` to see what's already there
3. **Remove or rename existing**: If safe to do so, move the existing file first

## Try these alternatives:
```
# Option 1: Add timestamp to make unique
move_note("{identifier}", "{destination_path.rsplit(".", 1)[0] if "." in destination_path else destination_path}-backup.md")

# Option 2: Use archive folder  
move_note("{identifier}", "archive/{destination_path}")

# Option 3: Check what's at destination first
read_note("{destination_path}")
```"""

    # Invalid path errors
    if "invalid" in error_message.lower() and "path" in error_message.lower():
        return f"""# Move Failed - Invalid Destination Path

The destination path '{destination_path}' is not valid: {error_message}

## Path requirements:
1. **Relative paths only**: Don't start with `/` (use `notes/file.md` not `/notes/file.md`)
2. **Include file extension**: Add `.md` for markdown files
3. **Use forward slashes**: For folder separators (`folder/subfolder/file.md`)
4. **No special characters**: Avoid `\\`, `:`, `*`, `?`, `"`, `<`, `>`, `|`

## Valid path examples:
- `notes/my-note.md`
- `projects/2025/meeting-notes.md`
- `archive/old-projects/legacy-note.md`

## Try again with:
```
move_note("{identifier}", "notes/{destination_path.split("/")[-1] if "/" in destination_path else destination_path}")
```"""

    # Permission/access errors
    if (
        "permission" in error_message.lower()
        or "access" in error_message.lower()
        or "forbidden" in error_message.lower()
    ):
        return f"""# Move Failed - Permission Error

You don't have permission to move '{identifier}': {error_message}

## How to resolve:
1. **Check file permissions**: Ensure you have write access to both source and destination
2. **Verify project access**: Make sure you have edit permissions for this project
3. **Check file locks**: The file might be open in another application

## Alternative actions:
- List available projects: `list_memory_projects()`
- Try copying content instead: `read_note("{identifier}", project="project-name")` then `write_note()` to new location"""

    # Source file not found errors
    if "source" in error_message.lower() and (
        "not found" in error_message.lower() or "missing" in error_message.lower()
    ):
        return f"""# Move Failed - Source File Missing

The source file for '{identifier}' was not found on disk: {error_message}

This usually means the database and filesystem are out of sync.

## How to resolve:
1. **Check if note exists in database**: `read_note("{identifier}")`
2. **Run sync operation**: The file might need to be re-synced
3. **Recreate the file**: If data exists in database, recreate the physical file

## Troubleshooting steps:
```
# Check if note exists in Basic Memory
read_note("{identifier}")

# If it exists, the file is missing on disk - send a message to [email protected]
# If it doesn't exist, use search to find the correct identifier
search_notes("{identifier}")
```"""

    # Server/filesystem errors
    if (
        "server error" in error_message.lower()
        or "filesystem" in error_message.lower()
        or "disk" in error_message.lower()
    ):
        return f"""# Move Failed - System Error

A system error occurred while moving '{identifier}': {error_message}

## Immediate steps:
1. **Try again**: The error might be temporary
2. **Check disk space**: Ensure adequate storage is available
3. **Verify filesystem permissions**: Check if the destination directory is writable

## Alternative approaches:
- Copy content to new location: Use `read_note("{identifier}")` then `write_note()` 
- Use a different destination folder that you know works
- Send a message to [email protected] if the problem persists

## Backup approach:
```
# Read current content
content = read_note("{identifier}")

# Create new note at desired location  
write_note("New Note Title", content, "{destination_path.split("/")[0] if "/" in destination_path else "notes"}")

# Then delete original if successful
delete_note("{identifier}")
```"""

    # Generic fallback
    return f"""# Move Failed

Error moving '{identifier}' to '{destination_path}': {error_message}

## General troubleshooting:
1. **Verify the note exists**: `read_note("{identifier}")` or `search_notes("{identifier}")`
2. **Check destination path**: Ensure it's a valid relative path with `.md` extension
3. **Verify permissions**: Make sure you can edit files in this project
4. **Try a simpler path**: Use a basic folder structure like `notes/filename.md`

## Step-by-step approach:
```
# 1. Confirm note exists
read_note("{identifier}")

# 2. Try a simple destination first
move_note("{identifier}", "notes/{destination_path.split("/")[-1] if "/" in destination_path else destination_path}")

# 3. If that works, then try your original destination
```

## Alternative approach:
If moving continues to fail, you can copy the content manually:
```
# Read current content
content = read_note("{identifier}")

# Create new note
write_note("Title", content, "target-folder") 

# Delete original once confirmed
delete_note("{identifier}")
```"""


@mcp.tool(
    description="Move a note to a new location, updating database and maintaining links.",
)
async def move_note(
    identifier: str,
    destination_path: str,
    project: Optional[str] = None,
    context: Context | None = None,
) -> str:
    """Move a note to a new file location within the same project.

    Moves a note from one location to another within the project, updating all
    database references and maintaining semantic content. Uses stateless architecture -
    project parameter optional with server resolution.

    Args:
        identifier: Exact entity identifier (title, permalink, or memory:// URL).
                   Must be an exact match - fuzzy matching is not supported for move operations.
                   Use search_notes() or read_note() first to find the correct identifier if uncertain.
        destination_path: New path relative to project root (e.g., "work/meetings/2025-05-26.md")
        project: Project name to move within. Optional - server will resolve using hierarchy.
                If unknown, use list_memory_projects() to discover available projects.
        context: Optional FastMCP context for performance caching.

    Returns:
        Success message with move details and project information.

    Examples:
        # Move to new folder (exact title match)
        move_note("My Note", "work/notes/my-note.md")

        # Move by exact permalink
        move_note("my-note-permalink", "archive/old-notes/my-note.md")

        # Move with complex path structure
        move_note("experiments/ml-results", "archive/2025/ml-experiments.md")

        # Explicit project specification
        move_note("My Note", "work/notes/my-note.md", project="work-project")

        # If uncertain about identifier, search first:
        # search_notes("my note")  # Find available notes
        # move_note("docs/my-note-2025", "archive/my-note.md")  # Use exact result

    Raises:
        ToolError: If project doesn't exist, identifier is not found, or destination_path is invalid

    Note:
        This operation moves notes within the specified project only. Moving notes
        between different projects is not currently supported.

    The move operation:
    - Updates the entity's file_path in the database
    - Moves the physical file on the filesystem
    - Optionally updates permalinks if configured
    - Re-indexes the entity for search
    - Maintains all observations and relations
    """
    async with get_client() as client:
        logger.debug(f"Moving note: {identifier} to {destination_path} in project: {project}")

        active_project = await get_active_project(client, project, context)
        project_url = active_project.project_url

        # Validate destination path to prevent path traversal attacks
        project_path = active_project.home
        if not validate_project_path(destination_path, project_path):
            logger.warning(
                "Attempted path traversal attack blocked",
                destination_path=destination_path,
                project=active_project.name,
            )
            return f"""# Move Failed - Security Validation Error

The destination path '{destination_path}' is not allowed - paths must stay within project boundaries.

## Valid path examples:
- `notes/my-file.md`
- `projects/2025/meeting-notes.md`
- `archive/old-notes.md`

## Try again with a safe path:
```
move_note("{identifier}", "notes/{destination_path.split("/")[-1] if "/" in destination_path else destination_path}")
```"""

        # Check for potential cross-project move attempts
        cross_project_error = await _detect_cross_project_move_attempt(
            client, identifier, destination_path, active_project.name
        )
        if cross_project_error:
            logger.info(f"Detected cross-project move attempt: {identifier} -> {destination_path}")
            return cross_project_error

        # Fetch the source entity ONCE; it supplies both the suggested extension
        # for the "extension required" hint and the data for the extension
        # mismatch check below. (Previously the same endpoint was queried twice.)
        source_entity: Optional[EntityResponse] = None
        source_ext = "md"  # Default to .md if we can't determine source extension
        try:
            url = f"{project_url}/knowledge/entities/{identifier}"
            response = await call_get(client, url)
            source_entity = EntityResponse.model_validate(response.json())
            if "." in source_entity.file_path:
                source_ext = source_entity.file_path.split(".")[-1]
        except Exception as e:
            # The identifier may not resolve yet; fall back to defaults and let
            # the move API report a definitive error later.
            logger.debug(f"Could not fetch source entity for extension check: {e}")

        # Validate that destination path includes a file extension
        if "." not in destination_path or not destination_path.split(".")[-1]:
            logger.warning(f"Move failed - no file extension provided: {destination_path}")
            return dedent(f"""
                # Move Failed - File Extension Required

                The destination path '{destination_path}' must include a file extension (e.g., '.md').

                ## Valid examples:
                - `notes/my-note.md`
                - `projects/meeting-2025.txt`
                - `archive/old-program.sh`

                ## Try again with extension:
                ```
                move_note("{identifier}", "{destination_path}.{source_ext}")
                ```

                All examples in Basic Memory expect file extensions to be explicitly provided.
                """).strip()

        # Enforce that the destination keeps the source file's extension.
        # Skipped when the source entity could not be fetched, or when either
        # side has no extension (matching the original best-effort behavior).
        if source_entity is not None:
            src_ext = (
                source_entity.file_path.split(".")[-1] if "." in source_entity.file_path else ""
            )
            dest_ext = destination_path.split(".")[-1] if "." in destination_path else ""

            if src_ext and dest_ext and src_ext.lower() != dest_ext.lower():
                logger.warning(
                    f"Move failed - file extension mismatch: source={src_ext}, dest={dest_ext}"
                )
                return dedent(f"""
                    # Move Failed - File Extension Mismatch

                    The destination file extension '.{dest_ext}' does not match the source file extension '.{src_ext}'.

                    To preserve file type consistency, the destination must have the same extension as the source.

                    ## Source file:
                    - Path: `{source_entity.file_path}`
                    - Extension: `.{src_ext}`

                    ## Try again with matching extension:
                    ```
                    move_note("{identifier}", "{destination_path.rsplit(".", 1)[0]}.{src_ext}")
                    ```
                    """).strip()

        try:
            # Prepare move request
            move_data = {
                "identifier": identifier,
                "destination_path": destination_path,
                "project": active_project.name,
            }

            # Call the move API endpoint
            url = f"{project_url}/knowledge/move"
            response = await call_post(client, url, json=move_data)
            result = EntityResponse.model_validate(response.json())

            # Build success message
            result_lines = [
                "✅ Note moved successfully",
                "",
                f"📁 **{identifier}** → **{result.file_path}**",
                f"🔗 Permalink: {result.permalink}",
                "📊 Database and search index updated",
                "",
                f"<!-- Project: {active_project.name} -->",
            ]

            # Log the operation
            logger.info(
                "Move note completed",
                identifier=identifier,
                destination_path=destination_path,
                project=active_project.name,
                status_code=response.status_code,
            )

            return "\n".join(result_lines)

        except Exception as e:
            logger.error(f"Move failed for '{identifier}' to '{destination_path}': {e}")
            # Return formatted error message for better user experience
            return _format_move_error_response(str(e), identifier, destination_path)

```

--------------------------------------------------------------------------------
/test-int/mcp/test_move_note_integration.py:
--------------------------------------------------------------------------------

```python
"""
Integration tests for move_note MCP tool.

Tests the complete move note workflow: MCP client -> MCP server -> FastAPI -> database -> file system
"""

import pytest
from fastmcp import Client


@pytest.mark.asyncio
async def test_move_note_basic_operation(mcp_server, app, test_project):
    """Test basic move note operation to a new folder."""

    async with Client(mcp_server) as client:
        # Set up a note in the source folder that we will relocate.
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Move Test Note",
                "folder": "source",
                "content": "# Move Test Note\n\nThis note will be moved to a new location.",
                "tags": "test,move",
            },
        )

        # Relocate the note, addressing it by title.
        outcome = await client.call_tool(
            "move_note",
            {
                "project": test_project.name,
                "identifier": "Move Test Note",
                "destination_path": "destination/moved-note.md",
            },
        )

        # Expect a single success payload describing the move.
        assert len(outcome.content) == 1
        summary = outcome.content[0].text
        for fragment in (
            "✅ Note moved successfully",
            "Move Test Note",
            "destination/moved-note.md",
            "📊 Database and search index updated",
        ):
            assert fragment in summary

        # The note must now be readable at its new path...
        fetched = await client.call_tool(
            "read_note",
            {"project": test_project.name, "identifier": "destination/moved-note.md"},
        )
        assert "This note will be moved to a new location" in fetched.content[0].text

        # ...and unavailable at the old one.
        stale = await client.call_tool(
            "read_note",
            {"project": test_project.name, "identifier": "source/move-test-note.md"},
        )
        assert "Note Not Found" in stale.content[0].text


@pytest.mark.asyncio
async def test_move_note_using_permalink(mcp_server, app, test_project):
    """Test moving a note using its permalink as identifier."""

    async with Client(mcp_server) as client:
        # Create the note whose permalink we will use as the identifier.
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Permalink Move Test",
                "folder": "test",
                "content": "# Permalink Move Test\n\nMoving by permalink.",
                "tags": "test,permalink",
            },
        )

        # Address the note via its permalink rather than its title.
        outcome = await client.call_tool(
            "move_note",
            {
                "project": test_project.name,
                "identifier": "test/permalink-move-test",
                "destination_path": "archive/permalink-moved.md",
            },
        )

        assert len(outcome.content) == 1
        summary = outcome.content[0].text
        assert "✅ Note moved successfully" in summary
        assert "test/permalink-move-test" in summary
        assert "archive/permalink-moved.md" in summary

        # Confirm the moved note is readable at the archive path.
        fetched = await client.call_tool(
            "read_note",
            {"project": test_project.name, "identifier": "archive/permalink-moved.md"},
        )
        assert "Moving by permalink" in fetched.content[0].text


@pytest.mark.asyncio
async def test_move_note_with_observations_and_relations(mcp_server, app, test_project):
    """Test moving a note that contains observations and relations."""

    async with Client(mcp_server) as client:
        # Note body with observation, relation, and plain-content sections.
        complex_content = """# Complex Note

This note has various structured content.

## Observations
- [feature] Has structured observations
- [tech] Uses markdown format
- [status] Ready for move testing

## Relations
- implements [[Auth System]]
- documented_in [[Move Guide]]
- depends_on [[File System]]

## Content
This note demonstrates moving complex content."""

        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Complex Note",
                "folder": "complex",
                "content": complex_content,
                "tags": "test,complex,move",
            },
        )

        # Relocate the structured note.
        outcome = await client.call_tool(
            "move_note",
            {
                "project": test_project.name,
                "identifier": "Complex Note",
                "destination_path": "moved/complex-note.md",
            },
        )

        assert len(outcome.content) == 1
        summary = outcome.content[0].text
        assert "✅ Note moved successfully" in summary
        assert "Complex Note" in summary
        assert "moved/complex-note.md" in summary

        # Structured sections must survive the move intact.
        fetched = await client.call_tool(
            "read_note",
            {"project": test_project.name, "identifier": "moved/complex-note.md"},
        )
        body = fetched.content[0].text
        for fragment in (
            "Has structured observations",
            "implements [[Auth System]]",
            "## Observations",
            "[feature]",  # original markdown observation markers preserved
            "## Relations",
        ):
            assert fragment in body


@pytest.mark.asyncio
async def test_move_note_to_nested_directory(mcp_server, app, test_project):
    """Test moving a note to a deeply nested directory structure."""

    async with Client(mcp_server) as client:
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Nested Move Test",
                "folder": "root",
                "content": "# Nested Move Test\n\nThis will be moved deep.",
                "tags": "test,nested",
            },
        )

        deep_path = "projects/2025/q2/work/nested-note.md"

        # Intermediate directories should be created on demand by the move.
        outcome = await client.call_tool(
            "move_note",
            {
                "project": test_project.name,
                "identifier": "Nested Move Test",
                "destination_path": deep_path,
            },
        )

        assert len(outcome.content) == 1
        summary = outcome.content[0].text
        assert "✅ Note moved successfully" in summary
        assert "Nested Move Test" in summary
        assert deep_path in summary

        # The note is reachable under the deep path.
        fetched = await client.call_tool(
            "read_note",
            {"project": test_project.name, "identifier": deep_path},
        )
        assert "This will be moved deep" in fetched.content[0].text


@pytest.mark.asyncio
async def test_move_note_with_special_characters(mcp_server, app, test_project):
    """Test moving notes with special characters in titles and paths."""

    async with Client(mcp_server) as client:
        # Title includes parentheses and an ampersand.
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Special (Chars) & Symbols",
                "folder": "special",
                "content": "# Special (Chars) & Symbols\n\nTesting special characters in move.",
                "tags": "test,special",
            },
        )

        outcome = await client.call_tool(
            "move_note",
            {
                "project": test_project.name,
                "identifier": "Special (Chars) & Symbols",
                "destination_path": "archive/special-chars-note.md",
            },
        )

        # The move should succeed despite the unusual title.
        assert len(outcome.content) == 1
        summary = outcome.content[0].text
        assert "✅ Note moved successfully" in summary
        assert "archive/special-chars-note.md" in summary

        # Content is preserved at the new location.
        fetched = await client.call_tool(
            "read_note",
            {"project": test_project.name, "identifier": "archive/special-chars-note.md"},
        )
        assert "Testing special characters in move" in fetched.content[0].text


@pytest.mark.asyncio
async def test_move_note_error_handling_note_not_found(mcp_server, app, test_project):
    """Test error handling when trying to move a non-existent note."""

    async with Client(mcp_server) as client:
        # Moving an unknown identifier should yield a formatted error, not raise.
        outcome = await client.call_tool(
            "move_note",
            {
                "project": test_project.name,
                "identifier": "Non-existent Note",
                "destination_path": "new/location.md",
            },
        )

        assert len(outcome.content) == 1
        message = outcome.content[0].text
        assert "# Move Failed" in message
        assert "Non-existent Note" in message


@pytest.mark.asyncio
async def test_move_note_error_handling_invalid_destination(mcp_server, app, test_project):
    """Test error handling for invalid destination paths."""

    async with Client(mcp_server) as client:
        # A note must exist before the bad move can be attempted.
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Invalid Dest Test",
                "folder": "test",
                "content": "# Invalid Dest Test\n\nThis move should fail.",
                "tags": "test,error",
            },
        )

        # Absolute destination paths are rejected with a formatted error message.
        outcome = await client.call_tool(
            "move_note",
            {
                "project": test_project.name,
                "identifier": "Invalid Dest Test",
                "destination_path": "/absolute/path/note.md",
            },
        )

        assert len(outcome.content) == 1
        message = outcome.content[0].text
        assert "# Move Failed" in message
        assert "/absolute/path/note.md" in message


@pytest.mark.asyncio
async def test_move_note_error_handling_destination_exists(mcp_server, app, test_project):
    """Test error handling when destination file already exists."""

    async with Client(mcp_server) as client:
        # The note we will try to move.
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Source Note",
                "folder": "source",
                "content": "# Source Note\n\nThis is the source.",
                "tags": "test,source",
            },
        )

        # A second note occupying the destination path ahead of time.
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Existing Note",
                "folder": "destination",
                "content": "# Existing Note\n\nThis already exists.",
                "tags": "test,existing",
            },
        )

        # Moving onto an occupied path must fail with a formatted error.
        outcome = await client.call_tool(
            "move_note",
            {
                "project": test_project.name,
                "identifier": "Source Note",
                "destination_path": "destination/Existing Note.md",  # exact existing file name
            },
        )

        assert len(outcome.content) == 1
        message = outcome.content[0].text
        assert "# Move Failed" in message
        assert "already exists" in message


@pytest.mark.asyncio
async def test_move_note_preserves_search_functionality(mcp_server, app, test_project):
    """Test that moved notes remain searchable after move operation."""

    async with Client(mcp_server) as client:
        # Body with distinctive terms so search hits are unambiguous.
        searchable_body = """# Searchable Note

This note contains unique search terms:
- quantum mechanics
- artificial intelligence
- machine learning algorithms

## Features
- [technology] Advanced AI features
- [research] Quantum computing research

## Relations
- relates_to [[AI Research]]"""

        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Searchable Note",
                "folder": "original",
                "content": searchable_body,
                "tags": "search,test,move",
            },
        )

        # Baseline: the note is indexed before the move.
        before = await client.call_tool(
            "search_notes",
            {"project": test_project.name, "query": "quantum mechanics"},
        )
        assert len(before.content) > 0
        assert "Searchable Note" in before.content[0].text

        # Relocate the note into the research folder.
        outcome = await client.call_tool(
            "move_note",
            {
                "project": test_project.name,
                "identifier": "Searchable Note",
                "destination_path": "research/quantum-ai-note.md",
            },
        )
        assert len(outcome.content) == 1
        assert "✅ Note moved successfully" in outcome.content[0].text

        # The index must still surface the note after the move.
        after = await client.call_tool(
            "search_notes",
            {"project": test_project.name, "query": "quantum mechanics"},
        )
        assert len(after.content) > 0
        hits = after.content[0].text
        assert "quantum mechanics" in hits
        assert "research/quantum-ai-note.md" in hits or "quantum-ai-note" in hits

        # Searching by the new path fragment also works.
        by_path = await client.call_tool(
            "search_notes",
            {"project": test_project.name, "query": "research/quantum"},
        )
        assert len(by_path.content) > 0


@pytest.mark.asyncio
async def test_move_note_using_different_identifier_formats(mcp_server, app, test_project):
    """Test moving notes using different identifier formats (title, permalink, folder/title)."""

    async with Client(mcp_server) as client:
        # One note per identifier style we want to exercise.
        fixtures = [
            ("Title ID Note", "# Title ID Note\n\nMove by title."),
            ("Permalink ID Note", "# Permalink ID Note\n\nMove by permalink."),
            ("Folder Title Note", "# Folder Title Note\n\nMove by folder/title."),
        ]
        for title, body in fixtures:
            await client.call_tool(
                "write_note",
                {
                    "project": test_project.name,
                    "title": title,
                    "folder": "test",
                    "content": body,
                    "tags": "test,identifier",
                },
            )

        # Each move addresses its note with a different identifier format.
        moves = [
            ("Title ID Note", "moved/title-moved.md"),  # plain title
            ("test/permalink-id-note", "moved/permalink-moved.md"),  # permalink
            ("test/Folder Title Note", "moved/folder-title-moved.md"),  # folder/title
        ]
        for identifier, destination in moves:
            result = await client.call_tool(
                "move_note",
                {
                    "project": test_project.name,
                    "identifier": identifier,
                    "destination_path": destination,
                },
            )
            assert len(result.content) == 1
            assert "✅ Note moved successfully" in result.content[0].text

        # All three notes are readable at their new locations.
        expectations = [
            ("moved/title-moved.md", "Move by title"),
            ("moved/permalink-moved.md", "Move by permalink"),
            ("moved/folder-title-moved.md", "Move by folder/title"),
        ]
        for path, snippet in expectations:
            fetched = await client.call_tool(
                "read_note", {"project": test_project.name, "identifier": path}
            )
            assert snippet in fetched.content[0].text


@pytest.mark.asyncio
async def test_move_note_cross_project_detection(mcp_server, app, test_project):
    """Test cross-project move detection and helpful error messages."""

    async with Client(mcp_server) as client:
        # A second project whose name will appear in the destination path.
        await client.call_tool(
            "create_memory_project",
            {
                "project_name": "test-project-b",
                "project_path": "/tmp/test-project-b",
                "set_default": False,
            },
        )

        # Source note lives in the default test project.
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Cross Project Test Note",
                "folder": "source",
                "content": "# Cross Project Test Note\n\nThis note is in the default project.",
                "tags": "test,cross-project",
            },
        )

        # A destination path starting with another project's name triggers detection.
        outcome = await client.call_tool(
            "move_note",
            {
                "project": test_project.name,
                "identifier": "Cross Project Test Note",
                "destination_path": "test-project-b/moved-note.md",
            },
        )

        assert len(outcome.content) == 1
        message = outcome.content[0].text
        # Guidance should point at the read/write workflow instead of the move.
        for fragment in (
            "Cross-Project Move Not Supported",
            "test-project-b",
            "read_note",
            "write_note",
        ):
            assert fragment in message


@pytest.mark.asyncio
async def test_move_note_normal_moves_still_work(mcp_server, app, test_project):
    """Test that normal within-project moves still work after cross-project detection."""

    async with Client(mcp_server) as client:
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Normal Move Note",
                "folder": "source",
                "content": "# Normal Move Note\n\nThis should move normally.",
                "tags": "test,normal-move",
            },
        )

        # An ordinary within-project move must not trip the cross-project check.
        outcome = await client.call_tool(
            "move_note",
            {
                "project": test_project.name,
                "identifier": "Normal Move Note",
                "destination_path": "destination/normal-moved.md",
            },
        )

        assert len(outcome.content) == 1
        summary = outcome.content[0].text
        assert "✅ Note moved successfully" in summary
        assert "Normal Move Note" in summary
        assert "destination/normal-moved.md" in summary

        # And the note is readable at the new location.
        fetched = await client.call_tool(
            "read_note",
            {"project": test_project.name, "identifier": "destination/normal-moved.md"},
        )
        assert "This should move normally" in fetched.content[0].text

```

--------------------------------------------------------------------------------
/test-int/mcp/test_project_management_integration.py:
--------------------------------------------------------------------------------

```python
"""
Integration tests for project_management MCP tools.

Tests the complete project management workflow: MCP client -> MCP server -> FastAPI -> project service
"""

import pytest
from fastmcp import Client


@pytest.mark.asyncio
async def test_list_projects_basic_operation(mcp_server, app, test_project):
    """Test basic list_projects operation showing available projects."""

    async with Client(mcp_server) as client:
        listing = await client.call_tool("list_memory_projects", {})

        # A single formatted text payload is expected.
        assert len(listing.content) == 1
        text = listing.content[0].text  # pyright: ignore [reportAttributeAccessIssue]

        # Project roster plus the session-guidance footer.
        assert "Available projects:" in text
        assert "test-project" in text  # fixture project
        assert "Next: Ask which project to use for this session." in text
        assert "Session reminder: Track the selected project" in text


@pytest.mark.asyncio
async def test_project_management_workflow(mcp_server, app, test_project):
    """Test basic project management workflow."""

    async with Client(mcp_server) as client:
        # The listing should include the fixture project.
        listing = await client.call_tool("list_memory_projects", {})
        text = listing.content[0].text  # pyright: ignore [reportAttributeAccessIssue]
        assert "Available projects:" in text
        assert "test-project" in text


@pytest.mark.asyncio
async def test_project_metadata_consistency(mcp_server, app, test_project):
    """Test that project management tools work correctly."""

    async with Client(mcp_server) as client:
        # list_memory_projects should report the fixture project.
        listing = await client.call_tool("list_memory_projects", {})
        text = listing.content[0].text  # pyright: ignore [reportAttributeAccessIssue]
        assert "Available projects:" in text
        assert "test-project" in text


@pytest.mark.asyncio
async def test_create_project_basic_operation(mcp_server, app, test_project):
    """Test creating a new project with basic parameters."""

    async with Client(mcp_server) as client:
        created = await client.call_tool(
            "create_memory_project",
            {
                "project_name": "test-new-project",
                "project_path": "/tmp/test-new-project",
            },
        )

        assert len(created.content) == 1
        text = created.content[0].text  # pyright: ignore [reportAttributeAccessIssue]

        # Success banner plus the echoed project details.
        for fragment in (
            "✓",
            "test-new-project",
            "Project Details:",
            "Name: test-new-project",
            "Path: /tmp/test-new-project",
            "Project is now available for use",
        ):
            assert fragment in text

        # The new project shows up in the roster.
        listing = await client.call_tool("list_memory_projects", {})
        assert "test-new-project" in listing.content[0].text  # pyright: ignore [reportAttributeAccessIssue]


@pytest.mark.asyncio
async def test_create_project_with_default_flag(mcp_server, app, test_project):
    """Test creating a project and setting it as default."""

    async with Client(mcp_server) as client:
        created = await client.call_tool(
            "create_memory_project",
            {
                "project_name": "test-default-project",
                "project_path": "/tmp/test-default-project",
                "set_default": True,
            },
        )

        assert len(created.content) == 1
        text = created.content[0].text  # pyright: ignore [reportAttributeAccessIssue]

        # Success output should acknowledge the default flag.
        assert "✓" in text
        assert "test-default-project" in text
        assert "Set as default project" in text

        # The project is present in the roster afterwards.
        listing = await client.call_tool("list_memory_projects", {})
        assert "test-default-project" in listing.content[0].text  # pyright: ignore [reportAttributeAccessIssue]


@pytest.mark.asyncio
async def test_create_project_duplicate_name(mcp_server, app, test_project):
    """Test creating a project with duplicate name shows error."""

    async with Client(mcp_server) as client:
        # Seed the name that the second call will collide with.
        await client.call_tool(
            "create_memory_project",
            {
                "project_name": "duplicate-test",
                "project_path": "/tmp/duplicate-test-1",
            },
        )

        # Reusing the name must raise, even with a different path.
        with pytest.raises(Exception) as exc_info:
            await client.call_tool(
                "create_memory_project",
                {
                    "project_name": "duplicate-test",
                    "project_path": "/tmp/duplicate-test-2",
                },
            )

        # The error should identify the tool and hint at the duplicate.
        message = str(exc_info.value)
        assert "create_memory_project" in message
        assert (
            "duplicate-test" in message
            or "already exists" in message
            or "Invalid request" in message
        )


@pytest.mark.asyncio
async def test_delete_project_basic_operation(mcp_server, app, test_project):
    """Test deleting a project that exists."""

    async with Client(mcp_server) as client:
        # Create a throwaway project so there is something to delete.
        await client.call_tool(
            "create_memory_project",
            {
                "project_name": "to-be-deleted",
                "project_path": "/tmp/to-be-deleted",
            },
        )

        # Sanity check: it is listed before deletion.
        before = await client.call_tool("list_memory_projects", {})
        assert "to-be-deleted" in before.content[0].text  # pyright: ignore [reportAttributeAccessIssue]

        deleted = await client.call_tool(
            "delete_project",
            {"project_name": "to-be-deleted"},
        )

        assert len(deleted.content) == 1
        text = deleted.content[0].text  # pyright: ignore [reportAttributeAccessIssue]

        # Success message with the removed project's details.
        for fragment in (
            "✓",
            "to-be-deleted",
            "removed successfully",
            "Removed project details:",
            "Name: to-be-deleted",
            "Files remain on disk but project is no longer tracked",
        ):
            assert fragment in text

        # And it is gone from the roster.
        after = await client.call_tool("list_memory_projects", {})
        assert "to-be-deleted" not in after.content[0].text  # pyright: ignore [reportAttributeAccessIssue]


@pytest.mark.asyncio
async def test_delete_project_not_found(mcp_server, app, test_project):
    """Test deleting a non-existent project shows error."""

    async with Client(mcp_server) as client:
        # Deleting an unknown project must raise.
        with pytest.raises(Exception) as exc_info:
            await client.call_tool(
                "delete_project",
                {"project_name": "non-existent-project"},
            )

        # The error should identify the tool and the missing project.
        message = str(exc_info.value)
        assert "delete_project" in message
        assert (
            "non-existent-project" in message
            or "not found" in message
            or "Invalid request" in message
        )


@pytest.mark.asyncio
async def test_delete_current_project_protection(mcp_server, app, test_project):
    """Test that deleting the current project is prevented."""

    async with Client(mcp_server) as client:
        # The active project must be protected from deletion.
        with pytest.raises(Exception) as exc_info:
            await client.call_tool(
                "delete_project",
                {"project_name": "test-project"},
            )

        # The failure should reference the tool and the protected project.
        message = str(exc_info.value)
        assert "delete_project" in message
        assert (
            "currently active" in message
            or "test-project" in message
            or "Switch to a different project" in message
        )


@pytest.mark.asyncio
async def test_project_lifecycle_workflow(mcp_server, app, test_project):
    """Exercise the full project lifecycle: create, write, list, delete."""

    async with Client(mcp_server) as client:
        name = "lifecycle-test"
        path = "/tmp/lifecycle-test"

        # Step 1: create the project and confirm the success message.
        created = await client.call_tool(
            "create_memory_project",
            {"project_name": name, "project_path": path},
        )
        created_text = created.content[0].text  # pyright: ignore [reportAttributeAccessIssue]
        assert "✓" in created_text
        assert name in created_text

        # Step 2: write a note into the freshly created project.
        await client.call_tool(
            "write_note",
            {
                "project": name,
                "title": "Lifecycle Test Note",
                "folder": "test",
                "content": "# Lifecycle Test\\n\\nThis note tests the project lifecycle.\\n\\n- [test] Lifecycle testing",
                "tags": "lifecycle,test",
            },
        )

        # Step 3: the new project should be listed.
        listing = await client.call_tool("list_memory_projects", {})
        assert name in listing.content[0].text  # pyright: ignore [reportAttributeAccessIssue]

        # Step 4: the original test project remains accessible.
        listing_again = await client.call_tool("list_memory_projects", {})
        assert "test-project" in listing_again.content[0].text  # pyright: ignore [reportAttributeAccessIssue]

        # Step 5: delete the lifecycle project and check the confirmation.
        deleted = await client.call_tool("delete_project", {"project_name": name})
        deleted_text = deleted.content[0].text  # pyright: ignore [reportAttributeAccessIssue]
        assert "✓" in deleted_text
        assert name in deleted_text
        assert "removed successfully" in deleted_text

        # Step 6: the project must no longer appear in the listing.
        final_listing = await client.call_tool("list_memory_projects", {})
        assert name not in final_listing.content[0].text  # pyright: ignore [reportAttributeAccessIssue]


@pytest.mark.asyncio
async def test_create_delete_project_edge_cases(mcp_server, app, test_project):
    """Edge cases: project names containing spaces and punctuation."""

    async with Client(mcp_server) as client:
        # A name with spaces and symbols must round-trip through create/delete.
        special_name = "test project with spaces & symbols!"

        created = await client.call_tool(
            "create_memory_project",
            {
                "project_name": special_name,
                "project_path": "/tmp/test-project-with-special-chars",
            },
        )
        created_text = created.content[0].text  # pyright: ignore [reportAttributeAccessIssue]
        assert "✓" in created_text
        assert special_name in created_text

        # The new project shows up in the listing.
        listing = await client.call_tool("list_memory_projects", {})
        assert special_name in listing.content[0].text  # pyright: ignore [reportAttributeAccessIssue]

        # Deleting it succeeds and echoes the name back.
        deleted = await client.call_tool("delete_project", {"project_name": special_name})
        deleted_text = deleted.content[0].text  # pyright: ignore [reportAttributeAccessIssue]
        assert "✓" in deleted_text
        assert special_name in deleted_text

        # And it disappears from the listing afterwards.
        final_listing = await client.call_tool("list_memory_projects", {})
        assert special_name not in final_listing.content[0].text  # pyright: ignore [reportAttributeAccessIssue]


@pytest.mark.asyncio
async def test_case_insensitive_project_switching(mcp_server, app, test_project):
    """Test case-insensitive project switching with proper database lookup."""

    async with Client(mcp_server) as client:
        # Create a project whose canonical name mixes cases.
        canonical = "Personal-Project"
        created = await client.call_tool(
            "create_memory_project",
            {
                "project_name": canonical,
                "project_path": f"/tmp/{canonical}",
            },
        )
        created_text = created.content[0].text  # pyright: ignore [reportAttributeAccessIssue]
        assert "✓" in created_text
        assert canonical in created_text

        # The canonical spelling is what the listing shows.
        listing = await client.call_tool("list_memory_projects", {})
        assert canonical in listing.content[0].text  # pyright: ignore [reportAttributeAccessIssue]

        # Each variant below should be accepted anywhere a project name is taken.
        variants = [
            "personal-project",  # all lowercase
            "PERSONAL-PROJECT",  # all uppercase
            "Personal-project",  # mixed case 1
            "personal-Project",  # mixed case 2
        ]

        # Project creation is case-preserving, but operations should accept
        # any casing of the project name.
        for variant in variants:
            response = await client.call_tool(
                "write_note",
                {
                    "project": variant,  # Use different case
                    "title": f"Case Test {variant}",
                    "folder": "case-test",
                    "content": f"# Case Test\n\nTesting with {variant}",
                },
            )
            assert len(response.content) == 1
            assert f"Case Test {variant}".lower() in response.content[0].text.lower()  # pyright: ignore [reportAttributeAccessIssue]

        # Clean up
        await client.call_tool("delete_project", {"project_name": canonical})


@pytest.mark.asyncio
async def test_case_insensitive_project_operations(mcp_server, app, test_project):
    """Test that all project operations work correctly after case-insensitive switching."""

    async with Client(mcp_server) as client:
        # Set up a project whose name contains capitals.
        name = "CamelCase-Project"
        created = await client.call_tool(
            "create_memory_project",
            {"project_name": name, "project_path": f"/tmp/{name}"},
        )
        assert "✓" in created.content[0].text  # pyright: ignore [reportAttributeAccessIssue]

        # 1. Writing a note into the project succeeds.
        written = await client.call_tool(
            "write_note",
            {
                "project": name,
                "title": "Case Test Note",
                "folder": "case-test",
                "content": "# Case Test Note\n\nTesting case-insensitive operations.\n\n- [test] Case insensitive switch\n- relates_to [[Another Note]]",
                "tags": "case,test",
            },
        )
        assert len(written.content) == 1
        assert "Case Test Note" in written.content[0].text  # pyright: ignore [reportAttributeAccessIssue]

        # 2. Full-text search finds the note.
        found = await client.call_tool(
            "search_notes",
            {"project": name, "query": "case insensitive"},
        )
        assert len(found.content) == 1
        assert "Case Test Note" in found.content[0].text  # pyright: ignore [reportAttributeAccessIssue]

        # 3. Reading the note back returns its content.
        fetched = await client.call_tool(
            "read_note",
            {"project": name, "identifier": "Case Test Note"},
        )
        assert len(fetched.content) == 1
        fetched_text = fetched.content[0].text  # pyright: ignore [reportAttributeAccessIssue]
        assert "Case Test Note" in fetched_text
        assert "case insensitive" in fetched_text.lower()

        # Clean up
        await client.call_tool("delete_project", {"project_name": name})


@pytest.mark.asyncio
async def test_case_insensitive_error_handling(mcp_server, app, test_project):
    """Operations against missing projects fail regardless of the casing used."""

    async with Client(mcp_server) as client:
        # None of these spellings correspond to a real project.
        missing_names = [
            "NonExistent",
            "non-existent",
            "NON-EXISTENT",
            "Non-Existent-Project",
        ]

        # Every attempt to write into a missing project should raise.
        for missing in missing_names:
            with pytest.raises(Exception):
                await client.call_tool(
                    "write_note",
                    {
                        "project": missing,
                        "title": "Test Note",
                        "folder": "test",
                        "content": "# Test\n\nTest content.",
                    },
                )


@pytest.mark.asyncio
async def test_case_preservation_in_project_list(mcp_server, app, test_project):
    """Test that project names preserve their original case in listings."""

    async with Client(mcp_server) as client:
        # Cover several distinct casing patterns.
        names = [
            "lowercase-project",
            "UPPERCASE-PROJECT",
            "CamelCase-Project",
            "Mixed-CASE-project",
        ]

        # Create every project up front.
        for name in names:
            await client.call_tool(
                "create_memory_project",
                {"project_name": name, "project_path": f"/tmp/{name}"},
            )

        # The listing must echo each name with its original casing intact.
        listing = await client.call_tool("list_memory_projects", {})
        listing_text = listing.content[0].text  # pyright: ignore [reportAttributeAccessIssue]
        for name in names:
            assert name in listing_text, f"Project {name} not found in list"

        # Writing with the exact (case-sensitive) name works for each project.
        for name in names:
            written = await client.call_tool(
                "write_note",
                {
                    "project": name,  # Use exact project name
                    "title": f"Test Note {name}",
                    "folder": "test",
                    "content": f"# Test\n\nTesting {name}",
                },
            )
            assert len(written.content) == 1
            written_text = written.content[0].text  # pyright: ignore [reportAttributeAccessIssue]
            assert "successfully" in written_text.lower() or "created" in written_text.lower()

        # Clean up - delete test projects
        for name in names:
            await client.call_tool("delete_project", {"project_name": name})


@pytest.mark.asyncio
async def test_nested_project_paths_rejected(mcp_server, app, test_project):
    """Test that creating nested project paths is rejected with clear error message."""

    async with Client(mcp_server) as client:
        # First create a project that will act as the parent.
        parent_name = "parent-project"
        parent_path = "/tmp/nested-test/parent"
        await client.call_tool(
            "create_memory_project",
            {"project_name": parent_name, "project_path": parent_path},
        )

        # Creating a project whose path lives inside the parent must fail.
        with pytest.raises(Exception) as exc_info:
            await client.call_tool(
                "create_memory_project",
                {
                    "project_name": "child-project",
                    "project_path": "/tmp/nested-test/parent/child",
                },
            )

        # The failure should call out the nesting and name the parent.
        message = str(exc_info.value)
        assert "nested" in message.lower()
        assert parent_name in message or parent_path in message

        # Clean up parent project
        await client.call_tool("delete_project", {"project_name": parent_name})

```

--------------------------------------------------------------------------------
/src/basic_memory/repository/search_repository.py:
--------------------------------------------------------------------------------

```python
"""Repository for search operations."""

import json
import re
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List, Optional
from pathlib import Path

from loguru import logger
from sqlalchemy import Executable, Result, text
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker

from basic_memory import db
from basic_memory.models.search import CREATE_SEARCH_INDEX
from basic_memory.schemas.search import SearchItemType


@dataclass
class SearchIndexRow:
    """One row of the search index, plus the score assigned at query time."""

    project_id: int
    id: int
    type: str
    file_path: str

    # date values
    created_at: datetime
    updated_at: datetime

    permalink: Optional[str] = None
    metadata: Optional[dict] = None

    # assigned in result
    score: Optional[float] = None

    # Type-specific fields
    title: Optional[str] = None  # entity
    content_stems: Optional[str] = None  # entity, observation
    content_snippet: Optional[str] = None  # entity, observation
    entity_id: Optional[int] = None  # observations
    category: Optional[str] = None  # observations
    from_id: Optional[int] = None  # relations
    to_id: Optional[int] = None  # relations
    relation_type: Optional[str] = None  # relations

    @property
    def content(self):
        """Alias for the content snippet stored on this row."""
        return self.content_snippet

    @property
    def directory(self) -> str:
        """Directory part of ``file_path``, always slash-prefixed.

        "projects/notes/ideas.md" maps to "/projects/notes"; a root-level
        file such as "README.md" maps to "/".
        """
        if self.type != SearchItemType.ENTITY.value and not self.file_path:
            return ""

        # as_posix() normalizes Windows backslashes to forward slashes.
        posix_path = Path(self.file_path).as_posix()

        # Everything before the final slash is the directory; no slash at
        # all means the file sits at the root.
        parent, _, _filename = posix_path.rpartition("/")
        return f"/{parent}" if parent else "/"

    def to_insert(self):
        """Build the bound-parameter dict for inserting this row into search_index."""
        return {
            "id": self.id,
            "title": self.title,
            "content_stems": self.content_stems,
            "content_snippet": self.content_snippet,
            "permalink": self.permalink,
            "file_path": self.file_path,
            "type": self.type,
            "metadata": json.dumps(self.metadata),
            "from_id": self.from_id,
            "to_id": self.to_id,
            "relation_type": self.relation_type,
            "entity_id": self.entity_id,
            "category": self.category,
            "created_at": self.created_at or None,
            "updated_at": self.updated_at or None,
            "project_id": self.project_id,
        }


class SearchRepository:
    """Repository for search index operations."""

    def __init__(self, session_maker: async_sessionmaker[AsyncSession], project_id: int):
        """Initialize with session maker and project_id filter.

        Every query issued by this repository is scoped to the given
        project, so a missing or non-positive id would silently disable
        that isolation — reject it up front instead.

        Args:
            session_maker: SQLAlchemy session maker
            project_id: Project ID to filter all operations by

        Raises:
            ValueError: If project_id is None or invalid
        """
        if project_id is None or project_id <= 0:  # pragma: no cover
            raise ValueError("A valid project_id is required for SearchRepository")

        self.session_maker = session_maker
        self.project_id = project_id

    async def init_search_index(self):
        """Create or recreate the search index.

        Executes the CREATE_SEARCH_INDEX DDL inside a scoped session and
        commits. Failures are logged and re-raised to the caller.
        """
        logger.info("Initializing search index")
        try:
            async with db.scoped_session(self.session_maker) as session:
                await session.execute(CREATE_SEARCH_INDEX)
                await session.commit()
        except Exception as e:  # pragma: no cover
            logger.error(f"Error initializing search index: {e}")
            raise e

    def _prepare_boolean_query(self, query: str) -> str:
        """Quote the individual terms of a Boolean query, leaving operators intact.

        Args:
            query: A Boolean query like "tier1-test AND unicode" or "(hello OR world) NOT test"

        Returns:
            A properly formatted Boolean query with quoted terms that need quoting
        """
        # Split on AND/OR/NOT (word boundaries), keeping the operators
        # themselves as tokens via the capturing group.
        boolean_pattern = r"(\bAND\b|\bOR\b|\bNOT\b)"
        tokens = [token.strip() for token in re.split(boolean_pattern, query)]

        prepared: List[str] = []
        for token in tokens:
            if not token:
                continue
            if token in ("AND", "OR", "NOT"):
                # Operators pass through untouched.
                prepared.append(token)
            elif "(" in token or ")" in token:
                # Parentheses are FTS5 grouping syntax and must survive.
                prepared.append(self._prepare_parenthetical_term(token))
            else:
                # Plain search term; Boolean queries get no prefix wildcards.
                prepared.append(self._prepare_single_term(token, is_prefix=False))

        return " ".join(prepared)

    def _prepare_parenthetical_term(self, term: str) -> str:
        """Quote the contents of a parenthesized term while keeping the parentheses.

        Args:
            term: A term that may contain parentheses like "(hello" or "world)" or "(hello OR world)"

        Returns:
            A properly formatted term with parentheses preserved
        """
        # Split on "(" and ")" with a capturing group so the parentheses
        # themselves remain in the token stream.
        pieces = []
        for segment in re.split(r"([()])", term):
            if segment in ("(", ")"):
                # Grouping syntax passes through unchanged.
                pieces.append(segment)
                continue

            content = segment.strip()
            if not content:
                continue

            # Only quote content that actually needs it (hyphens, special
            # characters, etc.); simple words stay bare.
            if self._needs_quoting(content):
                escaped = content.replace('"', '""')
                pieces.append(f'"{escaped}"')
            else:
                pieces.append(content)

        return "".join(pieces)

    def _needs_quoting(self, term: str) -> bool:
        """Decide whether a term must be quoted to be safe in an FTS5 query.

        Args:
            term: The term to check

        Returns:
            True if the term should be quoted
        """
        if not term or not term.strip():
            return False

        # Any of these characters forces quoting. Parentheses are deliberately
        # absent: they are valid FTS5 grouping syntax.
        special = " .:;,<>?/-'\"[]{}+!@#$%^&=|\\~`"
        return any(ch in special for ch in term)

    def _prepare_single_term(self, term: str, is_prefix: bool = True) -> str:
        """Format one search term (no Boolean operators) for FTS5.

        Args:
            term: A single search term
            is_prefix: Whether to add prefix search capability (* suffix)

        Returns:
            A properly formatted single term
        """
        if not term or not term.strip():
            return term

        term = term.strip()

        # Already a wildcard pattern built from safe characters
        # (e.g. "hello*", "test*world"): leave it untouched.
        if "*" in term and all(c.isalnum() or c in "*_-" for c in term):
            return term

        # Characters that can act as FTS5 operators and break the query outright.
        operator_chars = "\"'()[]{}+!@#$%^&=|\\~`"
        # Characters that merely require quoting (spaces, dots, hyphens before
        # wildcards, path separators, ...).
        quotable_chars = " .:;,<>?/-"

        has_operator = any(c in term for c in operator_chars)
        has_quotable = any(c in term for c in quotable_chars)

        if not (has_operator or has_quotable):
            # Simple bare word: optionally enable prefix matching.
            return f"{term}*" if is_prefix else term

        if " " in term and not has_operator:
            words = term.split()
            # Does any individual word carry a special character of its own?
            word_needs_quote = any(
                any(c in word for c in quotable_chars if c != " ") for word in words
            )
            if not word_needs_quote:
                # Multi-word query of simple words (like "emoji unicode"):
                # AND the words so word order doesn't matter.
                if is_prefix:
                    # Prefix wildcard on each word for better matching.
                    words = [f"{word}*" for word in words if word]
                return " AND ".join(words)

        # Fall back to exact phrase matching, doubling any embedded quotes.
        escaped = term.replace('"', '""')
        if is_prefix and not ("/" in term and term.endswith(".md")):
            # Search terms (not file paths) still get prefix matching.
            return f'"{escaped}"*'
        # File paths use exact matching.
        return f'"{escaped}"'

    def _prepare_search_term(self, term: str, is_prefix: bool = True) -> str:
        """Prepare a search term for FTS5 query.

        Args:
            term: The search term to prepare
            is_prefix: Whether to add prefix search capability (* suffix)

        For FTS5:
        - Boolean operators (AND, OR, NOT) are preserved for complex queries
        - Terms with FTS5 special characters are quoted to prevent syntax errors
        - Simple terms get prefix wildcards for better matching
        """
        # Pad with spaces so operators at the very start/end are detected too.
        padded = f" {term} "
        if any(op in padded for op in (" AND ", " OR ", " NOT ")):
            return self._prepare_boolean_query(term)

        # No Boolean operators: treat as one term.
        return self._prepare_single_term(term, is_prefix)

    async def search(
        self,
        search_text: Optional[str] = None,
        permalink: Optional[str] = None,
        permalink_match: Optional[str] = None,
        title: Optional[str] = None,
        types: Optional[List[str]] = None,
        after_date: Optional[datetime] = None,
        search_item_types: Optional[List[SearchItemType]] = None,
        limit: int = 10,
        offset: int = 0,
    ) -> List[SearchIndexRow]:
        """Search across all indexed content with fuzzy matching.

        Args:
            search_text: Full-text query matched against title and content stems.
            permalink: Exact permalink to match.
            permalink_match: Permalink pattern; a "*" triggers GLOB matching.
            title: FTS5 match against the title column only.
            types: Filter on metadata entity_type values.
            after_date: Only rows created after this datetime (also sorts
                results most-recently-updated first).
            search_item_types: Filter on row type (entity/observation/relation).
            limit: Maximum number of rows returned.
            offset: Number of rows to skip (for pagination).

        Returns:
            Scored SearchIndexRow results scoped to this repository's project.
        """
        conditions = []
        params: Dict[str, Any] = {}
        order_by_clause = ""

        # Handle text search for title and content
        if search_text:
            # Skip FTS for wildcard-only queries that would cause "unknown special query" errors
            if search_text.strip() == "*" or search_text.strip() == "":
                # For wildcard searches, don't add any text conditions - return all results
                pass
            else:
                # Use _prepare_search_term to handle both Boolean and non-Boolean queries
                processed_text = self._prepare_search_term(search_text.strip())
                params["text"] = processed_text
                conditions.append("(title MATCH :text OR content_stems MATCH :text)")

        # Handle title match search
        if title:
            title_text = self._prepare_search_term(title.strip(), is_prefix=False)
            params["title_text"] = title_text
            conditions.append("title MATCH :title_text")

        # Handle permalink exact search
        if permalink:
            params["permalink"] = permalink
            conditions.append("permalink = :permalink")

        # Handle permalink match search, supports *
        if permalink_match:
            # For GLOB patterns, don't use _prepare_search_term as it will quote slashes
            # GLOB patterns need to preserve their syntax
            permalink_text = permalink_match.lower().strip()
            params["permalink"] = permalink_text
            if "*" in permalink_match:
                conditions.append("permalink GLOB :permalink")
            else:
                # For exact matches without *, we can use FTS5 MATCH
                # but only prepare the term if it doesn't look like a path
                if "/" in permalink_text:
                    conditions.append("permalink = :permalink")
                else:
                    permalink_text = self._prepare_search_term(permalink_text, is_prefix=False)
                    params["permalink"] = permalink_text
                    conditions.append("permalink MATCH :permalink")

        # Handle entity type filter. Values are bound as named parameters
        # instead of being interpolated into the SQL string.
        if search_item_types:
            item_type_placeholders = []
            for i, item_type in enumerate(search_item_types):
                key = f"item_type_{i}"
                params[key] = item_type.value
                item_type_placeholders.append(f":{key}")
            conditions.append(f"type IN ({', '.join(item_type_placeholders)})")

        # Handle metadata entity_type filter. Parameterized to prevent SQL
        # injection via caller-supplied type strings.
        if types:
            type_placeholders = []
            for i, type_value in enumerate(types):
                key = f"type_{i}"
                params[key] = type_value
                type_placeholders.append(f":{key}")
            conditions.append(
                f"json_extract(metadata, '$.entity_type') IN ({', '.join(type_placeholders)})"
            )

        # Handle date filter using datetime() for proper comparison
        if after_date:
            params["after_date"] = after_date
            conditions.append("datetime(created_at) > datetime(:after_date)")

            # order by most recent first
            order_by_clause = ", updated_at DESC"

        # Always filter by project_id
        params["project_id"] = self.project_id
        conditions.append("project_id = :project_id")

        # set limit on search query
        params["limit"] = limit
        params["offset"] = offset

        # Build WHERE clause
        where_clause = " AND ".join(conditions) if conditions else "1=1"

        sql = f"""
            SELECT 
                project_id,
                id, 
                title, 
                permalink,
                file_path,
                type,
                metadata,
                from_id,
                to_id,
                relation_type,
                entity_id,
                content_snippet,
                category,
                created_at,
                updated_at,
                bm25(search_index) as score
            FROM search_index 
            WHERE {where_clause}
            ORDER BY score ASC {order_by_clause}
            LIMIT :limit
            OFFSET :offset
        """

        logger.trace(f"Search {sql} params: {params}")
        try:
            async with db.scoped_session(self.session_maker) as session:
                result = await session.execute(text(sql), params)
                rows = result.fetchall()
        except Exception as e:
            # Handle FTS5 syntax errors and provide user-friendly feedback
            if "fts5: syntax error" in str(e).lower():  # pragma: no cover
                logger.warning(f"FTS5 syntax error for search term: {search_text}, error: {e}")
                # Return empty results rather than crashing
                return []
            else:
                # Re-raise other database errors
                logger.error(f"Database error during search: {e}")
                raise

        results = [
            SearchIndexRow(
                project_id=self.project_id,
                id=row.id,
                title=row.title,
                permalink=row.permalink,
                file_path=row.file_path,
                type=row.type,
                score=row.score,
                metadata=json.loads(row.metadata),
                from_id=row.from_id,
                to_id=row.to_id,
                relation_type=row.relation_type,
                entity_id=row.entity_id,
                content_snippet=row.content_snippet,
                category=row.category,
                created_at=row.created_at,
                updated_at=row.updated_at,
            )
            for row in rows
        ]

        logger.trace(f"Found {len(results)} search results")
        for r in results:
            logger.trace(
                f"Search result: project_id: {r.project_id} type:{r.type} title: {r.title} permalink: {r.permalink} score: {r.score}"
            )

        return results

    async def index_item(
        self,
        search_index_row: SearchIndexRow,
    ):
        """Index or update a single item in the search index.

        Implements "upsert" semantics as delete-then-insert keyed on
        (permalink, project_id), so re-indexing the same permalink replaces
        the prior row instead of duplicating it.

        Args:
            search_index_row: Row data to write into the search index.
        """
        async with db.scoped_session(self.session_maker) as session:
            # Delete existing record if any
            # NOTE(review): if permalink is NULL this DELETE matches nothing
            # (SQL NULL never equals NULL), so permalink-less rows would not
            # be replaced here — confirm callers always supply a permalink.
            await session.execute(
                text(
                    "DELETE FROM search_index WHERE permalink = :permalink AND project_id = :project_id"
                ),
                {"permalink": search_index_row.permalink, "project_id": self.project_id},
            )

            # Prepare data for insert with project_id
            insert_data = search_index_row.to_insert()
            insert_data["project_id"] = self.project_id

            # Insert new record
            await session.execute(
                text("""
                    INSERT INTO search_index (
                        id, title, content_stems, content_snippet, permalink, file_path, type, metadata,
                        from_id, to_id, relation_type,
                        entity_id, category,
                        created_at, updated_at,
                        project_id
                    ) VALUES (
                        :id, :title, :content_stems, :content_snippet, :permalink, :file_path, :type, :metadata,
                        :from_id, :to_id, :relation_type,
                        :entity_id, :category,
                        :created_at, :updated_at,
                        :project_id
                    )
                """),
                insert_data,
            )
            logger.debug(f"indexed row {search_index_row}")
            await session.commit()

    async def bulk_index_items(self, search_index_rows: List[SearchIndexRow]):
        """Index multiple items in a single batch operation.

        Note: This method assumes that any existing records for the entity_id
        have already been deleted (typically via delete_by_entity_id).

        Args:
            search_index_rows: List of SearchIndexRow objects to index
        """
        # Nothing to do for an empty batch — avoids an empty executemany call.
        if not search_index_rows:
            return

        async with db.scoped_session(self.session_maker) as session:
            # Build the parameter list, stamping every row with this
            # repository's project_id (comprehension instead of append loop).
            insert_data_list = [
                {**row.to_insert(), "project_id": self.project_id}
                for row in search_index_rows
            ]

            # Batch insert all records using executemany
            await session.execute(
                text("""
                    INSERT INTO search_index (
                        id, title, content_stems, content_snippet, permalink, file_path, type, metadata,
                        from_id, to_id, relation_type,
                        entity_id, category,
                        created_at, updated_at,
                        project_id
                    ) VALUES (
                        :id, :title, :content_stems, :content_snippet, :permalink, :file_path, :type, :metadata,
                        :from_id, :to_id, :relation_type,
                        :entity_id, :category,
                        :created_at, :updated_at,
                        :project_id
                    )
                """),
                insert_data_list,
            )
            logger.debug(f"Bulk indexed {len(search_index_rows)} rows")
            await session.commit()

    async def delete_by_entity_id(self, entity_id: int):
        """Remove every search-index row belonging to the given entity."""
        stmt = text(
            "DELETE FROM search_index WHERE entity_id = :entity_id AND project_id = :project_id"
        )
        params = {"entity_id": entity_id, "project_id": self.project_id}
        async with db.scoped_session(self.session_maker) as session:
            await session.execute(stmt, params)
            await session.commit()

    async def delete_by_permalink(self, permalink: str):
        """Remove the search-index row matching the given permalink."""
        stmt = text(
            "DELETE FROM search_index WHERE permalink = :permalink AND project_id = :project_id"
        )
        params = {"permalink": permalink, "project_id": self.project_id}
        async with db.scoped_session(self.session_maker) as session:
            await session.execute(stmt, params)
            await session.commit()

    async def execute_query(
        self,
        query: Executable,
        params: Dict[str, Any],
    ) -> Result[Any]:
        """Execute an arbitrary query asynchronously, logging its duration."""
        async with db.scoped_session(self.session_maker) as session:
            started = time.perf_counter()
            result = await session.execute(query, params)
            elapsed = time.perf_counter() - started
            logger.debug(f"Query executed successfully in {elapsed:.2f}s.")
            return result

```

--------------------------------------------------------------------------------
/tests/services/test_search_service.py:
--------------------------------------------------------------------------------

```python
"""Tests for search service."""

from datetime import datetime

import pytest
from sqlalchemy import text

from basic_memory import db
from basic_memory.schemas.search import SearchQuery, SearchItemType


@pytest.mark.asyncio
async def test_search_permalink(search_service, test_graph):
    """Searching by exact permalink returns exactly that item."""
    results = await search_service.search(SearchQuery(permalink="test/root"))
    assert len(results) == 1
    assert all("test/root" in r.permalink for r in results)


@pytest.mark.asyncio
async def test_search_limit_offset(search_service, test_graph):
    """Limit and offset paginate results of a wildcard permalink search."""
    results = await search_service.search(SearchQuery(permalink_match="test/*"))
    assert len(results) > 1

    results = await search_service.search(SearchQuery(permalink_match="test/*"), limit=1)
    assert len(results) == 1

    results = await search_service.search(SearchQuery(permalink_match="test/*"), limit=100)
    num_results = len(results)

    # offset=1 should drop exactly the first result
    offset_results = await search_service.search(
        SearchQuery(permalink_match="test/*"), limit=100, offset=1
    )
    assert len(offset_results) == num_results - 1


@pytest.mark.asyncio
async def test_search_permalink_observations_wildcard(search_service, test_graph):
    """Wildcard matches every observation permalink under test/root."""
    results = await search_service.search(SearchQuery(permalink_match="test/root/observations/*"))
    found = {r.permalink for r in results}
    assert len(results) == 2
    assert found == {
        "test/root/observations/note/root-note-1",
        "test/root/observations/tech/root-tech-note",
    }


@pytest.mark.asyncio
async def test_search_permalink_relation_wildcard(search_service, test_graph):
    """Wildcard matches the relation permalink under test/root."""
    results = await search_service.search(SearchQuery(permalink_match="test/root/connects-to/*"))
    assert len(results) == 1
    assert results[0].permalink == "test/root/connects-to/test/connected-entity-1"


@pytest.mark.asyncio
async def test_search_permalink_wildcard2(search_service, test_graph):
    """A trailing-star pattern matches both connected entities."""
    results = await search_service.search(SearchQuery(permalink_match="test/connected*"))
    assert len(results) >= 2
    found = {r.permalink for r in results}
    assert {"test/connected-entity-1", "test/connected-entity-2"} <= found


@pytest.mark.asyncio
async def test_search_text(search_service, test_graph):
    """Full-text search ranks the root entity first."""
    hits = await search_service.search(
        SearchQuery(text="Root Entity", entity_types=[SearchItemType.ENTITY])
    )
    assert hits
    assert hits[0].permalink == "test/root"


@pytest.mark.asyncio
async def test_search_title(search_service, test_graph):
    """Title-only search ranks the root entity first."""
    hits = await search_service.search(
        SearchQuery(title="Root", entity_types=[SearchItemType.ENTITY])
    )
    assert hits
    assert hits[0].permalink == "test/root"


@pytest.mark.asyncio
async def test_text_search_case_insensitive(search_service, test_graph):
    """An upper-cased term still matches lower-cased content."""
    results = await search_service.search(SearchQuery(text="ENTITY"))
    matching = [r for r in results if "test/root" in r.permalink]
    assert matching


@pytest.mark.asyncio
async def test_text_search_content_word_match(search_service, test_graph):
    """A single content word matches the file that contains it."""
    results = await search_service.search(SearchQuery(text="Connected"))
    assert results
    file_paths = {r.file_path for r in results}
    assert "test/Connected Entity 2.md" in file_paths


@pytest.mark.asyncio
async def test_text_search_multiple_terms(search_service, test_graph):
    """Multiple terms in one query still match the root entity."""
    results = await search_service.search(SearchQuery(text="root note"))
    root_hits = [r for r in results if "test/root" in r.permalink]
    assert root_hits


@pytest.mark.asyncio
async def test_pattern_matching(search_service, test_graph):
    """Permalink pattern matching handles several wildcard placements."""
    # Trailing wildcard
    results = await search_service.search(SearchQuery(permalink_match="test/*"))
    assert all("test/" in r.permalink for r in results)

    # Leading wildcard
    results = await search_service.search(SearchQuery(permalink_match="*/observations"))
    assert all("/observations" in r.permalink for r in results)

    # Bare term acts as a partial permalink match
    results = await search_service.search(SearchQuery(permalink_match="test"))
    assert all("test/" in r.permalink for r in results)


@pytest.mark.asyncio
async def test_filters(search_service, test_graph):
    """Text, entity_types, and types filters combine to a single match."""
    results = await search_service.search(
        SearchQuery(text="Deep", entity_types=[SearchItemType.ENTITY], types=["deep"])
    )
    assert len(results) == 1
    match = results[0]
    assert match.type == SearchItemType.ENTITY
    assert match.metadata.get("entity_type") == "deep"


@pytest.mark.asyncio
async def test_after_date(search_service, test_graph):
    """after_date keeps only items created after the given date."""

    # A date in the past should match everything created since then
    past_date = datetime(2020, 1, 1).astimezone()
    results = await search_service.search(
        SearchQuery(
            text="entity",
            after_date=past_date.isoformat(),
        )
    )
    for r in results:
        assert datetime.fromisoformat(r.created_at) > past_date

    # A date in the future should match nothing
    future_date = datetime(2030, 1, 1).astimezone()
    results = await search_service.search(
        SearchQuery(
            text="entity",
            after_date=future_date.isoformat(),
        )
    )
    assert len(results) == 0


@pytest.mark.asyncio
async def test_search_type(search_service, test_graph):
    """The types filter restricts results to the requested note type."""

    # Only entities of the "test" type should come back
    results = await search_service.search(SearchQuery(types=["test"]))
    assert len(results) > 0
    for r in results:
        assert r.type == SearchItemType.ENTITY


@pytest.mark.asyncio
async def test_search_entity_type(search_service, test_graph):
    """The entity_types filter restricts results to the given item type."""

    # Only ENTITY items should come back
    results = await search_service.search(SearchQuery(entity_types=[SearchItemType.ENTITY]))
    assert len(results) > 0
    for r in results:
        assert r.type == SearchItemType.ENTITY


@pytest.mark.asyncio
async def test_extract_entity_tags_exception_handling(search_service):
    """_extract_entity_tags falls back gracefully on un-parseable tag strings."""
    from basic_memory.models.knowledge import Entity

    # A plain string is not a literal list, so list-parsing fails and the
    # service should fall back to treating the whole string as one tag.
    entity_with_invalid_tags = Entity(
        title="Test Entity",
        entity_type="test",
        entity_metadata={"tags": "just a string"},  # This will fail ast.literal_eval
        content_type="text/markdown",
        file_path="test/test-entity.md",
        project_id=1,
    )

    # Fallback path: raw string becomes a single-element tag list
    result = search_service._extract_entity_tags(entity_with_invalid_tags)
    assert result == ["just a string"]

    # An empty string should yield no tags at all
    entity_with_empty_tags = Entity(
        title="Test Entity Empty",
        entity_type="test",
        entity_metadata={"tags": ""},
        content_type="text/markdown",
        file_path="test/test-entity-empty.md",
        project_id=1,
    )

    result = search_service._extract_entity_tags(entity_with_empty_tags)
    assert result == []


@pytest.mark.asyncio
async def test_delete_entity_without_permalink(search_service, sample_entity):
    """Deleting an entity with no permalink uses the delete-by-entity-id path."""

    # Without a permalink, handle_delete cannot delete by permalink and must
    # fall back to deleting by entity id.
    sample_entity.permalink = None

    # Should complete without raising
    await search_service.handle_delete(sample_entity)


@pytest.mark.asyncio
async def test_no_criteria(search_service, test_graph):
    """A query with no criteria yields an empty result list."""
    assert await search_service.search(SearchQuery()) == []


@pytest.mark.asyncio
async def test_init_search_index(search_service, session_maker):
    """The search_index table exists after service initialization."""
    async with db.scoped_session(session_maker) as session:
        table_lookup = await session.execute(
            text("SELECT name FROM sqlite_master WHERE type='table' AND name='search_index';")
        )
        assert table_lookup.scalar() == "search_index"


@pytest.mark.asyncio
async def test_update_index(search_service, full_entity):
    """Re-indexing an entity makes its updated title searchable."""
    await search_service.index_entity(full_entity)

    # Change the title and index the entity again
    full_entity.title = "OMG I AM UPDATED"
    await search_service.index_entity(full_entity)

    # The new title should now be findable
    results = await search_service.search(SearchQuery(text="OMG I AM UPDATED"))
    assert len(results) > 1


@pytest.mark.asyncio
async def test_boolean_and_search(search_service, test_graph):
    """Boolean AND requires every term to be present."""

    # Both terms must appear somewhere in the title or content snippet
    results = await search_service.search(SearchQuery(text="Root AND Entity"))
    assert len(results) >= 1

    def contains_both(item):
        in_title = item.title and "Root" in item.title and "Entity" in item.title
        in_snippet = (
            item.content_snippet
            and "Root" in item.content_snippet
            and "Entity" in item.content_snippet
        )
        return bool(in_title or in_snippet)

    assert any(contains_both(r) for r in results), (
        "Boolean AND search failed to find items containing both terms"
    )

    # Verify that items with only one term are not returned
    results = await search_service.search(SearchQuery(text="NonexistentTerm AND Root"))
    assert len(results) == 0, "Boolean AND search returned results when it shouldn't have"


@pytest.mark.asyncio
async def test_boolean_or_search(search_service, test_graph):
    """Boolean OR matches items containing either term."""
    results = await search_service.search(SearchQuery(text="Root OR Connected"))

    # Should find both "Root Entity" and "Connected Entity"
    assert len(results) >= 2

    permalinks = [r.permalink for r in results]
    assert any(p == "test/root" for p in permalinks), (
        "Boolean OR search failed to find 'Root' term"
    )
    assert any("connected" in p.lower() for p in permalinks), (
        "Boolean OR search failed to find 'Connected' term"
    )


@pytest.mark.asyncio
async def test_boolean_not_search(search_service, test_graph):
    """Boolean NOT excludes items containing the negated term."""
    results = await search_service.search(SearchQuery(text="Entity NOT Connected"))

    # No result may mention the excluded term in its permalink
    assert all("connected" not in r.permalink.lower() for r in results), (
        "Boolean NOT search returned excluded term"
    )


@pytest.mark.asyncio
async def test_boolean_group_search(search_service, test_graph):
    """Parenthesized boolean groups combine correctly: (A OR B) AND C."""
    results = await search_service.search(SearchQuery(title="(Root OR Connected) AND Entity"))

    # Both entities that contain "Entity" plus "Root" or "Connected" match
    assert len(results) >= 2

    for result in results:
        lowered = result.title.lower()
        has_entity = "entity" in lowered
        has_root_or_connected = "root" in lowered or "connected" in lowered
        assert has_entity and has_root_or_connected, (
            "Boolean grouped search returned incorrect results"
        )


@pytest.mark.asyncio
async def test_boolean_operators_detection(search_service):
    """Operator detection distinguishes real AND/OR/NOT from substrings."""
    # Queries containing genuine boolean operators
    for query_text in (
        "term1 AND term2",
        "term1 OR term2",
        "term1 NOT term2",
        "(term1 OR term2) AND term3",
        "complex (nested OR grouping) AND term",
    ):
        assert SearchQuery(text=query_text).has_boolean_operators(), (
            f"Failed to detect boolean operators in: {query_text}"
        )

    # Words that merely contain operator letters must not trigger detection
    for query_text in (
        "normal search query",
        "brand name",  # "AND" inside "brand"
        "understand this concept",  # "AND" inside "understand"
        "command line",
        "sandbox testing",
    ):
        assert not SearchQuery(text=query_text).has_boolean_operators(), (
            f"Incorrectly detected boolean operators in: {query_text}"
        )


# Tests for frontmatter tag search functionality


@pytest.mark.asyncio
async def test_extract_entity_tags_list_format(search_service, session_maker):
    """Tags stored as a real list come back unchanged."""
    from basic_memory.models import Entity

    note = Entity(
        title="Test Entity",
        entity_type="note",
        entity_metadata={"tags": ["business", "strategy", "planning"]},
        content_type="text/markdown",
        file_path="test/business-strategy.md",
        project_id=1,
    )

    assert search_service._extract_entity_tags(note) == ["business", "strategy", "planning"]


@pytest.mark.asyncio
async def test_extract_entity_tags_string_format(search_service, session_maker):
    """A string-serialized list of tags is parsed back into a list."""
    from basic_memory.models import Entity

    note = Entity(
        title="Test Entity",
        entity_type="note",
        entity_metadata={"tags": "['documentation', 'tools', 'best-practices']"},
        content_type="text/markdown",
        file_path="test/docs.md",
        project_id=1,
    )

    assert search_service._extract_entity_tags(note) == [
        "documentation",
        "tools",
        "best-practices",
    ]


@pytest.mark.asyncio
async def test_extract_entity_tags_empty_list(search_service, session_maker):
    """An empty tags list yields no extracted tags."""
    from basic_memory.models import Entity

    note = Entity(
        title="Test Entity",
        entity_type="note",
        entity_metadata={"tags": []},
        content_type="text/markdown",
        file_path="test/empty-tags.md",
        project_id=1,
    )

    assert search_service._extract_entity_tags(note) == []


@pytest.mark.asyncio
async def test_extract_entity_tags_empty_string(search_service, session_maker):
    """A string-serialized empty list ("[]") yields no extracted tags."""
    from basic_memory.models import Entity

    entity = Entity(
        title="Test Entity",
        entity_type="note",
        entity_metadata={"tags": "[]"},
        content_type="text/markdown",
        file_path="test/empty-string-tags.md",
        project_id=1,
    )

    tags = search_service._extract_entity_tags(entity)
    assert tags == []


@pytest.mark.asyncio
async def test_extract_entity_tags_no_metadata(search_service, session_maker):
    """An entity without metadata yields no extracted tags."""
    from basic_memory.models import Entity

    note = Entity(
        title="Test Entity",
        entity_type="note",
        entity_metadata=None,
        content_type="text/markdown",
        file_path="test/no-metadata.md",
        project_id=1,
    )

    assert search_service._extract_entity_tags(note) == []


@pytest.mark.asyncio
async def test_extract_entity_tags_no_tags_key(search_service, session_maker):
    """Metadata without a tags key yields no extracted tags."""
    from basic_memory.models import Entity

    note = Entity(
        title="Test Entity",
        entity_type="note",
        entity_metadata={"title": "Some Title", "type": "note"},
        content_type="text/markdown",
        file_path="test/no-tags-key.md",
        project_id=1,
    )

    assert search_service._extract_entity_tags(note) == []


@pytest.mark.asyncio
async def test_search_by_frontmatter_tags(search_service, session_maker, test_project):
    """Entities are discoverable via full-text search on frontmatter tags."""
    from datetime import datetime
    from unittest.mock import AsyncMock

    from basic_memory.repository import EntityRepository

    entity_repo = EntityRepository(session_maker, project_id=test_project.id)

    # Create an entity carrying several frontmatter tags
    entity = await entity_repo.create(
        {
            "title": "Business Strategy Guide",
            "entity_type": "note",
            "entity_metadata": {"tags": ["business", "strategy", "planning", "organization"]},
            "content_type": "text/markdown",
            "file_path": "guides/business-strategy.md",
            "permalink": "guides/business-strategy",
            "project_id": test_project.id,
            "created_at": datetime.now(),
            "updated_at": datetime.now(),
        }
    )

    # Mock file service to avoid file I/O
    search_service.file_service.read_entity_content = AsyncMock(return_value="")

    await search_service.index_entity(entity)

    # Searching by one tag surfaces the entity
    results = await search_service.search(SearchQuery(text="business"))
    assert len(results) >= 1
    assert any(result.title == "Business Strategy Guide" for result in results), (
        "Entity with 'business' tag should be found in search results"
    )

    # Searching by a different tag surfaces it as well
    results = await search_service.search(SearchQuery(text="planning"))
    assert len(results) >= 1
    assert any(result.title == "Business Strategy Guide" for result in results), (
        "Entity with 'planning' tag should be found in search results"
    )


@pytest.mark.asyncio
async def test_search_by_frontmatter_tags_string_format(
    search_service, session_maker, test_project
):
    """Entities whose tags are a string-serialized list are still searchable."""
    from datetime import datetime
    from unittest.mock import AsyncMock

    from basic_memory.repository import EntityRepository

    entity_repo = EntityRepository(session_maker, project_id=test_project.id)

    # Tags stored as the string representation of a list
    entity = await entity_repo.create(
        {
            "title": "Documentation Guidelines",
            "entity_type": "note",
            "entity_metadata": {"tags": "['documentation', 'tools', 'best-practices']"},
            "content_type": "text/markdown",
            "file_path": "guides/documentation.md",
            "permalink": "guides/documentation",
            "project_id": test_project.id,
            "created_at": datetime.now(),
            "updated_at": datetime.now(),
        }
    )

    # Mock file service to avoid file I/O
    search_service.file_service.read_entity_content = AsyncMock(return_value="")

    await search_service.index_entity(entity)

    # The parsed tags should be searchable like list-format tags
    results = await search_service.search(SearchQuery(text="documentation"))
    assert len(results) >= 1
    assert any(result.title == "Documentation Guidelines" for result in results), (
        "Entity with 'documentation' tag should be found in search results"
    )


@pytest.mark.asyncio
async def test_search_special_characters_in_title(search_service, session_maker, test_project):
    """Titles containing FTS5 metacharacters search cleanly, with no syntax errors."""
    from datetime import datetime
    from unittest.mock import AsyncMock

    from basic_memory.repository import EntityRepository

    entity_repo = EntityRepository(session_maker, project_id=test_project.id)

    # Titles that could break a naively-constructed FTS5 query string
    special_titles = [
        "Note with spaces",
        "Note-with-dashes",
        "Note_with_underscores",
        "Note (with parentheses)",  # This is the problematic one
        "Note & Symbols!",
        "Note [with brackets]",
        "Note {with braces}",
        'Note "with quotes"',
        "Note 'with apostrophes'",
    ]

    entities = []
    for i, title in enumerate(special_titles):
        created = await entity_repo.create(
            {
                "title": title,
                "entity_type": "note",
                "entity_metadata": {"tags": ["special", "characters"]},
                "content_type": "text/markdown",
                "file_path": f"special/{title}.md",
                "permalink": f"special/note-{i}",
                "project_id": test_project.id,
                "created_at": datetime.now(),
                "updated_at": datetime.now(),
            }
        )
        entities.append(created)

    # Mock file service to avoid file I/O
    search_service.file_service.read_entity_content = AsyncMock(return_value="")

    # Index all entities
    for entity in entities:
        await search_service.index_entity(entity)

    # Every title must be findable without raising an FTS5 syntax error
    for title in special_titles:
        results = await search_service.search(SearchQuery(title=title))
        assert any(result.title == title for result in results), (
            f"Entity with title '{title}' should be found in search results"
        )


@pytest.mark.asyncio
async def test_search_title_with_parentheses_specific(search_service, session_maker, test_project):
    """Searching a title containing parentheses must not raise an FTS5 error."""
    from datetime import datetime
    from unittest.mock import AsyncMock

    from basic_memory.repository import EntityRepository

    entity_repo = EntityRepository(session_maker, project_id=test_project.id)

    # The title that historically triggered an FTS5 syntax error
    entity = await entity_repo.create(
        {
            "title": "Note (with parentheses)",
            "entity_type": "note",
            "entity_metadata": {"tags": ["test"]},
            "content_type": "text/markdown",
            "file_path": "special/Note (with parentheses).md",
            "permalink": "special/note-with-parentheses",
            "project_id": test_project.id,
            "created_at": datetime.now(),
            "updated_at": datetime.now(),
        }
    )

    # Mock file service to avoid file I/O
    search_service.file_service.read_entity_content = AsyncMock(return_value="")

    await search_service.index_entity(entity)

    # Searching the exact title should succeed and return the entity
    results = await search_service.search(SearchQuery(title="Note (with parentheses)"))
    assert len(results) >= 1
    assert any(result.title == "Note (with parentheses)" for result in results)


@pytest.mark.asyncio
async def test_search_title_via_repository_direct(search_service, session_maker, test_project):
    """The repository-level search path also handles parentheses safely."""
    from datetime import datetime
    from unittest.mock import AsyncMock

    from basic_memory.repository import EntityRepository

    entity_repo = EntityRepository(session_maker, project_id=test_project.id)

    entity = await entity_repo.create(
        {
            "title": "Note (with parentheses)",
            "entity_type": "note",
            "entity_metadata": {"tags": ["test"]},
            "content_type": "text/markdown",
            "file_path": "special/Note (with parentheses).md",
            "permalink": "special/note-with-parentheses",
            "project_id": test_project.id,
            "created_at": datetime.now(),
            "updated_at": datetime.now(),
        }
    )

    # Mock file service to avoid file I/O
    search_service.file_service.read_entity_content = AsyncMock(return_value="")

    await search_service.index_entity(entity)

    # Hit the repository layer directly — this isolates the failing code path
    results = await search_service.repository.search(
        title="Note (with parentheses)",
        limit=10,
        offset=0,
    )

    assert len(results) >= 1
    assert any(result.title == "Note (with parentheses)" for result in results)

```

--------------------------------------------------------------------------------
/tests/repository/test_search_repository.py:
--------------------------------------------------------------------------------

```python
"""Tests for the SearchRepository."""

from datetime import datetime, timezone

import pytest
import pytest_asyncio
from sqlalchemy import text

from basic_memory import db
from basic_memory.models import Entity
from basic_memory.models.project import Project
from basic_memory.repository.search_repository import SearchRepository, SearchIndexRow
from basic_memory.schemas.search import SearchItemType


@pytest_asyncio.fixture
async def search_entity(session_maker, test_project: Project):
    """Create a test entity for search testing."""
    async with db.scoped_session(session_maker) as session:
        entity = Entity(
            project_id=test_project.id,
            title="Search Test Entity",
            entity_type="test",
            permalink="test/search-test-entity",
            file_path="test/search_test_entity.md",
            content_type="text/markdown",
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
        )
        session.add(entity)
        # flush assigns the generated primary key before the session closes
        await session.flush()
        return entity


@pytest_asyncio.fixture
async def second_project(project_repository):
    """Create a second project for testing project isolation."""
    return await project_repository.create(
        {
            "name": "Second Test Project",
            "description": "Another project for testing",
            "path": "/second/project/path",
            "is_active": True,
            "is_default": None,
        }
    )


@pytest_asyncio.fixture
async def second_project_repository(session_maker, second_project):
    """Create a repository for the second project."""
    # Scope the repository to the second project's id for isolation tests.
    repo = SearchRepository(session_maker, project_id=second_project.id)
    return repo


@pytest_asyncio.fixture
async def second_entity(session_maker, second_project: Project):
    """Create a test entity in the second project."""
    now = datetime.now(timezone.utc)
    async with db.scoped_session(session_maker) as session:
        new_entity = Entity(
            project_id=second_project.id,
            title="Second Project Entity",
            entity_type="test",
            permalink="test/second-project-entity",
            file_path="test/second_project_entity.md",
            content_type="text/markdown",
            created_at=now,
            updated_at=now,
        )
        session.add(new_entity)
        # Flush so the entity gets its primary key before the session closes.
        await session.flush()
        return new_entity


@pytest.mark.asyncio
async def test_init_search_index(search_repository):
    """Test that search index can be initialized."""
    await search_repository.init_search_index()

    # After initialization the search_index table must exist in the schema.
    lookup = text(
        "SELECT name FROM sqlite_master WHERE type='table' AND name='search_index';"
    )
    async with db.scoped_session(search_repository.session_maker) as session:
        outcome = await session.execute(lookup)
        assert outcome.scalar() == "search_index"


@pytest.mark.asyncio
async def test_index_item(search_repository, search_entity):
    """Test indexing an item with project_id."""
    # Build the index row for the fixture entity.
    row = SearchIndexRow(
        id=search_entity.id,
        type=SearchItemType.ENTITY.value,
        title=search_entity.title,
        content_stems="search test entity content",
        content_snippet="This is a test entity for search",
        permalink=search_entity.permalink,
        file_path=search_entity.file_path,
        entity_id=search_entity.id,
        metadata={"entity_type": search_entity.entity_type},
        created_at=search_entity.created_at,
        updated_at=search_entity.updated_at,
        project_id=search_repository.project_id,
    )
    await search_repository.index_item(row)

    # The indexed row should be findable by its content stems.
    hits = await search_repository.search(search_text="search test")

    assert len(hits) == 1
    hit = hits[0]
    assert hit.title == search_entity.title
    assert hit.project_id == search_repository.project_id


@pytest.mark.asyncio
async def test_project_isolation(
    search_repository, second_project_repository, search_entity, second_entity
):
    """Test that search is isolated by project."""

    def build_row(entity, stems, snippet, project_id):
        # Build a search index row for the given entity/project combination.
        return SearchIndexRow(
            id=entity.id,
            type=SearchItemType.ENTITY.value,
            title=entity.title,
            content_stems=stems,
            content_snippet=snippet,
            permalink=entity.permalink,
            file_path=entity.file_path,
            entity_id=entity.id,
            metadata={"entity_type": entity.entity_type},
            created_at=entity.created_at,
            updated_at=entity.updated_at,
            project_id=project_id,
        )

    # Index one entity per project, each via its own repository.
    await search_repository.index_item(
        build_row(
            search_entity,
            "unique first project content",
            "This is a test entity in the first project",
            search_repository.project_id,
        )
    )
    await second_project_repository.index_item(
        build_row(
            second_entity,
            "unique second project content",
            "This is a test entity in the second project",
            second_project_repository.project_id,
        )
    )

    # Each repository finds its own project's content.
    first_hits = await search_repository.search(search_text="unique first")
    assert len(first_hits) == 1
    assert first_hits[0].title == search_entity.title
    assert first_hits[0].project_id == search_repository.project_id

    second_hits = await second_project_repository.search(search_text="unique second")
    assert len(second_hits) == 1
    assert second_hits[0].title == second_entity.title
    assert second_hits[0].project_id == second_project_repository.project_id

    # Neither repository can see the other project's content.
    assert len(await search_repository.search(search_text="unique second")) == 0
    assert len(await second_project_repository.search(search_text="unique first")) == 0


@pytest.mark.asyncio
async def test_delete_by_permalink(search_repository, search_entity):
    """Test deleting an item by permalink respects project isolation."""
    row = SearchIndexRow(
        id=search_entity.id,
        type=SearchItemType.ENTITY.value,
        title=search_entity.title,
        content_stems="content to delete",
        content_snippet="This content should be deleted",
        permalink=search_entity.permalink,
        file_path=search_entity.file_path,
        entity_id=search_entity.id,
        metadata={"entity_type": search_entity.entity_type},
        created_at=search_entity.created_at,
        updated_at=search_entity.updated_at,
        project_id=search_repository.project_id,
    )
    await search_repository.index_item(row)

    # Sanity check: the row is searchable before deletion.
    assert len(await search_repository.search(search_text="content to delete")) == 1

    # Remove it by permalink.
    await search_repository.delete_by_permalink(search_entity.permalink)

    # The row must no longer be searchable.
    assert len(await search_repository.search(search_text="content to delete")) == 0


@pytest.mark.asyncio
async def test_delete_by_entity_id(search_repository, search_entity):
    """Test deleting an item by entity_id respects project isolation."""
    row = SearchIndexRow(
        id=search_entity.id,
        type=SearchItemType.ENTITY.value,
        title=search_entity.title,
        content_stems="entity to delete",
        content_snippet="This entity should be deleted",
        permalink=search_entity.permalink,
        file_path=search_entity.file_path,
        entity_id=search_entity.id,
        metadata={"entity_type": search_entity.entity_type},
        created_at=search_entity.created_at,
        updated_at=search_entity.updated_at,
        project_id=search_repository.project_id,
    )
    await search_repository.index_item(row)

    # Sanity check: the row is searchable before deletion.
    assert len(await search_repository.search(search_text="entity to delete")) == 1

    # Remove it by entity id.
    await search_repository.delete_by_entity_id(search_entity.id)

    # The row must no longer be searchable.
    assert len(await search_repository.search(search_text="entity to delete")) == 0


@pytest.mark.asyncio
async def test_to_insert_includes_project_id(search_repository):
    """Test that the to_insert method includes project_id."""
    timestamp = datetime.now(timezone.utc)
    row = SearchIndexRow(
        id=1234,
        type=SearchItemType.ENTITY.value,
        title="Test Title",
        content_stems="test content",
        content_snippet="test snippet",
        permalink="test/permalink",
        file_path="test/file.md",
        metadata={"test": "metadata"},
        created_at=timestamp,
        updated_at=timestamp,
        project_id=search_repository.project_id,
    )

    payload = row.to_insert()

    # The generated insert data must carry the repository's project id.
    assert "project_id" in payload
    assert payload["project_id"] == search_repository.project_id


def test_directory_property():
    """Test the directory property of SearchIndexRow."""
    now = datetime.now(timezone.utc)
    # (row id, item type, file path, expected directory)
    cases = [
        # File in a nested directory.
        (1, SearchItemType.ENTITY.value, "projects/notes/ideas.md", "/projects/notes"),
        # File at the root level.
        (2, SearchItemType.ENTITY.value, "README.md", "/"),
        # Non-entity type with empty file_path.
        (3, SearchItemType.OBSERVATION.value, "", ""),
    ]
    for row_id, item_type, path, expected in cases:
        row = SearchIndexRow(
            id=row_id,
            type=item_type,
            file_path=path,
            created_at=now,
            updated_at=now,
            project_id=1,
        )
        assert row.directory == expected


class TestSearchTermPreparation:
    """Test cases for FTS5 search term preparation."""

    def test_simple_terms_get_prefix_wildcard(self, search_repository):
        """Simple alphanumeric terms should get prefix matching."""
        for query, expected in [
            ("hello", "hello*"),
            ("project", "project*"),
            ("test123", "test123*"),
        ]:
            assert search_repository._prepare_search_term(query) == expected

    def test_terms_with_existing_wildcard_unchanged(self, search_repository):
        """Terms that already contain * should remain unchanged."""
        for query in ("hello*", "test*world"):
            assert search_repository._prepare_search_term(query) == query

    def test_boolean_operators_preserved(self, search_repository):
        """Boolean operators should be preserved without modification."""
        for query in (
            "hello AND world",
            "cat OR dog",
            "project NOT meeting",
            "(hello AND world) OR test",
        ):
            assert search_repository._prepare_search_term(query) == query

    def test_hyphenated_terms_with_boolean_operators(self, search_repository):
        """Hyphenated terms with Boolean operators should be properly quoted."""
        cases = [
            # The specific case from the GitHub issue.
            ("tier1-test AND unicode", '"tier1-test" AND unicode'),
            # Other hyphenated Boolean combinations.
            ("multi-word OR single", '"multi-word" OR single'),
            ("well-formed NOT badly-formed", '"well-formed" NOT "badly-formed"'),
            ("test-case AND (hello OR world)", '"test-case" AND (hello OR world)'),
            # Mixed special characters with Boolean operators.
            ("config.json AND test-file", '"config.json" AND "test-file"'),
            ("C++ OR python-script", '"C++" OR "python-script"'),
        ]
        for query, expected in cases:
            assert search_repository._prepare_search_term(query) == expected

    def test_programming_terms_should_work(self, search_repository):
        """Programming-related terms with special chars should be searchable."""
        # These should be quoted to handle special characters safely.
        for query, expected in [
            ("C++", '"C++"*'),
            ("function()", '"function()"*'),
            ("[email protected]", '"[email protected]"*'),
            ("array[index]", '"array[index]"*'),
            ("config.json", '"config.json"*'),
        ]:
            assert search_repository._prepare_search_term(query) == expected

    def test_malformed_fts5_syntax_quoted(self, search_repository):
        """Malformed FTS5 syntax should be quoted to prevent errors."""
        # Multiple operators without proper syntax.
        for query, expected in [
            ("+++invalid+++", '"+++invalid+++"*'),
            ("!!!error!!!", '"!!!error!!!"*'),
            ("@#$%^&*()", '"@#$%^&*()"*'),
        ]:
            assert search_repository._prepare_search_term(query) == expected

    def test_quoted_strings_handled_properly(self, search_repository):
        """Strings with quotes should have quotes escaped."""
        # Embedded double quotes are doubled for FTS5.
        assert search_repository._prepare_search_term('say "hello"') == '"say ""hello"""*'
        assert search_repository._prepare_search_term("it's working") == "\"it's working\"*"

    def test_file_paths_no_prefix_wildcard(self, search_repository):
        """File paths should not get prefix wildcards."""
        for query, expected in [
            ("config.json", '"config.json"'),
            ("docs/readme.md", '"docs/readme.md"'),
        ]:
            assert search_repository._prepare_search_term(query, is_prefix=False) == expected

    def test_spaces_handled_correctly(self, search_repository):
        """Terms with spaces should use boolean AND for word order independence."""
        for query, expected in [
            ("hello world", "hello* AND world*"),
            ("project planning", "project* AND planning*"),
        ]:
            assert search_repository._prepare_search_term(query) == expected

    def test_version_strings_with_dots_handled_correctly(self, search_repository):
        """Version strings with dots should be quoted to prevent FTS5 syntax errors."""
        # Reproduces the bug where "Basic Memory v0.13.0b2" became
        # "Basic* AND Memory* AND v0.13.0b2*", which FTS5 rejects because
        # v0.13.0b2* is not valid FTS5 syntax. The dots force full quoting.
        result = search_repository._prepare_search_term("Basic Memory v0.13.0b2")
        assert result == '"Basic Memory v0.13.0b2"*'

    def test_mixed_special_characters_in_multi_word_queries(self, search_repository):
        """Multi-word queries with special characters in any word should be fully quoted."""
        # Any word containing special characters causes the entire phrase to be quoted.
        for query, expected in [
            ("config.json file", '"config.json file"*'),
            ("[email protected] account", '"[email protected] account"*'),
            ("node.js and react", '"node.js and react"*'),
        ]:
            assert search_repository._prepare_search_term(query) == expected

    @pytest.mark.asyncio
    async def test_search_with_special_characters_returns_results(self, search_repository):
        """Integration test: search with special characters should work gracefully."""
        # None of these may raise FTS5 syntax errors; empty results are acceptable.
        for query in ("C++", "function()", "+++malformed+++", "[email protected]"):
            outcome = await search_repository.search(search_text=query)
            assert isinstance(outcome, list)

    @pytest.mark.asyncio
    async def test_boolean_search_still_works(self, search_repository):
        """Boolean search operations should continue to work."""
        # These should not crash and should respect boolean logic.
        for query in ("hello AND world", "cat OR dog", "project NOT meeting"):
            outcome = await search_repository.search(search_text=query)
            assert isinstance(outcome, list)

    @pytest.mark.asyncio
    async def test_permalink_match_exact_with_slash(self, search_repository):
        """Test exact permalink matching with slash (line 249 coverage)."""
        # Exercises the exact-equality path taken when the permalink contains "/".
        outcome = await search_repository.search(permalink_match="test/path")
        assert isinstance(outcome, list)

    @pytest.mark.asyncio
    async def test_permalink_match_simple_term(self, search_repository):
        """Test permalink matching with simple term (no slash)."""
        # Simple terms go through _prepare_search_term and use FTS5 MATCH.
        outcome = await search_repository.search(permalink_match="simpleterm")
        assert isinstance(outcome, list)

    @pytest.mark.asyncio
    async def test_fts5_error_handling_database_error(self, search_repository):
        """Test that non-FTS5 database errors are properly re-raised."""
        from unittest.mock import AsyncMock, patch

        # Replace the scoped session with one whose execute() raises an error
        # that is NOT an FTS5 syntax error.
        with patch("basic_memory.db.scoped_session") as fake_scoped_session:
            fake_session = AsyncMock()
            fake_scoped_session.return_value.__aenter__.return_value = fake_session
            fake_session.execute.side_effect = Exception("Database connection failed")

            # Such errors must propagate rather than be swallowed into [].
            with pytest.raises(Exception, match="Database connection failed"):
                await search_repository.search(search_text="test")

    @pytest.mark.asyncio
    async def test_version_string_search_integration(self, search_repository, search_entity):
        """Integration test: searching for version strings should work without FTS5 errors."""
        # Index an entity whose title and content contain version-like tokens.
        row = SearchIndexRow(
            id=search_entity.id,
            type=SearchItemType.ENTITY.value,
            title="Basic Memory v0.13.0b2 Release",
            content_stems="basic memory version 0.13.0b2 beta release notes features",
            content_snippet="Basic Memory v0.13.0b2 is a beta release with new features",
            permalink=search_entity.permalink,
            file_path=search_entity.file_path,
            entity_id=search_entity.id,
            metadata={"entity_type": search_entity.entity_type},
            created_at=search_entity.created_at,
            updated_at=search_entity.updated_at,
            project_id=search_repository.project_id,
        )
        await search_repository.index_item(row)

        # Full-title search must not trigger FTS5 syntax errors and must match.
        hits = await search_repository.search(search_text="Basic Memory v0.13.0b2")
        assert len(hits) == 1
        assert hits[0].title == "Basic Memory v0.13.0b2 Release"

        # The bare version token should still match via content_stems.
        assert len(await search_repository.search(search_text="v0.13.0b2")) == 1

        # Other dotted patterns must not crash either.
        assert isinstance(await search_repository.search(search_text="node.js version"), list)

    @pytest.mark.asyncio
    async def test_wildcard_only_search(self, search_repository, search_entity):
        """Test that wildcard-only search '*' doesn't cause FTS5 errors (line 243 coverage)."""
        row = SearchIndexRow(
            id=search_entity.id,
            type=SearchItemType.ENTITY.value,
            title="Test Entity",
            content_stems="test entity content",
            content_snippet="This is a test entity",
            permalink=search_entity.permalink,
            file_path=search_entity.file_path,
            entity_id=search_entity.id,
            metadata={"entity_type": search_entity.entity_type},
            created_at=search_entity.created_at,
            updated_at=search_entity.updated_at,
            project_id=search_repository.project_id,
        )
        await search_repository.index_item(row)

        # A bare "*" should return everything rather than crash.
        all_hits = await search_repository.search(search_text="*")
        assert isinstance(all_hits, list)
        assert len(all_hits) >= 1

        # Empty and whitespace-only queries must also be handled gracefully.
        assert isinstance(await search_repository.search(search_text=""), list)
        assert isinstance(await search_repository.search(search_text="   "), list)

    def test_boolean_query_empty_parts_coverage(self, search_repository):
        """Test Boolean query parsing with empty parts (line 143 coverage)."""
        # Doubled operator leaves an empty part between AND tokens.
        doubled = search_repository._prepare_boolean_query("hello AND  AND world")
        assert "hello" in doubled and "world" in doubled

        # Leading operator.
        leading = search_repository._prepare_boolean_query("  OR test")
        assert "test" in leading

        # Trailing operator.
        trailing = search_repository._prepare_boolean_query("test OR  ")
        assert "test" in trailing

    def test_parenthetical_term_quote_escaping(self, search_repository):
        """Test quote escaping in parenthetical terms (lines 190-191 coverage)."""
        # Embedded double quotes must be doubled for FTS5.
        escaped = search_repository._prepare_parenthetical_term('(say "hello" world)')
        assert '""hello""' in escaped

        # Single quotes pass through untouched.
        assert "it's working" in search_repository._prepare_parenthetical_term("(it's working)")

    def test_needs_quoting_empty_input(self, search_repository):
        """Test _needs_quoting with empty inputs (line 207 coverage)."""
        # Empty, whitespace-only, and tab-only strings never need quoting.
        for blank in ("", "   ", "\t"):
            assert not search_repository._needs_quoting(blank)

    def test_prepare_single_term_empty_input(self, search_repository):
        """Test _prepare_single_term with empty inputs (line 227 coverage)."""
        # Empty and whitespace-only inputs come back unchanged.
        assert search_repository._prepare_single_term("") == ""
        assert search_repository._prepare_single_term("   ") == "   "
        assert search_repository._prepare_single_term("\t\n") == "\t\n"

```
Page 11/17FirstPrevNextLast