This is page 3 of 19. Use http://codebase.md/basicmachines-co/basic-memory?page={x} to view the full context.

# Directory Structure

```
├── .claude
│   ├── commands
│   │   ├── release
│   │   │   ├── beta.md
│   │   │   ├── changelog.md
│   │   │   ├── release-check.md
│   │   │   └── release.md
│   │   ├── spec.md
│   │   └── test-live.md
│   └── settings.json
├── .dockerignore
├── .env.example
├── .github
│   ├── dependabot.yml
│   ├── ISSUE_TEMPLATE
│   │   ├── bug_report.md
│   │   ├── config.yml
│   │   ├── documentation.md
│   │   └── feature_request.md
│   └── workflows
│       ├── claude-code-review.yml
│       ├── claude-issue-triage.yml
│       ├── claude.yml
│       ├── dev-release.yml
│       ├── docker.yml
│       ├── pr-title.yml
│       ├── release.yml
│       └── test.yml
├── .gitignore
├── .python-version
├── CHANGELOG.md
├── CITATION.cff
├── CLA.md
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── docker-compose-postgres.yml
├── docker-compose.yml
├── Dockerfile
├── docs
│   ├── ai-assistant-guide-extended.md
│   ├── ARCHITECTURE.md
│   ├── character-handling.md
│   ├── cloud-cli.md
│   ├── Docker.md
│   └── testing-coverage.md
├── justfile
├── LICENSE
├── llms-install.md
├── pyproject.toml
├── README.md
├── SECURITY.md
├── smithery.yaml
├── specs
│   ├── SPEC-1 Specification-Driven Development Process.md
│   ├── SPEC-10 Unified Deployment Workflow and Event Tracking.md
│   ├── SPEC-11 Basic Memory API Performance Optimization.md
│   ├── SPEC-12 OpenTelemetry Observability.md
│   ├── SPEC-13 CLI Authentication with Subscription Validation.md
│   ├── SPEC-14 Cloud Git Versioning & GitHub Backup.md
│   ├── SPEC-14- Cloud Git Versioning & GitHub Backup.md
│   ├── SPEC-15 Configuration Persistence via Tigris for Cloud Tenants.md
│   ├── SPEC-16 MCP Cloud Service Consolidation.md
│   ├── SPEC-17 Semantic Search with ChromaDB.md
│   ├── SPEC-18 AI Memory Management Tool.md
│   ├── SPEC-19 Sync Performance and Memory Optimization.md
│   ├── SPEC-2 Slash Commands Reference.md
│   ├── SPEC-20 Simplified Project-Scoped Rclone Sync.md
│   ├── SPEC-3 Agent Definitions.md
│   ├── SPEC-4 Notes Web UI Component Architecture.md
│   ├── SPEC-5 CLI Cloud Upload via WebDAV.md
│   ├── SPEC-6 Explicit Project Parameter Architecture.md
│   ├── SPEC-7 POC to spike Tigris Turso for local access to cloud data.md
│   ├── SPEC-8 TigrisFS Integration.md
│   ├── SPEC-9 Multi-Project Bidirectional Sync Architecture.md
│   ├── SPEC-9 Signed Header Tenant Information.md
│   └── SPEC-9-1 Follow-Ups- Conflict, Sync, and Observability.md
├── src
│   └── basic_memory
│       ├── __init__.py
│       ├── alembic
│       │   ├── alembic.ini
│       │   ├── env.py
│       │   ├── migrations.py
│       │   ├── script.py.mako
│       │   └── versions
│       │       ├── 314f1ea54dc4_add_postgres_full_text_search_support_.py
│       │       ├── 3dae7c7b1564_initial_schema.py
│       │       ├── 502b60eaa905_remove_required_from_entity_permalink.py
│       │       ├── 5fe1ab1ccebe_add_projects_table.py
│       │       ├── 647e7a75e2cd_project_constraint_fix.py
│       │       ├── 6830751f5fb6_merge_multiple_heads.py
│       │       ├── 9d9c1cb7d8f5_add_mtime_and_size_columns_to_entity_.py
│       │       ├── a1b2c3d4e5f6_fix_project_foreign_keys.py
│       │       ├── a2b3c4d5e6f7_add_search_index_entity_cascade.py
│       │       ├── b3c3938bacdb_relation_to_name_unique_index.py
│       │       ├── cc7172b46608_update_search_index_schema.py
│       │       ├── e7e1f4367280_add_scan_watermark_tracking_to_project.py
│       │       ├── f8a9b2c3d4e5_add_pg_trgm_for_fuzzy_link_resolution.py
│       │       └── g9a0b3c4d5e6_add_external_id_to_project_and_entity.py
│       ├── api
│       │   ├── __init__.py
│       │   ├── app.py
│       │   ├── container.py
│       │   ├── routers
│       │   │   ├── __init__.py
│       │   │   ├── directory_router.py
│       │   │   ├── importer_router.py
│       │   │   ├── knowledge_router.py
│       │   │   ├── management_router.py
│       │   │   ├── memory_router.py
│       │   │   ├── project_router.py
│       │   │   ├── prompt_router.py
│       │   │   ├── resource_router.py
│       │   │   ├── search_router.py
│       │   │   └── utils.py
│       │   ├── template_loader.py
│       │   └── v2
│       │       ├── __init__.py
│       │       └── routers
│       │           ├── __init__.py
│       │           ├── directory_router.py
│       │           ├── importer_router.py
│       │           ├── knowledge_router.py
│       │           ├── memory_router.py
│       │           ├── project_router.py
│       │           ├── prompt_router.py
│       │           ├── resource_router.py
│       │           └── search_router.py
│       ├── cli
│       │   ├── __init__.py
│       │   ├── app.py
│       │   ├── auth.py
│       │   ├── commands
│       │   │   ├── __init__.py
│       │   │   ├── cloud
│       │   │   │   ├── __init__.py
│       │   │   │   ├── api_client.py
│       │   │   │   ├── bisync_commands.py
│       │   │   │   ├── cloud_utils.py
│       │   │   │   ├── core_commands.py
│       │   │   │   ├── rclone_commands.py
│       │   │   │   ├── rclone_config.py
│       │   │   │   ├── rclone_installer.py
│       │   │   │   ├── upload_command.py
│       │   │   │   └── upload.py
│       │   │   ├── command_utils.py
│       │   │   ├── db.py
│       │   │   ├── format.py
│       │   │   ├── import_chatgpt.py
│       │   │   ├── import_claude_conversations.py
│       │   │   ├── import_claude_projects.py
│       │   │   ├── import_memory_json.py
│       │   │   ├── mcp.py
│       │   │   ├── project.py
│       │   │   ├── status.py
│       │   │   ├── telemetry.py
│       │   │   └── tool.py
│       │   ├── container.py
│       │   └── main.py
│       ├── config.py
│       ├── db.py
│       ├── deps
│       │   ├── __init__.py
│       │   ├── config.py
│       │   ├── db.py
│       │   ├── importers.py
│       │   ├── projects.py
│       │   ├── repositories.py
│       │   └── services.py
│       ├── deps.py
│       ├── file_utils.py
│       ├── ignore_utils.py
│       ├── importers
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── chatgpt_importer.py
│       │   ├── claude_conversations_importer.py
│       │   ├── claude_projects_importer.py
│       │   ├── memory_json_importer.py
│       │   └── utils.py
│       ├── markdown
│       │   ├── __init__.py
│       │   ├── entity_parser.py
│       │   ├── markdown_processor.py
│       │   ├── plugins.py
│       │   ├── schemas.py
│       │   └── utils.py
│       ├── mcp
│       │   ├── __init__.py
│       │   ├── async_client.py
│       │   ├── clients
│       │   │   ├── __init__.py
│       │   │   ├── directory.py
│       │   │   ├── knowledge.py
│       │   │   ├── memory.py
│       │   │   ├── project.py
│       │   │   ├── resource.py
│       │   │   └── search.py
│       │   ├── container.py
│       │   ├── project_context.py
│       │   ├── prompts
│       │   │   ├── __init__.py
│       │   │   ├── ai_assistant_guide.py
│       │   │   ├── continue_conversation.py
│       │   │   ├── recent_activity.py
│       │   │   ├── search.py
│       │   │   └── utils.py
│       │   ├── resources
│       │   │   ├── ai_assistant_guide.md
│       │   │   └── project_info.py
│       │   ├── server.py
│       │   └── tools
│       │       ├── __init__.py
│       │       ├── build_context.py
│       │       ├── canvas.py
│       │       ├── chatgpt_tools.py
│       │       ├── delete_note.py
│       │       ├── edit_note.py
│       │       ├── list_directory.py
│       │       ├── move_note.py
│       │       ├── project_management.py
│       │       ├── read_content.py
│       │       ├── read_note.py
│       │       ├── recent_activity.py
│       │       ├── search.py
│       │       ├── utils.py
│       │       ├── view_note.py
│       │       └── write_note.py
│       ├── models
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── knowledge.py
│       │   ├── project.py
│       │   └── search.py
│       ├── project_resolver.py
│       ├── repository
│       │   ├── __init__.py
│       │   ├── entity_repository.py
│       │   ├── observation_repository.py
│       │   ├── postgres_search_repository.py
│       │   ├── project_info_repository.py
│       │   ├── project_repository.py
│       │   ├── relation_repository.py
│       │   ├── repository.py
│       │   ├── search_index_row.py
│       │   ├── search_repository_base.py
│       │   ├── search_repository.py
│       │   └── sqlite_search_repository.py
│       ├── runtime.py
│       ├── schemas
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── cloud.py
│       │   ├── delete.py
│       │   ├── directory.py
│       │   ├── importer.py
│       │   ├── memory.py
│       │   ├── project_info.py
│       │   ├── prompt.py
│       │   ├── request.py
│       │   ├── response.py
│       │   ├── search.py
│       │   ├── sync_report.py
│       │   └── v2
│       │       ├── __init__.py
│       │       ├── entity.py
│       │       └── resource.py
│       ├── services
│       │   ├── __init__.py
│       │   ├── context_service.py
│       │   ├── directory_service.py
│       │   ├── entity_service.py
│       │   ├── exceptions.py
│       │   ├── file_service.py
│       │   ├── initialization.py
│       │   ├── link_resolver.py
│       │   ├── project_service.py
│       │   ├── search_service.py
│       │   └── service.py
│       ├── sync
│       │   ├── __init__.py
│       │   ├── background_sync.py
│       │   ├── coordinator.py
│       │   ├── sync_service.py
│       │   └── watch_service.py
│       ├── telemetry.py
│       ├── templates
│       │   └── prompts
│       │       ├── continue_conversation.hbs
│       │       └── search.hbs
│       └── utils.py
├── test-int
│   ├── BENCHMARKS.md
│   ├── cli
│   │   ├── test_project_commands_integration.py
│   │   └── test_version_integration.py
│   ├── conftest.py
│   ├── mcp
│   │   ├── test_build_context_underscore.py
│   │   ├── test_build_context_validation.py
│   │   ├── test_chatgpt_tools_integration.py
│   │   ├── test_default_project_mode_integration.py
│   │   ├── test_delete_note_integration.py
│   │   ├── test_edit_note_integration.py
│   │   ├── test_lifespan_shutdown_sync_task_cancellation_integration.py
│   │   ├── test_list_directory_integration.py
│   │   ├── test_move_note_integration.py
│   │   ├── test_project_management_integration.py
│   │   ├── test_project_state_sync_integration.py
│   │   ├── test_read_content_integration.py
│   │   ├── test_read_note_integration.py
│   │   ├── test_search_integration.py
│   │   ├── test_single_project_mcp_integration.py
│   │   └── test_write_note_integration.py
│   ├── test_db_wal_mode.py
│   └── test_disable_permalinks_integration.py
├── tests
│   ├── __init__.py
│   ├── api
│   │   ├── conftest.py
│   │   ├── test_api_container.py
│   │   ├── test_async_client.py
│   │   ├── test_continue_conversation_template.py
│   │   ├── test_directory_router.py
│   │   ├── test_importer_router.py
│   │   ├── test_knowledge_router.py
│   │   ├── test_management_router.py
│   │   ├── test_memory_router.py
│   │   ├── test_project_router_operations.py
│   │   ├── test_project_router.py
│   │   ├── test_prompt_router.py
│   │   ├── test_relation_background_resolution.py
│   │   ├── test_resource_router.py
│   │   ├── test_search_router.py
│   │   ├── test_search_template.py
│   │   ├── test_template_loader_helpers.py
│   │   ├── test_template_loader.py
│   │   └── v2
│   │       ├── __init__.py
│   │       ├── conftest.py
│   │       ├── test_directory_router.py
│   │       ├── test_importer_router.py
│   │       ├── test_knowledge_router.py
│   │       ├── test_memory_router.py
│   │       ├── test_project_router.py
│   │       ├── test_prompt_router.py
│   │       ├── test_resource_router.py
│   │       └── test_search_router.py
│   ├── cli
│   │   ├── cloud
│   │   │   ├── test_cloud_api_client_and_utils.py
│   │   │   ├── test_rclone_config_and_bmignore_filters.py
│   │   │   └── test_upload_path.py
│   │   ├── conftest.py
│   │   ├── test_auth_cli_auth.py
│   │   ├── test_cli_container.py
│   │   ├── test_cli_exit.py
│   │   ├── test_cli_tool_exit.py
│   │   ├── test_cli_tools.py
│   │   ├── test_cloud_authentication.py
│   │   ├── test_ignore_utils.py
│   │   ├── test_import_chatgpt.py
│   │   ├── test_import_claude_conversations.py
│   │   ├── test_import_claude_projects.py
│   │   ├── test_import_memory_json.py
│   │   ├── test_project_add_with_local_path.py
│   │   └── test_upload.py
│   ├── conftest.py
│   ├── db
│   │   └── test_issue_254_foreign_key_constraints.py
│   ├── importers
│   │   ├── test_conversation_indexing.py
│   │   ├── test_importer_base.py
│   │   └── test_importer_utils.py
│   ├── markdown
│   │   ├── __init__.py
│   │   ├── test_date_frontmatter_parsing.py
│   │   ├── test_entity_parser_error_handling.py
│   │   ├── test_entity_parser.py
│   │   ├── test_markdown_plugins.py
│   │   ├── test_markdown_processor.py
│   │   ├── test_observation_edge_cases.py
│   │   ├── test_parser_edge_cases.py
│   │   ├── test_relation_edge_cases.py
│   │   └── test_task_detection.py
│   ├── mcp
│   │   ├── clients
│   │   │   ├── __init__.py
│   │   │   └── test_clients.py
│   │   ├── conftest.py
│   │   ├── test_async_client_modes.py
│   │   ├── test_mcp_container.py
│   │   ├── test_obsidian_yaml_formatting.py
│   │   ├── test_permalink_collision_file_overwrite.py
│   │   ├── test_project_context.py
│   │   ├── test_prompts.py
│   │   ├── test_recent_activity_prompt_modes.py
│   │   ├── test_resources.py
│   │   ├── test_server_lifespan_branches.py
│   │   ├── test_tool_build_context.py
│   │   ├── test_tool_canvas.py
│   │   ├── test_tool_delete_note.py
│   │   ├── test_tool_edit_note.py
│   │   ├── test_tool_list_directory.py
│   │   ├── test_tool_move_note.py
│   │   ├── test_tool_project_management.py
│   │   ├── test_tool_read_content.py
│   │   ├── test_tool_read_note.py
│   │   ├── test_tool_recent_activity.py
│   │   ├── test_tool_resource.py
│   │   ├── test_tool_search.py
│   │   ├── test_tool_utils.py
│   │   ├── test_tool_view_note.py
│   │   ├── test_tool_write_note_kebab_filenames.py
│   │   ├── test_tool_write_note.py
│   │   └── tools
│   │       └── test_chatgpt_tools.py
│   ├── Non-MarkdownFileSupport.pdf
│   ├── README.md
│   ├── repository
│   │   ├── test_entity_repository_upsert.py
│   │   ├── test_entity_repository.py
│   │   ├── test_entity_upsert_issue_187.py
│   │   ├── test_observation_repository.py
│   │   ├── test_postgres_search_repository.py
│   │   ├── test_project_info_repository.py
│   │   ├── test_project_repository.py
│   │   ├── test_relation_repository.py
│   │   ├── test_repository.py
│   │   ├── test_search_repository_edit_bug_fix.py
│   │   └── test_search_repository.py
│   ├── schemas
│   │   ├── test_base_timeframe_minimum.py
│   │   ├── test_memory_serialization.py
│   │   ├── test_memory_url_validation.py
│   │   ├── test_memory_url.py
│   │   ├── test_relation_response_reference_resolution.py
│   │   ├── test_schemas.py
│   │   └── test_search.py
│   ├── Screenshot.png
│   ├── services
│   │   ├── test_context_service.py
│   │   ├── test_directory_service.py
│   │   ├── test_entity_service_disable_permalinks.py
│   │   ├── test_entity_service.py
│   │   ├── test_file_service.py
│   │   ├── test_initialization_cloud_mode_branches.py
│   │   ├── test_initialization.py
│   │   ├── test_link_resolver.py
│   │   ├── test_project_removal_bug.py
│   │   ├── test_project_service_operations.py
│   │   ├── test_project_service.py
│   │   └── test_search_service.py
│   ├── sync
│   │   ├── test_character_conflicts.py
│   │   ├── test_coordinator.py
│   │   ├── test_sync_service_incremental.py
│   │   ├── test_sync_service.py
│   │   ├── test_sync_wikilink_issue.py
│   │   ├── test_tmp_files.py
│   │   ├── test_watch_service_atomic_adds.py
│   │   ├── test_watch_service_edge_cases.py
│   │   ├── test_watch_service_reload.py
│   │   └── test_watch_service.py
│   ├── test_config.py
│   ├── test_deps.py
│   ├── test_production_cascade_delete.py
│   ├── test_project_resolver.py
│   ├── test_rclone_commands.py
│   ├── test_runtime.py
│   ├── test_telemetry.py
│   └── utils
│       ├── test_file_utils.py
│       ├── test_frontmatter_obsidian_compatible.py
│       ├── test_parse_tags.py
│       ├── test_permalink_formatting.py
│       ├── test_timezone_utils.py
│       ├── test_utf8_handling.py
│       └── test_validate_project_path.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/src/basic_memory/api/app.py:
--------------------------------------------------------------------------------

```python
"""FastAPI application for basic-memory knowledge graph API."""

from contextlib import asynccontextmanager

from fastapi import FastAPI, HTTPException
from fastapi.exception_handlers import http_exception_handler
from loguru import logger

from basic_memory import __version__ as version
from basic_memory.api.container import ApiContainer, set_container
from basic_memory.api.routers import (
    directory_router,
    importer_router,
    knowledge,
    management,
    memory,
    project,
    resource,
    search,
    prompt_router,
)
from basic_memory.api.v2.routers import (
    knowledge_router as v2_knowledge,
    project_router as v2_project,
    memory_router as v2_memory,
    search_router as v2_search,
    resource_router as v2_resource,
    directory_router as v2_directory,
    prompt_router as v2_prompt,
    importer_router as v2_importer,
)
from basic_memory.config import init_api_logging
from basic_memory.services.initialization import initialize_app


@asynccontextmanager
async def lifespan(app: FastAPI):  # pragma: no cover
    """Lifecycle manager for the FastAPI app. Not called in stdio mcp mode"""

    # Initialize logging for API (stdout in cloud mode, file otherwise)
    init_api_logging()

    # --- Composition Root ---
    # Create container and read config (single point of config access)
    container = ApiContainer.create()
    set_container(container)
    app.state.container = container

    logger.info(f"Starting Basic Memory API (mode={container.mode.name})")

    await initialize_app(container.config)

    # Cache database connections in app state for performance
    logger.info("Initializing database and caching connections...")
    engine, session_maker = await container.init_database()
    app.state.engine = engine
    app.state.session_maker = session_maker
    logger.info("Database connections cached in app state")

    # Create and start sync coordinator (lifecycle centralized in coordinator)
    sync_coordinator = container.create_sync_coordinator()
    await sync_coordinator.start()
    app.state.sync_coordinator = sync_coordinator

    # Proceed with startup
    yield

    # Shutdown - coordinator handles clean task cancellation
    logger.info("Shutting down Basic Memory API")
    await sync_coordinator.stop()

    await container.shutdown_database()


# Initialize FastAPI app
app = FastAPI(
    title="Basic Memory API",
    description="Knowledge graph API for basic-memory",
    version=version,
    lifespan=lifespan,
)

# Include v2 routers FIRST (more specific paths must match before /{project} catch-all)
app.include_router(v2_knowledge, prefix="/v2/projects/{project_id}")
app.include_router(v2_memory, prefix="/v2/projects/{project_id}")
app.include_router(v2_search, prefix="/v2/projects/{project_id}")
app.include_router(v2_resource, prefix="/v2/projects/{project_id}")
app.include_router(v2_directory, prefix="/v2/projects/{project_id}")
app.include_router(v2_prompt, prefix="/v2/projects/{project_id}")
app.include_router(v2_importer, prefix="/v2/projects/{project_id}")
app.include_router(v2_project, prefix="/v2")

# Include v1 routers (/{project} is a catch-all, must come after specific prefixes)
app.include_router(knowledge.router, prefix="/{project}")
app.include_router(memory.router, prefix="/{project}")
app.include_router(resource.router, prefix="/{project}")
app.include_router(search.router, prefix="/{project}")
app.include_router(project.project_router, prefix="/{project}")
app.include_router(directory_router.router, prefix="/{project}")
app.include_router(prompt_router.router, prefix="/{project}")
app.include_router(importer_router.router, prefix="/{project}")

# Project resource router works across projects
app.include_router(project.project_resource_router)
app.include_router(management.router)


@app.exception_handler(Exception)
async def exception_handler(request, exc):  # pragma: no cover
    logger.exception(
        "API unhandled exception",
        url=str(request.url),
        method=request.method,
        client=request.client.host if request.client else None,
        path=request.url.path,
        error_type=type(exc).__name__,
        error=str(exc),
    )
    return await http_exception_handler(request, HTTPException(status_code=500, detail=str(exc)))

```
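
The ordering comments above are the load-bearing detail: Starlette matches mounted routers in registration order, so the literal `/v2/...` prefixes must be included before the `/{project}` catch-all. A minimal standalone sketch of the same rule, using hypothetical routes rather than anything from this repository:

```python
# Standalone illustration of the route-ordering rule noted in app.py above;
# hypothetical paths, not routes from basic-memory itself.
from fastapi import APIRouter, FastAPI

app = FastAPI()

v2 = APIRouter()


@v2.get("/status")
async def v2_status() -> dict:
    return {"api": "v2"}


v1 = APIRouter()


@v1.get("/status")
async def v1_status(project: str) -> dict:
    return {"api": "v1", "project": project}


# Register the literal "/v2" prefix before the "/{project}" catch-all.
# If the order were reversed, GET /v2/status would match the v1 route
# with project == "v2" instead of reaching the v2 handler.
app.include_router(v2, prefix="/v2")
app.include_router(v1, prefix="/{project}")
```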

--------------------------------------------------------------------------------
/.github/workflows/test.yml:
--------------------------------------------------------------------------------

```yaml
name: Tests

on:
  push:
    branches: [ "main" ]
  pull_request:
    branches: [ "main" ]
  # pull_request_target runs on the BASE of the PR, not the merge result.
  # It has write permissions and access to secrets.
  # It's useful for PRs from forks or automated PRs but requires careful use for security reasons.
  # See: https://docs.github.com/en/actions/using-workflows/events-that-trigger-workflows#pull_request_target
  pull_request_target:
    branches: [ "main" ]

jobs:
  test-sqlite:
    name: Test SQLite (${{ matrix.os }}, Python ${{ matrix.python-version }})
    strategy:
      fail-fast: false
      matrix:
        os: [ubuntu-latest, windows-latest]
        python-version: [ "3.12", "3.13", "3.14" ]
    runs-on: ${{ matrix.os }}

    steps:
      - uses: actions/checkout@v4
        with:
          submodules: true

      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v4
        with:
          python-version: ${{ matrix.python-version }}
          cache: 'pip'

      - name: Install uv
        run: |
          pip install uv

      - name: Install just (Linux/macOS)
        if: runner.os != 'Windows'
        run: |
          curl --proto '=https' --tlsv1.2 -sSf https://just.systems/install.sh | bash -s -- --to /usr/local/bin

      - name: Install just (Windows)
        if: runner.os == 'Windows'
        run: |
          # Install just using Chocolatey (pre-installed on GitHub Actions Windows runners)
          choco install just --yes
        shell: pwsh

      - name: Create virtual env
        run: |
          uv venv

      - name: Install dependencies
        run: |
          uv pip install -e .[dev]

      - name: Run type checks
        run: |
          just typecheck

      - name: Run linting
        run: |
          just lint

      - name: Run tests (SQLite)
        run: |
          uv pip install pytest pytest-cov
          just test-sqlite

  test-postgres:
    name: Test Postgres (Python ${{ matrix.python-version }})
    strategy:
      fail-fast: false
      matrix:
        python-version: [ "3.12", "3.13", "3.14" ]
    runs-on: ubuntu-latest

    # Note: No services section needed - testcontainers handles Postgres in Docker

    steps:
      - uses: actions/checkout@v4
        with:
          submodules: true

      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v4
        with:
          python-version: ${{ matrix.python-version }}
          cache: 'pip'

      - name: Install uv
        run: |
          pip install uv

      - name: Install just
        run: |
          curl --proto '=https' --tlsv1.2 -sSf https://just.systems/install.sh | bash -s -- --to /usr/local/bin

      - name: Create virtual env
        run: |
          uv venv

      - name: Install dependencies
        run: |
          uv pip install -e .[dev]

      - name: Run tests (Postgres via testcontainers)
        run: |
          uv pip install pytest pytest-cov
          just test-postgres

  coverage:
    name: Coverage Summary (combined, Python 3.12)
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v4
        with:
          submodules: true

      - name: Set up Python 3.12
        uses: actions/setup-python@v4
        with:
          python-version: "3.12"
          cache: "pip"

      - name: Install uv
        run: |
          pip install uv

      - name: Install just
        run: |
          curl --proto '=https' --tlsv1.2 -sSf https://just.systems/install.sh | bash -s -- --to /usr/local/bin

      - name: Create virtual env
        run: |
          uv venv

      - name: Install dependencies
        run: |
          uv pip install -e .[dev]

      - name: Run combined coverage (SQLite + Postgres)
        run: |
          uv pip install pytest pytest-cov
          just coverage

      - name: Add coverage report to job summary
        if: always()
        run: |
          {
            echo "## Coverage"
            echo ""
            echo '```'
            uv run coverage report -m
            echo '```'
          } >> "$GITHUB_STEP_SUMMARY"

      - name: Upload HTML coverage report
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: htmlcov
          path: htmlcov/

```
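
As the comment in the `test-postgres` job notes, Postgres comes from testcontainers rather than a workflow `services:` block. A hedged sketch of such a fixture using the `testcontainers` Python package; the project's actual fixture lives in its conftest and may be configured differently:

```python
# Illustrative pytest fixture using testcontainers to provide Postgres;
# the fixture actually used by `just test-postgres` may differ.
import pytest
from testcontainers.postgres import PostgresContainer


@pytest.fixture(scope="session")
def postgres_dsn():
    # Starts a throwaway Postgres container for the test session and
    # tears it down automatically when the context manager exits.
    with PostgresContainer("postgres:16") as postgres:
        yield postgres.get_connection_url()
```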

--------------------------------------------------------------------------------
/src/basic_memory/alembic/versions/3dae7c7b1564_initial_schema.py:
--------------------------------------------------------------------------------

```python
"""initial schema

Revision ID: 3dae7c7b1564
Revises:
Create Date: 2025-02-12 21:23:00.336344

"""

from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision: str = "3dae7c7b1564"
down_revision: Union[str, None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table(
        "entity",
        sa.Column("id", sa.Integer(), nullable=False),
        sa.Column("title", sa.String(), nullable=False),
        sa.Column("entity_type", sa.String(), nullable=False),
        sa.Column("entity_metadata", sa.JSON(), nullable=True),
        sa.Column("content_type", sa.String(), nullable=False),
        sa.Column("permalink", sa.String(), nullable=False),
        sa.Column("file_path", sa.String(), nullable=False),
        sa.Column("checksum", sa.String(), nullable=True),
        sa.Column("created_at", sa.DateTime(), nullable=False),
        sa.Column("updated_at", sa.DateTime(), nullable=False),
        sa.PrimaryKeyConstraint("id"),
        sa.UniqueConstraint("permalink", name="uix_entity_permalink"),
    )
    op.create_index("ix_entity_created_at", "entity", ["created_at"], unique=False)
    op.create_index(op.f("ix_entity_file_path"), "entity", ["file_path"], unique=True)
    op.create_index(op.f("ix_entity_permalink"), "entity", ["permalink"], unique=True)
    op.create_index("ix_entity_title", "entity", ["title"], unique=False)
    op.create_index("ix_entity_type", "entity", ["entity_type"], unique=False)
    op.create_index("ix_entity_updated_at", "entity", ["updated_at"], unique=False)
    op.create_table(
        "observation",
        sa.Column("id", sa.Integer(), nullable=False),
        sa.Column("entity_id", sa.Integer(), nullable=False),
        sa.Column("content", sa.Text(), nullable=False),
        sa.Column("category", sa.String(), nullable=False),
        sa.Column("context", sa.Text(), nullable=True),
        sa.Column("tags", sa.JSON(), server_default="[]", nullable=True),
        sa.ForeignKeyConstraint(["entity_id"], ["entity.id"], ondelete="CASCADE"),
        sa.PrimaryKeyConstraint("id"),
    )
    op.create_index("ix_observation_category", "observation", ["category"], unique=False)
    op.create_index("ix_observation_entity_id", "observation", ["entity_id"], unique=False)
    op.create_table(
        "relation",
        sa.Column("id", sa.Integer(), nullable=False),
        sa.Column("from_id", sa.Integer(), nullable=False),
        sa.Column("to_id", sa.Integer(), nullable=True),
        sa.Column("to_name", sa.String(), nullable=False),
        sa.Column("relation_type", sa.String(), nullable=False),
        sa.Column("context", sa.Text(), nullable=True),
        sa.ForeignKeyConstraint(["from_id"], ["entity.id"], ondelete="CASCADE"),
        sa.ForeignKeyConstraint(["to_id"], ["entity.id"], ondelete="CASCADE"),
        sa.PrimaryKeyConstraint("id"),
        sa.UniqueConstraint("from_id", "to_id", "relation_type", name="uix_relation"),
    )
    op.create_index("ix_relation_from_id", "relation", ["from_id"], unique=False)
    op.create_index("ix_relation_to_id", "relation", ["to_id"], unique=False)
    op.create_index("ix_relation_type", "relation", ["relation_type"], unique=False)
    # ### end Alembic commands ###


def downgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_index("ix_relation_type", table_name="relation")
    op.drop_index("ix_relation_to_id", table_name="relation")
    op.drop_index("ix_relation_from_id", table_name="relation")
    op.drop_table("relation")
    op.drop_index("ix_observation_entity_id", table_name="observation")
    op.drop_index("ix_observation_category", table_name="observation")
    op.drop_table("observation")
    op.drop_index("ix_entity_updated_at", table_name="entity")
    op.drop_index("ix_entity_type", table_name="entity")
    op.drop_index("ix_entity_title", table_name="entity")
    op.drop_index(op.f("ix_entity_permalink"), table_name="entity")
    op.drop_index(op.f("ix_entity_file_path"), table_name="entity")
    op.drop_index("ix_entity_created_at", table_name="entity")
    op.drop_table("entity")
    # ### end Alembic commands ###

```

--------------------------------------------------------------------------------
/src/basic_memory/cli/commands/db.py:
--------------------------------------------------------------------------------

```python
"""Database management commands."""

from pathlib import Path

import typer
from loguru import logger
from rich.console import Console
from sqlalchemy.exc import OperationalError

from basic_memory import db
from basic_memory.cli.app import app
from basic_memory.cli.commands.command_utils import run_with_cleanup
from basic_memory.config import ConfigManager
from basic_memory.repository import ProjectRepository
from basic_memory.services.initialization import reconcile_projects_with_config
from basic_memory.sync.sync_service import get_sync_service

console = Console()


async def _reindex_projects(app_config):
    """Reindex all projects in a single async context.

    This ensures all database operations use the same event loop,
    and proper cleanup happens when the function completes.
    """
    try:
        await reconcile_projects_with_config(app_config)

        # Get database session (migrations already run if needed)
        _, session_maker = await db.get_or_create_db(
            db_path=app_config.database_path,
            db_type=db.DatabaseType.FILESYSTEM,
        )
        project_repository = ProjectRepository(session_maker)
        projects = await project_repository.get_active_projects()

        for project in projects:
            console.print(f"  Indexing [cyan]{project.name}[/cyan]...")
            logger.info(f"Starting sync for project: {project.name}")
            sync_service = await get_sync_service(project)
            sync_dir = Path(project.path)
            await sync_service.sync(sync_dir, project_name=project.name)
            logger.info(f"Sync completed for project: {project.name}")
    finally:
        # Clean up database connections before event loop closes
        await db.shutdown_db()


@app.command()
def reset(
    reindex: bool = typer.Option(False, "--reindex", help="Rebuild db index from filesystem"),
):  # pragma: no cover
    """Reset database (drop all tables and recreate)."""
    console.print(
        "[yellow]Note:[/yellow] This only deletes the index database. "
        "Your markdown note files will not be affected.\n"
        "Use [green]bm reset --reindex[/green] to automatically rebuild the index afterward."
    )
    if typer.confirm("Reset the database index?"):
        logger.info("Resetting database...")
        config_manager = ConfigManager()
        app_config = config_manager.config
        # Get database path
        db_path = app_config.app_database_path

        # Delete the database file and WAL files if they exist
        for suffix in ["", "-shm", "-wal"]:
            path = db_path.parent / f"{db_path.name}{suffix}"
            if path.exists():
                try:
                    path.unlink()
                    logger.info(f"Deleted: {path}")
                except OSError as e:
                    console.print(
                        f"[red]Error:[/red] Cannot delete {path.name}: {e}\n"
                        "The database may be in use by another process (e.g., MCP server).\n"
                        "Please close Claude Desktop or any other Basic Memory clients and try again."
                    )
                    raise typer.Exit(1)

        # Create a new empty database (preserves project configuration)
        try:
            run_with_cleanup(db.run_migrations(app_config))
        except OperationalError as e:
            if "disk I/O error" in str(e) or "database is locked" in str(e):
                console.print(
                    "[red]Error:[/red] Cannot access database. "
                    "It may be in use by another process (e.g., MCP server).\n"
                    "Please close Claude Desktop or any other Basic Memory clients and try again."
                )
                raise typer.Exit(1)
            raise
        console.print("[green]Database reset complete[/green]")

        if reindex:
            projects = list(app_config.projects)
            if not projects:
                console.print("[yellow]No projects configured. Skipping reindex.[/yellow]")
            else:
                console.print(f"Rebuilding search index for {len(projects)} project(s)...")
                # Note: _reindex_projects has its own cleanup, but run_with_cleanup
                # ensures db.shutdown_db() is called even if _reindex_projects changes
                run_with_cleanup(_reindex_projects(app_config))
                console.print("[green]Reindex complete[/green]")

```
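
The `--reindex` path above can also be driven directly from Python; a minimal sketch, assuming a configured `ConfigManager` and skipping the `run_with_cleanup` wrapper (whose implementation is not shown on this page):

```python
# Illustrative only: invoking the reindex flow above without the CLI wrapper.
# _reindex_projects already closes database connections in its finally block.
import asyncio

from basic_memory.cli.commands.db import _reindex_projects
from basic_memory.config import ConfigManager


def reindex_all_projects() -> None:
    app_config = ConfigManager().config
    asyncio.run(_reindex_projects(app_config))
```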

--------------------------------------------------------------------------------
/tests/sync/test_coordinator.py:
--------------------------------------------------------------------------------

```python
"""Tests for SyncCoordinator - centralized sync/watch lifecycle."""

import pytest
from unittest.mock import AsyncMock, patch

from basic_memory.config import BasicMemoryConfig
from basic_memory.sync.coordinator import SyncCoordinator, SyncStatus


class TestSyncCoordinator:
    """Test SyncCoordinator class."""

    @pytest.fixture
    def mock_config(self):
        """Create a mock config for testing."""
        return BasicMemoryConfig()

    def test_initial_status(self, mock_config):
        """Coordinator starts in NOT_STARTED state."""
        coordinator = SyncCoordinator(config=mock_config)
        assert coordinator.status == SyncStatus.NOT_STARTED
        assert coordinator.is_running is False

    @pytest.mark.asyncio
    async def test_start_when_sync_disabled(self, mock_config):
        """When should_sync is False, start() sets status to STOPPED."""
        coordinator = SyncCoordinator(
            config=mock_config,
            should_sync=False,
            skip_reason="Test skip",
        )

        await coordinator.start()

        assert coordinator.status == SyncStatus.STOPPED
        assert coordinator.is_running is False

    @pytest.mark.asyncio
    async def test_stop_when_not_started(self, mock_config):
        """Stop is safe to call when not started."""
        coordinator = SyncCoordinator(config=mock_config)

        await coordinator.stop()  # Should not raise

        assert coordinator.status == SyncStatus.NOT_STARTED

    @pytest.mark.asyncio
    async def test_stop_when_stopped(self, mock_config):
        """Stop is idempotent when already stopped."""
        coordinator = SyncCoordinator(
            config=mock_config,
            should_sync=False,
        )
        await coordinator.start()  # Sets to STOPPED

        await coordinator.stop()  # Should not raise

        assert coordinator.status == SyncStatus.STOPPED

    def test_get_status_info(self, mock_config):
        """get_status_info returns diagnostic info."""
        coordinator = SyncCoordinator(
            config=mock_config,
            should_sync=True,
            skip_reason=None,
        )

        info = coordinator.get_status_info()

        assert info["status"] == "NOT_STARTED"
        assert info["should_sync"] is True
        assert info["skip_reason"] is None
        assert info["has_task"] is False

    def test_get_status_info_with_skip_reason(self, mock_config):
        """get_status_info includes skip reason."""
        coordinator = SyncCoordinator(
            config=mock_config,
            should_sync=False,
            skip_reason="Test environment detected",
        )

        info = coordinator.get_status_info()

        assert info["should_sync"] is False
        assert info["skip_reason"] == "Test environment detected"

    @pytest.mark.asyncio
    async def test_start_creates_task(self, mock_config):
        """When should_sync is True, start() creates a background task."""
        coordinator = SyncCoordinator(
            config=mock_config,
            should_sync=True,
        )

        # Mock initialize_file_sync to avoid actually starting sync
        # The import happens inside start(), so patch at the source module
        with patch(
            "basic_memory.services.initialization.initialize_file_sync",
            new_callable=AsyncMock,
        ):
            # Start coordinator
            await coordinator.start()

            # Should be running with a task
            assert coordinator.status == SyncStatus.RUNNING
            assert coordinator.is_running is True
            assert coordinator._sync_task is not None

            # Stop to clean up
            await coordinator.stop()

            assert coordinator.status == SyncStatus.STOPPED
            assert coordinator._sync_task is None

    @pytest.mark.asyncio
    async def test_start_already_running(self, mock_config):
        """Starting when already running is a no-op."""
        coordinator = SyncCoordinator(
            config=mock_config,
            should_sync=True,
        )

        with patch(
            "basic_memory.services.initialization.initialize_file_sync",
            new_callable=AsyncMock,
        ):
            await coordinator.start()
            first_task = coordinator._sync_task

            # Start again - should not create new task
            await coordinator.start()
            assert coordinator._sync_task is first_task

            await coordinator.stop()

```
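
Read together, these tests pin down the coordinator's external contract: construct with `config`, `should_sync`, and `skip_reason`; `start()` and `stop()` move the `status` property between states. A usage sketch under those same assumptions (in the real API this is driven by the container and lifespan shown earlier on this page):

```python
# Illustrative lifecycle, matching the contract exercised by the tests above.
import asyncio

from basic_memory.config import BasicMemoryConfig
from basic_memory.sync.coordinator import SyncCoordinator, SyncStatus


async def run_sync_for_a_while() -> None:
    coordinator = SyncCoordinator(config=BasicMemoryConfig(), should_sync=True)
    await coordinator.start()      # spawns the background sync task
    assert coordinator.status == SyncStatus.RUNNING
    try:
        await asyncio.sleep(60)    # ... do other work while sync runs ...
    finally:
        await coordinator.stop()   # cancels the task cleanly


# asyncio.run(run_sync_for_a_while())
```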

--------------------------------------------------------------------------------
/tests/schemas/test_base_timeframe_minimum.py:
--------------------------------------------------------------------------------

```python
"""Test minimum 1-day timeframe enforcement for timezone handling."""

from datetime import datetime, timedelta
import pytest
from freezegun import freeze_time

from basic_memory.schemas.base import parse_timeframe


class TestTimeframeMinimum:
    """Test that parse_timeframe enforces a minimum 1-day lookback."""

    @freeze_time("2025-01-15 15:00:00")
    def test_today_returns_one_day_ago(self):
        """Test that 'today' returns 1 day ago instead of start of today."""
        result = parse_timeframe("today")
        now = datetime.now()
        one_day_ago = now - timedelta(days=1)

        # Should be approximately 1 day ago (within a second for test tolerance)
        diff = abs((result.replace(tzinfo=None) - one_day_ago).total_seconds())
        assert diff < 1, f"Expected ~1 day ago, got {result}"

    @freeze_time("2025-01-15 15:00:00")
    def test_one_hour_returns_one_day_minimum(self):
        """Test that '1h' returns 1 day ago due to minimum enforcement."""
        result = parse_timeframe("1h")
        now = datetime.now()
        one_day_ago = now - timedelta(days=1)

        # Should be approximately 1 day ago, not 1 hour ago
        diff = abs((result.replace(tzinfo=None) - one_day_ago).total_seconds())
        assert diff < 1, f"Expected ~1 day ago for '1h', got {result}"

    @freeze_time("2025-01-15 15:00:00")
    def test_six_hours_returns_one_day_minimum(self):
        """Test that '6h' returns 1 day ago due to minimum enforcement."""
        result = parse_timeframe("6h")
        now = datetime.now()
        one_day_ago = now - timedelta(days=1)

        # Should be approximately 1 day ago, not 6 hours ago
        diff = abs((result.replace(tzinfo=None) - one_day_ago).total_seconds())
        assert diff < 1, f"Expected ~1 day ago for '6h', got {result}"

    @freeze_time("2025-01-15 15:00:00")
    def test_one_day_returns_one_day(self):
        """Test that '1d' correctly returns approximately 1 day ago."""
        result = parse_timeframe("1d")
        now = datetime.now()
        one_day_ago = now - timedelta(days=1)

        # Should be approximately 1 day ago (within 24 hours)
        diff_hours = abs((result.replace(tzinfo=None) - one_day_ago).total_seconds()) / 3600
        assert diff_hours < 24, (
            f"Expected ~1 day ago for '1d', got {result} (diff: {diff_hours} hours)"
        )

    @freeze_time("2025-01-15 15:00:00")
    def test_two_days_returns_two_days(self):
        """Test that '2d' correctly returns approximately 2 days ago (not affected by minimum)."""
        result = parse_timeframe("2d")
        now = datetime.now()
        two_days_ago = now - timedelta(days=2)

        # Should be approximately 2 days ago (within 24 hours)
        diff_hours = abs((result.replace(tzinfo=None) - two_days_ago).total_seconds()) / 3600
        assert diff_hours < 24, (
            f"Expected ~2 days ago for '2d', got {result} (diff: {diff_hours} hours)"
        )

    @freeze_time("2025-01-15 15:00:00")
    def test_one_week_returns_one_week(self):
        """Test that '1 week' correctly returns approximately 1 week ago (not affected by minimum)."""
        result = parse_timeframe("1 week")
        now = datetime.now()
        one_week_ago = now - timedelta(weeks=1)

        # Should be approximately 1 week ago (within 24 hours)
        diff_hours = abs((result.replace(tzinfo=None) - one_week_ago).total_seconds()) / 3600
        assert diff_hours < 24, (
            f"Expected ~1 week ago for '1 week', got {result} (diff: {diff_hours} hours)"
        )

    @freeze_time("2025-01-15 15:00:00")
    def test_zero_days_returns_one_day_minimum(self):
        """Test that '0d' returns 1 day ago due to minimum enforcement."""
        result = parse_timeframe("0d")
        now = datetime.now()
        one_day_ago = now - timedelta(days=1)

        # Should be approximately 1 day ago, not now
        diff = abs((result.replace(tzinfo=None) - one_day_ago).total_seconds())
        assert diff < 1, f"Expected ~1 day ago for '0d', got {result}"

    def test_timezone_awareness(self):
        """Test that returned datetime is timezone-aware."""
        result = parse_timeframe("1d")
        assert result.tzinfo is not None, "Expected timezone-aware datetime"

    def test_invalid_timeframe_raises_error(self):
        """Test that invalid timeframe strings raise ValueError."""
        with pytest.raises(ValueError, match="Could not parse timeframe"):
            parse_timeframe("invalid_timeframe")

```
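
A toy sketch of the contract these tests describe, for illustration only; the real `parse_timeframe` in `basic_memory.schemas.base` accepts richer natural-language input than this simplified parser:

```python
# Toy re-implementation of the behavior tested above; NOT the library function.
import re
from datetime import datetime, timedelta, timezone


def parse_timeframe_sketch(value: str) -> datetime:
    now = datetime.now(timezone.utc)
    text = value.strip().lower()
    if text == "today":
        delta = timedelta(days=1)
    else:
        match = re.fullmatch(r"(\d+)\s*(h|d|w|hours?|days?|weeks?)", text)
        if match is None:
            raise ValueError(f"Could not parse timeframe: {value}")
        unit = {"h": "hours", "d": "days", "w": "weeks"}[match.group(2)[0]]
        delta = timedelta(**{unit: int(match.group(1))})
    # Enforce the minimum 1-day lookback the tests require, and return a
    # timezone-aware datetime.
    return now - max(delta, timedelta(days=1))
```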

--------------------------------------------------------------------------------
/src/basic_memory/api/routers/importer_router.py:
--------------------------------------------------------------------------------

```python
"""Import router for Basic Memory API."""

import json
import logging

from fastapi import APIRouter, Form, HTTPException, UploadFile, status

from basic_memory.deps import (
    ChatGPTImporterDep,
    ClaudeConversationsImporterDep,
    ClaudeProjectsImporterDep,
    MemoryJsonImporterDep,
)
from basic_memory.importers import Importer
from basic_memory.schemas.importer import (
    ChatImportResult,
    EntityImportResult,
    ProjectImportResult,
)

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/import", tags=["import"])


@router.post("/chatgpt", response_model=ChatImportResult)
async def import_chatgpt(
    importer: ChatGPTImporterDep,
    file: UploadFile,
    folder: str = Form("conversations"),
) -> ChatImportResult:
    """Import conversations from ChatGPT JSON export.

    Args:
        importer: ChatGPT importer resolved via dependency injection.
        file: The ChatGPT conversations.json file.
        folder: The folder to place the files in.

    Returns:
        ChatImportResult with import statistics.

    Raises:
        HTTPException: If import fails.
    """
    return await import_file(importer, file, folder)


@router.post("/claude/conversations", response_model=ChatImportResult)
async def import_claude_conversations(
    importer: ClaudeConversationsImporterDep,
    file: UploadFile,
    folder: str = Form("conversations"),
) -> ChatImportResult:
    """Import conversations from Claude conversations.json export.

    Args:
        importer: Claude conversations importer resolved via dependency injection.
        file: The Claude conversations.json file.
        folder: The folder to place the files in.

    Returns:
        ChatImportResult with import statistics.

    Raises:
        HTTPException: If import fails.
    """
    return await import_file(importer, file, folder)


@router.post("/claude/projects", response_model=ProjectImportResult)
async def import_claude_projects(
    importer: ClaudeProjectsImporterDep,
    file: UploadFile,
    folder: str = Form("projects"),
) -> ProjectImportResult:
    """Import projects from Claude projects.json export.

    Args:
        importer: Claude projects importer resolved via dependency injection.
        file: The Claude projects.json file.
        folder: The folder to place the files in.

    Returns:
        ProjectImportResult with import statistics.

    Raises:
        HTTPException: If import fails.
    """
    return await import_file(importer, file, folder)


@router.post("/memory-json", response_model=EntityImportResult)
async def import_memory_json(
    importer: MemoryJsonImporterDep,
    file: UploadFile,
    folder: str = Form("conversations"),
) -> EntityImportResult:
    """Import entities and relations from a memory.json file.

    Args:
        importer: memory.json importer resolved via dependency injection.
        file: The memory.json file.
        folder: The folder to place the files in.

    Returns:
        EntityImportResult with import statistics.

    Raises:
        HTTPException: If import fails.
    """
    try:
        file_data = []
        file_bytes = await file.read()
        file_str = file_bytes.decode("utf-8")
        for line in file_str.splitlines():
            json_data = json.loads(line)
            file_data.append(json_data)

        result = await importer.import_data(file_data, folder)
        if not result.success:  # pragma: no cover
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=result.error_message or "Import failed",
            )
    except Exception as e:
        logger.exception("Import failed")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Import failed: {str(e)}",
        )
    return result


async def import_file(importer: Importer, file: UploadFile, destination_folder: str):
    try:
        # Process file
        json_data = json.load(file.file)
        result = await importer.import_data(json_data, destination_folder)
        if not result.success:  # pragma: no cover
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail=result.error_message or "Import failed",
            )

        return result

    except Exception as e:
        logger.exception("Import failed")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Import failed: {str(e)}",
        )

```
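
For context, this v1 router is mounted under the `/{project}` prefix in `app.py`, so a ChatGPT export could be uploaded roughly as follows (base URL, port, and project name are hypothetical):

```python
# Hypothetical client call; adjust the base URL and project name for your deployment.
import httpx


def upload_chatgpt_export(path: str, project: str = "main") -> dict:
    with open(path, "rb") as f:
        response = httpx.post(
            f"http://localhost:8000/{project}/import/chatgpt",
            files={"file": ("conversations.json", f, "application/json")},
            data={"folder": "conversations"},
        )
    response.raise_for_status()
    return response.json()  # ChatImportResult payload
```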

--------------------------------------------------------------------------------
/tests/cli/test_import_claude_projects.py:
--------------------------------------------------------------------------------

```python
"""Tests for import_claude_projects command."""

import json

import pytest
from typer.testing import CliRunner

from basic_memory.cli.app import app
from basic_memory.cli.commands.import_claude_projects import import_projects  # noqa
from basic_memory.config import get_project_config

# Set up CLI runner
runner = CliRunner()


@pytest.fixture
def sample_project():
    """Sample project data for testing."""
    return {
        "uuid": "test-uuid",
        "name": "Test Project",
        "created_at": "2025-01-05T20:55:32.499880+00:00",
        "updated_at": "2025-01-05T20:56:39.477600+00:00",
        "prompt_template": "# Test Prompt\n\nThis is a test prompt.",
        "docs": [
            {
                "uuid": "doc-uuid-1",
                "filename": "Test Document",
                "content": "# Test Document\n\nThis is test content.",
                "created_at": "2025-01-05T20:56:39.477600+00:00",
            },
            {
                "uuid": "doc-uuid-2",
                "filename": "Another Document",
                "content": "# Another Document\n\nMore test content.",
                "created_at": "2025-01-05T20:56:39.477600+00:00",
            },
        ],
    }


@pytest.fixture
def sample_projects_json(tmp_path, sample_project):
    """Create a sample projects.json file."""
    json_file = tmp_path / "projects.json"
    with open(json_file, "w", encoding="utf-8") as f:
        json.dump([sample_project], f)
    return json_file


def test_import_projects_command_file_not_found(tmp_path):
    """Test error handling for nonexistent file."""
    nonexistent = tmp_path / "nonexistent.json"
    result = runner.invoke(app, ["import", "claude", "projects", str(nonexistent)])
    assert result.exit_code == 1
    assert "File not found" in result.output


def test_import_projects_command_success(tmp_path, sample_projects_json, monkeypatch):
    """Test successful project import via command."""
    # Set up test environment
    config = get_project_config()
    config.home = tmp_path

    # Run import
    result = runner.invoke(app, ["import", "claude", "projects", str(sample_projects_json)])
    assert result.exit_code == 0
    assert "Import complete" in result.output
    assert "Imported 2 project documents" in result.output
    assert "Imported 1 prompt templates" in result.output


def test_import_projects_command_invalid_json(tmp_path):
    """Test error handling for invalid JSON."""
    # Create invalid JSON file
    invalid_file = tmp_path / "invalid.json"
    invalid_file.write_text("not json")

    result = runner.invoke(app, ["import", "claude", "projects", str(invalid_file)])
    assert result.exit_code == 1
    assert "Error during import" in result.output


def test_import_projects_with_base_folder(tmp_path, sample_projects_json, monkeypatch):
    """Test import with custom base folder."""
    # Set up test environment
    config = get_project_config()
    config.home = tmp_path
    base_folder = "claude-exports"

    # Run import
    result = runner.invoke(
        app,
        [
            "import",
            "claude",
            "projects",
            str(sample_projects_json),
            "--base-folder",
            base_folder,
        ],
    )
    assert result.exit_code == 0

    # Check files in base folder
    project_dir = tmp_path / base_folder / "Test_Project"
    assert project_dir.exists()
    assert (project_dir / "docs").exists()
    assert (project_dir / "prompt-template.md").exists()


def test_import_project_without_prompt(tmp_path):
    """Test importing project without prompt template."""
    # Create project without prompt
    project = {
        "uuid": "test-uuid",
        "name": "No Prompt Project",
        "created_at": "2025-01-05T20:55:32.499880+00:00",
        "updated_at": "2025-01-05T20:56:39.477600+00:00",
        "docs": [
            {
                "uuid": "doc-uuid-1",
                "filename": "Test Document",
                "content": "# Test Document\n\nContent.",
                "created_at": "2025-01-05T20:56:39.477600+00:00",
            }
        ],
    }

    json_file = tmp_path / "no_prompt.json"
    with open(json_file, "w", encoding="utf-8") as f:
        json.dump([project], f)

    # Set up environment
    config = get_project_config()
    config.home = tmp_path

    # Run import
    result = runner.invoke(app, ["import", "claude", "projects", str(json_file)])
    assert result.exit_code == 0
    assert "Imported 1 project documents" in result.output
    assert "Imported 0 prompt templates" in result.output

```

--------------------------------------------------------------------------------
/src/basic_memory/services/link_resolver.py:
--------------------------------------------------------------------------------

```python
"""Service for resolving markdown links to permalinks."""

from typing import Optional, Tuple


from loguru import logger

from basic_memory.models import Entity
from basic_memory.repository.entity_repository import EntityRepository
from basic_memory.schemas.search import SearchQuery, SearchItemType
from basic_memory.services.search_service import SearchService


class LinkResolver:
    """Service for resolving markdown links to permalinks.

    Uses a combination of exact matching and search-based resolution:
    1. Try exact permalink match (fastest)
    2. Try exact title match
    3. Try exact file path match
    4. Try file path with .md extension (for folder/title patterns)
    5. Fall back to search for fuzzy matching
    """

    def __init__(self, entity_repository: EntityRepository, search_service: SearchService):
        """Initialize with repositories."""
        self.entity_repository = entity_repository
        self.search_service = search_service

    async def resolve_link(
        self, link_text: str, use_search: bool = True, strict: bool = False
    ) -> Optional[Entity]:
        """Resolve a markdown link to a permalink.

        Args:
            link_text: The link text to resolve
            use_search: Whether to use search-based fuzzy matching as fallback
            strict: If True, only exact matches are allowed (no fuzzy search fallback)
        """
        logger.trace(f"Resolving link: {link_text}")

        # Clean link text and extract any alias
        clean_text, alias = self._normalize_link_text(link_text)

        # 1. Try exact permalink match first (most efficient)
        entity = await self.entity_repository.get_by_permalink(clean_text)
        if entity:
            logger.debug(f"Found exact permalink match: {entity.permalink}")
            return entity

        # 2. Try exact title match
        found = await self.entity_repository.get_by_title(clean_text)
        if found:
            # Return first match if there are duplicates (consistent behavior)
            entity = found[0]
            logger.debug(f"Found title match: {entity.title}")
            return entity

        # 3. Try file path
        found_path = await self.entity_repository.get_by_file_path(clean_text)
        if found_path:
            logger.debug(f"Found entity with path: {found_path.file_path}")
            return found_path

        # 4. Try file path with .md extension if not already present
        if not clean_text.endswith(".md") and "/" in clean_text:
            file_path_with_md = f"{clean_text}.md"
            found_path_md = await self.entity_repository.get_by_file_path(file_path_with_md)
            if found_path_md:
                logger.debug(f"Found entity with path (with .md): {found_path_md.file_path}")
                return found_path_md

        # In strict mode, don't try fuzzy search - return None if no exact match found
        if strict:
            return None

        # 5. Fall back to search for fuzzy matching (only if not in strict mode)
        if use_search and "*" not in clean_text:
            results = await self.search_service.search(
                query=SearchQuery(text=clean_text, entity_types=[SearchItemType.ENTITY]),
            )

            if results:
                # Look for best match
                best_match = min(results, key=lambda x: x.score)  # pyright: ignore
                logger.trace(
                    f"Selected best match from {len(results)} results: {best_match.permalink}"
                )
                if best_match.permalink:
                    return await self.entity_repository.get_by_permalink(best_match.permalink)

        # if we couldn't find anything then return None
        return None

    def _normalize_link_text(self, link_text: str) -> Tuple[str, Optional[str]]:
        """Normalize link text and extract alias if present.

        Args:
            link_text: Raw link text from markdown

        Returns:
            Tuple of (normalized_text, alias or None)
        """
        # Strip whitespace
        text = link_text.strip()

        # Remove enclosing brackets if present
        if text.startswith("[[") and text.endswith("]]"):
            text = text[2:-2]

        # Handle Obsidian-style aliases (format: [[actual|alias]])
        alias = None
        if "|" in text:
            text, alias = text.split("|", 1)
            text = text.strip()
            alias = alias.strip()
        else:
            # Strip whitespace from text even if no alias
            text = text.strip()

        return text, alias

```
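
A minimal standalone sketch (not part of the repository) that mirrors the `_normalize_link_text()` bracket/alias handling shown above, so the normalization step can be checked in isolation:

```python
"""Illustration only: re-states the normalization logic from LinkResolver above."""

from typing import Optional, Tuple


def normalize_link_text(link_text: str) -> Tuple[str, Optional[str]]:
    # Strip whitespace and enclosing [[...]] brackets.
    text = link_text.strip()
    if text.startswith("[[") and text.endswith("]]"):
        text = text[2:-2]

    # Split Obsidian-style aliases ([[actual|alias]]) into (target, alias).
    alias = None
    if "|" in text:
        text, alias = text.split("|", 1)
        alias = alias.strip()

    return text.strip(), alias


assert normalize_link_text("[[specs/search|Search Spec]]") == ("specs/search", "Search Spec")
assert normalize_link_text("  Search Specification  ") == ("Search Specification", None)
```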

--------------------------------------------------------------------------------
/tests/mcp/test_tool_delete_note.py:
--------------------------------------------------------------------------------

```python
"""Tests for delete_note MCP tool."""

from basic_memory.mcp.tools.delete_note import _format_delete_error_response


class TestDeleteNoteErrorFormatting:
    """Test the error formatting function for better user experience."""

    def test_format_delete_error_note_not_found(self, test_project):
        """Test formatting for note not found errors."""
        result = _format_delete_error_response(test_project.name, "entity not found", "test-note")

        assert "# Delete Failed - Note Not Found" in result
        assert "The note 'test-note' could not be found" in result
        assert 'search_notes("test-project", "test-note")' in result
        assert "Already deleted" in result
        assert "Wrong identifier" in result

    def test_format_delete_error_permission_denied(self, test_project):
        """Test formatting for permission errors."""
        result = _format_delete_error_response(test_project.name, "permission denied", "test-note")

        assert "# Delete Failed - Permission Error" in result
        assert "You don't have permission to delete 'test-note'" in result
        assert "Check permissions" in result
        assert "File locks" in result
        assert "list_memory_projects()" in result

    def test_format_delete_error_access_forbidden(self, test_project):
        """Test formatting for access forbidden errors."""
        result = _format_delete_error_response(test_project.name, "access forbidden", "test-note")

        assert "# Delete Failed - Permission Error" in result
        assert "You don't have permission to delete 'test-note'" in result

    def test_format_delete_error_server_error(self, test_project):
        """Test formatting for server errors."""
        result = _format_delete_error_response(
            test_project.name, "server error occurred", "test-note"
        )

        assert "# Delete Failed - System Error" in result
        assert "A system error occurred while deleting 'test-note'" in result
        assert "Try again" in result
        assert "Check file status" in result

    def test_format_delete_error_filesystem_error(self, test_project):
        """Test formatting for filesystem errors."""
        result = _format_delete_error_response(test_project.name, "filesystem error", "test-note")

        assert "# Delete Failed - System Error" in result
        assert "A system error occurred while deleting 'test-note'" in result

    def test_format_delete_error_disk_error(self, test_project):
        """Test formatting for disk errors."""
        result = _format_delete_error_response(test_project.name, "disk full", "test-note")

        assert "# Delete Failed - System Error" in result
        assert "A system error occurred while deleting 'test-note'" in result

    def test_format_delete_error_database_error(self, test_project):
        """Test formatting for database errors."""
        result = _format_delete_error_response(test_project.name, "database error", "test-note")

        assert "# Delete Failed - Database Error" in result
        assert "A database error occurred while deleting 'test-note'" in result
        assert "Sync conflict" in result
        assert "Database lock" in result

    def test_format_delete_error_sync_error(self, test_project):
        """Test formatting for sync errors."""
        result = _format_delete_error_response(test_project.name, "sync failed", "test-note")

        assert "# Delete Failed - Database Error" in result
        assert "A database error occurred while deleting 'test-note'" in result

    def test_format_delete_error_generic(self, test_project):
        """Test formatting for generic errors."""
        result = _format_delete_error_response(test_project.name, "unknown error", "test-note")

        assert "# Delete Failed" in result
        assert "Error deleting note 'test-note': unknown error" in result
        assert "General troubleshooting" in result
        assert "Verify the note exists" in result

    def test_format_delete_error_with_complex_identifier(self, test_project):
        """Test formatting with complex identifiers (permalinks)."""
        result = _format_delete_error_response(
            test_project.name, "entity not found", "folder/note-title"
        )

        assert 'search_notes("test-project", "note-title")' in result
        assert "Note Title" in result  # Title format
        assert "folder/note-title" in result  # Permalink format


# Integration tests removed to focus on error formatting coverage
# The error formatting tests above provide the necessary coverage for MCP tool error messaging

```

--------------------------------------------------------------------------------
/.claude/commands/release/changelog.md:
--------------------------------------------------------------------------------

```markdown
# /changelog - Generate or Update Changelog Entry

Analyze commits and generate formatted changelog entry for a version.

## Usage
```
/changelog <version> [type]
```

**Parameters:**
- `version` (required): Version like `v0.14.0` or `v0.14.0b1`
- `type` (optional): `beta`, `rc`, or `stable` (default: `stable`)

## Implementation

You are an expert technical writer for the Basic Memory project. When the user runs `/changelog`, execute the following steps:

### Step 1: Version Analysis
1. **Determine Commit Range**
   ```bash
   # Find last release tag
   git tag -l "v*" --sort=-version:refname | grep -v "b\|rc" | head -1
   
   # Get commits since last release
   git log --oneline ${last_tag}..HEAD
   ```

2. **Parse Conventional Commits** (see the sketch below)
   - Extract feat: (features)
   - Extract fix: (bug fixes)  
   - Extract BREAKING CHANGE: (breaking changes)
   - Extract chore:, docs:, test: (other improvements)
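
   A minimal sketch of this step (illustrative only; the helper name is hypothetical, and the real categorization is done while writing the changelog, not by a script):
   ```python
   # Illustrative sketch: bucket `git log --oneline` subjects by conventional-commit prefix.
   import re
   import subprocess
   from collections import defaultdict


   def categorize_commits(last_tag: str) -> dict[str, list[str]]:
       lines = subprocess.run(
           ["git", "log", "--oneline", f"{last_tag}..HEAD"],
           capture_output=True, text=True, check=True,
       ).stdout.splitlines()
       buckets: dict[str, list[str]] = defaultdict(list)
       for line in lines:
           # Drop the abbreviated hash, keep the commit subject.
           subject = line.split(" ", 1)[1] if " " in line else line
           match = re.match(r"(feat|fix|chore|docs|test)(\(.+\))?!?:", subject)
           buckets[match.group(1) if match else "other"].append(subject)
       return dict(buckets)
   ```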

### Step 2: Categorize Changes
1. **Features (feat:)**
   - New MCP tools
   - New CLI commands
   - New API endpoints
   - Major functionality additions

2. **Bug Fixes (fix:)**
   - User-facing bug fixes
   - Critical issues resolved
   - Performance improvements
   - Security fixes

3. **Technical Improvements**
   - Test coverage improvements
   - Code quality enhancements
   - Dependency updates
   - Documentation updates

4. **Breaking Changes**
   - API changes
   - Configuration changes
   - Behavior changes
   - Migration requirements

### Step 3: Generate Changelog Entry
Create formatted entry following existing CHANGELOG.md style:

Example:
```markdown
## <version> (<date>)

### Features

- **Multi-Project Management System** - Switch between projects instantly during conversations
  ([`993e88a`](https://github.com/basicmachines-co/basic-memory/commit/993e88a)) 
  - Instant project switching with session context
  - Project-specific operations and isolation
  - Project discovery and management tools

- **Advanced Note Editing** - Incremental editing with append, prepend, find/replace, and section operations
  ([`6fc3904`](https://github.com/basicmachines-co/basic-memory/commit/6fc3904))
  - `edit_note` tool with multiple operation types
  - Smart frontmatter-aware editing
  - Validation and error handling

### Bug Fixes

- **#118**: Fix YAML tag formatting to follow standard specification
  ([`2dc7e27`](https://github.com/basicmachines-co/basic-memory/commit/2dc7e27))

- **#110**: Make --project flag work consistently across CLI commands
  ([`02dd91a`](https://github.com/basicmachines-co/basic-memory/commit/02dd91a))

### Technical Improvements

- **Comprehensive Testing** - 100% test coverage with integration testing
  ([`468a22f`](https://github.com/basicmachines-co/basic-memory/commit/468a22f))
  - MCP integration test suite
  - End-to-end testing framework
  - Performance and edge case validation

### Breaking Changes

- **Database Migration**: Automatic migration from per-project to unified database.
    Data will be re-indexed from the filesystem, resulting in no data loss.
- **Configuration Changes**: Projects now synced between config.json and database
- **Full Backward Compatibility**: All existing setups continue to work seamlessly
```

### Step 4: Integration
1. **Update CHANGELOG.md**
   - Insert new entry at top
   - Maintain consistent formatting
   - Include commit links and issue references

2. **Validation**
   - Check all major changes are captured
   - Verify commit links work
   - Ensure issue numbers are correct

## Smart Analysis Features

### Automatic Classification
- Detect feature additions from file changes
- Identify bug fixes from commit messages
- Find breaking changes from code analysis
- Extract issue numbers from commit messages

### Content Enhancement
- Add context for technical changes
- Include migration guidance for breaking changes
- Suggest installation/upgrade instructions
- Link to relevant documentation

## Output Format

### For Beta Releases

Example: 
```markdown
## v0.13.0b4 (2025-06-03)

### Beta Changes Since v0.13.0b3

- Fix FastMCP API compatibility issues
- Update dependencies to latest versions  
- Resolve setuptools import error

### Installation
```bash
uv tool install basic-memory --prerelease=allow
```

### Known Issues
- [List any known issues for beta testing]
```

### For Stable Releases
Full changelog with complete feature list, organized by impact and category.

## Context
- Follows existing CHANGELOG.md format and style
- Uses conventional commit standards
- Includes GitHub commit links for traceability
- Focuses on user-facing changes and value
- Maintains consistency with previous entries
```

--------------------------------------------------------------------------------
/src/basic_memory/alembic/versions/647e7a75e2cd_project_constraint_fix.py:
--------------------------------------------------------------------------------

```python
"""project constraint fix

Revision ID: 647e7a75e2cd
Revises: 5fe1ab1ccebe
Create Date: 2025-06-03 12:48:30.162566

"""

from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision: str = "647e7a75e2cd"
down_revision: Union[str, None] = "5fe1ab1ccebe"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Remove the problematic UNIQUE constraint on is_default column.

    The UNIQUE constraint prevents multiple projects from having is_default=FALSE,
    which breaks project creation when the service sets is_default=False.

    SQLite: Recreate the table without the constraint (SQLite's ALTER TABLE cannot drop constraints)
    Postgres: Use ALTER TABLE to drop the constraint directly
    """
    connection = op.get_bind()
    is_sqlite = connection.dialect.name == "sqlite"

    if is_sqlite:
        # For SQLite, we need to recreate the table without the UNIQUE constraint
        # Create a new table without the UNIQUE constraint on is_default
        op.create_table(
            "project_new",
            sa.Column("id", sa.Integer(), nullable=False),
            sa.Column("name", sa.String(), nullable=False),
            sa.Column("description", sa.Text(), nullable=True),
            sa.Column("permalink", sa.String(), nullable=False),
            sa.Column("path", sa.String(), nullable=False),
            sa.Column("is_active", sa.Boolean(), nullable=False),
            sa.Column("is_default", sa.Boolean(), nullable=True),  # No UNIQUE constraint!
            sa.Column("created_at", sa.DateTime(), nullable=False),
            sa.Column("updated_at", sa.DateTime(), nullable=False),
            sa.PrimaryKeyConstraint("id"),
            sa.UniqueConstraint("name"),
            sa.UniqueConstraint("permalink"),
        )

        # Copy data from old table to new table
        op.execute("INSERT INTO project_new SELECT * FROM project")

        # Drop the old table
        op.drop_table("project")

        # Rename the new table
        op.rename_table("project_new", "project")

        # Recreate the indexes
        with op.batch_alter_table("project", schema=None) as batch_op:
            batch_op.create_index("ix_project_created_at", ["created_at"], unique=False)
            batch_op.create_index("ix_project_name", ["name"], unique=True)
            batch_op.create_index("ix_project_path", ["path"], unique=False)
            batch_op.create_index("ix_project_permalink", ["permalink"], unique=True)
            batch_op.create_index("ix_project_updated_at", ["updated_at"], unique=False)
    else:
        # For Postgres, we can simply drop the constraint
        with op.batch_alter_table("project", schema=None) as batch_op:
            batch_op.drop_constraint("project_is_default_key", type_="unique")


def downgrade() -> None:
    """Add back the UNIQUE constraint on is_default column.

    WARNING: This will break project creation again if multiple projects
    have is_default=FALSE.
    """
    # Recreate the table with the UNIQUE constraint
    op.create_table(
        "project_old",
        sa.Column("id", sa.Integer(), nullable=False),
        sa.Column("name", sa.String(), nullable=False),
        sa.Column("description", sa.Text(), nullable=True),
        sa.Column("permalink", sa.String(), nullable=False),
        sa.Column("path", sa.String(), nullable=False),
        sa.Column("is_active", sa.Boolean(), nullable=False),
        sa.Column("is_default", sa.Boolean(), nullable=True),
        sa.Column("created_at", sa.DateTime(), nullable=False),
        sa.Column("updated_at", sa.DateTime(), nullable=False),
        sa.PrimaryKeyConstraint("id"),
        sa.UniqueConstraint("is_default"),  # Add back the problematic constraint
        sa.UniqueConstraint("name"),
        sa.UniqueConstraint("permalink"),
    )

    # Copy data (this may fail if multiple FALSE values exist)
    op.execute("INSERT INTO project_old SELECT * FROM project")

    # Drop the current table and rename
    op.drop_table("project")
    op.rename_table("project_old", "project")

    # Recreate indexes
    with op.batch_alter_table("project", schema=None) as batch_op:
        batch_op.create_index("ix_project_created_at", ["created_at"], unique=False)
        batch_op.create_index("ix_project_name", ["name"], unique=True)
        batch_op.create_index("ix_project_path", ["path"], unique=False)
        batch_op.create_index("ix_project_permalink", ["permalink"], unique=True)
        batch_op.create_index("ix_project_updated_at", ["updated_at"], unique=False)

```

--------------------------------------------------------------------------------
/tests/importers/test_conversation_indexing.py:
--------------------------------------------------------------------------------

```python
"""Test that imported conversations are properly indexed with correct permalink and title.

This test verifies issue #452 - Imported conversations not indexed correctly.
"""

import pytest

from basic_memory.config import ProjectConfig
from basic_memory.importers.claude_conversations_importer import ClaudeConversationsImporter
from basic_memory.markdown import EntityParser
from basic_memory.markdown.markdown_processor import MarkdownProcessor
from basic_memory.repository import EntityRepository
from basic_memory.services import EntityService
from basic_memory.services.file_service import FileService
from basic_memory.services.search_service import SearchService
from basic_memory.schemas.search import SearchQuery
from basic_memory.sync.sync_service import SyncService


@pytest.mark.asyncio
async def test_imported_conversations_have_correct_permalink_and_title(
    project_config: ProjectConfig,
    sync_service: SyncService,
    entity_service: EntityService,
    entity_repository: EntityRepository,
    search_service: SearchService,
):
    """Test that imported conversations have correct permalink and title after sync.

    Issue #452: Imported conversations show permalink: null in search results
    and title shows as filename instead of frontmatter title.
    """
    base_path = project_config.home

    # Create parser, processor, and file_service for importer
    parser = EntityParser(base_path)
    processor = MarkdownProcessor(parser)
    file_service = FileService(base_path, processor)

    # Create importer
    importer = ClaudeConversationsImporter(base_path, processor, file_service)

    # Sample conversation data
    conversations = [
        {
            "uuid": "test-123",
            "name": "My Test Conversation Title",
            "created_at": "2025-01-15T10:00:00Z",
            "updated_at": "2025-01-15T11:00:00Z",
            "chat_messages": [
                {
                    "uuid": "msg-1",
                    "sender": "human",
                    "created_at": "2025-01-15T10:00:00Z",
                    "text": "Hello world",
                    "content": [{"type": "text", "text": "Hello world"}],
                    "attachments": [],
                },
                {
                    "uuid": "msg-2",
                    "sender": "assistant",
                    "created_at": "2025-01-15T10:01:00Z",
                    "text": "Hello!",
                    "content": [{"type": "text", "text": "Hello!"}],
                    "attachments": [],
                },
            ],
        }
    ]

    # Run import
    result = await importer.import_data(conversations, "conversations")
    assert result.success, f"Import failed: {result}"
    assert result.conversations == 1

    # Verify the file was created with correct content
    conv_path = base_path / "conversations" / "20250115-My_Test_Conversation_Title.md"
    assert conv_path.exists(), f"Expected file at {conv_path}"

    content = conv_path.read_text()
    assert "---" in content, "File should have frontmatter markers"
    assert "title: My Test Conversation Title" in content, "File should have title in frontmatter"
    assert "permalink: conversations/20250115-My_Test_Conversation_Title" in content, (
        "File should have permalink in frontmatter"
    )

    # Run sync to index the imported file
    await sync_service.sync(base_path, project_config.name)

    # Verify entity in database
    entities = await entity_repository.find_all()
    assert len(entities) == 1, f"Expected 1 entity, got {len(entities)}"

    entity = entities[0]

    # These are the key assertions for issue #452
    assert entity.title == "My Test Conversation Title", (
        f"Title should be from frontmatter, got: {entity.title}"
    )
    assert entity.permalink == "conversations/20250115-My_Test_Conversation_Title", (
        f"Permalink should be from frontmatter, got: {entity.permalink}"
    )

    # Verify search index also has correct data
    results = await search_service.search(SearchQuery(text="Test Conversation"))
    assert len(results) >= 1, "Should find the conversation in search"

    # Find our entity in search results
    search_result = next((r for r in results if r.entity_id == entity.id), None)
    assert search_result is not None, "Entity should be in search results"
    assert search_result.title == "My Test Conversation Title", (
        f"Search title should be from frontmatter, got: {search_result.title}"
    )
    assert search_result.permalink == "conversations/20250115-My_Test_Conversation_Title", (
        f"Search permalink should not be null, got: {search_result.permalink}"
    )

```

--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------

```toml
[project]
name = "basic-memory"
dynamic = ["version"]
description = "Local-first knowledge management combining Zettelkasten with knowledge graphs"
readme = "README.md"
requires-python = ">=3.12"
license = { text = "AGPL-3.0-or-later" }
authors = [
    { name = "Basic Machines", email = "[email protected]" }
]
dependencies = [
    "sqlalchemy>=2.0.0",
    "pyyaml>=6.0.1",
    "typer>=0.9.0",
    "aiosqlite>=0.20.0",
    "greenlet>=3.1.1",
    "pydantic[email,timezone]>=2.12.0",
    "mcp>=1.23.1",
    "pydantic-settings>=2.6.1",
    "loguru>=0.7.3",
    "pyright>=1.1.390",
    "markdown-it-py>=3.0.0",
    "python-frontmatter>=1.1.0",
    "rich>=13.9.4",
    "unidecode>=1.3.8",
    "dateparser>=1.2.0",
    "watchfiles>=1.0.4",
    "fastapi[standard]>=0.115.8",
    "alembic>=1.14.1",
    "pillow>=11.1.0",
    "pybars3>=0.9.7",
    "fastmcp==2.12.3", # Pinned - 2.14.x breaks MCP tools visibility (issue #463)
    "pyjwt>=2.10.1",
    "python-dotenv>=1.1.0",
    "pytest-aio>=1.9.0",
    "aiofiles>=24.1.0", # Optional observability (disabled by default via config)
    "asyncpg>=0.30.0",
    "nest-asyncio>=1.6.0", # For Alembic migrations with Postgres
    "pytest-asyncio>=1.2.0",
    "psycopg==3.3.1",
    "mdformat>=0.7.22",
    "mdformat-gfm>=0.3.7",
    "mdformat-frontmatter>=2.0.8",
    "openpanel>=0.0.1", # Anonymous usage telemetry (Homebrew-style opt-out)
    "sniffio>=1.3.1",
    "anyio>=4.10.0",
    "httpx>=0.28.0",
]


[project.urls]
Homepage = "https://github.com/basicmachines-co/basic-memory"
Repository = "https://github.com/basicmachines-co/basic-memory"
Documentation = "https://github.com/basicmachines-co/basic-memory#readme"

[project.scripts]
basic-memory = "basic_memory.cli.main:app"
bm = "basic_memory.cli.main:app"

[build-system]
requires = ["hatchling", "uv-dynamic-versioning>=0.7.0"]
build-backend = "hatchling.build"

[tool.pytest.ini_options]
pythonpath = ["src", "tests"]
addopts = "--cov=basic_memory --cov-report term-missing"
testpaths = ["tests", "test-int"]
asyncio_mode = "strict"
asyncio_default_fixture_loop_scope = "function"
markers = [
    "benchmark: Performance benchmark tests (deselect with '-m \"not benchmark\"')",
    "slow: Slow-running tests (deselect with '-m \"not slow\"')",
    "postgres: Tests that run against Postgres backend (deselect with '-m \"not postgres\"')",
    "windows: Windows-specific tests (deselect with '-m \"not windows\"')",
]

[tool.ruff]
line-length = 100
target-version = "py312"

[dependency-groups]
dev = [
    "gevent>=24.11.1",
    "icecream>=2.1.3",
    "pytest>=8.3.4",
    "pytest-cov>=4.1.0",
    "pytest-mock>=3.12.0",
    "pytest-asyncio>=0.24.0",
    "pytest-xdist>=3.0.0",
    "ruff>=0.1.6",
    "freezegun>=1.5.5",
    "testcontainers[postgres]>=4.0.0",
    "psycopg>=3.2.0",
    "pyright>=1.1.408",
]

[tool.hatch.version]
source = "uv-dynamic-versioning"

[tool.uv-dynamic-versioning]
vcs = "git"
style = "pep440"
bump = true
fallback-version = "0.0.0"

[tool.pyright]
include = ["src/"]
exclude = ["**/__pycache__"]
ignore = ["test/"]
defineConstant = { DEBUG = true }
reportMissingImports = "error"
reportMissingTypeStubs = false
pythonVersion = "3.12"



[tool.coverage.run]
concurrency = ["thread", "gevent"]
parallel = true
source = ["basic_memory"]

[tool.coverage.report]
exclude_lines = [
    "pragma: no cover",
    "def __repr__",
    "if self.debug:",
    "if settings.DEBUG",
    "raise AssertionError",
    "raise NotImplementedError",
    "if 0:",
    "if __name__ == .__main__.:",
    "class .*\\bProtocol\\):",
    "@(abc\\.)?abstractmethod",
]

# Exclude specific modules that are difficult to test comprehensively
omit = [
    "*/external_auth_provider.py",  # External HTTP calls to OAuth providers
    "*/supabase_auth_provider.py",  # External HTTP calls to Supabase APIs
    "*/watch_service.py",           # File system watching - complex integration testing
    "*/background_sync.py",         # Background processes
    "*/cli/**",                    # CLI is an interactive wrapper; core logic is covered via API/MCP/service tests
    "*/db.py",                     # Backend/runtime-dependent (sqlite/postgres/windows tuning); validated via integration tests
    "*/services/initialization.py", # Startup orchestration + background tasks (watchers); exercised indirectly in entrypoints
    "*/sync/sync_service.py",      # Heavy filesystem/db integration; covered by integration suite, not enforced in unit coverage
    "*/telemetry.py",              # External analytics; tested lightly, excluded from strict coverage target
    "*/services/migration_service.py", # Complex migration scenarios
]

[tool.logfire]
ignore_no_config = true

```

--------------------------------------------------------------------------------
/src/basic_memory/mcp/tools/build_context.py:
--------------------------------------------------------------------------------

```python
"""Build context tool for Basic Memory MCP server."""

from typing import Optional

from loguru import logger
from fastmcp import Context

from basic_memory.mcp.async_client import get_client
from basic_memory.mcp.project_context import get_active_project
from basic_memory.mcp.server import mcp
from basic_memory.telemetry import track_mcp_tool
from basic_memory.schemas.base import TimeFrame
from basic_memory.schemas.memory import (
    GraphContext,
    MemoryUrl,
    memory_url_path,
)


@mcp.tool(
    description="""Build context from a memory:// URI to continue conversations naturally.

    Use this to follow up on previous discussions or explore related topics.

    Memory URL Format:
    - Use paths like "folder/note" or "memory://folder/note"
    - Pattern matching: "folder/*" matches all notes in folder
    - Valid characters: letters, numbers, hyphens, underscores, forward slashes
    - Avoid: double slashes (//), angle brackets (<>), quotes, pipes (|)
    - Examples: "specs/search", "projects/basic-memory", "notes/*"

    Timeframes support natural language like:
    - "2 days ago", "last week", "today", "3 months ago"
    - Or standard formats like "7d", "24h"
    """,
)
async def build_context(
    url: MemoryUrl,
    project: Optional[str] = None,
    depth: str | int | None = 1,
    timeframe: Optional[TimeFrame] = "7d",
    page: int = 1,
    page_size: int = 10,
    max_related: int = 10,
    context: Context | None = None,
) -> GraphContext:
    """Get context needed to continue a discussion within a specific project.

    This tool enables natural continuation of discussions by loading relevant context
    from memory:// URIs. It uses pattern matching to find relevant content and builds
    a rich context graph of related information.

    Project Resolution:
    Server resolves projects in this order: Single Project Mode → project parameter → default project.
    If project unknown, use list_memory_projects() or recent_activity() first.

    Args:
        project: Project name to build context from. Optional - server will resolve using hierarchy.
                If unknown, use list_memory_projects() to discover available projects.
        url: memory:// URI pointing to discussion content (e.g. memory://specs/search)
        depth: How many relation hops to traverse (1-3 recommended for performance)
        timeframe: How far back to look. Supports natural language like "2 days ago", "last week"
        page: Page number of results to return (default: 1)
        page_size: Number of results to return per page (default: 10)
        max_related: Maximum number of related results to return (default: 10)
        context: Optional FastMCP context for performance caching.

    Returns:
        GraphContext containing:
            - primary_results: Content matching the memory:// URI
            - related_results: Connected content via relations
            - metadata: Context building details

    Examples:
        # Continue a specific discussion
        build_context("memory://specs/search", project="my-project")

        # Get deeper context about a component
        build_context("memory://components/memory-service", project="work-docs", depth=2)

        # Look at recent changes to a specification
        build_context("memory://specs/document-format", project="research", timeframe="today")

        # Research the history of a feature
        build_context("memory://features/knowledge-graph", project="dev-notes", timeframe="3 months ago")

    Raises:
        ToolError: If project doesn't exist or depth parameter is invalid
    """
    track_mcp_tool("build_context")
    logger.info(f"Building context from {url} in project {project}")

    # Convert string depth to integer if needed
    if isinstance(depth, str):
        try:
            depth = int(depth)
        except ValueError:
            from mcp.server.fastmcp.exceptions import ToolError

            raise ToolError(f"Invalid depth parameter: '{depth}' is not a valid integer")

    # URL is already validated and normalized by MemoryUrl type annotation

    async with get_client() as client:
        # Get the active project using the new stateless approach
        active_project = await get_active_project(client, project, context)

        # Import here to avoid circular import
        from basic_memory.mcp.clients import MemoryClient

        # Use typed MemoryClient for API calls
        memory_client = MemoryClient(client, active_project.external_id)
        return await memory_client.build_context(
            memory_url_path(url),
            depth=depth or 1,
            timeframe=timeframe,
            page=page,
            page_size=page_size,
            max_related=max_related,
        )

```

--------------------------------------------------------------------------------
/tests/cli/test_auth_cli_auth.py:
--------------------------------------------------------------------------------

```python
import json
import os
import stat
import time
from contextlib import asynccontextmanager

import httpx
import pytest

from basic_memory.cli.auth import CLIAuth


def _make_mock_transport(handler):
    return httpx.MockTransport(handler)


@pytest.mark.asyncio
async def test_cli_auth_request_device_authorization_uses_injected_http_client(
    tmp_path, monkeypatch
):
    """Integration-style test: exercise the request flow with real httpx plumbing (MockTransport)."""
    monkeypatch.setenv("HOME", str(tmp_path))
    monkeypatch.setenv("BASIC_MEMORY_ENV", "test")

    async def handler(request: httpx.Request) -> httpx.Response:
        assert request.url.path.endswith("/oauth2/device_authorization")
        body = (await request.aread()).decode()
        # sanity: client_id should be in form data
        assert "client_id=test-client-id" in body
        return httpx.Response(
            200,
            json={
                "device_code": "devcode",
                "user_code": "usercode",
                "verification_uri": "https://example.test/verify",
                "interval": 1,
            },
        )

    transport = _make_mock_transport(handler)

    @asynccontextmanager
    async def client_factory():
        async with httpx.AsyncClient(transport=transport) as client:
            yield client

    auth = CLIAuth(
        client_id="test-client-id",
        authkit_domain="https://example.test",
        http_client_factory=client_factory,
    )

    result = await auth.request_device_authorization()
    assert result is not None
    assert result["device_code"] == "devcode"


def test_cli_auth_generate_pkce_pair_format(tmp_path, monkeypatch):
    monkeypatch.setenv("HOME", str(tmp_path))
    monkeypatch.setenv("BASIC_MEMORY_ENV", "test")

    auth = CLIAuth(client_id="cid", authkit_domain="https://example.test")
    verifier, challenge = auth.generate_pkce_pair()

    # PKCE verifier/challenge should be URL-safe base64 without padding.
    assert verifier
    assert challenge
    assert "=" not in verifier
    assert "=" not in challenge
    # code verifier length should be in recommended bounds (rough sanity).
    assert 43 <= len(verifier) <= 128


@pytest.mark.asyncio
async def test_cli_auth_save_load_and_get_valid_token_roundtrip(tmp_path, monkeypatch):
    monkeypatch.setenv("HOME", str(tmp_path))
    monkeypatch.setenv("BASIC_MEMORY_ENV", "test")

    auth = CLIAuth(client_id="cid", authkit_domain="https://example.test")

    tokens = {
        "access_token": "at",
        "refresh_token": "rt",
        "expires_in": 3600,
        "token_type": "Bearer",
    }
    auth.save_tokens(tokens)

    loaded = auth.load_tokens()
    assert loaded is not None
    assert loaded["access_token"] == "at"
    assert loaded["refresh_token"] == "rt"
    assert auth.is_token_valid(loaded) is True

    valid = await auth.get_valid_token()
    assert valid == "at"

    # Permission should be 600 on POSIX systems
    if os.name != "nt":
        mode = auth.token_file.stat().st_mode
        assert stat.S_IMODE(mode) == 0o600


@pytest.mark.asyncio
async def test_cli_auth_refresh_flow_uses_injected_http_client(tmp_path, monkeypatch):
    monkeypatch.setenv("HOME", str(tmp_path))
    monkeypatch.setenv("BASIC_MEMORY_ENV", "test")

    async def handler(request: httpx.Request) -> httpx.Response:
        if request.url.path.endswith("/oauth2/token"):
            body = (await request.aread()).decode()
            assert "grant_type=refresh_token" in body
            return httpx.Response(
                200,
                json={
                    "access_token": "new-at",
                    "refresh_token": "new-rt",
                    "expires_in": 3600,
                    "token_type": "Bearer",
                },
            )
        raise AssertionError(f"Unexpected request: {request.method} {request.url}")

    transport = _make_mock_transport(handler)

    @asynccontextmanager
    async def client_factory():
        async with httpx.AsyncClient(transport=transport) as client:
            yield client

    auth = CLIAuth(
        client_id="cid",
        authkit_domain="https://example.test",
        http_client_factory=client_factory,
    )

    # Write an expired token file manually (so we control expires_at precisely).
    auth.token_file.parent.mkdir(parents=True, exist_ok=True)
    auth.token_file.write_text(
        json.dumps(
            {
                "access_token": "old-at",
                "refresh_token": "old-rt",
                "expires_at": int(time.time()) - 10,
                "token_type": "Bearer",
            }
        ),
        encoding="utf-8",
    )

    token = await auth.get_valid_token()
    assert token == "new-at"

```
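
A generic RFC 7636 PKCE sketch (not CLIAuth's actual implementation) that satisfies the format assertions in `test_cli_auth_generate_pkce_pair_format` above:

```python
"""Illustration only: a standard S256 PKCE pair; not taken from CLIAuth."""

import base64
import hashlib
import secrets


def generate_pkce_pair() -> tuple[str, str]:
    # 32 random bytes -> 43-char URL-safe verifier with '=' padding stripped.
    verifier = base64.urlsafe_b64encode(secrets.token_bytes(32)).rstrip(b"=").decode()
    digest = hashlib.sha256(verifier.encode("ascii")).digest()
    challenge = base64.urlsafe_b64encode(digest).rstrip(b"=").decode()
    return verifier, challenge


verifier, challenge = generate_pkce_pair()
assert 43 <= len(verifier) <= 128
assert "=" not in verifier and "=" not in challenge
```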

--------------------------------------------------------------------------------
/src/basic_memory/alembic/versions/5fe1ab1ccebe_add_projects_table.py:
--------------------------------------------------------------------------------

```python
"""add projects table

Revision ID: 5fe1ab1ccebe
Revises: cc7172b46608
Create Date: 2025-05-14 09:05:18.214357

"""

from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision: str = "5fe1ab1ccebe"
down_revision: Union[str, None] = "cc7172b46608"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###

    # SQLite FTS5 virtual table handling is SQLite-specific
    # For Postgres, search_index is a regular table managed by ORM
    connection = op.get_bind()
    is_sqlite = connection.dialect.name == "sqlite"

    op.create_table(
        "project",
        sa.Column("id", sa.Integer(), nullable=False),
        sa.Column("name", sa.String(), nullable=False),
        sa.Column("description", sa.Text(), nullable=True),
        sa.Column("permalink", sa.String(), nullable=False),
        sa.Column("path", sa.String(), nullable=False),
        sa.Column("is_active", sa.Boolean(), nullable=False),
        sa.Column("is_default", sa.Boolean(), nullable=True),
        sa.Column("created_at", sa.DateTime(), nullable=False),
        sa.Column("updated_at", sa.DateTime(), nullable=False),
        sa.PrimaryKeyConstraint("id"),
        sa.UniqueConstraint("is_default"),
        sa.UniqueConstraint("name"),
        sa.UniqueConstraint("permalink"),
        if_not_exists=True,
    )
    with op.batch_alter_table("project", schema=None) as batch_op:
        batch_op.create_index(
            "ix_project_created_at", ["created_at"], unique=False, if_not_exists=True
        )
        batch_op.create_index("ix_project_name", ["name"], unique=True, if_not_exists=True)
        batch_op.create_index("ix_project_path", ["path"], unique=False, if_not_exists=True)
        batch_op.create_index(
            "ix_project_permalink", ["permalink"], unique=True, if_not_exists=True
        )
        batch_op.create_index(
            "ix_project_updated_at", ["updated_at"], unique=False, if_not_exists=True
        )

    with op.batch_alter_table("entity", schema=None) as batch_op:
        batch_op.add_column(sa.Column("project_id", sa.Integer(), nullable=False))
        batch_op.drop_index(
            "uix_entity_permalink",
            sqlite_where=sa.text("content_type = 'text/markdown' AND permalink IS NOT NULL")
            if is_sqlite
            else None,
        )
        batch_op.drop_index("ix_entity_file_path")
        batch_op.create_index(batch_op.f("ix_entity_file_path"), ["file_path"], unique=False)
        batch_op.create_index("ix_entity_project_id", ["project_id"], unique=False)
        batch_op.create_index(
            "uix_entity_file_path_project", ["file_path", "project_id"], unique=True
        )
        batch_op.create_index(
            "uix_entity_permalink_project",
            ["permalink", "project_id"],
            unique=True,
            sqlite_where=sa.text("content_type = 'text/markdown' AND permalink IS NOT NULL")
            if is_sqlite
            else None,
        )
        batch_op.create_foreign_key("fk_entity_project_id", "project", ["project_id"], ["id"])

    # drop the search index table. it will be recreated
    # Only drop for SQLite - Postgres uses regular table managed by ORM
    if is_sqlite:
        op.drop_table("search_index")

    # ### end Alembic commands ###


def downgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    with op.batch_alter_table("entity", schema=None) as batch_op:
        batch_op.drop_constraint("fk_entity_project_id", type_="foreignkey")
        batch_op.drop_index(
            "uix_entity_permalink_project",
            sqlite_where=sa.text("content_type = 'text/markdown' AND permalink IS NOT NULL"),
        )
        batch_op.drop_index("uix_entity_file_path_project")
        batch_op.drop_index("ix_entity_project_id")
        batch_op.drop_index(batch_op.f("ix_entity_file_path"))
        batch_op.create_index("ix_entity_file_path", ["file_path"], unique=1)
        batch_op.create_index(
            "uix_entity_permalink",
            ["permalink"],
            unique=1,
            sqlite_where=sa.text("content_type = 'text/markdown' AND permalink IS NOT NULL"),
        )
        batch_op.drop_column("project_id")

    with op.batch_alter_table("project", schema=None) as batch_op:
        batch_op.drop_index("ix_project_updated_at")
        batch_op.drop_index("ix_project_permalink")
        batch_op.drop_index("ix_project_path")
        batch_op.drop_index("ix_project_name")
        batch_op.drop_index("ix_project_created_at")

    op.drop_table("project")
    # ### end Alembic commands ###

```

--------------------------------------------------------------------------------
/src/basic_memory/schemas/v2/entity.py:
--------------------------------------------------------------------------------

```python
"""V2 entity and project schemas with ID-first design."""

from datetime import datetime
from typing import Dict, List, Literal, Optional

from pydantic import BaseModel, Field, ConfigDict

from basic_memory.schemas.response import ObservationResponse, RelationResponse


class EntityResolveRequest(BaseModel):
    """Request to resolve a string identifier to an entity ID.

    Supports resolution of:
    - Permalinks (e.g., "specs/search")
    - Titles (e.g., "Search Specification")
    - File paths (e.g., "specs/search.md")
    """

    identifier: str = Field(
        ...,
        description="Entity identifier to resolve (permalink, title, or file path)",
        min_length=1,
        max_length=500,
    )


class EntityResolveResponse(BaseModel):
    """Response from identifier resolution.

    Returns the entity ID and associated metadata for the resolved entity.
    """

    external_id: str = Field(..., description="External UUID (primary API identifier)")
    entity_id: int = Field(..., description="Numeric entity ID (internal identifier)")
    permalink: Optional[str] = Field(None, description="Entity permalink")
    file_path: str = Field(..., description="Relative file path")
    title: str = Field(..., description="Entity title")
    resolution_method: Literal["external_id", "permalink", "title", "path", "search"] = Field(
        ..., description="How the identifier was resolved"
    )


class MoveEntityRequestV2(BaseModel):
    """V2 request schema for moving an entity to a new file location.

    In V2 API, the entity ID is provided in the URL path, so this request
    only needs the destination path.
    """

    destination_path: str = Field(
        ...,
        description="New file path for the entity (relative to project root)",
        min_length=1,
        max_length=500,
    )


class EntityResponseV2(BaseModel):
    """V2 entity response with external_id as the primary API identifier.

    This response format emphasizes the external_id (UUID) as the primary API identifier,
    with the numeric id maintained for internal reference.
    """

    # External UUID first - this is the primary API identifier in v2
    external_id: str = Field(..., description="External UUID (primary API identifier)")
    # Internal numeric ID
    id: int = Field(..., description="Numeric entity ID (internal identifier)")

    # Core entity fields
    title: str = Field(..., description="Entity title")
    entity_type: str = Field(..., description="Entity type")
    content_type: str = Field(default="text/markdown", description="Content MIME type")

    # Secondary identifiers (for compatibility and convenience)
    permalink: Optional[str] = Field(None, description="Entity permalink (may change)")
    file_path: str = Field(..., description="Relative file path (may change)")

    # Content and metadata
    content: Optional[str] = Field(None, description="Entity content")
    entity_metadata: Optional[Dict] = Field(None, description="Entity metadata")

    # Relationships
    observations: List[ObservationResponse] = Field(
        default_factory=list, description="Entity observations"
    )
    relations: List[RelationResponse] = Field(default_factory=list, description="Entity relations")

    # Timestamps
    created_at: datetime = Field(..., description="Creation timestamp")
    updated_at: datetime = Field(..., description="Last update timestamp")

    # V2-specific metadata
    api_version: Literal["v2"] = Field(
        default="v2", description="API version (always 'v2' for this response)"
    )

    model_config = ConfigDict(from_attributes=True)


class ProjectResolveRequest(BaseModel):
    """Request to resolve a project identifier to a project ID.

    Supports resolution of:
    - Project names (e.g., "my-project")
    - Permalinks (e.g., "my-project")
    """

    identifier: str = Field(
        ...,
        description="Project identifier to resolve (name or permalink)",
        min_length=1,
        max_length=255,
    )


class ProjectResolveResponse(BaseModel):
    """Response from project identifier resolution.

    Returns the project ID and associated metadata for the resolved project.
    """

    external_id: str = Field(..., description="External UUID (primary API identifier)")
    project_id: int = Field(..., description="Numeric project ID (internal identifier)")
    name: str = Field(..., description="Project name")
    permalink: str = Field(..., description="Project permalink")
    path: str = Field(..., description="Project file path")
    is_active: bool = Field(..., description="Whether the project is active")
    is_default: bool = Field(..., description="Whether the project is the default")
    resolution_method: Literal["external_id", "name", "permalink"] = Field(
        ..., description="How the identifier was resolved"
    )

```
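
A small usage sketch (not part of the repository, assuming `basic_memory` is importable) showing how the resolve request/response models above validate; the UUID and field values are illustrative:

```python
from pydantic import ValidationError

from basic_memory.schemas.v2.entity import EntityResolveRequest, EntityResolveResponse

# A resolved permalink, expressed with the response model above (values are made up).
resp = EntityResolveResponse(
    external_id="00000000-0000-4000-8000-000000000000",
    entity_id=42,
    permalink="specs/search",
    file_path="specs/search.md",
    title="Search Specification",
    resolution_method="permalink",
)
assert resp.resolution_method == "permalink"

# min_length=1 on the request means an empty identifier fails validation.
try:
    EntityResolveRequest(identifier="")
except ValidationError:
    pass
```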

--------------------------------------------------------------------------------
/src/basic_memory/api/v2/routers/memory_router.py:
--------------------------------------------------------------------------------

```python
"""V2 routes for memory:// URI operations.

This router uses external_id UUIDs for stable, API-friendly routing.
V1 uses string-based project names which are less efficient and less stable.
"""

from typing import Annotated, Optional

from fastapi import APIRouter, Query, Path
from loguru import logger

from basic_memory.deps import ContextServiceV2ExternalDep, EntityRepositoryV2ExternalDep
from basic_memory.schemas.base import TimeFrame, parse_timeframe
from basic_memory.schemas.memory import (
    GraphContext,
    normalize_memory_url,
)
from basic_memory.schemas.search import SearchItemType
from basic_memory.api.routers.utils import to_graph_context

# Note: No prefix here - it's added during registration as /v2/{project_id}/memory
router = APIRouter(tags=["memory"])


@router.get("/memory/recent", response_model=GraphContext)
async def recent(
    context_service: ContextServiceV2ExternalDep,
    entity_repository: EntityRepositoryV2ExternalDep,
    project_id: str = Path(..., description="Project external UUID"),
    type: Annotated[list[SearchItemType] | None, Query()] = None,
    depth: int = 1,
    timeframe: TimeFrame = "7d",
    page: int = 1,
    page_size: int = 10,
    max_related: int = 10,
) -> GraphContext:
    """Get recent activity context for a project.

    Args:
        project_id: Project external UUID from URL path
        context_service: Context service scoped to project
        entity_repository: Entity repository scoped to project
        type: Types of items to include (entities, relations, observations)
        depth: How many levels of related entities to include
        timeframe: Time window for recent activity (e.g., "7d", "1 week")
        page: Page number for pagination
        page_size: Number of items per page
        max_related: Maximum related entities to include per item

    Returns:
        GraphContext with recent activity and related entities
    """
    # return all types by default
    types = (
        [SearchItemType.ENTITY, SearchItemType.RELATION, SearchItemType.OBSERVATION]
        if not type
        else type
    )

    logger.debug(
        f"V2 Getting recent context for project {project_id}: `{types}` depth: `{depth}` timeframe: `{timeframe}` page: `{page}` page_size: `{page_size}` max_related: `{max_related}`"
    )
    # Parse timeframe
    since = parse_timeframe(timeframe)
    limit = page_size
    offset = (page - 1) * page_size

    # Build context
    context = await context_service.build_context(
        types=types, depth=depth, since=since, limit=limit, offset=offset, max_related=max_related
    )
    recent_context = await to_graph_context(
        context, entity_repository=entity_repository, page=page, page_size=page_size
    )
    logger.debug(f"V2 Recent context: {recent_context.model_dump_json()}")
    return recent_context


# get_memory_context needs to be declared last so other paths can match


@router.get("/memory/{uri:path}", response_model=GraphContext)
async def get_memory_context(
    context_service: ContextServiceV2ExternalDep,
    entity_repository: EntityRepositoryV2ExternalDep,
    uri: str,
    project_id: str = Path(..., description="Project external UUID"),
    depth: int = 1,
    timeframe: Optional[TimeFrame] = None,
    page: int = 1,
    page_size: int = 10,
    max_related: int = 10,
) -> GraphContext:
    """Get rich context from memory:// URI.

    V2 supports both legacy path-based URIs and new ID-based URIs:
    - Legacy: memory://path/to/note
    - ID-based: memory://id/123 or memory://123

    Args:
        project_id: Project external UUID from URL path
        context_service: Context service scoped to project
        entity_repository: Entity repository scoped to project
        uri: Memory URI path (e.g., "id/123", "123", or "path/to/note")
        depth: How many levels of related entities to include
        timeframe: Optional time window for filtering related content
        page: Page number for pagination
        page_size: Number of items per page
        max_related: Maximum related entities to include

    Returns:
        GraphContext with the entity and its related context
    """
    logger.debug(
        f"V2 Getting context for project {project_id}, URI: `{uri}` depth: `{depth}` timeframe: `{timeframe}` page: `{page}` page_size: `{page_size}` max_related: `{max_related}`"
    )
    memory_url = normalize_memory_url(uri)

    # Parse timeframe
    since = parse_timeframe(timeframe) if timeframe else None
    limit = page_size
    offset = (page - 1) * page_size

    # Build context
    context = await context_service.build_context(
        memory_url, depth=depth, since=since, limit=limit, offset=offset, max_related=max_related
    )
    return await to_graph_context(
        context, entity_repository=entity_repository, page=page, page_size=page_size
    )

```
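
A tiny sketch (illustrative, not from the source tree) of how both endpoints above translate page/page_size into limit/offset:

```python
def to_limit_offset(page: int, page_size: int) -> tuple[int, int]:
    # Mirrors the pagination math used by /memory/recent and /memory/{uri:path}.
    return page_size, (page - 1) * page_size


assert to_limit_offset(1, 10) == (10, 0)   # first page starts at offset 0
assert to_limit_offset(3, 10) == (10, 20)  # page 3 skips the first 20 items
```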

--------------------------------------------------------------------------------
/src/basic_memory/cli/commands/cloud/api_client.py:
--------------------------------------------------------------------------------

```python
"""Cloud API client utilities."""

from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import AsyncContextManager, Callable, Optional

import httpx
import typer
from rich.console import Console

from basic_memory.cli.auth import CLIAuth
from basic_memory.config import ConfigManager

console = Console()

HttpClientFactory = Callable[[], AsyncContextManager[httpx.AsyncClient]]


class CloudAPIError(Exception):
    """Exception raised for cloud API errors."""

    def __init__(
        self, message: str, status_code: Optional[int] = None, detail: Optional[dict] = None
    ):
        super().__init__(message)
        self.status_code = status_code
        self.detail = detail or {}


class SubscriptionRequiredError(CloudAPIError):
    """Exception raised when user needs an active subscription."""

    def __init__(self, message: str, subscribe_url: str):
        super().__init__(message, status_code=403, detail={"error": "subscription_required"})
        self.subscribe_url = subscribe_url


def get_cloud_config() -> tuple[str, str, str]:
    """Get cloud OAuth configuration from config."""
    config_manager = ConfigManager()
    config = config_manager.config
    return config.cloud_client_id, config.cloud_domain, config.cloud_host


async def get_authenticated_headers(auth: CLIAuth | None = None) -> dict[str, str]:
    """
    Get authentication headers with JWT token.
    handles jwt refresh if needed.
    """
    client_id, domain, _ = get_cloud_config()
    auth_obj = auth or CLIAuth(client_id=client_id, authkit_domain=domain)
    token = await auth_obj.get_valid_token()
    if not token:
        console.print("[red]Not authenticated. Please run 'basic-memory cloud login' first.[/red]")
        raise typer.Exit(1)

    return {"Authorization": f"Bearer {token}"}


@asynccontextmanager
async def _default_http_client(timeout: float) -> AsyncIterator[httpx.AsyncClient]:
    async with httpx.AsyncClient(timeout=timeout) as client:
        yield client


async def make_api_request(
    method: str,
    url: str,
    headers: Optional[dict] = None,
    json_data: Optional[dict] = None,
    timeout: float = 30.0,
    *,
    auth: CLIAuth | None = None,
    http_client_factory: HttpClientFactory | None = None,
) -> httpx.Response:
    """Make an API request to the cloud service."""
    headers = headers or {}
    auth_headers = await get_authenticated_headers(auth=auth)
    headers.update(auth_headers)
    # Add debug headers to help with compression issues
    headers.setdefault("Accept-Encoding", "identity")  # Disable compression for debugging

    client_factory = http_client_factory or (lambda: _default_http_client(timeout))
    async with client_factory() as client:
        try:
            response = await client.request(method=method, url=url, headers=headers, json=json_data)
            response.raise_for_status()
            return response
        except httpx.HTTPError as e:
            # Check if this is a response error with response details
            if hasattr(e, "response") and e.response is not None:  # pyright: ignore [reportAttributeAccessIssue]
                response = e.response  # type: ignore

                # Try to parse error detail from response
                error_detail = None
                try:
                    error_detail = response.json()
                except Exception:
                    # If JSON parsing fails, we'll handle it as a generic error
                    pass

                # Check for subscription_required error (403)
                if response.status_code == 403 and isinstance(error_detail, dict):
                    # Handle both FastAPI HTTPException format (nested under "detail")
                    # and direct format
                    detail_obj = error_detail.get("detail", error_detail)
                    if (
                        isinstance(detail_obj, dict)
                        and detail_obj.get("error") == "subscription_required"
                    ):
                        message = detail_obj.get("message", "Active subscription required")
                        subscribe_url = detail_obj.get(
                            "subscribe_url", "https://basicmemory.com/subscribe"
                        )
                        raise SubscriptionRequiredError(
                            message=message, subscribe_url=subscribe_url
                        ) from e

                # Raise generic CloudAPIError with status code and detail
                raise CloudAPIError(
                    f"API request failed: {e}",
                    status_code=response.status_code,
                    detail=error_detail if isinstance(error_detail, dict) else {},
                ) from e

            raise CloudAPIError(f"API request failed: {e}") from e

```
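
A usage sketch (not part of the source tree) for the error handling above; the endpoint URL is illustrative:

```python
"""Illustration only: calling make_api_request and handling the errors it raises."""

import asyncio

from basic_memory.cli.commands.cloud.api_client import (
    CloudAPIError,
    SubscriptionRequiredError,
    make_api_request,
)


async def main() -> None:
    try:
        # Hypothetical endpoint; make_api_request adds the Authorization header itself.
        response = await make_api_request("GET", "https://cloud.example.test/proxy/health")
        print(response.status_code)
    except SubscriptionRequiredError as e:
        print(f"Subscription required: {e.subscribe_url}")
    except CloudAPIError as e:
        print(f"Cloud API error (status={e.status_code}): {e}")


if __name__ == "__main__":
    asyncio.run(main())
```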

--------------------------------------------------------------------------------
/src/basic_memory/repository/project_repository.py:
--------------------------------------------------------------------------------

```python
"""Repository for managing projects in Basic Memory."""

from pathlib import Path
from typing import Optional, Sequence, Union


from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker

from basic_memory import db
from basic_memory.models.project import Project
from basic_memory.repository.repository import Repository


class ProjectRepository(Repository[Project]):
    """Repository for Project model.

    Projects represent collections of knowledge entities grouped together.
    Each entity, observation, and relation belongs to a specific project.
    """

    def __init__(self, session_maker: async_sessionmaker[AsyncSession]):
        """Initialize with session maker."""
        super().__init__(session_maker, Project)

    async def get_by_name(self, name: str) -> Optional[Project]:
        """Get project by name (exact match).

        Args:
            name: Unique name of the project
        """
        query = self.select().where(Project.name == name)
        return await self.find_one(query)

    async def get_by_name_case_insensitive(self, name: str) -> Optional[Project]:
        """Get project by name (case-insensitive match).

        Args:
            name: Project name (case-insensitive)

        Returns:
            Project if found, None otherwise
        """
        query = self.select().where(Project.name.ilike(name))
        return await self.find_one(query)

    async def get_by_permalink(self, permalink: str) -> Optional[Project]:
        """Get project by permalink.

        Args:
            permalink: URL-friendly identifier for the project
        """
        query = self.select().where(Project.permalink == permalink)
        return await self.find_one(query)

    async def get_by_path(self, path: Union[Path, str]) -> Optional[Project]:
        """Get project by filesystem path.

        Args:
            path: Path to the project directory (will be converted to string internally)
        """
        query = self.select().where(Project.path == Path(path).as_posix())
        return await self.find_one(query)

    async def get_by_id(self, project_id: int) -> Optional[Project]:
        """Get project by numeric ID.

        Args:
            project_id: Numeric project ID

        Returns:
            Project if found, None otherwise
        """
        async with db.scoped_session(self.session_maker) as session:
            return await self.select_by_id(session, project_id)

    async def get_by_external_id(self, external_id: str) -> Optional[Project]:
        """Get project by external UUID.

        Args:
            external_id: External UUID identifier

        Returns:
            Project if found, None otherwise
        """
        query = self.select().where(Project.external_id == external_id)
        return await self.find_one(query)

    async def get_default_project(self) -> Optional[Project]:
        """Get the default project (the one marked as is_default=True)."""
        query = self.select().where(Project.is_default.is_not(None))
        return await self.find_one(query)

    async def get_active_projects(self) -> Sequence[Project]:
        """Get all active projects."""
        query = self.select().where(Project.is_active == True)  # noqa: E712
        result = await self.execute_query(query)
        return list(result.scalars().all())

    async def set_as_default(self, project_id: int) -> Optional[Project]:
        """Set a project as the default and unset previous default.

        Args:
            project_id: ID of the project to set as default

        Returns:
            The updated project if found, None otherwise
        """
        async with db.scoped_session(self.session_maker) as session:
            # First, clear the default flag for all projects using direct SQL
            await session.execute(
                text("UPDATE project SET is_default = NULL WHERE is_default IS NOT NULL")
            )
            await session.flush()

            # Set the new default project
            target_project = await self.select_by_id(session, project_id)
            if target_project:
                target_project.is_default = True
                await session.flush()
                return target_project
            return None  # pragma: no cover

    async def update_path(self, project_id: int, new_path: str) -> Optional[Project]:
        """Update project path.

        Args:
            project_id: ID of the project to update
            new_path: New filesystem path for the project

        Returns:
            The updated project if found, None otherwise
        """
        async with db.scoped_session(self.session_maker) as session:
            project = await self.select_by_id(session, project_id)
            if project:
                project.path = new_path
                await session.flush()
                return project
            return None

```
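
A short usage sketch for the repository above, assuming an `async_sessionmaker` has already been configured elsewhere (illustrative only, not code from the project):

```python
# Usage sketch; the session_maker wiring is assumed to exist in the caller.
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker

from basic_memory.repository.project_repository import ProjectRepository


async def promote_to_default(session_maker: async_sessionmaker[AsyncSession], name: str) -> None:
    repo = ProjectRepository(session_maker)

    # Exact match first, then a case-insensitive fallback
    project = await repo.get_by_name(name) or await repo.get_by_name_case_insensitive(name)
    if project is None:
        raise ValueError(f"Unknown project: {name}")

    # Clears the previous default flag and marks this project instead
    await repo.set_as_default(project.id)
```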

--------------------------------------------------------------------------------
/src/basic_memory/sync/coordinator.py:
--------------------------------------------------------------------------------

```python
"""SyncCoordinator - centralized sync/watch lifecycle management.

This module provides a single coordinator that manages the lifecycle of
file synchronization and watch services across all entry points (API, MCP, CLI).

The coordinator handles:
- Starting/stopping watch service
- Scheduling background sync
- Reporting status
- Clean shutdown behavior
"""

import asyncio
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import Optional

from loguru import logger

from basic_memory.config import BasicMemoryConfig


class SyncStatus(Enum):
    """Status of the sync coordinator."""

    NOT_STARTED = auto()
    STARTING = auto()
    RUNNING = auto()
    STOPPING = auto()
    STOPPED = auto()
    ERROR = auto()


@dataclass
class SyncCoordinator:
    """Centralized coordinator for sync/watch lifecycle.

    Manages the lifecycle of file synchronization services, providing:
    - Unified start/stop interface
    - Status tracking
    - Clean shutdown with proper task cancellation

    Args:
        config: BasicMemoryConfig with sync settings
        should_sync: Whether sync should be enabled (from container decision)
        skip_reason: Human-readable reason if sync is skipped

    Usage:
        coordinator = SyncCoordinator(config=config, should_sync=True)
        await coordinator.start()
        # ... application runs ...
        await coordinator.stop()
    """

    config: BasicMemoryConfig
    should_sync: bool = True
    skip_reason: Optional[str] = None

    # Internal state (not constructor args)
    _status: SyncStatus = field(default=SyncStatus.NOT_STARTED, init=False)
    _sync_task: Optional[asyncio.Task] = field(default=None, init=False)

    @property
    def status(self) -> SyncStatus:
        """Current status of the coordinator."""
        return self._status

    @property
    def is_running(self) -> bool:
        """Whether sync is currently running."""
        return self._status == SyncStatus.RUNNING

    async def start(self) -> None:
        """Start the sync/watch service if enabled.

        This is a non-blocking call that starts the sync task in the background.
        Use stop() to cleanly shut down.
        """
        if not self.should_sync:
            if self.skip_reason:
                logger.info(f"{self.skip_reason} - skipping local file sync")
            self._status = SyncStatus.STOPPED
            return

        if self._status in (SyncStatus.RUNNING, SyncStatus.STARTING):
            logger.warning("Sync coordinator already running or starting")
            return

        self._status = SyncStatus.STARTING
        logger.info("Starting file sync in background")

        try:
            # Deferred import to avoid circular dependency
            from basic_memory.services.initialization import initialize_file_sync

            async def _file_sync_runner() -> None:  # pragma: no cover
                """Run the file sync service."""
                try:
                    await initialize_file_sync(self.config)
                except asyncio.CancelledError:
                    logger.debug("File sync cancelled")
                    raise
                except Exception as e:
                    logger.error(f"Error in file sync: {e}")
                    self._status = SyncStatus.ERROR
                    raise

            self._sync_task = asyncio.create_task(_file_sync_runner())
            self._status = SyncStatus.RUNNING
            logger.info("Sync coordinator started successfully")

        except Exception as e:  # pragma: no cover
            logger.error(f"Failed to start sync coordinator: {e}")
            self._status = SyncStatus.ERROR
            raise

    async def stop(self) -> None:
        """Stop the sync/watch service cleanly.

        Cancels the background task and waits for it to complete.
        Safe to call even if not running.
        """
        if self._status in (SyncStatus.NOT_STARTED, SyncStatus.STOPPED):
            return

        if self._sync_task is None:  # pragma: no cover
            self._status = SyncStatus.STOPPED
            return

        self._status = SyncStatus.STOPPING
        logger.info("Stopping sync coordinator...")

        self._sync_task.cancel()
        try:
            await self._sync_task
        except asyncio.CancelledError:
            logger.info("File sync task cancelled successfully")

        self._sync_task = None
        self._status = SyncStatus.STOPPED
        logger.info("Sync coordinator stopped")

    def get_status_info(self) -> dict:
        """Get status information for reporting.

        Returns:
            Dictionary with status details for diagnostics
        """
        return {
            "status": self._status.name,
            "should_sync": self.should_sync,
            "skip_reason": self.skip_reason,
            "has_task": self._sync_task is not None,
        }


__all__ = [
    "SyncCoordinator",
    "SyncStatus",
]

```
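
The docstring's usage pattern maps naturally onto an application lifespan. A sketch assuming a FastAPI-style lifespan hook (the app construction here is illustrative, not the repository's actual entry point):

```python
# Illustrative lifespan wiring for SyncCoordinator; app setup is an assumption.
from contextlib import asynccontextmanager

from fastapi import FastAPI

from basic_memory.config import ConfigManager
from basic_memory.sync.coordinator import SyncCoordinator


@asynccontextmanager
async def lifespan(app: FastAPI):
    config = ConfigManager().config
    coordinator = SyncCoordinator(config=config, should_sync=True)  # or gate on config
    await coordinator.start()  # non-blocking; sync runs as a background task
    try:
        yield
    finally:
        await coordinator.stop()  # cancels the task and waits for clean shutdown


app = FastAPI(lifespan=lifespan)
```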

--------------------------------------------------------------------------------
/tests/api/test_prompt_router.py:
--------------------------------------------------------------------------------

```python
"""Tests for the prompt router endpoints."""

import pytest
import pytest_asyncio
from httpx import AsyncClient

from basic_memory.services.context_service import ContextService


@pytest_asyncio.fixture
async def context_service(entity_repository, search_service, observation_repository):
    """Create a real context service for testing."""
    return ContextService(entity_repository, search_service, observation_repository)


@pytest.mark.asyncio
async def test_continue_conversation_endpoint(
    client: AsyncClient,
    entity_service,
    search_service,
    context_service,
    entity_repository,
    test_graph,
    project_url,
):
    """Test the continue_conversation endpoint with real services."""
    # Create request data
    request_data = {
        "topic": "Root",  # This should match our test entity in test_graph
        "timeframe": "7d",
        "depth": 1,
        "related_items_limit": 2,
    }

    # Call the endpoint
    response = await client.post(f"{project_url}/prompt/continue-conversation", json=request_data)

    # Verify response
    assert response.status_code == 200
    result = response.json()
    assert "prompt" in result
    assert "context" in result

    # Check content of context
    context = result["context"]
    assert context["topic"] == "Root"
    assert context["timeframe"] == "7d"
    assert context["has_results"] is True
    assert len(context["hierarchical_results"]) > 0

    # Check content of prompt
    prompt = result["prompt"]
    assert "Continuing conversation on: Root" in prompt
    assert "memory retrieval session" in prompt

    # Test without topic - should use recent activity
    request_data = {"timeframe": "1d", "depth": 1, "related_items_limit": 2}

    response = await client.post(f"{project_url}/prompt/continue-conversation", json=request_data)

    assert response.status_code == 200
    result = response.json()
    assert "Recent Activity" in result["context"]["topic"]


@pytest.mark.asyncio
async def test_search_prompt_endpoint(
    client: AsyncClient, entity_service, search_service, test_graph, project_url
):
    """Test the search_prompt endpoint with real services."""
    # Create request data
    request_data = {
        "query": "Root",  # This should match our test entity
        "timeframe": "7d",
    }

    # Call the endpoint
    response = await client.post(f"{project_url}/prompt/search", json=request_data)

    # Verify response
    assert response.status_code == 200
    result = response.json()
    assert "prompt" in result
    assert "context" in result

    # Check content of context
    context = result["context"]
    assert context["query"] == "Root"
    assert context["timeframe"] == "7d"
    assert context["has_results"] is True
    assert len(context["results"]) > 0

    # Check content of prompt
    prompt = result["prompt"]
    assert 'Search Results for: "Root"' in prompt
    assert "This is a memory search session" in prompt


@pytest.mark.asyncio
async def test_search_prompt_no_results(
    client: AsyncClient, entity_service, search_service, project_url
):
    """Test the search_prompt endpoint with a query that returns no results."""
    # Create request data with a query that shouldn't match anything
    request_data = {"query": "NonExistentQuery12345", "timeframe": "7d"}

    # Call the endpoint
    response = await client.post(f"{project_url}/prompt/search", json=request_data)

    # Verify response
    assert response.status_code == 200
    result = response.json()

    # Check content of context
    context = result["context"]
    assert context["query"] == "NonExistentQuery12345"
    assert context["has_results"] is False
    assert len(context["results"]) == 0

    # Check content of prompt
    prompt = result["prompt"]
    assert 'Search Results for: "NonExistentQuery12345"' in prompt
    assert "I couldn't find any results for this query" in prompt
    assert "Opportunity to Capture Knowledge" in prompt


@pytest.mark.asyncio
async def test_error_handling(client: AsyncClient, monkeypatch, project_url):
    """Test error handling in the endpoints by breaking the template loader."""

    # Patch the template loader to raise an exception
    def mock_render(*args, **kwargs):
        raise Exception("Template error")

    # Apply the patch
    monkeypatch.setattr("basic_memory.api.template_loader.TemplateLoader.render", mock_render)

    # Test continue_conversation error handling
    response = await client.post(
        f"{project_url}/prompt/continue-conversation",
        json={"topic": "test error", "timeframe": "7d"},
    )

    assert response.status_code == 500
    assert "detail" in response.json()
    assert "Template error" in response.json()["detail"]

    # Test search_prompt error handling
    response = await client.post(
        f"{project_url}/prompt/search", json={"query": "test error", "timeframe": "7d"}
    )

    assert response.status_code == 500
    assert "detail" in response.json()
    assert "Template error" in response.json()["detail"]

```

--------------------------------------------------------------------------------
/tests/cli/test_import_memory_json.py:
--------------------------------------------------------------------------------

```python
"""Tests for import_memory_json command."""

import json

import pytest
from typer.testing import CliRunner

from basic_memory.cli.app import import_app
from basic_memory.cli.commands import import_memory_json  # noqa
from basic_memory.markdown import MarkdownProcessor
from basic_memory.services.file_service import FileService

# Set up CLI runner
runner = CliRunner()


@pytest.fixture
def sample_entities():
    """Sample entities for testing."""
    return [
        {
            "type": "entity",
            "name": "test_entity",
            "entityType": "test",
            "observations": ["Test observation 1", "Test observation 2"],
        },
        {
            "type": "relation",
            "from": "test_entity",
            "to": "related_entity",
            "relationType": "test_relation",
        },
    ]


@pytest.fixture
def sample_json_file(tmp_path, sample_entities):
    """Create a sample memory.json file."""
    json_file = tmp_path / "memory.json"
    with open(json_file, "w", encoding="utf-8") as f:
        for entity in sample_entities:
            f.write(json.dumps(entity) + "\n")
    return json_file


@pytest.mark.asyncio
async def test_get_importer_dependencies(tmp_path, monkeypatch):
    """Test getting importer dependencies (MarkdownProcessor and FileService)."""
    monkeypatch.setenv("HOME", str(tmp_path))
    processor, file_service = await import_memory_json.get_importer_dependencies()
    assert isinstance(processor, MarkdownProcessor)
    assert isinstance(file_service, FileService)


def test_import_json_command_file_not_found(tmp_path):
    """Test error handling for nonexistent file."""
    nonexistent = tmp_path / "nonexistent.json"
    result = runner.invoke(import_app, ["memory-json", str(nonexistent)])
    assert result.exit_code == 1
    assert "File not found" in result.output


def test_import_json_command_success(tmp_path, sample_json_file, monkeypatch):
    """Test successful JSON import via command."""
    # Set up test environment
    monkeypatch.setenv("HOME", str(tmp_path))

    # Run import
    result = runner.invoke(import_app, ["memory-json", str(sample_json_file)])
    assert result.exit_code == 0
    assert "Import complete" in result.output
    assert "Created 1 entities" in result.output
    assert "Added 1 relations" in result.output


def test_import_json_command_invalid_json(tmp_path):
    """Test error handling for invalid JSON."""
    # Create invalid JSON file
    invalid_file = tmp_path / "invalid.json"
    invalid_file.write_text("not json")

    result = runner.invoke(import_app, ["memory-json", str(invalid_file)])
    assert result.exit_code == 1
    assert "Error during import" in result.output


def test_import_json_command_handle_old_format(tmp_path, monkeypatch):
    """Test handling old format JSON with from_id/to_id."""
    # Create JSON with old format
    old_format = [
        {
            "type": "entity",
            "name": "test_entity",
            "entityType": "test",
            "observations": ["Test observation"],
        },
        {
            "type": "relation",
            "from_id": "test_entity",
            "to_id": "other_entity",
            "relation_type": "test_relation",
        },
    ]

    json_file = tmp_path / "old_format.json"
    with open(json_file, "w", encoding="utf-8") as f:
        for item in old_format:
            f.write(json.dumps(item) + "\n")

    # Set up test environment
    monkeypatch.setenv("HOME", str(tmp_path))

    # Run import
    result = runner.invoke(import_app, ["memory-json", str(json_file)])
    assert result.exit_code == 0
    assert "Import complete" in result.output


def test_import_json_command_missing_name_key(tmp_path, monkeypatch):
    """Test handling JSON with missing 'name' key using 'id' instead."""
    # Create JSON with id instead of name (common in Knowledge Graph Memory Server)
    data_with_id = [
        {
            "type": "entity",
            "id": "test_entity_id",
            "entityType": "test",
            "observations": ["Test observation with id"],
        },
        {
            "type": "entity",
            "entityName": "test_entity_2",
            "entityType": "test",
            "observations": ["Test observation with entityName"],
        },
        {
            "type": "entity",
            "name": "test_entity_title",
            "entityType": "test",
            "observations": ["Test observation with name"],
        },
    ]

    json_file = tmp_path / "missing_name.json"
    with open(json_file, "w", encoding="utf-8") as f:
        for item in data_with_id:
            f.write(json.dumps(item) + "\n")

    # Set up test environment
    monkeypatch.setenv("HOME", str(tmp_path))

    # Run import - should not fail even without 'name' key
    result = runner.invoke(import_app, ["memory-json", str(json_file)])
    assert result.exit_code == 0
    assert "Import complete" in result.output
    assert "Created 3 entities" in result.output

```

--------------------------------------------------------------------------------
/tests/markdown/test_parser_edge_cases.py:
--------------------------------------------------------------------------------

```python
"""Tests for markdown parser edge cases."""

from pathlib import Path
from textwrap import dedent

import pytest

from basic_memory.markdown.entity_parser import EntityParser


@pytest.mark.asyncio
async def test_unicode_content(tmp_path):
    """Test handling of Unicode content including emoji and non-Latin scripts."""
    content = dedent("""
        ---
        type: test
        id: test/unicode
        created: 2024-12-21T14:00:00Z
        modified: 2024-12-21T14:00:00Z
        tags: [unicode, 测试]
        ---
        
        # Unicode Test 🧪
        
        ## Observations
        - [test] Emoji test 👍 #emoji #test (Testing emoji)
        - [中文] Chinese text 测试 #language (Script test)
        - [русский] Russian привет #language (More scripts)
        - [note] Emoji in text 😀 #meta (Category test)
        
        ## Relations
        - tested_by [[测试组件]] (Unicode test)
        - depends_on [[компонент]] (Another test)
        """)

    test_file = tmp_path / "unicode.md"
    test_file.write_text(content, encoding="utf-8")

    parser = EntityParser(tmp_path)
    entity = await parser.parse_file(test_file)

    assert "测试" in entity.frontmatter.metadata["tags"]
    assert "chinese" not in entity.frontmatter.metadata["tags"]
    assert "🧪" in entity.content

    # Verify Unicode in observations
    assert any(o.content == "Emoji test 👍 #emoji #test" for o in entity.observations)
    assert any(o.category == "中文" for o in entity.observations)
    assert any(o.category == "русский" for o in entity.observations)

    # Verify Unicode in relations
    assert any(r.target == "测试组件" for r in entity.relations)
    assert any(r.target == "компонент" for r in entity.relations)


@pytest.mark.asyncio
async def test_empty_file(tmp_path):
    """Test handling of empty files."""
    empty_file = tmp_path / "empty.md"
    empty_file.write_text("")

    parser = EntityParser(tmp_path)
    entity = await parser.parse_file(empty_file)
    assert entity.observations == []
    assert entity.relations == []


@pytest.mark.asyncio
async def test_missing_sections(tmp_path):
    """Test handling of files with missing sections."""
    content = dedent("""
        ---
        type: test
        id: test/missing
        created: 2024-01-09
        modified: 2024-01-09
        tags: []
        ---
        
        Just some content
        with [[links]] but no sections
        """)

    test_file = tmp_path / "missing.md"
    test_file.write_text(content)

    parser = EntityParser(tmp_path)
    entity = await parser.parse_file(test_file)
    assert len(entity.relations) == 1
    assert entity.relations[0].target == "links"
    assert entity.relations[0].type == "links_to"


@pytest.mark.asyncio
async def test_tasks_are_not_observations(tmp_path):
    """Test handling of plain observations without categories."""
    content = dedent("""
        ---
        type: test
        id: test/missing
        created: 2024-01-09
        modified: 2024-01-09
        tags: []
        ---

        - [ ] one
        -[ ] two
        - [x] done
        - [-] not done
        """)

    test_file = tmp_path / "missing.md"
    test_file.write_text(content)

    parser = EntityParser(tmp_path)
    entity = await parser.parse_file(test_file)
    assert len(entity.observations) == 0


@pytest.mark.asyncio
async def test_nested_content(tmp_path):
    """Test handling of deeply nested content."""
    content = dedent("""
        ---
        type: test
        id: test/nested
        created: 2024-01-09
        modified: 2024-01-09
        tags: []
        ---
        
        # Test
        
        ## Level 1
        - [test] Level 1 #test (First level)
        - implements [[One]]
            
            ### Level 2
            - [test] Level 2 #test (Second level)
            - uses [[Two]]
                
                #### Level 3
                - [test] Level 3 #test (Third level)
                - needs [[Three]]
        """)

    test_file = tmp_path / "nested.md"
    test_file.write_text(content)

    parser = EntityParser(tmp_path)
    entity = await parser.parse_file(test_file)

    # Should find all observations and relations regardless of nesting
    assert len(entity.observations) == 3
    assert len(entity.relations) == 3
    assert {r.target for r in entity.relations} == {"One", "Two", "Three"}


@pytest.mark.asyncio
async def test_malformed_frontmatter(tmp_path):
    """Test handling of malformed frontmatter."""
    # Missing fields
    content = dedent("""
        ---
        type: test
        ---
        
        # Test
        """)

    test_file = tmp_path / "malformed.md"
    test_file.write_text(content)

    parser = EntityParser(tmp_path)
    entity = await parser.parse_file(test_file)
    assert entity.frontmatter.permalink is None


@pytest.mark.asyncio
async def test_file_not_found():
    """Test handling of non-existent files."""
    parser = EntityParser(Path("/tmp"))
    with pytest.raises(FileNotFoundError):
        await parser.parse_file(Path("nonexistent.md"))

```

--------------------------------------------------------------------------------
/tests/mcp/test_tool_build_context.py:
--------------------------------------------------------------------------------

```python
"""Tests for discussion context MCP tool."""

import pytest
from datetime import datetime

from mcp.server.fastmcp.exceptions import ToolError

from basic_memory.mcp.tools import build_context
from basic_memory.schemas.memory import (
    GraphContext,
)


@pytest.mark.asyncio
async def test_get_basic_discussion_context(client, test_graph, test_project):
    """Test getting basic discussion context."""
    context = await build_context.fn(project=test_project.name, url="memory://test/root")

    assert isinstance(context, GraphContext)
    assert len(context.results) == 1
    assert context.results[0].primary_result.permalink == "test/root"
    assert len(context.results[0].related_results) > 0

    # Verify metadata
    assert context.metadata.uri == "test/root"
    assert context.metadata.depth == 1  # default depth
    assert context.metadata.timeframe is not None
    assert isinstance(context.metadata.generated_at, datetime)
    assert context.metadata.primary_count == 1
    if context.metadata.related_count:
        assert context.metadata.related_count > 0


@pytest.mark.asyncio
async def test_get_discussion_context_pattern(client, test_graph, test_project):
    """Test getting context with pattern matching."""
    context = await build_context.fn(project=test_project.name, url="memory://test/*", depth=1)

    assert isinstance(context, GraphContext)
    assert len(context.results) > 1  # Should match multiple test/* paths
    assert all("test/" in item.primary_result.permalink for item in context.results)  # pyright: ignore [reportOperatorIssue]
    assert context.metadata.depth == 1


@pytest.mark.asyncio
async def test_get_discussion_context_timeframe(client, test_graph, test_project):
    """Test timeframe parameter filtering."""
    # Get recent context
    recent_context = await build_context.fn(
        project=test_project.name,
        url="memory://test/root",
        timeframe="1d",  # Last 24 hours
    )

    # Get older context
    older_context = await build_context.fn(
        project=test_project.name,
        url="memory://test/root",
        timeframe="30d",  # Last 30 days
    )

    # Calculate total related items
    total_recent_related = (
        sum(len(item.related_results) for item in recent_context.results)
        if recent_context.results
        else 0
    )
    total_older_related = (
        sum(len(item.related_results) for item in older_context.results)
        if older_context.results
        else 0
    )

    assert total_older_related >= total_recent_related


@pytest.mark.asyncio
async def test_get_discussion_context_not_found(client, test_project):
    """Test handling of non-existent URIs."""
    context = await build_context.fn(project=test_project.name, url="memory://test/does-not-exist")

    assert isinstance(context, GraphContext)
    assert len(context.results) == 0
    assert context.metadata.primary_count == 0
    assert context.metadata.related_count == 0


# Test data for different timeframe formats
valid_timeframes = [
    "7d",  # Standard format
    "yesterday",  # Natural language
    "0d",  # Zero duration
]

invalid_timeframes = [
    "invalid",  # Nonsense string
    # NOTE: "tomorrow" now returns 1 day ago due to timezone safety - no longer invalid
]


@pytest.mark.asyncio
async def test_build_context_timeframe_formats(client, test_graph, test_project):
    """Test that build_context accepts various timeframe formats."""
    test_url = "memory://specs/test"

    # Test each valid timeframe
    for timeframe in valid_timeframes:
        try:
            result = await build_context.fn(
                project=test_project.name,
                url=test_url,
                timeframe=timeframe,
                page=1,
                page_size=10,
                max_related=10,
            )
            assert result is not None
        except Exception as e:
            pytest.fail(f"Failed with valid timeframe '{timeframe}': {str(e)}")

    # Test invalid timeframes should raise ValidationError
    for timeframe in invalid_timeframes:
        with pytest.raises(ToolError):
            await build_context.fn(project=test_project.name, url=test_url, timeframe=timeframe)


@pytest.mark.asyncio
async def test_build_context_string_depth_parameter(client, test_graph, test_project):
    """Test that build_context handles string depth parameter correctly."""
    test_url = "memory://test/root"

    # Test valid string depth parameter - should either raise ToolError or convert to int
    try:
        result = await build_context.fn(url=test_url, depth="2", project=test_project.name)
        # If it succeeds, verify the depth was converted to an integer
        assert isinstance(result.metadata.depth, int)
        assert result.metadata.depth == 2
    except ToolError:
        # This is also acceptable behavior - type validation should catch it
        pass

    # Test invalid string depth parameter - should raise ToolError
    with pytest.raises(ToolError):
        await build_context.fn(url=test_url, depth="invalid", project=test_project.name)

```

--------------------------------------------------------------------------------
/tests/api/test_continue_conversation_template.py:
--------------------------------------------------------------------------------

```python
"""Tests for the continue_conversation template rendering."""

import datetime
import pytest

from basic_memory.api.template_loader import TemplateLoader
from basic_memory.schemas.memory import EntitySummary
from basic_memory.schemas.search import SearchItemType


@pytest.fixture
def template_loader():
    """Return a TemplateLoader instance for testing."""
    return TemplateLoader()


@pytest.fixture
def entity_summary():
    """Create a sample EntitySummary for testing."""
    return EntitySummary(
        entity_id=1,
        title="Test Entity",
        permalink="test/entity",
        type=SearchItemType.ENTITY,
        content="This is a test entity with some content.",
        file_path="/path/to/test/entity.md",
        created_at=datetime.datetime(2023, 1, 1, 12, 0),
    )


@pytest.fixture
def context_with_results(entity_summary):
    """Create a sample context with results for testing."""
    from basic_memory.schemas.memory import ObservationSummary, ContextResult

    # Create an observation for the entity
    observation = ObservationSummary(
        observation_id=1,
        entity_id=1,
        title="Test Observation",
        permalink="test/entity/observations/1",
        category="test",
        content="This is a test observation.",
        file_path="/path/to/test/entity.md",
        created_at=datetime.datetime(2023, 1, 1, 12, 0),
    )

    # Create a context result with primary_result, observations, and related_results
    context_item = ContextResult(
        primary_result=entity_summary,
        observations=[observation],
        related_results=[entity_summary],
    )

    return {
        "topic": "Test Topic",
        "timeframe": "7d",
        "has_results": True,
        "hierarchical_results": [context_item],
    }


@pytest.fixture
def context_without_results():
    """Create a sample context without results for testing."""
    return {
        "topic": "Empty Topic",
        "timeframe": "1d",
        "has_results": False,
        "hierarchical_results": [],
    }


@pytest.mark.asyncio
async def test_continue_conversation_with_results(template_loader, context_with_results):
    """Test rendering the continue_conversation template with results."""
    result = await template_loader.render("prompts/continue_conversation.hbs", context_with_results)

    # Check that key elements are present
    assert "Continuing conversation on: Test Topic" in result
    assert "memory://test/entity" in result
    assert "Test Entity" in result
    assert "This is a test entity with some content." in result
    assert "Related Context" in result
    assert "read_note" in result
    assert "Next Steps" in result
    assert "Knowledge Capture Recommendation" in result


@pytest.mark.asyncio
async def test_continue_conversation_without_results(template_loader, context_without_results):
    """Test rendering the continue_conversation template without results."""
    result = await template_loader.render(
        "prompts/continue_conversation.hbs", context_without_results
    )

    # Check that key elements are present
    assert "Continuing conversation on: Empty Topic" in result
    assert "The supplied query did not return any information" in result
    assert "Opportunity to Capture New Knowledge!" in result
    assert 'title="Empty Topic"' in result
    assert "Next Steps" in result
    assert "Knowledge Capture Recommendation" in result


@pytest.mark.asyncio
async def test_next_steps_section(template_loader, context_with_results):
    """Test that the next steps section is rendered correctly."""
    result = await template_loader.render("prompts/continue_conversation.hbs", context_with_results)

    assert "Next Steps" in result
    assert 'Explore more with: `search_notes("Test Topic")`' in result
    assert (
        f'See what\'s changed: `recent_activity(timeframe="{context_with_results["timeframe"]}")`'
        in result
    )
    assert "Record new learnings or decisions from this conversation" in result


@pytest.mark.asyncio
async def test_knowledge_capture_recommendation(template_loader, context_with_results):
    """Test that the knowledge capture recommendation is rendered."""
    result = await template_loader.render("prompts/continue_conversation.hbs", context_with_results)

    assert "Knowledge Capture Recommendation" in result
    assert "actively look for opportunities to:" in result
    assert "Record key information, decisions, or insights" in result
    assert "Link new knowledge to existing topics" in result
    assert "Suggest capturing important context" in result
    assert "one of the most valuable aspects of Basic Memory" in result


@pytest.mark.asyncio
async def test_timeframe_default_value(template_loader, context_with_results):
    """Test that the timeframe uses the default value when not provided."""
    # Remove the timeframe from the context
    context_without_timeframe = context_with_results.copy()
    context_without_timeframe["timeframe"] = None

    result = await template_loader.render(
        "prompts/continue_conversation.hbs", context_without_timeframe
    )

    # Check that the default value is used
    assert 'recent_activity(timeframe="7d")' in result

```

--------------------------------------------------------------------------------
/tests/repository/test_entity_upsert_issue_187.py:
--------------------------------------------------------------------------------

```python
"""Tests for issue #187 - UNIQUE constraint violation on file_path during sync."""

import pytest
from datetime import datetime, timezone

from basic_memory.models.knowledge import Entity, Observation
from basic_memory.repository.entity_repository import EntityRepository


@pytest.mark.asyncio
async def test_upsert_entity_with_observations_conflict(entity_repository: EntityRepository):
    """Test upserting an entity that already exists with observations.

    This reproduces issue #187 where sync fails with UNIQUE constraint violations
    when trying to update entities that already exist with observations.
    """
    # Create initial entity with observations
    entity1 = Entity(
        project_id=entity_repository.project_id,
        title="Original Title",
        entity_type="note",
        permalink="debugging/backup-system/coderabbit-feedback-resolution",
        file_path="debugging/backup-system/CodeRabbit Feedback Resolution - Backup System Issues.md",
        content_type="text/markdown",
        created_at=datetime.now(timezone.utc),
        updated_at=datetime.now(timezone.utc),
    )

    # Add observations to the entity
    obs1 = Observation(
        project_id=entity_repository.project_id,
        content="This is a test observation",
        category="testing",
        tags=["test"],
    )
    entity1.observations.append(obs1)

    result1 = await entity_repository.upsert_entity(entity1)
    original_id = result1.id

    # Verify entity was created with observations
    assert result1.id is not None
    assert len(result1.observations) == 1

    # Now try to upsert the same file_path with different content/observations
    # This simulates a file being modified and re-synced
    entity2 = Entity(
        project_id=entity_repository.project_id,
        title="Updated Title",
        entity_type="note",
        permalink="debugging/backup-system/coderabbit-feedback-resolution",  # Same permalink
        file_path="debugging/backup-system/CodeRabbit Feedback Resolution - Backup System Issues.md",  # Same file_path
        content_type="text/markdown",
        created_at=datetime.now(timezone.utc),
        updated_at=datetime.now(timezone.utc),
    )

    # Add different observations
    obs2 = Observation(
        project_id=entity_repository.project_id,
        content="This is an updated observation",
        category="updated",
        tags=["updated"],
    )
    obs3 = Observation(
        project_id=entity_repository.project_id,
        content="This is a second observation",
        category="second",
        tags=["second"],
    )
    entity2.observations.extend([obs2, obs3])

    # This should UPDATE the existing entity, not fail with IntegrityError
    result2 = await entity_repository.upsert_entity(entity2)

    # Should update existing entity (same ID)
    assert result2.id == original_id
    assert result2.title == "Updated Title"
    assert result2.file_path == entity1.file_path
    assert result2.permalink == entity1.permalink

    # Observations should be updated
    assert len(result2.observations) == 2
    assert result2.observations[0].content == "This is an updated observation"
    assert result2.observations[1].content == "This is a second observation"


@pytest.mark.asyncio
async def test_upsert_entity_repeated_sync_same_file(entity_repository: EntityRepository):
    """Test that syncing the same file multiple times doesn't cause IntegrityError.

    This tests the specific scenario from issue #187 where files are being
    synced repeatedly and hitting UNIQUE constraint violations.
    """
    file_path = "processes/Complete Process for Uploading New Training Videos.md"
    permalink = "processes/complete-process-for-uploading-new-training-videos"

    # Create initial entity
    entity1 = Entity(
        project_id=entity_repository.project_id,
        title="Complete Process for Uploading New Training Videos",
        entity_type="note",
        permalink=permalink,
        file_path=file_path,
        content_type="text/markdown",
        checksum="abc123",
        created_at=datetime.now(timezone.utc),
        updated_at=datetime.now(timezone.utc),
    )

    result1 = await entity_repository.upsert_entity(entity1)
    first_id = result1.id

    # Simulate multiple sync attempts (like the infinite retry loop in the issue)
    for i in range(5):
        entity_new = Entity(
            project_id=entity_repository.project_id,
            title="Complete Process for Uploading New Training Videos",
            entity_type="note",
            permalink=permalink,
            file_path=file_path,
            content_type="text/markdown",
            checksum=f"def{456 + i}",  # Different checksum each time
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
        )

        # Each upsert should succeed and update the existing entity
        result = await entity_repository.upsert_entity(entity_new)

        # Should always return the same entity (updated)
        assert result.id == first_id
        assert result.checksum == entity_new.checksum
        assert result.file_path == file_path
        assert result.permalink == permalink

```

--------------------------------------------------------------------------------
/tests/api/test_search_template.py:
--------------------------------------------------------------------------------

```python
"""Tests for the search template rendering."""

import datetime
import pytest

from basic_memory.api.template_loader import TemplateLoader
from basic_memory.schemas.search import SearchItemType, SearchResult


@pytest.fixture
def template_loader():
    """Return a TemplateLoader instance for testing."""
    return TemplateLoader()


@pytest.fixture
def search_result():
    """Create a sample SearchResult for testing."""
    return SearchResult(
        title="Test Search Result",
        type=SearchItemType.ENTITY,
        permalink="test/search-result",
        score=0.95,
        content="This is a test search result with some content.",
        file_path="/path/to/test/search-result.md",
        metadata={"created_at": datetime.datetime(2023, 2, 1, 12, 0)},
    )


@pytest.fixture
def context_with_results(search_result):
    """Create a sample context with search results."""
    return {
        "query": "test query",
        "timeframe": "30d",
        "has_results": True,
        "result_count": 1,
        "results": [search_result],
    }


@pytest.fixture
def context_without_results():
    """Create a sample context without search results."""
    return {
        "query": "empty query",
        "timeframe": None,
        "has_results": False,
        "result_count": 0,
        "results": [],
    }


@pytest.mark.asyncio
async def test_search_with_results(template_loader, context_with_results):
    """Test rendering the search template with results."""
    result = await template_loader.render("prompts/search.hbs", context_with_results)

    # Check that key elements are present
    assert 'Search Results for: "test query" (after 30d)' in result
    assert "1.0. Test Search Result" in result
    assert "Type**: entity" in result
    assert "Relevance Score**: 0.95" in result
    assert "This is a test search result with some content." in result
    assert 'read_note("test/search-result")' in result
    assert "Next Steps" in result
    assert "Synthesize and Capture Knowledge" in result


@pytest.mark.asyncio
async def test_search_without_results(template_loader, context_without_results):
    """Test rendering the search template without results."""
    result = await template_loader.render("prompts/search.hbs", context_without_results)

    # Check that key elements are present
    assert 'Search Results for: "empty query"' in result
    assert "I couldn't find any results for this query." in result
    assert "Opportunity to Capture Knowledge!" in result
    assert "write_note(" in result
    assert 'title="Empty query"' in result
    assert "Other Suggestions" in result


@pytest.mark.asyncio
async def test_multiple_search_results(template_loader):
    """Test rendering the search template with multiple results."""
    # Create multiple search results
    results = []
    for i in range(1, 6):  # Create 5 results
        results.append(
            SearchResult(
                title=f"Search Result {i}",
                type=SearchItemType.ENTITY,
                permalink=f"test/result-{i}",
                score=1.0 - (i * 0.1),  # Decreasing scores
                content=f"Content for result {i}",
                file_path=f"/path/to/result-{i}.md",
                metadata={},
            )
        )

    context = {
        "query": "multiple results",
        "timeframe": None,
        "has_results": True,
        "result_count": len(results),
        "results": results,
    }

    result = await template_loader.render("prompts/search.hbs", context)

    # Check that all results are rendered
    for i in range(1, 6):
        assert f"{i}.0. Search Result {i}" in result
        assert f"Content for result {i}" in result
        assert f'read_note("test/result-{i}")' in result


@pytest.mark.asyncio
async def test_capitalization_in_write_note_template(template_loader, context_with_results):
    """Test that the query is capitalized in the write_note template."""
    result = await template_loader.render("prompts/search.hbs", context_with_results)

    # The query should be capitalized in the suggested write_note call
    assert "Synthesis of Test query Information" in result


@pytest.mark.asyncio
async def test_timeframe_display(template_loader):
    """Test that the timeframe is displayed correctly when present, and not when absent."""
    # Context with timeframe
    context_with_timeframe = {
        "query": "with timeframe",
        "timeframe": "7d",
        "has_results": True,
        "result_count": 0,
        "results": [],
    }

    result_with_timeframe = await template_loader.render(
        "prompts/search.hbs", context_with_timeframe
    )
    assert 'Search Results for: "with timeframe" (after 7d)' in result_with_timeframe

    # Context without timeframe
    context_without_timeframe = {
        "query": "without timeframe",
        "timeframe": None,
        "has_results": True,
        "result_count": 0,
        "results": [],
    }

    result_without_timeframe = await template_loader.render(
        "prompts/search.hbs", context_without_timeframe
    )
    assert 'Search Results for: "without timeframe"' in result_without_timeframe
    assert 'Search Results for: "without timeframe" (after' not in result_without_timeframe

```

--------------------------------------------------------------------------------
/src/basic_memory/mcp/async_client.py:
--------------------------------------------------------------------------------

```python
from contextlib import asynccontextmanager, AbstractAsyncContextManager
from typing import AsyncIterator, Callable, Optional

from httpx import ASGITransport, AsyncClient, Timeout
from loguru import logger

from basic_memory.api.app import app as fastapi_app
from basic_memory.config import ConfigManager


# Optional factory override for dependency injection
_client_factory: Optional[Callable[[], AbstractAsyncContextManager[AsyncClient]]] = None


def set_client_factory(factory: Callable[[], AbstractAsyncContextManager[AsyncClient]]) -> None:
    """Override the default client factory (for cloud app, testing, etc).

    Args:
        factory: An async context manager that yields an AsyncClient

    Example:
        @asynccontextmanager
        async def custom_client_factory():
            async with AsyncClient(...) as client:
                yield client

        set_client_factory(custom_client_factory)
    """
    global _client_factory
    _client_factory = factory


@asynccontextmanager
async def get_client() -> AsyncIterator[AsyncClient]:
    """Get an AsyncClient as a context manager.

    This function provides proper resource management for HTTP clients,
    ensuring connections are closed after use. It supports three modes:

    1. **Factory injection** (cloud app, tests):
       If a custom factory is set via set_client_factory(), use that.

    2. **CLI cloud mode**:
       When cloud_mode_enabled is True, create HTTP client with auth
       token from CLIAuth for requests to cloud proxy endpoint.

    3. **Local mode** (default):
       Use ASGI transport for in-process requests to local FastAPI app.

    Usage:
        async with get_client() as client:
            response = await client.get("/path")

    Yields:
        AsyncClient: Configured HTTP client for the current mode

    Raises:
        RuntimeError: If cloud mode is enabled but user is not authenticated
    """
    if _client_factory:
        # Use injected factory (cloud app, tests)
        async with _client_factory() as client:
            yield client
    else:
        # Default: create based on config
        config = ConfigManager().config
        timeout = Timeout(
            connect=10.0,  # 10 seconds for connection
            read=30.0,  # 30 seconds for reading response
            write=30.0,  # 30 seconds for writing request
            pool=30.0,  # 30 seconds for connection pool
        )

        if config.cloud_mode_enabled:
            # CLI cloud mode: inject auth when creating client
            from basic_memory.cli.auth import CLIAuth

            auth = CLIAuth(client_id=config.cloud_client_id, authkit_domain=config.cloud_domain)
            token = await auth.get_valid_token()

            if not token:
                raise RuntimeError(
                    "Cloud mode enabled but not authenticated. "
                    "Run 'basic-memory cloud login' first."
                )

            # Auth header set ONCE at client creation
            proxy_base_url = f"{config.cloud_host}/proxy"
            logger.info(f"Creating HTTP client for cloud proxy at: {proxy_base_url}")
            async with AsyncClient(
                base_url=proxy_base_url,
                headers={"Authorization": f"Bearer {token}"},
                timeout=timeout,
            ) as client:
                yield client
        else:
            # Local mode: ASGI transport for in-process calls
            # Note: ASGI transport does NOT trigger FastAPI lifespan, so no special handling needed
            logger.info("Creating ASGI client for local Basic Memory API")
            async with AsyncClient(
                transport=ASGITransport(app=fastapi_app), base_url="http://test", timeout=timeout
            ) as client:
                yield client


def create_client() -> AsyncClient:
    """Create an HTTP client based on configuration.

    DEPRECATED: Use get_client() context manager instead for proper resource management.

    This function is kept for backward compatibility but will be removed in a future version.
    The returned client should be closed manually by calling await client.aclose().

    Returns:
        AsyncClient configured for either local ASGI or remote proxy
    """
    config_manager = ConfigManager()
    config = config_manager.config

    # Configure timeout for longer operations like write_note
    # Default httpx timeout is 5 seconds which is too short for file operations
    timeout = Timeout(
        connect=10.0,  # 10 seconds for connection
        read=30.0,  # 30 seconds for reading response
        write=30.0,  # 30 seconds for writing request
        pool=30.0,  # 30 seconds for connection pool
    )

    if config.cloud_mode_enabled:
        # Use HTTP transport to proxy endpoint
        proxy_base_url = f"{config.cloud_host}/proxy"
        logger.info(f"Creating HTTP client for proxy at: {proxy_base_url}")
        return AsyncClient(base_url=proxy_base_url, timeout=timeout)
    else:
        # Default: use ASGI transport for local API (development mode)
        logger.info("Creating ASGI client for local Basic Memory API")
        return AsyncClient(
            transport=ASGITransport(app=fastapi_app), base_url="http://test", timeout=timeout
        )

```
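
For tests, the factory override can point the MCP tools at a canned transport instead of the real API. A minimal sketch (the handler and payload are illustrative):

```python
# Test-injection sketch using httpx.MockTransport; the response body is illustrative.
from contextlib import asynccontextmanager

import httpx

from basic_memory.mcp.async_client import set_client_factory


def install_fake_client() -> None:
    def handler(request: httpx.Request) -> httpx.Response:
        # Answer every request the MCP tools make with a canned body
        return httpx.Response(200, json={"ok": True, "path": request.url.path})

    @asynccontextmanager
    async def fake_factory():
        async with httpx.AsyncClient(
            transport=httpx.MockTransport(handler), base_url="http://test"
        ) as client:
            yield client

    set_client_factory(fake_factory)
```

After `install_fake_client()` runs, any `async with get_client() as client:` call yields the mock-backed client, so tool tests never open a network connection.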

--------------------------------------------------------------------------------
/src/basic_memory/alembic/versions/314f1ea54dc4_add_postgres_full_text_search_support_.py:
--------------------------------------------------------------------------------

```python
"""Add Postgres full-text search support with tsvector and GIN indexes

Revision ID: 314f1ea54dc4
Revises: e7e1f4367280
Create Date: 2025-11-15 18:05:01.025405

"""

from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision: str = "314f1ea54dc4"
down_revision: Union[str, None] = "e7e1f4367280"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Add PostgreSQL full-text search support.

    This migration:
    1. Creates search_index table for Postgres (SQLite uses FTS5 virtual table)
    2. Adds generated tsvector column for full-text search
    3. Creates GIN index on the tsvector column for fast text queries
    4. Creates GIN index on metadata JSONB column for fast containment queries

    Note: These changes only apply to Postgres. SQLite continues to use FTS5 virtual tables.
    """
    # Check if we're using Postgres
    connection = op.get_bind()
    if connection.dialect.name == "postgresql":
        # Create search_index table for Postgres
        # For SQLite, this is an FTS5 virtual table created elsewhere
        from sqlalchemy.dialects.postgresql import JSONB

        op.create_table(
            "search_index",
            sa.Column("id", sa.Integer(), nullable=False),  # Entity IDs are integers
            sa.Column("project_id", sa.Integer(), nullable=False),  # Multi-tenant isolation
            sa.Column("title", sa.Text(), nullable=True),
            sa.Column("content_stems", sa.Text(), nullable=True),
            sa.Column("content_snippet", sa.Text(), nullable=True),
            sa.Column("permalink", sa.String(), nullable=True),  # Nullable for non-markdown files
            sa.Column("file_path", sa.String(), nullable=True),
            sa.Column("type", sa.String(), nullable=True),
            sa.Column("from_id", sa.Integer(), nullable=True),  # Relation IDs are integers
            sa.Column("to_id", sa.Integer(), nullable=True),  # Relation IDs are integers
            sa.Column("relation_type", sa.String(), nullable=True),
            sa.Column("entity_id", sa.Integer(), nullable=True),  # Entity IDs are integers
            sa.Column("category", sa.String(), nullable=True),
            sa.Column("metadata", JSONB(), nullable=True),  # Use JSONB for Postgres
            sa.Column("created_at", sa.DateTime(timezone=True), nullable=True),
            sa.Column("updated_at", sa.DateTime(timezone=True), nullable=True),
            sa.PrimaryKeyConstraint(
                "id", "type", "project_id"
            ),  # Composite key: id can repeat across types
            sa.ForeignKeyConstraint(
                ["project_id"],
                ["project.id"],
                name="fk_search_index_project_id",
                ondelete="CASCADE",
            ),
            if_not_exists=True,
        )

        # Create index on project_id for efficient multi-tenant queries
        op.create_index(
            "ix_search_index_project_id",
            "search_index",
            ["project_id"],
            unique=False,
        )

        # Create unique partial index on permalink for markdown files
        # Non-markdown files don't have permalinks, so we use a partial index
        op.execute("""
            CREATE UNIQUE INDEX uix_search_index_permalink_project
            ON search_index (permalink, project_id)
            WHERE permalink IS NOT NULL
        """)

        # Add tsvector column as a GENERATED ALWAYS column
        # This automatically updates when title or content_stems change
        op.execute("""
            ALTER TABLE search_index
            ADD COLUMN textsearchable_index_col tsvector
            GENERATED ALWAYS AS (
                to_tsvector('english',
                    coalesce(title, '') || ' ' ||
                    coalesce(content_stems, '')
                )
            ) STORED
        """)

        # Create GIN index on tsvector column for fast full-text search
        op.create_index(
            "idx_search_index_fts",
            "search_index",
            ["textsearchable_index_col"],
            unique=False,
            postgresql_using="gin",
        )

        # Create GIN index on metadata JSONB for fast containment queries
        # Using jsonb_path_ops for smaller index size and better performance
        op.execute("""
            CREATE INDEX idx_search_index_metadata_gin
            ON search_index
            USING GIN (metadata jsonb_path_ops)
        """)


def downgrade() -> None:
    """Remove PostgreSQL full-text search support."""
    connection = op.get_bind()
    if connection.dialect.name == "postgresql":
        # Drop indexes first
        op.execute("DROP INDEX IF EXISTS idx_search_index_metadata_gin")
        op.drop_index("idx_search_index_fts", table_name="search_index")
        op.execute("DROP INDEX IF EXISTS uix_search_index_permalink_project")
        op.drop_index("ix_search_index_project_id", table_name="search_index")

        # Drop the generated column
        op.execute("ALTER TABLE search_index DROP COLUMN IF EXISTS textsearchable_index_col")

        # Drop the search_index table
        op.drop_table("search_index")

```
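
For context, a minimal sketch of how the structures created by this migration can be queried on Postgres. The table, generated column, and index names mirror the migration above, but the query itself is illustrative and is not the repository's actual search implementation.

```python
# Illustrative only: query the Postgres search_index created by this migration.
# Assumes an async SQLAlchemy session; the real search lives in the search repository.
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession


async def search_entities(session: AsyncSession, project_id: int, query: str):
    # websearch_to_tsquery uses the same 'english' config as the generated column,
    # so the @@ match can be served by the GIN index idx_search_index_fts.
    stmt = text("""
        SELECT id, title, permalink,
               ts_rank(textsearchable_index_col, websearch_to_tsquery('english', :q)) AS rank
        FROM search_index
        WHERE project_id = :project_id
          AND textsearchable_index_col @@ websearch_to_tsquery('english', :q)
        ORDER BY rank DESC
        LIMIT 20
    """)
    result = await session.execute(stmt, {"q": query, "project_id": project_id})
    return result.mappings().all()
```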

--------------------------------------------------------------------------------
/tests/cli/test_project_add_with_local_path.py:
--------------------------------------------------------------------------------

```python
"""Tests for bm project add with --local-path flag."""

import json
from pathlib import Path
from contextlib import asynccontextmanager

import pytest
from typer.testing import CliRunner

from basic_memory.cli.app import app


@pytest.fixture
def runner():
    return CliRunner()


@pytest.fixture
def mock_config(tmp_path, monkeypatch):
    """Create a mock config in cloud mode using environment variables."""
    # Invalidate config cache to ensure clean state for each test
    from basic_memory import config as config_module

    config_module._CONFIG_CACHE = None

    config_dir = tmp_path / ".basic-memory"
    config_dir.mkdir(parents=True, exist_ok=True)
    config_file = config_dir / "config.json"

    config_data = {
        "env": "dev",
        "projects": {},
        "default_project": "main",
        "cloud_mode": True,
        "cloud_projects": {},
    }

    config_file.write_text(json.dumps(config_data, indent=2))

    # Set HOME to tmp_path so ConfigManager uses our test config
    monkeypatch.setenv("HOME", str(tmp_path))

    yield config_file


@pytest.fixture
def mock_api_client(monkeypatch):
    """Stub the API client for project add without stdlib mocks."""
    import basic_memory.cli.commands.project as project_cmd

    @asynccontextmanager
    async def fake_get_client():
        yield object()

    class _Resp:
        def json(self):
            return {
                "message": "Project 'test-project' added successfully",
                "status": "success",
                "default": False,
                "old_project": None,
                "new_project": {
                    "id": 1,
                    "external_id": "12345678-1234-1234-1234-123456789012",
                    "name": "test-project",
                    "path": "/test-project",
                    "is_default": False,
                },
            }

    calls: list[tuple[str, dict]] = []

    async def fake_call_post(client, path: str, json: dict, **kwargs):
        calls.append((path, json))
        return _Resp()

    monkeypatch.setattr(project_cmd, "get_client", fake_get_client)
    monkeypatch.setattr(project_cmd, "call_post", fake_call_post)

    return calls


def test_project_add_with_local_path_saves_to_config(
    runner, mock_config, mock_api_client, tmp_path
):
    """Test that bm project add --local-path saves sync path to config."""
    local_sync_dir = tmp_path / "sync" / "test-project"

    result = runner.invoke(
        app,
        [
            "project",
            "add",
            "test-project",
            "--local-path",
            str(local_sync_dir),
        ],
    )

    assert result.exit_code == 0, f"Exit code: {result.exit_code}, Stdout: {result.stdout}"
    assert "Project 'test-project' added successfully" in result.stdout
    assert "Local sync path configured" in result.stdout
    # Check path is present (may be line-wrapped in output)
    assert "test-project" in result.stdout
    assert "sync" in result.stdout

    # Verify config was updated
    config_data = json.loads(mock_config.read_text())
    assert "test-project" in config_data["cloud_projects"]
    # Use as_posix() for cross-platform compatibility (Windows uses backslashes)
    assert config_data["cloud_projects"]["test-project"]["local_path"] == local_sync_dir.as_posix()
    assert config_data["cloud_projects"]["test-project"]["last_sync"] is None
    assert config_data["cloud_projects"]["test-project"]["bisync_initialized"] is False

    # Verify local directory was created
    assert local_sync_dir.exists()
    assert local_sync_dir.is_dir()


def test_project_add_without_local_path_no_config_entry(runner, mock_config, mock_api_client):
    """Test that bm project add without --local-path doesn't save to config."""
    result = runner.invoke(
        app,
        ["project", "add", "test-project"],
    )

    assert result.exit_code == 0
    assert "Project 'test-project' added successfully" in result.stdout
    assert "Local sync path configured" not in result.stdout

    # Verify config was NOT updated with cloud_projects entry
    config_data = json.loads(mock_config.read_text())
    assert "test-project" not in config_data.get("cloud_projects", {})


def test_project_add_local_path_expands_tilde(runner, mock_config, mock_api_client):
    """Test that --local-path ~/path expands to absolute path."""
    result = runner.invoke(
        app,
        ["project", "add", "test-project", "--local-path", "~/test-sync"],
    )

    assert result.exit_code == 0

    # Verify config has expanded path
    config_data = json.loads(mock_config.read_text())
    local_path = config_data["cloud_projects"]["test-project"]["local_path"]
    # Path should be absolute (starts with / on Unix or drive letter on Windows)
    assert Path(local_path).is_absolute()
    assert "~" not in local_path
    assert local_path.endswith("/test-sync")


def test_project_add_local_path_creates_nested_directories(
    runner, mock_config, mock_api_client, tmp_path
):
    """Test that --local-path creates nested directories."""
    nested_path = tmp_path / "a" / "b" / "c" / "test-project"

    result = runner.invoke(
        app,
        ["project", "add", "test-project", "--local-path", str(nested_path)],
    )

    assert result.exit_code == 0
    assert nested_path.exists()
    assert nested_path.is_dir()

```
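
For reference, a hedged sketch of the config entry these assertions describe after running `bm project add test-project --local-path ~/sync/test-project`; the keys mirror the test assertions above, and the concrete path value is made up.

```python
# Expected shape of ~/.basic-memory/config.json after the command above,
# reconstructed from the assertions in these tests (path value is illustrative).
expected_config = {
    "cloud_mode": True,
    "cloud_projects": {
        "test-project": {
            "local_path": "/home/user/sync/test-project",  # tilde expanded, stored as posix
            "last_sync": None,
            "bisync_initialized": False,
        }
    },
}
```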

--------------------------------------------------------------------------------
/src/basic_memory/mcp/clients/knowledge.py:
--------------------------------------------------------------------------------

```python
"""Typed client for knowledge/entity API operations.

Encapsulates all /v2/projects/{project_id}/knowledge/* endpoints.
"""

from typing import Any

from httpx import AsyncClient

from basic_memory.mcp.tools.utils import call_get, call_post, call_put, call_patch, call_delete
from basic_memory.schemas.response import EntityResponse, DeleteEntitiesResponse


class KnowledgeClient:
    """Typed client for knowledge graph entity operations.

    Centralizes:
    - API path construction for /v2/projects/{project_id}/knowledge/*
    - Response validation via Pydantic models
    - Consistent error handling through call_* utilities

    Usage:
        async with get_client() as http_client:
            client = KnowledgeClient(http_client, project_id)
            entity = await client.create_entity(entity_data)
    """

    def __init__(self, http_client: AsyncClient, project_id: str):
        """Initialize the knowledge client.

        Args:
            http_client: HTTPX AsyncClient for making requests
            project_id: Project external_id (UUID) for API calls
        """
        self.http_client = http_client
        self.project_id = project_id
        self._base_path = f"/v2/projects/{project_id}/knowledge"

    # --- Entity CRUD Operations ---

    async def create_entity(self, entity_data: dict[str, Any]) -> EntityResponse:
        """Create a new entity.

        Args:
            entity_data: Entity data including title, content, folder, etc.

        Returns:
            EntityResponse with created entity details

        Raises:
            ToolError: If the request fails
        """
        response = await call_post(
            self.http_client,
            f"{self._base_path}/entities",
            json=entity_data,
        )
        return EntityResponse.model_validate(response.json())

    async def update_entity(self, entity_id: str, entity_data: dict[str, Any]) -> EntityResponse:
        """Update an existing entity (full replacement).

        Args:
            entity_id: Entity external_id (UUID)
            entity_data: Complete entity data for replacement

        Returns:
            EntityResponse with updated entity details

        Raises:
            ToolError: If the request fails
        """
        response = await call_put(
            self.http_client,
            f"{self._base_path}/entities/{entity_id}",
            json=entity_data,
        )
        return EntityResponse.model_validate(response.json())

    async def get_entity(self, entity_id: str) -> EntityResponse:
        """Get an entity by ID.

        Args:
            entity_id: Entity external_id (UUID)

        Returns:
            EntityResponse with entity details

        Raises:
            ToolError: If the entity is not found or request fails
        """
        response = await call_get(
            self.http_client,
            f"{self._base_path}/entities/{entity_id}",
        )
        return EntityResponse.model_validate(response.json())

    async def patch_entity(self, entity_id: str, patch_data: dict[str, Any]) -> EntityResponse:
        """Partially update an entity.

        Args:
            entity_id: Entity external_id (UUID)
            patch_data: Partial entity data to update

        Returns:
            EntityResponse with updated entity details

        Raises:
            ToolError: If the request fails
        """
        response = await call_patch(
            self.http_client,
            f"{self._base_path}/entities/{entity_id}",
            json=patch_data,
        )
        return EntityResponse.model_validate(response.json())

    async def delete_entity(self, entity_id: str) -> DeleteEntitiesResponse:
        """Delete an entity.

        Args:
            entity_id: Entity external_id (UUID)

        Returns:
            DeleteEntitiesResponse confirming deletion

        Raises:
            ToolError: If the entity is not found or request fails
        """
        response = await call_delete(
            self.http_client,
            f"{self._base_path}/entities/{entity_id}",
        )
        return DeleteEntitiesResponse.model_validate(response.json())

    async def move_entity(self, entity_id: str, destination_path: str) -> EntityResponse:
        """Move an entity to a new location.

        Args:
            entity_id: Entity external_id (UUID)
            destination_path: New file path for the entity

        Returns:
            EntityResponse with updated entity details

        Raises:
            ToolError: If the request fails
        """
        response = await call_put(
            self.http_client,
            f"{self._base_path}/entities/{entity_id}/move",
            json={"destination_path": destination_path},
        )
        return EntityResponse.model_validate(response.json())

    # --- Resolution ---

    async def resolve_entity(self, identifier: str) -> str:
        """Resolve a string identifier to an entity external_id.

        Args:
            identifier: The identifier to resolve (permalink, title, or path)

        Returns:
            The resolved entity external_id (UUID)

        Raises:
            ToolError: If the identifier cannot be resolved
        """
        response = await call_post(
            self.http_client,
            f"{self._base_path}/resolve",
            json={"identifier": identifier},
        )
        data = response.json()
        return data["external_id"]

```
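
A minimal usage sketch expanding on the class docstring above. The patch payload keys and the chosen identifier are assumptions for illustration; the imports and method names come from this module and from `basic_memory.mcp.async_client` as used elsewhere in the codebase.

```python
# Illustrative usage of KnowledgeClient (expands the class docstring above).
# The patch payload keys are assumptions, not a documented schema.
from basic_memory.mcp.async_client import get_client
from basic_memory.mcp.clients.knowledge import KnowledgeClient


async def rename_note(project_id: str, identifier: str, new_title: str) -> None:
    async with get_client() as http_client:
        client = KnowledgeClient(http_client, project_id)

        # Resolve a permalink/title/path to the entity's external_id (UUID).
        entity_id = await client.resolve_entity(identifier)

        # Partial update via PATCH; only the provided fields change.
        await client.patch_entity(entity_id, {"title": new_title})
```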

--------------------------------------------------------------------------------
/src/basic_memory/importers/memory_json_importer.py:
--------------------------------------------------------------------------------

```python
"""Memory JSON import service for Basic Memory."""

import logging
from typing import Any, Dict, List, Optional

from basic_memory.markdown.schemas import EntityFrontmatter, EntityMarkdown, Observation, Relation
from basic_memory.importers.base import Importer
from basic_memory.schemas.importer import EntityImportResult

logger = logging.getLogger(__name__)


class MemoryJsonImporter(Importer[EntityImportResult]):
    """Service for importing memory.json format data."""

    def handle_error(  # pragma: no cover
        self, message: str, error: Optional[Exception] = None
    ) -> EntityImportResult:
        """Return a failed EntityImportResult with an error message."""
        error_msg = f"{message}: {error}" if error else message
        return EntityImportResult(
            import_count={},
            success=False,
            error_message=error_msg,
            entities=0,
            relations=0,
            skipped_entities=0,
        )

    async def import_data(
        self, source_data, destination_folder: str = "", **kwargs: Any
    ) -> EntityImportResult:
        """Import entities and relations from a memory.json file.

        Args:
            source_data: Path to the memory.json file.
            destination_folder: Optional destination folder within the project.
            **kwargs: Additional keyword arguments.

        Returns:
            EntityImportResult containing statistics and status of the import.
        """
        try:
            # First pass - collect all relations by source entity
            entity_relations: Dict[str, List[Relation]] = {}
            entities: Dict[str, Dict[str, Any]] = {}
            skipped_entities: int = 0

            # Ensure the destination folder exists if provided
            if destination_folder:  # pragma: no cover
                await self.ensure_folder_exists(destination_folder)

            # First pass - collect entities and relations
            for line in source_data:
                data = line
                if data["type"] == "entity":
                    # Handle different possible name keys
                    entity_name = data.get("name") or data.get("entityName") or data.get("id")
                    if not entity_name:
                        logger.warning(f"Entity missing name field: {data}")  # pragma: no cover
                        skipped_entities += 1  # pragma: no cover
                        continue  # pragma: no cover
                    entities[entity_name] = data
                elif data["type"] == "relation":
                    # Store relation with its source entity
                    source = data.get("from") or data.get("from_id")
                    if source not in entity_relations:
                        entity_relations[source] = []
                    entity_relations[source].append(
                        Relation(
                            type=data.get("relationType") or data.get("relation_type"),
                            target=data.get("to") or data.get("to_id"),
                        )
                    )

            # Second pass - create and write entities
            entities_created = 0
            for name, entity_data in entities.items():
                # Get entity type with fallback
                entity_type = entity_data.get("entityType") or entity_data.get("type") or "entity"

                # Build permalink with optional destination folder prefix
                permalink = (
                    f"{destination_folder}/{entity_type}/{name}"
                    if destination_folder
                    else f"{entity_type}/{name}"
                )

                # Ensure entity type directory exists using FileService with relative path
                entity_type_dir = (
                    f"{destination_folder}/{entity_type}" if destination_folder else entity_type
                )
                await self.file_service.ensure_directory(entity_type_dir)

                # Get observations with fallback to empty list
                observations = entity_data.get("observations", [])

                entity = EntityMarkdown(
                    frontmatter=EntityFrontmatter(
                        metadata={
                            "type": entity_type,
                            "title": name,
                            "permalink": permalink,
                        }
                    ),
                    content=f"# {name}\n",
                    observations=[Observation(content=obs) for obs in observations],
                    relations=entity_relations.get(name, []),
                )

                # Write file using relative path - FileService handles base_path
                file_path = f"{entity.frontmatter.metadata['permalink']}.md"
                await self.write_entity(entity, file_path)
                entities_created += 1

            relations_count = sum(len(rels) for rels in entity_relations.values())

            return EntityImportResult(
                import_count={"entities": entities_created, "relations": relations_count},
                success=True,
                entities=entities_created,
                relations=relations_count,
                skipped_entities=skipped_entities,
            )

        except Exception as e:  # pragma: no cover
            logger.exception("Failed to import memory.json")
            return self.handle_error("Failed to import memory.json", e)

```
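
A hedged sketch of the record shapes the two passes above expect from an already-parsed memory.json stream; the field names mirror the `.get()` fallbacks in `import_data`, and the sample values are made up.

```python
# Illustrative input for MemoryJsonImporter.import_data (already-parsed JSON lines).
# Field names follow the fallbacks used above: name/entityName/id, from/from_id,
# to/to_id, relationType/relation_type, entityType/type.
source_data = [
    {"type": "entity", "name": "Coffee Brewing", "entityType": "note",
     "observations": ["Pour-over highlights acidity"]},
    {"type": "entity", "name": "Grind Size", "entityType": "note", "observations": []},
    {"type": "relation", "from": "Coffee Brewing", "to": "Grind Size",
     "relationType": "depends_on"},
]

# With destination_folder="imported", the first entity would be written to
# "imported/note/Coffee Brewing.md" with permalink "imported/note/Coffee Brewing".
```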

--------------------------------------------------------------------------------
/tests/api/test_memory_router.py:
--------------------------------------------------------------------------------

```python
"""Tests for memory router endpoints."""

from datetime import datetime

import pytest

from basic_memory.schemas.memory import GraphContext


@pytest.mark.asyncio
async def test_get_memory_context(client, test_graph, project_url):
    """Test getting context from memory URL."""
    response = await client.get(f"{project_url}/memory/test/root")
    assert response.status_code == 200

    context = GraphContext(**response.json())
    assert len(context.results) == 1
    assert context.results[0].primary_result.permalink == "test/root"
    assert len(context.results[0].related_results) > 0

    # Verify metadata
    assert context.metadata.uri == "test/root"
    assert context.metadata.depth == 1  # default depth
    assert isinstance(context.metadata.generated_at, datetime)
    assert context.metadata.primary_count + context.metadata.related_count > 0
    assert context.metadata.total_results is not None  # Backwards compatibility field


@pytest.mark.asyncio
async def test_get_memory_context_pagination(client, test_graph, project_url):
    """Test getting context from memory URL."""
    response = await client.get(f"{project_url}/memory/test/root?page=1&page_size=1")
    assert response.status_code == 200

    context = GraphContext(**response.json())
    assert len(context.results) == 1
    assert context.results[0].primary_result.permalink == "test/root"
    assert len(context.results[0].related_results) > 0

    # Verify metadata
    assert context.metadata.uri == "test/root"
    assert context.metadata.depth == 1  # default depth
    assert isinstance(context.metadata.generated_at, datetime)
    assert context.metadata.primary_count > 0


@pytest.mark.asyncio
async def test_get_memory_context_pattern(client, test_graph, project_url):
    """Test getting context with pattern matching."""
    response = await client.get(f"{project_url}/memory/test/*")
    assert response.status_code == 200

    context = GraphContext(**response.json())
    assert len(context.results) > 1  # Should match multiple test/* paths
    assert all("test/" in item.primary_result.permalink for item in context.results)


@pytest.mark.asyncio
async def test_get_memory_context_depth(client, test_graph, project_url):
    """Test depth parameter affects relation traversal."""
    # With depth=1, should only get immediate connections
    response = await client.get(f"{project_url}/memory/test/root?depth=1&max_results=20")
    assert response.status_code == 200
    context1 = GraphContext(**response.json())

    # With depth=3, should get deeper connections
    response = await client.get(f"{project_url}/memory/test/root?depth=3&max_results=20")
    assert response.status_code == 200
    context2 = GraphContext(**response.json())

    # Calculate total related items in all result items
    total_related1 = sum(len(item.related_results) for item in context1.results)
    total_related2 = sum(len(item.related_results) for item in context2.results)

    assert total_related2 > total_related1


@pytest.mark.asyncio
async def test_get_memory_context_timeframe(client, test_graph, project_url):
    """Test timeframe parameter filters by date."""
    # Recent timeframe
    response = await client.get(f"{project_url}/memory/test/root?timeframe=1d")
    assert response.status_code == 200
    recent = GraphContext(**response.json())

    # Longer timeframe
    response = await client.get(f"{project_url}/memory/test/root?timeframe=30d")
    assert response.status_code == 200
    older = GraphContext(**response.json())

    # Calculate total related items
    total_recent_related = (
        sum(len(item.related_results) for item in recent.results) if recent.results else 0
    )
    total_older_related = (
        sum(len(item.related_results) for item in older.results) if older.results else 0
    )

    assert total_older_related >= total_recent_related


@pytest.mark.asyncio
async def test_not_found(client, project_url):
    """Test handling of non-existent paths."""
    response = await client.get(f"{project_url}/memory/test/does-not-exist")
    assert response.status_code == 200

    context = GraphContext(**response.json())
    assert len(context.results) == 0


@pytest.mark.asyncio
async def test_recent_activity(client, test_graph, project_url):
    """Test handling of recent activity."""
    response = await client.get(f"{project_url}/memory/recent")
    assert response.status_code == 200

    context = GraphContext(**response.json())
    assert len(context.results) > 0
    assert context.metadata.primary_count > 0


@pytest.mark.asyncio
async def test_recent_activity_pagination(client, test_graph, project_url):
    """Test pagination for recent activity."""
    response = await client.get(f"{project_url}/memory/recent?page=1&page_size=1")
    assert response.status_code == 200

    context = GraphContext(**response.json())
    assert len(context.results) == 1
    assert context.page == 1
    assert context.page_size == 1


@pytest.mark.asyncio
async def test_recent_activity_by_type(client, test_graph, project_url):
    """Test filtering recent activity by type."""
    response = await client.get(f"{project_url}/memory/recent?type=relation&type=observation")
    assert response.status_code == 200

    context = GraphContext(**response.json())
    assert len(context.results) > 0

    # Check for relation and observation types in primary results
    primary_types = [item.primary_result.type for item in context.results]
    assert "relation" in primary_types or "observation" in primary_types

```
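
For orientation, a hedged sketch of the query-parameter combinations these tests exercise against the memory endpoints; `base` stands in for whatever the `project_url` fixture resolves to, which is not shown on this page.

```python
# Illustrative client calls covering the parameters used in the tests above.
# `base` is a placeholder for the project URL prefix (not shown here).
import httpx


async def fetch_context(base: str) -> None:
    async with httpx.AsyncClient() as client:
        await client.get(f"{base}/memory/test/root", params={"depth": 2, "max_results": 20})
        await client.get(f"{base}/memory/test/*")  # pattern matching over permalinks
        await client.get(f"{base}/memory/recent", params={"page": 1, "page_size": 10})
```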

--------------------------------------------------------------------------------
/tests/markdown/test_markdown_processor.py:
--------------------------------------------------------------------------------

```python
"""Tests for MarkdownProcessor.

Tests focus on the Read -> Modify -> Write pattern and content preservation.
"""

from datetime import datetime
from pathlib import Path

import pytest

from basic_memory.markdown.markdown_processor import DirtyFileError, MarkdownProcessor
from basic_memory.markdown.schemas import (
    EntityFrontmatter,
    EntityMarkdown,
    Observation,
    Relation,
)


@pytest.mark.asyncio
async def test_write_new_minimal_file(markdown_processor: MarkdownProcessor, tmp_path: Path):
    """Test creating new file with just title."""
    path = tmp_path / "test.md"

    # Create minimal markdown schema
    metadata = {}
    metadata["title"] = "Test Note"
    metadata["type"] = "note"
    metadata["permalink"] = "test"
    metadata["created"] = datetime(2024, 1, 1)
    metadata["modified"] = datetime(2024, 1, 1)
    metadata["tags"] = ["test"]
    markdown = EntityMarkdown(
        frontmatter=EntityFrontmatter(
            metadata=metadata,
        ),
        content="",
    )

    # Write file
    await markdown_processor.write_file(path, markdown)

    # Read back and verify
    content = path.read_text(encoding="utf-8")
    assert "---" in content  # Has frontmatter
    assert "type: note" in content
    assert "permalink: test" in content
    assert "# Test Note" in content  # Added title
    assert "tags:" in content
    assert "- test" in content

    # Should not have empty sections
    assert "## Observations" not in content
    assert "## Relations" not in content


@pytest.mark.asyncio
async def test_write_new_file_with_content(markdown_processor: MarkdownProcessor, tmp_path: Path):
    """Test creating new file with content and sections."""
    path = tmp_path / "test.md"

    # Create markdown with content and sections
    markdown = EntityMarkdown(
        frontmatter=EntityFrontmatter(
            type="note",
            permalink="test",
            title="Test Note",
            created=datetime(2024, 1, 1),
            modified=datetime(2024, 1, 1),
        ),
        content="# Custom Title\n\nMy content here.\nMultiple lines.",
        observations=[
            Observation(
                content="Test observation #test",
                category="tech",
                tags=["test"],
                context="test context",
            ),
        ],
        relations=[
            Relation(
                type="relates_to",
                target="other-note",
                context="test relation",
            ),
        ],
    )

    # Write file
    await markdown_processor.write_file(path, markdown)

    # Read back and verify
    content = path.read_text(encoding="utf-8")

    # Check content preserved exactly
    assert "# Custom Title" in content
    assert "My content here." in content
    assert "Multiple lines." in content

    # Check sections formatted correctly
    assert "- [tech] Test observation #test (test context)" in content
    assert "- relates_to [[other-note]] (test relation)" in content


@pytest.mark.asyncio
async def test_update_preserves_content(markdown_processor: MarkdownProcessor, tmp_path: Path):
    """Test that updating file preserves existing content."""
    path = tmp_path / "test.md"

    # Create initial file
    initial = EntityMarkdown(
        frontmatter=EntityFrontmatter(
            type="note",
            permalink="test",
            title="Test Note",
            created=datetime(2024, 1, 1),
            modified=datetime(2024, 1, 1),
        ),
        content="# My Note\n\nOriginal content here.",
        observations=[
            Observation(content="First observation", category="note"),
        ],
    )

    checksum = await markdown_processor.write_file(path, initial)

    # Update with new observation
    updated = EntityMarkdown(
        frontmatter=initial.frontmatter,
        content=initial.content,  # Preserve original content
        observations=[
            initial.observations[0],  # Keep original observation
            Observation(content="Second observation", category="tech"),  # Add new one
        ],
    )

    # Update file
    await markdown_processor.write_file(path, updated, expected_checksum=checksum)

    # Read back and verify
    result = await markdown_processor.read_file(path)

    # Original content preserved
    assert "Original content here." in result.content

    # Both observations present
    assert len(result.observations) == 2
    assert any(o.content == "First observation" for o in result.observations)
    assert any(o.content == "Second observation" for o in result.observations)


@pytest.mark.asyncio
async def test_dirty_file_detection(markdown_processor: MarkdownProcessor, tmp_path: Path):
    """Test detection of file modifications."""
    path = tmp_path / "test.md"

    # Create initial file
    initial = EntityMarkdown(
        frontmatter=EntityFrontmatter(
            type="note",
            permalink="test",
            title="Test Note",
            created=datetime(2024, 1, 1),
            modified=datetime(2024, 1, 1),
        ),
        content="Initial content",
    )

    checksum = await markdown_processor.write_file(path, initial)

    # Modify file directly
    path.write_text(path.read_text(encoding="utf-8") + "\nModified!")

    # Try to update with old checksum
    update = EntityMarkdown(
        frontmatter=initial.frontmatter,
        content="New content",
    )

    # Should raise DirtyFileError
    with pytest.raises(DirtyFileError):
        await markdown_processor.write_file(path, update, expected_checksum=checksum)

    # Should succeed without checksum
    new_checksum = await markdown_processor.write_file(path, update)
    assert new_checksum != checksum

```
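
The tests above exercise an optimistic-concurrency variant of the Read -> Modify -> Write pattern. A hedged sketch of that flow, assuming an already-constructed `MarkdownProcessor` (the fixture wiring is not shown here):

```python
# Sketch of the checksum-guarded write these tests exercise (not library code).
# Assumes `processor` is a MarkdownProcessor and `markdown` an EntityMarkdown.
from basic_memory.markdown.markdown_processor import DirtyFileError


async def write_if_unchanged(processor, path, markdown, last_checksum):
    try:
        # Raises DirtyFileError if the file changed on disk since `last_checksum`.
        return await processor.write_file(path, markdown, expected_checksum=last_checksum)
    except DirtyFileError:
        # Caller decides: re-read with processor.read_file(path) and merge, or
        # overwrite by calling write_file without expected_checksum.
        raise
```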

--------------------------------------------------------------------------------
/src/basic_memory/deps/repositories.py:
--------------------------------------------------------------------------------

```python
"""Repository dependency injection for basic-memory.

This module provides repository dependencies:
- EntityRepository
- ObservationRepository
- RelationRepository
- SearchRepository

Each repository is scoped to a project ID from the request.
"""

from typing import Annotated

from fastapi import Depends

from basic_memory.deps.db import SessionMakerDep
from basic_memory.deps.projects import (
    ProjectIdDep,
    ProjectIdPathDep,
    ProjectExternalIdPathDep,
)
from basic_memory.repository.entity_repository import EntityRepository
from basic_memory.repository.observation_repository import ObservationRepository
from basic_memory.repository.relation_repository import RelationRepository
from basic_memory.repository.search_repository import SearchRepository, create_search_repository


# --- Entity Repository ---


async def get_entity_repository(
    session_maker: SessionMakerDep,
    project_id: ProjectIdDep,
) -> EntityRepository:
    """Create an EntityRepository instance for the current project."""
    return EntityRepository(session_maker, project_id=project_id)


EntityRepositoryDep = Annotated[EntityRepository, Depends(get_entity_repository)]


async def get_entity_repository_v2(  # pragma: no cover
    session_maker: SessionMakerDep,
    project_id: ProjectIdPathDep,
) -> EntityRepository:
    """Create an EntityRepository instance for v2 API (uses integer project_id from path)."""
    return EntityRepository(session_maker, project_id=project_id)


EntityRepositoryV2Dep = Annotated[EntityRepository, Depends(get_entity_repository_v2)]


async def get_entity_repository_v2_external(
    session_maker: SessionMakerDep,
    project_id: ProjectExternalIdPathDep,
) -> EntityRepository:
    """Create an EntityRepository instance for v2 API (uses external_id from path)."""
    return EntityRepository(session_maker, project_id=project_id)


EntityRepositoryV2ExternalDep = Annotated[
    EntityRepository, Depends(get_entity_repository_v2_external)
]


# --- Observation Repository ---


async def get_observation_repository(
    session_maker: SessionMakerDep,
    project_id: ProjectIdDep,
) -> ObservationRepository:
    """Create an ObservationRepository instance for the current project."""
    return ObservationRepository(session_maker, project_id=project_id)


ObservationRepositoryDep = Annotated[ObservationRepository, Depends(get_observation_repository)]


async def get_observation_repository_v2(  # pragma: no cover
    session_maker: SessionMakerDep,
    project_id: ProjectIdPathDep,
) -> ObservationRepository:
    """Create an ObservationRepository instance for v2 API."""
    return ObservationRepository(session_maker, project_id=project_id)


ObservationRepositoryV2Dep = Annotated[
    ObservationRepository, Depends(get_observation_repository_v2)
]


async def get_observation_repository_v2_external(
    session_maker: SessionMakerDep,
    project_id: ProjectExternalIdPathDep,
) -> ObservationRepository:
    """Create an ObservationRepository instance for v2 API (uses external_id)."""
    return ObservationRepository(session_maker, project_id=project_id)


ObservationRepositoryV2ExternalDep = Annotated[
    ObservationRepository, Depends(get_observation_repository_v2_external)
]


# --- Relation Repository ---


async def get_relation_repository(
    session_maker: SessionMakerDep,
    project_id: ProjectIdDep,
) -> RelationRepository:
    """Create a RelationRepository instance for the current project."""
    return RelationRepository(session_maker, project_id=project_id)


RelationRepositoryDep = Annotated[RelationRepository, Depends(get_relation_repository)]


async def get_relation_repository_v2(  # pragma: no cover
    session_maker: SessionMakerDep,
    project_id: ProjectIdPathDep,
) -> RelationRepository:
    """Create a RelationRepository instance for v2 API."""
    return RelationRepository(session_maker, project_id=project_id)


RelationRepositoryV2Dep = Annotated[RelationRepository, Depends(get_relation_repository_v2)]


async def get_relation_repository_v2_external(
    session_maker: SessionMakerDep,
    project_id: ProjectExternalIdPathDep,
) -> RelationRepository:
    """Create a RelationRepository instance for v2 API (uses external_id)."""
    return RelationRepository(session_maker, project_id=project_id)


RelationRepositoryV2ExternalDep = Annotated[
    RelationRepository, Depends(get_relation_repository_v2_external)
]


# --- Search Repository ---


async def get_search_repository(
    session_maker: SessionMakerDep,
    project_id: ProjectIdDep,
) -> SearchRepository:
    """Create a backend-specific SearchRepository instance for the current project.

    Uses factory function to return SQLiteSearchRepository or PostgresSearchRepository
    based on database backend configuration.
    """
    return create_search_repository(session_maker, project_id=project_id)


SearchRepositoryDep = Annotated[SearchRepository, Depends(get_search_repository)]


async def get_search_repository_v2(  # pragma: no cover
    session_maker: SessionMakerDep,
    project_id: ProjectIdPathDep,
) -> SearchRepository:
    """Create a SearchRepository instance for v2 API."""
    return create_search_repository(session_maker, project_id=project_id)


SearchRepositoryV2Dep = Annotated[SearchRepository, Depends(get_search_repository_v2)]


async def get_search_repository_v2_external(
    session_maker: SessionMakerDep,
    project_id: ProjectExternalIdPathDep,
) -> SearchRepository:
    """Create a SearchRepository instance for v2 API (uses external_id)."""
    return create_search_repository(session_maker, project_id=project_id)


SearchRepositoryV2ExternalDep = Annotated[
    SearchRepository, Depends(get_search_repository_v2_external)
]

```
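
A minimal sketch of consuming one of these annotated dependencies in a FastAPI route. The route path is an assumption for illustration; `find_by_id` is the repository method used by tests elsewhere on this page.

```python
# Illustrative FastAPI route using the annotated dependency above
# (the path is an assumption, not the actual router code).
from fastapi import APIRouter

from basic_memory.deps.repositories import EntityRepositoryDep

router = APIRouter()


@router.get("/entities/{entity_id}")
async def get_entity(entity_id: int, entity_repository: EntityRepositoryDep):
    # FastAPI resolves SessionMakerDep and ProjectIdDep, then injects a
    # project-scoped EntityRepository for this request.
    return await entity_repository.find_by_id(entity_id)
```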

--------------------------------------------------------------------------------
/tests/services/test_project_removal_bug.py:
--------------------------------------------------------------------------------

```python
"""Test for project removal bug #254."""

import os
import tempfile
from datetime import timezone, datetime
from pathlib import Path

import pytest

from basic_memory.services.project_service import ProjectService


@pytest.mark.asyncio
async def test_remove_project_with_related_entities(project_service: ProjectService):
    """Test removing a project that has related entities (reproduces issue #254).

    This test verifies that projects with related entities (entities, observations, relations)
    can be properly deleted without foreign key constraint violations.

    The bug was caused by missing foreign key constraints with CASCADE DELETE after
    the project table was recreated in migration 647e7a75e2cd.
    """
    test_project_name = f"test-remove-with-entities-{os.urandom(4).hex()}"
    with tempfile.TemporaryDirectory() as temp_dir:
        test_root = Path(temp_dir)
        test_project_path = str(test_root / "test-remove-with-entities")

        # Make sure the test directory exists
        os.makedirs(test_project_path, exist_ok=True)

        try:
            # Step 1: Add the test project
            await project_service.add_project(test_project_name, test_project_path)

            # Verify project exists
            project = await project_service.get_project(test_project_name)
            assert project is not None

            # Step 2: Create related entities for this project
            from basic_memory.repository.entity_repository import EntityRepository

            entity_repo = EntityRepository(
                project_service.repository.session_maker, project_id=project.id
            )

            entity_data = {
                "title": "Test Entity for Deletion",
                "entity_type": "note",
                "content_type": "text/markdown",
                "project_id": project.id,
                "permalink": "test-deletion-entity",
                "file_path": "test-deletion-entity.md",
                "checksum": "test123",
                "created_at": datetime.now(timezone.utc),
                "updated_at": datetime.now(timezone.utc),
            }
            entity = await entity_repo.create(entity_data)
            assert entity is not None

            # Step 3: Create observations for the entity
            from basic_memory.repository.observation_repository import ObservationRepository

            obs_repo = ObservationRepository(
                project_service.repository.session_maker, project_id=project.id
            )

            observation_data = {
                "entity_id": entity.id,
                "content": "This is a test observation",
                "category": "note",
            }
            observation = await obs_repo.create(observation_data)
            assert observation is not None

            # Step 4: Create relations involving the entity
            from basic_memory.repository.relation_repository import RelationRepository

            rel_repo = RelationRepository(
                project_service.repository.session_maker, project_id=project.id
            )

            relation_data = {
                "from_id": entity.id,
                "to_name": "some-target-entity",
                "relation_type": "relates-to",
            }
            relation = await rel_repo.create(relation_data)
            assert relation is not None

            # Step 5: Attempt to remove the project
            # This should work with proper cascade delete, or fail with foreign key constraint
            await project_service.remove_project(test_project_name)

            # Step 6: Verify everything was properly deleted

            # Project should be gone
            removed_project = await project_service.get_project(test_project_name)
            assert removed_project is None, "Project should have been removed"

            # Related entities should be cascade deleted
            remaining_entity = await entity_repo.find_by_id(entity.id)
            assert remaining_entity is None, "Entity should have been cascade deleted"

            # Observations should be cascade deleted
            remaining_obs = await obs_repo.find_by_id(observation.id)
            assert remaining_obs is None, "Observation should have been cascade deleted"

            # Relations should be cascade deleted
            remaining_rel = await rel_repo.find_by_id(relation.id)
            assert remaining_rel is None, "Relation should have been cascade deleted"

        except Exception as e:
            # Check if this is the specific foreign key constraint error from the bug report
            if "FOREIGN KEY constraint failed" in str(e):
                pytest.fail(
                    f"Bug #254 reproduced: {e}. "
                    "This indicates missing foreign key constraints with CASCADE DELETE. "
                    "Run migration a1b2c3d4e5f6_fix_project_foreign_keys.py to fix this."
                )
            else:
                # Re-raise other unexpected errors
                raise e

        finally:
            # Clean up - remove project if it still exists
            if test_project_name in project_service.projects:
                try:
                    await project_service.remove_project(test_project_name)
                except Exception:
                    # Manual cleanup if remove_project fails
                    try:
                        project_service.config_manager.remove_project(test_project_name)
                    except Exception:
                        pass

                    project = await project_service.get_project(test_project_name)
                    if project:
                        await project_service.repository.delete(project.id)

```

--------------------------------------------------------------------------------
/src/basic_memory/mcp/tools/canvas.py:
--------------------------------------------------------------------------------

```python
"""Canvas creation tool for Basic Memory MCP server.

This tool creates Obsidian canvas files (.canvas) using the JSON Canvas 1.0 spec.
"""

import json
from typing import Dict, List, Any, Optional

from loguru import logger
from fastmcp import Context

from basic_memory.mcp.async_client import get_client
from basic_memory.mcp.project_context import get_active_project
from basic_memory.mcp.server import mcp
from basic_memory.mcp.tools.utils import call_put, call_post, resolve_entity_id
from basic_memory.telemetry import track_mcp_tool


@mcp.tool(
    description="Create an Obsidian canvas file to visualize concepts and connections.",
)
async def canvas(
    nodes: List[Dict[str, Any]],
    edges: List[Dict[str, Any]],
    title: str,
    folder: str,
    project: Optional[str] = None,
    context: Context | None = None,
) -> str:
    """Create an Obsidian canvas file with the provided nodes and edges.

    This tool creates a .canvas file compatible with Obsidian's Canvas feature,
    allowing visualization of relationships between concepts or documents.

    Project Resolution:
    Server resolves projects in this order: Single Project Mode → project parameter → default project.
    If project unknown, use list_memory_projects() or recent_activity() first.

    For the full JSON Canvas 1.0 specification, see the 'spec://canvas' resource.

    Args:
        project: Project name to create canvas in. Optional - server will resolve using hierarchy.
                If unknown, use list_memory_projects() to discover available projects.
        nodes: List of node objects following JSON Canvas 1.0 spec
        edges: List of edge objects following JSON Canvas 1.0 spec
        title: The title of the canvas (will be saved as title.canvas)
        folder: Folder path relative to project root where the canvas should be saved.
                Use forward slashes (/) as separators. Examples: "diagrams", "projects/2025", "visual/maps"
        context: Optional FastMCP context for performance caching.

    Returns:
        A summary of the created canvas file

    Important Notes:
    - When referencing files, use the exact file path as shown in Obsidian
      Example: "folder/Document Name.md" (not permalink format)
    - For file nodes, the "file" attribute must reference an existing file
    - Nodes require id, type, x, y, width, height properties
    - Edges require id, fromNode, toNode properties
    - Position nodes in a logical layout (x,y coordinates in pixels)
    - Use color attributes ("1"-"6" or hex) for visual organization

    Basic Structure:
    ```json
    {
      "nodes": [
        {
          "id": "node1",
          "type": "file",  // Options: "file", "text", "link", "group"
          "file": "folder/Document.md",
          "x": 0,
          "y": 0,
          "width": 400,
          "height": 300
        }
      ],
      "edges": [
        {
          "id": "edge1",
          "fromNode": "node1",
          "toNode": "node2",
          "label": "connects to"
        }
      ]
    }
    ```

    Examples:
        # Create canvas in a specific project
        canvas(nodes=[...], edges=[...], title="My Canvas", folder="diagrams", project="my-project")

        # Create canvas in the work project
        canvas(nodes=[...], edges=[...], title="Process Flow", folder="visual/maps", project="work-project")

    Raises:
        ToolError: If project doesn't exist or folder path is invalid
    """
    track_mcp_tool("canvas")
    async with get_client() as client:
        active_project = await get_active_project(client, project, context)

        # Ensure path has .canvas extension
        file_title = title if title.endswith(".canvas") else f"{title}.canvas"
        file_path = f"{folder}/{file_title}"

        # Create canvas data structure
        canvas_data = {"nodes": nodes, "edges": edges}

        # Convert to JSON
        canvas_json = json.dumps(canvas_data, indent=2)

        # Try to create the canvas file first (optimistic create)
        logger.info(f"Creating canvas file: {file_path} in project {project}")
        try:
            response = await call_post(
                client,
                f"/v2/projects/{active_project.external_id}/resource",
                json={"file_path": file_path, "content": canvas_json},
            )
            action = "Created"
        except Exception as e:
            # If creation failed due to conflict (already exists), try to update
            if (
                "409" in str(e)
                or "conflict" in str(e).lower()
                or "already exists" in str(e).lower()
            ):
                logger.info(f"Canvas file exists, updating instead: {file_path}")
                try:
                    entity_id = await resolve_entity_id(
                        client, active_project.external_id, file_path
                    )
                    # For update, send content in JSON body
                    response = await call_put(
                        client,
                        f"/v2/projects/{active_project.external_id}/resource/{entity_id}",
                        json={"content": canvas_json},
                    )
                    action = "Updated"
                except Exception as update_error:  # pragma: no cover
                    # Re-raise the original error if update also fails
                    raise e from update_error  # pragma: no cover
            else:
                # Re-raise if it's not a conflict error
                raise  # pragma: no cover

        # Parse response
        result = response.json()
        logger.debug(result)

        # Build summary
        summary = [f"# {action}: {file_path}", "\nThe canvas is ready to open in Obsidian."]

        return "\n".join(summary)

```
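
A hedged example invocation matching the tool signature above (nodes/edges first, `project` as an optional keyword); the node and edge values are illustrative, and `.fn` is the direct-call form used by the test suite.

```python
# Illustrative invocation of the canvas tool defined above (values are made up).
result = await canvas.fn(
    nodes=[
        {"id": "node1", "type": "file", "file": "notes/Coffee Brewing.md",
         "x": 0, "y": 0, "width": 400, "height": 300},
        {"id": "node2", "type": "text", "text": "Grind size matters",
         "x": 500, "y": 0, "width": 300, "height": 200},
    ],
    edges=[{"id": "edge1", "fromNode": "node1", "toNode": "node2", "label": "explains"}],
    title="Brewing Overview",
    folder="diagrams",
    project="my-project",
)
```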

--------------------------------------------------------------------------------
/tests/mcp/test_tool_read_content.py:
--------------------------------------------------------------------------------

```python
"""Tests for the read_content MCP tool security validation.

We keep these tests focused on path boundary/security checks, and rely on
`tests/mcp/test_tool_resource.py` for full-stack content-type behavior.
"""

from __future__ import annotations

import pytest
from mcp.server.fastmcp.exceptions import ToolError

from basic_memory.mcp.tools import read_content, write_note


@pytest.mark.asyncio
async def test_read_content_blocks_path_traversal_unix(client, test_project):
    attack_paths = [
        "../secrets.txt",
        "../../etc/passwd",
        "../../../root/.ssh/id_rsa",
        "notes/../../../etc/shadow",
        "folder/../../outside/file.md",
        "../../../../etc/hosts",
        "../../../home/user/.env",
    ]

    for attack_path in attack_paths:
        result = await read_content.fn(project=test_project.name, path=attack_path)
        assert result["type"] == "error"
        assert "paths must stay within project boundaries" in result["error"]
        assert attack_path in result["error"]


@pytest.mark.asyncio
async def test_read_content_blocks_path_traversal_windows(client, test_project):
    attack_paths = [
        "..\\secrets.txt",
        "..\\..\\Windows\\System32\\config\\SAM",
        "notes\\..\\..\\..\\Windows\\System32",
        "\\\\server\\share\\file.txt",
        "..\\..\\Users\\user\\.env",
        "\\\\..\\..\\Windows",
        "..\\..\\..\\Boot.ini",
    ]

    for attack_path in attack_paths:
        result = await read_content.fn(project=test_project.name, path=attack_path)
        assert result["type"] == "error"
        assert "paths must stay within project boundaries" in result["error"]
        assert attack_path in result["error"]


@pytest.mark.asyncio
async def test_read_content_blocks_absolute_paths(client, test_project):
    attack_paths = [
        "/etc/passwd",
        "/home/user/.env",
        "/var/log/auth.log",
        "/root/.ssh/id_rsa",
        "C:\\Windows\\System32\\config\\SAM",
        "C:\\Users\\user\\.env",
        "D:\\secrets\\config.json",
        "/tmp/malicious.txt",
        "/usr/local/bin/evil",
    ]

    for attack_path in attack_paths:
        result = await read_content.fn(project=test_project.name, path=attack_path)
        assert result["type"] == "error"
        assert "paths must stay within project boundaries" in result["error"]
        assert attack_path in result["error"]


@pytest.mark.asyncio
async def test_read_content_blocks_home_directory_access(client, test_project):
    attack_paths = [
        "~/secrets.txt",
        "~/.env",
        "~/.ssh/id_rsa",
        "~/Documents/passwords.txt",
        "~\\AppData\\secrets",
        "~\\Desktop\\config.ini",
        "~/.bashrc",
        "~/Library/Preferences/secret.plist",
    ]

    for attack_path in attack_paths:
        result = await read_content.fn(project=test_project.name, path=attack_path)
        assert result["type"] == "error"
        assert "paths must stay within project boundaries" in result["error"]
        assert attack_path in result["error"]


@pytest.mark.asyncio
async def test_read_content_blocks_memory_url_attacks(client, test_project):
    attack_paths = [
        "memory://../../etc/passwd",
        "memory://../../../root/.ssh/id_rsa",
        "memory://~/.env",
        "memory:///etc/passwd",
    ]

    for attack_path in attack_paths:
        result = await read_content.fn(project=test_project.name, path=attack_path)
        assert result["type"] == "error"
        assert "paths must stay within project boundaries" in result["error"]


@pytest.mark.asyncio
async def test_read_content_unicode_path_attacks(client, test_project):
    unicode_attacks = [
        "notes/文档/../../../etc/passwd",
        "docs/café/../../.env",
        "files/αβγ/../../../secret.txt",
    ]

    for attack_path in unicode_attacks:
        result = await read_content.fn(project=test_project.name, path=attack_path)
        assert result["type"] == "error"
        assert "paths must stay within project boundaries" in result["error"]


@pytest.mark.asyncio
async def test_read_content_very_long_attack_path(client, test_project):
    long_attack = "../" * 1000 + "etc/passwd"
    result = await read_content.fn(project=test_project.name, path=long_attack)
    assert result["type"] == "error"
    assert "paths must stay within project boundaries" in result["error"]


@pytest.mark.asyncio
async def test_read_content_case_variations_attacks(client, test_project):
    case_attacks = [
        "../ETC/passwd",
        "../Etc/PASSWD",
        "..\\WINDOWS\\system32",
        "~/.SSH/id_rsa",
    ]

    for attack_path in case_attacks:
        result = await read_content.fn(project=test_project.name, path=attack_path)
        assert result["type"] == "error"
        assert "paths must stay within project boundaries" in result["error"]


@pytest.mark.asyncio
async def test_read_content_allows_safe_path_integration(client, test_project):
    await write_note.fn(
        project=test_project.name,
        title="Meeting",
        folder="notes",
        content="This is a safe note for read_content()",
    )

    result = await read_content.fn(project=test_project.name, path="notes/meeting")
    assert result["type"] == "text"
    assert "safe note" in result["text"]


@pytest.mark.asyncio
async def test_read_content_empty_path_does_not_trigger_security_error(client, test_project):
    try:
        result = await read_content.fn(project=test_project.name, path="")
        if isinstance(result, dict) and result.get("type") == "error":
            assert "paths must stay within project boundaries" not in result.get("error", "")
    except ToolError:
        # Acceptable: resource resolution may treat empty path as not-found.
        pass

```
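
For orientation, a generic sketch of the kind of path-boundary check these tests assert on. This is not the project's validation code, and it does not cover every case exercised above (for example, `~` expansion); it only illustrates the resolve-and-compare idea.

```python
# Generic boundary-check sketch (NOT basic-memory's implementation).
from pathlib import Path


def is_within_project(project_root: Path, user_path: str) -> bool:
    # Resolve the candidate against the project root; an absolute user_path
    # replaces the root entirely, so it fails the relative_to() check below.
    candidate = (project_root / user_path).resolve()
    try:
        candidate.relative_to(project_root.resolve())
        return True
    except ValueError:
        return False
```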

--------------------------------------------------------------------------------
/src/basic_memory/mcp/prompts/utils.py:
--------------------------------------------------------------------------------

```python
"""Utility functions for formatting prompt responses.

These utilities help format data from various tools into consistent,
user-friendly markdown summaries.
"""

from dataclasses import dataclass
from textwrap import dedent
from typing import List

from basic_memory.schemas.base import TimeFrame
from basic_memory.schemas.memory import (
    normalize_memory_url,
    EntitySummary,
    RelationSummary,
    ObservationSummary,
)


@dataclass
class PromptContextItem:
    primary_results: List[EntitySummary]
    related_results: List[EntitySummary | RelationSummary | ObservationSummary]


@dataclass
class PromptContext:
    timeframe: TimeFrame
    topic: str
    results: List[PromptContextItem]


def format_prompt_context(context: PromptContext) -> str:
    """Format continuation context into a helpful summary.
    Returns:
        Formatted continuation summary
    """
    if not context.results:  # pragma: no cover
        return dedent(f"""
            # Continuing conversation on: {context.topic}

            This is a memory retrieval session. 
            The supplied query did not return any information specifically on this topic.
            
            ## Opportunity to Capture New Knowledge!
            
            This is an excellent chance to start documenting this topic:
            
            ```python
            await write_note(
                title="{context.topic}",
                content=f'''
                # {context.topic}
                
                ## Overview
                [Summary of what we know about {context.topic}]
                
                ## Key Points
                [Main aspects or components of {context.topic}]
                
                ## Observations
                - [category] [First important observation about {context.topic}]
                - [category] [Second observation about {context.topic}]
                
                ## Relations
                - relates_to [[Related Topic]]
                - part_of [[Broader Context]]
                '''
            )
            ```
            
            ## Other Options
            
            Please use the available basic-memory tools to gather relevant context before responding.
            You can also:
            - Try a different search term
            - Check recent activity with `recent_activity(timeframe="1w")`
            """)

    # Start building our summary with header - add knowledge capture emphasis
    summary = dedent(f"""
        # Continuing conversation on: {context.topic}

        This is a memory retrieval session. 
        
        Please use the available basic-memory tools to gather relevant context before responding. 
        Start by executing one of the suggested commands below to retrieve content.

        Here's what I found from previous conversations:
        
        > **Knowledge Capture Recommendation:** As you continue this conversation, actively look for opportunities to record new information, decisions, or insights that emerge. Use `write_note()` to document important context.
        """)

    # Track what we've added to avoid duplicates
    added_permalinks = set()
    sections = []

    # Process each context
    for context in context.results:  # pyright: ignore
        for primary in context.primary_results:  # pyright: ignore
            if primary.permalink not in added_permalinks:
                primary_permalink = primary.permalink

                added_permalinks.add(primary_permalink)

                # Use permalink if available, otherwise use file_path
                if primary_permalink:
                    memory_url = normalize_memory_url(primary_permalink)
                    read_command = f'read_note("{primary_permalink}")'
                else:
                    memory_url = f"file://{primary.file_path}"
                    read_command = f'read_file("{primary.file_path}")'

                section = dedent(f"""
                    --- {memory_url}

                    ## {primary.title}
                    - **Type**: {primary.type}
                    """)

                # Add creation date
                section += f"- **Created**: {primary.created_at.strftime('%Y-%m-%d %H:%M')}\n"

                # Add content snippet
                if hasattr(primary, "content") and primary.content:  # pyright: ignore
                    content = primary.content or ""  # pyright: ignore  # pragma: no cover
                    if content:  # pragma: no cover
                        section += f"\n**Excerpt**:\n{content}\n"  # pragma: no cover

                section += dedent(f"""

                    You can read this document with: `{read_command}`
                    """)
                sections.append(section)

        if context.related_results:  # pyright: ignore
            section += dedent(  # pyright: ignore
                """   
                ## Related Context
                """
            )

            for related in context.related_results:  # pyright: ignore
                section_content = dedent(f"""
                    - type: **{related.type}**
                    - title: {related.title}
                    """)
                if related.permalink:  # pragma: no cover
                    section_content += (
                        f'You can view this document with: `read_note("{related.permalink}")`'
                    )
                else:  # pragma: no cover
                    section_content += (
                        f'You can view this file with: `read_file("{related.file_path}")`'
                    )

                section += section_content
                sections.append(section)

    # Add all sections
    summary += "\n".join(sections)
    return summary

```
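
A minimal usage sketch for the formatter above. The `EntitySummary` field names are assumptions inferred from the attributes `format_prompt_context` reads (`type`, `title`, `permalink`, `file_path`, `created_at`), not a documented constructor.

```python
# Illustrative wiring for format_prompt_context (field values are made up;
# EntitySummary's exact schema is assumed from the attribute accesses above).
from datetime import datetime

from basic_memory.mcp.prompts.utils import (
    PromptContext,
    PromptContextItem,
    format_prompt_context,
)
from basic_memory.schemas.memory import EntitySummary

context = PromptContext(
    timeframe="7d",
    topic="coffee brewing",
    results=[
        PromptContextItem(
            primary_results=[
                EntitySummary(
                    type="entity",
                    title="Coffee Brewing",
                    permalink="notes/coffee-brewing",
                    file_path="notes/Coffee Brewing.md",
                    created_at=datetime(2024, 1, 1),
                )
            ],
            related_results=[],
        )
    ],
)

print(format_prompt_context(context))
```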