#
tokens: 49539/50000 32/416 files (page 4/19)
lines: off (toggle) GitHub
raw markdown copy
This is page 4 of 19. Use http://codebase.md/basicmachines-co/basic-memory?lines=false&page={x} to view the full context.

# Directory Structure

```
├── .claude
│   ├── commands
│   │   ├── release
│   │   │   ├── beta.md
│   │   │   ├── changelog.md
│   │   │   ├── release-check.md
│   │   │   └── release.md
│   │   ├── spec.md
│   │   └── test-live.md
│   └── settings.json
├── .dockerignore
├── .env.example
├── .github
│   ├── dependabot.yml
│   ├── ISSUE_TEMPLATE
│   │   ├── bug_report.md
│   │   ├── config.yml
│   │   ├── documentation.md
│   │   └── feature_request.md
│   └── workflows
│       ├── claude-code-review.yml
│       ├── claude-issue-triage.yml
│       ├── claude.yml
│       ├── dev-release.yml
│       ├── docker.yml
│       ├── pr-title.yml
│       ├── release.yml
│       └── test.yml
├── .gitignore
├── .python-version
├── CHANGELOG.md
├── CITATION.cff
├── CLA.md
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── docker-compose-postgres.yml
├── docker-compose.yml
├── Dockerfile
├── docs
│   ├── ai-assistant-guide-extended.md
│   ├── ARCHITECTURE.md
│   ├── character-handling.md
│   ├── cloud-cli.md
│   ├── Docker.md
│   └── testing-coverage.md
├── justfile
├── LICENSE
├── llms-install.md
├── pyproject.toml
├── README.md
├── SECURITY.md
├── smithery.yaml
├── specs
│   ├── SPEC-1 Specification-Driven Development Process.md
│   ├── SPEC-10 Unified Deployment Workflow and Event Tracking.md
│   ├── SPEC-11 Basic Memory API Performance Optimization.md
│   ├── SPEC-12 OpenTelemetry Observability.md
│   ├── SPEC-13 CLI Authentication with Subscription Validation.md
│   ├── SPEC-14 Cloud Git Versioning & GitHub Backup.md
│   ├── SPEC-14- Cloud Git Versioning & GitHub Backup.md
│   ├── SPEC-15 Configuration Persistence via Tigris for Cloud Tenants.md
│   ├── SPEC-16 MCP Cloud Service Consolidation.md
│   ├── SPEC-17 Semantic Search with ChromaDB.md
│   ├── SPEC-18 AI Memory Management Tool.md
│   ├── SPEC-19 Sync Performance and Memory Optimization.md
│   ├── SPEC-2 Slash Commands Reference.md
│   ├── SPEC-20 Simplified Project-Scoped Rclone Sync.md
│   ├── SPEC-3 Agent Definitions.md
│   ├── SPEC-4 Notes Web UI Component Architecture.md
│   ├── SPEC-5 CLI Cloud Upload via WebDAV.md
│   ├── SPEC-6 Explicit Project Parameter Architecture.md
│   ├── SPEC-7 POC to spike Tigris Turso for local access to cloud data.md
│   ├── SPEC-8 TigrisFS Integration.md
│   ├── SPEC-9 Multi-Project Bidirectional Sync Architecture.md
│   ├── SPEC-9 Signed Header Tenant Information.md
│   └── SPEC-9-1 Follow-Ups- Conflict, Sync, and Observability.md
├── src
│   └── basic_memory
│       ├── __init__.py
│       ├── alembic
│       │   ├── alembic.ini
│       │   ├── env.py
│       │   ├── migrations.py
│       │   ├── script.py.mako
│       │   └── versions
│       │       ├── 314f1ea54dc4_add_postgres_full_text_search_support_.py
│       │       ├── 3dae7c7b1564_initial_schema.py
│       │       ├── 502b60eaa905_remove_required_from_entity_permalink.py
│       │       ├── 5fe1ab1ccebe_add_projects_table.py
│       │       ├── 647e7a75e2cd_project_constraint_fix.py
│       │       ├── 6830751f5fb6_merge_multiple_heads.py
│       │       ├── 9d9c1cb7d8f5_add_mtime_and_size_columns_to_entity_.py
│       │       ├── a1b2c3d4e5f6_fix_project_foreign_keys.py
│       │       ├── a2b3c4d5e6f7_add_search_index_entity_cascade.py
│       │       ├── b3c3938bacdb_relation_to_name_unique_index.py
│       │       ├── cc7172b46608_update_search_index_schema.py
│       │       ├── e7e1f4367280_add_scan_watermark_tracking_to_project.py
│       │       ├── f8a9b2c3d4e5_add_pg_trgm_for_fuzzy_link_resolution.py
│       │       └── g9a0b3c4d5e6_add_external_id_to_project_and_entity.py
│       ├── api
│       │   ├── __init__.py
│       │   ├── app.py
│       │   ├── container.py
│       │   ├── routers
│       │   │   ├── __init__.py
│       │   │   ├── directory_router.py
│       │   │   ├── importer_router.py
│       │   │   ├── knowledge_router.py
│       │   │   ├── management_router.py
│       │   │   ├── memory_router.py
│       │   │   ├── project_router.py
│       │   │   ├── prompt_router.py
│       │   │   ├── resource_router.py
│       │   │   ├── search_router.py
│       │   │   └── utils.py
│       │   ├── template_loader.py
│       │   └── v2
│       │       ├── __init__.py
│       │       └── routers
│       │           ├── __init__.py
│       │           ├── directory_router.py
│       │           ├── importer_router.py
│       │           ├── knowledge_router.py
│       │           ├── memory_router.py
│       │           ├── project_router.py
│       │           ├── prompt_router.py
│       │           ├── resource_router.py
│       │           └── search_router.py
│       ├── cli
│       │   ├── __init__.py
│       │   ├── app.py
│       │   ├── auth.py
│       │   ├── commands
│       │   │   ├── __init__.py
│       │   │   ├── cloud
│       │   │   │   ├── __init__.py
│       │   │   │   ├── api_client.py
│       │   │   │   ├── bisync_commands.py
│       │   │   │   ├── cloud_utils.py
│       │   │   │   ├── core_commands.py
│       │   │   │   ├── rclone_commands.py
│       │   │   │   ├── rclone_config.py
│       │   │   │   ├── rclone_installer.py
│       │   │   │   ├── upload_command.py
│       │   │   │   └── upload.py
│       │   │   ├── command_utils.py
│       │   │   ├── db.py
│       │   │   ├── format.py
│       │   │   ├── import_chatgpt.py
│       │   │   ├── import_claude_conversations.py
│       │   │   ├── import_claude_projects.py
│       │   │   ├── import_memory_json.py
│       │   │   ├── mcp.py
│       │   │   ├── project.py
│       │   │   ├── status.py
│       │   │   ├── telemetry.py
│       │   │   └── tool.py
│       │   ├── container.py
│       │   └── main.py
│       ├── config.py
│       ├── db.py
│       ├── deps
│       │   ├── __init__.py
│       │   ├── config.py
│       │   ├── db.py
│       │   ├── importers.py
│       │   ├── projects.py
│       │   ├── repositories.py
│       │   └── services.py
│       ├── deps.py
│       ├── file_utils.py
│       ├── ignore_utils.py
│       ├── importers
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── chatgpt_importer.py
│       │   ├── claude_conversations_importer.py
│       │   ├── claude_projects_importer.py
│       │   ├── memory_json_importer.py
│       │   └── utils.py
│       ├── markdown
│       │   ├── __init__.py
│       │   ├── entity_parser.py
│       │   ├── markdown_processor.py
│       │   ├── plugins.py
│       │   ├── schemas.py
│       │   └── utils.py
│       ├── mcp
│       │   ├── __init__.py
│       │   ├── async_client.py
│       │   ├── clients
│       │   │   ├── __init__.py
│       │   │   ├── directory.py
│       │   │   ├── knowledge.py
│       │   │   ├── memory.py
│       │   │   ├── project.py
│       │   │   ├── resource.py
│       │   │   └── search.py
│       │   ├── container.py
│       │   ├── project_context.py
│       │   ├── prompts
│       │   │   ├── __init__.py
│       │   │   ├── ai_assistant_guide.py
│       │   │   ├── continue_conversation.py
│       │   │   ├── recent_activity.py
│       │   │   ├── search.py
│       │   │   └── utils.py
│       │   ├── resources
│       │   │   ├── ai_assistant_guide.md
│       │   │   └── project_info.py
│       │   ├── server.py
│       │   └── tools
│       │       ├── __init__.py
│       │       ├── build_context.py
│       │       ├── canvas.py
│       │       ├── chatgpt_tools.py
│       │       ├── delete_note.py
│       │       ├── edit_note.py
│       │       ├── list_directory.py
│       │       ├── move_note.py
│       │       ├── project_management.py
│       │       ├── read_content.py
│       │       ├── read_note.py
│       │       ├── recent_activity.py
│       │       ├── search.py
│       │       ├── utils.py
│       │       ├── view_note.py
│       │       └── write_note.py
│       ├── models
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── knowledge.py
│       │   ├── project.py
│       │   └── search.py
│       ├── project_resolver.py
│       ├── repository
│       │   ├── __init__.py
│       │   ├── entity_repository.py
│       │   ├── observation_repository.py
│       │   ├── postgres_search_repository.py
│       │   ├── project_info_repository.py
│       │   ├── project_repository.py
│       │   ├── relation_repository.py
│       │   ├── repository.py
│       │   ├── search_index_row.py
│       │   ├── search_repository_base.py
│       │   ├── search_repository.py
│       │   └── sqlite_search_repository.py
│       ├── runtime.py
│       ├── schemas
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── cloud.py
│       │   ├── delete.py
│       │   ├── directory.py
│       │   ├── importer.py
│       │   ├── memory.py
│       │   ├── project_info.py
│       │   ├── prompt.py
│       │   ├── request.py
│       │   ├── response.py
│       │   ├── search.py
│       │   ├── sync_report.py
│       │   └── v2
│       │       ├── __init__.py
│       │       ├── entity.py
│       │       └── resource.py
│       ├── services
│       │   ├── __init__.py
│       │   ├── context_service.py
│       │   ├── directory_service.py
│       │   ├── entity_service.py
│       │   ├── exceptions.py
│       │   ├── file_service.py
│       │   ├── initialization.py
│       │   ├── link_resolver.py
│       │   ├── project_service.py
│       │   ├── search_service.py
│       │   └── service.py
│       ├── sync
│       │   ├── __init__.py
│       │   ├── background_sync.py
│       │   ├── coordinator.py
│       │   ├── sync_service.py
│       │   └── watch_service.py
│       ├── telemetry.py
│       ├── templates
│       │   └── prompts
│       │       ├── continue_conversation.hbs
│       │       └── search.hbs
│       └── utils.py
├── test-int
│   ├── BENCHMARKS.md
│   ├── cli
│   │   ├── test_project_commands_integration.py
│   │   └── test_version_integration.py
│   ├── conftest.py
│   ├── mcp
│   │   ├── test_build_context_underscore.py
│   │   ├── test_build_context_validation.py
│   │   ├── test_chatgpt_tools_integration.py
│   │   ├── test_default_project_mode_integration.py
│   │   ├── test_delete_note_integration.py
│   │   ├── test_edit_note_integration.py
│   │   ├── test_lifespan_shutdown_sync_task_cancellation_integration.py
│   │   ├── test_list_directory_integration.py
│   │   ├── test_move_note_integration.py
│   │   ├── test_project_management_integration.py
│   │   ├── test_project_state_sync_integration.py
│   │   ├── test_read_content_integration.py
│   │   ├── test_read_note_integration.py
│   │   ├── test_search_integration.py
│   │   ├── test_single_project_mcp_integration.py
│   │   └── test_write_note_integration.py
│   ├── test_db_wal_mode.py
│   └── test_disable_permalinks_integration.py
├── tests
│   ├── __init__.py
│   ├── api
│   │   ├── conftest.py
│   │   ├── test_api_container.py
│   │   ├── test_async_client.py
│   │   ├── test_continue_conversation_template.py
│   │   ├── test_directory_router.py
│   │   ├── test_importer_router.py
│   │   ├── test_knowledge_router.py
│   │   ├── test_management_router.py
│   │   ├── test_memory_router.py
│   │   ├── test_project_router_operations.py
│   │   ├── test_project_router.py
│   │   ├── test_prompt_router.py
│   │   ├── test_relation_background_resolution.py
│   │   ├── test_resource_router.py
│   │   ├── test_search_router.py
│   │   ├── test_search_template.py
│   │   ├── test_template_loader_helpers.py
│   │   ├── test_template_loader.py
│   │   └── v2
│   │       ├── __init__.py
│   │       ├── conftest.py
│   │       ├── test_directory_router.py
│   │       ├── test_importer_router.py
│   │       ├── test_knowledge_router.py
│   │       ├── test_memory_router.py
│   │       ├── test_project_router.py
│   │       ├── test_prompt_router.py
│   │       ├── test_resource_router.py
│   │       └── test_search_router.py
│   ├── cli
│   │   ├── cloud
│   │   │   ├── test_cloud_api_client_and_utils.py
│   │   │   ├── test_rclone_config_and_bmignore_filters.py
│   │   │   └── test_upload_path.py
│   │   ├── conftest.py
│   │   ├── test_auth_cli_auth.py
│   │   ├── test_cli_container.py
│   │   ├── test_cli_exit.py
│   │   ├── test_cli_tool_exit.py
│   │   ├── test_cli_tools.py
│   │   ├── test_cloud_authentication.py
│   │   ├── test_ignore_utils.py
│   │   ├── test_import_chatgpt.py
│   │   ├── test_import_claude_conversations.py
│   │   ├── test_import_claude_projects.py
│   │   ├── test_import_memory_json.py
│   │   ├── test_project_add_with_local_path.py
│   │   └── test_upload.py
│   ├── conftest.py
│   ├── db
│   │   └── test_issue_254_foreign_key_constraints.py
│   ├── importers
│   │   ├── test_conversation_indexing.py
│   │   ├── test_importer_base.py
│   │   └── test_importer_utils.py
│   ├── markdown
│   │   ├── __init__.py
│   │   ├── test_date_frontmatter_parsing.py
│   │   ├── test_entity_parser_error_handling.py
│   │   ├── test_entity_parser.py
│   │   ├── test_markdown_plugins.py
│   │   ├── test_markdown_processor.py
│   │   ├── test_observation_edge_cases.py
│   │   ├── test_parser_edge_cases.py
│   │   ├── test_relation_edge_cases.py
│   │   └── test_task_detection.py
│   ├── mcp
│   │   ├── clients
│   │   │   ├── __init__.py
│   │   │   └── test_clients.py
│   │   ├── conftest.py
│   │   ├── test_async_client_modes.py
│   │   ├── test_mcp_container.py
│   │   ├── test_obsidian_yaml_formatting.py
│   │   ├── test_permalink_collision_file_overwrite.py
│   │   ├── test_project_context.py
│   │   ├── test_prompts.py
│   │   ├── test_recent_activity_prompt_modes.py
│   │   ├── test_resources.py
│   │   ├── test_server_lifespan_branches.py
│   │   ├── test_tool_build_context.py
│   │   ├── test_tool_canvas.py
│   │   ├── test_tool_delete_note.py
│   │   ├── test_tool_edit_note.py
│   │   ├── test_tool_list_directory.py
│   │   ├── test_tool_move_note.py
│   │   ├── test_tool_project_management.py
│   │   ├── test_tool_read_content.py
│   │   ├── test_tool_read_note.py
│   │   ├── test_tool_recent_activity.py
│   │   ├── test_tool_resource.py
│   │   ├── test_tool_search.py
│   │   ├── test_tool_utils.py
│   │   ├── test_tool_view_note.py
│   │   ├── test_tool_write_note_kebab_filenames.py
│   │   ├── test_tool_write_note.py
│   │   └── tools
│   │       └── test_chatgpt_tools.py
│   ├── Non-MarkdownFileSupport.pdf
│   ├── README.md
│   ├── repository
│   │   ├── test_entity_repository_upsert.py
│   │   ├── test_entity_repository.py
│   │   ├── test_entity_upsert_issue_187.py
│   │   ├── test_observation_repository.py
│   │   ├── test_postgres_search_repository.py
│   │   ├── test_project_info_repository.py
│   │   ├── test_project_repository.py
│   │   ├── test_relation_repository.py
│   │   ├── test_repository.py
│   │   ├── test_search_repository_edit_bug_fix.py
│   │   └── test_search_repository.py
│   ├── schemas
│   │   ├── test_base_timeframe_minimum.py
│   │   ├── test_memory_serialization.py
│   │   ├── test_memory_url_validation.py
│   │   ├── test_memory_url.py
│   │   ├── test_relation_response_reference_resolution.py
│   │   ├── test_schemas.py
│   │   └── test_search.py
│   ├── Screenshot.png
│   ├── services
│   │   ├── test_context_service.py
│   │   ├── test_directory_service.py
│   │   ├── test_entity_service_disable_permalinks.py
│   │   ├── test_entity_service.py
│   │   ├── test_file_service.py
│   │   ├── test_initialization_cloud_mode_branches.py
│   │   ├── test_initialization.py
│   │   ├── test_link_resolver.py
│   │   ├── test_project_removal_bug.py
│   │   ├── test_project_service_operations.py
│   │   ├── test_project_service.py
│   │   └── test_search_service.py
│   ├── sync
│   │   ├── test_character_conflicts.py
│   │   ├── test_coordinator.py
│   │   ├── test_sync_service_incremental.py
│   │   ├── test_sync_service.py
│   │   ├── test_sync_wikilink_issue.py
│   │   ├── test_tmp_files.py
│   │   ├── test_watch_service_atomic_adds.py
│   │   ├── test_watch_service_edge_cases.py
│   │   ├── test_watch_service_reload.py
│   │   └── test_watch_service.py
│   ├── test_config.py
│   ├── test_deps.py
│   ├── test_production_cascade_delete.py
│   ├── test_project_resolver.py
│   ├── test_rclone_commands.py
│   ├── test_runtime.py
│   ├── test_telemetry.py
│   └── utils
│       ├── test_file_utils.py
│       ├── test_frontmatter_obsidian_compatible.py
│       ├── test_parse_tags.py
│       ├── test_permalink_formatting.py
│       ├── test_timezone_utils.py
│       ├── test_utf8_handling.py
│       └── test_validate_project_path.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/src/basic_memory/mcp/tools/list_directory.py:
--------------------------------------------------------------------------------

```python
"""List directory tool for Basic Memory MCP server."""

from typing import Optional

from loguru import logger
from fastmcp import Context

from basic_memory.mcp.async_client import get_client
from basic_memory.mcp.project_context import get_active_project
from basic_memory.mcp.server import mcp
from basic_memory.telemetry import track_mcp_tool


def _format_directory_listing(
    nodes: list,
    dir_name: str,
    depth: int,
    file_name_glob: Optional[str],
) -> str:
    """Render directory nodes as the text listing returned by ``list_directory``.

    Directories are listed first, then files, each group sorted by name. A blank
    line separates the two groups when both are present, and a summary line with
    the item counts closes the listing.

    Args:
        nodes: Mapping-like nodes. Every node is read through the keys "type",
            "name" and "directory_path"; file nodes may additionally provide
            "title" and "updated_at". Nodes whose "type" is neither "directory"
            nor "file" are ignored.
        dir_name: Directory path that was listed (echoed in the header line).
        depth: Recursion depth that was requested (echoed in the header line).
        file_name_glob: Glob filter that was applied, if any (echoed in the header line).

    Returns:
        The multi-line listing as a single newline-joined string.
    """
    # Imported once per call instead of once per file entry inside the loop.
    from datetime import datetime

    if file_name_glob:
        header = f"Files in '{dir_name}' matching '{file_name_glob}' (depth {depth}):"
    else:
        header = f"Contents of '{dir_name}' (depth {depth}):"
    output_lines = [header, ""]

    # Group by type; sorted() is stable, matching an in-place list.sort().
    directories = sorted(
        (n for n in nodes if n["type"] == "directory"), key=lambda n: n["name"]
    )
    files = sorted((n for n in nodes if n["type"] == "file"), key=lambda n: n["name"])

    # Directories first.
    for node in directories:
        path_display = node["directory_path"]
        output_lines.append(f"📁 {node['name']:<30} {path_display}")

    # Blank separator only when both groups are present.
    if directories and files:
        output_lines.append("")

    # Files, with optional title and last-updated date.
    for node in files:
        # Drop one leading slash: requesting the file via read_note does not
        # use the beginning slash.
        path_display = node["directory_path"].removeprefix("/")
        title = node.get("title", "")
        updated = node.get("updated_at", "")

        date_str = ""
        if updated:
            try:
                # Map a trailing "Z" to an explicit offset, since
                # datetime.fromisoformat() before Python 3.11 does not accept "Z".
                dt = datetime.fromisoformat(updated.replace("Z", "+00:00"))
                date_str = dt.strftime("%Y-%m-%d")
            except Exception:  # pragma: no cover
                # Best-effort fallback: keep the leading date-sized prefix of the raw value.
                date_str = updated[:10] if len(updated) >= 10 else ""

        file_line = f"📄 {node['name']:<30} {path_display}"
        # The title is only shown when it adds information beyond the file name.
        if title and title != node["name"]:
            file_line += f" | {title}"
        if date_str:
            file_line += f" | {date_str}"

        output_lines.append(file_line)

    # Summary line.
    output_lines.append("")
    total_count = len(directories) + len(files)
    summary_parts = []
    if directories:
        summary_parts.append(
            f"{len(directories)} director{'y' if len(directories) == 1 else 'ies'}"
        )
    if files:
        summary_parts.append(f"{len(files)} file{'s' if len(files) != 1 else ''}")

    output_lines.append(f"Total: {total_count} items ({', '.join(summary_parts)})")

    return "\n".join(output_lines)


@mcp.tool(
    description="List directory contents with filtering and depth control.",
)
async def list_directory(
    dir_name: str = "/",
    depth: int = 1,
    file_name_glob: Optional[str] = None,
    project: Optional[str] = None,
    context: Context | None = None,
) -> str:
    """List directory contents from the knowledge base with optional filtering.

    This tool provides 'ls' functionality for browsing the knowledge base directory structure.
    It can list immediate children or recursively explore subdirectories with depth control,
    and supports glob pattern filtering for finding specific files.

    Args:
        dir_name: Directory path to list (default: root "/")
                 Examples: "/", "/projects", "/research/ml"
        depth: Recursion depth (1-10, default: 1 for immediate children only)
               Higher values show subdirectory contents recursively
        file_name_glob: Optional glob pattern for filtering file names
                       Examples: "*.md", "*meeting*", "project_*"
        project: Project name to list directory from. Optional - server will resolve using hierarchy.
                If unknown, use list_memory_projects() to discover available projects.
        context: Optional FastMCP context for performance caching.

    Returns:
        Formatted listing of directory contents with file metadata, or a
        "No files found ..." message when the directory client returns nothing.

    Examples:
        # List root directory contents
        list_directory()

        # List specific folder
        list_directory(dir_name="/projects")

        # Find all markdown files
        list_directory(file_name_glob="*.md")

        # Deep exploration of research folder
        list_directory(dir_name="/research", depth=3)

        # Find meeting notes in projects folder
        list_directory(dir_name="/projects", file_name_glob="*meeting*")

        # Explicit project specification
        list_directory(project="work-docs", dir_name="/projects")

    Raises:
        ToolError: If project doesn't exist or directory path is invalid
    """
    track_mcp_tool("list_directory")
    async with get_client() as client:
        active_project = await get_active_project(client, project, context)

        logger.debug(
            f"Listing directory '{dir_name}' in project {project} with depth={depth}, glob='{file_name_glob}'"
        )

        # Import here to avoid circular import
        from basic_memory.mcp.clients import DirectoryClient

        # Use typed DirectoryClient for API calls
        directory_client = DirectoryClient(client, active_project.external_id)
        nodes = await directory_client.list(dir_name, depth=depth, file_name_glob=file_name_glob)

        if not nodes:
            filter_desc = f" matching '{file_name_glob}'" if file_name_glob else ""
            return f"No files found in directory '{dir_name}'{filter_desc}"

        # Formatting stays inside the client context, as in the original flow.
        return _format_directory_listing(nodes, dir_name, depth, file_name_glob)

```

--------------------------------------------------------------------------------
/tests/cli/cloud/test_cloud_api_client_and_utils.py:
--------------------------------------------------------------------------------

```python
from contextlib import asynccontextmanager
import json

import httpx
import pytest

from basic_memory.cli.auth import CLIAuth
from basic_memory.cli.commands.cloud.api_client import (
    SubscriptionRequiredError,
    make_api_request,
)
from basic_memory.cli.commands.cloud.cloud_utils import (
    create_cloud_project,
    fetch_cloud_projects,
    project_exists,
)


@pytest.mark.asyncio
async def test_make_api_request_success_injects_auth_and_accept_encoding(
    config_home, config_manager
):
    """make_api_request sends the stored bearer token and ``Accept-Encoding: identity``."""
    # Arrange: write a token file up front so CLIAuth can authenticate without any network.
    cli_auth = CLIAuth(client_id="cid", authkit_domain="https://auth.example.test")
    token_path = cli_auth.token_file
    token_path.parent.mkdir(parents=True, exist_ok=True)
    token_path.write_text(
        '{"access_token":"token-123","refresh_token":null,"expires_at":9999999999,"token_type":"Bearer"}',
        encoding="utf-8",
    )

    async def respond(request: httpx.Request) -> httpx.Response:
        # The header checks live in the mock server so they see the request as sent.
        sent_headers = request.headers
        assert sent_headers.get("authorization") == "Bearer token-123"
        assert sent_headers.get("accept-encoding") == "identity"
        return httpx.Response(200, json={"ok": True})

    mock_transport = httpx.MockTransport(respond)

    @asynccontextmanager
    async def http_client_factory():
        async with httpx.AsyncClient(transport=mock_transport) as mock_client:
            yield mock_client

    # Act
    response = await make_api_request(
        method="GET",
        url="https://cloud.example.test/proxy/health",
        auth=cli_auth,
        http_client_factory=http_client_factory,
    )

    # Assert
    body = response.json()
    assert body["ok"] is True


@pytest.mark.asyncio
async def test_make_api_request_raises_subscription_required(config_home, config_manager):
    """A 403 "subscription_required" response surfaces as SubscriptionRequiredError."""
    # Arrange: token on disk so authentication succeeds before the request is sent.
    cli_auth = CLIAuth(client_id="cid", authkit_domain="https://auth.example.test")
    token_path = cli_auth.token_file
    token_path.parent.mkdir(parents=True, exist_ok=True)
    token_path.write_text(
        '{"access_token":"token-123","refresh_token":null,"expires_at":9999999999,"token_type":"Bearer"}',
        encoding="utf-8",
    )

    async def respond(_request: httpx.Request) -> httpx.Response:
        # Every request is rejected with the subscription-required error payload.
        error_body = {
            "detail": {
                "error": "subscription_required",
                "message": "Need subscription",
                "subscribe_url": "https://example.test/subscribe",
            }
        }
        return httpx.Response(403, json=error_body)

    mock_transport = httpx.MockTransport(respond)

    @asynccontextmanager
    async def http_client_factory():
        async with httpx.AsyncClient(transport=mock_transport) as mock_client:
            yield mock_client

    # Act / Assert: the error carries the subscribe URL from the response body.
    with pytest.raises(SubscriptionRequiredError) as raised:
        await make_api_request(
            method="GET",
            url="https://cloud.example.test/proxy/health",
            auth=cli_auth,
            http_client_factory=http_client_factory,
        )

    assert raised.value.subscribe_url == "https://example.test/subscribe"


@pytest.mark.asyncio
async def test_cloud_utils_fetch_and_exists_and_create_project(
    config_home, config_manager, monkeypatch
):
    """Exercise fetch_cloud_projects, project_exists and create_cloud_project against a mock API."""
    # Point config.cloud_host at our mocked base URL
    cfg = config_manager.load_config()
    cfg.cloud_host = "https://cloud.example.test"
    config_manager.save_config(cfg)

    cli_auth = CLIAuth(client_id="cid", authkit_domain="https://auth.example.test")
    token_path = cli_auth.token_file
    token_path.parent.mkdir(parents=True, exist_ok=True)
    token_path.write_text(
        '{"access_token":"token-123","refresh_token":null,"expires_at":9999999999,"token_type":"Bearer"}',
        encoding="utf-8",
    )

    # Records the JSON body of the create request so it can be inspected afterwards.
    seen = {"create_payload": None}
    projects_path = "/proxy/projects/projects"

    async def respond(request: httpx.Request) -> httpx.Response:
        route = (request.method, request.url.path)

        if route == ("GET", projects_path):
            listing = {
                "projects": [
                    {"id": 1, "name": "alpha", "path": "alpha", "is_default": True},
                    {"id": 2, "name": "beta", "path": "beta", "is_default": False},
                ]
            }
            return httpx.Response(200, json=listing)

        if route == ("POST", projects_path):
            # httpx.Request doesn't have .json(); parse bytes payload.
            payload = json.loads(request.content.decode("utf-8"))
            seen["create_payload"] = payload
            created_body = {
                "message": "created",
                "status": "success",
                "default": False,
                "old_project": None,
                "new_project": {
                    "name": payload["name"],
                    "path": payload["path"],
                },
            }
            return httpx.Response(200, json=created_body)

        # Any other request means the code under test hit an unexpected endpoint.
        raise AssertionError(f"Unexpected request: {request.method} {request.url}")

    mock_transport = httpx.MockTransport(respond)

    @asynccontextmanager
    async def http_client_factory():
        async with httpx.AsyncClient(
            transport=mock_transport, base_url="https://cloud.example.test"
        ) as mock_client:
            yield mock_client

    async def api_request(**kwargs):
        # Bind the test auth and mock client factory; forward everything else unchanged.
        return await make_api_request(
            auth=cli_auth, http_client_factory=http_client_factory, **kwargs
        )

    # Listing returns both projects in server order.
    listed = await fetch_cloud_projects(api_request=api_request)
    names = [p.name for p in listed.projects]
    assert names == ["alpha", "beta"]

    # Existence checks for a present and an absent project name.
    assert await project_exists("alpha", api_request=api_request) is True
    assert await project_exists("missing", api_request=api_request) is False

    # Creation echoes the name and posts a permalink-like (kebab) path.
    created = await create_cloud_project("My Project", api_request=api_request)
    assert created.new_project is not None
    assert created.new_project["name"] == "My Project"
    assert seen["create_payload"]["path"] == "my-project"

```

--------------------------------------------------------------------------------
/src/basic_memory/api/v2/routers/importer_router.py:
--------------------------------------------------------------------------------

```python
"""V2 Import Router - ID-based data import operations.

This router uses v2 dependencies for consistent project handling with external_id UUIDs.
Import endpoints use project_id in the path for consistency with other v2 endpoints.
"""

import json
import logging

from fastapi import APIRouter, Form, HTTPException, UploadFile, status, Path

from basic_memory.deps import (
    ChatGPTImporterV2ExternalDep,
    ClaudeConversationsImporterV2ExternalDep,
    ClaudeProjectsImporterV2ExternalDep,
    MemoryJsonImporterV2ExternalDep,
)
from basic_memory.importers import Importer
from basic_memory.schemas.importer import (
    ChatImportResult,
    EntityImportResult,
    ProjectImportResult,
)

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/import", tags=["import-v2"])


@router.post("/chatgpt", response_model=ChatImportResult)
async def import_chatgpt(
    importer: ChatGPTImporterV2ExternalDep,
    file: UploadFile,
    project_id: str = Path(..., description="Project external UUID"),
    folder: str = Form("conversations"),
) -> ChatImportResult:
    """Import conversations from a ChatGPT JSON export.

    The upload is handed to the shared ``import_file`` helper together with the
    importer and the destination folder.

    Args:
        importer: ChatGPT importer instance.
        file: The ChatGPT conversations.json file.
        project_id: Project external UUID from the URL path (used here for logging).
        folder: The folder to place the files in (form field, default "conversations").

    Returns:
        ChatImportResult with import statistics.

    Raises:
        HTTPException: If import fails.
    """
    logger.info(f"V2 Importing ChatGPT conversations for project {project_id}")
    import_result = await import_file(importer, file, folder)
    return import_result


@router.post("/claude/conversations", response_model=ChatImportResult)
async def import_claude_conversations(
    importer: ClaudeConversationsImporterV2ExternalDep,
    file: UploadFile,
    project_id: str = Path(..., description="Project external UUID"),
    folder: str = Form("conversations"),
) -> ChatImportResult:
    """Import conversations from a Claude conversations.json export.

    The upload is handed to the shared ``import_file`` helper together with the
    importer and the destination folder.

    Args:
        importer: Claude conversations importer instance.
        file: The Claude conversations.json file.
        project_id: Project external UUID from the URL path (used here for logging).
        folder: The folder to place the files in (form field, default "conversations").

    Returns:
        ChatImportResult with import statistics.

    Raises:
        HTTPException: If import fails.
    """
    logger.info(f"V2 Importing Claude conversations for project {project_id}")
    import_result = await import_file(importer, file, folder)
    return import_result


@router.post("/claude/projects", response_model=ProjectImportResult)
async def import_claude_projects(
    importer: ClaudeProjectsImporterV2ExternalDep,
    file: UploadFile,
    project_id: str = Path(..., description="Project external UUID"),
    folder: str = Form("projects"),
) -> ProjectImportResult:
    """Import projects from a Claude projects.json export.

    The upload is handed to the shared ``import_file`` helper together with the
    importer and the base destination folder.

    Args:
        importer: Claude projects importer instance.
        file: The Claude projects.json file.
        project_id: Project external UUID from the URL path (used here for logging).
        folder: The base folder to place the files in (form field, default "projects").

    Returns:
        ProjectImportResult with import statistics.

    Raises:
        HTTPException: If import fails.
    """
    logger.info(f"V2 Importing Claude projects for project {project_id}")
    import_result = await import_file(importer, file, folder)
    return import_result


@router.post("/memory-json", response_model=EntityImportResult)
async def import_memory_json(
    importer: MemoryJsonImporterV2ExternalDep,
    file: UploadFile,
    project_id: str = Path(..., description="Project external UUID"),
    folder: str = Form("conversations"),
) -> EntityImportResult:
    """Import entities and relations from a memory.json file.

    The upload is decoded as UTF-8 and parsed line by line, one JSON document
    per line. Blank or whitespace-only lines are ignored.

    Args:
        project_id: Project external UUID from URL path
        file: The memory.json file.
        folder: Optional destination folder within the project.
        importer: Memory JSON importer instance.

    Returns:
        EntityImportResult with import statistics.

    Raises:
        HTTPException: With status 500 if decoding, parsing or the import fails.
    """
    logger.info(f"V2 Importing memory.json for project {project_id}")
    try:
        file_bytes = await file.read()
        file_str = file_bytes.decode("utf-8")
        # Skip empty lines (e.g. trailing newlines) instead of failing in json.loads
        file_data = [json.loads(line) for line in file_str.splitlines() if line.strip()]

        result = await importer.import_data(file_data, folder)
        if not result.success:  # pragma: no cover
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=result.error_message or "Import failed",
            )
    except HTTPException:  # pragma: no cover
        # Raised above for an unsuccessful import; it already carries the
        # importer's error message, so pass it through rather than wrapping
        # its text into a second "Import failed: ..." detail.
        raise
    except Exception as e:
        logger.exception("V2 Import failed")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Import failed: {str(e)}",
        ) from e
    return result


async def import_file(importer: Importer, file: UploadFile, destination_folder: str):
    """Helper function to import a file using an importer instance.

    Parses the uploaded file as a single JSON document and passes the parsed
    data to ``importer.import_data``.

    Args:
        importer: The importer instance to use
        file: The file to import
        destination_folder: Destination folder for imported content

    Returns:
        Import result from the importer

    Raises:
        HTTPException: With status 500 if parsing or the import fails
    """
    try:
        # Process file
        json_data = json.load(file.file)
        result = await importer.import_data(json_data, destination_folder)
        if not result.success:  # pragma: no cover
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=result.error_message or "Import failed",
            )

        return result

    except HTTPException:  # pragma: no cover
        # Raised above for an unsuccessful import; it already carries the
        # importer's error message, so pass it through rather than wrapping
        # its text into a second "Import failed: ..." detail.
        raise
    except Exception as e:
        logger.exception("V2 Import failed")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Import failed: {str(e)}",
        ) from e

```

--------------------------------------------------------------------------------
/test-int/cli/test_project_commands_integration.py:
--------------------------------------------------------------------------------

```python
"""Integration tests for project CLI commands."""

import tempfile
from pathlib import Path

from typer.testing import CliRunner

from basic_memory.cli.main import app as cli_app


def test_project_list(app, app_config, test_project, config_manager):
    """Check that 'bm project list' succeeds and shows the test project as default."""
    outcome = CliRunner().invoke(cli_app, ["project", "list"])

    # Print diagnostics before asserting so a failing run is easier to debug
    if outcome.exit_code != 0:
        print(f"STDOUT: {outcome.stdout}")
        print(f"STDERR: {outcome.stderr}")
        print(f"Exception: {outcome.exception}")

    assert outcome.exit_code == 0
    listing = outcome.stdout
    assert "test-project" in listing
    assert "[X]" in listing  # default marker


def test_project_info(app, app_config, test_project, config_manager):
    """Check that 'bm project info' prints the project details and statistics."""
    outcome = CliRunner().invoke(cli_app, ["project", "info", "test-project"])

    # Print diagnostics before asserting so a failing run is easier to debug
    if outcome.exit_code != 0:
        print(f"STDOUT: {outcome.stdout}")
        print(f"STDERR: {outcome.stderr}")

    assert outcome.exit_code == 0
    report = outcome.stdout
    assert "Basic Memory Project Info" in report
    assert "test-project" in report
    assert "Statistics" in report


def test_project_info_json(app, app_config, test_project, config_manager):
    """Check that 'bm project info --json' emits parseable JSON with the expected keys."""
    import json

    cli_args = ["project", "info", "test-project", "--json"]
    outcome = CliRunner().invoke(cli_app, cli_args)

    # Print diagnostics before asserting so a failing run is easier to debug
    if outcome.exit_code != 0:
        print(f"STDOUT: {outcome.stdout}")
        print(f"STDERR: {outcome.stderr}")
    assert outcome.exit_code == 0

    # json.loads raises if the output is not valid JSON
    payload = json.loads(outcome.stdout)
    assert payload["project_name"] == "test-project"
    assert "statistics" in payload
    assert "system" in payload


def test_project_add_and_remove(app, app_config, config_manager):
    """Add a project, confirm it is listed, then remove it again."""
    cli = CliRunner()

    # A dedicated temporary directory avoids nested path conflicts
    with tempfile.TemporaryDirectory() as temp_dir:
        project_dir = Path(temp_dir) / "new-project"
        project_dir.mkdir()

        # Step 1: add the project
        added = cli.invoke(cli_app, ["project", "add", "new-project", str(project_dir)])
        if added.exit_code != 0:
            print(f"STDOUT: {added.stdout}")
            print(f"STDERR: {added.stderr}")
        assert added.exit_code == 0
        assert (
            "Project 'new-project' added successfully" in added.stdout
            or "added" in added.stdout.lower()
        )

        # Step 2: the new project appears in the listing
        listed = cli.invoke(cli_app, ["project", "list"])
        assert listed.exit_code == 0
        assert "new-project" in listed.stdout

        # Step 3: remove the project
        removed = cli.invoke(cli_app, ["project", "remove", "new-project"])
        assert removed.exit_code == 0
        removal_output = removed.stdout.lower()
        assert "removed" in removal_output or "deleted" in removal_output


def test_project_set_default(app, app_config, config_manager):
    """Test setting default project.

    Adds a second project, makes it the default, and checks that the
    'project list' output carries the default marker on that project's line.
    """
    runner = CliRunner()

    # Use a separate temporary directory to avoid nested path conflicts
    with tempfile.TemporaryDirectory() as temp_dir:
        new_project_path = Path(temp_dir) / "another-project"
        new_project_path.mkdir()

        # Add a second project
        result = runner.invoke(
            cli_app, ["project", "add", "another-project", str(new_project_path)]
        )
        if result.exit_code != 0:
            print(f"STDOUT: {result.stdout}")
            print(f"STDERR: {result.stderr}")
        assert result.exit_code == 0

        # Set as default
        result = runner.invoke(cli_app, ["project", "default", "another-project"])
        if result.exit_code != 0:
            print(f"STDOUT: {result.stdout}")
            print(f"STDERR: {result.stderr}")
        assert result.exit_code == 0
        assert "default" in result.stdout.lower()

        # Verify in list
        result = runner.invoke(cli_app, ["project", "list"])
        assert result.exit_code == 0
        # The new project should have the [X] marker now
        project_lines = [line for line in result.stdout.split("\n") if "another-project" in line]
        # Without this guard the marker check below would pass vacuously
        # when the project is missing from the listing altogether.
        assert project_lines, "another-project not found in project list output"
        for line in project_lines:
            assert "[X]" in line


def test_remove_main_project(app, app_config, config_manager):
    """Removing 'main' and then listing projects must not bring 'main' back (issue #397)."""
    cli = CliRunner()
    list_args = ["project", "list"]

    # Each project gets its own temporary directory
    with (
        tempfile.TemporaryDirectory() as main_dir,
        tempfile.TemporaryDirectory() as new_default_dir,
    ):
        main_location = str(Path(main_dir))
        new_default_location = str(Path(new_default_dir))

        # Create 'main' first if the listing does not already mention it
        listing = cli.invoke(cli_app, list_args)
        if "main" not in listing.stdout:
            created = cli.invoke(cli_app, ["project", "add", "main", main_location])
            print(created.stdout)
            assert created.exit_code == 0

        # 'main' must be listed now
        listing = cli.invoke(cli_app, list_args)
        assert "main" in listing.stdout

        # Register a second project ...
        added = cli.invoke(cli_app, ["project", "add", "new_default", new_default_location])
        assert added.exit_code == 0

        # ... and make it the default
        defaulted = cli.invoke(cli_app, ["project", "default", "new_default"])
        assert defaulted.exit_code == 0

        # Drop 'main'
        removed = cli.invoke(cli_app, ["project", "remove", "main"])
        assert removed.exit_code == 0

        # The listing should now contain new_default and no longer mention main
        listing = cli.invoke(cli_app, list_args)
        assert listing.exit_code == 0
        assert "main" not in listing.stdout
        assert "new_default" in listing.stdout

```

--------------------------------------------------------------------------------
/tests/sync/test_tmp_files.py:
--------------------------------------------------------------------------------

```python
"""Test proper handling of .tmp files during sync."""

import asyncio
from pathlib import Path

import pytest
from watchfiles import Change


async def create_test_file(path: Path, content: str = "test content") -> None:
    """Create a test file with given content.

    Missing parent directories are created first. The content is written as
    UTF-8 explicitly so the result does not depend on the platform's default
    encoding (tests in this module read files back as UTF-8).

    Args:
        path: Location of the file to write.
        content: Text to store in the file.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(content, encoding="utf-8")


@pytest.mark.asyncio
async def test_temp_file_filter(watch_service, app_config, project_config, test_project):
    """Check that filter_changes rejects a .tmp path and accepts a .md path."""
    project_root = Path(test_project.path)

    # A .tmp file must be rejected by the filter
    tmp_accepted = watch_service.filter_changes(Change.added, str(project_root / "test.tmp"))
    assert not tmp_accepted

    # A markdown file must pass the filter
    md_accepted = watch_service.filter_changes(Change.added, str(project_root / "test.md"))
    assert md_accepted


@pytest.mark.asyncio
async def test_handle_tmp_files(watch_service, project_config, test_project, sync_service):
    """Check that handle_changes creates an entity for the .md file but not for the .tmp file."""
    project_root = Path(test_project.path)
    tmp_file = project_root / "test.tmp"
    final_file = project_root / "test.md"

    # The .tmp file simulates a file being written with write_file_atomic;
    # the .md file is the target final file
    await create_test_file(tmp_file, "This is a temporary file")
    await create_test_file(final_file, "This is the final file")

    # One batch reporting both files as added
    changes = {(Change.added, str(changed)) for changed in (tmp_file, final_file)}
    await watch_service.handle_changes(test_project, changes)

    # Only the final file may have an entity
    repository = sync_service.entity_repository
    tmp_entity = await repository.get_by_file_path("test.tmp")
    final_entity = await repository.get_by_file_path("test.md")

    assert tmp_entity is None, "Temp file should not have an entity"
    assert final_entity is not None, "Final file should have an entity"


@pytest.mark.asyncio
async def test_atomic_write_tmp_file_handling(
    watch_service, project_config, test_project, sync_service
):
    """Simulate an atomic write and check how the watch service handles its events.

    The simulated sequence is:
    1. a .tmp file is created and reported as added
    2. the .tmp file is renamed to the final file
    3. the deletion of the .tmp file and the addition of the final file are reported
    """
    root = project_config.home
    tmp_path = root / "document.tmp"
    final_path = root / "document.md"

    # Step 1: the temp file appears
    await create_test_file(tmp_path, "Content for document")
    first_batch = {(Change.added, str(tmp_path))}
    await watch_service.handle_changes(test_project, first_batch)

    # Step 2: "replace" the temp file with the final file
    tmp_path.rename(final_path)

    # Step 3: .tmp file deleted, final file added
    second_batch = {
        (Change.deleted, str(tmp_path)),
        (Change.added, str(final_path)),
    }
    await watch_service.handle_changes(test_project, second_batch)

    # Only the final file may be in the database
    repository = sync_service.entity_repository
    tmp_entity = await repository.get_by_file_path("document.tmp")
    final_entity = await repository.get_by_file_path("document.md")

    assert tmp_entity is None, "Temp file should not have an entity"
    assert final_entity is not None, "Final file should have an entity"

    # Exactly one "new" event is expected, and it is for the final file
    new_events = []
    for event in watch_service.state.recent_events:
        if event.action == "new":
            new_events.append(event)
    assert len(new_events) == 1
    assert new_events[0].path == "document.md"


@pytest.mark.asyncio
async def test_rapid_atomic_writes(watch_service, project_config, test_project, sync_service):
    """Simulate two quick atomic writes to one destination, processed as a single batch.

    Two .tmp files are created, each is moved onto the same final file in
    turn, and all resulting events are handed to the watch service together.
    """
    root = Path(test_project.path)
    first_tmp = root / "document.1.tmp"
    second_tmp = root / "document.2.tmp"
    final_path = root / "document.md"

    # Both temp files exist up front and are consumed in sequence
    await create_test_file(first_tmp, "First version")
    await create_test_file(second_tmp, "Second version")

    # First atomic write
    first_tmp.replace(final_path)

    # Brief pause to ensure file system registers the change
    await asyncio.sleep(0.1)

    assert final_path.read_text(encoding="utf-8") == "First version"

    # Second atomic write overwrites the first
    second_tmp.replace(final_path)
    assert final_path.read_text(encoding="utf-8") == "Second version"

    # A batch of changes that might arrive in mixed order
    changes = set()
    for tmp in (first_tmp, second_tmp):
        changes.add((Change.added, str(tmp)))
        changes.add((Change.deleted, str(tmp)))
    changes.add((Change.added, str(final_path)))
    changes.add((Change.modified, str(final_path)))

    await watch_service.handle_changes(test_project, changes)

    # The final file must be in the database
    repository = sync_service.entity_repository
    final_entity = await repository.get_by_file_path("document.md")
    assert final_entity is not None

    # Neither temp file may have produced an entity
    first_tmp_entity = await repository.get_by_file_path("document.1.tmp")
    second_tmp_entity = await repository.get_by_file_path("document.2.tmp")
    assert first_tmp_entity is None
    assert second_tmp_entity is None

```

--------------------------------------------------------------------------------
/src/basic_memory/mcp/project_context.py:
--------------------------------------------------------------------------------

```python
"""Project context utilities for Basic Memory MCP server.

Provides project lookup utilities for MCP tools.
Handles project validation and context management in one place.

Note: This module uses ProjectResolver for unified project resolution.
The resolve_project_parameter function is a thin wrapper for backwards
compatibility with existing MCP tools.
"""

from typing import Optional, List
from httpx import AsyncClient
from httpx._types import (
    HeaderTypes,
)
from loguru import logger
from fastmcp import Context

from basic_memory.config import ConfigManager
from basic_memory.project_resolver import ProjectResolver
from basic_memory.schemas.project_info import ProjectItem, ProjectList
from basic_memory.utils import generate_permalink


async def resolve_project_parameter(
    project: Optional[str] = None,
    allow_discovery: bool = False,
    cloud_mode: Optional[bool] = None,
    default_project_mode: Optional[bool] = None,
    default_project: Optional[str] = None,
) -> Optional[str]:
    """Resolve the project to use via ProjectResolver.

    Kept as a thin backwards-compatible wrapper; new code should consider using
    ProjectResolver directly when it needs more detailed resolution information.

    Documented resolution rules:

    * cloud mode: a project is required, unless allow_discovery=True for tools
      that support discovery mode
    * otherwise, in priority order:
        1. Single Project Mode (--project cli arg, or BASIC_MEMORY_MCP_PROJECT env var)
        2. Explicit project parameter
        3. Default project if default_project_mode=true

    Args:
        project: Optional explicit project parameter
        allow_discovery: If True, allows returning None in cloud mode for discovery mode
            (used by tools like recent_activity that can operate across all projects)
        cloud_mode: Optional explicit cloud mode. Read from ConfigManager when None.
        default_project_mode: Optional explicit default project mode. Read from
            ConfigManager when None.
        default_project: Optional explicit default project. Read from ConfigManager
            when None.

    Returns:
        Resolved project name or None if no resolution possible
    """
    # Only touch the config when at least one setting was left unspecified
    explicit_settings = (cloud_mode, default_project_mode, default_project)
    if any(setting is None for setting in explicit_settings):
        config = ConfigManager().config
        cloud_mode = config.cloud_mode if cloud_mode is None else cloud_mode
        default_project_mode = (
            config.default_project_mode if default_project_mode is None else default_project_mode
        )
        default_project = config.default_project if default_project is None else default_project

    # Build the resolver from the effective settings and run the resolution
    resolver = ProjectResolver.from_env(
        cloud_mode=cloud_mode,
        default_project_mode=default_project_mode,
        default_project=default_project,
    )
    resolution = resolver.resolve(project=project, allow_discovery=allow_discovery)
    return resolution.project


async def get_project_names(client: AsyncClient, headers: HeaderTypes | None = None) -> List[str]:
    """Fetch the project list from the API and return the project names.

    Args:
        client: HTTP client for API calls
        headers: Optional headers forwarded to the request

    Returns:
        The name of every project in the returned project list
    """
    # Deferred import to avoid circular dependency with tools
    from basic_memory.mcp.tools.utils import call_get

    response = await call_get(client, "/projects/projects", headers=headers)
    listing = ProjectList.model_validate(response.json())

    names = []
    for item in listing.projects:
        names.append(item.name)
    return names


async def get_active_project(
    client: AsyncClient,
    project: Optional[str] = None,
    context: Optional[Context] = None,
    headers: HeaderTypes | None = None,
) -> ProjectItem:
    """Resolve, validate and return the active project, caching it in the context.

    Args:
        client: HTTP client for API calls
        project: Optional project name (resolved using hierarchy)
        context: Optional FastMCP context used to cache the validated project
        headers: Optional headers forwarded to the API requests

    Returns:
        The validated project item

    Raises:
        ValueError: If no project can be resolved
        Exception: Errors from the validation request propagate unchanged
            (presumably raised when the project doesn't exist or is inaccessible)
    """
    # Deferred import to avoid circular dependency with tools
    from basic_memory.mcp.tools.utils import call_get

    project_name = await resolve_project_parameter(project)
    if not project_name:
        # Include the known project names in the error to help the caller
        available = await get_project_names(client, headers)
        raise ValueError(
            "No project specified. "
            "Either set 'default_project_mode=true' in config, or use 'project' argument.\n"
            f"Available projects: {available}"
        )

    # Reuse the project cached in the context when it matches the resolved name
    cached = context.get_state("active_project") if context else None
    if cached and cached.name == project_name:
        logger.debug(f"Using cached project from context: {project_name}")
        return cached

    # Otherwise validate the project by requesting it from the API
    logger.debug(f"Validating project: {project_name}")
    item_url = f"/{generate_permalink(project_name)}/project/item"
    response = await call_get(client, item_url, headers=headers)
    validated = ProjectItem.model_validate(response.json())

    # Remember the validated project for later calls that share this context
    if context:
        context.set_state("active_project", validated)
        logger.debug(f"Cached project in context: {project_name}")

    logger.debug(f"Validated project: {validated.name}")
    return validated


def add_project_metadata(result: str, project_name: str) -> str:
    """Append a session footer naming the project to a tool result.

    The footer gives the assistant an explicit reminder of which project is
    being used throughout the conversation session.

    Args:
        result: The tool result string
        project_name: The project name that was used

    Returns:
        The result followed by a blank line and the session footer
    """
    session_footer = f"[Session: Using project '{project_name}']"
    return f"{result}\n\n{session_footer}"

```

--------------------------------------------------------------------------------
/test-int/test_db_wal_mode.py:
--------------------------------------------------------------------------------

```python
"""Integration tests for WAL mode and Windows-specific SQLite optimizations.

These tests use real filesystem databases (not in-memory) to verify WAL mode
and other SQLite configuration settings work correctly in production scenarios.
"""

import pytest
from sqlalchemy import text


@pytest.mark.asyncio
async def test_wal_mode_enabled(engine_factory, db_backend):
    """Check that the journal mode reported on a filesystem database connection is WAL."""
    if db_backend == "postgres":
        pytest.skip("SQLite-specific test - PRAGMA commands not supported in Postgres")

    db_engine, _session_maker = engine_factory

    # Ask SQLite for the journal mode currently in effect
    async with db_engine.connect() as connection:
        pragma_result = await connection.execute(text("PRAGMA journal_mode"))
        row = pragma_result.fetchone()
        journal_mode = row[0]

        # WAL mode should be enabled for filesystem databases
        assert journal_mode.upper() == "WAL"


@pytest.mark.asyncio
async def test_busy_timeout_configured(engine_factory, db_backend):
    """Check the busy timeout reported on a database connection."""
    if db_backend == "postgres":
        pytest.skip("SQLite-specific test - PRAGMA commands not supported in Postgres")

    db_engine, _session_maker = engine_factory

    async with db_engine.connect() as connection:
        pragma_result = await connection.execute(text("PRAGMA busy_timeout"))
        row = pragma_result.fetchone()
        busy_timeout = row[0]

        # Busy timeout should be 10 seconds (10000 milliseconds)
        assert busy_timeout == 10000


@pytest.mark.asyncio
async def test_synchronous_mode_configured(engine_factory, db_backend):
    """Check that the synchronous setting reported on a connection is NORMAL."""
    if db_backend == "postgres":
        pytest.skip("SQLite-specific test - PRAGMA commands not supported in Postgres")

    db_engine, _session_maker = engine_factory

    async with db_engine.connect() as connection:
        pragma_result = await connection.execute(text("PRAGMA synchronous"))
        row = pragma_result.fetchone()
        synchronous = row[0]

        # Synchronous should be NORMAL (1)
        assert synchronous == 1


@pytest.mark.asyncio
async def test_cache_size_configured(engine_factory, db_backend):
    """Check the cache size reported on a database connection."""
    if db_backend == "postgres":
        pytest.skip("SQLite-specific test - PRAGMA commands not supported in Postgres")

    db_engine, _session_maker = engine_factory

    async with db_engine.connect() as connection:
        pragma_result = await connection.execute(text("PRAGMA cache_size"))
        row = pragma_result.fetchone()
        cache_size = row[0]

        # Cache size should be -64000 (64MB)
        assert cache_size == -64000


@pytest.mark.asyncio
async def test_temp_store_configured(engine_factory, db_backend):
    """Check that the temp_store setting reported on a connection is MEMORY."""
    if db_backend == "postgres":
        pytest.skip("SQLite-specific test - PRAGMA commands not supported in Postgres")

    db_engine, _session_maker = engine_factory

    async with db_engine.connect() as connection:
        pragma_result = await connection.execute(text("PRAGMA temp_store"))
        row = pragma_result.fetchone()
        temp_store = row[0]

        # temp_store should be MEMORY (2)
        assert temp_store == 2


@pytest.mark.asyncio
@pytest.mark.windows
@pytest.mark.skipif(
    __import__("os").name != "nt", reason="Windows-specific test - only runs on Windows platform"
)
async def test_windows_locking_mode_when_on_windows(tmp_path, monkeypatch, config_manager):
    """Check the locking mode reported by a filesystem database when running on Windows."""
    from basic_memory.db import engine_session_factory, DatabaseType
    from basic_memory.config import DatabaseBackend

    # This test is SQLite-specific, so force the SQLite backend
    config_manager.config.database_backend = DatabaseBackend.SQLITE

    # Point HOME and BASIC_MEMORY_HOME into the temporary directory
    monkeypatch.setenv("HOME", str(tmp_path))
    monkeypatch.setenv("BASIC_MEMORY_HOME", str(tmp_path / "basic-memory"))

    database_file = tmp_path / "test_windows.db"

    async with engine_session_factory(database_file, DatabaseType.FILESYSTEM) as (
        db_engine,
        _session_maker,
    ):
        async with db_engine.connect() as connection:
            pragma_result = await connection.execute(text("PRAGMA locking_mode"))
            row = pragma_result.fetchone()
            locking_mode = row[0]

            # Locking mode should be NORMAL on Windows
            assert locking_mode.upper() == "NORMAL"


@pytest.mark.asyncio
@pytest.mark.windows
@pytest.mark.skipif(
    __import__("os").name != "nt", reason="Windows-specific test - only runs on Windows platform"
)
async def test_null_pool_on_windows(tmp_path, monkeypatch):
    """Check that a filesystem database engine uses NullPool on Windows.

    NullPool is expected here to avoid connection pooling issues.
    """
    from basic_memory.db import engine_session_factory, DatabaseType
    from sqlalchemy.pool import NullPool

    # Point HOME and BASIC_MEMORY_HOME into the temporary directory
    monkeypatch.setenv("HOME", str(tmp_path))
    monkeypatch.setenv("BASIC_MEMORY_HOME", str(tmp_path / "basic-memory"))

    database_file = tmp_path / "test_windows_pool.db"

    async with engine_session_factory(database_file, DatabaseType.FILESYSTEM) as (
        db_engine,
        _session_maker,
    ):
        # Engine should be using NullPool on Windows
        pool = db_engine.pool
        assert isinstance(pool, NullPool)


@pytest.mark.asyncio
@pytest.mark.windows
@pytest.mark.skipif(
    __import__("os").name != "nt", reason="Windows-specific test - only runs on Windows platform"
)
async def test_memory_database_no_null_pool_on_windows(tmp_path, monkeypatch):
    """Check that an in-memory database engine does NOT use NullPool, even on Windows.

    NullPool closes connections immediately, which destroys in-memory databases,
    so in-memory databases must keep connection pooling.
    """
    from basic_memory.db import engine_session_factory, DatabaseType
    from sqlalchemy.pool import NullPool

    # Point HOME and BASIC_MEMORY_HOME into the temporary directory
    monkeypatch.setenv("HOME", str(tmp_path))
    monkeypatch.setenv("BASIC_MEMORY_HOME", str(tmp_path / "basic-memory"))

    database_file = tmp_path / "test_memory.db"

    async with engine_session_factory(database_file, DatabaseType.MEMORY) as (
        db_engine,
        _session_maker,
    ):
        # In-memory databases should NOT use NullPool on Windows
        pool = db_engine.pool
        assert not isinstance(pool, NullPool)

```

--------------------------------------------------------------------------------
/src/basic_memory/repository/relation_repository.py:
--------------------------------------------------------------------------------

```python
"""Repository for managing Relation objects."""

from typing import Sequence, List, Optional, Any, cast

from sqlalchemy import and_, delete, select
from sqlalchemy.engine import CursorResult
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.dialects.sqlite import insert as sqlite_insert
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.orm import selectinload, aliased
from sqlalchemy.orm.interfaces import LoaderOption

from basic_memory import db
from basic_memory.models import Relation, Entity
from basic_memory.repository.repository import Repository


class RelationRepository(Repository[Relation]):
    """Repository for the Relation model: lookups, cleanup and bulk inserts."""

    def __init__(self, session_maker: async_sessionmaker, project_id: int):
        """Set up the repository for the Relation model.

        Args:
            session_maker: SQLAlchemy session maker
            project_id: Project ID to filter all operations by
        """
        super().__init__(session_maker, Relation, project_id=project_id)

    async def find_relation(
        self, from_permalink: str, to_permalink: str, relation_type: str
    ) -> Optional[Relation]:
        """Find one relation by source permalink, target permalink and relation type."""
        # Entity is joined twice (source and target), so each side needs its own alias
        source = aliased(Entity)
        target = aliased(Entity)

        criteria = and_(
            source.permalink == from_permalink,
            target.permalink == to_permalink,
            Relation.relation_type == relation_type,
        )
        stmt = select(Relation).join(source, Relation.from_id == source.id)
        stmt = stmt.join(target, Relation.to_id == target.id)
        stmt = stmt.where(criteria)
        return await self.find_one(stmt)

    async def find_by_entities(self, from_id: int, to_id: int) -> Sequence[Relation]:
        """Find all relations going from one entity to another."""
        stmt = select(Relation).where(and_(Relation.from_id == from_id, Relation.to_id == to_id))
        rows = await self.execute_query(stmt)
        return rows.scalars().all()

    async def find_by_type(self, relation_type: str) -> Sequence[Relation]:
        """Find all relations with the given relation type."""
        stmt = select(Relation).where(Relation.relation_type == relation_type)
        rows = await self.execute_query(stmt)
        return rows.scalars().all()

    async def delete_outgoing_relations_from_entity(self, entity_id: int) -> None:
        """Delete the outgoing relations of an entity.

        Only relations where this entity is the source (from_id) are removed,
        as these are the ones owned by this entity's markdown file.
        """
        outgoing = delete(Relation).where(Relation.from_id == entity_id)
        async with db.scoped_session(self.session_maker) as session:
            await session.execute(outgoing)

    async def find_unresolved_relations(self) -> Sequence[Relation]:
        """Find all unresolved relations, i.e. those whose to_id is null."""
        stmt = select(Relation).where(Relation.to_id.is_(None))
        rows = await self.execute_query(stmt)
        return rows.scalars().all()

    async def find_unresolved_relations_for_entity(self, entity_id: int) -> Sequence[Relation]:
        """Find the unresolved outgoing relations of one entity.

        Args:
            entity_id: The entity whose unresolved outgoing relations to find.

        Returns:
            Unresolved relations (to_id is null) where this entity is the source.
        """
        stmt = select(Relation).where(Relation.from_id == entity_id, Relation.to_id.is_(None))
        rows = await self.execute_query(stmt)
        return rows.scalars().all()

    async def add_all_ignore_duplicates(self, relations: List[Relation]) -> int:
        """Bulk insert relations, skipping duplicates.

        The insert uses ON CONFLICT DO NOTHING so relations that would violate
        the unique constraint on (from_id, to_name, relation_type) are skipped.
        This is useful for bulk operations where the same link may appear
        multiple times in a document.

        Works with both SQLite and PostgreSQL dialects.

        Args:
            relations: List of Relation objects to insert

        Returns:
            Number of relations actually inserted (excludes duplicates)
        """
        if not relations:
            return 0

        # Flatten each Relation into a plain dict of column values for the insert;
        # relations without a project_id fall back to the repository's project
        rows = []
        for relation in relations:
            rows.append(
                {
                    "project_id": relation.project_id or self.project_id,
                    "from_id": relation.from_id,
                    "to_id": relation.to_id,
                    "to_name": relation.to_name,
                    "relation_type": relation.relation_type,
                    "context": relation.context,
                }
            )

        async with db.scoped_session(self.session_maker) as session:
            # Pick the dialect-specific insert; default to SQLite when the session is unbound
            bind = session.bind
            dialect_name = bind.dialect.name if bind else "sqlite"

            if dialect_name == "postgresql":  # pragma: no cover
                # PostgreSQL: use RETURNING to count inserted rows
                # (rowcount is 0 for ON CONFLICT DO NOTHING)
                pg_stmt = pg_insert(Relation).values(rows)  # pragma: no cover
                pg_stmt = pg_stmt.on_conflict_do_nothing()  # pragma: no cover
                pg_stmt = pg_stmt.returning(Relation.id)  # pragma: no cover
                inserted = await session.execute(pg_stmt)  # pragma: no cover
                return len(inserted.fetchall())  # pragma: no cover
            else:
                # SQLite: rowcount works correctly
                sqlite_stmt = sqlite_insert(Relation).values(rows).on_conflict_do_nothing()
                outcome = cast(CursorResult[Any], await session.execute(sqlite_stmt))
                # Never report a non-positive rowcount as anything but 0
                return max(outcome.rowcount, 0)

    def get_load_options(self) -> List[LoaderOption]:
        """Return loader options that eagerly load both entity sides of a relation."""
        eager_loads: List[LoaderOption] = [
            selectinload(Relation.from_entity),
            selectinload(Relation.to_entity),
        ]
        return eager_loads

```

--------------------------------------------------------------------------------
/test-int/mcp/test_build_context_underscore.py:
--------------------------------------------------------------------------------

```python
"""Integration test for build_context with underscore in memory:// URLs."""

import pytest
from fastmcp import Client


@pytest.mark.asyncio
async def test_build_context_underscore_normalization(mcp_server, app, test_project):
    """Test that build_context normalizes underscores in relation types.

    Writes one parent note and two child notes through the ``write_note`` tool:
    one child declares its relations with underscores (``part_of``,
    ``related_to``), the other with hyphens (``part-of``, ``related-to``).
    ``build_context`` is then called with underscore and hyphen variants of the
    relation segment in the ``memory://`` URL, and each response is expected to
    contain a ``"results"`` key and the hyphenated relation name.
    """

    async with Client(mcp_server) as client:
        # Create parent note
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Parent Entity",
                "folder": "testing",
                "content": "# Parent Entity\n\nMain entity for testing underscore relations.",
                "tags": "test,parent",
            },
        )

        # Create child notes with different relation formats
        # (first child: underscore-separated relation types)
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Child with Underscore",
                "folder": "testing",
                "content": """# Child with Underscore

- part_of [[Parent Entity]]
- related_to [[Parent Entity]]
                """,
                "tags": "test,child",
            },
        )

        # (second child: hyphen-separated relation types)
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Child with Hyphen",
                "folder": "testing",
                "content": """# Child with Hyphen

- part-of [[Parent Entity]]
- related-to [[Parent Entity]]
                """,
                "tags": "test,child",
            },
        )

        # Test 1: Search with underscore format should return results
        # Relation permalinks are: source/relation_type/target
        # So child-with-underscore/part-of/parent-entity
        result_underscore = await client.call_tool(
            "build_context",
            {
                "project": test_project.name,
                "url": "memory://testing/*/part_of/*parent*",  # Using underscore
            },
        )

        # Parse response: the tool is expected to return exactly one content item
        assert len(result_underscore.content) == 1
        response_text = result_underscore.content[0].text  # pyright: ignore
        assert '"results"' in response_text

        # Both relations should be found since they both connect to parent-entity
        # The system should normalize the underscore to hyphen internally
        assert "part-of" in response_text.lower()

        # Test 2: Search with hyphen format should also return results
        result_hyphen = await client.call_tool(
            "build_context",
            {
                "project": test_project.name,
                "url": "memory://testing/*/part-of/*parent*",  # Using hyphen
            },
        )

        response_text_hyphen = result_hyphen.content[0].text  # pyright: ignore
        assert '"results"' in response_text_hyphen
        assert "part-of" in response_text_hyphen.lower()

        # Test 3: Test with related_to/related-to as well
        result_related = await client.call_tool(
            "build_context",
            {
                "project": test_project.name,
                "url": "memory://testing/*/related_to/*parent*",  # Using underscore
            },
        )

        response_text_related = result_related.content[0].text  # pyright: ignore
        assert '"results"' in response_text_related
        assert "related-to" in response_text_related.lower()

        # Test 4: Test exact path (non-wildcard) with underscore
        # Exact relation permalink would be child/relation/target
        result_exact = await client.call_tool(
            "build_context",
            {
                "project": test_project.name,
                "url": "memory://testing/child-with-underscore/part_of/testing/parent-entity",
            },
        )

        response_text_exact = result_exact.content[0].text  # pyright: ignore
        assert '"results"' in response_text_exact
        assert "part-of" in response_text_exact.lower()


@pytest.mark.asyncio
async def test_build_context_complex_underscore_paths(mcp_server, app, test_project):
    """Test build_context with complex paths containing underscores.

    Two notes whose titles contain underscores are written through the
    ``write_note`` tool; the second links to the first with ``part_of`` and
    ``implements_for`` relations. ``build_context`` is then called once per
    wildcard URL (underscore and hyphen spellings of the relation segment), and
    every response must hold a single content item containing ``"results"`` and
    ``part-of``.
    """

    async with Client(mcp_server) as client:
        # Create notes with underscores in titles and relations
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "workflow_manager_agent",
                "folder": "specs",
                "content": """# Workflow Manager Agent

Specification for the workflow manager agent.
                """,
                "tags": "spec,workflow",
            },
        )

        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "task_parser",
                "folder": "components",
                "content": """# Task Parser

- part_of [[workflow_manager_agent]]
- implements_for [[workflow_manager_agent]]
                """,
                "tags": "component,parser",
            },
        )

        # Test with underscores in all parts of the path
        # Relations are created as: task-parser/part-of/workflow-manager-agent
        # So search for */part_of/* or */part-of/* to find them
        test_cases = [
            "memory://components/*/part_of/*workflow*",
            "memory://components/*/part-of/*workflow*",
            "memory://*/task*/part_of/*",
            "memory://*/task*/part-of/*",
        ]

        # Each URL is checked independently; the failing URL is named in the
        # final assertion message.
        for url in test_cases:
            result = await client.call_tool(
                "build_context", {"project": test_project.name, "url": url}
            )

            # All variations should work and find the related content
            assert len(result.content) == 1
            response = result.content[0].text  # pyright: ignore
            assert '"results"' in response
            # The relation should be found showing part-of connection
            assert "part-of" in response.lower(), f"Failed for URL: {url}"

```

--------------------------------------------------------------------------------
/tests/mcp/test_tool_utils.py:
--------------------------------------------------------------------------------

```python
"""Tests for MCP tool utilities."""

import pytest
from httpx import HTTPStatusError
from mcp.server.fastmcp.exceptions import ToolError

from basic_memory.mcp.tools.utils import (
    call_get,
    call_post,
    call_put,
    call_delete,
    get_error_message,
)


@pytest.fixture
def mock_response(monkeypatch):
    """Provide a minimal stand-in response class.

    The fixture yields the class itself rather than an instance, so each test
    builds a response with the status code it needs.
    """

    class MockResponse:
        """Carries a status code, a success flag, ``json`` and ``raise_for_status``."""

        def __init__(self, status_code=200):
            self.status_code = status_code
            self.is_success = status_code < 400
            # Kept as an instance attribute so tests can replace the payload.
            self.json = lambda: {}

        def raise_for_status(self):
            """Raise ``HTTPStatusError`` for status codes of 400 and above."""
            if self.status_code < 400:
                return
            raise HTTPStatusError(
                message=f"HTTP Error {self.status_code}", request=None, response=self
            )

    return MockResponse


class _Client:
    """Recording fake of an async HTTP client.

    Each verb call is logged in ``calls`` as ``(verb, args, kwargs)`` and is
    answered with whatever was registered for that verb via ``set_response``.
    """

    def __init__(self):
        self.calls: list[tuple[str, tuple, dict]] = []
        self._responses: dict[str, object] = {}

    def set_response(self, method: str, response):
        """Register the canned reply for ``method``; the name is lower-cased."""
        verb = method.lower()
        self._responses[verb] = response

    def _record(self, verb: str, args: tuple, kwargs: dict):
        """Log the call first, then look up the reply (``KeyError`` if unset)."""
        entry = (verb, args, kwargs)
        self.calls.append(entry)
        return self._responses[verb]

    async def get(self, *args, **kwargs):
        return self._record("get", args, kwargs)

    async def post(self, *args, **kwargs):
        return self._record("post", args, kwargs)

    async def put(self, *args, **kwargs):
        return self._record("put", args, kwargs)

    async def delete(self, *args, **kwargs):
        return self._record("delete", args, kwargs)


@pytest.mark.asyncio
async def test_call_get_success(mock_response):
    """A GET answered with the default (200) response comes back with status 200."""
    fake_client = _Client()
    ok_reply = mock_response()
    fake_client.set_response("get", ok_reply)

    result = await call_get(fake_client, "http://test.com")

    assert result.status_code == 200


@pytest.mark.asyncio
async def test_call_get_error(mock_response):
    """A 404 reply to GET surfaces as a ToolError mentioning "Resource not found"."""
    fake_client = _Client()
    not_found = mock_response(404)
    fake_client.set_response("get", not_found)

    with pytest.raises(ToolError) as raised:
        await call_get(fake_client, "http://test.com")

    error_text = str(raised.value)
    assert "Resource not found" in error_text


@pytest.mark.asyncio
async def test_call_post_success(mock_response):
    """A POST answered with a 200 response comes back with status 200."""
    fake_client = _Client()
    canned = mock_response()
    # Give the canned reply a non-empty JSON payload.
    canned.json = lambda: {"test": "data"}
    fake_client.set_response("post", canned)

    result = await call_post(fake_client, "http://test.com", json={"test": "data"})

    assert result.status_code == 200


@pytest.mark.asyncio
async def test_call_post_error(mock_response):
    """A 500 reply to POST surfaces as a ToolError mentioning "Internal server error"."""
    fake_client = _Client()
    failing = mock_response(500)
    failing.json = lambda: {"test": "error"}
    fake_client.set_response("post", failing)

    with pytest.raises(ToolError) as raised:
        await call_post(fake_client, "http://test.com", json={"test": "data"})

    error_text = str(raised.value)
    assert "Internal server error" in error_text


@pytest.mark.asyncio
async def test_call_put_success(mock_response):
    """A PUT answered with the default (200) response comes back with status 200."""
    fake_client = _Client()
    ok_reply = mock_response()
    fake_client.set_response("put", ok_reply)

    result = await call_put(fake_client, "http://test.com", json={"test": "data"})

    assert result.status_code == 200


@pytest.mark.asyncio
async def test_call_put_error(mock_response):
    """A 400 reply to PUT surfaces as a ToolError mentioning "Invalid request"."""
    fake_client = _Client()
    bad_request = mock_response(400)
    fake_client.set_response("put", bad_request)

    with pytest.raises(ToolError) as raised:
        await call_put(fake_client, "http://test.com", json={"test": "data"})

    error_text = str(raised.value)
    assert "Invalid request" in error_text


@pytest.mark.asyncio
async def test_call_delete_success(mock_response):
    """A DELETE answered with the default (200) response comes back with status 200."""
    fake_client = _Client()
    ok_reply = mock_response()
    fake_client.set_response("delete", ok_reply)

    result = await call_delete(fake_client, "http://test.com")

    assert result.status_code == 200


@pytest.mark.asyncio
async def test_call_delete_error(mock_response):
    """A 403 reply to DELETE surfaces as a ToolError mentioning "Access denied"."""
    fake_client = _Client()
    forbidden = mock_response(403)
    fake_client.set_response("delete", forbidden)

    with pytest.raises(ToolError) as raised:
        await call_delete(fake_client, "http://test.com")

    error_text = str(raised.value)
    assert "Access denied" in error_text


@pytest.mark.asyncio
async def test_call_get_with_params(mock_response):
    """Query parameters given to call_get reach the client's ``get`` as ``params``."""
    fake_client = _Client()
    fake_client.set_response("get", mock_response())
    query = {"key": "value", "test": "data"}

    await call_get(fake_client, "http://test.com", params=query)

    # Exactly one call must have been recorded, and it must be the GET.
    assert len(fake_client.calls) == 1
    recorded = fake_client.calls[0]
    verb = recorded[0]
    passed_kwargs = recorded[2]
    assert verb == "get"
    assert passed_kwargs["params"] == query


@pytest.mark.asyncio
async def test_get_error_message():
    """get_error_message names the failure kind and includes a piece of the URL."""
    from httpx import URL

    # (status, url, method, expected summary, expected URL fragment)
    cases = [
        (400, "http://test.com/resource", "GET", "Invalid request", "resource"),
        (404, "http://test.com/missing", "GET", "Resource not found", "missing"),
        (500, "http://test.com/server", "POST", "Internal server error", "server"),
        # A URL object is passed here instead of a plain string.
        (403, URL("http://test.com/complex/path"), "DELETE", "Access denied", "path"),
    ]

    for status, target, verb, summary, fragment in cases:
        text = get_error_message(status, target, verb)
        assert summary in text
        assert fragment in text


@pytest.mark.asyncio
async def test_call_post_with_json(mock_response):
    """The JSON payload given to call_post reaches the client's ``post`` as ``json``."""
    fake_client = _Client()
    canned = mock_response()
    canned.json = lambda: {"test": "data"}
    fake_client.set_response("post", canned)
    payload = {"key": "value", "nested": {"test": "data"}}

    await call_post(fake_client, "http://test.com", json=payload)

    # Exactly one call must have been recorded, and it must be the POST.
    assert len(fake_client.calls) == 1
    recorded = fake_client.calls[0]
    verb = recorded[0]
    passed_kwargs = recorded[2]
    assert verb == "post"
    assert passed_kwargs["json"] == payload

```

--------------------------------------------------------------------------------
/test-int/test_disable_permalinks_integration.py:
--------------------------------------------------------------------------------

```python
"""Integration tests for the disable_permalinks configuration."""

import pytest

from basic_memory.markdown import EntityParser, MarkdownProcessor
from basic_memory.repository import (
    EntityRepository,
    ObservationRepository,
    RelationRepository,
    ProjectRepository,
)
from basic_memory.repository.postgres_search_repository import PostgresSearchRepository
from basic_memory.repository.sqlite_search_repository import SQLiteSearchRepository
from basic_memory.schemas import Entity as EntitySchema
from basic_memory.services import FileService
from basic_memory.services.entity_service import EntityService
from basic_memory.services.link_resolver import LinkResolver
from basic_memory.services.search_service import SearchService
from basic_memory.sync.sync_service import SyncService


@pytest.mark.asyncio
async def test_disable_permalinks_create_entity(tmp_path, engine_factory, app_config, test_project):
    """Test that entities created with disable_permalinks=True don't have permalinks.

    Wires repositories and services by hand against ``tmp_path``, creates one
    note through ``EntityService.create_entity`` and checks that neither the
    returned entity nor the written markdown file carries a permalink.
    """
    from basic_memory.config import DatabaseBackend

    # Only the session maker is used below; the engine is unpacked but unused.
    engine, session_maker = engine_factory

    # Override app config to enable disable_permalinks
    app_config.disable_permalinks = True

    # Setup repositories
    entity_repository = EntityRepository(session_maker, project_id=test_project.id)
    observation_repository = ObservationRepository(session_maker, project_id=test_project.id)
    relation_repository = RelationRepository(session_maker, project_id=test_project.id)

    # Use database-specific search repository
    if app_config.database_backend == DatabaseBackend.POSTGRES:
        search_repository = PostgresSearchRepository(session_maker, project_id=test_project.id)
    else:
        search_repository = SQLiteSearchRepository(session_maker, project_id=test_project.id)

    # Setup services
    entity_parser = EntityParser(tmp_path)
    markdown_processor = MarkdownProcessor(entity_parser)
    file_service = FileService(tmp_path, markdown_processor)
    search_service = SearchService(search_repository, entity_repository, file_service)
    # The search index is initialised before the search service is handed on.
    await search_service.init_search_index()
    link_resolver = LinkResolver(entity_repository, search_service)

    entity_service = EntityService(
        entity_parser=entity_parser,
        entity_repository=entity_repository,
        observation_repository=observation_repository,
        relation_repository=relation_repository,
        file_service=file_service,
        link_resolver=link_resolver,
        app_config=app_config,
    )

    # Create entity through the entity service
    entity_data = EntitySchema(
        title="Test Note",
        folder="test",
        entity_type="note",
        content="Test content",
    )

    created = await entity_service.create_entity(entity_data)

    # Verify entity has no permalink
    assert created.permalink is None

    # Verify file has no permalink in frontmatter
    file_path = tmp_path / "test" / "Test Note.md"
    assert file_path.exists()
    content = file_path.read_text()
    assert "permalink:" not in content
    assert "Test content" in content


@pytest.mark.asyncio
async def test_disable_permalinks_sync_workflow(tmp_path, engine_factory, app_config, test_project):
    """Test full sync workflow with disable_permalinks enabled.

    Drops a markdown file without frontmatter into ``tmp_path``, wires the
    repositories and services by hand, scans and syncs that file, then checks
    that the file on disk was not given a permalink and that the single stored
    entity has ``permalink`` set to ``None``.
    """
    from basic_memory.config import DatabaseBackend

    # Only the session maker is used below; the engine is unpacked but unused.
    engine, session_maker = engine_factory

    # Override app config to enable disable_permalinks
    app_config.disable_permalinks = True

    # Create a test markdown file without frontmatter
    test_file = tmp_path / "test_note.md"
    test_file.write_text("# Test Note\nThis is test content.")

    # Setup repositories
    entity_repository = EntityRepository(session_maker, project_id=test_project.id)
    observation_repository = ObservationRepository(session_maker, project_id=test_project.id)
    relation_repository = RelationRepository(session_maker, project_id=test_project.id)

    # Use database-specific search repository
    if app_config.database_backend == DatabaseBackend.POSTGRES:
        search_repository = PostgresSearchRepository(session_maker, project_id=test_project.id)
    else:
        search_repository = SQLiteSearchRepository(session_maker, project_id=test_project.id)

    project_repository = ProjectRepository(session_maker)

    # Setup services
    entity_parser = EntityParser(tmp_path)
    markdown_processor = MarkdownProcessor(entity_parser)
    file_service = FileService(tmp_path, markdown_processor)
    search_service = SearchService(search_repository, entity_repository, file_service)
    # The search index is initialised before the search service is handed on.
    await search_service.init_search_index()
    link_resolver = LinkResolver(entity_repository, search_service)

    entity_service = EntityService(
        entity_parser=entity_parser,
        entity_repository=entity_repository,
        observation_repository=observation_repository,
        relation_repository=relation_repository,
        file_service=file_service,
        link_resolver=link_resolver,
        app_config=app_config,
    )

    sync_service = SyncService(
        app_config=app_config,
        entity_service=entity_service,
        project_repository=project_repository,
        entity_parser=entity_parser,
        entity_repository=entity_repository,
        relation_repository=relation_repository,
        search_service=search_service,
        file_service=file_service,
    )

    # Run sync
    report = await sync_service.scan(tmp_path)
    # Note: scan may pick up database files too, so just check our file is there
    assert "test_note.md" in report.new

    # Sync the file
    await sync_service.sync_file("test_note.md", new=True)

    # Verify file has no permalink added
    content = test_file.read_text()
    assert "permalink:" not in content
    assert "# Test Note" in content

    # Verify entity in database has no permalink
    entities = await entity_repository.find_all()
    assert len(entities) == 1
    assert entities[0].permalink is None
    # Title is extracted from filename when no frontmatter, or from frontmatter when present
    # (both outcomes are accepted here)
    assert entities[0].title in ("test_note", "Test Note")

```

--------------------------------------------------------------------------------
/tests/repository/test_repository.py:
--------------------------------------------------------------------------------

```python
"""Test repository implementation."""

from datetime import datetime, UTC
import pytest
from sqlalchemy import String, DateTime
from sqlalchemy.orm import Mapped, mapped_column

from basic_memory.models import Base
from basic_memory.repository.repository import Repository


class ModelTest(Base):
    """Test model for repository tests.

    A small table with a string primary key, a required name, an optional
    description and two timestamp columns.
    """

    __tablename__ = "test_model"

    # String primary key
    id: Mapped[str] = mapped_column(String(255), primary_key=True)
    name: Mapped[str] = mapped_column(String(255))
    # Optional free-text column
    description: Mapped[str | None] = mapped_column(String(255), nullable=True)
    # Defaults to the current UTC time with the tzinfo stripped (naive datetime)
    created_at: Mapped[datetime] = mapped_column(
        DateTime, default=lambda: datetime.now(UTC).replace(tzinfo=None)
    )
    # Same default as created_at; the onupdate callable produces a fresh naive UTC value
    updated_at: Mapped[datetime] = mapped_column(
        DateTime,
        default=lambda: datetime.now(UTC).replace(tzinfo=None),
        onupdate=lambda: datetime.now(UTC).replace(tzinfo=None),
    )


@pytest.fixture
def repository(session_maker):
    """Build a ``Repository`` for ``ModelTest`` on the shared session maker."""
    repo = Repository(session_maker, ModelTest)
    return repo


@pytest.mark.asyncio
async def test_add(repository):
    """A single instance stored with add() can be fetched back by id."""
    record = ModelTest(id="test_add", name="Test Add")
    await repository.add(record)

    # Look the row up again through the repository
    fetched = await repository.find_by_id("test_add")
    assert fetched is not None
    assert fetched.name == "Test Add"


@pytest.mark.asyncio
async def test_add_all(repository):
    """Instances stored together with add_all() can be fetched back by id."""
    batch = [ModelTest(id=f"test_{i}", name=f"Test {i}") for i in range(3)]
    await repository.add_all(batch)

    # Spot-check the first row of the batch
    first = await repository.find_by_id("test_0")
    assert first is not None
    assert first.name == "Test 0"


@pytest.mark.asyncio
async def test_bulk_create(repository):
    """Rows created in bulk with create_all() can be fetched back by id."""
    models = [ModelTest(id=f"test_{i}", name=f"Test {i}") for i in range(3)]

    # create_all receives each model's attribute dict rather than the model
    rows = [vars(model) for model in models]
    await repository.create_all(rows)

    # Spot-check the first row of the batch
    first = await repository.find_by_id("test_0")
    assert first is not None
    assert first.name == "Test 0"


@pytest.mark.asyncio
async def test_find_all(repository):
    """find_all(limit=3) returns three rows when five are stored."""
    models = [ModelTest(id=f"test_{i}", name=f"Test {i}") for i in range(5)]
    rows = [vars(model) for model in models]
    await repository.create_all(rows)

    limited = await repository.find_all(limit=3)
    assert len(limited) == 3


@pytest.mark.asyncio
async def test_find_by_ids(repository):
    """find_by_ids returns the stored rows among the requested ids and ignores the rest."""
    # Seed five rows: test_0 .. test_4
    models = [ModelTest(id=f"test_{i}", name=f"Test {i}") for i in range(5)]
    await repository.create_all([vars(model) for model in models])

    # A subset of existing ids comes back complete
    wanted = ["test_0", "test_2", "test_4"]
    subset = await repository.find_by_ids(wanted)
    assert len(subset) == 3
    assert sorted([row.id for row in subset]) == sorted(wanted)

    # Unknown ids mixed in are simply left out of the result
    partly_known = ["test_0", "nonexistent", "test_4"]
    partial = await repository.find_by_ids(partly_known)
    assert len(partial) == 2
    assert sorted([row.id for row in partial]) == ["test_0", "test_4"]

    # No ids requested -> nothing returned
    nothing_requested = await repository.find_by_ids([])
    assert len(nothing_requested) == 0

    # Only unknown ids requested -> nothing returned
    all_unknown = await repository.find_by_ids(["fake1", "fake2"])
    assert len(all_unknown) == 0


@pytest.mark.asyncio
async def test_delete_by_ids(repository):
    """delete_by_ids removes exactly the requested rows and reports how many."""
    # Seed five rows: test_0 .. test_4
    models = [ModelTest(id=f"test_{i}", name=f"Test {i}") for i in range(5)]
    await repository.create_all([vars(model) for model in models])

    # Delete three of them
    ids_to_delete = ["test_0", "test_2", "test_4"]
    removed = await repository.delete_by_ids(ids_to_delete)
    assert removed == 3

    # The two untouched rows are still there
    survivors = ["test_1", "test_3"]
    remaining = await repository.find_by_ids(survivors)
    assert len(remaining) == 2
    assert sorted([row.id for row in remaining]) == sorted(survivors)

    # Each deleted id can no longer be found
    for gone_id in ids_to_delete:
        assert await repository.find_by_id(gone_id) is None


@pytest.mark.asyncio
async def test_update(repository):
    """Test updating an existing row by id from a dict of changed fields."""
    # Seed the row that will be updated
    await repository.add(ModelTest(id="test_add", name="Test Add"))

    # Apply the change; the returned model must reflect the new name
    modified = await repository.update("test_add", {"name": "Updated"})
    assert modified is not None
    assert modified.name == "Updated"


@pytest.mark.asyncio
async def test_update_model(repository):
    """update() accepts a model instance carrying the new values."""
    # Seed the row
    record = ModelTest(id="test_add", name="Test Add")
    await repository.add(record)

    # Change the model in place and pass it as the update payload
    record.name = "Updated"
    updated = await repository.update(record.id, record)

    assert updated is not None
    assert updated.name == "Updated"


@pytest.mark.asyncio
async def test_update_model_not_found(repository):
    """update() returns None when no row has the given id."""
    # Seed an unrelated row so the table is not empty
    record = ModelTest(id="test_add", name="Test Add")
    await repository.add(record)

    # Use string ID for Postgres compatibility
    missing_id = "0"
    outcome = await repository.update(missing_id, {})
    assert outcome is None


@pytest.mark.asyncio
async def test_count(repository):
    """count() reports one row after a single add()."""
    record = ModelTest(id="test_add", name="Test Add")
    await repository.add(record)

    total = await repository.count()
    assert total == 1

```

--------------------------------------------------------------------------------
/tests/mcp/test_obsidian_yaml_formatting.py:
--------------------------------------------------------------------------------

```python
"""Integration tests for Obsidian-compatible YAML formatting in write_note tool."""

import pytest

from basic_memory.mcp.tools import write_note


@pytest.mark.asyncio
async def test_write_note_tags_yaml_format(app, project_config, test_project):
    """A list of tags is written to the file as a YAML list, not a JSON array."""
    note_args = {
        "project": test_project.name,
        "title": "YAML Format Test",
        "folder": "test",
        "content": "Testing YAML tag formatting",
        "tags": ["system", "overview", "reference"],
    }
    summary = await write_note.fn(**note_args)

    # The tool reports a fresh note at the expected path
    assert "Created note" in summary
    assert "file_path: test/YAML Format Test.md" in summary

    # Inspect the raw file to see how the tags were serialised
    note_path = project_config.home / "test" / "YAML Format Test.md"
    text = note_path.read_text(encoding="utf-8")

    # YAML list markers must be present
    for expected in ("tags:", "- system", "- overview", "- reference"):
        assert expected in text

    # JSON array fragments must be absent
    for forbidden in ('["system"', '"overview"', '"reference"]'):
        assert forbidden not in text


@pytest.mark.asyncio
async def test_write_note_stringified_json_tags(app, project_config, test_project):
    """Tags passed as a stringified JSON array end up as a YAML list in the file."""
    # Simulates a caller (e.g. an AI assistant) sending tags as one JSON string
    note_args = {
        "project": test_project.name,
        "title": "Stringified JSON Test",
        "folder": "test",
        "content": "Testing stringified JSON tag input",
        "tags": '["python", "testing", "json"]',  # Stringified JSON array
    }
    summary = await write_note.fn(**note_args)

    assert "Created note" in summary

    # Inspect the raw file to see how the tags were serialised
    note_path = project_config.home / "test" / "Stringified JSON Test.md"
    text = note_path.read_text(encoding="utf-8")

    # The JSON string must have been parsed into individual YAML list items
    for expected in ("tags:", "- python", "- testing", "- json"):
        assert expected in text

    # None of the original JSON fragments may survive
    for forbidden in ('["python"', '"testing"', '"json"]'):
        assert forbidden not in text


@pytest.mark.asyncio
async def test_write_note_single_tag_yaml_format(app, project_config, test_project):
    """A one-element tag list is still written as a YAML list."""
    note_args = {
        "project": test_project.name,
        "title": "Single Tag Test",
        "folder": "test",
        "content": "Testing single tag formatting",
        "tags": ["solo-tag"],
    }
    await write_note.fn(**note_args)

    note_path = project_config.home / "test" / "Single Tag Test.md"
    text = note_path.read_text(encoding="utf-8")

    # Even a lone tag uses the list form
    for expected in ("tags:", "- solo-tag"):
        assert expected in text


@pytest.mark.asyncio
async def test_write_note_no_tags(app, project_config, test_project):
    """With ``tags=None`` the file has a title but no ``tags:`` field."""
    note_args = {
        "project": test_project.name,
        "title": "No Tags Test",
        "folder": "test",
        "content": "Testing note without tags",
        "tags": None,
    }
    await write_note.fn(**note_args)

    note_path = project_config.home / "test" / "No Tags Test.md"
    text = note_path.read_text(encoding="utf-8")

    # No tags field, but the title is still written
    assert "tags:" not in text
    assert "title: No Tags Test" in text


@pytest.mark.asyncio
async def test_write_note_empty_tags_list(app, project_config, test_project):
    """With an empty tag list the file has no ``tags:`` field."""
    note_args = {
        "project": test_project.name,
        "title": "Empty Tags Test",
        "folder": "test",
        "content": "Testing empty tag list",
        "tags": [],
    }
    await write_note.fn(**note_args)

    note_path = project_config.home / "test" / "Empty Tags Test.md"
    text = note_path.read_text(encoding="utf-8")

    # An empty list must not produce a tags entry
    assert "tags:" not in text


@pytest.mark.asyncio
async def test_write_note_update_preserves_yaml_format(app, project_config, test_project):
    """Rewriting a note with new tags keeps the YAML list format and drops the old tags."""
    # First write: creates the note
    first_version = {
        "project": test_project.name,
        "title": "Update Format Test",
        "folder": "test",
        "content": "Initial content",
        "tags": ["initial", "tag"],
    }
    await write_note.fn(**first_version)

    # Second write: same title and folder, new content and tags
    second_version = {
        "project": test_project.name,
        "title": "Update Format Test",
        "folder": "test",
        "content": "Updated content",
        "tags": ["updated", "new-tag", "format"],
    }
    summary = await write_note.fn(**second_version)

    # The tool must report an update rather than a creation
    assert "Updated note" in summary

    note_path = project_config.home / "test" / "Update Format Test.md"
    text = note_path.read_text(encoding="utf-8")

    # New tags appear as YAML list items
    for expected in ("tags:", "- updated", "- new-tag", "- format"):
        assert expected in text

    # Tags from the first write are gone
    for stale in ("- initial", "- tag"):
        assert stale not in text

    # The body was replaced as well
    assert "Updated content" in text
    assert "Initial content" not in text


@pytest.mark.asyncio
async def test_complex_tags_yaml_format(app, project_config, test_project):
    """Tags containing dots, dashes, underscores or slashes are written as list items."""
    note_args = {
        "project": test_project.name,
        "title": "Complex Tags Test",
        "folder": "test",
        "content": "Testing complex tag formats",
        "tags": ["python-3.9", "api_integration", "v2.0", "nested/category", "under_score"],
    }
    await write_note.fn(**note_args)

    note_path = project_config.home / "test" / "Complex Tags Test.md"
    text = note_path.read_text(encoding="utf-8")

    # Every tag must show up as its own list item
    expected_items = (
        "- python-3.9",
        "- api_integration",
        "- v2.0",
        "- nested/category",
        "- under_score",
    )
    for item in expected_items:
        assert item in text

```

--------------------------------------------------------------------------------
/specs/SPEC-11 Basic Memory API Performance Optimization.md:
--------------------------------------------------------------------------------

```markdown
---
title: 'SPEC-11: Basic Memory API Performance Optimization'
type: spec
permalink: specs/spec-11-basic-memory-api-performance-optimization
tags:
- performance
- api
- mcp
- database
- cloud
---

# SPEC-11: Basic Memory API Performance Optimization

## Why

The Basic Memory API experiences significant performance issues in cloud environments due to expensive per-request initialization. MCP tools making
HTTP requests to the API suffer from 350ms-2.6s latency overhead **before** any actual operation occurs.

**Root Cause Analysis:**
- GitHub Issue #82 shows repeated initialization sequences in logs (16:29:35 and 16:49:58)
- Each MCP tool call triggers full database initialization + project reconciliation
- `get_engine_factory()` dependency calls `db.get_or_create_db()` on every request
- `reconcile_projects_with_config()` runs expensive sync operations repeatedly

**Performance Impact:**
- Database connection setup: ~50-100ms per request
- Migration checks: ~100-500ms per request
- Project reconciliation: ~200ms-2s per request
- **Total overhead**: ~350ms-2.6s per MCP tool call

This creates compounding effects with tenant auto-start delays and increases timeout risk in cloud deployments.

## What

This optimization affects the **core basic-memory repository** components:

1. **API Lifespan Management** (`src/basic_memory/api/app.py`)
 - Cache database connections in app state during startup
 - Avoid repeated expensive initialization

2. **Dependency Injection** (`src/basic_memory/deps.py`)
 - Modify `get_engine_factory()` to use cached connections
 - Eliminate per-request database setup

3. **Initialization Service** (`src/basic_memory/services/initialization.py`)
 - Add caching/throttling to project reconciliation
 - Skip expensive operations when appropriate

4. **Configuration** (`src/basic_memory/config.py`)
 - Add optional performance flags for cloud environments

**Backwards Compatibility**: All changes must be backwards compatible with existing CLI and non-cloud usage.

## How (High Level)

### Phase 1: Cache Database Connections (Critical - 80% of gains)

**Problem**: `get_engine_factory()` calls `db.get_or_create_db()` per request
**Solution**: Cache database engine/session in app state during lifespan

1. **Modify API Lifespan** (`api/app.py`):
 ```python
 @asynccontextmanager
 async def lifespan(app: FastAPI):
     app_config = ConfigManager().config
     await initialize_app(app_config)

     # Cache database connection in app state
     engine, session_maker = await db.get_or_create_db(app_config.database_path)
     app.state.engine = engine
     app.state.session_maker = session_maker

     # ... rest of startup logic
```

2. Modify Dependency Injection (deps.py):
```python
async def get_engine_factory(
  request: Request
) -> tuple[AsyncEngine, async_sessionmaker[AsyncSession]]:
  """Get cached engine and session maker from app state."""
  return request.app.state.engine, request.app.state.session_maker
```
### Phase 2: Optimize Project Reconciliation (Secondary - 20% of gains)

Problem: reconcile_projects_with_config() runs expensive sync repeatedly
Solution: Add module-level caching with time-based throttling

1. Add Reconciliation Cache (services/initialization.py):
```python
_project_reconciliation_completed = False
_last_reconciliation_time = 0

async def reconcile_projects_with_config(app_config, force=False):
  # Skip if recently completed (within 60 seconds) unless forced
  if recently_completed and not force:
      return
  # ... existing logic
```
### Phase 3: Cloud Environment Flags (Optional)

Problem: Expensive initialization is forced on every start in production environments
Solution: Add skip flags for cloud/stateless deployments

1. Add Config Flag (config.py):
   `skip_initialization_sync: bool = Field(default=False)`
2. Configure in Cloud (basic-memory-cloud integration):
   `BASIC_MEMORY_SKIP_INITIALIZATION_SYNC=true`

## How to Evaluate

### Success Criteria

1. Performance Metrics (Primary):
- MCP tool response time reduced by 50%+ (measure before/after)
- Database connection overhead eliminated (0ms vs 50-100ms)
- Migration check overhead eliminated (0ms vs 100-500ms)
- Project reconciliation overhead reduced by 90%+
2. Load Testing:
- Concurrent MCP tool calls maintain performance
- No memory leaks in cached connections
- Database connection pool behaves correctly
3. Functional Correctness:
- All existing API endpoints work identically
- MCP tools maintain full functionality
- CLI operations unaffected
- Database migrations still execute properly
4. Backwards Compatibility:
- No breaking changes to existing APIs
- Config changes are optional with safe defaults
- Non-cloud deployments work unchanged

### Testing Strategy

Performance Testing:
```bash
# Before optimization
time basic-memory-mcp-tools write_note "test" "content" "folder"
# Measure: ~1-3 seconds

# After optimization
time basic-memory-mcp-tools write_note "test" "content" "folder"
# Target: <500ms
```

Load Testing:
```bash
# Multiple concurrent MCP tool calls
for i in {1..10}; do
basic-memory-mcp-tools search "test" &
done
wait
# Verify: No degradation, consistent response times
```

Regression Testing:
```bash
# Full basic-memory test suite
just test
# All tests must pass

# Integration tests with cloud deployment
# Verify MCP gateway → API → database flow works
```

### Validation Checklist

- Phase 1 Complete: Database connections cached, dependency injection optimized
- Performance Benchmark: 50%+ improvement in MCP tool response times
- Memory Usage: No leaks in cached connections over 24h+ periods
- Stress Testing: 100+ concurrent requests maintain performance
- Backwards Compatibility: All existing functionality preserved
- Documentation: Performance optimization documented in README
- Cloud Integration: basic-memory-cloud sees performance benefits

## Notes

Implementation Priority:
- Phase 1 provides 80% of performance gains and should be implemented first
- Phase 2 provides remaining 20% and addresses edge cases
- Phase 3 is optional for maximum cloud optimization

Risk Mitigation:
- All changes backwards compatible
- Gradual rollout possible (Phase 1 → 2 → 3)
- Easy rollback via configuration flags

Cloud Integration:
- This optimization directly addresses basic-memory-cloud issue #82
- Changes in core basic-memory will benefit all cloud tenants
- No changes needed in basic-memory-cloud itself

```

--------------------------------------------------------------------------------
/specs/SPEC-1 Specification-Driven Development Process.md:
--------------------------------------------------------------------------------

```markdown
---
title: 'SPEC-1: Specification-Driven Development Process'
type: spec
permalink: specs/spec-1-specification-driven-development-process
tags:
- process
- specification
- development
- meta
---

# SPEC-1: Specification-Driven Development Process

## Why
We're implementing specification-driven development to solve the complexity and circular refactoring issues in our web development process. 
Instead of getting lost in framework details and type gymnastics, we start with clear specifications that drive implementation.

The default approach of ad hoc development with AI agents tends to result in:
- Circular refactoring cycles
- Fighting framework complexity
- Lost context between sessions
- Unclear requirements and scope

## What
This spec defines our process for using basic-memory as the specification engine to build basic-memory-cloud. 
We're creating a recursive development pattern where basic-memory manages the specs that drive the development of basic-memory-cloud.

**Affected Areas:**
- All future component development
- Architecture decisions
- Agent collaboration workflows
- Knowledge management and context preservation

## How (High Level)

### Specification Structure

Name: Spec names should be numbered sequentially, followed by a description, e.g. `SPEC-X - Simple Description.md`.
See: [[Spec-2: Slash Commands Reference]]

Every spec is a complete thought containing:
- **Why**: The reasoning and problem being solved
- **What**: What is affected or changed
- **How**: High-level approach to implementation
- **How to Evaluate**: Testing/validation procedure
- Additional context as needed

### Living Specification Format

Specifications are **living documents** that evolve throughout implementation:

**Progress Tracking:**
- **Completed items**: Use ✅ checkmark emoji for implemented features
- **Pending items**: Use `- [ ]` GitHub-style checkboxes for remaining tasks
- **In-progress items**: Use `- [x]` when work is actively underway

**Status Philosophy:**
- **Avoid static status headers** like "COMPLETE" or "IN PROGRESS" that become stale
- **Use checklists within content** to show granular implementation progress
- **Keep specs informative** while providing clear progress visibility
- **Update continuously** as understanding and implementation evolve

**Example Format:**
```markdown
### ComponentName
- ✅ Basic functionality implemented
- ✅ Props and events defined
- [ ] Add sorting controls
- [ ] Improve accessibility
- [x] Currently implementing responsive design
```

This creates **git-friendly progress tracking** where `[ ]` easily becomes `[x]` or ✅ when completed, and specs remain valuable throughout the development lifecycle.


## Claude Code 

We will leverage Claude Code capabilities to make the process semi-automated. 

- Slash commands: define repeatable steps in the process (create spec, implement, review, etc)
- Agents: define roles to carry out instructions (front end developer, backend developer, etc)
- MCP tools: enable agents to implement specs via actions (write code, test, etc)

### Workflow
1. **Create**: Write spec as complete thought in `/specs` folder
2. **Discuss**: Iterate and refine through agent collaboration
3. **Implement**: Hand spec to appropriate specialist agent
4. **Validate**: Review implementation against spec criteria
5. **Document**: Update spec with learnings and decisions

### Slash Commands

Claude slash commands are used to manage the flow.
These are simple instructions to help make the process uniform. 
They can be updated and refined as needed. 

- `/spec create [name]` - Create new specification
- `/spec status` - Show current spec states
- `/spec implement [name]` - Hand to appropriate agent
- `/spec review [name]` - Validate implementation

### Agent Orchestration

Agents are defined with clear roles, for instance:

- **system-architect**: Creates high-level specs, ADRs, architectural decisions
- **vue-developer**: Component specs, UI patterns, frontend architecture
- **python-developer**: Implementation specs, technical details, backend logic

Each agent reads/updates specs through basic-memory tools.

## How to Evaluate

### Success Criteria
- Specs provide clear, actionable guidance for implementation
- Reduced circular refactoring and scope creep
- Persistent context across development sessions
- Clean separation between "what/why" and implementation details
- Specs record a history of what happened and why for historical context

### Testing Procedure
1. Create a spec for an existing problematic component
2. Have an agent implement following only the spec
3. Compare result quality and development speed vs. ad-hoc approach
4. Measure context preservation across sessions
5. Evaluate spec clarity and completeness

### Metrics
- Time from spec to working implementation
- Number of refactoring cycles required
- Agent understanding of requirements
- Spec reusability for similar components

## Notes
- Start simple: specs are just complete thoughts, not heavy processes
- Use basic-memory's knowledge graph to link specs, decisions, components
- Let the process evolve naturally based on what works
- Focus on solving the actual problem: Manage complexity in development

## Observations

- [problem] Web development without clear goals and documentation leads to circular refactoring cycles #complexity
- [solution] Specification-driven development reduces scope creep and context loss #process-improvement  
- [pattern] basic-memory as specification engine creates recursive development loop #meta-development
- [workflow] Five-step process: Create → Discuss → Implement → Validate → Document #methodology
- [tool] Slash commands provide uniform process automation #automation
- [agent-pattern] Three specialized agents handle different implementation domains #specialization
- [success-metric] Time from spec to working implementation measures process efficiency #measurement
- [learning] Process should evolve naturally based on what works in practice #adaptation
- [format] Living specifications use checklists for progress tracking instead of static status headers #documentation
- [evolution] Specs evolve throughout implementation maintaining value as working documents #continuous-improvement

## Relations

- spec [[Spec-2: Slash Commands Reference]]
- spec [[Spec-3: Agent Definitions]]

```

--------------------------------------------------------------------------------
/src/basic_memory/mcp/prompts/recent_activity.py:
--------------------------------------------------------------------------------

```python
"""Recent activity prompts for Basic Memory MCP server.

These prompts help users see what has changed in their knowledge base recently.
"""

from typing import Annotated, Optional

from loguru import logger
from pydantic import Field

from basic_memory.mcp.prompts.utils import format_prompt_context, PromptContext, PromptContextItem
from basic_memory.mcp.server import mcp
from basic_memory.mcp.tools.recent_activity import recent_activity
from basic_memory.schemas.base import TimeFrame
from basic_memory.schemas.memory import GraphContext, ProjectActivitySummary
from basic_memory.schemas.search import SearchItemType


@mcp.prompt(
    name="recent_activity",
    description="Get recent activity from a specific project or across all projects",
)
async def recent_activity_prompt(
    timeframe: Annotated[
        TimeFrame,
        Field(description="How far back to look for activity (e.g. '1d', '1 week')"),
    ] = "7d",
    project: Annotated[
        Optional[str],
        Field(
            description="Specific project to get activity from (None for discovery across all projects)"
        ),
    ] = None,
) -> str:
    """Build a prompt summarising recent knowledge-base activity.

    The ``recent_activity`` tool is queried for entity items only. When it
    returns a ``ProjectActivitySummary`` (discovery across projects) a small
    sample is taken from every project; when it returns a ``GraphContext``
    (single project) the leading results are used directly. The formatted
    context is followed by a suggestion for capturing a summary note.

    Args:
        timeframe: How far back to look for activity (e.g. '1d', '1 week')
        project: Specific project to get activity from (None for discovery across all projects)

    Returns:
        The formatted prompt context followed by the capture suggestion text.
    """
    logger.info(f"Getting recent activity, timeframe: {timeframe}, project: {project}")

    recent = await recent_activity.fn(
        project=project, timeframe=timeframe, type=[SearchItemType.ENTITY]
    )

    # Flatten the hierarchical tool result into two plain lists
    primary_results = []
    related_results = []

    if isinstance(recent, ProjectActivitySummary):
        # Discovery mode: sample every project that reported activity
        for project_activity in recent.projects.values():
            activity_results = project_activity.activity.results
            if not activity_results:
                continue
            # At most 2 primary results per project, 1 related result per primary
            for item in activity_results[:2]:
                primary_results.append(item.primary_result)
                if item.related_results:
                    related_results.extend(item.related_results[:1])  # pragma: no cover

        # Cap the overall totals so the prompt stays readable
        primary_results = primary_results[:8]
        related_results = related_results[:6]

    elif isinstance(recent, GraphContext):
        # Project mode: at most 5 primary results, 2 related results per primary
        for item in (recent.results or [])[:5]:
            primary_results.append(item.primary_result)
            if item.related_results:
                related_results.extend(item.related_results[:2])  # pragma: no cover

    # Heading depends on whether a project was named
    topic = (
        f"Recent Activity in {project} ({timeframe})"
        if project
        else f"Recent Activity Across All Projects ({timeframe})"
    )

    context_item = PromptContextItem(
        primary_results=primary_results,
        related_results=related_results[:10],  # Limit total related results
    )
    prompt_context = format_prompt_context(
        PromptContext(topic=topic, timeframe=timeframe, results=[context_item])
    )

    # Title used in the suggested note's relations; placeholder when nothing was found
    first_title = primary_results[0].title if primary_results else "Recent Topic"

    if project:
        # Project-specific suggestions
        capture_suggestions = f"""
    ## Opportunity to Capture Activity Summary

    Consider creating a summary note of recent activity in {project}:

    ```python
    await write_note(
        "{project}",
        title="Activity Summary {timeframe}",
        content='''
        # Activity Summary for {project} ({timeframe})

        ## Overview
        [Summary of key changes and developments in this project over this period]

        ## Key Updates
        [List main updates and their significance within this project]

        ## Observations
        - [trend] [Observation about patterns in recent activity]
        - [insight] [Connection between different activities]

        ## Relations
        - summarizes [[{first_title}]]
        - relates_to [[{project} Overview]]
        ''',
        folder="summaries"
    )
    ```

    Summarizing periodic activity helps create high-level insights and connections within the project.
    """
    else:
        # Discovery mode suggestions
        if isinstance(recent, ProjectActivitySummary):
            project_count = len(recent.projects)
            most_active = getattr(recent.summary, "most_active_project", "Unknown")
        else:
            project_count = 0
            most_active = "Unknown"

        capture_suggestions = f"""
    ## Cross-Project Activity Discovery

    Found activity across {project_count} projects. Most active: **{most_active}**

    Consider creating a cross-project summary:

    ```python
    await write_note(
        "{most_active if most_active != "Unknown" else "main"}",
        title="Cross-Project Activity Summary {timeframe}",
        content='''
        # Cross-Project Activity Summary ({timeframe})

        ## Overview
        Activity found across {project_count} projects, with {most_active} showing the most activity.

        ## Key Developments
        [Summarize important changes across all projects]

        ## Project Insights
        [Note patterns or connections between projects]

        ## Observations
        - [trend] [Cross-project patterns observed]
        - [insight] [Connections between different project activities]

        ## Relations
        - summarizes [[{first_title}]]
        - relates_to [[Project Portfolio Overview]]
        ''',
        folder="summaries"
    )
    ```

    Cross-project summaries help identify broader trends and project interconnections.
    """

    return prompt_context + capture_suggestions

```

--------------------------------------------------------------------------------
/src/basic_memory/deps/importers.py:
--------------------------------------------------------------------------------

```python
"""Importer dependency injection for basic-memory.

This module provides importer dependencies:
- ChatGPTImporter
- ClaudeConversationsImporter
- ClaudeProjectsImporter
- MemoryJsonImporter
"""

from typing import Annotated

from fastapi import Depends

from basic_memory.deps.projects import (
    ProjectConfigDep,
    ProjectConfigV2Dep,
    ProjectConfigV2ExternalDep,
)
from basic_memory.deps.services import (
    FileServiceDep,
    FileServiceV2Dep,
    FileServiceV2ExternalDep,
    MarkdownProcessorDep,
    MarkdownProcessorV2Dep,
    MarkdownProcessorV2ExternalDep,
)
from basic_memory.importers import (
    ChatGPTImporter,
    ClaudeConversationsImporter,
    ClaudeProjectsImporter,
    MemoryJsonImporter,
)


# --- ChatGPT Importer ---


async def get_chatgpt_importer(
    project_config: ProjectConfigDep,
    markdown_processor: MarkdownProcessorDep,
    file_service: FileServiceDep,
) -> ChatGPTImporter:
    """Build a ChatGPTImporter from the injected project dependencies.

    Args:
        project_config: Injected project configuration; only ``home`` is read.
        markdown_processor: Injected markdown processor passed to the importer.
        file_service: Injected file service passed to the importer.

    Returns:
        A newly constructed ChatGPTImporter.
    """
    project_home = project_config.home
    importer = ChatGPTImporter(project_home, markdown_processor, file_service)
    return importer


# Annotated alias so signatures can request the importer via Depends
ChatGPTImporterDep = Annotated[ChatGPTImporter, Depends(get_chatgpt_importer)]


async def get_chatgpt_importer_v2(  # pragma: no cover
    project_config: ProjectConfigV2Dep,
    markdown_processor: MarkdownProcessorV2Dep,
    file_service: FileServiceV2Dep,
) -> ChatGPTImporter:
    """Build a ChatGPTImporter from the v2 variants of the injected dependencies.

    Args:
        project_config: Injected v2 project configuration; only ``home`` is read.
        markdown_processor: Injected v2 markdown processor passed to the importer.
        file_service: Injected v2 file service passed to the importer.

    Returns:
        A newly constructed ChatGPTImporter.
    """
    project_home = project_config.home
    importer = ChatGPTImporter(project_home, markdown_processor, file_service)
    return importer


# Annotated alias so signatures can request the v2 importer via Depends
ChatGPTImporterV2Dep = Annotated[ChatGPTImporter, Depends(get_chatgpt_importer_v2)]


async def get_chatgpt_importer_v2_external(
    project_config: ProjectConfigV2ExternalDep,
    markdown_processor: MarkdownProcessorV2ExternalDep,
    file_service: FileServiceV2ExternalDep,
) -> ChatGPTImporter:
    """Build a ChatGPTImporter from the v2 external_id variants of the dependencies.

    Args:
        project_config: Injected v2-external project configuration; only ``home`` is read.
        markdown_processor: Injected v2-external markdown processor passed to the importer.
        file_service: Injected v2-external file service passed to the importer.

    Returns:
        A newly constructed ChatGPTImporter.
    """
    project_home = project_config.home
    importer = ChatGPTImporter(project_home, markdown_processor, file_service)
    return importer


# Annotated alias so signatures can request the v2 external_id importer via Depends
ChatGPTImporterV2ExternalDep = Annotated[ChatGPTImporter, Depends(get_chatgpt_importer_v2_external)]


# --- Claude Conversations Importer ---


async def get_claude_conversations_importer(
    project_config: ProjectConfigDep,
    markdown_processor: MarkdownProcessorDep,
    file_service: FileServiceDep,
) -> ClaudeConversationsImporter:
    """Build a ClaudeConversationsImporter from the injected project dependencies.

    Args:
        project_config: Injected project configuration; only ``home`` is read.
        markdown_processor: Injected markdown processor passed to the importer.
        file_service: Injected file service passed to the importer.

    Returns:
        A newly constructed ClaudeConversationsImporter.
    """
    project_home = project_config.home
    importer = ClaudeConversationsImporter(project_home, markdown_processor, file_service)
    return importer


# Annotated alias so signatures can request the importer via Depends
ClaudeConversationsImporterDep = Annotated[
    ClaudeConversationsImporter, Depends(get_claude_conversations_importer)
]


async def get_claude_conversations_importer_v2(  # pragma: no cover
    project_config: ProjectConfigV2Dep,
    markdown_processor: MarkdownProcessorV2Dep,
    file_service: FileServiceV2Dep,
) -> ClaudeConversationsImporter:
    """Build a ClaudeConversationsImporter from the v2 variants of the dependencies.

    Args:
        project_config: Injected v2 project configuration; only ``home`` is read.
        markdown_processor: Injected v2 markdown processor passed to the importer.
        file_service: Injected v2 file service passed to the importer.

    Returns:
        A newly constructed ClaudeConversationsImporter.
    """
    project_home = project_config.home
    importer = ClaudeConversationsImporter(project_home, markdown_processor, file_service)
    return importer


# Annotated alias so signatures can request the v2 importer via Depends
ClaudeConversationsImporterV2Dep = Annotated[
    ClaudeConversationsImporter, Depends(get_claude_conversations_importer_v2)
]


async def get_claude_conversations_importer_v2_external(
    project_config: ProjectConfigV2ExternalDep,
    markdown_processor: MarkdownProcessorV2ExternalDep,
    file_service: FileServiceV2ExternalDep,
) -> ClaudeConversationsImporter:
    """Build a ClaudeConversationsImporter from the v2 external_id dependencies.

    Args:
        project_config: Injected v2-external project configuration; only ``home`` is read.
        markdown_processor: Injected v2-external markdown processor passed to the importer.
        file_service: Injected v2-external file service passed to the importer.

    Returns:
        A newly constructed ClaudeConversationsImporter.
    """
    project_home = project_config.home
    importer = ClaudeConversationsImporter(project_home, markdown_processor, file_service)
    return importer


# Annotated alias so signatures can request the v2 external_id importer via Depends
ClaudeConversationsImporterV2ExternalDep = Annotated[
    ClaudeConversationsImporter, Depends(get_claude_conversations_importer_v2_external)
]


# --- Claude Projects Importer ---


async def get_claude_projects_importer(
    project_config: ProjectConfigDep,
    markdown_processor: MarkdownProcessorDep,
    file_service: FileServiceDep,
) -> ClaudeProjectsImporter:
    """Build a ClaudeProjectsImporter from the injected project dependencies.

    Args:
        project_config: Injected project configuration; only ``home`` is read.
        markdown_processor: Injected markdown processor passed to the importer.
        file_service: Injected file service passed to the importer.

    Returns:
        A newly constructed ClaudeProjectsImporter.
    """
    project_home = project_config.home
    importer = ClaudeProjectsImporter(project_home, markdown_processor, file_service)
    return importer


# Annotated alias so signatures can request the importer via Depends
ClaudeProjectsImporterDep = Annotated[ClaudeProjectsImporter, Depends(get_claude_projects_importer)]


async def get_claude_projects_importer_v2(  # pragma: no cover
    project_config: ProjectConfigV2Dep,
    markdown_processor: MarkdownProcessorV2Dep,
    file_service: FileServiceV2Dep,
) -> ClaudeProjectsImporter:
    """Build a ClaudeProjectsImporter from the v2 variants of the dependencies.

    Args:
        project_config: Injected v2 project configuration; only ``home`` is read.
        markdown_processor: Injected v2 markdown processor passed to the importer.
        file_service: Injected v2 file service passed to the importer.

    Returns:
        A newly constructed ClaudeProjectsImporter.
    """
    project_home = project_config.home
    importer = ClaudeProjectsImporter(project_home, markdown_processor, file_service)
    return importer


# Annotated alias so signatures can request the v2 importer via Depends
ClaudeProjectsImporterV2Dep = Annotated[
    ClaudeProjectsImporter, Depends(get_claude_projects_importer_v2)
]


async def get_claude_projects_importer_v2_external(
    project_config: ProjectConfigV2ExternalDep,
    markdown_processor: MarkdownProcessorV2ExternalDep,
    file_service: FileServiceV2ExternalDep,
) -> ClaudeProjectsImporter:
    """Build a ClaudeProjectsImporter from the v2 external_id dependencies.

    Args:
        project_config: Injected v2-external project configuration; only ``home`` is read.
        markdown_processor: Injected v2-external markdown processor passed to the importer.
        file_service: Injected v2-external file service passed to the importer.

    Returns:
        A newly constructed ClaudeProjectsImporter.
    """
    project_home = project_config.home
    importer = ClaudeProjectsImporter(project_home, markdown_processor, file_service)
    return importer


# Annotated alias so signatures can request the v2 external_id importer via Depends
ClaudeProjectsImporterV2ExternalDep = Annotated[
    ClaudeProjectsImporter, Depends(get_claude_projects_importer_v2_external)
]


# --- Memory JSON Importer ---


async def get_memory_json_importer(
    project_config: ProjectConfigDep,
    markdown_processor: MarkdownProcessorDep,
    file_service: FileServiceDep,
) -> MemoryJsonImporter:
    """Build a MemoryJsonImporter from the injected project dependencies.

    Args:
        project_config: Injected project configuration; only ``home`` is read.
        markdown_processor: Injected markdown processor passed to the importer.
        file_service: Injected file service passed to the importer.

    Returns:
        A newly constructed MemoryJsonImporter.
    """
    project_home = project_config.home
    importer = MemoryJsonImporter(project_home, markdown_processor, file_service)
    return importer


# Annotated alias so signatures can request the importer via Depends
MemoryJsonImporterDep = Annotated[MemoryJsonImporter, Depends(get_memory_json_importer)]


async def get_memory_json_importer_v2(  # pragma: no cover
    project_config: ProjectConfigV2Dep,
    markdown_processor: MarkdownProcessorV2Dep,
    file_service: FileServiceV2Dep,
) -> MemoryJsonImporter:
    """Build a MemoryJsonImporter from the v2 variants of the dependencies.

    Args:
        project_config: Injected v2 project configuration; only ``home`` is read.
        markdown_processor: Injected v2 markdown processor passed to the importer.
        file_service: Injected v2 file service passed to the importer.

    Returns:
        A newly constructed MemoryJsonImporter.
    """
    project_home = project_config.home
    importer = MemoryJsonImporter(project_home, markdown_processor, file_service)
    return importer


# Annotated alias so signatures can request the v2 importer via Depends
MemoryJsonImporterV2Dep = Annotated[MemoryJsonImporter, Depends(get_memory_json_importer_v2)]


async def get_memory_json_importer_v2_external(
    project_config: ProjectConfigV2ExternalDep,
    markdown_processor: MarkdownProcessorV2ExternalDep,
    file_service: FileServiceV2ExternalDep,
) -> MemoryJsonImporter:
    """Build a MemoryJsonImporter from the v2 external_id dependencies.

    Args:
        project_config: Injected v2-external project configuration; only ``home`` is read.
        markdown_processor: Injected v2-external markdown processor passed to the importer.
        file_service: Injected v2-external file service passed to the importer.

    Returns:
        A newly constructed MemoryJsonImporter.
    """
    project_home = project_config.home
    importer = MemoryJsonImporter(project_home, markdown_processor, file_service)
    return importer


# Annotated alias so signatures can request the v2 external_id importer via Depends
MemoryJsonImporterV2ExternalDep = Annotated[
    MemoryJsonImporter, Depends(get_memory_json_importer_v2_external)
]

```

--------------------------------------------------------------------------------
/src/basic_memory/importers/claude_conversations_importer.py:
--------------------------------------------------------------------------------

```python
"""Claude conversations import service for Basic Memory."""

import logging
from datetime import datetime
from typing import Any, Dict, List, Optional

from basic_memory.markdown.schemas import EntityFrontmatter, EntityMarkdown
from basic_memory.importers.base import Importer
from basic_memory.schemas.importer import ChatImportResult
from basic_memory.importers.utils import clean_filename, format_timestamp

# Module-level logger named after this module (via __name__)
logger = logging.getLogger(__name__)


class ClaudeConversationsImporter(Importer[ChatImportResult]):
    """Service for importing Claude conversations.

    Each conversation in the export is rendered to one markdown entity and
    written through ``write_entity`` at ``<permalink>.md``.
    """

    def handle_error(  # pragma: no cover
        self, message: str, error: Optional[Exception] = None
    ) -> ChatImportResult:
        """Return a failed ChatImportResult with an error message.

        Args:
            message: Human-readable description of what failed.
            error: Optional exception; when given it is appended to ``message``.

        Returns:
            A ChatImportResult with ``success=False`` and zeroed counters.
        """
        error_msg = f"{message}: {error}" if error else message
        return ChatImportResult(
            import_count={},
            success=False,
            error_message=error_msg,
            conversations=0,
            messages=0,
        )

    async def import_data(
        self, source_data, destination_folder: str, **kwargs: Any
    ) -> ChatImportResult:
        """Import conversations from Claude JSON export.

        Args:
            source_data: Already-parsed export data: an iterable of conversation
                dicts, each providing ``chat_messages``, ``created_at`` and
                ``updated_at`` (``name`` and ``uuid`` are optional).
            destination_folder: Destination folder within the project.
            **kwargs: Additional keyword arguments (not used by this importer).

        Returns:
            ChatImportResult containing statistics and status of the import.
            Any exception raised during the import is logged and converted
            into a failed result via ``handle_error`` rather than propagated.
        """
        try:
            # Ensure the destination folder exists
            await self.ensure_folder_exists(destination_folder)

            conversations = source_data

            # Process each conversation
            messages_imported = 0
            chats_imported = 0

            for chat in conversations:
                # Get name, providing default for unnamed conversations
                chat_name = chat.get("name") or f"Conversation {chat.get('uuid', 'untitled')}"

                # Convert to entity
                entity = self._format_chat_content(
                    folder=destination_folder,
                    name=chat_name,
                    messages=chat["chat_messages"],
                    created_at=chat["created_at"],
                    modified_at=chat["updated_at"],
                )

                # Write file using relative path - FileService handles base_path
                file_path = f"{entity.frontmatter.metadata['permalink']}.md"
                await self.write_entity(entity, file_path)

                chats_imported += 1
                messages_imported += len(chat["chat_messages"])

            return ChatImportResult(
                import_count={"conversations": chats_imported, "messages": messages_imported},
                success=True,
                conversations=chats_imported,
                messages=messages_imported,
            )

        except Exception as e:  # pragma: no cover
            logger.exception("Failed to import Claude conversations")
            return self.handle_error("Failed to import Claude conversations", e)

    def _format_chat_content(
        self,
        folder: str,
        name: str,
        messages: List[Dict[str, Any]],
        created_at: str,
        modified_at: str,
    ) -> EntityMarkdown:
        """Convert chat messages to Basic Memory entity format.

        Args:
            folder: Destination folder name (relative path).
            name: Chat name.
            messages: List of chat messages.
            created_at: Creation timestamp (ISO 8601; a trailing ``Z`` is accepted).
            modified_at: Modification timestamp.

        Returns:
            EntityMarkdown instance representing the conversation.
        """
        # Generate permalink using folder name (relative path).
        # fromisoformat() is given an explicit "+00:00" offset in place of "Z".
        date_prefix = datetime.fromisoformat(created_at.replace("Z", "+00:00")).strftime("%Y%m%d")
        clean_title = clean_filename(name)
        permalink = f"{folder}/{date_prefix}-{clean_title}"

        # Format content
        content = self._format_chat_markdown(
            name=name,
            messages=messages,
            created_at=created_at,
            modified_at=modified_at,
            permalink=permalink,
        )

        # Create entity
        entity = EntityMarkdown(
            frontmatter=EntityFrontmatter(
                metadata={
                    "type": "conversation",
                    "title": name,
                    "created": created_at,
                    "modified": modified_at,
                    "permalink": permalink,
                }
            ),
            content=content,
        )

        return entity

    def _format_chat_markdown(
        self,
        name: str,
        messages: List[Dict[str, Any]],
        created_at: str,
        modified_at: str,
        permalink: str,
    ) -> str:
        """Format chat as clean markdown.

        Args:
            name: Chat name.
            messages: List of chat messages.
            created_at: Creation timestamp (accepted for interface parity; unused).
            modified_at: Modification timestamp (accepted for interface parity; unused).
            permalink: Permalink for the entity (accepted for interface parity; unused).

        Returns:
            Formatted markdown content.
        """
        # Start with the title heading (frontmatter is carried by the entity, not here)
        lines = [
            f"# {name}\n",
        ]

        # Add messages
        for msg in messages:
            # Format timestamp
            ts = format_timestamp(msg["created_at"])

            # Add message header
            lines.append(f"### {msg['sender'].title()} ({ts})")

            # Handle message content: structured blocks win over the flat text field
            blocks = msg.get("content")
            if blocks:
                # Filter out None values before joining
                content = " ".join(
                    str(c.get("text", "")) for c in blocks if c and c.get("text") is not None
                )
            else:
                # "text" may be present but null; the final join() rejects None
                content = msg.get("text") or ""
            lines.append(content)

            # Handle attachments ("attachments" may be present but null)
            attachments = msg.get("attachments") or []
            for attachment in attachments:
                if "file_name" in attachment:
                    lines.append(f"\n**Attachment: {attachment['file_name']}**")
                    # A null extracted_content is treated the same as a missing one
                    extracted = attachment.get("extracted_content")
                    if extracted is not None:
                        lines.append("```")
                        lines.append(extracted)
                        lines.append("```")

            # Add spacing between messages
            lines.append("")

        return "\n".join(lines)

```

--------------------------------------------------------------------------------
/src/basic_memory/cli/commands/status.py:
--------------------------------------------------------------------------------

```python
"""Status command for basic-memory CLI."""

from typing import Set, Dict
from typing import Annotated, Optional

from mcp.server.fastmcp.exceptions import ToolError
import typer
from loguru import logger
from rich.console import Console
from rich.panel import Panel
from rich.tree import Tree

from basic_memory.cli.app import app
from basic_memory.mcp.async_client import get_client
from basic_memory.mcp.tools.utils import call_post
from basic_memory.schemas import SyncReportResponse
from basic_memory.mcp.project_context import get_active_project

# Module-level Rich console; the status helpers in this file print through it
console = Console()


def add_files_to_tree(
    tree: Tree, paths: Set[str], style: str, checksums: Dict[str, str] | None = None
):
    """Attach file entries to ``tree``, bucketed by their first path component.

    Args:
        tree: Tree node that receives the entries.
        paths: File paths using "/" as separator.
        style: Rich style name wrapped around every file label.
        checksums: Optional mapping of full path to checksum. When a path is
            present, the first 8 characters are appended in parentheses.

    Paths without a "/" are added directly to ``tree``. Every other path goes
    under a bold ``<first component>/`` branch and is labelled with the rest
    of the path. Branches and entries are emitted in sorted order.
    """
    grouped = {}
    for full_path in sorted(paths):
        head, sep, tail = full_path.partition("/")
        # No separator means the file has no directory prefix to group under
        bucket, label = (head, tail) if sep else ("", full_path)
        grouped.setdefault(bucket, []).append((label, full_path))

    for bucket in sorted(grouped):
        parent = tree.add(f"[bold]{bucket}/[/bold]") if bucket else tree
        for label, full_path in sorted(grouped[bucket]):
            entry = f"[{style}]{label}[/{style}]"
            if checksums and full_path in checksums:
                entry += f" ({checksums[full_path][:8]})"
            parent.add(entry)


def group_changes_by_directory(changes: SyncReportResponse) -> Dict[str, Dict[str, int]]:
    """Tally change counts per leading path component for the summary view.

    Args:
        changes: Report exposing ``new``, ``modified`` and ``deleted`` path
            collections plus a ``moves`` mapping of old path to new path.

    Returns:
        Mapping of the first "/"-separated path component to a dict with
        "new", "modified", "deleted" and "moved" counters. A path without a
        "/" is keyed by the whole path.
    """
    summary = {}

    def counters_for(dir_name):
        # Create the zeroed counter record on first sight of a directory
        return summary.setdefault(dir_name, {"new": 0, "modified": 0, "deleted": 0, "moved": 0})

    per_kind = {
        "new": changes.new,
        "modified": changes.modified,
        "deleted": changes.deleted,
    }
    for kind, kind_paths in per_kind.items():
        for item in kind_paths:
            counters_for(item.split("/", 1)[0])[kind] += 1

    # A move counts once for the source directory and, when it crosses
    # directories, once more for the destination
    for source, target in changes.moves.items():
        source_dir = source.split("/", 1)[0]
        target_dir = target.split("/", 1)[0]
        source_counts = counters_for(source_dir)
        target_counts = counters_for(target_dir)
        source_counts["moved"] += 1
        if source_dir != target_dir:
            target_counts["moved"] += 1

    return summary


def build_directory_summary(counts: Dict[str, int]) -> str:
    """Render the non-zero counters of one directory as a Rich markup string.

    Args:
        counts: Counters keyed by "new", "modified", "moved" and "deleted".

    Returns:
        Space-separated markup fragments in the order new, modified, moved,
        deleted; an empty string when every counter is zero.
    """
    templates = (
        ("new", "[green]+{} new[/green]"),
        ("modified", "[yellow]~{} modified[/yellow]"),
        ("moved", "[blue]↔{} moved[/blue]"),
        ("deleted", "[red]-{} deleted[/red]"),
    )
    return " ".join(
        template.format(counts[key]) for key, template in templates if counts[key]
    )


def display_changes(
    project_name: str, title: str, changes: SyncReportResponse, verbose: bool = False
):
    """Print a sync report as a Rich tree inside a panel.

    Args:
        project_name: Project name shown in the tree heading.
        title: Heading text shown after the project name.
        changes: Report to render.
        verbose: When true, list every file (with checksum prefixes for new
            and modified files); otherwise print one summary line per
            directory.
    """
    root = Tree(f"{project_name}: {title}")

    if changes.total == 0 and not changes.skipped_files:
        root.add("No changes")
    elif verbose:
        # Full file listing with checksums
        if changes.new:
            add_files_to_tree(
                root.add("[green]New Files[/green]"), changes.new, "green", changes.checksums
            )
        if changes.modified:
            add_files_to_tree(
                root.add("[yellow]Modified[/yellow]"), changes.modified, "yellow", changes.checksums
            )
        if changes.moves:
            moved_node = root.add("[blue]Moved[/blue]")
            for source, target in sorted(changes.moves.items()):
                moved_node.add(f"[blue]{source}[/blue] → [blue]{target}[/blue]")
        if changes.deleted:
            add_files_to_tree(root.add("[red]Deleted[/red]"), changes.deleted, "red")
        if changes.skipped_files:
            skipped_node = root.add("[red]! Skipped (Circuit Breaker)[/red]")
            for entry in sorted(changes.skipped_files, key=lambda item: item.path):
                skipped_node.add(
                    f"[red]{entry.path}[/red] "
                    f"(failures: {entry.failure_count}, reason: {entry.reason})"
                )
    else:
        # One summary line per directory
        per_directory = group_changes_by_directory(changes)
        for directory in sorted(per_directory):
            line = build_directory_summary(per_directory[directory])
            if line:  # Directories with nothing to report are omitted
                root.add(f"[bold]{directory}/[/bold] {line}")

        # Skipped files are collapsed to a single count in this mode
        if changes.skipped_files:
            skipped_total = len(changes.skipped_files)
            plural = "" if skipped_total == 1 else "s"
            root.add(
                f"[red]! {skipped_total} file{plural} "
                f"skipped due to repeated failures[/red]"
            )

    console.print(Panel(root, expand=False))


async def run_status(project: Optional[str] = None, verbose: bool = False):  # pragma: no cover
    """Request the project's sync status from the API and display it.

    Args:
        project: Project name passed to ``get_active_project``.
        verbose: Forwarded to ``display_changes``.

    Raises:
        typer.Exit: With code 1 after printing the message of a
            ``ValueError`` or ``ToolError``.
    """
    try:
        async with get_client() as client:
            active = await get_active_project(client, project, None)
            status_url = f"{active.project_url}/project/status"
            reply = await call_post(client, status_url)
            report = SyncReportResponse.model_validate(reply.json())

            display_changes(active.name, "Status", report, verbose)

    except (ValueError, ToolError) as exc:
        console.print(f"[red]Error: {exc}[/red]")
        raise typer.Exit(1)


@app.command()
def status(
    project: Annotated[
        Optional[str],
        typer.Option(help="The project name."),
    ] = None,
    verbose: bool = typer.Option(False, "--verbose", "-v", help="Show detailed file information"),
):
    """Show sync status between files and database."""
    # NOTE: the docstring above is the CLI help text; keep it user-facing.
    from basic_memory.cli.commands.command_utils import run_with_cleanup

    try:
        run_with_cleanup(run_status(project, verbose))  # pragma: no cover
    except typer.Exit:  # pragma: no cover
        # run_status prints its own error before raising typer.Exit, so pass
        # the exit through instead of logging and echoing it a second time.
        raise
    except Exception as e:
        logger.error(f"Error checking status: {e}")
        typer.echo(f"Error checking status: {e}", err=True)
        raise typer.Exit(code=1) from e  # pragma: no cover

```

--------------------------------------------------------------------------------
/src/basic_memory/alembic/versions/g9a0b3c4d5e6_add_external_id_to_project_and_entity.py:
--------------------------------------------------------------------------------

```python
"""Add external_id UUID column to project and entity tables

Revision ID: g9a0b3c4d5e6
Revises: f8a9b2c3d4e5
Create Date: 2025-12-29 10:00:00.000000

"""

import uuid
from typing import Sequence, Union

import sqlalchemy as sa
from alembic import op
from sqlalchemy import text


def column_exists(connection, table: str, column: str) -> bool:
    """Report whether ``table`` already has ``column`` (idempotent migration support).

    Args:
        connection: Database connection exposing ``dialect.name`` and ``execute``.
        table: Table name to inspect.
        column: Column name to look for.

    Returns:
        True when the column is present. PostgreSQL is checked through
        ``information_schema.columns``; every other dialect is handled as
        SQLite via ``PRAGMA table_info``.
    """
    if connection.dialect.name != "postgresql":
        # SQLite: the second field of each PRAGMA table_info row is the column name
        pragma_rows = connection.execute(text(f"PRAGMA table_info({table})"))
        existing = [row[1] for row in pragma_rows]
        return column in existing

    lookup = text(
        "SELECT 1 FROM information_schema.columns "
        "WHERE table_name = :table AND column_name = :column"
    )
    match = connection.execute(lookup, {"table": table, "column": column}).fetchone()
    return match is not None


def index_exists(connection, index_name: str) -> bool:
    """Report whether an index named ``index_name`` exists (idempotent migration support).

    Args:
        connection: Database connection exposing ``dialect.name`` and ``execute``.
        index_name: Name of the index to look for.

    Returns:
        True when the dialect's catalog (``pg_indexes`` on PostgreSQL,
        ``sqlite_master`` otherwise) contains a matching index.
    """
    if connection.dialect.name == "postgresql":
        catalog_query = "SELECT 1 FROM pg_indexes WHERE indexname = :index_name"
    else:
        # SQLite
        catalog_query = "SELECT 1 FROM sqlite_master WHERE type='index' AND name = :index_name"

    match = connection.execute(text(catalog_query), {"index_name": index_name}).fetchone()
    return match is not None


# revision identifiers, used by Alembic.
# `revision` is this migration's id; `down_revision` is the migration it
# revises (see "Revises" in the module docstring).
revision: str = "g9a0b3c4d5e6"
down_revision: Union[str, None] = "f8a9b2c3d4e5"
# No branch labels or cross-revision dependencies are declared here.
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def _backfill_external_ids(connection, dialect: str, table: str) -> None:
    """Give every row of ``table`` whose external_id is NULL a fresh UUID string."""
    # ``table`` is always one of the hard-coded names passed by upgrade(),
    # so interpolating it into the SQL text is safe here.
    if dialect == "postgresql":
        # Postgres has gen_random_uuid() function
        op.execute(f"""
            UPDATE {table}
            SET external_id = gen_random_uuid()::text
            WHERE external_id IS NULL
        """)
        return

    # SQLite: need to generate UUIDs in Python.
    # Collect the ids first so the SELECT cursor is exhausted before the
    # same table is updated.
    pending_ids = [
        row[0]
        for row in connection.execute(
            text(f"SELECT id FROM {table} WHERE external_id IS NULL")
        )
    ]
    for row_id in pending_ids:
        connection.execute(
            text(f"UPDATE {table} SET external_id = :uuid WHERE id = :id"),
            {"uuid": str(uuid.uuid4()), "id": row_id},
        )


def _add_external_id(connection, dialect: str, table: str) -> None:
    """Add, backfill and uniquely index the external_id column on ``table``."""
    if not column_exists(connection, table, "external_id"):
        # Step 1: Add external_id column as nullable first
        op.add_column(table, sa.Column("external_id", sa.String(), nullable=True))

        # Step 2: Generate UUIDs for existing rows
        _backfill_external_ids(connection, dialect, table)

        # Step 3: Make external_id NOT NULL
        if dialect == "postgresql":
            op.alter_column(table, "external_id", nullable=False)
        else:
            # SQLite requires batch operations for ALTER COLUMN
            with op.batch_alter_table(table) as batch_op:
                batch_op.alter_column("external_id", nullable=False)

    # Step 4: Create unique index (checked separately so a re-run after a
    # partially applied migration still creates it)
    index_name = f"ix_{table}_external_id"
    if not index_exists(connection, index_name):
        op.create_index(index_name, table, ["external_id"], unique=True)


def upgrade() -> None:
    """Add external_id UUID column to project and entity tables.

    This migration:
    1. Adds external_id column to project table
    2. Adds external_id column to entity table
    3. Generates UUIDs for existing rows
    4. Creates unique indexes on both columns

    Each step is guarded by an existence check, so tables that already have
    the column or index are left as they are.
    """
    connection = op.get_bind()
    dialect = connection.dialect.name

    # Same order as before: project first, then entity
    for table in ("project", "entity"):
        _add_external_id(connection, dialect, table)


def downgrade() -> None:
    """Drop the external_id indexes and columns from entity and project.

    Each drop is guarded by an existence check. The entity table is handled
    before the project table.
    """
    connection = op.get_bind()
    dialect = connection.dialect.name

    for table in ("entity", "project"):
        index_name = f"ix_{table}_external_id"
        if index_exists(connection, index_name):
            op.drop_index(index_name, table_name=table)

        if not column_exists(connection, table, "external_id"):
            continue

        if dialect == "postgresql":
            op.drop_column(table, "external_id")
        else:
            # Non-Postgres (SQLite) drops the column through a batch operation
            with op.batch_alter_table(table) as batch_op:
                batch_op.drop_column("external_id")

```

--------------------------------------------------------------------------------
/tests/api/test_search_router.py:
--------------------------------------------------------------------------------

```python
"""Tests for search router."""

from datetime import datetime, timezone

import pytest
import pytest_asyncio
from sqlalchemy import text

from basic_memory import db
from basic_memory.schemas import Entity as EntitySchema
from basic_memory.schemas.search import SearchItemType, SearchResponse


@pytest_asyncio.fixture
async def indexed_entity(full_entity, search_service):
    """Index ``full_entity`` through the search service and return it."""
    entity = full_entity
    await search_service.index_entity(entity)
    return entity


@pytest.mark.asyncio
async def test_search_basic(client, indexed_entity, project_url):
    """A plain text query returns three hits, including the indexed entity."""
    reply = await client.post(f"{project_url}/search/", json={"text": "search"})
    assert reply.status_code == 200
    hits = SearchResponse.model_validate(reply.json()).results
    assert len(hits) == 3

    # Every entity-type hit must be the indexed entity, and there must be one
    entity_hits = [hit for hit in hits if hit.type == SearchItemType.ENTITY.value]
    for hit in entity_hits:
        assert hit.permalink == indexed_entity.permalink

    assert entity_hits, "Expected to find indexed entity in results"


@pytest.mark.asyncio
async def test_search_basic_pagination(client, indexed_entity, project_url):
    """Requesting page 3 with page_size 1 returns one hit and echoes the paging values."""
    paged_url = f"{project_url}/search/?page=3&page_size=1"
    reply = await client.post(paged_url, json={"text": "search"})
    assert reply.status_code == 200

    page = SearchResponse.model_validate(reply.json())
    assert len(page.results) == 1
    assert page.current_page == 3
    assert page.page_size == 1


@pytest.mark.asyncio
async def test_search_with_entity_type_filter(client, indexed_entity, project_url):
    """The ``entity_types`` filter restricts hits to the requested search item type."""
    search_url = f"{project_url}/search/"

    # Entity items: at least one hit expected
    entity_reply = await client.post(
        search_url,
        json={"text": "test", "entity_types": [SearchItemType.ENTITY.value]},
    )
    assert entity_reply.status_code == 200
    entity_hits = SearchResponse.model_validate(entity_reply.json()).results
    assert len(entity_hits) > 0

    # Relation items: exactly two hits expected
    relation_reply = await client.post(
        search_url,
        json={"text": "test", "entity_types": [SearchItemType.RELATION.value]},
    )
    assert relation_reply.status_code == 200
    relation_hits = SearchResponse.model_validate(relation_reply.json()).results
    assert len(relation_hits) == 2


@pytest.mark.asyncio
async def test_search_with_type_filter(client, indexed_entity, project_url):
    """The ``types`` filter matches on the entity's type value."""
    search_url = f"{project_url}/search/"

    # "test" matches the indexed entity's type; "note" matches nothing
    for type_name, expected_hits in (("test", 1), ("note", 0)):
        reply = await client.post(search_url, json={"text": "test", "types": [type_name]})
        assert reply.status_code == 200
        hits = SearchResponse.model_validate(reply.json()).results
        assert len(hits) == expected_hits


@pytest.mark.asyncio
async def test_search_with_date_filter(client, indexed_entity, project_url):
    """Test search with date filter.

    ``after_date`` in the past must keep the indexed entity in the results;
    ``after_date`` in the future must exclude everything.
    """
    # Should find with past date
    past_date = datetime(2020, 1, 1, tzinfo=timezone.utc)
    response = await client.post(
        f"{project_url}/search/", json={"text": "test", "after_date": past_date.isoformat()}
    )
    assert response.status_code == 200
    search_results = SearchResponse.model_validate(response.json())
    # Without this assertion the "past date" half of the test could never fail
    assert len(search_results.results) > 0

    # Should not find with future date
    future_date = datetime(2030, 1, 1, tzinfo=timezone.utc)
    response = await client.post(
        f"{project_url}/search/", json={"text": "test", "after_date": future_date.isoformat()}
    )
    assert response.status_code == 200
    search_results = SearchResponse.model_validate(response.json())
    assert len(search_results.results) == 0


@pytest.mark.asyncio
async def test_search_empty(search_service, client, project_url):
    """A query that matches nothing returns 200 with an empty result list."""
    reply = await client.post(f"{project_url}/search/", json={"text": "nonexistent"})
    assert reply.status_code == 200
    hits = SearchResponse.model_validate(reply.json()).results
    assert len(hits) == 0


@pytest.mark.asyncio
async def test_reindex(
    client, search_service, entity_service, session_maker, project_url, app_config
):
    """The reindex endpoint makes content searchable again after the index is emptied."""
    # Skip for Postgres - needs investigation of database connection isolation
    from basic_memory.config import DatabaseBackend

    if app_config.database_backend == DatabaseBackend.POSTGRES:
        pytest.skip("Not yet supported for Postgres - database connection isolation issue")

    search_url = f"{project_url}/search/"
    query = {"text": "test"}

    # Seed one entity
    seed = EntitySchema(
        title="TestEntity1",
        folder="test",
        entity_type="test",
    )
    await entity_service.create_entity(seed)

    # Empty the search index directly in the database
    async with db.scoped_session(session_maker) as session:
        await session.execute(text("DELETE FROM search_index"))
        await session.commit()

    # Nothing should be searchable now
    before = await client.post(search_url, json=query)
    assert len(SearchResponse.model_validate(before.json()).results) == 0

    # Rebuild the index through the API
    rebuilt = await client.post(f"{project_url}/search/reindex")
    assert rebuilt.status_code == 200
    assert rebuilt.json()["status"] == "ok"

    # The seeded entity is searchable again
    after = await client.post(search_url, json=query)
    assert len(SearchResponse.model_validate(after.json()).results) == 1


@pytest.mark.asyncio
async def test_multiple_filters(client, indexed_entity, project_url):
    """Combining item-type, entity-type and date filters yields exactly the indexed entity."""
    filters = {
        "text": "test",
        "entity_types": [SearchItemType.ENTITY.value],
        "types": ["test"],
        "after_date": datetime(2020, 1, 1, tzinfo=timezone.utc).isoformat(),
    }
    reply = await client.post(f"{project_url}/search/", json=filters)
    assert reply.status_code == 200

    hits = SearchResponse.model_validate(reply.json()).results
    assert len(hits) == 1

    only_hit = hits[0]
    assert only_hit.permalink == indexed_entity.permalink
    assert only_hit.type == SearchItemType.ENTITY.value
    assert only_hit.metadata["entity_type"] == "test"

```

--------------------------------------------------------------------------------
/src/basic_memory/cli/commands/format.py:
--------------------------------------------------------------------------------

```python
"""Format command for basic-memory CLI."""

from pathlib import Path
from typing import Annotated, Optional

import typer
from loguru import logger
from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn

from basic_memory.cli.app import app
from basic_memory.cli.commands.command_utils import run_with_cleanup
from basic_memory.config import ConfigManager, get_project_config
from basic_memory.file_utils import format_file

# Module-level Rich console used for the format command's progress and messages
console = Console()


def is_markdown_extension(path: Path) -> bool:
    """Return True when the final suffix of ``path`` is .md or .markdown (case-insensitive)."""
    extension = path.suffix.lower()
    return extension == ".md" or extension == ".markdown"


async def format_single_file(file_path: Path, app_config) -> tuple[Path, bool, Optional[str]]:
    """Run ``format_file`` on one path and report the outcome without raising.

    Args:
        file_path: File to format.
        app_config: Configuration object forwarded to ``format_file``.

    Returns:
        Tuple of (path, success, error_message). ``success`` is True when
        ``format_file`` returns a non-None value. A None result yields the
        fixed "No formatter configured or formatting skipped" message, and
        any exception is reported as its string form.
    """
    try:
        outcome = await format_file(
            file_path, app_config, is_markdown=is_markdown_extension(file_path)
        )
    except Exception as exc:
        # Best effort: a failure on one file is reported, not propagated
        return (file_path, False, str(exc))

    if outcome is None:
        return (file_path, False, "No formatter configured or formatting skipped")
    return (file_path, True, None)


async def format_files(
    paths: list[Path], app_config, show_progress: bool = True
) -> tuple[int, int, list[tuple[Path, str]]]:
    """Format each path in turn and tally the outcomes.

    Args:
        paths: Files to format, processed sequentially in the given order.
        app_config: Configuration object forwarded to ``format_single_file``.
        show_progress: When true, drive a Rich progress display while working.

    Returns:
        Tuple of (formatted_count, skipped_count, errors), where ``errors``
        holds (path, message) pairs. A failure whose message contains
        "No formatter configured" counts as skipped rather than as an error.
    """
    formatted = 0
    skipped = 0
    errors: list[tuple[Path, str]] = []

    async def handle(target: Path) -> None:
        # Format one file and fold the result into the running tallies
        nonlocal formatted, skipped
        reported_path, ok, message = await format_single_file(target, app_config)
        if ok:
            formatted += 1
        elif message and "No formatter configured" not in message:
            errors.append((reported_path, message))
        else:
            skipped += 1

    if not show_progress:
        for file_path in paths:
            await handle(file_path)
        return formatted, skipped, errors

    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        console=console,
    ) as progress:
        task = progress.add_task("Formatting files...", total=len(paths))
        for file_path in paths:
            await handle(file_path)
            progress.update(task, advance=1)

    return formatted, skipped, errors


async def run_format(
    path: Optional[Path] = None,
    project: Optional[str] = None,
) -> None:
    """Format one file, one directory tree, or a whole project, then print a summary.

    Args:
        path: File or directory to format. When omitted, the home directory
            of the project resolved by ``get_project_config(project)`` is used.
        project: Project name; only consulted when ``path`` is not given.

    Raises:
        typer.Exit: With code 1 when no formatter is configured, or when the
            given path or the project directory does not exist.
    """
    app_config = ConfigManager().config

    # Bail out early when nothing in the config enables formatting
    formatting_configured = (
        app_config.format_on_save or app_config.formatter_command or app_config.formatters
    )
    if not formatting_configured:
        console.print(
            "[yellow]No formatters configured. Set format_on_save=true and "
            "formatter_command or formatters in your config.[/yellow]"
        )
        console.print(
            "\nExample config (~/.basic-memory/config.json):\n"
            '  "format_on_save": true,\n'
            '  "formatter_command": "prettier --write {file}"\n'
        )
        raise typer.Exit(1)

    def collect_targets(root: Path) -> list[Path]:
        # Recursive search, in order: markdown, then JSON, then canvas files
        return [
            found for pattern in ("*.md", "*.json", "*.canvas") for found in root.rglob(pattern)
        ]

    # Temporarily enable format_on_save for this command
    # (so format_file actually runs the formatter)
    previous_setting = app_config.format_on_save
    app_config.format_on_save = True

    try:
        if path:
            # An explicit target: a single file or a directory tree
            if path.is_file():
                files = [path]
            elif path.is_dir():
                files = collect_targets(path)
            else:
                console.print(f"[red]Path not found: {path}[/red]")
                raise typer.Exit(1)
        else:
            # No target given: format everything under the project's home
            project_config = get_project_config(project)
            project_path = Path(project_config.home)

            if not project_path.exists():
                console.print(f"[red]Project path not found: {project_path}[/red]")
                raise typer.Exit(1)

            files = collect_targets(project_path)

        if not files:
            console.print("[yellow]No files found to format.[/yellow]")
            return

        console.print(f"Found {len(files)} file(s) to format...")

        formatted, skipped, errors = await format_files(files, app_config)

        # Summary: only the categories that actually occurred are printed
        console.print()
        if formatted > 0:
            console.print(f"[green]Formatted: {formatted} file(s)[/green]")
        if skipped > 0:
            console.print(f"[dim]Skipped: {skipped} file(s) (no formatter for extension)[/dim]")
        if errors:
            console.print(f"[red]Errors: {len(errors)} file(s)[/red]")
            for failed_path, message in errors:
                console.print(f"  [red]{failed_path}[/red]: {message}")

    finally:
        # Restore the setting even when exiting early
        app_config.format_on_save = previous_setting


@app.command()
def format(
    path: Annotated[
        Optional[Path],
        typer.Argument(help="File or directory to format. Defaults to current project."),
    ] = None,
    project: Annotated[
        Optional[str],
        typer.Option("--project", "-p", help="Project name to format."),
    ] = None,
) -> None:
    """Format files using configured formatters.

    Uses the formatter_command or formatters settings from your config.
    By default, formats all .md, .json, and .canvas files in the current project.

    Examples:
        basic-memory format                    # Format all files in current project
        basic-memory format --project research # Format files in specific project
        basic-memory format notes/meeting.md   # Format a specific file
        basic-memory format notes/             # Format all files in directory
    """
    try:
        run_with_cleanup(run_format(path, project))
    except typer.Exit:
        # Deliberate exits from run_format keep their own exit code
        raise
    except Exception as e:
        # Anything else is reported and mapped to exit code 1
        logger.error(f"Error formatting files: {e}")
        console.print(f"[red]Error formatting files: {e}[/red]")
        raise typer.Exit(code=1)

```

--------------------------------------------------------------------------------
/tests/cli/test_import_chatgpt.py:
--------------------------------------------------------------------------------

```python
"""Tests for import_chatgpt command."""

import json

import pytest
from typer.testing import CliRunner

from basic_memory.cli.app import app, import_app
from basic_memory.cli.commands import import_chatgpt  # noqa
from basic_memory.config import get_project_config

# Shared CliRunner that the tests below use to invoke the CLI apps
runner = CliRunner()


@pytest.fixture
def sample_conversation():
    """Provide a minimal ChatGPT conversation: root node, one user message, one assistant reply."""
    created = 1736616594.24054  # Example timestamp
    updated = 1736616603.164995

    def text_message(msg_id, role, timestamp, body):
        # Message payload with a single text part
        return {
            "id": msg_id,
            "author": {"role": role, "name": None, "metadata": {}},
            "create_time": timestamp,
            "content": {"content_type": "text", "parts": [body]},
            "status": "finished_successfully",
            "metadata": {},
        }

    root_node = {"id": "root", "message": None, "parent": None, "children": ["msg1"]}
    user_node = {
        "id": "msg1",
        "message": text_message("msg1", "user", created, "Hello, this is a test message"),
        "parent": "root",
        "children": ["msg2"],
    }
    assistant_node = {
        "id": "msg2",
        "message": text_message("msg2", "assistant", updated, "This is a test response"),
        "parent": "msg1",
        "children": [],
    }

    return {
        "title": "Test Conversation",
        "create_time": created,
        "update_time": updated,
        "mapping": {"root": root_node, "msg1": user_node, "msg2": assistant_node},
    }


@pytest.fixture
def sample_conversation_with_code():
    """Provide a conversation with a code-type message and a message lacking a "content" key."""
    created = 1736616594.24054

    # msg1 carries a python code payload
    code_node = {
        "id": "msg1",
        "message": {
            "id": "msg1",
            "author": {"role": "assistant", "name": None, "metadata": {}},
            "create_time": created,
            "content": {
                "content_type": "code",
                "language": "python",
                "text": "def hello():\n    print('Hello world!')",
            },
            "status": "finished_successfully",
            "metadata": {},
        },
        "parent": "root",
        "children": [],
    }
    # msg2 has no "content" entry and is not listed among root's children
    contentless_node = {
        "id": "msg2",
        "message": {
            "id": "msg2",
            "author": {"role": "assistant", "name": None, "metadata": {}},
            "create_time": created,
            "status": "finished_successfully",
            "metadata": {},
        },
        "parent": "root",
        "children": [],
    }

    return {
        "title": "Code Test",
        "create_time": created,
        "update_time": 1736616603.164995,
        "mapping": {
            "root": {"id": "root", "message": None, "parent": None, "children": ["msg1"]},
            "msg1": code_node,
            "msg2": contentless_node,
        },
    }


@pytest.fixture
def sample_conversation_with_hidden():
    """Provide a conversation with one visible user message and one visually hidden system message."""
    created = 1736616594.24054

    visible_node = {
        "id": "visible",
        "message": {
            "id": "visible",
            "author": {"role": "user", "name": None, "metadata": {}},
            "create_time": created,
            "content": {"content_type": "text", "parts": ["Visible message"]},
            "status": "finished_successfully",
            "metadata": {},
        },
        "parent": "root",
        "children": [],
    }
    # Flagged through metadata as hidden from the conversation view
    hidden_node = {
        "id": "hidden",
        "message": {
            "id": "hidden",
            "author": {"role": "system", "name": None, "metadata": {}},
            "create_time": created,
            "content": {"content_type": "text", "parts": ["Hidden message"]},
            "status": "finished_successfully",
            "metadata": {"is_visually_hidden_from_conversation": True},
        },
        "parent": "root",
        "children": [],
    }

    return {
        "title": "Hidden Test",
        "create_time": created,
        "update_time": 1736616603.164995,
        "mapping": {
            "root": {
                "id": "root",
                "message": None,
                "parent": None,
                "children": ["visible", "hidden"],
            },
            "visible": visible_node,
            "hidden": hidden_node,
        },
    }


@pytest.fixture
def sample_chatgpt_json(tmp_path, sample_conversation):
    """Write a one-conversation ChatGPT export to a temp file and return its path."""
    export_path = tmp_path / "conversations.json"
    # The export format is a JSON list of conversations, so wrap the single sample.
    serialized = json.dumps([sample_conversation])
    export_path.write_text(serialized, encoding="utf-8")
    return export_path


def test_import_chatgpt_command_success(tmp_path, sample_chatgpt_json, monkeypatch):
    """The chatgpt import command exits cleanly and reports what it imported."""
    # Redirect HOME to the temp dir for the duration of the test.
    monkeypatch.setenv("HOME", str(tmp_path))

    outcome = runner.invoke(import_app, ["chatgpt", str(sample_chatgpt_json)])

    assert outcome.exit_code == 0
    expected_lines = (
        "Import complete",
        "Imported 1 conversations",
        "Containing 2 messages",
    )
    for expected in expected_lines:
        assert expected in outcome.output


def test_import_chatgpt_command_invalid_json(tmp_path):
    """A file that is not valid JSON makes the command exit with code 1 and an error."""
    # Input file whose content cannot be parsed as JSON.
    broken_export = tmp_path / "invalid.json"
    broken_export.write_text("not json")

    cli_args = ["chatgpt", str(broken_export)]
    outcome = runner.invoke(import_app, cli_args)

    assert outcome.exit_code == 1
    assert "Error during import" in outcome.output


def test_import_chatgpt_with_custom_folder(tmp_path, sample_chatgpt_json, monkeypatch):
    """Test import with custom conversations folder."""
    # Point the project home at the temp dir. Going through monkeypatch (rather
    # than assigning the attribute directly) restores the previous value at
    # teardown, so the override cannot leak into other tests.
    config = get_project_config()
    monkeypatch.setattr(config, "home", tmp_path)
    conversations_folder = "chats"

    # Run import through the top-level app so the "--folder" option is exercised
    result = runner.invoke(
        app,
        [
            "import",
            "chatgpt",
            str(sample_chatgpt_json),
            "--folder",
            conversations_folder,
        ],
    )
    assert result.exit_code == 0

    # Check files in custom folder
    conv_path = tmp_path / conversations_folder / "20250111-Test_Conversation.md"
    assert conv_path.exists()

```

--------------------------------------------------------------------------------
/tests/api/v2/test_prompt_router.py:
--------------------------------------------------------------------------------

```python
"""Tests for V2 prompt router endpoints (ID-based)."""

import pytest
import pytest_asyncio
from httpx import AsyncClient

from basic_memory.models import Project
from basic_memory.services.context_service import ContextService


@pytest_asyncio.fixture
async def context_service(entity_repository, search_service, observation_repository):
    """Provide a ContextService built from the repository and search fixtures."""
    service = ContextService(entity_repository, search_service, observation_repository)
    return service


@pytest.mark.asyncio
async def test_continue_conversation_endpoint(
    client: AsyncClient,
    entity_service,
    search_service,
    context_service,
    entity_repository,
    test_graph,
    v2_project_url: str,
):
    """POST to the v2 continue-conversation endpoint and check the context and prompt."""
    # "Root" is expected to match an entity created by the test_graph fixture.
    payload = dict(topic="Root", timeframe="7d", depth=1, related_items_limit=2)
    endpoint = f"{v2_project_url}/prompt/continue-conversation"

    reply = await client.post(endpoint, json=payload)

    assert reply.status_code == 200
    body = reply.json()
    for key in ("prompt", "context"):
        assert key in body

    # The context echoes the request and carries at least one hierarchical result.
    ctx = body["context"]
    assert ctx["topic"] == "Root"
    assert ctx["timeframe"] == "7d"
    assert ctx["has_results"] is True
    assert len(ctx["hierarchical_results"]) > 0

    # The rendered prompt mentions the topic and the session framing text.
    rendered = body["prompt"]
    assert "Continuing conversation on: Root" in rendered
    assert "memory retrieval session" in rendered


@pytest.mark.asyncio
async def test_continue_conversation_without_topic(
    client: AsyncClient,
    entity_service,
    search_service,
    context_service,
    entity_repository,
    test_graph,
    v2_project_url: str,
):
    """Without a topic, the v2 endpoint reports a "Recent Activity" topic."""
    payload = {"timeframe": "1d", "depth": 1, "related_items_limit": 2}
    endpoint = f"{v2_project_url}/prompt/continue-conversation"

    reply = await client.post(endpoint, json=payload)

    assert reply.status_code == 200
    reported_topic = reply.json()["context"]["topic"]
    assert "Recent Activity" in reported_topic


@pytest.mark.asyncio
async def test_search_prompt_endpoint(
    client: AsyncClient, entity_service, search_service, test_graph, v2_project_url: str
):
    """POST to the v2 search prompt endpoint and check the context and prompt."""
    # "Root" is expected to match an entity created by the test_graph fixture.
    payload = dict(query="Root", timeframe="7d")

    reply = await client.post(f"{v2_project_url}/prompt/search", json=payload)

    assert reply.status_code == 200
    body = reply.json()
    for key in ("prompt", "context"):
        assert key in body

    # The context echoes the request and contains at least one result.
    ctx = body["context"]
    assert ctx["query"] == "Root"
    assert ctx["timeframe"] == "7d"
    assert ctx["has_results"] is True
    assert len(ctx["results"]) > 0

    # The rendered prompt names the query and the session framing text.
    rendered = body["prompt"]
    assert 'Search Results for: "Root"' in rendered
    assert "This is a memory search session" in rendered


@pytest.mark.asyncio
async def test_search_prompt_no_results(
    client: AsyncClient, entity_service, search_service, v2_project_url: str
):
    """A query with no matches still returns 200, with an empty context and a fallback prompt."""
    unmatched_query = "NonExistentQuery12345"
    payload = {"query": unmatched_query, "timeframe": "7d"}

    reply = await client.post(f"{v2_project_url}/prompt/search", json=payload)

    assert reply.status_code == 200
    body = reply.json()

    # The context echoes the query and reports no results.
    ctx = body["context"]
    assert ctx["query"] == "NonExistentQuery12345"
    assert ctx["has_results"] is False
    assert len(ctx["results"]) == 0

    # The prompt falls back to the "nothing found" guidance.
    rendered = body["prompt"]
    expected_fragments = (
        'Search Results for: "NonExistentQuery12345"',
        "I couldn't find any results for this query",
        "Opportunity to Capture Knowledge",
    )
    for fragment in expected_fragments:
        assert fragment in rendered


@pytest.mark.asyncio
async def test_error_handling(client: AsyncClient, monkeypatch, v2_project_url: str):
    """Both v2 prompt endpoints return 500 with the error detail when rendering fails."""

    def failing_render(*args, **kwargs):
        # Stand-in for TemplateLoader.render that always fails.
        raise Exception("Template error")

    monkeypatch.setattr("basic_memory.api.template_loader.TemplateLoader.render", failing_render)

    # continue-conversation first, then search, each with its own payload shape.
    failing_requests = (
        ("continue-conversation", {"topic": "test error", "timeframe": "7d"}),
        ("search", {"query": "test error", "timeframe": "7d"}),
    )
    for route, payload in failing_requests:
        response = await client.post(f"{v2_project_url}/prompt/{route}", json=payload)

        assert response.status_code == 500
        assert "detail" in response.json()
        assert "Template error" in response.json()["detail"]


@pytest.mark.asyncio
async def test_v2_prompt_endpoints_use_project_id_not_name(
    client: AsyncClient, test_project: Project
):
    """Addressing a v2 prompt endpoint by project name is rejected with 404 or 422."""
    base_url = f"/v2/projects/{test_project.name}/prompt"

    # Same check for both endpoints: the name is placed where the ID belongs.
    attempts = (
        ("continue-conversation", {"topic": "test", "timeframe": "7d"}),
        ("search", {"query": "test", "timeframe": "7d"}),
    )
    for route, payload in attempts:
        response = await client.post(f"{base_url}/{route}", json=payload)

        # Either a validation error or a not-found response is acceptable.
        assert response.status_code in (404, 422)


@pytest.mark.asyncio
async def test_prompt_invalid_project_id(client: AsyncClient):
    """Both prompt endpoints answer 404 for project ID 999999."""
    # 999999 is presumably an ID that does not exist in the test database.
    attempts = (
        ("/v2/projects/999999/prompt/continue-conversation", {"topic": "test", "timeframe": "7d"}),
        ("/v2/projects/999999/prompt/search", {"query": "test", "timeframe": "7d"}),
    )
    for url, payload in attempts:
        response = await client.post(url, json=payload)
        assert response.status_code == 404

```

--------------------------------------------------------------------------------
/.claude/commands/release/release.md:
--------------------------------------------------------------------------------

```markdown
# /release - Create Stable Release

Create a stable release using the automated justfile target with comprehensive validation.

## Usage
```
/release <version>
```

**Parameters:**
- `version` (required): Release version like `v0.13.2`

## Implementation

You are an expert release manager for the Basic Memory project. When the user runs `/release`, execute the following steps:

### Step 1: Pre-flight Validation

#### Version Check
1. Check current version in `src/basic_memory/__init__.py`
2. Verify new version format matches `v\d+\.\d+\.\d+` pattern
3. Confirm version is higher than current version

#### Git Status
1. Check current git status for uncommitted changes
2. Verify we're on the `main` branch
3. Confirm no existing tag with this version

#### Documentation Validation
1. **Changelog Check**
   - CHANGELOG.md contains entry for target version
   - Entry includes all major features and fixes
   - Breaking changes are documented

### Step 2: Use Justfile Automation
Execute the automated release process:
```bash
just release <version>
```

The justfile target handles:
- ✅ Version format validation
- ✅ Git status and branch checks
- ✅ Quality checks (`just check` - lint, format, type-check, tests)
- ✅ Version update in `src/basic_memory/__init__.py`
- ✅ Automatic commit with proper message
- ✅ Tag creation and pushing to GitHub
- ✅ Release workflow trigger (automatic on tag push)

The GitHub Actions workflow (`.github/workflows/release.yml`) then:
- ✅ Builds the package using `uv build`
- ✅ Creates GitHub release with auto-generated notes
- ✅ Publishes to PyPI
- ✅ Updates Homebrew formula (stable releases only)

### Step 3: Monitor Release Process
1. Verify tag push triggered the workflow (should start automatically within seconds)
2. Monitor workflow progress at: https://github.com/basicmachines-co/basic-memory/actions
3. Watch for successful completion of both jobs:
   - `release` - Builds package and publishes to PyPI
   - `homebrew` - Updates Homebrew formula (stable releases only)
4. Check for any workflow failures and investigate logs if needed

### Step 4: Post-Release Validation

#### GitHub Release
1. Verify GitHub release is created at: https://github.com/basicmachines-co/basic-memory/releases/tag/<version>
2. Check that release notes are auto-generated from commits
3. Validate release assets (`.whl` and `.tar.gz` files are attached)

#### PyPI Publication
1. Verify package published at: https://pypi.org/project/basic-memory/<version>/
2. Test installation: `uv tool install basic-memory`
3. Verify installed version: `basic-memory --version`

#### Homebrew Formula (Stable Releases Only)
1. Check formula update at: https://github.com/basicmachines-co/homebrew-basic-memory
2. Verify formula version matches release
3. Test Homebrew installation: `brew install basicmachines-co/basic-memory/basic-memory`

#### Website Updates

**1. basicmachines.co** (`/Users/drew/code/basicmachines.co`)
   - **Goal**: Update version number displayed on the homepage
   - **Location**: Search for "Basic Memory v0." in the codebase to find version displays
   - **What to update**:
     - Hero section heading that shows "Basic Memory v{VERSION}"
     - "What's New in v{VERSION}" section heading
     - Feature highlights array (look for array of features with title/description)
   - **Process**:
     1. Pull latest from GitHub: `git pull origin main`
     2. Create release branch: `git checkout -b release/v{VERSION}`
     3. Search codebase for current version number (e.g., "v0.16.1")
     4. Update version numbers to new release version
     5. Update feature highlights with 3-5 key features from this release (extract from CHANGELOG.md)
     6. Commit changes: `git commit -m "chore: update to v{VERSION}"`
     7. Push branch: `git push origin release/v{VERSION}`
   - **Deploy**: Follow deployment process for basicmachines.co

**2. docs.basicmemory.com** (`/Users/drew/code/docs.basicmemory.com`)
   - **Goal**: Add new release notes section to the latest-releases page
   - **File**: `src/pages/latest-releases.mdx`
   - **What to do**:
     1. Pull latest from GitHub: `git pull origin main`
     2. Create release branch: `git checkout -b release/v{VERSION}`
     3. Read the existing file to understand the format and structure
     4. Read `/Users/drew/code/basic-memory/CHANGELOG.md` to get release content
     5. Add new release section **at the top** (after MDX imports, before other releases)
     6. Follow the existing pattern:
        - Heading: `## [v{VERSION}](github-link) — YYYY-MM-DD`
        - Focus statement if applicable
        - `<Info>` block with highlights (3-5 key items)
        - Sections for Features, Bug Fixes, Breaking Changes, etc.
        - Link to full changelog at the end
        - Separator `---` between releases
     7. Commit changes: `git commit -m "docs: add v{VERSION} release notes"`
     8. Push branch: `git push origin release/v{VERSION}`
   - **Source content**: Extract and format sections from CHANGELOG.md for this version
   - **Deploy**: Follow deployment process for docs.basicmemory.com

**3. Announce Release**
   - Post to Discord community if significant changes
   - Update social media if major release
   - Notify users via appropriate channels

## Pre-conditions Check
Before starting, verify:
- [ ] All beta testing is complete
- [ ] Critical bugs are fixed
- [ ] Breaking changes are documented
- [ ] CHANGELOG.md is updated (if needed)
- [ ] Version number follows semantic versioning

## Error Handling
- If `just release` fails, examine the error output for specific issues
- If quality checks fail, fix issues and retry
- If changelog entry missing, update CHANGELOG.md and commit before retrying
- If GitHub Actions fail, check workflow logs for debugging

## Success Output
```
🎉 Stable Release v0.13.2 Created Successfully!

🏷️  Tag: v0.13.2
📋 GitHub Release: https://github.com/basicmachines-co/basic-memory/releases/tag/v0.13.2
📦 PyPI: https://pypi.org/project/basic-memory/0.13.2/
🍺 Homebrew: https://github.com/basicmachines-co/homebrew-basic-memory
🚀 GitHub Actions: Completed

Install with pip/uv:
  uv tool install basic-memory

Install with Homebrew:
  brew install basicmachines-co/basic-memory/basic-memory

Users can now upgrade:
  uv tool upgrade basic-memory
  brew upgrade basic-memory
```

## Context
- This creates production releases used by end users
- Must pass all quality gates before proceeding
- Uses the automated justfile target for consistency
- Version is automatically updated in `__init__.py`
- Triggers automated GitHub release with changelog
- Package is published to PyPI for `pip` and `uv` users
- Homebrew formula is automatically updated for stable releases
- Supports multiple installation methods (uv, pip, Homebrew)
```

--------------------------------------------------------------------------------
/src/basic_memory/importers/claude_projects_importer.py:
--------------------------------------------------------------------------------

```python
"""Claude projects import service for Basic Memory."""

import logging
from typing import Any, Dict, Optional

from basic_memory.markdown.schemas import EntityFrontmatter, EntityMarkdown
from basic_memory.importers.base import Importer
from basic_memory.schemas.importer import ProjectImportResult
from basic_memory.importers.utils import clean_filename

logger = logging.getLogger(__name__)


class ClaudeProjectsImporter(Importer[ProjectImportResult]):
    """Service for importing Claude projects."""

    def handle_error(  # pragma: no cover
        self, message: str, error: Optional[Exception] = None
    ) -> ProjectImportResult:
        """Return a failed ProjectImportResult with an error message.

        Args:
            message: Human-readable description of what failed.
            error: Optional exception; when given it is appended to the message.

        Returns:
            ProjectImportResult with ``success=False`` and zeroed counters.
        """
        error_msg = f"{message}: {error}" if error else message
        return ProjectImportResult(
            import_count={},
            success=False,
            error_message=error_msg,
            documents=0,
            prompts=0,
        )

    @staticmethod
    def _with_destination(destination_folder: str, relative_path: str) -> str:
        """Prefix ``relative_path`` with ``destination_folder`` when one is given."""
        if destination_folder:
            return f"{destination_folder}/{relative_path}"
        return relative_path

    async def import_data(
        self, source_data, destination_folder: str, **kwargs: Any
    ) -> ProjectImportResult:
        """Import projects from Claude JSON export.

        Args:
            source_data: Already-parsed export data; iterated as a sequence of
                project dicts (each read for "name", optional "docs" and
                optional "prompt_template").
            destination_folder: Base folder for projects within the project.
                An empty value writes project folders at the top level.
            **kwargs: Additional keyword arguments (unused here).

        Returns:
            ProjectImportResult containing statistics and status of the import.
            Any exception raised while importing is logged and converted into
            a failed result via ``handle_error``.
        """
        try:
            # Ensure the base folder exists
            if destination_folder:
                await self.ensure_folder_exists(destination_folder)

            projects = source_data

            # Process each project
            docs_imported = 0
            prompts_imported = 0

            for project in projects:
                project_dir = clean_filename(project["name"])

                # Create project directories using FileService with relative path
                docs_dir = self._with_destination(destination_folder, f"{project_dir}/docs")
                await self.file_service.ensure_directory(docs_dir)

                # Import prompt template if it exists
                if prompt_entity := self._format_prompt_markdown(project, destination_folder):
                    # Write file using relative path - FileService handles base_path
                    file_path = f"{prompt_entity.frontmatter.metadata['permalink']}.md"
                    await self.write_entity(prompt_entity, file_path)
                    prompts_imported += 1

                # Import project documents
                for doc in project.get("docs", []):
                    entity = self._format_project_markdown(project, doc, destination_folder)
                    # Write file using relative path - FileService handles base_path
                    file_path = f"{entity.frontmatter.metadata['permalink']}.md"
                    await self.write_entity(entity, file_path)
                    docs_imported += 1

            return ProjectImportResult(
                import_count={"documents": docs_imported, "prompts": prompts_imported},
                success=True,
                documents=docs_imported,
                prompts=prompts_imported,
            )

        except Exception as e:  # pragma: no cover
            logger.exception("Failed to import Claude projects")
            return self.handle_error("Failed to import Claude projects", e)

    def _format_project_markdown(
        self, project: Dict[str, Any], doc: Dict[str, Any], destination_folder: str = ""
    ) -> EntityMarkdown:
        """Format a project document as a Basic Memory entity.

        Args:
            project: Project data.
            doc: Document data.
            destination_folder: Optional destination folder prefix.

        Returns:
            EntityMarkdown instance representing the document.
        """
        # The document's own creation time wins; the project's is the fallback.
        # The modified time always comes from the project.
        created_at = doc.get("created_at") or project["created_at"]
        modified_at = project["updated_at"]

        # Generate clean names for organization
        project_dir = clean_filename(project["name"])
        doc_file = clean_filename(doc["filename"])

        # Build permalink with optional destination folder prefix
        permalink = self._with_destination(
            destination_folder, f"{project_dir}/docs/{doc_file}"
        )

        # Create entity
        entity = EntityMarkdown(
            frontmatter=EntityFrontmatter(
                metadata={
                    "type": "project_doc",
                    "title": doc["filename"],
                    "created": created_at,
                    "modified": modified_at,
                    "permalink": permalink,
                    "project_name": project["name"],
                    "project_uuid": project["uuid"],
                    "doc_uuid": doc["uuid"],
                }
            ),
            content=doc["content"],
        )

        return entity

    def _format_prompt_markdown(
        self, project: Dict[str, Any], destination_folder: str = ""
    ) -> Optional[EntityMarkdown]:
        """Format project prompt template as a Basic Memory entity.

        Args:
            project: Project data.
            destination_folder: Optional destination folder prefix.

        Returns:
            EntityMarkdown instance representing the prompt template, or None if
            no prompt template exists.
        """
        # A missing or empty template means there is nothing to import.
        if not project.get("prompt_template"):
            return None

        # Extract timestamps
        created_at = project["created_at"]
        modified_at = project["updated_at"]

        # Generate clean project directory name
        project_dir = clean_filename(project["name"])

        # Build permalink with optional destination folder prefix
        permalink = self._with_destination(
            destination_folder, f"{project_dir}/prompt-template"
        )

        # Create entity
        entity = EntityMarkdown(
            frontmatter=EntityFrontmatter(
                metadata={
                    "type": "prompt_template",
                    "title": f"Prompt Template: {project['name']}",
                    "created": created_at,
                    "modified": modified_at,
                    "permalink": permalink,
                    "project_name": project["name"],
                    "project_uuid": project["uuid"],
                }
            ),
            content=f"# Prompt Template: {project['name']}\n\n{project['prompt_template']}",
        )

        return entity

```

--------------------------------------------------------------------------------
/src/basic_memory/api/routers/utils.py:
--------------------------------------------------------------------------------

```python
from typing import Optional, List

from basic_memory.repository import EntityRepository
from basic_memory.repository.search_repository import SearchIndexRow
from basic_memory.schemas.memory import (
    EntitySummary,
    ObservationSummary,
    RelationSummary,
    MemoryMetadata,
    GraphContext,
    ContextResult,
)
from basic_memory.schemas.search import SearchItemType, SearchResult
from basic_memory.services import EntityService
from basic_memory.services.context_service import (
    ContextResultRow,
    ContextResult as ServiceContextResult,
)


async def to_graph_context(
    context_result: ServiceContextResult,
    entity_repository: EntityRepository,
    page: Optional[int] = None,
    page_size: Optional[int] = None,
):
    """Convert a service-layer context result into the API ``GraphContext`` schema.

    Relation rows only carry entity IDs, so the titles of their endpoints are
    resolved with a single batched repository lookup before summaries are built.

    Args:
        context_result: Service result whose ``results`` and ``metadata`` are converted.
        entity_repository: Repository used to look up relation endpoint titles.
        page: Page number passed through to the returned ``GraphContext``.
        page_size: Page size passed through to the returned ``GraphContext``.

    Returns:
        ``GraphContext`` with one ``ContextResult`` per service result, plus metadata.

    Raises:
        ValueError: If a row's type is not entity, observation or relation.
    """
    # Pass 1: gather the endpoint IDs of every relation row in the result set.
    endpoint_ids: set[int] = set()
    for ctx in context_result.results:
        rows = [ctx.primary_result, *ctx.observations, *ctx.related_results]
        for row in rows:
            if row.type != SearchItemType.RELATION:
                continue
            source_id = row.from_id  # pyright: ignore
            target_id = row.to_id
            if source_id:
                endpoint_ids.add(source_id)
            if target_id:
                endpoint_ids.add(target_id)

    # One repository call for all titles; skipped when no relation needs one.
    titles_by_id: dict[int, str] = {}
    if endpoint_ids:
        for found in await entity_repository.find_by_ids(list(endpoint_ids)):
            titles_by_id[found.id] = found.title

    def to_summary(item: SearchIndexRow | ContextResultRow):
        """Build the summary schema that corresponds to the row's type."""
        kind = item.type

        if kind == SearchItemType.ENTITY:
            return EntitySummary(
                entity_id=item.id,
                title=item.title,  # pyright: ignore
                permalink=item.permalink,
                content=item.content,
                file_path=item.file_path,
                created_at=item.created_at,
            )

        if kind == SearchItemType.OBSERVATION:
            return ObservationSummary(
                observation_id=item.id,
                entity_id=item.entity_id,  # pyright: ignore
                title=item.title,  # pyright: ignore
                file_path=item.file_path,
                category=item.category,  # pyright: ignore
                content=item.content,  # pyright: ignore
                permalink=item.permalink,  # pyright: ignore
                created_at=item.created_at,
            )

        if kind == SearchItemType.RELATION:
            source_id = item.from_id  # pyright: ignore
            target_id = item.to_id
            # A falsy endpoint ID, or one the lookup did not return, yields None.
            source_title = titles_by_id.get(source_id) if source_id else None
            target_title = titles_by_id.get(target_id) if target_id else None
            return RelationSummary(
                relation_id=item.id,
                entity_id=item.entity_id,  # pyright: ignore
                title=item.title,  # pyright: ignore
                file_path=item.file_path,
                permalink=item.permalink,  # pyright: ignore
                relation_type=item.relation_type,  # pyright: ignore
                from_entity=source_title,
                from_entity_id=source_id,
                to_entity=target_title,
                to_entity_id=target_id,
                created_at=item.created_at,
            )

        raise ValueError(f"Unexpected type: {item.type}")  # pragma: no cover

    # Pass 2: convert each service result, keeping its primary/observations/related shape.
    converted = [
        ContextResult(
            primary_result=to_summary(ctx.primary_result),
            observations=[to_summary(o) for o in ctx.observations],  # pyright: ignore[reportArgumentType]
            related_results=[to_summary(r) for r in ctx.related_results],
        )
        for ctx in context_result.results
    ]

    # Copy the service metadata; total_results is derived as primary + related.
    source_meta = context_result.metadata
    schema_meta = MemoryMetadata(
        uri=source_meta.uri,
        types=source_meta.types,
        depth=source_meta.depth,
        timeframe=source_meta.timeframe,
        generated_at=source_meta.generated_at,
        primary_count=source_meta.primary_count,
        related_count=source_meta.related_count,
        total_results=source_meta.primary_count + source_meta.related_count,
        total_relations=source_meta.total_relations,
        total_observations=source_meta.total_observations,
    )

    return GraphContext(
        results=converted,
        metadata=schema_meta,
        page=page,
        page_size=page_size,
    )


async def to_search_results(entity_service: EntityService, results: List[SearchIndexRow]):
    """Convert search index rows into ``SearchResult`` schema objects.

    Args:
        entity_service: Service used to fetch the entities referenced by each row.
        results: Rows returned by the search index.

    Returns:
        List with one ``SearchResult`` per input row, in input order.
    """
    converted = []
    for row in results:
        # One lookup per row covering the parent entity and both relation endpoints.
        # NOTE(review): the permalinks below are picked by position in the returned
        # list - confirm get_entities_by_id preserves the order of the requested IDs
        # and how it treats None entries.
        linked = await entity_service.get_entities_by_id(
            [row.entity_id, row.from_id, row.to_id]  # pyright: ignore
        )
        first_permalink = linked[0].permalink if linked else None
        second_permalink = linked[1].permalink if len(linked) > 1 else None

        # Only the ID fields relevant to the row's type are populated.
        entity_id = observation_id = relation_id = None
        if row.type == SearchItemType.ENTITY:
            entity_id = row.id
        elif row.type == SearchItemType.OBSERVATION:
            observation_id, entity_id = row.id, row.entity_id  # entity_id is the parent
        elif row.type == SearchItemType.RELATION:
            relation_id, entity_id = row.id, row.entity_id  # entity_id is the parent

        item = SearchResult(
            title=row.title,  # pyright: ignore
            type=row.type,  # pyright: ignore
            permalink=row.permalink,
            score=row.score,  # pyright: ignore
            entity=first_permalink,
            content=row.content,
            file_path=row.file_path,
            metadata=row.metadata,
            entity_id=entity_id,
            observation_id=observation_id,
            relation_id=relation_id,
            category=row.category,
            from_entity=first_permalink,
            to_entity=second_permalink,
            relation_type=row.relation_type,
        )
        converted.append(item)
    return converted

```

--------------------------------------------------------------------------------
/tests/repository/test_postgres_search_repository.py:
--------------------------------------------------------------------------------

```python
"""Integration tests for PostgresSearchRepository.

These tests only run in Postgres mode (testcontainers) and ensure that the
Postgres tsvector-backed search implementation remains well covered.
"""

from datetime import datetime, timedelta, timezone

import pytest

from basic_memory.repository.postgres_search_repository import PostgresSearchRepository
from basic_memory.repository.search_index_row import SearchIndexRow
from basic_memory.schemas.search import SearchItemType


pytestmark = pytest.mark.postgres


@pytest.fixture(autouse=True)
def _require_postgres_backend(db_backend):
    """Skip the requesting test unless the active database backend is Postgres."""
    if db_backend == "postgres":
        return
    pytest.skip("PostgresSearchRepository tests require BASIC_MEMORY_TEST_POSTGRES=1")


@pytest.mark.asyncio
async def test_postgres_search_repository_index_and_search(session_maker, test_project):
    """Index a single entity row, then walk through each search filter path."""
    target_permalink = "docs/coffee-brewing"

    def contains_target(found) -> bool:
        # True when the indexed row is among the returned results.
        return any(hit.permalink == target_permalink for hit in found)

    search_repo = PostgresSearchRepository(session_maker, project_id=test_project.id)
    # A no-op here, but the call itself should stay exercised.
    await search_repo.init_search_index()

    indexed_at = datetime.now(timezone.utc)
    await search_repo.index_item(
        SearchIndexRow(
            project_id=test_project.id,
            id=1,
            title="Coffee Brewing",
            content_stems="coffee brewing pour over",
            content_snippet="coffee brewing snippet",
            permalink=target_permalink,
            file_path="docs/coffee-brewing.md",
            type="entity",
            metadata={"entity_type": "note"},
            created_at=indexed_at,
            updated_at=indexed_at,
        )
    )

    # Plain full-text query
    assert contains_target(await search_repo.search(search_text="coffee"))

    # Boolean operator query
    assert contains_target(await search_repo.search(search_text="coffee AND brewing"))

    # Title-only query
    assert contains_target(await search_repo.search(title="Coffee Brewing"))

    # Exact permalink lookup returns just the one row
    exact_hits = await search_repo.search(permalink=target_permalink)
    assert len(exact_hits) == 1

    # Permalink pattern with a trailing wildcard (LIKE)
    assert contains_target(await search_repo.search(permalink_match="docs/coffee*"))

    # Filter on the search item type
    assert contains_target(await search_repo.search(search_item_types=[SearchItemType.ENTITY]))

    # Filter on the entity type stored in the metadata (JSONB containment)
    assert contains_target(await search_repo.search(types=["note"]))

    # Date filter (this path also exercises order_by_clause)
    one_day_ago = indexed_at - timedelta(days=1)
    assert contains_target(await search_repo.search(after_date=one_day_ago))

    # Pagination via limit/offset
    first_page = await search_repo.search(limit=1, offset=0)
    assert len(first_page) == 1


@pytest.mark.asyncio
async def test_postgres_search_repository_bulk_index_items_and_prepare_terms(
    session_maker, test_project
):
    """Cover the term-preparation helpers and bulk indexing of several rows."""
    search_repo = PostgresSearchRepository(session_maker, project_id=test_project.id)

    # An empty batch must be accepted without doing anything.
    await search_repo.bulk_index_items([])

    # Term preparation helpers: each raw input maps to a fixed prepared term.
    assert "&" in search_repo._prepare_search_term("coffee AND brewing")
    expected_terms = {
        "coff*": "coff:*",
        "()&!:": "NOSPECIALCHARS:*",
        "coffee brewing": "coffee:* & brewing:*",
    }
    for raw_term, prepared_term in expected_terms.items():
        assert search_repo._prepare_search_term(raw_term) == prepared_term
    assert search_repo._prepare_single_term("   ") == "   "
    assert search_repo._prepare_single_term("coffee", is_prefix=False) == "coffee"

    indexed_at = datetime.now(timezone.utc)

    def note_row(row_id, title, slug):
        # Stems and snippet are derived from the lower-cased title.
        words = title.lower()
        return SearchIndexRow(
            project_id=test_project.id,
            id=row_id,
            title=title,
            content_stems=f"{words} coffee",
            content_snippet=f"{words} snippet",
            permalink=f"docs/{slug}",
            file_path=f"docs/{slug}.md",
            type="entity",
            metadata={"entity_type": "note"},
            created_at=indexed_at,
            updated_at=indexed_at,
        )

    await search_repo.bulk_index_items(
        [
            note_row(10, "Pour Over", "pour-over"),
            note_row(11, "French Press", "french-press"),
        ]
    )

    # Both rows mention "coffee", so both permalinks must be found.
    found_permalinks = {hit.permalink for hit in await search_repo.search(search_text="coffee")}
    assert "docs/pour-over" in found_permalinks
    assert "docs/french-press" in found_permalinks


@pytest.mark.asyncio
async def test_postgres_search_repository_wildcard_text_and_permalink_match_exact(
    session_maker, test_project
):
    """Check the bare-wildcard text query and a permalink_match without a wildcard."""
    search_repo = PostgresSearchRepository(session_maker, project_id=test_project.id)

    indexed_at = datetime.now(timezone.utc)
    only_row = SearchIndexRow(
        project_id=test_project.id,
        id=1,
        title="X",
        content_stems="x",
        content_snippet="x",
        permalink="docs/x",
        file_path="docs/x.md",
        type="entity",
        metadata={"entity_type": "note"},
        created_at=indexed_at,
        updated_at=indexed_at,
    )
    await search_repo.index_item(only_row)

    # A bare "*" should not add tsquery conditions (covers the pass branch)
    wildcard_hits = await search_repo.search(search_text="*")
    assert wildcard_hits

    # Without a '*' in it, permalink_match goes through the exact-match branch
    exact_hits = await search_repo.search(permalink_match="docs/x")
    assert len(exact_hits) == 1


@pytest.mark.asyncio
async def test_postgres_search_repository_tsquery_syntax_error_returns_empty(
    session_maker, test_project
):
    """A query that yields an invalid tsquery must produce an empty result list."""
    search_repo = PostgresSearchRepository(session_maker, project_id=test_project.id)

    # "coffee AND" ends with a dangling boolean operator, i.e. an invalid tsquery;
    # the repository is expected to answer with [] rather than raise.
    assert await search_repo.search(search_text="coffee AND") == []


@pytest.mark.asyncio
async def test_postgres_search_repository_reraises_non_tsquery_db_errors(
    session_maker, test_project
):
    """Database errors unrelated to tsquery parsing must propagate to the caller.

    The search_index table is dropped first so that any subsequent search fails
    at the database level.
    """
    from sqlalchemy import text
    from basic_memory import db

    search_repo = PostgresSearchRepository(session_maker, project_id=test_project.id)

    drop_search_index = text("DROP TABLE search_index")
    async with db.scoped_session(session_maker) as session:
        await session.execute(drop_search_index)
        await session.commit()

    # A permalink lookup keeps to_tsquery() out of the generated SQL, so the
    # failure lands in the generic "re-raise other db errors" branch.
    with pytest.raises(Exception):
        await search_repo.search(permalink="docs/anything")

```

--------------------------------------------------------------------------------
/src/basic_memory/schemas/project_info.py:
--------------------------------------------------------------------------------

```python
"""Schema for project info response."""

import os
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Any

from pydantic import Field, BaseModel

from basic_memory.utils import generate_permalink


class ProjectStatistics(BaseModel):
    """Statistics about the current project.

    Groups overall entity/observation/relation totals, per-type and per-category
    breakdowns, and simple graph metrics. No field declares a default value.
    """

    # Basic counts
    total_entities: int = Field(description="Total number of entities in the knowledge base")
    total_observations: int = Field(description="Total number of observations across all entities")
    total_relations: int = Field(description="Total number of relations between entities")
    total_unresolved_relations: int = Field(
        description="Number of relations with unresolved targets"
    )

    # Entity counts by type (type name -> count)
    entity_types: Dict[str, int] = Field(
        description="Count of entities by type (e.g., note, conversation)"
    )

    # Observation counts by category (category name -> count)
    observation_categories: Dict[str, int] = Field(
        description="Count of observations by category (e.g., tech, decision)"
    )

    # Relation counts by type (relation type -> count)
    relation_types: Dict[str, int] = Field(
        description="Count of relations by type (e.g., implements, relates_to)"
    )

    # Graph metrics
    # Each list item is a free-form dict; its exact keys are not constrained here.
    most_connected_entities: List[Dict[str, Any]] = Field(
        description="Entities with the most relations, including their titles and permalinks"
    )
    isolated_entities: int = Field(description="Number of entities with no relations")


class ActivityMetrics(BaseModel):
    """Activity metrics for the current project.

    Holds lists of recently created/updated entities and a nested mapping of
    monthly growth counters. No field declares a default value.
    """

    # Recent activity
    # Entries are free-form dicts; their keys are not constrained by this schema.
    recently_created: List[Dict[str, Any]] = Field(
        description="Recently created entities with timestamps"
    )
    recently_updated: List[Dict[str, Any]] = Field(
        description="Recently updated entities with timestamps"
    )

    # Growth over time (last 6 months)
    # Two-level mapping of str -> (str -> int); presumably month -> metric -> count,
    # verify against the code that builds this response.
    monthly_growth: Dict[str, Dict[str, int]] = Field(
        description="Monthly growth statistics for entities, observations, and relations"
    )


class SystemStatus(BaseModel):
    """System status information.

    Carries the application version, the database location and size, an optional
    watch-service status payload, and the time the snapshot was taken.
    """

    # Version information
    version: str = Field(description="Basic Memory version")

    # Database status
    database_path: str = Field(description="Path to the SQLite database")
    # Size is carried as an already-formatted string, not as a byte count.
    database_size: str = Field(description="Size of the database in human-readable format")

    # Watch service status; defaults to None when no status is supplied
    watch_status: Optional[Dict[str, Any]] = Field(
        default=None, description="Watch service status information (if running)"
    )

    # System information
    timestamp: datetime = Field(description="Timestamp when the information was collected")


class ProjectInfoResponse(BaseModel):
    """Response for the project_info tool.

    Combines the project configuration with three nested sections:
    ``statistics`` (ProjectStatistics), ``activity`` (ActivityMetrics) and
    ``system`` (SystemStatus). No field declares a default value.
    """

    # Project configuration
    project_name: str = Field(description="Name of the current project")
    project_path: str = Field(description="Path to the current project files")
    # Keyed by project name; each value is a free-form dict of project details.
    available_projects: Dict[str, Dict[str, Any]] = Field(
        description="Map of configured project names to detailed project information"
    )
    default_project: str = Field(description="Name of the default project")

    # Statistics
    statistics: ProjectStatistics = Field(description="Statistics about the knowledge base")

    # Activity metrics
    activity: ActivityMetrics = Field(description="Activity and growth metrics")

    # System status
    system: SystemStatus = Field(description="System and service status information")


class ProjectInfoRequest(BaseModel):
    """Request model for switching projects.

    Carries the target project's name and directory path plus a flag saying
    whether it should become the default. All three fields are required
    (declared with ``...``).
    """

    name: str = Field(..., description="Name of the project to switch to")
    path: str = Field(..., description="Path to the project directory")
    set_default: bool = Field(..., description="Set the project as the default")


class WatchEvent(BaseModel):
    """One recorded watch-service event: a timestamped action on a path and its outcome."""

    timestamp: datetime  # when the event was recorded
    path: str  # path the event refers to
    action: str  # new, delete, etc
    status: str  # success, error
    checksum: Optional[str]  # may be None; no default is declared for this field
    error: Optional[str] = None  # error text, if any; defaults to None


class WatchServiceState(BaseModel):
    """Status, counters and recent events tracked for a watch service.

    ``start_time``, ``pid`` and ``recent_events`` use ``default_factory`` so that
    each instance gets values computed when it is created. Plain defaults such as
    ``datetime.now()`` are evaluated only once, when the class body is executed
    at import time, which would give every instance the same stale timestamp
    and PID.
    """

    # Service status
    running: bool = False
    start_time: datetime = Field(default_factory=datetime.now)  # set per instance
    pid: int = Field(default_factory=os.getpid)  # PID of the process creating the instance

    # Stats
    error_count: int = 0
    last_error: Optional[datetime] = None
    last_scan: Optional[datetime] = None

    # File counts
    synced_files: int = 0

    # Recent activity (newest first, see add_event)
    recent_events: List[WatchEvent] = Field(default_factory=list)

    def add_event(
        self,
        path: str,
        action: str,
        status: str,
        checksum: Optional[str] = None,
        error: Optional[str] = None,
    ) -> WatchEvent:  # pragma: no cover
        """Record a new event at the front of ``recent_events`` and return it.

        Args:
            path: Path the event refers to.
            action: Action label (e.g. new, delete).
            status: Outcome label (e.g. success, error).
            checksum: Optional checksum associated with the event.
            error: Optional error text.

        Returns:
            The newly created WatchEvent, timestamped with ``datetime.now()``.
        """
        event = WatchEvent(
            timestamp=datetime.now(),
            path=path,
            action=action,
            status=status,
            checksum=checksum,
            error=error,
        )
        self.recent_events.insert(0, event)
        self.recent_events = self.recent_events[:100]  # Keep last 100
        return event

    def record_error(self, error: str):  # pragma: no cover
        """Count an error, log it as a "sync" error event, and stamp ``last_error``."""
        self.error_count += 1
        self.add_event(path="", action="sync", status="error", error=error)
        self.last_error = datetime.now()


class ProjectWatchStatus(BaseModel):
    """Project with its watch status.

    ``name`` and ``path`` are required; ``watch_status`` defaults to None when
    no WatchServiceState is supplied.
    """

    name: str = Field(..., description="Name of the project")
    path: str = Field(..., description="Path to the project")
    watch_status: Optional[WatchServiceState] = Field(
        None, description="Watch status information for the project"
    )


class ProjectItem(BaseModel):
    """Compact description of one project, plus helpers derived from its name and path."""

    id: int
    external_id: str  # UUID string for API references (required after migration)
    name: str
    path: str
    is_default: bool = False

    @property
    def permalink(self) -> str:  # pragma: no cover
        """Permalink generated from the project name."""
        project_name = self.name
        return generate_permalink(project_name)

    @property
    def home(self) -> Path:  # pragma: no cover
        """The project path as a ``Path`` with a leading ``~`` expanded."""
        configured_path = Path(self.path)
        return configured_path.expanduser()

    @property
    def project_url(self) -> str:  # pragma: no cover
        """URL path for the project: a slash followed by the name's permalink."""
        return "/{}".format(generate_permalink(self.name))


class ProjectList(BaseModel):
    """Response model for listing projects.

    Both fields are required: the list of ProjectItem entries and the default
    project, given as a string.
    """

    projects: List[ProjectItem]  # all projects being reported
    default_project: str  # presumably the default project's name; verify against the caller


class ProjectStatusResponse(BaseModel):
    """Response model for switching projects.

    ``message``, ``status`` and ``default`` are required; ``old_project`` and
    ``new_project`` default to None when no ProjectItem is supplied.
    """

    message: str = Field(..., description="Status message about the project switch")
    status: str = Field(..., description="Status of the switch (success or error)")
    default: bool = Field(..., description="True if the project was set as the default")
    old_project: Optional[ProjectItem] = Field(
        None, description="Information about the project being switched from"
    )
    new_project: Optional[ProjectItem] = Field(
        None, description="Information about the project being switched to"
    )

```

--------------------------------------------------------------------------------
/src/basic_memory/alembic/env.py:
--------------------------------------------------------------------------------

```python
"""Alembic environment configuration."""

import asyncio
import os
from logging.config import fileConfig

# Allow nested event loops (needed for pytest-asyncio and other async contexts)
# Note: nest_asyncio doesn't work with uvloop or Python 3.14+, so we handle those cases separately
import sys

if sys.version_info < (3, 14):
    try:
        import nest_asyncio

        nest_asyncio.apply()
    except (ImportError, ValueError):
        # nest_asyncio not available or can't patch this loop type (e.g., uvloop)
        pass
# For Python 3.14+, we rely on the thread-based fallback in run_migrations_online()

from sqlalchemy import engine_from_config, pool
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine

from alembic import context

from basic_memory.config import ConfigManager

# Trigger: only set test env when actually running under pytest
#          (detected through the PYTEST_CURRENT_TEST environment variable)
# Why: alembic/env.py is imported during normal operations (MCP server startup, migrations)
#      but we only want test behavior during actual test runs
# Outcome: prevents is_test_env from returning True in production, enabling watch service
# Note: this must run before the basic_memory.models import that follows it.
if os.getenv("PYTEST_CURRENT_TEST") is not None:
    os.environ["BASIC_MEMORY_ENV"] = "test"

# Import after setting environment variable  # noqa: E402
from basic_memory.models import Base  # noqa: E402

# This is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config

# Load app config - this will read environment variables (BASIC_MEMORY_DATABASE_BACKEND, etc.)
# due to Pydantic's env_prefix="BASIC_MEMORY_" setting
app_config = ConfigManager().config

# Set the SQLAlchemy URL based on database backend configuration
# If the URL is already set in config (e.g., from run_migrations), use that
# Otherwise, get it from app config
# Note: alembic.ini has a placeholder URL "driver://user:pass@localhost/dbname" that we need to override
current_url = config.get_main_option("sqlalchemy.url")
if not current_url or current_url == "driver://user:pass@localhost/dbname":
    # Imported here, only on the path that actually needs to build a URL.
    from basic_memory.db import DatabaseType

    sqlalchemy_url = DatabaseType.get_db_url(
        app_config.database_path, DatabaseType.FILESYSTEM, app_config
    )
    # Store the computed URL back so later get_main_option() calls see it.
    config.set_main_option("sqlalchemy.url", sqlalchemy_url)

# Interpret the config file for Python logging (skipped when no config file is in use).
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# Model MetaData handed to Alembic as target_metadata
# (needed for 'autogenerate' support).
target_metadata = Base.metadata


# Add this function to tell Alembic what to include/exclude
def include_object(object, name, type_, reflected, compare_to):
    """Decide whether Alembic should consider a schema object.

    Tables whose name starts with ``search_index`` (the SQLite FTS tables) are
    excluded; everything else is included. Only ``name`` and ``type_`` are
    inspected; the remaining parameters are accepted but unused.

    Returns:
        False for the excluded search-index tables, True otherwise.
    """
    is_search_index_table = type_ == "table" and name.startswith("search_index")
    return not is_search_index_table


def run_migrations_offline() -> None:
    """Run migrations in 'offline' mode.

    The context is configured from the ``sqlalchemy.url`` option alone, so no
    Engine (and therefore no DBAPI) has to be available. In this mode calls
    to context.execute() emit the given string to the script output.
    """
    offline_options = {
        "url": config.get_main_option("sqlalchemy.url"),
        "target_metadata": target_metadata,
        "literal_binds": True,
        "dialect_opts": {"paramstyle": "named"},
        "include_object": include_object,
        "render_as_batch": True,
    }
    context.configure(**offline_options)

    with context.begin_transaction():
        context.run_migrations()


def do_run_migrations(connection):
    """Configure the Alembic context for ``connection`` and run the migrations.

    Args:
        connection: Database connection the migrations are executed on.
    """
    online_options = {
        "connection": connection,
        "target_metadata": target_metadata,
        "include_object": include_object,
        "render_as_batch": True,
        "compare_type": True,
    }
    context.configure(**online_options)

    with context.begin_transaction():
        context.run_migrations()


async def run_async_migrations(connectable):
    """Run migrations asynchronously with AsyncEngine.

    Opens a connection on ``connectable``, runs ``do_run_migrations`` on it
    through ``run_sync``, and disposes of the engine afterwards.

    Args:
        connectable: Async engine providing ``connect()`` and ``dispose()``.

    Raises:
        Whatever the migration run raises; the engine is disposed first.
    """
    try:
        async with connectable.connect() as connection:
            await connection.run_sync(do_run_migrations)
    finally:
        # Dispose even when a migration fails, so the engine is not left
        # undisposed on the error path.
        await connectable.dispose()


def run_migrations_online() -> None:
    """Run migrations in 'online' mode.

    Supports both sync engines (SQLite) and async engines (PostgreSQL with asyncpg).

    A connectable stored under ``context.config.attributes["connection"]`` is
    used when present; otherwise an engine is created from the configured
    ``sqlalchemy.url``. Async engines are driven with ``asyncio.run()``, falling
    back to a fresh event loop in a worker thread when an event loop is already
    running. Sync engines and plain connections run the migrations directly.
    """
    # Check if a connection/engine was provided (e.g., from run_migrations)
    connectable = context.config.attributes.get("connection", None)

    if connectable is None:
        # No connection provided, create engine from config
        url = context.config.get_main_option("sqlalchemy.url")

        # Check if it's an async URL (sqlite+aiosqlite or postgresql+asyncpg)
        if url and ("+asyncpg" in url or "+aiosqlite" in url):
            # Create async engine for asyncpg or aiosqlite
            connectable = create_async_engine(
                url,
                poolclass=pool.NullPool,
                future=True,
            )
        else:
            # Create sync engine for regular sqlite or postgresql
            connectable = engine_from_config(
                context.config.get_section(context.config.config_ini_section, {}),
                prefix="sqlalchemy.",
                poolclass=pool.NullPool,
            )

    # Handle async engines (asyncpg or aiosqlite URLs, or an AsyncEngine passed in)
    if isinstance(connectable, AsyncEngine):
        # Try to run async migrations
        # nest_asyncio allows asyncio.run() from within event loops, but doesn't work with uvloop
        try:
            asyncio.run(run_async_migrations(connectable))
        except RuntimeError as e:
            # Only the "already inside a running loop" failure is handled here;
            # it is recognised by its message text.
            if "cannot be called from a running event loop" in str(e):
                # We're in a running event loop (likely uvloop) - need to use a different approach
                # Create a new thread to run the async migrations
                import concurrent.futures

                def run_in_thread():
                    """Run async migrations in a new event loop in a separate thread."""
                    new_loop = asyncio.new_event_loop()
                    asyncio.set_event_loop(new_loop)
                    try:
                        new_loop.run_until_complete(run_async_migrations(connectable))
                    finally:
                        # Always close the loop created for this thread.
                        new_loop.close()

                with concurrent.futures.ThreadPoolExecutor() as executor:
                    future = executor.submit(run_in_thread)
                    future.result()  # Wait for completion and re-raise any exceptions
            else:
                # Any other RuntimeError is not ours to handle.
                raise
    else:
        # Handle sync engines (SQLite) or sync connections
        if hasattr(connectable, "connect"):
            # It's an engine, get a connection
            with connectable.connect() as connection:
                do_run_migrations(connection)
        else:
            # It's already a connection
            do_run_migrations(connectable)


# Module-level entry point: executed whenever this env.py is run, choosing
# between the offline and online migration paths.
if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()

```

--------------------------------------------------------------------------------
/tests/cli/test_import_claude_conversations.py:
--------------------------------------------------------------------------------

```python
"""Tests for import_claude command (chat conversations)."""

import json

import pytest
from typer.testing import CliRunner

from basic_memory.cli.app import app
from basic_memory.cli.commands import import_claude_conversations  # noqa
from basic_memory.config import get_project_config

# Shared Typer CliRunner that the tests in this module use to invoke `app`
runner = CliRunner()


@pytest.fixture
def sample_conversation():
    """Provide one conversation dict holding a human message and an assistant reply."""
    started_at = "2025-01-05T20:55:32.499880+00:00"
    human_text = "Hello, this is a test"
    assistant_text = "Response to test"

    human_message = {
        "uuid": "msg-1",
        "text": human_text,
        "sender": "human",
        "created_at": started_at,
        "content": [{"type": "text", "text": human_text}],
    }
    assistant_message = {
        "uuid": "msg-2",
        "text": assistant_text,
        "sender": "assistant",
        "created_at": "2025-01-05T20:55:40.123456+00:00",
        "content": [{"type": "text", "text": assistant_text}],
    }

    return {
        "uuid": "test-uuid",
        "name": "Test Conversation",
        "created_at": started_at,
        "updated_at": "2025-01-05T20:56:39.477600+00:00",
        "chat_messages": [human_message, assistant_message],
    }


@pytest.fixture
def sample_conversations_json(tmp_path, sample_conversation):
    """Write the sample conversation, wrapped in a list, to conversations.json.

    Returns the path of the written file inside ``tmp_path``.
    """
    json_file = tmp_path / "conversations.json"
    serialized = json.dumps([sample_conversation])
    json_file.write_text(serialized, encoding="utf-8")
    return json_file


def test_import_conversations_command_file_not_found(tmp_path):
    """A path that does not exist makes the command exit with code 1 and say so."""
    missing_path = tmp_path / "nonexistent.json"
    cli_args = ["import", "claude", "conversations", str(missing_path)]

    outcome = runner.invoke(app, cli_args)

    assert outcome.exit_code == 1
    assert "File not found" in outcome.output


def test_import_conversations_command_success(tmp_path, sample_conversations_json, monkeypatch):
    """Importing the sample file succeeds and reports the expected counts."""
    # Point HOME at the temporary directory for the duration of the test
    monkeypatch.setenv("HOME", str(tmp_path))

    cli_args = ["import", "claude", "conversations", str(sample_conversations_json)]
    outcome = runner.invoke(app, cli_args)

    assert outcome.exit_code == 0
    expected_fragments = (
        "Import complete",
        "Imported 1 conversations",
        "Containing 2 messages",
    )
    for fragment in expected_fragments:
        assert fragment in outcome.output


def test_import_conversations_command_invalid_json(tmp_path):
    """A file that is not valid JSON makes the command exit with code 1."""
    # File whose content cannot be parsed as JSON
    broken_file = tmp_path / "invalid.json"
    broken_file.write_text("not json")

    cli_args = ["import", "claude", "conversations", str(broken_file)]
    outcome = runner.invoke(app, cli_args)

    assert outcome.exit_code == 1
    assert "Error during import" in outcome.output


def test_import_conversations_with_custom_folder(tmp_path, sample_conversations_json, monkeypatch):
    """The --folder option places the imported conversation in that folder."""
    # Make the project home the temporary directory
    project_config = get_project_config()
    project_config.home = tmp_path
    target_folder = "chats"

    cli_args = [
        "import",
        "claude",
        "conversations",
        str(sample_conversations_json),
        "--folder",
        target_folder,
    ]
    outcome = runner.invoke(app, cli_args)
    assert outcome.exit_code == 0

    # The markdown file must exist inside the custom folder
    expected_file = tmp_path / target_folder / "20250105-Test_Conversation.md"
    assert expected_file.exists()


def test_import_conversation_with_attachments(tmp_path):
    """An attachment's file name and extracted content show up in the imported note."""
    created_at = "2025-01-05T20:55:32.499880+00:00"
    message_text = "Here's a file"

    # Single human message carrying one attachment
    message_with_attachment = {
        "uuid": "msg-1",
        "text": message_text,
        "sender": "human",
        "created_at": created_at,
        "content": [{"type": "text", "text": message_text}],
        "attachments": [{"file_name": "test.txt", "extracted_content": "Test file content"}],
    }
    conversation = {
        "uuid": "test-uuid",
        "name": "Test With Attachments",
        "created_at": created_at,
        "updated_at": "2025-01-05T20:56:39.477600+00:00",
        "chat_messages": [message_with_attachment],
    }

    json_file = tmp_path / "with_attachments.json"
    json_file.write_text(json.dumps([conversation]), encoding="utf-8")

    # Make the project home the temporary directory
    project_config = get_project_config()
    project_config.home = tmp_path

    outcome = runner.invoke(app, ["import", "claude", "conversations", str(json_file)])
    assert outcome.exit_code == 0

    # Inspect how the attachment was rendered
    note_path = tmp_path / "conversations" / "20250105-Test_With_Attachments.md"
    rendered = note_path.read_text(encoding="utf-8")
    for fragment in ("**Attachment: test.txt**", "```", "Test file content"):
        assert fragment in rendered


def test_import_conversation_with_none_text_values(tmp_path):
    """None text values in message content must not break the import (issue #236)."""

    def text_part(value):
        # One entry of a message's content array.
        return {"type": "text", "text": value}

    # First message mixes valid text with a None entry (the case that raised TypeError)
    mixed_message = {
        "uuid": "msg-1",
        "text": None,
        "sender": "human",
        "created_at": "2025-01-05T20:55:32.499880+00:00",
        "content": [
            text_part("Valid text here"),
            text_part(None),
            text_part("More valid text"),
        ],
    }
    # Second message has nothing but None entries
    all_none_message = {
        "uuid": "msg-2",
        "text": None,
        "sender": "assistant",
        "created_at": "2025-01-05T20:55:40.123456+00:00",
        "content": [text_part(None), text_part(None)],
    }
    conversation = {
        "uuid": "test-uuid",
        "name": "Test With None Text",
        "created_at": "2025-01-05T20:55:32.499880+00:00",
        "updated_at": "2025-01-05T20:56:39.477600+00:00",
        "chat_messages": [mixed_message, all_none_message],
    }

    json_file = tmp_path / "with_none_text.json"
    json_file.write_text(json.dumps([conversation]), encoding="utf-8")

    project_config = get_project_config()
    project_config.home = tmp_path

    # The import has to finish cleanly instead of failing with TypeError
    outcome = runner.invoke(app, ["import", "claude", "conversations", str(json_file)])
    assert outcome.exit_code == 0

    # The note exists and still contains both valid text fragments
    note_path = tmp_path / "conversations" / "20250105-Test_With_None_Text.md"
    assert note_path.exists()
    rendered = note_path.read_text(encoding="utf-8")
    assert "Valid text here" in rendered
    assert "More valid text" in rendered

```

--------------------------------------------------------------------------------
/src/basic_memory/mcp/tools/chatgpt_tools.py:
--------------------------------------------------------------------------------

```python
"""ChatGPT-compatible MCP tools for Basic Memory.

These adapters expose Basic Memory's search/fetch functionality using the exact
tool names and response structure OpenAI's MCP clients expect: each call returns
a list containing a single `{"type": "text", "text": "{...json...}"}` item.
"""

import json
from typing import Any, Dict, List, Optional
from loguru import logger
from fastmcp import Context

from basic_memory.mcp.server import mcp
from basic_memory.mcp.tools.search import search_notes
from basic_memory.mcp.tools.read_note import read_note
from basic_memory.schemas.search import SearchResponse
from basic_memory.config import ConfigManager
from basic_memory.telemetry import track_mcp_tool


def _format_search_results_for_chatgpt(results: SearchResponse) -> List[Dict[str, Any]]:
    """Format search results according to ChatGPT's expected schema.

    Returns a list of result objects with id, title, and url fields.
    """
    formatted_results = []

    for result in results.results:
        formatted_result = {
            "id": result.permalink or f"doc-{len(formatted_results)}",
            "title": result.title if result.title and result.title.strip() else "Untitled",
            "url": result.permalink or "",
        }
        formatted_results.append(formatted_result)

    return formatted_results


def _format_document_for_chatgpt(
    content: str, identifier: str, title: Optional[str] = None
) -> Dict[str, Any]:
    """Format document content according to ChatGPT's expected schema.

    Returns a document object with id, title, text, url, and metadata fields.
    """
    # Extract title from markdown content if not provided
    if not title and isinstance(content, str):
        lines = content.split("\n")
        if lines and lines[0].startswith("# "):
            title = lines[0][2:].strip()
        else:
            title = identifier.split("/")[-1].replace("-", " ").title()

    # Ensure title is never None
    if not title:
        title = "Untitled Document"

    # Handle error cases
    if isinstance(content, str) and content.lstrip().startswith("# Note Not Found"):
        return {
            "id": identifier,
            "title": title or "Document Not Found",
            "text": content,
            "url": identifier,
            "metadata": {"error": "Document not found"},
        }

    return {
        "id": identifier,
        "title": title or "Untitled Document",
        "text": content,
        "url": identifier,
        "metadata": {"format": "markdown"},
    }


@mcp.tool(description="Search for content across the knowledge base")
async def search(
    query: str,
    context: Context | None = None,
) -> List[Dict[str, Any]]:
    """Search adapter for ChatGPT/OpenAI MCP clients.

    Runs ``search_notes`` against the configured default project and wraps the
    outcome in a single text content item.

    Args:
        query: Search query, passed to `search_notes` as a "text" search
        context: Optional FastMCP context, forwarded to `search_notes`

    Returns:
        A one-element list: `{ "type": "text", "text": "{...JSON...}" }`.
        On success the JSON holds `results`, `total_count` and the echoed
        `query`; on failure it holds an empty `results` list plus error fields.
    """
    track_mcp_tool("search")
    logger.info(f"ChatGPT search request: query='{query}'")

    try:
        # No project parameter is exposed to ChatGPT, so the default project is used
        project_name = ConfigManager().config.default_project

        outcome = await search_notes.fn(
            query=query,
            project=project_name,
            page=1,
            page_size=10,  # first page of ten results
            search_type="text",  # full-text search
            context=context,
        )

        if not isinstance(outcome, str):
            # Successful search: reshape the results for ChatGPT
            hits = _format_search_results_for_chatgpt(outcome)
            payload = {
                "results": hits,
                "total_count": len(outcome.results),  # count taken from the raw results
                "query": query,
            }
            logger.info(f"Search completed: {len(hits)} results returned")
        else:
            # A string coming back from search_notes is an error message
            logger.warning(f"Search failed with error: {outcome[:100]}...")
            payload = {
                "results": [],
                "error": "Search failed",
                "error_details": outcome[:500],  # cap the length of the details
            }

        # OpenAI expects the MCP content array format
        return [{"type": "text", "text": json.dumps(payload, ensure_ascii=False)}]

    except Exception as e:
        logger.error(f"ChatGPT search failed for query '{query}': {e}")
        failure_payload = {
            "results": [],
            "error": "Internal search error",
            "error_message": str(e)[:200],
        }
        return [{"type": "text", "text": json.dumps(failure_payload, ensure_ascii=False)}]


@mcp.tool(description="Fetch the full contents of a search result document")
async def fetch(
    id: str,
    context: Context | None = None,
) -> List[Dict[str, Any]]:
    """Fetch adapter for ChatGPT/OpenAI MCP clients.

    Reads the note through ``read_note`` from the configured default project
    and wraps the formatted document in a single text content item.

    Args:
        id: Document identifier (permalink, title, or memory URL)
        context: Optional FastMCP context, forwarded to `read_note`

    Returns:
        A one-element list: `{ "type": "text", "text": "{...JSON...}" }`
        whose JSON body has `id`, `title`, `text`, `url` and `metadata`. If
        anything raises, a "Fetch Error" document is returned instead.
    """
    track_mcp_tool("fetch")
    logger.info(f"ChatGPT fetch request: id='{id}'")

    try:
        # No project parameter is exposed to ChatGPT, so the default project is used
        project_name = ConfigManager().config.default_project

        note_text = await read_note.fn(
            identifier=id,
            project=project_name,
            page=1,
            page_size=10,
            context=context,
        )

        # Shape the note into the document structure ChatGPT expects
        document = _format_document_for_chatgpt(note_text, id)

        logger.info(f"Fetch completed: id='{id}', content_length={len(document.get('text', ''))}")

        # OpenAI expects the MCP content array format
        serialized = json.dumps(document, ensure_ascii=False)
        return [{"type": "text", "text": serialized}]

    except Exception as e:
        logger.error(f"ChatGPT fetch failed for id '{id}': {e}")
        failure_document = {
            "id": id,
            "title": "Fetch Error",
            "text": f"Failed to fetch document: {str(e)[:200]}",
            "url": id,
            "metadata": {"error": "Fetch failed"},
        }
        return [{"type": "text", "text": json.dumps(failure_document, ensure_ascii=False)}]

```
Page 4/19FirstPrevNextLast