This is page 2 of 19. Use http://codebase.md/basicmachines-co/basic-memory?lines=false&page={x} to view the full context.

# Directory Structure

```
├── .claude
│   ├── commands
│   │   ├── release
│   │   │   ├── beta.md
│   │   │   ├── changelog.md
│   │   │   ├── release-check.md
│   │   │   └── release.md
│   │   ├── spec.md
│   │   └── test-live.md
│   └── settings.json
├── .dockerignore
├── .env.example
├── .github
│   ├── dependabot.yml
│   ├── ISSUE_TEMPLATE
│   │   ├── bug_report.md
│   │   ├── config.yml
│   │   ├── documentation.md
│   │   └── feature_request.md
│   └── workflows
│       ├── claude-code-review.yml
│       ├── claude-issue-triage.yml
│       ├── claude.yml
│       ├── dev-release.yml
│       ├── docker.yml
│       ├── pr-title.yml
│       ├── release.yml
│       └── test.yml
├── .gitignore
├── .python-version
├── CHANGELOG.md
├── CITATION.cff
├── CLA.md
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── docker-compose-postgres.yml
├── docker-compose.yml
├── Dockerfile
├── docs
│   ├── ai-assistant-guide-extended.md
│   ├── ARCHITECTURE.md
│   ├── character-handling.md
│   ├── cloud-cli.md
│   ├── Docker.md
│   └── testing-coverage.md
├── justfile
├── LICENSE
├── llms-install.md
├── pyproject.toml
├── README.md
├── SECURITY.md
├── smithery.yaml
├── specs
│   ├── SPEC-1 Specification-Driven Development Process.md
│   ├── SPEC-10 Unified Deployment Workflow and Event Tracking.md
│   ├── SPEC-11 Basic Memory API Performance Optimization.md
│   ├── SPEC-12 OpenTelemetry Observability.md
│   ├── SPEC-13 CLI Authentication with Subscription Validation.md
│   ├── SPEC-14 Cloud Git Versioning & GitHub Backup.md
│   ├── SPEC-14- Cloud Git Versioning & GitHub Backup.md
│   ├── SPEC-15 Configuration Persistence via Tigris for Cloud Tenants.md
│   ├── SPEC-16 MCP Cloud Service Consolidation.md
│   ├── SPEC-17 Semantic Search with ChromaDB.md
│   ├── SPEC-18 AI Memory Management Tool.md
│   ├── SPEC-19 Sync Performance and Memory Optimization.md
│   ├── SPEC-2 Slash Commands Reference.md
│   ├── SPEC-20 Simplified Project-Scoped Rclone Sync.md
│   ├── SPEC-3 Agent Definitions.md
│   ├── SPEC-4 Notes Web UI Component Architecture.md
│   ├── SPEC-5 CLI Cloud Upload via WebDAV.md
│   ├── SPEC-6 Explicit Project Parameter Architecture.md
│   ├── SPEC-7 POC to spike Tigris Turso for local access to cloud data.md
│   ├── SPEC-8 TigrisFS Integration.md
│   ├── SPEC-9 Multi-Project Bidirectional Sync Architecture.md
│   ├── SPEC-9 Signed Header Tenant Information.md
│   └── SPEC-9-1 Follow-Ups- Conflict, Sync, and Observability.md
├── src
│   └── basic_memory
│       ├── __init__.py
│       ├── alembic
│       │   ├── alembic.ini
│       │   ├── env.py
│       │   ├── migrations.py
│       │   ├── script.py.mako
│       │   └── versions
│       │       ├── 314f1ea54dc4_add_postgres_full_text_search_support_.py
│       │       ├── 3dae7c7b1564_initial_schema.py
│       │       ├── 502b60eaa905_remove_required_from_entity_permalink.py
│       │       ├── 5fe1ab1ccebe_add_projects_table.py
│       │       ├── 647e7a75e2cd_project_constraint_fix.py
│       │       ├── 6830751f5fb6_merge_multiple_heads.py
│       │       ├── 9d9c1cb7d8f5_add_mtime_and_size_columns_to_entity_.py
│       │       ├── a1b2c3d4e5f6_fix_project_foreign_keys.py
│       │       ├── a2b3c4d5e6f7_add_search_index_entity_cascade.py
│       │       ├── b3c3938bacdb_relation_to_name_unique_index.py
│       │       ├── cc7172b46608_update_search_index_schema.py
│       │       ├── e7e1f4367280_add_scan_watermark_tracking_to_project.py
│       │       ├── f8a9b2c3d4e5_add_pg_trgm_for_fuzzy_link_resolution.py
│       │       └── g9a0b3c4d5e6_add_external_id_to_project_and_entity.py
│       ├── api
│       │   ├── __init__.py
│       │   ├── app.py
│       │   ├── container.py
│       │   ├── routers
│       │   │   ├── __init__.py
│       │   │   ├── directory_router.py
│       │   │   ├── importer_router.py
│       │   │   ├── knowledge_router.py
│       │   │   ├── management_router.py
│       │   │   ├── memory_router.py
│       │   │   ├── project_router.py
│       │   │   ├── prompt_router.py
│       │   │   ├── resource_router.py
│       │   │   ├── search_router.py
│       │   │   └── utils.py
│       │   ├── template_loader.py
│       │   └── v2
│       │       ├── __init__.py
│       │       └── routers
│       │           ├── __init__.py
│       │           ├── directory_router.py
│       │           ├── importer_router.py
│       │           ├── knowledge_router.py
│       │           ├── memory_router.py
│       │           ├── project_router.py
│       │           ├── prompt_router.py
│       │           ├── resource_router.py
│       │           └── search_router.py
│       ├── cli
│       │   ├── __init__.py
│       │   ├── app.py
│       │   ├── auth.py
│       │   ├── commands
│       │   │   ├── __init__.py
│       │   │   ├── cloud
│       │   │   │   ├── __init__.py
│       │   │   │   ├── api_client.py
│       │   │   │   ├── bisync_commands.py
│       │   │   │   ├── cloud_utils.py
│       │   │   │   ├── core_commands.py
│       │   │   │   ├── rclone_commands.py
│       │   │   │   ├── rclone_config.py
│       │   │   │   ├── rclone_installer.py
│       │   │   │   ├── upload_command.py
│       │   │   │   └── upload.py
│       │   │   ├── command_utils.py
│       │   │   ├── db.py
│       │   │   ├── format.py
│       │   │   ├── import_chatgpt.py
│       │   │   ├── import_claude_conversations.py
│       │   │   ├── import_claude_projects.py
│       │   │   ├── import_memory_json.py
│       │   │   ├── mcp.py
│       │   │   ├── project.py
│       │   │   ├── status.py
│       │   │   ├── telemetry.py
│       │   │   └── tool.py
│       │   ├── container.py
│       │   └── main.py
│       ├── config.py
│       ├── db.py
│       ├── deps
│       │   ├── __init__.py
│       │   ├── config.py
│       │   ├── db.py
│       │   ├── importers.py
│       │   ├── projects.py
│       │   ├── repositories.py
│       │   └── services.py
│       ├── deps.py
│       ├── file_utils.py
│       ├── ignore_utils.py
│       ├── importers
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── chatgpt_importer.py
│       │   ├── claude_conversations_importer.py
│       │   ├── claude_projects_importer.py
│       │   ├── memory_json_importer.py
│       │   └── utils.py
│       ├── markdown
│       │   ├── __init__.py
│       │   ├── entity_parser.py
│       │   ├── markdown_processor.py
│       │   ├── plugins.py
│       │   ├── schemas.py
│       │   └── utils.py
│       ├── mcp
│       │   ├── __init__.py
│       │   ├── async_client.py
│       │   ├── clients
│       │   │   ├── __init__.py
│       │   │   ├── directory.py
│       │   │   ├── knowledge.py
│       │   │   ├── memory.py
│       │   │   ├── project.py
│       │   │   ├── resource.py
│       │   │   └── search.py
│       │   ├── container.py
│       │   ├── project_context.py
│       │   ├── prompts
│       │   │   ├── __init__.py
│       │   │   ├── ai_assistant_guide.py
│       │   │   ├── continue_conversation.py
│       │   │   ├── recent_activity.py
│       │   │   ├── search.py
│       │   │   └── utils.py
│       │   ├── resources
│       │   │   ├── ai_assistant_guide.md
│       │   │   └── project_info.py
│       │   ├── server.py
│       │   └── tools
│       │       ├── __init__.py
│       │       ├── build_context.py
│       │       ├── canvas.py
│       │       ├── chatgpt_tools.py
│       │       ├── delete_note.py
│       │       ├── edit_note.py
│       │       ├── list_directory.py
│       │       ├── move_note.py
│       │       ├── project_management.py
│       │       ├── read_content.py
│       │       ├── read_note.py
│       │       ├── recent_activity.py
│       │       ├── search.py
│       │       ├── utils.py
│       │       ├── view_note.py
│       │       └── write_note.py
│       ├── models
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── knowledge.py
│       │   ├── project.py
│       │   └── search.py
│       ├── project_resolver.py
│       ├── repository
│       │   ├── __init__.py
│       │   ├── entity_repository.py
│       │   ├── observation_repository.py
│       │   ├── postgres_search_repository.py
│       │   ├── project_info_repository.py
│       │   ├── project_repository.py
│       │   ├── relation_repository.py
│       │   ├── repository.py
│       │   ├── search_index_row.py
│       │   ├── search_repository_base.py
│       │   ├── search_repository.py
│       │   └── sqlite_search_repository.py
│       ├── runtime.py
│       ├── schemas
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── cloud.py
│       │   ├── delete.py
│       │   ├── directory.py
│       │   ├── importer.py
│       │   ├── memory.py
│       │   ├── project_info.py
│       │   ├── prompt.py
│       │   ├── request.py
│       │   ├── response.py
│       │   ├── search.py
│       │   ├── sync_report.py
│       │   └── v2
│       │       ├── __init__.py
│       │       ├── entity.py
│       │       └── resource.py
│       ├── services
│       │   ├── __init__.py
│       │   ├── context_service.py
│       │   ├── directory_service.py
│       │   ├── entity_service.py
│       │   ├── exceptions.py
│       │   ├── file_service.py
│       │   ├── initialization.py
│       │   ├── link_resolver.py
│       │   ├── project_service.py
│       │   ├── search_service.py
│       │   └── service.py
│       ├── sync
│       │   ├── __init__.py
│       │   ├── background_sync.py
│       │   ├── coordinator.py
│       │   ├── sync_service.py
│       │   └── watch_service.py
│       ├── telemetry.py
│       ├── templates
│       │   └── prompts
│       │       ├── continue_conversation.hbs
│       │       └── search.hbs
│       └── utils.py
├── test-int
│   ├── BENCHMARKS.md
│   ├── cli
│   │   ├── test_project_commands_integration.py
│   │   └── test_version_integration.py
│   ├── conftest.py
│   ├── mcp
│   │   ├── test_build_context_underscore.py
│   │   ├── test_build_context_validation.py
│   │   ├── test_chatgpt_tools_integration.py
│   │   ├── test_default_project_mode_integration.py
│   │   ├── test_delete_note_integration.py
│   │   ├── test_edit_note_integration.py
│   │   ├── test_lifespan_shutdown_sync_task_cancellation_integration.py
│   │   ├── test_list_directory_integration.py
│   │   ├── test_move_note_integration.py
│   │   ├── test_project_management_integration.py
│   │   ├── test_project_state_sync_integration.py
│   │   ├── test_read_content_integration.py
│   │   ├── test_read_note_integration.py
│   │   ├── test_search_integration.py
│   │   ├── test_single_project_mcp_integration.py
│   │   └── test_write_note_integration.py
│   ├── test_db_wal_mode.py
│   └── test_disable_permalinks_integration.py
├── tests
│   ├── __init__.py
│   ├── api
│   │   ├── conftest.py
│   │   ├── test_api_container.py
│   │   ├── test_async_client.py
│   │   ├── test_continue_conversation_template.py
│   │   ├── test_directory_router.py
│   │   ├── test_importer_router.py
│   │   ├── test_knowledge_router.py
│   │   ├── test_management_router.py
│   │   ├── test_memory_router.py
│   │   ├── test_project_router_operations.py
│   │   ├── test_project_router.py
│   │   ├── test_prompt_router.py
│   │   ├── test_relation_background_resolution.py
│   │   ├── test_resource_router.py
│   │   ├── test_search_router.py
│   │   ├── test_search_template.py
│   │   ├── test_template_loader_helpers.py
│   │   ├── test_template_loader.py
│   │   └── v2
│   │       ├── __init__.py
│   │       ├── conftest.py
│   │       ├── test_directory_router.py
│   │       ├── test_importer_router.py
│   │       ├── test_knowledge_router.py
│   │       ├── test_memory_router.py
│   │       ├── test_project_router.py
│   │       ├── test_prompt_router.py
│   │       ├── test_resource_router.py
│   │       └── test_search_router.py
│   ├── cli
│   │   ├── cloud
│   │   │   ├── test_cloud_api_client_and_utils.py
│   │   │   ├── test_rclone_config_and_bmignore_filters.py
│   │   │   └── test_upload_path.py
│   │   ├── conftest.py
│   │   ├── test_auth_cli_auth.py
│   │   ├── test_cli_container.py
│   │   ├── test_cli_exit.py
│   │   ├── test_cli_tool_exit.py
│   │   ├── test_cli_tools.py
│   │   ├── test_cloud_authentication.py
│   │   ├── test_ignore_utils.py
│   │   ├── test_import_chatgpt.py
│   │   ├── test_import_claude_conversations.py
│   │   ├── test_import_claude_projects.py
│   │   ├── test_import_memory_json.py
│   │   ├── test_project_add_with_local_path.py
│   │   └── test_upload.py
│   ├── conftest.py
│   ├── db
│   │   └── test_issue_254_foreign_key_constraints.py
│   ├── importers
│   │   ├── test_conversation_indexing.py
│   │   ├── test_importer_base.py
│   │   └── test_importer_utils.py
│   ├── markdown
│   │   ├── __init__.py
│   │   ├── test_date_frontmatter_parsing.py
│   │   ├── test_entity_parser_error_handling.py
│   │   ├── test_entity_parser.py
│   │   ├── test_markdown_plugins.py
│   │   ├── test_markdown_processor.py
│   │   ├── test_observation_edge_cases.py
│   │   ├── test_parser_edge_cases.py
│   │   ├── test_relation_edge_cases.py
│   │   └── test_task_detection.py
│   ├── mcp
│   │   ├── clients
│   │   │   ├── __init__.py
│   │   │   └── test_clients.py
│   │   ├── conftest.py
│   │   ├── test_async_client_modes.py
│   │   ├── test_mcp_container.py
│   │   ├── test_obsidian_yaml_formatting.py
│   │   ├── test_permalink_collision_file_overwrite.py
│   │   ├── test_project_context.py
│   │   ├── test_prompts.py
│   │   ├── test_recent_activity_prompt_modes.py
│   │   ├── test_resources.py
│   │   ├── test_server_lifespan_branches.py
│   │   ├── test_tool_build_context.py
│   │   ├── test_tool_canvas.py
│   │   ├── test_tool_delete_note.py
│   │   ├── test_tool_edit_note.py
│   │   ├── test_tool_list_directory.py
│   │   ├── test_tool_move_note.py
│   │   ├── test_tool_project_management.py
│   │   ├── test_tool_read_content.py
│   │   ├── test_tool_read_note.py
│   │   ├── test_tool_recent_activity.py
│   │   ├── test_tool_resource.py
│   │   ├── test_tool_search.py
│   │   ├── test_tool_utils.py
│   │   ├── test_tool_view_note.py
│   │   ├── test_tool_write_note_kebab_filenames.py
│   │   ├── test_tool_write_note.py
│   │   └── tools
│   │       └── test_chatgpt_tools.py
│   ├── Non-MarkdownFileSupport.pdf
│   ├── README.md
│   ├── repository
│   │   ├── test_entity_repository_upsert.py
│   │   ├── test_entity_repository.py
│   │   ├── test_entity_upsert_issue_187.py
│   │   ├── test_observation_repository.py
│   │   ├── test_postgres_search_repository.py
│   │   ├── test_project_info_repository.py
│   │   ├── test_project_repository.py
│   │   ├── test_relation_repository.py
│   │   ├── test_repository.py
│   │   ├── test_search_repository_edit_bug_fix.py
│   │   └── test_search_repository.py
│   ├── schemas
│   │   ├── test_base_timeframe_minimum.py
│   │   ├── test_memory_serialization.py
│   │   ├── test_memory_url_validation.py
│   │   ├── test_memory_url.py
│   │   ├── test_relation_response_reference_resolution.py
│   │   ├── test_schemas.py
│   │   └── test_search.py
│   ├── Screenshot.png
│   ├── services
│   │   ├── test_context_service.py
│   │   ├── test_directory_service.py
│   │   ├── test_entity_service_disable_permalinks.py
│   │   ├── test_entity_service.py
│   │   ├── test_file_service.py
│   │   ├── test_initialization_cloud_mode_branches.py
│   │   ├── test_initialization.py
│   │   ├── test_link_resolver.py
│   │   ├── test_project_removal_bug.py
│   │   ├── test_project_service_operations.py
│   │   ├── test_project_service.py
│   │   └── test_search_service.py
│   ├── sync
│   │   ├── test_character_conflicts.py
│   │   ├── test_coordinator.py
│   │   ├── test_sync_service_incremental.py
│   │   ├── test_sync_service.py
│   │   ├── test_sync_wikilink_issue.py
│   │   ├── test_tmp_files.py
│   │   ├── test_watch_service_atomic_adds.py
│   │   ├── test_watch_service_edge_cases.py
│   │   ├── test_watch_service_reload.py
│   │   └── test_watch_service.py
│   ├── test_config.py
│   ├── test_deps.py
│   ├── test_production_cascade_delete.py
│   ├── test_project_resolver.py
│   ├── test_rclone_commands.py
│   ├── test_runtime.py
│   ├── test_telemetry.py
│   └── utils
│       ├── test_file_utils.py
│       ├── test_frontmatter_obsidian_compatible.py
│       ├── test_parse_tags.py
│       ├── test_permalink_formatting.py
│       ├── test_timezone_utils.py
│       ├── test_utf8_handling.py
│       └── test_validate_project_path.py
└── uv.lock
```

# Files

--------------------------------------------------------------------------------
/src/basic_memory/api/routers/management_router.py:
--------------------------------------------------------------------------------

```python
"""Management router for basic-memory API."""

import asyncio

from fastapi import APIRouter, Request
from loguru import logger
from pydantic import BaseModel

from basic_memory.config import ConfigManager
from basic_memory.deps import SyncServiceDep, ProjectRepositoryDep

router = APIRouter(prefix="/management", tags=["management"])


class WatchStatusResponse(BaseModel):
    """Response model for watch status."""

    running: bool
    """Whether the watch service is currently running."""


@router.get("/watch/status", response_model=WatchStatusResponse)
async def get_watch_status(request: Request) -> WatchStatusResponse:
    """Get the current status of the watch service."""
    return WatchStatusResponse(
        running=request.app.state.watch_task is not None and not request.app.state.watch_task.done()
    )


@router.post("/watch/start", response_model=WatchStatusResponse)
async def start_watch_service(
    request: Request, project_repository: ProjectRepositoryDep, sync_service: SyncServiceDep
) -> WatchStatusResponse:
    """Start the watch service if it's not already running."""

    # needed because of circular imports from sync -> app
    from basic_memory.sync import WatchService
    from basic_memory.sync.background_sync import create_background_sync_task

    if request.app.state.watch_task is not None and not request.app.state.watch_task.done():
        # Watch service is already running
        return WatchStatusResponse(running=True)

    app_config = ConfigManager().config

    # Create and start a new watch service
    logger.info("Starting watch service via management API")

    # Get services needed for the watch task
    watch_service = WatchService(
        app_config=app_config,
        project_repository=project_repository,
    )

    # Create and store the task
    watch_task = create_background_sync_task(sync_service, watch_service)
    request.app.state.watch_task = watch_task

    return WatchStatusResponse(running=True)


@router.post("/watch/stop", response_model=WatchStatusResponse)
async def stop_watch_service(request: Request) -> WatchStatusResponse:  # pragma: no cover
    """Stop the watch service if it's running."""
    if request.app.state.watch_task is None or request.app.state.watch_task.done():
        # Watch service is not running
        return WatchStatusResponse(running=False)

    # Cancel the running task
    logger.info("Stopping watch service via management API")
    request.app.state.watch_task.cancel()

    # Wait for it to be properly cancelled
    try:
        await request.app.state.watch_task
    except asyncio.CancelledError:
        pass

    request.app.state.watch_task = None
    return WatchStatusResponse(running=False)

```
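For illustration, the management endpoints above can be exercised over plain HTTP once the API is running. A minimal sketch using httpx, assuming the FastAPI app is served locally at `http://localhost:8000` (the base URL and port are assumptions, not part of this router):

```python
import asyncio

import httpx

BASE_URL = "http://localhost:8000"  # assumed local deployment; adjust to your setup


async def main() -> None:
    async with httpx.AsyncClient(base_url=BASE_URL) as client:
        # Check whether the watch service is currently running
        status = await client.get("/management/watch/status")
        print(status.json())  # e.g. {"running": false}

        # Start the watch service, then stop it again
        started = await client.post("/management/watch/start")
        print(started.json())  # {"running": true}

        stopped = await client.post("/management/watch/stop")
        print(stopped.json())  # {"running": false}


asyncio.run(main())
```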

--------------------------------------------------------------------------------
/tests/sync/test_sync_wikilink_issue.py:
--------------------------------------------------------------------------------

```python
"""Test for issue #72 - notes with wikilinks staying in modified status."""

from pathlib import Path

import pytest

from basic_memory.sync.sync_service import SyncService


async def create_test_file(path: Path, content: str) -> None:
    """Create a test file with given content."""
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(content)


async def force_full_scan(sync_service: SyncService) -> None:
    """Force next sync to do a full scan by clearing watermark (for testing moves/deletions)."""
    if sync_service.entity_repository.project_id is not None:
        project = await sync_service.project_repository.find_by_id(
            sync_service.entity_repository.project_id
        )
        if project:
            await sync_service.project_repository.update(
                project.id,
                {
                    "last_scan_timestamp": None,
                    "last_file_count": None,
                },
            )


@pytest.mark.asyncio
async def test_wikilink_modified_status_issue(sync_service: SyncService, project_config):
    """Test that files with wikilinks don't remain in modified status after sync."""
    project_dir = project_config.home

    # Create a file with a wikilink
    content = """---
title: Test Wikilink
type: note
---
# Test File

This file contains a wikilink to [[another-file]].
"""
    test_file_path = project_dir / "test_wikilink.md"
    await create_test_file(test_file_path, content)

    # Initial sync
    report1 = await sync_service.sync(project_config.home)
    assert "test_wikilink.md" in report1.new
    assert "test_wikilink.md" not in report1.modified

    # Sync again without changing the file - should not be modified
    report2 = await sync_service.sync(project_config.home)
    assert "test_wikilink.md" not in report2.new
    assert "test_wikilink.md" not in report2.modified

    # Create the target file
    target_content = """---
title: Another File
type: note
---
# Another File

This is the target file.
"""
    target_file_path = project_dir / "another_file.md"
    await create_test_file(target_file_path, target_content)

    # Force full scan to detect the new file
    # (file just created may not be newer than watermark due to timing precision)
    await force_full_scan(sync_service)

    # Sync again after adding target file
    report3 = await sync_service.sync(project_config.home)
    assert "another_file.md" in report3.new
    assert "test_wikilink.md" not in report3.modified

    # Sync one more time - both files should now be stable
    report4 = await sync_service.sync(project_config.home)
    assert "test_wikilink.md" not in report4.modified
    assert "another_file.md" not in report4.modified

```

--------------------------------------------------------------------------------
/tests/mcp/test_async_client_modes.py:
--------------------------------------------------------------------------------

```python
from contextlib import asynccontextmanager

import httpx
import pytest

from basic_memory.cli.auth import CLIAuth
from basic_memory.mcp import async_client as async_client_module
from basic_memory.mcp.async_client import get_client, set_client_factory


@pytest.fixture(autouse=True)
def _reset_async_client_factory():
    async_client_module._client_factory = None
    yield
    async_client_module._client_factory = None


@pytest.mark.asyncio
async def test_get_client_uses_injected_factory(monkeypatch):
    seen = {"used": False}

    @asynccontextmanager
    async def factory():
        seen["used"] = True
        async with httpx.AsyncClient(base_url="https://example.test") as client:
            yield client

    # Ensure we don't leak factory to other tests
    set_client_factory(factory)
    async with get_client() as client:
        assert str(client.base_url) == "https://example.test"
    assert seen["used"] is True


@pytest.mark.asyncio
async def test_get_client_cloud_mode_injects_auth_header(config_manager, config_home):
    cfg = config_manager.load_config()
    cfg.cloud_mode = True
    cfg.cloud_host = "https://cloud.example.test"
    cfg.cloud_client_id = "cid"
    cfg.cloud_domain = "https://auth.example.test"
    config_manager.save_config(cfg)

    # Write token for CLIAuth so get_client() can authenticate without network
    auth = CLIAuth(client_id=cfg.cloud_client_id, authkit_domain=cfg.cloud_domain)
    auth.token_file.parent.mkdir(parents=True, exist_ok=True)
    auth.token_file.write_text(
        '{"access_token":"token-123","refresh_token":null,"expires_at":9999999999,"token_type":"Bearer"}',
        encoding="utf-8",
    )

    async with get_client() as client:
        assert str(client.base_url).rstrip("/") == "https://cloud.example.test/proxy"
        assert client.headers.get("Authorization") == "Bearer token-123"


@pytest.mark.asyncio
async def test_get_client_cloud_mode_raises_when_not_authenticated(config_manager):
    cfg = config_manager.load_config()
    cfg.cloud_mode = True
    cfg.cloud_host = "https://cloud.example.test"
    cfg.cloud_client_id = "cid"
    cfg.cloud_domain = "https://auth.example.test"
    config_manager.save_config(cfg)

    # No token file written -> should raise
    with pytest.raises(RuntimeError, match="Cloud mode enabled but not authenticated"):
        async with get_client():
            pass


@pytest.mark.asyncio
async def test_get_client_local_mode_uses_asgi_transport(config_manager):
    cfg = config_manager.load_config()
    cfg.cloud_mode = False
    config_manager.save_config(cfg)

    async with get_client() as client:
        # httpx stores ASGITransport privately, but we can still sanity-check type
        assert isinstance(client._transport, httpx.ASGITransport)  # pyright: ignore[reportPrivateUsage]

```

--------------------------------------------------------------------------------
/src/basic_memory/api/routers/directory_router.py:
--------------------------------------------------------------------------------

```python
"""Router for directory tree operations."""

from typing import List, Optional

from fastapi import APIRouter, Query

from basic_memory.deps import DirectoryServiceDep, ProjectIdDep
from basic_memory.schemas.directory import DirectoryNode

router = APIRouter(prefix="/directory", tags=["directory"])


@router.get("/tree", response_model=DirectoryNode, response_model_exclude_none=True)
async def get_directory_tree(
    directory_service: DirectoryServiceDep,
    project_id: ProjectIdDep,
):
    """Get hierarchical directory structure from the knowledge base.

    Args:
        directory_service: Service for directory operations
        project_id: ID of the current project

    Returns:
        DirectoryNode representing the root of the hierarchical tree structure
    """
    # Get a hierarchical directory tree for the specific project
    tree = await directory_service.get_directory_tree()

    # Return the hierarchical tree
    return tree


@router.get("/structure", response_model=DirectoryNode, response_model_exclude_none=True)
async def get_directory_structure(
    directory_service: DirectoryServiceDep,
    project_id: ProjectIdDep,
):
    """Get folder structure for navigation (no files).

    Optimized endpoint for folder tree navigation. Returns only directory nodes
    without file metadata. For full tree with files, use /directory/tree.

    Args:
        directory_service: Service for directory operations
        project_id: ID of the current project

    Returns:
        DirectoryNode tree containing only folders (type="directory")
    """
    structure = await directory_service.get_directory_structure()
    return structure


@router.get("/list", response_model=List[DirectoryNode], response_model_exclude_none=True)
async def list_directory(
    directory_service: DirectoryServiceDep,
    project_id: ProjectIdDep,
    dir_name: str = Query("/", description="Directory path to list"),
    depth: int = Query(1, ge=1, le=10, description="Recursion depth (1-10)"),
    file_name_glob: Optional[str] = Query(
        None, description="Glob pattern for filtering file names"
    ),
):
    """List directory contents with filtering and depth control.

    Args:
        directory_service: Service for directory operations
        project_id: ID of the current project
        dir_name: Directory path to list (default: root "/")
        depth: Recursion depth (1-10, default: 1 for immediate children only)
        file_name_glob: Optional glob pattern for filtering file names (e.g., "*.md", "*meeting*")

    Returns:
        List of DirectoryNode objects matching the criteria
    """
    # Get directory listing with filtering
    nodes = await directory_service.list_directory(
        dir_name=dir_name,
        depth=depth,
        file_name_glob=file_name_glob,
    )

    return nodes

```
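A quick usage sketch for the `/directory` routes, again assuming the API is reachable at `http://localhost:8000` and that the router is mounted at the app root with the current project resolved from configuration; in a multi-project deployment the paths may include a project segment, and the `/projects` folder used here is purely illustrative:

```python
import asyncio

import httpx

BASE_URL = "http://localhost:8000"  # assumed; depends on how the API is deployed


async def main() -> None:
    async with httpx.AsyncClient(base_url=BASE_URL) as client:
        # Full hierarchical tree, including files
        tree = await client.get("/directory/tree")

        # Folders only, for navigation UIs
        structure = await client.get("/directory/structure")

        # List markdown files two levels deep under a hypothetical /projects folder
        listing = await client.get(
            "/directory/list",
            params={"dir_name": "/projects", "depth": 2, "file_name_glob": "*.md"},
        )
        print(tree.json(), structure.json(), listing.json(), sep="\n")


asyncio.run(main())
```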

--------------------------------------------------------------------------------
/src/basic_memory/cli/commands/mcp.py:
--------------------------------------------------------------------------------

```python
"""MCP server command with streamable HTTP transport."""

import os
import typer
from typing import Optional

from basic_memory.cli.app import app
from basic_memory.config import ConfigManager, init_mcp_logging

# Import mcp instance (has lifespan that handles initialization and file sync)
from basic_memory.mcp.server import mcp as mcp_server  # pragma: no cover

# Import mcp tools to register them
import basic_memory.mcp.tools  # noqa: F401  # pragma: no cover

# Import prompts to register them
import basic_memory.mcp.prompts  # noqa: F401  # pragma: no cover
from loguru import logger

config = ConfigManager().config

if not config.cloud_mode_enabled:

    @app.command()
    def mcp(
        transport: str = typer.Option(
            "stdio", help="Transport type: stdio, streamable-http, or sse"
        ),
        host: str = typer.Option(
            "0.0.0.0", help="Host for HTTP transports (use 0.0.0.0 to allow external connections)"
        ),
        port: int = typer.Option(8000, help="Port for HTTP transports"),
        path: str = typer.Option("/mcp", help="Path prefix for streamable-http transport"),
        project: Optional[str] = typer.Option(None, help="Restrict MCP server to single project"),
    ):  # pragma: no cover
        """Run the MCP server with configurable transport options.

        This command starts an MCP server using one of three transport options:

        - stdio: Standard I/O (default; good for local usage)
        - streamable-http: Recommended for web deployments
        - sse: Server-Sent Events (for compatibility with existing clients)

        Initialization, file sync, and cleanup are handled by the MCP server's lifespan.
        """
        # Initialize logging for MCP (file only, stdout breaks protocol)
        init_mcp_logging()

        # Validate and set project constraint if specified
        if project:
            config_manager = ConfigManager()
            project_name, _ = config_manager.get_project(project)
            if not project_name:
                typer.echo(f"No project found named: {project}", err=True)
                raise typer.Exit(1)

            # Set env var with validated project name
            os.environ["BASIC_MEMORY_MCP_PROJECT"] = project_name
            logger.info(f"MCP server constrained to project: {project_name}")

        # Run the MCP server (blocks)
        # Lifespan handles: initialization, migrations, file sync, cleanup
        logger.info(f"Starting MCP server with {transport.upper()} transport")

        if transport == "stdio":
            mcp_server.run(
                transport=transport,
            )
        elif transport == "streamable-http" or transport == "sse":
            mcp_server.run(
                transport=transport,
                host=host,
                port=port,
                path=path,
                log_level="INFO",
            )

```

--------------------------------------------------------------------------------
/specs/SPEC-2 Slash Commands Reference.md:
--------------------------------------------------------------------------------

```markdown
---
title: 'SPEC-2: Slash Commands Reference'
type: spec
permalink: specs/spec-2-slash-commands-reference
tags:
- commands
- process
- reference
---

# SPEC-2: Slash Commands Reference

This document defines the slash commands used in our specification-driven development process.

## /spec create [name]

**Purpose**: Create a new specification document

**Usage**: `/spec create notes-decomposition`

**Process**:
1. Create new spec document in `/specs` folder
2. Use SPEC-XXX numbering format (auto-increment)
3. Include standard spec template:
   - Why (reasoning/problem)
   - What (affected areas)
   - How (high-level approach)
   - How to Evaluate (testing/validation)
4. Tag appropriately for knowledge graph
5. Link to related specs/components

**Template**:
```markdown
# SPEC-XXX: [Title]

## Why
[Problem statement and reasoning]

## What
[What is affected or changed]

## How (High Level)
[Approach to implementation]

## How to Evaluate
[Testing/validation procedure]

## Notes
[Additional context as needed]
```

## /spec status

**Purpose**: Show current status of all specifications

**Usage**: `/spec status`

**Process**:
1. Search all specs in `/specs` folder
2. Display table showing:
   - Spec number and title
   - Status (draft, approved, implementing, complete)
   - Assigned agent (if any)
   - Last updated
   - Dependencies

## /spec implement [name]

**Purpose**: Hand specification to appropriate agent for implementation

**Usage**: `/spec implement SPEC-002`

**Process**:
1. Read the specified spec
2. Analyze requirements to determine appropriate agent:
   - Frontend components → vue-developer
   - Architecture/system design → system-architect  
   - Backend/API → python-developer
3. Launch agent with spec context
4. Agent creates implementation plan
5. Update spec with implementation status

## /spec review [name]

**Purpose**: Review implementation against specification criteria

**Usage**: `/spec review SPEC-002`

**Process**:
1. Read original spec and "How to Evaluate" section
2. Examine current implementation
3. Test against success criteria
4. Document gaps or issues
5. Update spec with review results
6. Recommend next actions (complete, revise, iterate)

## Command Extensions

As the process evolves, we may add:
- `/spec link [spec1] [spec2]` - Create dependency links
- `/spec archive [name]` - Archive completed specs
- `/spec template [type]` - Create spec from template
- `/spec search [query]` - Search spec content

## References

- Claude Slash commands: https://docs.anthropic.com/en/docs/claude-code/slash-commands

## Creating a command

Commands are implemented as Claude slash commands: 

Location in repo: .claude/commands/

In the following example, we create the /optimize command:
```bash
# Create a project command
mkdir -p .claude/commands
echo "Analyze this code for performance issues and suggest optimizations:" > .claude/commands/optimize.md
```

```

--------------------------------------------------------------------------------
/src/basic_memory/repository/observation_repository.py:
--------------------------------------------------------------------------------

```python
"""Repository for managing Observation objects."""

from typing import Dict, List, Sequence


from sqlalchemy import select
from sqlalchemy.ext.asyncio import async_sessionmaker

from basic_memory.models import Observation
from basic_memory.repository.repository import Repository


class ObservationRepository(Repository[Observation]):
    """Repository for Observation model with memory-specific operations."""

    def __init__(self, session_maker: async_sessionmaker, project_id: int):
        """Initialize with session maker and project_id filter.

        Args:
            session_maker: SQLAlchemy session maker
            project_id: Project ID to filter all operations by
        """
        super().__init__(session_maker, Observation, project_id=project_id)

    async def find_by_entity(self, entity_id: int) -> Sequence[Observation]:
        """Find all observations for a specific entity."""
        query = select(Observation).filter(Observation.entity_id == entity_id)
        result = await self.execute_query(query)
        return result.scalars().all()

    async def find_by_context(self, context: str) -> Sequence[Observation]:
        """Find observations with a specific context."""
        query = select(Observation).filter(Observation.context == context)
        result = await self.execute_query(query)
        return result.scalars().all()

    async def find_by_category(self, category: str) -> Sequence[Observation]:
        """Find observations with a specific category."""
        query = select(Observation).filter(Observation.category == category)
        result = await self.execute_query(query)
        return result.scalars().all()

    async def observation_categories(self) -> Sequence[str]:
        """Return a list of all observation categories."""
        query = select(Observation.category).distinct()
        result = await self.execute_query(query, use_query_options=False)
        return result.scalars().all()

    async def find_by_entities(self, entity_ids: List[int]) -> Dict[int, List[Observation]]:
        """Find all observations for multiple entities in a single query.

        Args:
            entity_ids: List of entity IDs to fetch observations for

        Returns:
            Dictionary mapping entity_id to list of observations
        """
        if not entity_ids:  # pragma: no cover
            return {}

        # Query observations for all entities in the list
        query = select(Observation).filter(Observation.entity_id.in_(entity_ids))
        result = await self.execute_query(query)
        observations = result.scalars().all()

        # Group observations by entity_id
        observations_by_entity = {}
        for obs in observations:
            if obs.entity_id not in observations_by_entity:
                observations_by_entity[obs.entity_id] = []
            observations_by_entity[obs.entity_id].append(obs)

        return observations_by_entity

```
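`find_by_entities` exists to avoid issuing one query per entity when loading observations in bulk. A minimal usage sketch, assuming an already-configured `async_sessionmaker` and project ID supplied by the application's database setup (neither is created here):

```python
from typing import Dict, List

from sqlalchemy.ext.asyncio import async_sessionmaker

from basic_memory.models import Observation
from basic_memory.repository.observation_repository import ObservationRepository


async def load_observations(
    session_maker: async_sessionmaker, project_id: int, entity_ids: List[int]
) -> Dict[int, List[Observation]]:
    """Fetch observations for many entities with a single IN (...) query."""
    repo = ObservationRepository(session_maker, project_id=project_id)
    # One batched query instead of len(entity_ids) separate lookups
    return await repo.find_by_entities(entity_ids)
```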

--------------------------------------------------------------------------------
/src/basic_memory/cli/app.py:
--------------------------------------------------------------------------------

```python
# Suppress Logfire "not configured" warning - we only use Logfire in cloud/server contexts
import os

os.environ.setdefault("LOGFIRE_IGNORE_NO_CONFIG", "1")

# Remove loguru's default handler IMMEDIATELY, before any other imports.
# This prevents DEBUG logs from appearing on stdout during module-level
# initialization (e.g., template_loader.TemplateLoader() logs at DEBUG level).
from loguru import logger

logger.remove()

from typing import Optional  # noqa: E402

import typer  # noqa: E402

from basic_memory.cli.container import CliContainer, set_container  # noqa: E402
from basic_memory.config import init_cli_logging  # noqa: E402
from basic_memory.telemetry import show_notice_if_needed, track_app_started  # noqa: E402


def version_callback(value: bool) -> None:
    """Show version and exit."""
    if value:  # pragma: no cover
        import basic_memory

        typer.echo(f"Basic Memory version: {basic_memory.__version__}")
        raise typer.Exit()


app = typer.Typer(name="basic-memory")


@app.callback()
def app_callback(
    ctx: typer.Context,
    version: Optional[bool] = typer.Option(
        None,
        "--version",
        "-v",
        help="Show version and exit.",
        callback=version_callback,
        is_eager=True,
    ),
) -> None:
    """Basic Memory - Local-first personal knowledge management."""

    # Initialize logging for CLI (file only, no stdout)
    init_cli_logging()

    # --- Composition Root ---
    # Create container and read config (single point of config access)
    container = CliContainer.create()
    set_container(container)

    # Show telemetry notice and track CLI startup
    # Skip for 'mcp' command - it handles its own telemetry in lifespan
    # Skip for 'telemetry' command - avoid issues when user is managing telemetry
    if ctx.invoked_subcommand not in {"mcp", "telemetry"}:
        show_notice_if_needed()
        track_app_started("cli")

    # Run initialization for commands that don't use the API
    # Skip for 'mcp' command - it has its own lifespan that handles initialization
    # Skip for API-using commands (status, sync, etc.) - they handle initialization via deps.py
    # Skip for 'reset' command - it manages its own database lifecycle
    skip_init_commands = {"mcp", "status", "sync", "project", "tool", "reset"}
    if (
        not version
        and ctx.invoked_subcommand is not None
        and ctx.invoked_subcommand not in skip_init_commands
    ):
        from basic_memory.services.initialization import ensure_initialization

        ensure_initialization(container.config)


## import
# Register sub-command groups
import_app = typer.Typer(help="Import data from various sources")
app.add_typer(import_app, name="import")

claude_app = typer.Typer(help="Import Conversations from Claude JSON export.")
import_app.add_typer(claude_app, name="claude")


## cloud

cloud_app = typer.Typer(help="Access Basic Memory Cloud")
app.add_typer(cloud_app, name="cloud")

```

--------------------------------------------------------------------------------
/src/basic_memory/api/routers/memory_router.py:
--------------------------------------------------------------------------------

```python
"""Routes for memory:// URI operations."""

from typing import Annotated, Optional

from fastapi import APIRouter, Query
from loguru import logger

from basic_memory.deps import ContextServiceDep, EntityRepositoryDep
from basic_memory.schemas.base import TimeFrame, parse_timeframe
from basic_memory.schemas.memory import (
    GraphContext,
    normalize_memory_url,
)
from basic_memory.schemas.search import SearchItemType
from basic_memory.api.routers.utils import to_graph_context

router = APIRouter(prefix="/memory", tags=["memory"])


@router.get("/recent", response_model=GraphContext)
async def recent(
    context_service: ContextServiceDep,
    entity_repository: EntityRepositoryDep,
    type: Annotated[list[SearchItemType] | None, Query()] = None,
    depth: int = 1,
    timeframe: TimeFrame = "7d",
    page: int = 1,
    page_size: int = 10,
    max_related: int = 10,
) -> GraphContext:
    # return all types by default
    types = (
        [SearchItemType.ENTITY, SearchItemType.RELATION, SearchItemType.OBSERVATION]
        if not type
        else type
    )

    logger.debug(
        f"Getting recent context: `{types}` depth: `{depth}` timeframe: `{timeframe}` page: `{page}` page_size: `{page_size}` max_related: `{max_related}`"
    )
    # Parse timeframe
    since = parse_timeframe(timeframe)
    limit = page_size
    offset = (page - 1) * page_size

    # Build context
    context = await context_service.build_context(
        types=types, depth=depth, since=since, limit=limit, offset=offset, max_related=max_related
    )
    recent_context = await to_graph_context(
        context, entity_repository=entity_repository, page=page, page_size=page_size
    )
    logger.debug(f"Recent context: {recent_context.model_dump_json()}")
    return recent_context


# get_memory_context needs to be declared last so other paths can match


@router.get("/{uri:path}", response_model=GraphContext)
async def get_memory_context(
    context_service: ContextServiceDep,
    entity_repository: EntityRepositoryDep,
    uri: str,
    depth: int = 1,
    timeframe: Optional[TimeFrame] = None,
    page: int = 1,
    page_size: int = 10,
    max_related: int = 10,
) -> GraphContext:
    """Get rich context from memory:// URI."""
    # add the project name from the config to the url as the "host"
    # Parse URI
    logger.debug(
        f"Getting context for URI: `{uri}` depth: `{depth}` timeframe: `{timeframe}` page: `{page}` page_size: `{page_size}` max_related: `{max_related}`"
    )
    memory_url = normalize_memory_url(uri)

    # Parse timeframe
    since = parse_timeframe(timeframe) if timeframe else None
    limit = page_size
    offset = (page - 1) * page_size

    # Build context
    context = await context_service.build_context(
        memory_url, depth=depth, since=since, limit=limit, offset=offset, max_related=max_related
    )
    return await to_graph_context(
        context, entity_repository=entity_repository, page=page, page_size=page_size
    )

```
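For reference, the `/memory` routes can be called like any other FastAPI endpoint. A minimal sketch, assuming the API is served at `http://localhost:8000` with the router mounted at the app root (both assumptions); the permalink used below comes from SPEC-2's frontmatter:

```python
import asyncio

import httpx

BASE_URL = "http://localhost:8000"  # assumed; adjust to your deployment


async def main() -> None:
    async with httpx.AsyncClient(base_url=BASE_URL) as client:
        # Activity from the last week, two levels of related items
        recent = await client.get(
            "/memory/recent",
            params={"timeframe": "7d", "depth": 2, "page": 1, "page_size": 10},
        )
        print(recent.json())

        # Context for a specific memory:// URI (path portion only)
        context = await client.get("/memory/specs/spec-2-slash-commands-reference")
        print(context.json())


asyncio.run(main())
```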

--------------------------------------------------------------------------------
/.github/workflows/claude.yml:
--------------------------------------------------------------------------------

```yaml
name: Claude Code

on:
  issue_comment:
    types: [created]
  pull_request_review_comment:
    types: [created]
  issues:
    types: [opened, assigned]
  pull_request_review:
    types: [submitted]
  pull_request_target:
    types: [opened, synchronize]

jobs:
  claude:
    if: |
      (
        (github.event_name == 'issue_comment' && contains(github.event.comment.body, '@claude')) ||
        (github.event_name == 'pull_request_review_comment' && contains(github.event.comment.body, '@claude')) ||
        (github.event_name == 'pull_request_review' && contains(github.event.review.body, '@claude')) ||
        (github.event_name == 'issues' && (contains(github.event.issue.body, '@claude') || contains(github.event.issue.title, '@claude'))) ||
        (github.event_name == 'pull_request_target' && contains(github.event.pull_request.body, '@claude'))
      ) && (
        github.event.comment.author_association == 'OWNER' ||
        github.event.comment.author_association == 'MEMBER' ||
        github.event.comment.author_association == 'COLLABORATOR' ||
        github.event.sender.author_association == 'OWNER' ||
        github.event.sender.author_association == 'MEMBER' ||
        github.event.sender.author_association == 'COLLABORATOR' ||
        github.event.pull_request.author_association == 'OWNER' ||
        github.event.pull_request.author_association == 'MEMBER' ||
        github.event.pull_request.author_association == 'COLLABORATOR'
      )
    runs-on: ubuntu-latest
    permissions:
      contents: read
      pull-requests: read
      issues: read
      id-token: write
      actions: read # Required for Claude to read CI results on PRs
    steps:
      - name: Checkout repository
        uses: actions/checkout@v4
        with:
          # For pull_request_target, checkout the PR head to review the actual changes
          ref: ${{ github.event_name == 'pull_request_target' && github.event.pull_request.head.sha || github.sha }}
          fetch-depth: 1

      - name: Run Claude Code
        id: claude
        uses: anthropics/claude-code-action@v1
        with:
          claude_code_oauth_token: ${{ secrets.CLAUDE_CODE_OAUTH_TOKEN }}
          track_progress: true  # Enable visual progress tracking
          
          # This is an optional setting that allows Claude to read CI results on PRs
          additional_permissions: |
            actions: read

          # Optional: Give a custom prompt to Claude. If this is not specified, Claude will perform the instructions specified in the comment that tagged it.
          # prompt: 'Update the pull request description to include a summary of changes.'

          # Optional: Add claude_args to customize behavior and configuration
          # See https://github.com/anthropics/claude-code-action/blob/main/docs/usage.md
          # or https://docs.claude.com/en/docs/claude-code/sdk#command-line for available options
          # claude_args: '--model claude-opus-4-1-20250805 --allowed-tools Bash(gh pr:*)'


```

--------------------------------------------------------------------------------
/specs/SPEC-3 Agent Definitions.md:
--------------------------------------------------------------------------------

```markdown
---
title: 'SPEC-3: Agent Definitions'
type: spec
permalink: specs/spec-3-agent-definitions
tags:
- agents
- roles
- process
---

# SPEC-3: Agent Definitions

This document defines the specialist agents used in our specification-driven development process.

## system-architect

**Role**: High-level system design and architectural decisions

**Responsibilities**:
- Create architectural specifications and ADRs
- Analyze system-wide impacts and trade-offs
- Design component interfaces and data flow
- Evaluate technical approaches and patterns
- Document architectural decisions and rationale

**Expertise Areas**:
- System architecture and design patterns
- Technology evaluation and selection
- Scalability and performance considerations
- Integration patterns and API design
- Technical debt and refactoring strategies

**Typical Specs**:
- System architecture overviews
- Component decomposition strategies
- Data flow and state management
- Integration and deployment patterns

## vue-developer

**Role**: Frontend component development and UI implementation

**Responsibilities**:
- Create Vue.js component specifications
- Implement responsive UI components
- Design component APIs and interfaces
- Optimize for performance and accessibility
- Document component usage and patterns

**Expertise Areas**:
- Vue.js 3 Composition API
- Nuxt 3 framework patterns
- shadcn-vue component library
- Responsive design and CSS
- TypeScript integration
- State management with Pinia

**Typical Specs**:
- Individual component specifications
- UI pattern libraries
- Responsive design approaches
- Component interaction flows

## python-developer

**Role**: Backend development and API implementation

**Responsibilities**:
- Create backend service specifications
- Implement APIs and data processing
- Design database schemas and queries
- Optimize performance and reliability
- Document service interfaces and behavior

**Expertise Areas**:
- FastAPI and Python web frameworks
- Database design and operations
- API design and documentation
- Authentication and security
- Performance optimization
- Testing and validation

**Typical Specs**:
- API endpoint specifications
- Database schema designs
- Service integration patterns
- Performance optimization strategies

## Agent Collaboration Patterns

### Handoff Protocol
1. Agent receives spec through `/spec implement [name]`
2. Agent reviews spec and creates implementation plan
3. Agent documents progress and decisions in spec
4. Agent hands off to another agent if cross-domain work needed
5. Final agent updates spec with completion status

### Communication Standards
- All agents update specs through basic-memory MCP tools
- Document decisions and trade-offs in spec notes
- Link related specs and components
- Preserve context for future reference

### Quality Standards
- Follow existing codebase patterns and conventions
- Write tests that validate spec requirements
- Document implementation choices
- Consider maintainability and extensibility

```

--------------------------------------------------------------------------------
/src/basic_memory/repository/search_index_row.py:
--------------------------------------------------------------------------------

```python
"""Search index data structures."""

import json
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
from pathlib import Path

from basic_memory.schemas.search import SearchItemType


@dataclass
class SearchIndexRow:
    """Search result with score and metadata."""

    project_id: int
    id: int
    type: str
    file_path: str

    # date values
    created_at: datetime
    updated_at: datetime

    permalink: Optional[str] = None
    metadata: Optional[dict] = None

    # assigned in result
    score: Optional[float] = None

    # Type-specific fields
    title: Optional[str] = None  # entity
    content_stems: Optional[str] = None  # entity, observation
    content_snippet: Optional[str] = None  # entity, observation
    entity_id: Optional[int] = None  # observations
    category: Optional[str] = None  # observations
    from_id: Optional[int] = None  # relations
    to_id: Optional[int] = None  # relations
    relation_type: Optional[str] = None  # relations

    @property
    def content(self):
        return self.content_snippet

    @property
    def directory(self) -> str:
        """Extract directory part from file_path.

        For a file at "projects/notes/ideas.md", returns "/projects/notes"
        For a file at root level "README.md", returns "/"
        """
        if not self.type == SearchItemType.ENTITY.value and not self.file_path:
            return ""

        # Normalize path separators to handle both Windows (\) and Unix (/) paths
        normalized_path = Path(self.file_path).as_posix()

        # Split the path by slashes
        parts = normalized_path.split("/")

        # If there's only one part (e.g., "README.md"), it's at the root
        if len(parts) <= 1:
            return "/"

        # Join all parts except the last one (filename)
        directory_path = "/".join(parts[:-1])
        return f"/{directory_path}"

    def to_insert(self, serialize_json: bool = True):
        """Convert to dict for database insertion.

        Args:
            serialize_json: If True, converts metadata dict to JSON string (for SQLite).
                           If False, keeps metadata as dict (for Postgres JSONB).
        """
        return {
            "id": self.id,
            "title": self.title,
            "content_stems": self.content_stems,
            "content_snippet": self.content_snippet,
            "permalink": self.permalink,
            "file_path": self.file_path,
            "type": self.type,
            "metadata": json.dumps(self.metadata)
            if serialize_json and self.metadata
            else self.metadata,
            "from_id": self.from_id,
            "to_id": self.to_id,
            "relation_type": self.relation_type,
            "entity_id": self.entity_id,
            "category": self.category,
            "created_at": self.created_at if self.created_at else None,
            "updated_at": self.updated_at if self.updated_at else None,
            "project_id": self.project_id,
        }

```
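To illustrate the `directory` property and the SQLite/Postgres split in `to_insert`, here is a small sketch; the import path follows the repository layout above, and the field values are made up:

```python
from datetime import datetime, timezone

from basic_memory.repository.search_index_row import SearchIndexRow

row = SearchIndexRow(
    project_id=1,
    id=42,
    type="entity",
    file_path="projects/notes/ideas.md",
    created_at=datetime.now(timezone.utc),
    updated_at=datetime.now(timezone.utc),
    title="Ideas",
    metadata={"entity_type": "note"},
)

print(row.directory)  # "/projects/notes"

# Default: metadata serialized to a JSON string (SQLite)
print(type(row.to_insert()["metadata"]))  # <class 'str'>

# serialize_json=False keeps the dict for Postgres JSONB columns
print(type(row.to_insert(serialize_json=False)["metadata"]))  # <class 'dict'>
```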

--------------------------------------------------------------------------------
/.claude/commands/release/beta.md:
--------------------------------------------------------------------------------

```markdown
# /beta - Create Beta Release

Create a new beta release using the automated justfile target with quality checks and tagging.

## Usage
```
/beta <version>
```

**Parameters:**
- `version` (required): Beta version like `v0.13.2b1` or `v0.13.2rc1`

## Implementation

You are an expert release manager for the Basic Memory project. When the user runs `/beta`, execute the following steps:

### Step 1: Pre-flight Validation
1. Verify version format matches `v\d+\.\d+\.\d+(b\d+|rc\d+)` pattern
2. Check current git status for uncommitted changes
3. Verify we're on the `main` branch
4. Confirm no existing tag with this version

### Step 2: Use Justfile Automation
Execute the automated beta release process:
```bash
just beta <version>
```

The justfile target handles:
- ✅ Beta version format validation (supports b1, b2, rc1, etc.)
- ✅ Git status and branch checks
- ✅ Quality checks (`just check` - lint, format, type-check, tests)
- ✅ Version update in `src/basic_memory/__init__.py`
- ✅ Automatic commit with proper message
- ✅ Tag creation and pushing to GitHub
- ✅ Beta release workflow trigger

### Step 3: Monitor Beta Release
1. Check GitHub Actions workflow starts successfully
2. Monitor workflow at: https://github.com/basicmachines-co/basic-memory/actions
3. Verify PyPI pre-release publication
4. Test beta installation: `uv tool install basic-memory --pre`

### Step 4: Beta Testing Instructions
Provide users with beta testing instructions:

```bash
# Install/upgrade to beta
uv tool install basic-memory --pre

# Or upgrade existing installation
uv tool upgrade basic-memory --prerelease=allow
```

## Version Guidelines
- **First beta**: `v0.13.2b1` 
- **Subsequent betas**: `v0.13.2b2`, `v0.13.2b3`, etc.
- **Release candidates**: `v0.13.2rc1`, `v0.13.2rc2`, etc.
- **Final release**: `v0.13.2` (use `/release` command)

## Error Handling
- If `just beta` fails, examine the error output for specific issues
- If quality checks fail, fix issues and retry
- If version format is invalid, correct and retry
- If tag already exists, increment version number

## Success Output
```
✅ Beta Release v0.13.2b1 Created Successfully!

🏷️  Tag: v0.13.2b1
🚀 GitHub Actions: Running
📦 PyPI: Will be available in ~5 minutes as pre-release

Install/test with:
uv tool install basic-memory --pre

Monitor release: https://github.com/basicmachines-co/basic-memory/actions
```

## Beta Testing Workflow
1. **Create beta**: Use `/beta v0.13.2b1`
2. **Test features**: Install and validate new functionality
3. **Fix issues**: Address bugs found during testing
4. **Iterate**: Create `v0.13.2b2` if needed
5. **Release candidate**: Create `v0.13.2rc1` when stable
6. **Final release**: Use `/release v0.13.2` when ready

## Context
- Beta releases are pre-releases for testing new features
- Automatically published to PyPI with pre-release flag
- Uses the automated justfile target for consistency
- Version is automatically updated in `__init__.py`
- Ideal for validating changes before stable release
- Supports both beta (b1, b2) and release candidate (rc1, rc2) versions
```

--------------------------------------------------------------------------------
/src/basic_memory/cli/commands/cloud/cloud_utils.py:
--------------------------------------------------------------------------------

```python
"""Shared utilities for cloud operations."""

from basic_memory.cli.commands.cloud.api_client import make_api_request
from basic_memory.config import ConfigManager
from basic_memory.schemas.cloud import (
    CloudProjectList,
    CloudProjectCreateRequest,
    CloudProjectCreateResponse,
)
from basic_memory.utils import generate_permalink


class CloudUtilsError(Exception):
    """Exception raised for cloud utility errors."""

    pass


async def fetch_cloud_projects(
    *,
    api_request=make_api_request,
) -> CloudProjectList:
    """Fetch list of projects from cloud API.

    Returns:
        CloudProjectList with projects from cloud
    """
    try:
        config_manager = ConfigManager()
        config = config_manager.config
        host_url = config.cloud_host.rstrip("/")

        response = await api_request(method="GET", url=f"{host_url}/proxy/projects/projects")

        return CloudProjectList.model_validate(response.json())
    except Exception as e:
        raise CloudUtilsError(f"Failed to fetch cloud projects: {e}") from e


async def create_cloud_project(
    project_name: str,
    *,
    api_request=make_api_request,
) -> CloudProjectCreateResponse:
    """Create a new project on cloud.

    Args:
        project_name: Name of project to create

    Returns:
        CloudProjectCreateResponse with project details from API
    """
    try:
        config_manager = ConfigManager()
        config = config_manager.config
        host_url = config.cloud_host.rstrip("/")

        # Use generate_permalink to ensure consistent naming
        project_path = generate_permalink(project_name)

        project_data = CloudProjectCreateRequest(
            name=project_name,
            path=project_path,
            set_default=False,
        )

        response = await api_request(
            method="POST",
            url=f"{host_url}/proxy/projects/projects",
            headers={"Content-Type": "application/json"},
            json_data=project_data.model_dump(),
        )

        return CloudProjectCreateResponse.model_validate(response.json())
    except Exception as e:
        raise CloudUtilsError(f"Failed to create cloud project '{project_name}': {e}") from e


async def sync_project(project_name: str, force_full: bool = False) -> None:
    """Trigger sync for a specific project on cloud.

    Args:
        project_name: Name of project to sync
        force_full: If True, force a full scan bypassing watermark optimization
    """
    try:
        from basic_memory.cli.commands.command_utils import run_sync

        await run_sync(project=project_name, force_full=force_full)
    except Exception as e:
        raise CloudUtilsError(f"Failed to sync project '{project_name}': {e}") from e


async def project_exists(project_name: str, *, api_request=make_api_request) -> bool:
    """Check if a project exists on cloud.

    Args:
        project_name: Name of project to check

    Returns:
        True if project exists, False otherwise
    """
    try:
        projects = await fetch_cloud_projects(api_request=api_request)
        project_names = {p.name for p in projects.projects}
        return project_name in project_names
    except Exception:
        return False
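

# Illustrative usage (a sketch; the real call sites live in the cloud CLI commands):
#
#   projects = await fetch_cloud_projects()
#   if not await project_exists("notes"):
#       await create_cloud_project("notes")
#   await sync_project("notes", force_full=False)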

```

--------------------------------------------------------------------------------
/.claude/commands/release/release-check.md:
--------------------------------------------------------------------------------

```markdown
# /release-check - Pre-flight Release Validation

Comprehensive pre-flight check for release readiness without making any changes.

## Usage
```
/release-check [version]
```

**Parameters:**
- `version` (optional): Version to validate like `v0.13.0`. If not provided, determines from context.

## Implementation

You are an expert QA engineer for the Basic Memory project. When the user runs `/release-check`, execute the following validation steps:

### Step 1: Environment Validation
1. **Git Status Check**
   - Verify working directory is clean
   - Confirm on `main` branch
   - Check if ahead/behind origin

2. **Version Validation**
   - Validate version format if provided
   - Check for existing tags with the same version (see the sketch after this list)
   - Verify version increments properly from last release
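
A minimal sketch of the version and tag checks (illustrative only, assuming `git` is available on PATH):

```python
import re
import subprocess


def is_release_version(version: str) -> bool:
    # Stable releases look like v0.13.0
    return re.fullmatch(r"v\d+\.\d+\.\d+", version) is not None


def tag_exists(version: str) -> bool:
    # An empty result from `git tag --list <version>` means the tag does not exist yet
    result = subprocess.run(
        ["git", "tag", "--list", version], capture_output=True, text=True, check=True
    )
    return bool(result.stdout.strip())
```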

### Step 2: Code Quality Gates
1. **Test Suite Validation**
   ```bash
   just test
   ```
   - All tests must pass
   - Check test coverage (target: 95%+)
   - Validate no skipped critical tests

2. **Code Quality Checks**
   ```bash
   just lint
   just type-check
   ```
   - No linting errors
   - No type checking errors
   - Code formatting is consistent

### Step 3: Documentation Validation
1. **Changelog Check**
   - CHANGELOG.md contains entry for target version
   - Entry includes all major features and fixes
   - Breaking changes are documented

2. **Documentation Currency**
   - README.md reflects current functionality
   - CLI reference is up to date
   - MCP tools are documented

### Step 4: Dependency Validation
1. **Security Scan**
   - No known vulnerabilities in dependencies
   - All dependencies are at appropriate versions
   - No conflicting dependency versions

2. **Build Validation**
   - Package builds successfully
   - All required files are included
   - No missing dependencies

### Step 5: Issue Tracking Validation
1. **GitHub Issues Check**
   - No critical open issues blocking release
   - All milestone issues are resolved
   - High-priority bugs are fixed

2. **Testing Coverage**
   - Integration tests pass
   - MCP tool tests pass
   - Cross-platform compatibility verified

## Report Format

Generate a comprehensive report:

```
🔍 Release Readiness Check for v0.13.0

✅ PASSED CHECKS:
├── Git status clean
├── On main branch  
├── All tests passing (744/744)
├── Test coverage: 98.2%
├── Type checking passed
├── Linting passed
├── CHANGELOG.md updated
└── No critical issues open

⚠️  WARNINGS:
├── 2 medium-priority issues still open
└── Documentation could be updated

❌ BLOCKING ISSUES:
└── None found

🎯 RELEASE READINESS: ✅ READY

Recommended next steps:
1. Address warnings if desired
2. Run `/release v0.13.0` when ready
```

## Validation Criteria

### Must Pass (Blocking)
- [ ] All tests pass
- [ ] No type errors
- [ ] No linting errors  
- [ ] Working directory clean
- [ ] On main branch
- [ ] CHANGELOG.md has version entry
- [ ] No critical open issues

### Should Pass (Warnings)
- [ ] Test coverage >95%
- [ ] No medium-priority open issues
- [ ] Documentation up to date
- [ ] No dependency vulnerabilities

## Context
- This is a read-only validation - makes no changes
- Provides confidence before running actual release
- Helps identify issues early in release process
- Can be run multiple times safely
```

--------------------------------------------------------------------------------
/tests/schemas/test_search.py:
--------------------------------------------------------------------------------

```python
"""Tests for search schemas."""

from datetime import datetime

from basic_memory.schemas.search import (
    SearchItemType,
    SearchQuery,
    SearchResult,
    SearchResponse,
)


def test_search_modes():
    """Test different search modes."""
    # Exact permalink
    query = SearchQuery(permalink="specs/search")
    assert query.permalink == "specs/search"
    assert query.text is None

    # Pattern match
    query = SearchQuery(permalink="specs/*")
    assert query.permalink == "specs/*"
    assert query.text is None

    # Text search
    query = SearchQuery(text="search implementation")
    assert query.text == "search implementation"
    assert query.permalink is None


def test_search_filters():
    """Test search result filtering."""
    query = SearchQuery(
        text="search",
        entity_types=[SearchItemType.ENTITY],
        types=["component"],
        after_date=datetime(2024, 1, 1),
    )
    assert query.entity_types == [SearchItemType.ENTITY]
    assert query.types == ["component"]
    assert query.after_date == "2024-01-01T00:00:00"


def test_search_result():
    """Test search result structure."""
    result = SearchResult(
        title="test",
        type=SearchItemType.ENTITY,
        entity="some_entity",
        score=0.8,
        metadata={"entity_type": "component"},
        permalink="specs/search",
        file_path="specs/search.md",
    )
    assert result.type == SearchItemType.ENTITY
    assert result.score == 0.8
    assert result.metadata == {"entity_type": "component"}


def test_observation_result():
    """Test observation result fields."""
    result = SearchResult(
        title="test",
        permalink="specs/search",
        file_path="specs/search.md",
        type=SearchItemType.OBSERVATION,
        score=0.5,
        metadata={},
        entity="some_entity",
        category="tech",
    )
    assert result.entity == "some_entity"
    assert result.category == "tech"


def test_relation_result():
    """Test relation result fields."""
    result = SearchResult(
        title="test",
        permalink="specs/search",
        file_path="specs/search.md",
        type=SearchItemType.RELATION,
        entity="some_entity",
        score=0.5,
        metadata={},
        from_entity="123",
        to_entity="456",
        relation_type="depends_on",
    )
    assert result.from_entity == "123"
    assert result.to_entity == "456"
    assert result.relation_type == "depends_on"


def test_search_response():
    """Test search response wrapper."""
    results = [
        SearchResult(
            title="test",
            permalink="specs/search",
            file_path="specs/search.md",
            type=SearchItemType.ENTITY,
            entity="some_entity",
            score=0.8,
            metadata={},
        ),
        SearchResult(
            title="test",
            permalink="specs/search",
            file_path="specs/search.md",
            type=SearchItemType.ENTITY,
            entity="some_entity",
            score=0.6,
            metadata={},
        ),
    ]
    response = SearchResponse(results=results, current_page=1, page_size=1)
    assert len(response.results) == 2
    assert response.results[0].score > response.results[1].score

```

--------------------------------------------------------------------------------
/src/basic_memory/api/v2/routers/directory_router.py:
--------------------------------------------------------------------------------

```python
"""V2 Directory Router - ID-based directory tree operations.

This router provides directory structure browsing for projects using
external_id UUIDs instead of name-based identifiers.

Key improvements:
- Direct project lookup via external_id UUIDs
- Consistent with other v2 endpoints
- Better performance through indexed queries
"""

from typing import List, Optional

from fastapi import APIRouter, Query, Path

from basic_memory.deps import DirectoryServiceV2ExternalDep
from basic_memory.schemas.directory import DirectoryNode

router = APIRouter(prefix="/directory", tags=["directory-v2"])


@router.get("/tree", response_model=DirectoryNode, response_model_exclude_none=True)
async def get_directory_tree(
    directory_service: DirectoryServiceV2ExternalDep,
    project_id: str = Path(..., description="Project external UUID"),
):
    """Get hierarchical directory structure from the knowledge base.

    Args:
        directory_service: Service for directory operations
        project_id: Project external UUID

    Returns:
        DirectoryNode representing the root of the hierarchical tree structure
    """
    # Get a hierarchical directory tree for the specific project
    tree = await directory_service.get_directory_tree()

    # Return the hierarchical tree
    return tree


@router.get("/structure", response_model=DirectoryNode, response_model_exclude_none=True)
async def get_directory_structure(
    directory_service: DirectoryServiceV2ExternalDep,
    project_id: str = Path(..., description="Project external UUID"),
):
    """Get folder structure for navigation (no files).

    Optimized endpoint for folder tree navigation. Returns only directory nodes
    without file metadata. For full tree with files, use /directory/tree.

    Args:
        directory_service: Service for directory operations
        project_id: Project external UUID

    Returns:
        DirectoryNode tree containing only folders (type="directory")
    """
    structure = await directory_service.get_directory_structure()
    return structure


@router.get("/list", response_model=List[DirectoryNode], response_model_exclude_none=True)
async def list_directory(
    directory_service: DirectoryServiceV2ExternalDep,
    project_id: str = Path(..., description="Project external UUID"),
    dir_name: str = Query("/", description="Directory path to list"),
    depth: int = Query(1, ge=1, le=10, description="Recursion depth (1-10)"),
    file_name_glob: Optional[str] = Query(
        None, description="Glob pattern for filtering file names"
    ),
):
    """List directory contents with filtering and depth control.

    Args:
        directory_service: Service for directory operations
        project_id: Project external UUID
        dir_name: Directory path to list (default: root "/")
        depth: Recursion depth (1-10, default: 1 for immediate children only)
        file_name_glob: Optional glob pattern for filtering file names (e.g., "*.md", "*meeting*")

    Returns:
        List of DirectoryNode objects matching the criteria
    """
    # Get directory listing with filtering
    nodes = await directory_service.list_directory(
        dir_name=dir_name,
        depth=depth,
        file_name_glob=file_name_glob,
    )

    return nodes
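

# Illustrative request (a sketch, assuming this router is mounted under /v2/projects/{project_id}
# like the other v2 routers):
#   GET /v2/projects/{project_id}/directory/list?dir_name=/specs&depth=2&file_name_glob=*.md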

```

--------------------------------------------------------------------------------
/src/basic_memory/cli/commands/cloud/rclone_config.py:
--------------------------------------------------------------------------------

```python
"""rclone configuration management for Basic Memory Cloud.

This module provides simplified rclone configuration for SPEC-20.
Uses a single "basic-memory-cloud" remote for all operations.
"""

import configparser
import os
import shutil
from pathlib import Path
from typing import Optional

from rich.console import Console

console = Console()


class RcloneConfigError(Exception):
    """Exception raised for rclone configuration errors."""

    pass


def get_rclone_config_path() -> Path:
    """Get the path to rclone configuration file."""
    config_dir = Path.home() / ".config" / "rclone"
    config_dir.mkdir(parents=True, exist_ok=True)
    return config_dir / "rclone.conf"


def backup_rclone_config() -> Optional[Path]:
    """Create a backup of existing rclone config."""
    config_path = get_rclone_config_path()
    if not config_path.exists():
        return None

    backup_path = config_path.with_suffix(f".conf.backup-{os.getpid()}")
    shutil.copy2(config_path, backup_path)
    console.print(f"[dim]Created backup: {backup_path}[/dim]")
    return backup_path


def load_rclone_config() -> configparser.ConfigParser:
    """Load existing rclone configuration."""
    config = configparser.ConfigParser()
    config_path = get_rclone_config_path()

    if config_path.exists():
        config.read(config_path)

    return config


def save_rclone_config(config: configparser.ConfigParser) -> None:
    """Save rclone configuration to file."""
    config_path = get_rclone_config_path()

    with open(config_path, "w") as f:
        config.write(f)

    console.print(f"[dim]Updated rclone config: {config_path}[/dim]")


def configure_rclone_remote(
    access_key: str,
    secret_key: str,
    endpoint: str = "https://fly.storage.tigris.dev",
    region: str = "auto",
) -> str:
    """Configure single rclone remote named 'basic-memory-cloud'.

    This is the simplified approach from SPEC-20 that uses one remote
    for all Basic Memory cloud operations (not tenant-specific).

    Args:
        access_key: S3 access key ID
        secret_key: S3 secret access key
        endpoint: S3-compatible endpoint URL
        region: S3 region (default: auto)

    Returns:
        The remote name: "basic-memory-cloud"
    """
    # Backup existing config
    backup_rclone_config()

    # Load existing config
    config = load_rclone_config()

    # Single remote name (not tenant-specific)
    REMOTE_NAME = "basic-memory-cloud"

    # Add/update the remote section
    if not config.has_section(REMOTE_NAME):
        config.add_section(REMOTE_NAME)

    config.set(REMOTE_NAME, "type", "s3")
    config.set(REMOTE_NAME, "provider", "Other")
    config.set(REMOTE_NAME, "access_key_id", access_key)
    config.set(REMOTE_NAME, "secret_access_key", secret_key)
    config.set(REMOTE_NAME, "endpoint", endpoint)
    config.set(REMOTE_NAME, "region", region)
    # Prevent unnecessary encoding of filenames (only encode slashes and invalid UTF-8)
    # This prevents files with spaces like "Hello World.md" from being quoted
    config.set(REMOTE_NAME, "encoding", "Slash,InvalidUtf8")
    # Save updated config
    save_rclone_config(config)

    console.print(f"[green]Configured rclone remote: {REMOTE_NAME}[/green]")
    return REMOTE_NAME
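

# Illustrative usage (a sketch; real credentials come from the cloud API, the values here are placeholders):
#   remote = configure_rclone_remote(access_key="<key-id>", secret_key="<secret-key>")
#   # -> writes the [basic-memory-cloud] section to ~/.config/rclone/rclone.conf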

```

--------------------------------------------------------------------------------
/src/basic_memory/cli/commands/import_chatgpt.py:
--------------------------------------------------------------------------------

```python
"""Import command for ChatGPT conversations."""

import json
from pathlib import Path
from typing import Annotated, Tuple

import typer
from basic_memory.cli.app import import_app
from basic_memory.cli.commands.command_utils import run_with_cleanup
from basic_memory.config import ConfigManager, get_project_config
from basic_memory.importers import ChatGPTImporter
from basic_memory.markdown import EntityParser, MarkdownProcessor
from basic_memory.services.file_service import FileService
from loguru import logger
from rich.console import Console
from rich.panel import Panel

console = Console()


async def get_importer_dependencies() -> Tuple[MarkdownProcessor, FileService]:
    """Get MarkdownProcessor and FileService instances for importers."""
    config = get_project_config()
    app_config = ConfigManager().config
    entity_parser = EntityParser(config.home)
    markdown_processor = MarkdownProcessor(entity_parser, app_config=app_config)
    file_service = FileService(config.home, markdown_processor, app_config=app_config)
    return markdown_processor, file_service


@import_app.command(name="chatgpt", help="Import conversations from ChatGPT JSON export.")
def import_chatgpt(
    conversations_json: Annotated[
        Path, typer.Argument(help="Path to ChatGPT conversations.json file")
    ] = Path("conversations.json"),
    folder: Annotated[
        str, typer.Option(help="The folder to place the files in.")
    ] = "conversations",
):
    """Import chat conversations from ChatGPT JSON format.

    This command will:
    1. Read the complex tree structure of messages
    2. Convert them to linear markdown conversations
    3. Save as clean, readable markdown files

    After importing, run 'basic-memory sync' to index the new files.
    """

    try:
        if not conversations_json.exists():  # pragma: no cover
            typer.echo(f"Error: File not found: {conversations_json}", err=True)
            raise typer.Exit(1)

        # Get importer dependencies
        markdown_processor, file_service = run_with_cleanup(get_importer_dependencies())
        config = get_project_config()
        # Process the file
        base_path = config.home / folder
        console.print(f"\nImporting chats from {conversations_json}...writing to {base_path}")

        # Create importer and run import
        importer = ChatGPTImporter(config.home, markdown_processor, file_service)
        with conversations_json.open("r", encoding="utf-8") as file:
            json_data = json.load(file)
            result = run_with_cleanup(importer.import_data(json_data, folder))

        if not result.success:  # pragma: no cover
            typer.echo(f"Error during import: {result.error_message}", err=True)
            raise typer.Exit(1)

        # Show results
        console.print(
            Panel(
                f"[green]Import complete![/green]\n\n"
                f"Imported {result.conversations} conversations\n"
                f"Containing {result.messages} messages",
                expand=False,
            )
        )

        console.print("\nRun 'basic-memory sync' to index the new files.")

    except Exception as e:
        logger.error("Import failed")
        typer.echo(f"Error during import: {e}", err=True)
        raise typer.Exit(1)

```

--------------------------------------------------------------------------------
/src/basic_memory/cli/commands/import_memory_json.py:
--------------------------------------------------------------------------------

```python
"""Import command for basic-memory CLI to import from JSON memory format."""

import json
from pathlib import Path
from typing import Annotated, Tuple

import typer
from basic_memory.cli.app import import_app
from basic_memory.cli.commands.command_utils import run_with_cleanup
from basic_memory.config import ConfigManager, get_project_config
from basic_memory.importers.memory_json_importer import MemoryJsonImporter
from basic_memory.markdown import EntityParser, MarkdownProcessor
from basic_memory.services.file_service import FileService
from loguru import logger
from rich.console import Console
from rich.panel import Panel

console = Console()


async def get_importer_dependencies() -> Tuple[MarkdownProcessor, FileService]:
    """Get MarkdownProcessor and FileService instances for importers."""
    config = get_project_config()
    app_config = ConfigManager().config
    entity_parser = EntityParser(config.home)
    markdown_processor = MarkdownProcessor(entity_parser, app_config=app_config)
    file_service = FileService(config.home, markdown_processor, app_config=app_config)
    return markdown_processor, file_service


@import_app.command()
def memory_json(
    json_path: Annotated[Path, typer.Argument(..., help="Path to memory.json file")] = Path(
        "memory.json"
    ),
    destination_folder: Annotated[
        str, typer.Option(help="Optional destination folder within the project")
    ] = "",
):
    """Import entities and relations from a memory.json file.

    This command will:
    1. Read entities and relations from the JSON file
    2. Create markdown files for each entity
    3. Include outgoing relations in each entity's markdown
    """

    if not json_path.exists():
        typer.echo(f"Error: File not found: {json_path}", err=True)
        raise typer.Exit(1)

    config = get_project_config()
    try:
        # Get importer dependencies
        markdown_processor, file_service = run_with_cleanup(get_importer_dependencies())

        # Create the importer
        importer = MemoryJsonImporter(config.home, markdown_processor, file_service)

        # Process the file
        base_path = config.home if not destination_folder else config.home / destination_folder
        console.print(f"\nImporting from {json_path}...writing to {base_path}")

        # Run the import for json log format
        file_data = []
        with json_path.open("r", encoding="utf-8") as file:
            for line in file:
                json_data = json.loads(line)
                file_data.append(json_data)
        result = run_with_cleanup(importer.import_data(file_data, destination_folder))

        if not result.success:  # pragma: no cover
            typer.echo(f"Error during import: {result.error_message}", err=True)
            raise typer.Exit(1)

        # Show results
        console.print(
            Panel(
                f"[green]Import complete![/green]\n\n"
                f"Created {result.entities} entities\n"
                f"Added {result.relations} relations\n"
                f"Skipped {result.skipped_entities} entities\n",
                expand=False,
            )
        )

    except Exception as e:
        logger.error("Import failed")
        typer.echo(f"Error during import: {e}", err=True)
        raise typer.Exit(1)

```

--------------------------------------------------------------------------------
/src/basic_memory/mcp/container.py:
--------------------------------------------------------------------------------

```python
"""MCP composition root for Basic Memory.

This container owns reading ConfigManager and environment variables for the
MCP server entrypoint. Downstream modules receive config/dependencies explicitly
rather than reading globals.

Design principles:
- Only this module reads ConfigManager directly
- Runtime mode (cloud/local/test) is resolved here
- File sync decisions are centralized here
"""

from dataclasses import dataclass
from typing import TYPE_CHECKING

from basic_memory.config import BasicMemoryConfig, ConfigManager
from basic_memory.runtime import RuntimeMode, resolve_runtime_mode

if TYPE_CHECKING:  # pragma: no cover
    from basic_memory.sync import SyncCoordinator


@dataclass
class McpContainer:
    """Composition root for the MCP server entrypoint.

    Holds resolved configuration and runtime context.
    Created once at server startup, then used to wire dependencies.
    """

    config: BasicMemoryConfig
    mode: RuntimeMode

    @classmethod
    def create(cls) -> "McpContainer":
        """Create container by reading ConfigManager.

        This is the single point where MCP reads global config.
        """
        config = ConfigManager().config
        mode = resolve_runtime_mode(
            cloud_mode_enabled=config.cloud_mode_enabled,
            is_test_env=config.is_test_env,
        )
        return cls(config=config, mode=mode)

    # --- Runtime Mode Properties ---

    @property
    def should_sync_files(self) -> bool:
        """Whether local file sync should be started.

        Sync is enabled when:
        - sync_changes is True in config
        - Not in test mode (tests manage their own sync)
        - Not in cloud mode (cloud handles sync differently)
        """
        return self.config.sync_changes and not self.mode.is_test and not self.mode.is_cloud

    @property
    def sync_skip_reason(self) -> str | None:
        """Reason why sync is skipped, or None if sync should run.

        Useful for logging why sync was disabled.
        """
        if self.mode.is_test:
            return "Test environment detected"
        if self.mode.is_cloud:
            return "Cloud mode enabled"
        if not self.config.sync_changes:
            return "Sync changes disabled"
        return None

    def create_sync_coordinator(self) -> "SyncCoordinator":
        """Create a SyncCoordinator with this container's settings.

        Returns:
            SyncCoordinator configured for this runtime environment
        """
        # Deferred import to avoid circular dependency
        from basic_memory.sync import SyncCoordinator

        return SyncCoordinator(
            config=self.config,
            should_sync=self.should_sync_files,
            skip_reason=self.sync_skip_reason,
        )


# Module-level container instance (set by lifespan)
_container: McpContainer | None = None


def get_container() -> McpContainer:
    """Get the current MCP container.

    Raises:
        RuntimeError: If container hasn't been initialized
    """
    if _container is None:
        raise RuntimeError("MCP container not initialized. Call set_container() first.")
    return _container


def set_container(container: McpContainer) -> None:
    """Set the MCP container (called by lifespan)."""
    global _container
    _container = container
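

# Illustrative startup wiring (a sketch; the real wiring happens in the MCP server lifespan):
#   container = McpContainer.create()
#   set_container(container)
#   if container.should_sync_files:
#       coordinator = container.create_sync_coordinator()
#   else:
#       ...  # log container.sync_skip_reason and skip file sync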

```

--------------------------------------------------------------------------------
/src/basic_memory/cli/commands/import_claude_projects.py:
--------------------------------------------------------------------------------

```python
"""Import command for basic-memory CLI to import project data from Claude.ai."""

import json
from pathlib import Path
from typing import Annotated, Tuple

import typer
from basic_memory.cli.app import claude_app
from basic_memory.cli.commands.command_utils import run_with_cleanup
from basic_memory.config import ConfigManager, get_project_config
from basic_memory.importers.claude_projects_importer import ClaudeProjectsImporter
from basic_memory.markdown import EntityParser, MarkdownProcessor
from basic_memory.services.file_service import FileService
from loguru import logger
from rich.console import Console
from rich.panel import Panel

console = Console()


async def get_importer_dependencies() -> Tuple[MarkdownProcessor, FileService]:
    """Get MarkdownProcessor and FileService instances for importers."""
    config = get_project_config()
    app_config = ConfigManager().config
    entity_parser = EntityParser(config.home)
    markdown_processor = MarkdownProcessor(entity_parser, app_config=app_config)
    file_service = FileService(config.home, markdown_processor, app_config=app_config)
    return markdown_processor, file_service


@claude_app.command(name="projects", help="Import projects from Claude.ai.")
def import_projects(
    projects_json: Annotated[Path, typer.Argument(..., help="Path to projects.json file")] = Path(
        "projects.json"
    ),
    base_folder: Annotated[
        str, typer.Option(help="The base folder to place project files in.")
    ] = "projects",
):
    """Import project data from Claude.ai.

    This command will:
    1. Create a directory for each project
    2. Store docs in a docs/ subdirectory
    3. Place prompt template in project root

    After importing, run 'basic-memory sync' to index the new files.
    """
    config = get_project_config()
    try:
        if not projects_json.exists():
            typer.echo(f"Error: File not found: {projects_json}", err=True)
            raise typer.Exit(1)

        # Get importer dependencies
        markdown_processor, file_service = run_with_cleanup(get_importer_dependencies())

        # Create the importer
        importer = ClaudeProjectsImporter(config.home, markdown_processor, file_service)

        # Process the file
        base_path = config.home / base_folder if base_folder else config.home
        console.print(f"\nImporting projects from {projects_json}...writing to {base_path}")

        # Run the import
        with projects_json.open("r", encoding="utf-8") as file:
            json_data = json.load(file)
            result = run_with_cleanup(importer.import_data(json_data, base_folder))

        if not result.success:  # pragma: no cover
            typer.echo(f"Error during import: {result.error_message}", err=True)
            raise typer.Exit(1)

        # Show results
        console.print(
            Panel(
                f"[green]Import complete![/green]\n\n"
                f"Imported {result.documents} project documents\n"
                f"Imported {result.prompts} prompt templates",
                expand=False,
            )
        )

        console.print("\nRun 'basic-memory sync' to index the new files.")

    except Exception as e:
        logger.error("Import failed")
        typer.echo(f"Error during import: {e}", err=True)
        raise typer.Exit(1)

```

--------------------------------------------------------------------------------
/src/basic_memory/cli/commands/import_claude_conversations.py:
--------------------------------------------------------------------------------

```python
"""Import command for basic-memory CLI to import chat data from conversations2.json format."""

import json
from pathlib import Path
from typing import Annotated, Tuple

import typer
from basic_memory.cli.app import claude_app
from basic_memory.cli.commands.command_utils import run_with_cleanup
from basic_memory.config import ConfigManager, get_project_config
from basic_memory.importers.claude_conversations_importer import ClaudeConversationsImporter
from basic_memory.markdown import EntityParser, MarkdownProcessor
from basic_memory.services.file_service import FileService
from loguru import logger
from rich.console import Console
from rich.panel import Panel

console = Console()


async def get_importer_dependencies() -> Tuple[MarkdownProcessor, FileService]:
    """Get MarkdownProcessor and FileService instances for importers."""
    config = get_project_config()
    app_config = ConfigManager().config
    entity_parser = EntityParser(config.home)
    markdown_processor = MarkdownProcessor(entity_parser, app_config=app_config)
    file_service = FileService(config.home, markdown_processor, app_config=app_config)
    return markdown_processor, file_service


@claude_app.command(name="conversations", help="Import chat conversations from Claude.ai.")
def import_claude(
    conversations_json: Annotated[
        Path, typer.Argument(..., help="Path to conversations.json file")
    ] = Path("conversations.json"),
    folder: Annotated[
        str, typer.Option(help="The folder to place the files in.")
    ] = "conversations",
):
    """Import chat conversations from conversations2.json format.

    This command will:
    1. Read chat data and nested messages
    2. Create markdown files for each conversation
    3. Format content in clean, readable markdown

    After importing, run 'basic-memory sync' to index the new files.
    """

    config = get_project_config()
    try:
        if not conversations_json.exists():
            typer.echo(f"Error: File not found: {conversations_json}", err=True)
            raise typer.Exit(1)

        # Get importer dependencies
        markdown_processor, file_service = run_with_cleanup(get_importer_dependencies())

        # Create the importer
        importer = ClaudeConversationsImporter(config.home, markdown_processor, file_service)

        # Process the file
        base_path = config.home / folder
        console.print(f"\nImporting chats from {conversations_json}...writing to {base_path}")

        # Run the import
        with conversations_json.open("r", encoding="utf-8") as file:
            json_data = json.load(file)
            result = run_with_cleanup(importer.import_data(json_data, folder))

        if not result.success:  # pragma: no cover
            typer.echo(f"Error during import: {result.error_message}", err=True)
            raise typer.Exit(1)

        # Show results
        console.print(
            Panel(
                f"[green]Import complete![/green]\n\n"
                f"Imported {result.conversations} conversations\n"
                f"Containing {result.messages} messages",
                expand=False,
            )
        )

        console.print("\nRun 'basic-memory sync' to index the new files.")

    except Exception as e:
        logger.error("Import failed")
        typer.echo(f"Error during import: {e}", err=True)
        raise typer.Exit(1)

```

--------------------------------------------------------------------------------
/src/basic_memory/models/project.py:
--------------------------------------------------------------------------------

```python
"""Project model for Basic Memory."""

import uuid
from datetime import datetime, UTC
from typing import Optional

from sqlalchemy import (
    Integer,
    String,
    Text,
    Boolean,
    DateTime,
    Float,
    Index,
    event,
)
from sqlalchemy.orm import Mapped, mapped_column, relationship

from basic_memory.models.base import Base
from basic_memory.utils import generate_permalink


class Project(Base):
    """Project model for Basic Memory.

    A project represents a collection of knowledge entities that are grouped together.
    Projects are stored in the app-level database and provide context for all knowledge
    operations.
    """

    __tablename__ = "project"
    __table_args__ = (
        # Regular indexes
        Index("ix_project_name", "name", unique=True),
        Index("ix_project_permalink", "permalink", unique=True),
        Index("ix_project_external_id", "external_id", unique=True),
        Index("ix_project_path", "path"),
        Index("ix_project_created_at", "created_at"),
        Index("ix_project_updated_at", "updated_at"),
    )

    # Core identity
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    # External UUID for API references - stable identifier that won't change
    external_id: Mapped[str] = mapped_column(String, unique=True, default=lambda: str(uuid.uuid4()))
    name: Mapped[str] = mapped_column(String, unique=True)
    description: Mapped[Optional[str]] = mapped_column(Text, nullable=True)

    # URL-friendly identifier generated from name
    permalink: Mapped[str] = mapped_column(String, unique=True)

    # Filesystem path to project directory
    path: Mapped[str] = mapped_column(String)

    # Status flags
    is_active: Mapped[bool] = mapped_column(Boolean, default=True)
    is_default: Mapped[Optional[bool]] = mapped_column(Boolean, default=None, nullable=True)

    # Timestamps
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), default=lambda: datetime.now(UTC)
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        default=lambda: datetime.now(UTC),
        onupdate=lambda: datetime.now(UTC),
    )

    # Sync optimization - scan watermark tracking
    last_scan_timestamp: Mapped[Optional[float]] = mapped_column(Float, nullable=True)
    last_file_count: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)

    # Define relationships to entities, observations, and relations
    # These relationships will be established once we add project_id to those models
    entities = relationship("Entity", back_populates="project", cascade="all, delete-orphan")

    def __repr__(self) -> str:  # pragma: no cover
        return f"Project(id={self.id}, external_id='{self.external_id}', name='{self.name}', permalink='{self.permalink}', path='{self.path}')"


@event.listens_for(Project, "before_insert")
@event.listens_for(Project, "before_update")
def set_project_permalink(mapper, connection, project):
    """Generate URL-friendly permalink for the project if needed.

    This event listener ensures the permalink is always derived from the name,
    even if the name changes.
    """
    # If the name changed or permalink is empty, regenerate permalink
    if not project.permalink or project.permalink != generate_permalink(project.name):
        project.permalink = generate_permalink(project.name)

```

--------------------------------------------------------------------------------
/.github/workflows/claude-code-review.yml:
--------------------------------------------------------------------------------

```yaml
name: Claude Code Review

on:
  pull_request:
    types: [opened, synchronize]
    # Optional: Only run on specific file changes
    # paths:
    #   - "src/**/*.ts"
    #   - "src/**/*.tsx"
    #   - "src/**/*.js"
    #   - "src/**/*.jsx"

jobs:
  claude-review:
    # Only run for organization members and collaborators
    if: |
      github.event.pull_request.author_association == 'OWNER' ||
      github.event.pull_request.author_association == 'MEMBER' ||
      github.event.pull_request.author_association == 'COLLABORATOR'

    runs-on: ubuntu-latest
    permissions:
      contents: read
      pull-requests: write
      issues: read
      id-token: write

    steps:
      - name: Checkout repository
        uses: actions/checkout@v4
        with:
          fetch-depth: 1

      - name: Run Claude Code Review
        id: claude-review
        uses: anthropics/claude-code-action@v1
        with:
          claude_code_oauth_token: ${{ secrets.CLAUDE_CODE_OAUTH_TOKEN }}
          github_token: ${{ secrets.GITHUB_TOKEN }}
          track_progress: true  # Enable visual progress tracking
          allowed_bots: '*'
          prompt: |
            Review this Basic Memory PR against our team checklist:

            ## Code Quality & Standards
            - [ ] Follows Basic Memory's coding conventions in CLAUDE.md
            - [ ] Python 3.12+ type annotations and async patterns
            - [ ] SQLAlchemy 2.0 best practices
            - [ ] FastAPI and Typer conventions followed
            - [ ] 100-character line length limit maintained
            - [ ] No commented-out code blocks

            ## Testing & Documentation
            - [ ] Unit tests for new functions/methods
            - [ ] Integration tests for new MCP tools
            - [ ] Test coverage for edge cases
            - [ ] **100% test coverage maintained** (use `# pragma: no cover` only for truly hard-to-test code)
            - [ ] Documentation updated (README, docstrings)
            - [ ] CLAUDE.md updated if conventions change

            ## Basic Memory Architecture
            - [ ] MCP tools follow atomic, composable design
            - [ ] Database changes include Alembic migrations
            - [ ] Preserves local-first architecture principles
            - [ ] Knowledge graph operations maintain consistency
            - [ ] Markdown file handling preserves integrity
            - [ ] AI-human collaboration patterns followed

            ## Security & Performance
            - [ ] No hardcoded secrets or credentials
            - [ ] Input validation for MCP tools
            - [ ] Proper error handling and logging
            - [ ] Performance considerations addressed
            - [ ] No sensitive data in logs or commits
            
            ## Compatibility
            - [ ] File path comparisons must be Windows compatible
            - [ ] Avoid using emojis and unicode characters in console and log output

            Read the CLAUDE.md file for detailed project context. For each checklist item, verify if it's satisfied and comment on any that need attention. Use inline comments for specific code issues and post a summary with checklist results.

          # Allow broader tool access for thorough code review
          claude_args: '--allowed-tools "Bash(gh pr:*),Bash(gh issue:*),Bash(gh api:*),Bash(git log:*),Bash(git show:*),Read,Grep,Glob"'

```

--------------------------------------------------------------------------------
/src/basic_memory/repository/search_repository.py:
--------------------------------------------------------------------------------

```python
"""Repository for search operations.

This module provides the search repository interface.
The actual repository implementations are backend-specific:
- SQLiteSearchRepository: Uses FTS5 virtual tables
- PostgresSearchRepository: Uses tsvector/tsquery with GIN indexes
"""

from datetime import datetime
from typing import List, Optional, Protocol

from sqlalchemy import Result
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker

from basic_memory.config import ConfigManager, DatabaseBackend
from basic_memory.repository.postgres_search_repository import PostgresSearchRepository
from basic_memory.repository.search_index_row import SearchIndexRow
from basic_memory.repository.sqlite_search_repository import SQLiteSearchRepository
from basic_memory.schemas.search import SearchItemType


class SearchRepository(Protocol):
    """Protocol defining the search repository interface.

    Both SQLite and Postgres implementations must satisfy this protocol.
    """

    project_id: int

    async def init_search_index(self) -> None:
        """Initialize the search index schema."""
        ...

    async def search(
        self,
        search_text: Optional[str] = None,
        permalink: Optional[str] = None,
        permalink_match: Optional[str] = None,
        title: Optional[str] = None,
        types: Optional[List[str]] = None,
        after_date: Optional[datetime] = None,
        search_item_types: Optional[List[SearchItemType]] = None,
        limit: int = 10,
        offset: int = 0,
    ) -> List[SearchIndexRow]:
        """Search across indexed content."""
        ...

    async def index_item(self, search_index_row: SearchIndexRow) -> None:
        """Index a single item."""
        ...

    async def bulk_index_items(self, search_index_rows: List[SearchIndexRow]) -> None:
        """Index multiple items in a batch."""
        ...

    async def delete_by_permalink(self, permalink: str) -> None:
        """Delete item by permalink."""
        ...

    async def delete_by_entity_id(self, entity_id: int) -> None:
        """Delete items by entity ID."""
        ...

    async def execute_query(self, query, params: dict) -> Result:
        """Execute a raw SQL query."""
        ...


def create_search_repository(
    session_maker: async_sessionmaker[AsyncSession],
    project_id: int,
    database_backend: Optional[DatabaseBackend] = None,
) -> SearchRepository:
    """Factory function to create the appropriate search repository based on database backend.

    Args:
        session_maker: SQLAlchemy async session maker
        project_id: Project ID for the repository
        database_backend: Optional explicit backend. If not provided, reads from ConfigManager.
            Prefer passing explicitly from composition roots.

    Returns:
        SearchRepository: Backend-appropriate search repository instance
    """
    # Prefer explicit parameter; fall back to ConfigManager for backwards compatibility
    if database_backend is None:
        config = ConfigManager().config
        database_backend = config.database_backend

    if database_backend == DatabaseBackend.POSTGRES:  # pragma: no cover
        return PostgresSearchRepository(session_maker, project_id=project_id)  # pragma: no cover
    else:
        return SQLiteSearchRepository(session_maker, project_id=project_id)
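

# Illustrative usage (a sketch; composition roots normally pass the backend explicitly):
#   repo = create_search_repository(
#       session_maker, project_id=project.id, database_backend=DatabaseBackend.SQLITE
#   )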


__all__ = [
    "SearchRepository",
    "SearchIndexRow",
    "create_search_repository",
]

```

--------------------------------------------------------------------------------
/src/basic_memory/importers/base.py:
--------------------------------------------------------------------------------

```python
"""Base import service for Basic Memory."""

import logging
from abc import abstractmethod
from pathlib import Path
from typing import TYPE_CHECKING, Any, Optional, TypeVar

from basic_memory.markdown.markdown_processor import MarkdownProcessor
from basic_memory.markdown.schemas import EntityMarkdown
from basic_memory.schemas.importer import ImportResult

if TYPE_CHECKING:  # pragma: no cover
    from basic_memory.services.file_service import FileService

logger = logging.getLogger(__name__)

T = TypeVar("T", bound=ImportResult)


class Importer[T: ImportResult]:
    """Base class for all import services.

    All file operations are delegated to FileService, which can be overridden
    in cloud environments to use S3 or other storage backends.
    """

    def __init__(
        self,
        base_path: Path,
        markdown_processor: MarkdownProcessor,
        file_service: "FileService",
    ):
        """Initialize the import service.

        Args:
            base_path: Base path for the project.
            markdown_processor: MarkdownProcessor instance for markdown serialization.
            file_service: FileService instance for all file operations.
        """
        self.base_path = base_path.resolve()  # Get absolute path
        self.markdown_processor = markdown_processor
        self.file_service = file_service

    @abstractmethod
    async def import_data(self, source_data, destination_folder: str, **kwargs: Any) -> T:
        """Import data from source file to destination folder.

        Args:
            source_data: Parsed source data to import.
            destination_folder: Destination folder within the project.
            **kwargs: Additional keyword arguments for specific import types.

        Returns:
            ImportResult containing statistics and status of the import.
        """
        pass  # pragma: no cover

    async def write_entity(self, entity: EntityMarkdown, file_path: str | Path) -> str:
        """Write entity to file using FileService.

        This method serializes the entity to markdown and writes it using
        FileService, which handles directory creation and storage backend
        abstraction (local filesystem vs cloud storage).

        Args:
            entity: EntityMarkdown instance to write.
            file_path: Relative path to write the entity to. FileService handles base_path.

        Returns:
            Checksum of written file.
        """
        content = self.markdown_processor.to_markdown_string(entity)
        # FileService.write_file handles directory creation and returns checksum
        return await self.file_service.write_file(file_path, content)

    async def ensure_folder_exists(self, folder: str) -> None:
        """Ensure folder exists using FileService.

        For cloud storage (S3), this is essentially a no-op since S3 doesn't
        have actual folders - they're just key prefixes.

        Args:
            folder: Relative folder path within the project. FileService handles base_path.
        """
        await self.file_service.ensure_directory(folder)

    @abstractmethod
    def handle_error(
        self, message: str, error: Optional[Exception] = None
    ) -> T:  # pragma: no cover
        """Handle errors during import.

        Args:
            message: Error message.
            error: Optional exception that caused the error.

        Returns:
            ImportResult with error information.
        """
        pass
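

# Illustrative subclass outline (a sketch; see the concrete importers for real implementations):
#
#   class MyImporter(Importer[ImportResult]):
#       async def import_data(self, source_data, destination_folder: str, **kwargs):
#           await self.ensure_folder_exists(destination_folder)
#           # build EntityMarkdown objects from source_data, then for each:
#           #   await self.write_entity(entity, f"{destination_folder}/{file_name}")
#           return ImportResult(...)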

```

--------------------------------------------------------------------------------
/src/basic_memory/models/search.py:
--------------------------------------------------------------------------------

```python
"""Search DDL statements for SQLite and Postgres.

The search_index table is created via raw DDL, not ORM models, because:
- SQLite uses FTS5 virtual tables (cannot be represented as ORM)
- Postgres uses composite primary keys and generated tsvector columns
- Both backends use raw SQL for all search operations via SearchIndexRow dataclass
"""

from sqlalchemy import DDL


# Define Postgres search_index table with composite primary key and tsvector
# This DDL matches the Alembic migration schema (314f1ea54dc4)
# Used by tests to create the table without running full migrations
# NOTE: Split into separate DDL statements because asyncpg doesn't support
# multiple statements in a single execute call.
CREATE_POSTGRES_SEARCH_INDEX_TABLE = DDL("""
CREATE TABLE IF NOT EXISTS search_index (
    id INTEGER NOT NULL,
    project_id INTEGER NOT NULL,
    title TEXT,
    content_stems TEXT,
    content_snippet TEXT,
    permalink VARCHAR,
    file_path VARCHAR,
    type VARCHAR,
    from_id INTEGER,
    to_id INTEGER,
    relation_type VARCHAR,
    entity_id INTEGER,
    category VARCHAR,
    metadata JSONB,
    created_at TIMESTAMP WITH TIME ZONE,
    updated_at TIMESTAMP WITH TIME ZONE,
    textsearchable_index_col tsvector GENERATED ALWAYS AS (
        to_tsvector('english', coalesce(title, '') || ' ' || coalesce(content_stems, ''))
    ) STORED,
    PRIMARY KEY (id, type, project_id),
    FOREIGN KEY (project_id) REFERENCES project(id) ON DELETE CASCADE
)
""")

CREATE_POSTGRES_SEARCH_INDEX_FTS = DDL("""
CREATE INDEX IF NOT EXISTS idx_search_index_fts ON search_index USING gin(textsearchable_index_col)
""")

CREATE_POSTGRES_SEARCH_INDEX_METADATA = DDL("""
CREATE INDEX IF NOT EXISTS idx_search_index_metadata_gin ON search_index USING gin(metadata jsonb_path_ops)
""")

# Partial unique index on (permalink, project_id) for non-null permalinks
# This prevents duplicate permalinks per project and is used by upsert operations
# in PostgresSearchRepository to handle race conditions during parallel indexing
CREATE_POSTGRES_SEARCH_INDEX_PERMALINK = DDL("""
CREATE UNIQUE INDEX IF NOT EXISTS uix_search_index_permalink_project
ON search_index (permalink, project_id)
WHERE permalink IS NOT NULL
""")

# Define FTS5 virtual table creation for SQLite only
# This DDL is executed separately for SQLite databases
CREATE_SEARCH_INDEX = DDL("""
CREATE VIRTUAL TABLE IF NOT EXISTS search_index USING fts5(
    -- Core entity fields
    id UNINDEXED,          -- Row ID
    title,                 -- Title for searching
    content_stems,         -- Main searchable content split into stems
    content_snippet,       -- File content snippet for display
    permalink,             -- Stable identifier (now indexed for path search)
    file_path UNINDEXED,   -- Physical location
    type UNINDEXED,        -- entity/relation/observation

    -- Project context
    project_id UNINDEXED,  -- Project identifier

    -- Relation fields
    from_id UNINDEXED,     -- Source entity
    to_id UNINDEXED,       -- Target entity
    relation_type UNINDEXED, -- Type of relation

    -- Observation fields
    entity_id UNINDEXED,   -- Parent entity
    category UNINDEXED,    -- Observation category

    -- Common fields
    metadata UNINDEXED,    -- JSON metadata
    created_at UNINDEXED,  -- Creation timestamp
    updated_at UNINDEXED,  -- Last update

    -- Configuration
    tokenize='unicode61 tokenchars 0x2F',  -- Hex code for /
    prefix='1,2,3,4'                    -- Support longer prefixes for paths
);
""")

```

--------------------------------------------------------------------------------
/src/basic_memory/mcp/clients/memory.py:
--------------------------------------------------------------------------------

```python
"""Typed client for memory/context API operations.

Encapsulates all /v2/projects/{project_id}/memory/* endpoints.
"""

from typing import Optional

from httpx import AsyncClient

from basic_memory.mcp.tools.utils import call_get
from basic_memory.schemas.memory import GraphContext


class MemoryClient:
    """Typed client for memory context operations.

    Centralizes:
    - API path construction for /v2/projects/{project_id}/memory/*
    - Response validation via Pydantic models
    - Consistent error handling through call_* utilities

    Usage:
        async with get_client() as http_client:
            client = MemoryClient(http_client, project_id)
            context = await client.build_context("memory://specs/search")
    """

    def __init__(self, http_client: AsyncClient, project_id: str):
        """Initialize the memory client.

        Args:
            http_client: HTTPX AsyncClient for making requests
            project_id: Project external_id (UUID) for API calls
        """
        self.http_client = http_client
        self.project_id = project_id
        self._base_path = f"/v2/projects/{project_id}/memory"

    async def build_context(
        self,
        path: str,
        *,
        depth: int = 1,
        timeframe: Optional[str] = None,
        page: int = 1,
        page_size: int = 10,
        max_related: int = 10,
    ) -> GraphContext:
        """Build context from a memory path.

        Args:
            path: The path to build context for (without memory:// prefix)
            depth: How deep to traverse relations
            timeframe: Time filter (e.g., "7d", "1 week")
            page: Page number (1-indexed)
            page_size: Results per page
            max_related: Maximum related items per result

        Returns:
            GraphContext with hierarchical results

        Raises:
            ToolError: If the request fails
        """
        params: dict = {
            "depth": depth,
            "page": page,
            "page_size": page_size,
            "max_related": max_related,
        }
        if timeframe:
            params["timeframe"] = timeframe

        response = await call_get(
            self.http_client,
            f"{self._base_path}/{path}",
            params=params,
        )
        return GraphContext.model_validate(response.json())

    async def recent(
        self,
        *,
        timeframe: str = "7d",
        depth: int = 1,
        types: Optional[list[str]] = None,
        page: int = 1,
        page_size: int = 10,
    ) -> GraphContext:
        """Get recent activity.

        Args:
            timeframe: Time filter (e.g., "7d", "1 week", "2 days ago")
            depth: How deep to traverse relations
            types: Filter by item types
            page: Page number (1-indexed)
            page_size: Results per page

        Returns:
            GraphContext with recent activity

        Raises:
            ToolError: If the request fails
        """
        params: dict = {
            "timeframe": timeframe,
            "depth": depth,
            "page": page,
            "page_size": page_size,
        }
        if types:
            # Join types as comma-separated string if provided
            params["type"] = ",".join(types) if isinstance(types, list) else types

        response = await call_get(
            self.http_client,
            f"{self._base_path}/recent",
            params=params,
        )
        return GraphContext.model_validate(response.json())

```
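
A short usage sketch of the client above; `get_client` is the async client factory used elsewhere in this codebase, and the project UUID here is a placeholder:

```python
import asyncio

from basic_memory.mcp.async_client import get_client
from basic_memory.mcp.clients.memory import MemoryClient


async def main() -> None:
    # Assumption: this is the project's external UUID.
    project_id = "00000000-0000-0000-0000-000000000000"
    async with get_client() as http_client:
        client = MemoryClient(http_client, project_id)

        # Build context for a memory:// path (prefix already stripped by the caller).
        context = await client.build_context("specs/search", depth=2, timeframe="7d")

        # Fetch recent activity, limited to entities.
        recent = await client.recent(timeframe="2 days ago", types=["entity"])

        print(len(context.results), len(recent.results))


asyncio.run(main())
```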

--------------------------------------------------------------------------------
/CLA.md:
--------------------------------------------------------------------------------

```markdown
# Contributor License Agreement

## Copyright Assignment and License Grant

By signing this Contributor License Agreement ("Agreement"), you accept and agree to the following terms and conditions
for your present and future Contributions submitted
to Basic Machines LLC. Except for the license granted herein to Basic Machines LLC and recipients of software
distributed by Basic Machines LLC, you reserve all right,
title, and interest in and to your Contributions.

### 1. Definitions

"You" (or "Your") shall mean the copyright owner or legal entity authorized by the copyright owner that is making this
Agreement with Basic Machines LLC.

"Contribution" shall mean any original work of authorship, including any modifications or additions to an existing work,
that is intentionally submitted by You to Basic
Machines LLC for inclusion in, or documentation of, any of the products owned or managed by Basic Machines LLC (the "
Work").

### 2. Grant of Copyright License

Subject to the terms and conditions of this Agreement, You hereby grant to Basic Machines LLC and to recipients of
software distributed by Basic Machines LLC a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable copyright license to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the
Work, and to permit persons to whom the Work is furnished to do so.

### 3. Assignment of Copyright

You hereby assign to Basic Machines LLC all right, title, and interest worldwide in all Copyright covering your
Contributions. Basic Machines LLC may license the
Contributions under any license terms, including copyleft, permissive, commercial, or proprietary licenses.

### 4. Grant of Patent License

Subject to the terms and conditions of this Agreement, You hereby grant to Basic Machines LLC and to recipients of
software distributed by Basic Machines LLC a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable (except as stated in this section) patent license to
make, have made, use, offer to sell, sell, import, and
otherwise transfer the Work.

### 5. Developer Certificate of Origin

By making a Contribution to this project, You certify that:

(a) The Contribution was created in whole or in part by You and You have the right to submit it under this Agreement; or

(b) The Contribution is based upon previous work that, to the best of Your knowledge, is covered under an appropriate
open source license and You have the right under that
license to submit that work with modifications, whether created in whole or in part by You, under this Agreement; or

(c) The Contribution was provided directly to You by some other person who certified (a), (b) or (c) and You have not
modified it.

(d) You understand and agree that this project and the Contribution are public and that a record of the Contribution (
including all personal information You submit with
it, including Your sign-off) is maintained indefinitely and may be redistributed consistent with this project or the
open source license(s) involved.

### 6. Representations

You represent that you are legally entitled to grant the above license and assignment. If your employer(s) has rights to
intellectual property that you create that
includes your Contributions, you represent that you have received permission to make Contributions on behalf of that
employer, or that your employer has waived such rights
for your Contributions to Basic Machines LLC.

---

This Agreement is effective as of the date you first submit a Contribution to Basic Machines LLC.

```

--------------------------------------------------------------------------------
/src/basic_memory/cli/commands/command_utils.py:
--------------------------------------------------------------------------------

```python
"""utility functions for commands"""

import asyncio
from typing import Optional, TypeVar, Coroutine, Any

from mcp.server.fastmcp.exceptions import ToolError
import typer

from rich.console import Console

from basic_memory import db
from basic_memory.mcp.async_client import get_client

from basic_memory.mcp.tools.utils import call_post, call_get
from basic_memory.mcp.project_context import get_active_project
from basic_memory.schemas import ProjectInfoResponse
from basic_memory.telemetry import shutdown_telemetry

console = Console()

T = TypeVar("T")


def run_with_cleanup(coro: Coroutine[Any, Any, T]) -> T:
    """Run an async coroutine with proper database cleanup.

    This helper ensures database connections and telemetry threads are cleaned up
    before the event loop closes, preventing process hangs in CLI commands.

    Args:
        coro: The coroutine to run

    Returns:
        The result of the coroutine
    """

    async def _with_cleanup() -> T:
        try:
            return await coro
        finally:
            await db.shutdown_db()
            # Shutdown telemetry to stop the OpenPanel background thread
            # This prevents hangs on Python 3.14+ during thread shutdown
            shutdown_telemetry()

    return asyncio.run(_with_cleanup())


async def run_sync(
    project: Optional[str] = None,
    force_full: bool = False,
    run_in_background: bool = True,
):
    """Run sync operation via API endpoint.

    Args:
        project: Optional project name
        force_full: If True, force a full scan bypassing watermark optimization
        run_in_background: If True, return immediately; if False, wait for completion
    """

    try:
        async with get_client() as client:
            project_item = await get_active_project(client, project, None)
            url = f"{project_item.project_url}/project/sync"
            params = []
            if force_full:
                params.append("force_full=true")
            if not run_in_background:
                params.append("run_in_background=false")
            if params:
                url += "?" + "&".join(params)
            response = await call_post(client, url)
            data = response.json()
            # Background mode returns {"message": "..."}, foreground returns SyncReportResponse
            if "message" in data:
                console.print(f"[green]{data['message']}[/green]")
            else:
                # Foreground mode - show summary of sync results
                total = data.get("total", 0)
                new_count = len(data.get("new", []))
                modified_count = len(data.get("modified", []))
                deleted_count = len(data.get("deleted", []))
                console.print(
                    f"[green]Synced {total} files[/green] "
                    f"(new: {new_count}, modified: {modified_count}, deleted: {deleted_count})"
                )
    except (ToolError, ValueError) as e:
        console.print(f"[red]Sync failed: {e}[/red]")
        raise typer.Exit(1)


async def get_project_info(project: str):
    """Get project information via API endpoint."""

    try:
        async with get_client() as client:
            project_item = await get_active_project(client, project, None)
            response = await call_get(client, f"{project_item.project_url}/project/info")
            return ProjectInfoResponse.model_validate(response.json())
    except (ToolError, ValueError) as e:
        console.print(f"[red]Sync failed: {e}[/red]")
        raise typer.Exit(1)

```
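
A minimal sketch of wiring these helpers into a Typer command; the command and option names are illustrative, but the pattern is the one the helpers exist for: `run_with_cleanup` owns the event loop, so database and telemetry shutdown always run before it closes:

```python
from typing import Optional

import typer

from basic_memory.cli.commands.command_utils import run_sync, run_with_cleanup

app = typer.Typer()  # illustrative; the real CLI defines its own app


@app.command()
def sync(
    project: Optional[str] = typer.Option(None, help="Project name"),
    force_full: bool = typer.Option(False, help="Force a full scan"),
    wait: bool = typer.Option(False, help="Wait for completion instead of backgrounding"),
) -> None:
    """Trigger a sync via the API, cleaning up DB/telemetry before the loop closes."""
    run_with_cleanup(
        run_sync(project=project, force_full=force_full, run_in_background=not wait)
    )


if __name__ == "__main__":
    app()
```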

--------------------------------------------------------------------------------
/test-int/mcp/test_lifespan_shutdown_sync_task_cancellation_integration.py:
--------------------------------------------------------------------------------

```python
"""
Integration test for FastAPI lifespan shutdown behavior.

This test verifies the asyncio cancellation pattern used by the API lifespan:
when the background sync task is cancelled during shutdown, it must be *awaited*
before database shutdown begins. This prevents "hang on exit" scenarios in
`asyncio.run(...)` callers (e.g. CLI/MCP clients using httpx ASGITransport).
"""

import asyncio

from httpx import ASGITransport, AsyncClient


def test_lifespan_shutdown_awaits_sync_task_cancellation(app, monkeypatch):
    """
    Ensure lifespan shutdown awaits the cancelled background sync task.

    Why this is deterministic:
    - Cancelling a task does not make it "done" immediately; it becomes done only
      once the event loop schedules it and it processes the CancelledError.
    - In the buggy version, shutdown proceeded directly to db.shutdown_db()
      immediately after calling cancel(), so at *entry* to shutdown_db the task
      is still not done.
    - In the fixed version, SyncCoordinator.stop() awaits the task before returning,
      so by the time shutdown_db is called, the task is done (cancelled).
    """

    # Import the *module* (not the package-level FastAPI `basic_memory.api.app` export)
    # so monkeypatching affects the exact symbols referenced inside lifespan().
    #
    # Note: `basic_memory/api/__init__.py` re-exports `app`, so `import basic_memory.api.app`
    # can resolve to the FastAPI instance rather than the `basic_memory.api.app` module.
    import importlib

    api_app_module = importlib.import_module("basic_memory.api.app")
    container_module = importlib.import_module("basic_memory.api.container")
    init_module = importlib.import_module("basic_memory.services.initialization")

    # Keep startup cheap: we don't need real DB init for this ordering test.
    async def _noop_initialize_app(_app_config):
        return None

    monkeypatch.setattr(api_app_module, "initialize_app", _noop_initialize_app)

    # Patch the container's init_database to return fake objects
    async def _fake_init_database(self):
        self.engine = object()
        self.session_maker = object()
        return self.engine, self.session_maker

    monkeypatch.setattr(container_module.ApiContainer, "init_database", _fake_init_database)

    # Make the sync task long-lived so it must be cancelled on shutdown.
    # Patch at the source module where SyncCoordinator imports it.
    async def _fake_initialize_file_sync(_app_config):
        await asyncio.Event().wait()

    monkeypatch.setattr(init_module, "initialize_file_sync", _fake_initialize_file_sync)

    # Assert ordering: shutdown_db must be called only after the sync_task is done.
    # SyncCoordinator stores the task in _sync_task attribute.
    async def _assert_sync_task_done_before_db_shutdown(self):
        sync_coordinator = api_app_module.app.state.sync_coordinator
        assert sync_coordinator._sync_task is not None
        assert sync_coordinator._sync_task.done()

    monkeypatch.setattr(
        container_module.ApiContainer,
        "shutdown_database",
        _assert_sync_task_done_before_db_shutdown,
    )

    async def _run_client_once():
        async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
            # Any request is sufficient to trigger lifespan startup/shutdown.
            await client.get("/__nonexistent__")

    # Use asyncio.run to match the CLI/MCP execution model where loop teardown
    # would hang if a background task is left running.
    asyncio.run(_run_client_once())

```
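
The ordering this test pins down can be shown in isolation: `Task.cancel()` only requests cancellation, so shutdown code must await the task before tearing down shared resources. A standalone sketch of the pattern (not the project's actual lifespan code):

```python
import asyncio


async def background_sync() -> None:
    try:
        await asyncio.Event().wait()  # long-lived work
    except asyncio.CancelledError:
        # cleanup that must finish before shared resources go away
        raise


async def shutdown(sync_task: asyncio.Task) -> None:
    sync_task.cancel()
    # Without this await, sync_task may still be running when the DB shuts down.
    try:
        await sync_task
    except asyncio.CancelledError:
        pass
    # ... only now is it safe to shut down the database


async def main() -> None:
    task = asyncio.create_task(background_sync())
    await asyncio.sleep(0)  # let the task start
    await shutdown(task)
    assert task.done()


asyncio.run(main())
```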

--------------------------------------------------------------------------------
/tests/cli/test_cli_container.py:
--------------------------------------------------------------------------------

```python
"""Tests for CLI container composition root."""

import pytest

from basic_memory.cli.container import (
    CliContainer,
    get_container,
    set_container,
    get_or_create_container,
)
from basic_memory.runtime import RuntimeMode


class TestCliContainer:
    """Tests for CliContainer."""

    def test_create_from_config(self, app_config):
        """Container can be created from config."""
        container = CliContainer(config=app_config, mode=RuntimeMode.LOCAL)
        assert container.config == app_config
        assert container.mode == RuntimeMode.LOCAL

    def test_is_cloud_mode_when_cloud(self, app_config):
        """is_cloud_mode returns True in cloud mode."""
        container = CliContainer(config=app_config, mode=RuntimeMode.CLOUD)
        assert container.is_cloud_mode is True

    def test_is_cloud_mode_when_local(self, app_config):
        """is_cloud_mode returns False in local mode."""
        container = CliContainer(config=app_config, mode=RuntimeMode.LOCAL)
        assert container.is_cloud_mode is False

    def test_is_cloud_mode_when_test(self, app_config):
        """is_cloud_mode returns False in test mode."""
        container = CliContainer(config=app_config, mode=RuntimeMode.TEST)
        assert container.is_cloud_mode is False


class TestContainerAccessors:
    """Tests for container get/set functions."""

    def test_get_container_raises_when_not_set(self, monkeypatch):
        """get_container raises RuntimeError when container not initialized."""
        import basic_memory.cli.container as container_module

        monkeypatch.setattr(container_module, "_container", None)

        with pytest.raises(RuntimeError, match="CLI container not initialized"):
            get_container()

    def test_set_and_get_container(self, app_config, monkeypatch):
        """set_container allows get_container to return the container."""
        import basic_memory.cli.container as container_module

        container = CliContainer(config=app_config, mode=RuntimeMode.LOCAL)
        monkeypatch.setattr(container_module, "_container", None)

        set_container(container)
        assert get_container() is container


class TestGetOrCreateContainer:
    """Tests for get_or_create_container - unique to CLI container."""

    def test_creates_new_when_none_exists(self, monkeypatch):
        """get_or_create_container creates a new container when none exists."""
        import basic_memory.cli.container as container_module

        monkeypatch.setattr(container_module, "_container", None)

        container = get_or_create_container()
        assert container is not None
        assert isinstance(container, CliContainer)

    def test_returns_existing_when_set(self, app_config, monkeypatch):
        """get_or_create_container returns existing container if already set."""
        import basic_memory.cli.container as container_module

        existing = CliContainer(config=app_config, mode=RuntimeMode.LOCAL)
        monkeypatch.setattr(container_module, "_container", existing)

        result = get_or_create_container()
        assert result is existing

    def test_sets_module_level_container(self, monkeypatch):
        """get_or_create_container sets the module-level container."""
        import basic_memory.cli.container as container_module

        monkeypatch.setattr(container_module, "_container", None)

        container = get_or_create_container()

        # Verify it was set at module level
        assert container_module._container is container
        # Verify get_container now works
        assert get_container() is container

```

--------------------------------------------------------------------------------
/tests/mcp/test_recent_activity_prompt_modes.py:
--------------------------------------------------------------------------------

```python
from datetime import UTC, datetime

import pytest

from basic_memory.mcp.prompts.recent_activity import recent_activity_prompt
from basic_memory.schemas.memory import (
    ActivityStats,
    ContextResult,
    GraphContext,
    MemoryMetadata,
    ProjectActivity,
    ProjectActivitySummary,
    EntitySummary,
)
from basic_memory.schemas.search import SearchItemType


def _entity(title: str, entity_id: int = 1) -> EntitySummary:
    return EntitySummary(
        entity_id=entity_id,
        permalink=title.lower().replace(" ", "-"),
        title=title,
        content=None,
        file_path=f"{title}.md",
        created_at=datetime.now(UTC),
    )


@pytest.mark.asyncio
async def test_recent_activity_prompt_discovery_mode(monkeypatch):
    recent = ProjectActivitySummary(
        projects={
            "p1": ProjectActivity(
                project_name="p1",
                project_path="/tmp/p1",
                activity=GraphContext(
                    results=[
                        ContextResult(
                            primary_result=_entity("A"), observations=[], related_results=[]
                        )
                    ],
                    metadata=MemoryMetadata(
                        uri=None,
                        types=[SearchItemType.ENTITY],
                        depth=1,
                        timeframe="7d",
                        generated_at=datetime.now(UTC),
                    ),
                ),
                item_count=1,
            ),
            "p2": ProjectActivity(
                project_name="p2",
                project_path="/tmp/p2",
                activity=GraphContext(
                    results=[
                        ContextResult(
                            primary_result=_entity("B", 2), observations=[], related_results=[]
                        )
                    ],
                    metadata=MemoryMetadata(
                        uri=None,
                        types=[SearchItemType.ENTITY],
                        depth=1,
                        timeframe="7d",
                        generated_at=datetime.now(UTC),
                    ),
                ),
                item_count=1,
            ),
        },
        summary=ActivityStats(
            total_projects=2, active_projects=2, most_active_project="p1", total_items=2
        ),
        timeframe="7d",
        generated_at=datetime.now(UTC),
    )

    async def fake_fn(**_kwargs):
        return recent

    monkeypatch.setattr("basic_memory.mcp.prompts.recent_activity.recent_activity.fn", fake_fn)

    out = await recent_activity_prompt.fn(timeframe="7d", project=None)  # pyright: ignore[reportGeneralTypeIssues]
    assert "Recent Activity Across All Projects" in out
    assert "Cross-Project Activity Discovery" in out


@pytest.mark.asyncio
async def test_recent_activity_prompt_project_mode(monkeypatch):
    recent = GraphContext(
        results=[
            ContextResult(primary_result=_entity("Only"), observations=[], related_results=[])
        ],
        metadata=MemoryMetadata(
            uri=None,
            types=[SearchItemType.ENTITY],
            depth=1,
            timeframe="1d",
            generated_at=datetime.now(UTC),
        ),
    )

    async def fake_fn(**_kwargs):
        return recent

    monkeypatch.setattr("basic_memory.mcp.prompts.recent_activity.recent_activity.fn", fake_fn)

    out = await recent_activity_prompt.fn(timeframe="1d", project="proj")  # pyright: ignore[reportGeneralTypeIssues]
    assert "Recent Activity in proj" in out
    assert "Opportunity to Capture Activity Summary" in out

```

--------------------------------------------------------------------------------
/src/basic_memory/schemas/prompt.py:
--------------------------------------------------------------------------------

```python
"""Request and response schemas for prompt-related operations."""

from typing import Optional, List, Any, Dict
from pydantic import BaseModel, Field

from basic_memory.schemas.base import TimeFrame
from basic_memory.schemas.memory import EntitySummary, ObservationSummary, RelationSummary


class PromptContextItem(BaseModel):
    """Container for primary and related results to render in a prompt."""

    primary_results: List[EntitySummary]
    related_results: List[EntitySummary | ObservationSummary | RelationSummary]


class ContinueConversationRequest(BaseModel):
    """Request for generating a continue conversation prompt.

    Used to provide context for continuing a conversation on a specific topic
    or with recent activity from a given timeframe.
    """

    topic: Optional[str] = Field(None, description="Topic or keyword to search for")
    timeframe: Optional[TimeFrame] = Field(
        None, description="How far back to look for activity (e.g. '1d', '1 week')"
    )
    # Limit depth to max 2 for performance reasons - higher values cause significant slowdown
    search_items_limit: int = Field(
        5,
        description="Maximum number of search results to include in context (max 10)",
        ge=1,
        le=10,
    )
    depth: int = Field(
        1,
        description="How many relationship 'hops' to follow when building context (max 5)",
        ge=1,
        le=5,
    )
    # Limit related items to prevent overloading the context
    related_items_limit: int = Field(
        5, description="Maximum number of related items to include in context (max 10)", ge=1, le=10
    )


class SearchPromptRequest(BaseModel):
    """Request for generating a search results prompt.

    Used to format search results into a prompt with context and suggestions.
    """

    query: str = Field(..., description="The search query text")
    timeframe: Optional[TimeFrame] = Field(
        None, description="Optional timeframe to limit results (e.g. '1d', '1 week')"
    )


class PromptMetadata(BaseModel):
    """Metadata about a prompt response.

    Contains statistical information about the prompt generation process
    and results, useful for debugging and UI display.
    """

    query: Optional[str] = Field(None, description="The original query or topic")
    timeframe: Optional[str] = Field(None, description="The timeframe used for filtering")
    search_count: int = Field(0, description="Number of search results found")
    context_count: int = Field(0, description="Number of context items retrieved")
    observation_count: int = Field(0, description="Total number of observations included")
    relation_count: int = Field(0, description="Total number of relations included")
    total_items: int = Field(0, description="Total number of all items included in the prompt")
    search_limit: int = Field(0, description="Maximum search results requested")
    context_depth: int = Field(0, description="Context depth used")
    related_limit: int = Field(0, description="Maximum related items requested")
    generated_at: str = Field(..., description="ISO timestamp when this prompt was generated")


class PromptResponse(BaseModel):
    """Response containing the rendered prompt.

    Includes both the rendered prompt text and the context that was used
    to render it, for potential client-side use.
    """

    prompt: str = Field(..., description="The rendered prompt text")
    context: Dict[str, Any] = Field(..., description="The context used to render the prompt")
    metadata: PromptMetadata = Field(
        ..., description="Metadata about the prompt generation process"
    )

```
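
A quick sketch of how the bounded fields on `ContinueConversationRequest` behave; Pydantic enforces the `ge`/`le` constraints at construction time:

```python
from pydantic import ValidationError

from basic_memory.schemas.prompt import ContinueConversationRequest

# Within bounds: accepted, limits fall back to their defaults.
req = ContinueConversationRequest(topic="search design", timeframe="1d", depth=2)
print(req.search_items_limit, req.related_items_limit)  # 5 5

# Out of bounds: depth is capped at 5, so this raises.
try:
    ContinueConversationRequest(topic="search design", depth=10)
except ValidationError as e:
    print("rejected:", e.errors()[0]["loc"])
```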

--------------------------------------------------------------------------------
/test-int/mcp/test_read_note_integration.py:
--------------------------------------------------------------------------------

```python
"""
Integration tests for read_note MCP tool.

Tests the full flow: MCP client -> MCP server -> FastAPI -> database
"""

import pytest
from fastmcp import Client


@pytest.mark.asyncio
async def test_read_note_after_write(mcp_server, app, test_project):
    """Test read_note after write_note using real database."""

    async with Client(mcp_server) as client:
        # First write a note
        write_result = await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Test Note",
                "folder": "test",
                "content": "# Test Note\n\nThis is test content.",
                "tags": "test,integration",
            },
        )

        assert len(write_result.content) == 1
        assert write_result.content[0].type == "text"
        assert "Test Note.md" in write_result.content[0].text

        # Then read it back
        read_result = await client.call_tool(
            "read_note",
            {
                "project": test_project.name,
                "identifier": "Test Note",
            },
        )

        assert len(read_result.content) == 1
        assert read_result.content[0].type == "text"
        result_text = read_result.content[0].text

        # Should contain the note content and metadata
        assert "# Test Note" in result_text
        assert "This is test content." in result_text
        assert "test/test-note" in result_text  # permalink


@pytest.mark.asyncio
async def test_read_note_underscored_folder_by_permalink(mcp_server, app, test_project):
    """Test read_note with permalink from underscored folder.

    Reproduces bug #416: read_note fails to find notes when given permalinks
    from underscored folder names (e.g., _archive/, _drafts/), even though
    the permalink is copied directly from the note's YAML frontmatter.
    """

    async with Client(mcp_server) as client:
        # Create a note in an underscored folder
        write_result = await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Example Note",
                "folder": "_archive/articles",
                "content": "# Example Note\n\nThis is a test note in an underscored folder.",
                "tags": "test,archive",
            },
        )

        assert len(write_result.content) == 1
        assert write_result.content[0].type == "text"
        write_text = write_result.content[0].text

        # Verify the file path includes the underscore
        assert "_archive/articles/Example Note.md" in write_text

        # Verify the permalink has underscores stripped (this is the expected behavior)
        assert "archive/articles/example-note" in write_text

        # Now try to read the note using the permalink (without underscores)
        # This is the exact scenario from the bug report - using the permalink
        # that was generated in the YAML frontmatter
        read_result = await client.call_tool(
            "read_note",
            {
                "project": test_project.name,
                "identifier": "archive/articles/example-note",  # permalink without underscores
            },
        )

        # This should succeed - the note should be found by its permalink
        assert len(read_result.content) == 1
        assert read_result.content[0].type == "text"
        result_text = read_result.content[0].text

        # Should contain the note content
        assert "# Example Note" in result_text
        assert "This is a test note in an underscored folder." in result_text
        assert "archive/articles/example-note" in result_text  # permalink

```
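
The assertions above imply a permalink derivation: leading underscores are stripped from folder segments, text is lowercased, and spaces become hyphens. A simplified sketch of that mapping (illustrative only, not the project's actual slug logic):

```python
from pathlib import PurePosixPath


def permalink_sketch(file_path: str) -> str:
    """Simplified permalink derivation implied by the assertions above."""
    path = PurePosixPath(file_path).with_suffix("")  # drop .md
    parts = [part.lstrip("_").lower().replace(" ", "-") for part in path.parts]
    return "/".join(parts)


assert permalink_sketch("_archive/articles/Example Note.md") == "archive/articles/example-note"
assert permalink_sketch("test/Test Note.md") == "test/test-note"
```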

--------------------------------------------------------------------------------
/src/basic_memory/alembic/alembic.ini:
--------------------------------------------------------------------------------

```
# A generic, single database configuration.

[alembic]
# path to migration scripts
# Use forward slashes (/) also on windows to provide an os agnostic path
script_location = .

# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s
# Uncomment the line below if you want the files to be prepended with date and time
# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file
# for all available tokens
# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s

# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory.
prepend_sys_path = .

# timezone to use when rendering the date within the migration file
# as well as the filename.
# If specified, requires the python>=3.9 or backports.zoneinfo library and tzdata library.
# Any required deps can installed by adding `alembic[tz]` to the pip requirements
# string value is passed to ZoneInfo()
# leave blank for localtime
# timezone =

# max length of characters to apply to the "slug" field
# truncate_slug_length = 40

# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false

# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false

# version location specification; This defaults
# to migrations/versions.  When using multiple version
# directories, initial revisions must be specified with --version-path.
# The path separator used here should be the separator specified by "version_path_separator" below.
# version_locations = %(here)s/bar:%(here)s/bat:migrations/versions

# version path separator; As mentioned above, this is the character used to split
# version_locations. The default within new alembic.ini files is "os", which uses os.pathsep.
# If this key is omitted entirely, it falls back to the legacy behavior of splitting on spaces and/or commas.
# Valid values for version_path_separator are:
#
# version_path_separator = :
# version_path_separator = ;
# version_path_separator = space
# version_path_separator = newline
#
# Use os.pathsep. Default configuration used for new projects.
version_path_separator = os

# set to 'true' to search source files recursively
# in each "version_locations" directory
# new in Alembic version 1.10
# recursive_version_locations = false

# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8

sqlalchemy.url = driver://user:pass@localhost/dbname


[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts.  See the documentation for further
# detail and examples

# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks = black
# black.type = console_scripts
# black.entrypoint = black
# black.options = -l 79 REVISION_SCRIPT_FILENAME

# lint with attempts to fix using "ruff" - use the exec runner, execute a binary
# hooks = ruff
# ruff.type = exec
# ruff.executable = %(here)s/.venv/bin/ruff
# ruff.options = --fix REVISION_SCRIPT_FILENAME

# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic

[handlers]
keys = console

[formatters]
keys = generic

[logger_root]
level = WARNING
handlers = console
qualname =

[logger_sqlalchemy]
level = WARNING
handlers =
qualname = sqlalchemy.engine

[logger_alembic]
level = INFO
handlers =
qualname = alembic

[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic

[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

```
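
The `sqlalchemy.url` value above is a placeholder; in practice the URL is supplied at runtime. A generic sketch of overriding it programmatically with the standard Alembic API; the paths and URL are illustrative, and the project's own migration runner (see `basic_memory/alembic/migrations.py`) may wire this differently, for example inside `env.py`:

```python
from alembic import command
from alembic.config import Config

# Illustrative paths/URL, not the project's actual wiring.
cfg = Config("src/basic_memory/alembic/alembic.ini")
cfg.set_main_option("sqlalchemy.url", "sqlite:///./memory.db")
command.upgrade(cfg, "head")
```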

--------------------------------------------------------------------------------
/tests/mcp/test_project_context.py:
--------------------------------------------------------------------------------

```python
"""Tests for project context utilities (no standard-library mock usage).

These functions are config/env driven, so we use the real ConfigManager-backed
test config file and pytest monkeypatch for environment variables.
"""

from __future__ import annotations

import pytest


@pytest.mark.asyncio
async def test_cloud_mode_requires_project_by_default(config_manager, monkeypatch):
    from basic_memory.mcp.project_context import resolve_project_parameter

    cfg = config_manager.load_config()
    cfg.cloud_mode = True
    config_manager.save_config(cfg)

    with pytest.raises(ValueError) as exc_info:
        await resolve_project_parameter(project=None, allow_discovery=False)

    assert "No project specified" in str(exc_info.value)
    assert "Project is required for cloud mode" in str(exc_info.value)


@pytest.mark.asyncio
async def test_cloud_mode_allows_discovery_when_enabled(config_manager):
    from basic_memory.mcp.project_context import resolve_project_parameter

    cfg = config_manager.load_config()
    cfg.cloud_mode = True
    config_manager.save_config(cfg)

    assert await resolve_project_parameter(project=None, allow_discovery=True) is None


@pytest.mark.asyncio
async def test_cloud_mode_returns_project_when_specified(config_manager):
    from basic_memory.mcp.project_context import resolve_project_parameter

    cfg = config_manager.load_config()
    cfg.cloud_mode = True
    config_manager.save_config(cfg)

    assert await resolve_project_parameter(project="my-project") == "my-project"


@pytest.mark.asyncio
async def test_local_mode_uses_env_var_priority(config_manager, monkeypatch):
    from basic_memory.mcp.project_context import resolve_project_parameter

    cfg = config_manager.load_config()
    cfg.cloud_mode = False
    cfg.default_project_mode = False
    config_manager.save_config(cfg)

    monkeypatch.setenv("BASIC_MEMORY_MCP_PROJECT", "env-project")
    assert await resolve_project_parameter(project="explicit-project") == "env-project"


@pytest.mark.asyncio
async def test_local_mode_uses_explicit_project(config_manager, monkeypatch):
    from basic_memory.mcp.project_context import resolve_project_parameter

    cfg = config_manager.load_config()
    cfg.cloud_mode = False
    cfg.default_project_mode = False
    config_manager.save_config(cfg)

    monkeypatch.delenv("BASIC_MEMORY_MCP_PROJECT", raising=False)
    assert await resolve_project_parameter(project="explicit-project") == "explicit-project"


@pytest.mark.asyncio
async def test_local_mode_uses_default_project(config_manager, config_home, monkeypatch):
    from basic_memory.mcp.project_context import resolve_project_parameter

    cfg = config_manager.load_config()
    cfg.cloud_mode = False
    cfg.default_project_mode = True
    # default_project must exist in the config project list, otherwise config validation
    # will coerce it back to an existing default.
    (config_home / "default-project").mkdir(parents=True, exist_ok=True)
    cfg.projects["default-project"] = str(config_home / "default-project")
    cfg.default_project = "default-project"
    config_manager.save_config(cfg)

    monkeypatch.delenv("BASIC_MEMORY_MCP_PROJECT", raising=False)
    assert await resolve_project_parameter(project=None) == "default-project"


@pytest.mark.asyncio
async def test_local_mode_returns_none_when_no_resolution(config_manager, monkeypatch):
    from basic_memory.mcp.project_context import resolve_project_parameter

    cfg = config_manager.load_config()
    cfg.cloud_mode = False
    cfg.default_project_mode = False
    config_manager.save_config(cfg)

    monkeypatch.delenv("BASIC_MEMORY_MCP_PROJECT", raising=False)
    assert await resolve_project_parameter(project=None) is None

```
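
Taken together, these tests imply a resolution order for local mode: the `BASIC_MEMORY_MCP_PROJECT` environment variable wins over an explicitly passed project, which wins over the configured default project (when `default_project_mode` is enabled), and otherwise `None` is returned. A simplified sketch of that ordering, not the actual implementation:

```python
import os
from typing import Optional


def resolve_project_sketch(
    project: Optional[str],
    default_project_mode: bool,
    default_project: Optional[str],
) -> Optional[str]:
    """Simplified local-mode resolution order implied by the tests above."""
    env_project = os.environ.get("BASIC_MEMORY_MCP_PROJECT")
    if env_project:
        return env_project  # env var has highest priority
    if project:
        return project  # then an explicitly passed project
    if default_project_mode and default_project:
        return default_project  # then the configured default
    return None  # nothing resolved
```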

--------------------------------------------------------------------------------
/src/basic_memory/markdown/utils.py:
--------------------------------------------------------------------------------

```python
"""Utilities for converting between markdown and entity models."""

from pathlib import Path
from typing import Any, Optional


from frontmatter import Post

from basic_memory.file_utils import has_frontmatter, remove_frontmatter, parse_frontmatter
from basic_memory.markdown import EntityMarkdown
from basic_memory.models import Entity
from basic_memory.models import Observation as ObservationModel


def entity_model_from_markdown(
    file_path: Path,
    markdown: EntityMarkdown,
    entity: Optional[Entity] = None,
    project_id: Optional[int] = None,
) -> Entity:
    """
    Convert markdown entity to model. Does not include relations.

    Args:
        file_path: Path to the markdown file
        markdown: Parsed markdown entity
        entity: Optional existing entity to update
        project_id: Project ID for new observations (uses entity.project_id if not provided)

    Returns:
        Entity model populated from markdown

    Raises:
        ValueError: If required datetime fields are missing from markdown
    """

    if not markdown.created or not markdown.modified:  # pragma: no cover
        raise ValueError("Both created and modified dates are required in markdown")

    # Create or update entity
    model = entity or Entity()

    # Update basic fields
    model.title = markdown.frontmatter.title
    model.entity_type = markdown.frontmatter.type
    # Only update permalink if it exists in frontmatter, otherwise preserve existing
    if markdown.frontmatter.permalink is not None:
        model.permalink = markdown.frontmatter.permalink
    model.file_path = file_path.as_posix()
    model.content_type = "text/markdown"
    model.created_at = markdown.created
    model.updated_at = markdown.modified

    # Handle metadata - ensure all values are strings and filter None
    metadata = markdown.frontmatter.metadata or {}
    model.entity_metadata = {k: str(v) for k, v in metadata.items() if v is not None}

    # Get project_id from entity if not provided
    obs_project_id = project_id or (model.project_id if hasattr(model, "project_id") else None)

    # Convert observations
    model.observations = [
        ObservationModel(
            project_id=obs_project_id,
            content=obs.content,
            category=obs.category,
            context=obs.context,
            tags=obs.tags,
        )
        for obs in markdown.observations
    ]

    return model


async def schema_to_markdown(schema: Any) -> Post:
    """
    Convert schema to markdown Post object.

    Args:
        schema: Schema to convert (must have title, entity_type, and permalink attributes)

    Returns:
        Post object with frontmatter metadata
    """
    # Extract content and metadata
    content = schema.content or ""
    entity_metadata = dict(schema.entity_metadata or {})

    # if the content contains frontmatter, remove it and merge
    if has_frontmatter(content):
        content_frontmatter = parse_frontmatter(content)
        content = remove_frontmatter(content)

        # Merge content frontmatter with entity metadata
        # (entity_metadata takes precedence for conflicts)
        content_frontmatter.update(entity_metadata)
        entity_metadata = content_frontmatter

    # Remove special fields for ordered frontmatter
    for field in ["type", "title", "permalink"]:
        entity_metadata.pop(field, None)

    # Create Post with fields ordered by insert order
    post = Post(
        content,
        title=schema.title,
        type=schema.entity_type,
    )
    # set the permalink if passed in
    if schema.permalink:
        post.metadata["permalink"] = schema.permalink

    if entity_metadata:
        post.metadata.update(entity_metadata)

    return post

```
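
A small sketch of `schema_to_markdown` in use, with a stand-in object providing the attributes the function reads (`title`, `entity_type`, `permalink`, `content`, `entity_metadata`); real callers pass Pydantic schemas, so this stand-in is only illustrative:

```python
import asyncio
from dataclasses import dataclass, field

import frontmatter

from basic_memory.markdown.utils import schema_to_markdown


@dataclass
class FakeSchema:
    """Stand-in with the attributes schema_to_markdown reads (illustrative only)."""

    title: str
    entity_type: str
    permalink: str | None
    content: str
    entity_metadata: dict = field(default_factory=dict)


async def main() -> None:
    schema = FakeSchema(
        title="Search Design",
        entity_type="note",
        permalink="specs/search-design",
        content="# Search Design\n\nNotes about the FTS5 index.",
        entity_metadata={"tags": "search, fts5"},
    )
    post = await schema_to_markdown(schema)
    print(frontmatter.dumps(post))  # markdown with title/type/permalink frontmatter


asyncio.run(main())
```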

--------------------------------------------------------------------------------
/src/basic_memory/schemas/request.py:
--------------------------------------------------------------------------------

```python
"""Request schemas for interacting with the knowledge graph."""

from typing import List, Optional, Annotated, Literal
from annotated_types import MaxLen, MinLen

from pydantic import BaseModel, field_validator

from basic_memory.schemas.base import (
    Relation,
    Permalink,
)


class SearchNodesRequest(BaseModel):
    """Search for entities in the knowledge graph.

    The search looks across multiple fields:
    - Entity title
    - Entity types
    - Summary
    - File content
    - Observations

    Features:
    - Case-insensitive matching
    - Partial word matches
    - Returns full entity objects with relations
    - Includes all matching entities
    - If a category is specified, only entities with that category are returned

    Example Queries:
    - "memory" - Find entities related to memory systems
    - "SQLite" - Find database-related components
    - "test" - Find test-related entities
    - "implementation" - Find concrete implementations
    - "service" - Find service components

    Note: Currently uses SQL ILIKE for matching. Wildcard (*) searches
    and full-text search capabilities are planned for future versions.
    """

    query: Annotated[str, MinLen(1), MaxLen(200)]
    category: Optional[str] = None


class GetEntitiesRequest(BaseModel):
    """Retrieve specific entities by their IDs.

    Used to load complete entity details including all observations
    and relations. Particularly useful for following relations
    discovered through search.
    """

    permalinks: Annotated[List[Permalink], MinLen(1), MaxLen(10)]


class CreateRelationsRequest(BaseModel):
    relations: List[Relation]


class EditEntityRequest(BaseModel):
    """Request schema for editing an existing entity's content.

    This allows for targeted edits without requiring the full entity content.
    Supports various operation types for different editing scenarios.
    """

    operation: Literal["append", "prepend", "find_replace", "replace_section"]
    content: str
    section: Optional[str] = None
    find_text: Optional[str] = None
    expected_replacements: int = 1

    @field_validator("section")
    @classmethod
    def validate_section_for_replace_section(cls, v, info):
        """Ensure section is provided for replace_section operation."""
        if info.data.get("operation") == "replace_section" and not v:
            raise ValueError("section parameter is required for replace_section operation")
        return v

    @field_validator("find_text")
    @classmethod
    def validate_find_text_for_find_replace(cls, v, info):
        """Ensure find_text is provided for find_replace operation."""
        if info.data.get("operation") == "find_replace" and not v:
            raise ValueError("find_text parameter is required for find_replace operation")
        return v


class MoveEntityRequest(BaseModel):
    """Request schema for moving an entity to a new file location.

    This allows moving notes to different paths while maintaining project
    consistency and optionally updating permalinks based on configuration.
    """

    identifier: Annotated[str, MinLen(1), MaxLen(200)]
    destination_path: Annotated[str, MinLen(1), MaxLen(500)]
    project: Optional[str] = None

    @field_validator("destination_path")
    @classmethod
    def validate_destination_path(cls, v):
        """Ensure destination path is relative and valid."""
        if v.startswith("/"):
            raise ValueError("destination_path must be relative, not absolute")
        if ".." in v:
            raise ValueError("destination_path cannot contain '..' path components")
        if not v.strip():
            raise ValueError("destination_path cannot be empty or whitespace only")
        return v.strip()

```
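
The two field validators above enforce operation-specific requirements; a quick sketch of what they accept and reject:

```python
from pydantic import ValidationError

from basic_memory.schemas.request import EditEntityRequest

# find_replace requires find_text.
ok = EditEntityRequest(operation="find_replace", content="new text", find_text="old text")
print(ok.expected_replacements)  # defaults to 1

# replace_section without a section is rejected by the validator.
try:
    EditEntityRequest(operation="replace_section", content="## New section body")
except ValidationError as e:
    print("rejected:", e.errors()[0]["msg"])
```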

--------------------------------------------------------------------------------
/tests/cli/test_cli_tool_exit.py:
--------------------------------------------------------------------------------

```python
"""Test that CLI tool commands exit cleanly without hanging.

This test ensures that CLI commands properly clean up database connections
on exit, preventing process hangs. See GitHub issue for details.

The issue occurs when:
1. ensure_initialization() calls asyncio.run(initialize_app())
2. initialize_app() creates global database connections via db.get_or_create_db()
3. When asyncio.run() completes, the event loop closes
4. But the global database engine holds async connections that prevent clean exit
5. Process hangs indefinitely

The fix ensures db.shutdown_db() is called before asyncio.run() returns.
"""

import os
import platform
import subprocess
import sys

import pytest

# Windows has different process cleanup behavior that makes these tests unreliable
IS_WINDOWS = platform.system() == "Windows"
SUBPROCESS_TIMEOUT = 10.0
skip_on_windows = pytest.mark.skipif(
    IS_WINDOWS, reason="Subprocess cleanup tests unreliable on Windows CI"
)


@skip_on_windows
class TestCLIToolExit:
    """Test that CLI tool commands exit cleanly."""

    @pytest.mark.parametrize(
        "command",
        [
            ["tool", "--help"],
            ["tool", "write-note", "--help"],
            ["tool", "read-note", "--help"],
            ["tool", "search-notes", "--help"],
            ["tool", "build-context", "--help"],
        ],
    )
    def test_cli_command_exits_cleanly(self, command: list[str]):
        """Test that CLI commands exit without hanging.

        Each command should complete within the timeout without requiring
        manual termination (Ctrl+C).
        """
        full_command = [sys.executable, "-m", "basic_memory.cli.main"] + command

        try:
            result = subprocess.run(
                full_command,
                capture_output=True,
                text=True,
                timeout=SUBPROCESS_TIMEOUT,
            )
            # Command should exit with code 0 for --help
            assert result.returncode == 0, f"Command failed: {result.stderr}"
        except subprocess.TimeoutExpired:
            pytest.fail(
                f"Command '{' '.join(command)}' hung and did not exit within timeout. "
                "This indicates database connections are not being cleaned up properly."
            )

    def test_ensure_initialization_exits_cleanly(self, tmp_path):
        """Test that ensure_initialization doesn't cause process hang.

        This test directly tests the initialization function that's called
        by CLI commands, ensuring it cleans up database connections properly.
        """
        code = """
import asyncio
from basic_memory.config import ConfigManager
from basic_memory.services.initialization import ensure_initialization

app_config = ConfigManager().config
ensure_initialization(app_config)
print("OK")
"""
        try:
            # Ensure the subprocess uses an isolated home directory so ConfigManager doesn't
            # touch the real user profile/AppData (which can be slow/flaky on CI Windows).
            env = dict(os.environ)
            bm_home = tmp_path / "basic-memory-home"
            env["BASIC_MEMORY_HOME"] = str(bm_home)
            env["HOME"] = str(tmp_path)
            env["USERPROFILE"] = str(tmp_path)

            result = subprocess.run(
                [sys.executable, "-c", code],
                capture_output=True,
                text=True,
                timeout=SUBPROCESS_TIMEOUT,
                env=env,
            )
            assert "OK" in result.stdout, f"Unexpected output: {result.stdout}"
        except subprocess.TimeoutExpired:
            pytest.fail(
                "ensure_initialization() caused process hang. "
                "Database connections are not being cleaned up before event loop closes."
            )

```

--------------------------------------------------------------------------------
/tests/api/test_management_router.py:
--------------------------------------------------------------------------------

```python
"""Tests for management router API endpoints (minimal mocking).

These endpoints are mostly simple state checks and wiring; we use stub objects
and pytest monkeypatch instead of standard-library mocks.
"""

from __future__ import annotations

import pytest
from fastapi import FastAPI

from basic_memory.api.routers.management_router import (
    WatchStatusResponse,
    get_watch_status,
    start_watch_service,
    stop_watch_service,
)


class _Request:
    def __init__(self, app: FastAPI):
        self.app = app


class _Task:
    def __init__(self, *, done: bool):
        self._done = done
        self.cancel_called = False

    def done(self) -> bool:
        return self._done

    def cancel(self) -> None:
        self.cancel_called = True


@pytest.fixture
def app_with_state() -> FastAPI:
    app = FastAPI()
    app.state.watch_task = None
    return app


@pytest.mark.asyncio
async def test_get_watch_status_not_running(app_with_state: FastAPI):
    app_with_state.state.watch_task = None
    resp = await get_watch_status(_Request(app_with_state))
    assert isinstance(resp, WatchStatusResponse)
    assert resp.running is False


@pytest.mark.asyncio
async def test_get_watch_status_running(app_with_state: FastAPI):
    app_with_state.state.watch_task = _Task(done=False)
    resp = await get_watch_status(_Request(app_with_state))
    assert resp.running is True


@pytest.mark.asyncio
async def test_start_watch_service_when_not_running(monkeypatch, app_with_state: FastAPI):
    app_with_state.state.watch_task = None

    created = {"watch_service": None, "task": None}

    class _StubWatchService:
        def __init__(self, *, app_config, project_repository):
            self.app_config = app_config
            self.project_repository = project_repository
            created["watch_service"] = self

    def _create_background_sync_task(sync_service, watch_service):
        created["task"] = _Task(done=False)
        return created["task"]

    # start_watch_service imports these inside the function, so patch at the source modules.
    monkeypatch.setattr("basic_memory.sync.WatchService", _StubWatchService)
    monkeypatch.setattr(
        "basic_memory.sync.background_sync.create_background_sync_task",
        _create_background_sync_task,
    )

    project_repository = object()
    sync_service = object()

    resp = await start_watch_service(_Request(app_with_state), project_repository, sync_service)
    assert resp.running is True
    assert app_with_state.state.watch_task is created["task"]
    assert created["watch_service"] is not None
    assert created["watch_service"].project_repository is project_repository


@pytest.mark.asyncio
async def test_start_watch_service_already_running(monkeypatch, app_with_state: FastAPI):
    existing = _Task(done=False)
    app_with_state.state.watch_task = existing

    def _should_not_be_called(*_args, **_kwargs):
        raise AssertionError("create_background_sync_task should not be called if already running")

    monkeypatch.setattr(
        "basic_memory.sync.background_sync.create_background_sync_task",
        _should_not_be_called,
    )

    resp = await start_watch_service(_Request(app_with_state), object(), object())
    assert resp.running is True
    assert app_with_state.state.watch_task is existing


@pytest.mark.asyncio
async def test_stop_watch_service_not_running(app_with_state: FastAPI):
    app_with_state.state.watch_task = None
    resp = await stop_watch_service(_Request(app_with_state))
    assert resp.running is False


@pytest.mark.asyncio
async def test_stop_watch_service_already_done(app_with_state: FastAPI):
    app_with_state.state.watch_task = _Task(done=True)
    resp = await stop_watch_service(_Request(app_with_state))
    assert resp.running is False

```

--------------------------------------------------------------------------------
/test-int/mcp/test_project_state_sync_integration.py:
--------------------------------------------------------------------------------

```python
"""Integration test for project state synchronization between MCP session and CLI config.

This test validates the fix for GitHub issue #148 where MCP session and CLI commands
had inconsistent project state, causing "Project not found" errors and edit failures.

The test simulates the exact workflow reported in the issue:
1. MCP server starts with a default project
2. Default project is changed via CLI/API
3. MCP tools should immediately use the new project (no restart needed)
4. All operations should work consistently in the new project context
"""

import pytest
from fastmcp import Client


@pytest.mark.asyncio
async def test_project_state_sync_after_default_change(
    mcp_server, app, config_manager, test_project, tmp_path
):
    """Test that MCP session stays in sync when default project is changed."""

    async with Client(mcp_server) as client:
        # Step 1: Create a second project that we can switch to
        create_result = await client.call_tool(
            "create_memory_project",
            {
                "project_name": "minerva",
                "project_path": str(tmp_path.parent / (tmp_path.name + "-projects") / "minerva"),
                "set_default": False,  # Don't set as default yet
            },
        )
        assert len(create_result.content) == 1
        assert "✓" in create_result.content[0].text  # pyright: ignore [reportAttributeAccessIssue]
        assert "minerva" in create_result.content[0].text  # pyright: ignore [reportAttributeAccessIssue]

        # Step 2: Test that note operations work in the new project context
        # This validates that the identifier resolution works correctly
        write_result = await client.call_tool(
            "write_note",
            {
                "project": "minerva",
                "title": "Test Consistency Note",
                "folder": "test",
                "content": "# Test Note\n\nThis note tests project state consistency.\n\n- [test] Project state sync working",
                "tags": "test,consistency",
            },
        )
        assert len(write_result.content) == 1
        assert "Test Consistency Note" in write_result.content[0].text  # pyright: ignore [reportAttributeAccessIssue]

        # Step 3: Test that we can read the note we just created
        read_result = await client.call_tool(
            "read_note", {"project": "minerva", "identifier": "Test Consistency Note"}
        )
        assert len(read_result.content) == 1
        assert "Test Consistency Note" in read_result.content[0].text  # pyright: ignore [reportAttributeAccessIssue]
        assert "project state sync working" in read_result.content[0].text.lower()  # pyright: ignore [reportAttributeAccessIssue]

        # Step 4: Test that edit operations work (this was failing in the original issue)
        edit_result = await client.call_tool(
            "edit_note",
            {
                "project": "minerva",
                "identifier": "Test Consistency Note",
                "operation": "append",
                "content": "\n\n## Update\n\nEdit operation successful after project switch!",
            },
        )
        assert len(edit_result.content) == 1
        assert (
            "added" in edit_result.content[0].text.lower()  # pyright: ignore [reportAttributeAccessIssue]
            and "lines" in edit_result.content[0].text.lower()  # pyright: ignore [reportAttributeAccessIssue]
        )

        # Step 5: Verify the edit was applied
        final_read_result = await client.call_tool(
            "read_note", {"project": "minerva", "identifier": "Test Consistency Note"}
        )
        assert len(final_read_result.content) == 1
        final_content = final_read_result.content[0].text  # pyright: ignore [reportAttributeAccessIssue]
        assert "Edit operation successful" in final_content

```

--------------------------------------------------------------------------------
/tests/mcp/test_mcp_container.py:
--------------------------------------------------------------------------------

```python
"""Tests for MCP container composition root."""

import pytest

from basic_memory.mcp.container import (
    McpContainer,
    get_container,
    set_container,
)
from basic_memory.runtime import RuntimeMode


class TestMcpContainer:
    """Tests for McpContainer."""

    def test_create_from_config(self, app_config):
        """Container can be created from config manager."""
        container = McpContainer(config=app_config, mode=RuntimeMode.LOCAL)
        assert container.config == app_config
        assert container.mode == RuntimeMode.LOCAL

    def test_should_sync_files_when_enabled_local_mode(self, app_config):
        """Sync should be enabled in local mode when config says so."""
        app_config.sync_changes = True
        container = McpContainer(config=app_config, mode=RuntimeMode.LOCAL)
        assert container.should_sync_files is True

    def test_should_not_sync_files_when_disabled(self, app_config):
        """Sync should be disabled when config says so."""
        app_config.sync_changes = False
        container = McpContainer(config=app_config, mode=RuntimeMode.LOCAL)
        assert container.should_sync_files is False

    def test_should_not_sync_files_in_test_mode(self, app_config):
        """Sync should be disabled in test mode regardless of config."""
        app_config.sync_changes = True
        container = McpContainer(config=app_config, mode=RuntimeMode.TEST)
        assert container.should_sync_files is False

    def test_should_not_sync_files_in_cloud_mode(self, app_config):
        """Sync should be disabled in cloud mode (cloud handles sync differently)."""
        app_config.sync_changes = True
        container = McpContainer(config=app_config, mode=RuntimeMode.CLOUD)
        assert container.should_sync_files is False


class TestSyncSkipReason:
    """Tests for sync_skip_reason property."""

    def test_skip_reason_in_test_mode(self, app_config):
        """Returns test message when in test mode."""
        container = McpContainer(config=app_config, mode=RuntimeMode.TEST)
        assert container.sync_skip_reason == "Test environment detected"

    def test_skip_reason_in_cloud_mode(self, app_config):
        """Returns cloud message when in cloud mode."""
        container = McpContainer(config=app_config, mode=RuntimeMode.CLOUD)
        assert container.sync_skip_reason == "Cloud mode enabled"

    def test_skip_reason_when_sync_disabled(self, app_config):
        """Returns disabled message when sync is disabled."""
        app_config.sync_changes = False
        container = McpContainer(config=app_config, mode=RuntimeMode.LOCAL)
        assert container.sync_skip_reason == "Sync changes disabled"

    def test_no_skip_reason_when_should_sync(self, app_config):
        """Returns None when sync should run."""
        app_config.sync_changes = True
        container = McpContainer(config=app_config, mode=RuntimeMode.LOCAL)
        assert container.sync_skip_reason is None


class TestContainerAccessors:
    """Tests for container get/set functions."""

    def test_get_container_raises_when_not_set(self, monkeypatch):
        """get_container raises RuntimeError when container not initialized."""
        import basic_memory.mcp.container as container_module

        monkeypatch.setattr(container_module, "_container", None)

        with pytest.raises(RuntimeError, match="MCP container not initialized"):
            get_container()

    def test_set_and_get_container(self, app_config, monkeypatch):
        """set_container allows get_container to return the container."""
        import basic_memory.mcp.container as container_module

        container = McpContainer(config=app_config, mode=RuntimeMode.LOCAL)
        monkeypatch.setattr(container_module, "_container", None)

        set_container(container)
        assert get_container() is container

```
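
The tests above fully pin down the sync-gating rules. Below is a minimal standalone sketch of that decision logic, written for illustration only; it is not the real `McpContainer`, and the `Mode` enum is a stand-in for the `RuntimeMode` values exercised in the tests.

```python
from enum import Enum, auto


class Mode(Enum):
    # Stand-ins for the RuntimeMode values used in the tests above
    LOCAL = auto()
    TEST = auto()
    CLOUD = auto()


def should_sync_files(sync_changes: bool, mode: Mode) -> bool:
    """Sync runs only in local mode, and only when config enables it."""
    return sync_changes and mode is Mode.LOCAL


def sync_skip_reason(sync_changes: bool, mode: Mode) -> str | None:
    """Mirror the skip messages asserted in TestSyncSkipReason."""
    if mode is Mode.TEST:
        return "Test environment detected"
    if mode is Mode.CLOUD:
        return "Cloud mode enabled"
    if not sync_changes:
        return "Sync changes disabled"
    return None


assert should_sync_files(True, Mode.LOCAL) is True
assert should_sync_files(True, Mode.CLOUD) is False
assert sync_skip_reason(True, Mode.LOCAL) is None
```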

--------------------------------------------------------------------------------
/src/basic_memory/cli/commands/cloud/bisync_commands.py:
--------------------------------------------------------------------------------

```python
"""Cloud bisync utility functions for Basic Memory CLI."""

from pathlib import Path

from basic_memory.cli.commands.cloud.api_client import make_api_request
from basic_memory.config import ConfigManager
from basic_memory.ignore_utils import create_default_bmignore, get_bmignore_path
from basic_memory.schemas.cloud import MountCredentials, TenantMountInfo


class BisyncError(Exception):
    """Exception raised for bisync-related errors."""

    pass


async def get_mount_info() -> TenantMountInfo:
    """Get current tenant information from cloud API."""
    try:
        config_manager = ConfigManager()
        config = config_manager.config
        host_url = config.cloud_host.rstrip("/")

        response = await make_api_request(method="GET", url=f"{host_url}/tenant/mount/info")

        return TenantMountInfo.model_validate(response.json())
    except Exception as e:
        raise BisyncError(f"Failed to get tenant info: {e}") from e


async def generate_mount_credentials(tenant_id: str) -> MountCredentials:
    """Generate scoped credentials for syncing."""
    try:
        config_manager = ConfigManager()
        config = config_manager.config
        host_url = config.cloud_host.rstrip("/")

        response = await make_api_request(method="POST", url=f"{host_url}/tenant/mount/credentials")

        return MountCredentials.model_validate(response.json())
    except Exception as e:
        raise BisyncError(f"Failed to generate credentials: {e}") from e


def convert_bmignore_to_rclone_filters() -> Path:
    """Convert .bmignore patterns to rclone filter format.

    Reads ~/.basic-memory/.bmignore (gitignore-style) and converts to
    ~/.basic-memory/.bmignore.rclone (rclone filter format).

    Only regenerates if .bmignore has been modified since last conversion.

    Returns:
        Path to converted rclone filter file
    """
    # Ensure .bmignore exists
    create_default_bmignore()

    bmignore_path = get_bmignore_path()
    # Create rclone filter path: ~/.basic-memory/.bmignore -> ~/.basic-memory/.bmignore.rclone
    rclone_filter_path = bmignore_path.parent / f"{bmignore_path.name}.rclone"

    # Skip regeneration if rclone file is newer than bmignore
    if rclone_filter_path.exists():
        bmignore_mtime = bmignore_path.stat().st_mtime
        rclone_mtime = rclone_filter_path.stat().st_mtime
        if rclone_mtime >= bmignore_mtime:
            return rclone_filter_path

    # Read .bmignore patterns
    patterns = []
    try:
        with bmignore_path.open("r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                # Keep comments and empty lines
                if not line or line.startswith("#"):
                    patterns.append(line)
                    continue

                # Convert gitignore pattern to rclone filter syntax
                # gitignore: node_modules  → rclone: - node_modules/**
                # gitignore: *.pyc        → rclone: - *.pyc
                if "*" in line:
                    # Pattern already has wildcard, just add exclude prefix
                    patterns.append(f"- {line}")
                else:
                    # Directory pattern - add /** for recursive exclude
                    patterns.append(f"- {line}/**")

    except Exception:
        # If we can't read the file, create a minimal filter
        patterns = ["# Error reading .bmignore, using minimal filters", "- .git/**"]

    # Write rclone filter file
    rclone_filter_path.write_text("\n".join(patterns) + "\n")

    return rclone_filter_path


def get_bisync_filter_path() -> Path:
    """Get path to bisync filter file.

    Uses ~/.basic-memory/.bmignore (converted to rclone format).
    The file is automatically created with default patterns on first use.

    Returns:
        Path to rclone filter file
    """
    return convert_bmignore_to_rclone_filters()

```
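
For illustration, here is the same gitignore-to-rclone conversion rule used by `convert_bmignore_to_rclone_filters`, applied to a few sample patterns as a standalone sketch; the sample `.bmignore` contents are made up.

```python
def to_rclone_filters(lines: list[str]) -> list[str]:
    """Keep comments/blank lines, pass wildcard patterns through,
    and turn bare names into recursive excludes."""
    out: list[str] = []
    for line in (raw.strip() for raw in lines):
        if not line or line.startswith("#"):
            out.append(line)
        elif "*" in line:
            out.append(f"- {line}")
        else:
            out.append(f"- {line}/**")
    return out


sample = ["# build artifacts", "node_modules", "*.pyc", "", ".git"]
print("\n".join(to_rclone_filters(sample)))
# # build artifacts
# - node_modules/**
# - *.pyc
#
# - .git/**
```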

--------------------------------------------------------------------------------
/tests/utils/test_timezone_utils.py:
--------------------------------------------------------------------------------

```python
"""Tests for timezone utilities."""

from datetime import datetime, timezone


from basic_memory.utils import ensure_timezone_aware


class TestEnsureTimezoneAware:
    """Tests for ensure_timezone_aware function."""

    def test_already_timezone_aware_returns_unchanged(self):
        """Timezone-aware datetime should be returned unchanged."""
        dt = datetime(2024, 1, 15, 12, 30, 0, tzinfo=timezone.utc)
        result = ensure_timezone_aware(dt)
        assert result == dt
        assert result.tzinfo == timezone.utc

    def test_naive_datetime_cloud_mode_true_interprets_as_utc(self):
        """In cloud mode, naive datetimes should be interpreted as UTC."""
        naive_dt = datetime(2024, 1, 15, 12, 30, 0)
        result = ensure_timezone_aware(naive_dt, cloud_mode=True)

        # Should have UTC timezone
        assert result.tzinfo == timezone.utc
        # Time values should be unchanged (just tagged as UTC)
        assert result.year == 2024
        assert result.month == 1
        assert result.day == 15
        assert result.hour == 12
        assert result.minute == 30

    def test_naive_datetime_cloud_mode_false_interprets_as_local(self):
        """In local mode, naive datetimes should be interpreted as local time."""
        naive_dt = datetime(2024, 1, 15, 12, 30, 0)
        result = ensure_timezone_aware(naive_dt, cloud_mode=False)

        # Should have some timezone info (local)
        assert result.tzinfo is not None
        # The datetime should be converted to local timezone
        # We can't assert exact timezone as it depends on system

    def test_cloud_mode_true_does_not_shift_time(self):
        """Cloud mode should use replace() not astimezone() - time values unchanged."""
        naive_dt = datetime(2024, 6, 15, 18, 0, 0)  # Summer time
        result = ensure_timezone_aware(naive_dt, cloud_mode=True)

        # Hour should remain 18, not be shifted by timezone offset
        assert result.hour == 18
        assert result.tzinfo == timezone.utc

    def test_explicit_cloud_mode_skips_config_loading(self):
        """When cloud_mode is explicitly passed, config should not be loaded."""
        # This test verifies we can call ensure_timezone_aware without
        # triggering ConfigManager import when cloud_mode is explicit
        naive_dt = datetime(2024, 1, 15, 12, 30, 0)

        # Should work without any config setup
        result_cloud = ensure_timezone_aware(naive_dt, cloud_mode=True)
        assert result_cloud.tzinfo == timezone.utc

        result_local = ensure_timezone_aware(naive_dt, cloud_mode=False)
        assert result_local.tzinfo is not None

    def test_none_cloud_mode_falls_back_to_config(self, config_manager):
        """When cloud_mode is None, should load from config."""
        naive_dt = datetime(2024, 1, 15, 12, 30, 0)
        # Use the real config file (via test fixtures) rather than mocking.
        cfg = config_manager.config
        cfg.cloud_mode = True
        config_manager.save_config(cfg)

        result = ensure_timezone_aware(naive_dt, cloud_mode=None)

        # Should have used cloud mode (UTC)
        assert result.tzinfo == timezone.utc

    def test_asyncpg_naive_utc_scenario(self):
        """Simulate asyncpg returning naive datetime that's actually UTC.

        asyncpg binary protocol returns timestamps in UTC but as naive datetimes.
        In cloud mode, we interpret these as UTC rather than local time.
        """
        # Simulate what asyncpg returns: a naive datetime that's actually UTC
        asyncpg_result = datetime(2024, 1, 15, 18, 30, 0)  # 6:30 PM UTC

        # In cloud mode, interpret as UTC
        cloud_result = ensure_timezone_aware(asyncpg_result, cloud_mode=True)
        assert cloud_result == datetime(2024, 1, 15, 18, 30, 0, tzinfo=timezone.utc)

        # The hour should remain 18, not shifted
        assert cloud_result.hour == 18

```

--------------------------------------------------------------------------------
/tests/markdown/test_relation_edge_cases.py:
--------------------------------------------------------------------------------

```python
"""Tests for edge cases in relation parsing."""

from markdown_it import MarkdownIt

from basic_memory.markdown.plugins import relation_plugin, parse_relation, parse_inline_relations
from basic_memory.markdown.schemas import Relation


def test_empty_targets():
    """Test handling of empty targets."""
    md = MarkdownIt().use(relation_plugin)

    # Empty brackets
    tokens = md.parse("- type [[]]")
    token = next(t for t in tokens if t.type == "inline")
    assert parse_relation(token) is None

    # Only spaces
    tokens = md.parse("- type [[ ]]")
    token = next(t for t in tokens if t.type == "inline")
    assert parse_relation(token) is None

    # Whitespace in brackets
    tokens = md.parse("- type [[   ]]")
    token = next(t for t in tokens if t.type == "inline")
    assert parse_relation(token) is None


def test_malformed_links():
    """Test handling of malformed wiki links."""
    md = MarkdownIt().use(relation_plugin)

    # Missing close brackets
    tokens = md.parse("- type [[Target")
    assert not any(t.meta and "relations" in t.meta for t in tokens)

    # Missing open brackets
    tokens = md.parse("- type Target]]")
    assert not any(t.meta and "relations" in t.meta for t in tokens)

    # Backwards brackets
    tokens = md.parse("- type ]]Target[[")
    assert not any(t.meta and "relations" in t.meta for t in tokens)

    # Nested brackets
    tokens = md.parse("- type [[Outer [[Inner]] ]]")
    token = next(t for t in tokens if t.type == "inline")
    rel = parse_relation(token)
    assert rel is not None
    assert "Outer" in rel["target"]


def test_context_handling():
    """Test handling of contexts."""
    md = MarkdownIt().use(relation_plugin)

    # Unclosed context
    tokens = md.parse("- type [[Target]] (unclosed")
    token = next(t for t in tokens if t.type == "inline")
    rel = parse_relation(token)
    assert rel["context"] is None

    # Multiple parens
    tokens = md.parse("- type [[Target]] (with (nested) parens)")
    token = next(t for t in tokens if t.type == "inline")
    rel = parse_relation(token)
    assert rel["context"] == "with (nested) parens"

    # Empty context
    tokens = md.parse("- type [[Target]] ()")
    token = next(t for t in tokens if t.type == "inline")
    rel = parse_relation(token)
    assert rel["context"] is None


def test_inline_relations():
    """Test inline relation detection."""
    md = MarkdownIt().use(relation_plugin)

    # Multiple links in text
    text = "Text with [[Link1]] and [[Link2]] and [[Link3]]"
    rels = parse_inline_relations(text)
    assert len(rels) == 3
    assert {r["target"] for r in rels} == {"Link1", "Link2", "Link3"}

    # Links with surrounding text
    text = "Before [[Target]] After"
    rels = parse_inline_relations(text)
    assert len(rels) == 1
    assert rels[0]["target"] == "Target"

    # Multiple links on same line
    tokens = md.parse("[[One]] [[Two]] [[Three]]")
    token = next(t for t in tokens if t.type == "inline")
    assert len(token.meta["relations"]) == 3


def test_unicode_targets():
    """Test handling of Unicode in targets."""
    md = MarkdownIt().use(relation_plugin)

    # Unicode in target
    tokens = md.parse("- type [[测试]]")
    token = next(t for t in tokens if t.type == "inline")
    rel = parse_relation(token)
    assert rel["target"] == "测试"

    # Unicode in type
    tokens = md.parse("- 使用 [[Target]]")
    token = next(t for t in tokens if t.type == "inline")
    rel = parse_relation(token)
    assert rel["type"] == "使用"

    # Unicode in context
    tokens = md.parse("- type [[Target]] (测试)")
    token = next(t for t in tokens if t.type == "inline")
    rel = parse_relation(token)
    assert rel["context"] == "测试"

    # Model validation with Unicode
    relation = Relation.model_validate(rel)
    assert relation.type == "type"
    assert relation.target == "Target"
    assert relation.context == "测试"

```

--------------------------------------------------------------------------------
/src/basic_memory/schemas/search.py:
--------------------------------------------------------------------------------

```python
"""Search schemas for Basic Memory.

The search system supports three primary modes:
1. Exact permalink lookup
2. Pattern matching with *
3. Full-text search across content
"""

from typing import Optional, List, Union
from datetime import datetime
from enum import Enum
from pydantic import BaseModel, field_validator

from basic_memory.schemas.base import Permalink


class SearchItemType(str, Enum):
    """Types of searchable items."""

    ENTITY = "entity"
    OBSERVATION = "observation"
    RELATION = "relation"


class SearchQuery(BaseModel):
    """Search query parameters.

    Use ONE of these primary search modes:
    - permalink: Exact permalink match
    - permalink_match: Path pattern with *
    - text: Full-text search of title/content (supports boolean operators: AND, OR, NOT)

    Optionally filter results by:
    - types: Limit to specific item types
    - entity_types: Limit to specific entity types
    - after_date: Only items after date

    Boolean search examples:
    - "python AND flask" - Find items with both terms
    - "python OR django" - Find items with either term
    - "python NOT django" - Find items with python but not django
    - "(python OR flask) AND web" - Use parentheses for grouping
    """

    # Primary search modes (use ONE of these)
    permalink: Optional[str] = None  # Exact permalink match
    permalink_match: Optional[str] = None  # Glob permalink match
    text: Optional[str] = None  # Full-text search (now supports boolean operators)
    title: Optional[str] = None  # title only search

    # Optional filters
    types: Optional[List[str]] = None  # Filter by type
    entity_types: Optional[List[SearchItemType]] = None  # Filter by entity type
    after_date: Optional[Union[datetime, str]] = None  # Time-based filter

    @field_validator("after_date")
    @classmethod
    def validate_date(cls, v: Optional[Union[datetime, str]]) -> Optional[str]:
        """Convert datetime to ISO format if needed."""
        if isinstance(v, datetime):
            return v.isoformat()
        return v

    def no_criteria(self) -> bool:
        return (
            self.permalink is None
            and self.permalink_match is None
            and self.title is None
            and self.text is None
            and self.after_date is None
            and self.types is None
            and self.entity_types is None
        )

    def has_boolean_operators(self) -> bool:
        """Check if the text query contains boolean operators (AND, OR, NOT)."""
        if not self.text:  # pragma: no cover
            return False

        # Check for common boolean operators with correct word boundaries
        # to avoid matching substrings like "GRAND" containing "AND"
        boolean_patterns = [" AND ", " OR ", " NOT ", "(", ")"]
        text = f" {self.text} "  # Add spaces to ensure we match word boundaries
        return any(pattern in text for pattern in boolean_patterns)


class SearchResult(BaseModel):
    """Search result with score and metadata."""

    title: str
    type: SearchItemType
    score: float
    entity: Optional[Permalink] = None
    permalink: Optional[str]
    content: Optional[str] = None
    file_path: str

    metadata: Optional[dict] = None

    # IDs for v2 API consistency
    entity_id: Optional[int] = None  # Entity ID (always present for entities)
    observation_id: Optional[int] = None  # Observation ID (for observation results)
    relation_id: Optional[int] = None  # Relation ID (for relation results)

    # Type-specific fields
    category: Optional[str] = None  # For observations
    from_entity: Optional[Permalink] = None  # For relations
    to_entity: Optional[Permalink] = None  # For relations
    relation_type: Optional[str] = None  # For relations


class SearchResponse(BaseModel):
    """Wrapper for search results."""

    results: List[SearchResult]
    current_page: int
    page_size: int

```
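
A short usage sketch of the `SearchQuery` schema above, showing the primary search modes and the helper methods; the query values are illustrative.

```python
from datetime import datetime

from basic_memory.schemas.search import SearchQuery

# Full-text mode with boolean operators
q = SearchQuery(text="(python OR flask) AND web")
assert q.has_boolean_operators() is True
assert q.no_criteria() is False

# Exact permalink vs. glob pattern match (use one mode per query)
exact = SearchQuery(permalink="specs/search")
pattern = SearchQuery(permalink_match="specs/*")

# after_date accepts a datetime and is normalized to an ISO string by the validator
recent = SearchQuery(text="sync", after_date=datetime(2025, 1, 1))
assert recent.after_date == "2025-01-01T00:00:00"

# An empty query reports that no criteria were supplied
assert SearchQuery().no_criteria() is True
```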

--------------------------------------------------------------------------------
/tests/api/v2/test_directory_router.py:
--------------------------------------------------------------------------------

```python
"""Tests for V2 directory API routes (ID-based endpoints)."""

import pytest
from httpx import AsyncClient

from basic_memory.models import Project
from basic_memory.schemas.directory import DirectoryNode


@pytest.mark.asyncio
async def test_get_directory_tree(
    client: AsyncClient,
    test_project: Project,
    v2_project_url: str,
):
    """Test getting directory tree via v2 endpoint."""
    response = await client.get(f"{v2_project_url}/directory/tree")

    assert response.status_code == 200
    tree = DirectoryNode.model_validate(response.json())
    assert tree.type == "directory"


@pytest.mark.asyncio
async def test_get_directory_structure(
    client: AsyncClient,
    test_project: Project,
    v2_project_url: str,
):
    """Test getting directory structure (folders only) via v2 endpoint."""
    response = await client.get(f"{v2_project_url}/directory/structure")

    assert response.status_code == 200
    structure = DirectoryNode.model_validate(response.json())
    assert structure.type == "directory"
    # Structure should only contain directories, not files
    if structure.children:
        for child in structure.children:
            assert child.type == "directory"


@pytest.mark.asyncio
async def test_list_directory_default(
    client: AsyncClient,
    test_project: Project,
    v2_project_url: str,
):
    """Test listing directory contents with default parameters via v2 endpoint."""
    response = await client.get(f"{v2_project_url}/directory/list")

    assert response.status_code == 200
    nodes = response.json()
    assert isinstance(nodes, list)


@pytest.mark.asyncio
async def test_list_directory_with_depth(
    client: AsyncClient,
    test_project: Project,
    v2_project_url: str,
):
    """Test listing directory with custom depth via v2 endpoint."""
    response = await client.get(f"{v2_project_url}/directory/list?depth=2")

    assert response.status_code == 200
    nodes = response.json()
    assert isinstance(nodes, list)


@pytest.mark.asyncio
async def test_list_directory_with_glob(
    client: AsyncClient,
    test_project: Project,
    v2_project_url: str,
):
    """Test listing directory with file name glob filter via v2 endpoint."""
    response = await client.get(f"{v2_project_url}/directory/list?file_name_glob=*.md")

    assert response.status_code == 200
    nodes = response.json()
    assert isinstance(nodes, list)
    # All file nodes should have .md extension
    for node in nodes:
        if node.get("type") == "file":
            assert node.get("path", "").endswith(".md")


@pytest.mark.asyncio
async def test_list_directory_with_custom_path(
    client: AsyncClient,
    test_project: Project,
    v2_project_url: str,
):
    """Test listing a specific directory path via v2 endpoint."""
    response = await client.get(f"{v2_project_url}/directory/list?dir_name=/")

    assert response.status_code == 200
    nodes = response.json()
    assert isinstance(nodes, list)


@pytest.mark.asyncio
async def test_directory_invalid_project_id(
    client: AsyncClient,
):
    """Test directory endpoints with invalid project ID return 404."""
    # Test tree endpoint
    response = await client.get("/v2/projects/999999/directory/tree")
    assert response.status_code == 404

    # Test structure endpoint
    response = await client.get("/v2/projects/999999/directory/structure")
    assert response.status_code == 404

    # Test list endpoint
    response = await client.get("/v2/projects/999999/directory/list")
    assert response.status_code == 404


@pytest.mark.asyncio
async def test_v2_directory_endpoints_use_project_id_not_name(
    client: AsyncClient, test_project: Project
):
    """Verify v2 directory endpoints require project ID, not name."""
    # Try using project name instead of ID - should fail
    response = await client.get(f"/v2/projects/{test_project.name}/directory/tree")

    # Should get validation error or 404 because name is not a valid integer
    assert response.status_code in [404, 422]

```

--------------------------------------------------------------------------------
/test-int/BENCHMARKS.md:
--------------------------------------------------------------------------------

```markdown
# Performance Benchmarks

This directory contains performance benchmark tests for Basic Memory's sync/indexing operations.

## Purpose

These benchmarks measure baseline performance to track improvements from optimizations. They are particularly important for:
- Cloud deployments with ephemeral databases that need fast re-indexing
- Large repositories (100s to 1000s of files)
- Validating optimization efforts

## Running Benchmarks

### Run all benchmarks (excluding slow ones)
```bash
pytest test-int/test_sync_performance_benchmark.py -v -m "benchmark and not slow"
```

### Run specific benchmark
```bash
# 100 files (fast, ~10-30 seconds)
pytest test-int/test_sync_performance_benchmark.py::test_benchmark_sync_100_files -v

# 500 files (medium, ~1-3 minutes)
pytest test-int/test_sync_performance_benchmark.py::test_benchmark_sync_500_files -v

# 1000 files (slow, ~3-10 minutes)
pytest test-int/test_sync_performance_benchmark.py::test_benchmark_sync_1000_files -v

# Re-sync with no changes (tests scan performance)
pytest test-int/test_sync_performance_benchmark.py::test_benchmark_resync_no_changes -v
```

### Run all benchmarks including slow ones
```bash
pytest test-int/test_sync_performance_benchmark.py -v -m benchmark
```

### Skip benchmarks in regular test runs
```bash
pytest -m "not benchmark"
```

## Benchmark Output

Each benchmark provides detailed metrics including:

- **Performance Metrics**:
  - Total sync time
  - Files processed per second
  - Milliseconds per file

- **Database Metrics**:
  - Initial database size
  - Final database size
  - Database growth (total and per file)

- **Operation Counts**:
  - New files indexed
  - Modified files processed
  - Deleted files handled
  - Moved files tracked

## Example Output

```
======================================================================
BENCHMARK: Sync 100 files (small repository)
======================================================================

Generating 100 test files...
  Created files 0-100 (100/100)
  File generation completed in 0.15s (666.7 files/sec)

Initial database size: 120.00 KB

Starting sync of 100 files...

----------------------------------------------------------------------
RESULTS:
----------------------------------------------------------------------
Files processed:      100
  New:                100
  Modified:           0
  Deleted:            0
  Moved:              0

Performance:
  Total time:         12.34s
  Files/sec:          8.1
  ms/file:            123.4

Database:
  Initial size:       120.00 KB
  Final size:         5.23 MB
  Growth:             5.11 MB
  Growth per file:    52.31 KB
======================================================================
```

## Interpreting Results

### Good Performance Indicators
- **Files/sec > 10**: Good indexing speed for small-medium repos
- **Files/sec > 5**: Acceptable for large repos with complex relations
- **DB growth < 100KB per file**: Reasonable index size

### Areas for Improvement
- **Files/sec < 5**: May benefit from batch operations
- **ms/file > 200**: High latency per file, check for N+1 queries
- **DB growth > 200KB per file**: Search index may be bloated (trigrams?)

## Tracking Improvements

Before making optimizations:
1. Run benchmarks to establish baseline
2. Save output for comparison
3. Note any particular pain points (e.g., slow search indexing)

After optimizations:
1. Run the same benchmarks
2. Compare metrics:
   - Files/sec should increase
   - ms/file should decrease
   - DB growth per file may decrease (with search optimizations)
3. Document improvements in PR

## Related Issues

- [#351: Performance: Optimize sync/indexing for cloud deployments](https://github.com/basicmachines-co/basic-memory/issues/351)

## Test File Generation

Benchmarks generate realistic markdown files with:
- YAML frontmatter with tags
- 3-10 observations per file with categories
- 1-3 relations per file (including forward references)
- Varying content to simulate real usage
- Files organized in category subdirectories

```
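
The throughput figures in the example output follow directly from the raw counts; a small sketch of that arithmetic, using the numbers from the sample run above:

```python
files = 100
total_seconds = 12.34
db_growth_bytes = 5.11 * 1024 * 1024  # "Growth: 5.11 MB"

files_per_sec = files / total_seconds               # ~8.1
ms_per_file = total_seconds * 1000 / files          # ~123.4
growth_per_file_kb = db_growth_bytes / files / 1024  # ~52.3 KB

print(f"Files/sec:   {files_per_sec:.1f}")
print(f"ms/file:     {ms_per_file:.1f}")
print(f"Growth/file: {growth_per_file_kb:.2f} KB")
```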

--------------------------------------------------------------------------------
/src/basic_memory/alembic/versions/cc7172b46608_update_search_index_schema.py:
--------------------------------------------------------------------------------

```python
"""Update search index schema

Revision ID: cc7172b46608
Revises: 502b60eaa905
Create Date: 2025-02-28 18:48:23.244941

"""

from typing import Sequence, Union

from alembic import op


# revision identifiers, used by Alembic.
revision: str = "cc7172b46608"
down_revision: Union[str, None] = "502b60eaa905"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Upgrade database schema to use new search index with content_stems and content_snippet."""

    # This migration is SQLite-specific (FTS5 virtual tables)
    # For Postgres, the search_index table is created via ORM models
    connection = op.get_bind()
    if connection.dialect.name != "sqlite":
        return

    # First, drop the existing search_index table
    op.execute("DROP TABLE IF EXISTS search_index")

    # Create new search_index with updated schema
    op.execute("""
    CREATE VIRTUAL TABLE IF NOT EXISTS search_index USING fts5(
        -- Core entity fields
        id UNINDEXED,          -- Row ID
        title,                 -- Title for searching
        content_stems,         -- Main searchable content split into stems
        content_snippet,       -- File content snippet for display
        permalink,             -- Stable identifier (now indexed for path search)
        file_path UNINDEXED,   -- Physical location
        type UNINDEXED,        -- entity/relation/observation
        
        -- Relation fields 
        from_id UNINDEXED,     -- Source entity
        to_id UNINDEXED,       -- Target entity
        relation_type UNINDEXED, -- Type of relation
        
        -- Observation fields
        entity_id UNINDEXED,   -- Parent entity
        category UNINDEXED,    -- Observation category
        
        -- Common fields
        metadata UNINDEXED,    -- JSON metadata
        created_at UNINDEXED,  -- Creation timestamp
        updated_at UNINDEXED,  -- Last update
        
        -- Configuration
        tokenize='unicode61 tokenchars 0x2F',  -- Hex code for /
        prefix='1,2,3,4'                    -- Support longer prefixes for paths
    );
    """)


def downgrade() -> None:
    """Downgrade database schema to use old search index."""

    # This migration is SQLite-specific (FTS5 virtual tables)
    # For Postgres, the search_index table is managed via ORM models
    connection = op.get_bind()
    if connection.dialect.name != "sqlite":
        return

    # Drop the updated search_index table
    op.execute("DROP TABLE IF EXISTS search_index")

    # Recreate the original search_index schema
    op.execute("""
    CREATE VIRTUAL TABLE IF NOT EXISTS search_index USING fts5(
        -- Core entity fields
        id UNINDEXED,          -- Row ID
        title,                 -- Title for searching
        content,               -- Main searchable content
        permalink,             -- Stable identifier (now indexed for path search)
        file_path UNINDEXED,   -- Physical location
        type UNINDEXED,        -- entity/relation/observation
        
        -- Relation fields 
        from_id UNINDEXED,     -- Source entity
        to_id UNINDEXED,       -- Target entity
        relation_type UNINDEXED, -- Type of relation
        
        -- Observation fields
        entity_id UNINDEXED,   -- Parent entity
        category UNINDEXED,    -- Observation category
        
        -- Common fields
        metadata UNINDEXED,    -- JSON metadata
        created_at UNINDEXED,  -- Creation timestamp
        updated_at UNINDEXED,  -- Last update
        
        -- Configuration
        tokenize='unicode61 tokenchars 0x2F',  -- Hex code for /
        prefix='1,2,3,4'                    -- Support longer prefixes for paths
    );
    """)

    # Print instruction to manually reindex after migration
    print("\n------------------------------------------------------------------")
    print("IMPORTANT: After downgrade completes, manually run the reindex command:")
    print("basic-memory sync")
    print("------------------------------------------------------------------\n")

```

--------------------------------------------------------------------------------
/tests/utils/test_permalink_formatting.py:
--------------------------------------------------------------------------------

```python
"""Test permalink formatting during sync."""

from pathlib import Path

import pytest

from basic_memory.config import ProjectConfig
from basic_memory.services import EntityService
from basic_memory.sync.sync_service import SyncService
from basic_memory.utils import generate_permalink


async def create_test_file(path: Path, content: str = "test content") -> None:
    """Create a test file with given content."""
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(content)


@pytest.mark.asyncio
async def test_permalink_formatting(
    sync_service: SyncService, project_config: ProjectConfig, entity_service: EntityService
):
    """Test that permalinks are properly formatted during sync.

    This ensures:
    - Underscores are converted to hyphens
    - Spaces are converted to hyphens
    - Mixed case is lowercased
    - Directory structure is preserved
    - Multiple directories work correctly
    """
    project_dir = project_config.home

    # Test cases with different filename formats
    test_cases = [
        # filename -> expected permalink
        ("my_awesome_feature.md", "my-awesome-feature"),
        ("MIXED_CASE_NAME.md", "mixed-case-name"),
        ("spaces and_underscores.md", "spaces-and-underscores"),
        ("design/model_refactor.md", "design/model-refactor"),
        (
            "test/multiple_word_directory/feature_name.md",
            "test/multiple-word-directory/feature-name",
        ),
    ]

    # Create test files
    for filename, _ in test_cases:
        content = """
---
type: knowledge
created: 2024-01-01
modified: 2024-01-01
---
# Test File

Testing permalink generation.
"""
        await create_test_file(project_dir / filename, content)

    # Run sync
    await sync_service.sync(project_config.home)

    # Verify permalinks
    for filename, expected_permalink in test_cases:
        entity = await entity_service.repository.get_by_file_path(filename)
        assert entity.permalink == expected_permalink, (
            f"File {filename} should have permalink {expected_permalink}"
        )


@pytest.mark.parametrize(
    "input_path, expected",
    [
        ("test/Über File.md", "test/uber-file"),
        ("docs/résumé.md", "docs/resume"),
        ("notes/Déjà vu.md", "notes/deja-vu"),
        ("papers/Jürgen's Findings.md", "papers/jurgens-findings"),
        ("archive/François Müller.md", "archive/francois-muller"),
        ("research/Søren Kierkegård.md", "research/soren-kierkegard"),
        ("articles/El Niño.md", "articles/el-nino"),
        ("ArticlesElNiño.md", "articles-el-nino"),
        ("articleselniño.md", "articleselnino"),
        ("articles-El-Niño.md", "articles-el-nino"),
    ],
)
def test_latin_accents_transliteration(input_path, expected):
    """Test that Latin letters with accents are properly transliterated."""
    assert generate_permalink(input_path) == expected


@pytest.mark.parametrize(
    "input_path, expected",
    [
        ("中文/测试文档.md", "中文/测试文档"),
        ("notes/北京市.md", "notes/北京市"),
        ("research/上海简介.md", "research/上海简介"),
        ("docs/中文 English Mixed.md", "docs/中文-english-mixed"),
        ("articles/东京Tokyo混合.md", "articles/东京-tokyo-混合"),
        ("papers/汉字_underscore_test.md", "papers/汉字-underscore-test"),
        ("projects/中文CamelCase测试.md", "projects/中文-camel-case-测试"),
    ],
)
def test_chinese_character_preservation(input_path, expected):
    """Test that Chinese characters are preserved in permalinks."""
    assert generate_permalink(input_path) == expected


@pytest.mark.parametrize(
    "input_path, expected",
    [
        ("mixed/北京Café.md", "mixed/北京-cafe"),
        ("notes/东京Tōkyō.md", "notes/东京-tokyo"),
        ("research/München中文.md", "research/munchen-中文"),
        ("docs/Über测试.md", "docs/uber-测试"),
        ("complex/北京Beijing上海Shanghai.md", "complex/北京-beijing-上海-shanghai"),
        ("special/中文!@#$%^&*()_+.md", "special/中文"),
        ("punctuation/你好,世界!.md", "punctuation/你好世界"),
    ],
)
def test_mixed_character_sets(input_path, expected):
    """Test handling of mixed character sets and edge cases."""
    assert generate_permalink(input_path) == expected

```

--------------------------------------------------------------------------------
/tests/importers/test_importer_base.py:
--------------------------------------------------------------------------------

```python
"""Tests for the base importer class."""

import pytest

from basic_memory.importers.base import Importer
from basic_memory.markdown.entity_parser import EntityParser
from basic_memory.markdown.markdown_processor import MarkdownProcessor
from basic_memory.markdown.schemas import EntityFrontmatter, EntityMarkdown
from basic_memory.schemas.importer import ImportResult
from basic_memory.services.file_service import FileService


# Create a concrete implementation of the abstract class for testing
class ConcreteTestImporter(Importer[ImportResult]):
    """Test implementation of Importer base class."""

    async def import_data(self, source_data, destination_folder: str, **kwargs):
        """Implement the abstract method for testing."""
        try:
            # Test implementation that returns success
            await self.ensure_folder_exists(destination_folder)
            return ImportResult(
                import_count={"files": 1},
                success=True,
                error_message=None,
            )
        except Exception as e:
            return self.handle_error("Test import failed", e)

    def handle_error(self, message: str, error=None) -> ImportResult:
        """Implement the abstract handle_error method."""
        import logging

        logger = logging.getLogger(__name__)

        error_message = f"{message}"
        if error:
            error_message += f": {str(error)}"

        logger.error(error_message)
        return ImportResult(
            import_count={},
            success=False,
            error_message=error_message,
        )


@pytest.fixture
def test_importer(tmp_path):
    """Create a ConcreteTestImporter instance for testing."""
    entity_parser = EntityParser(base_path=tmp_path)
    markdown_processor = MarkdownProcessor(entity_parser=entity_parser)
    file_service = FileService(base_path=tmp_path, markdown_processor=markdown_processor)
    return ConcreteTestImporter(tmp_path, markdown_processor, file_service)


@pytest.mark.asyncio
async def test_import_data_success(test_importer):
    """Test successful import_data implementation."""
    result = await test_importer.import_data({}, "test_folder")
    assert result.success
    assert result.import_count == {"files": 1}
    assert result.error_message is None

    assert (test_importer.base_path / "test_folder").exists()


@pytest.mark.asyncio
async def test_write_entity(test_importer, tmp_path):
    """Test write_entity method."""
    # Create test entity
    entity = EntityMarkdown(
        frontmatter=EntityFrontmatter(metadata={"title": "Test Entity", "type": "note"}),
        content="Test content",
        observations=[],
        relations=[],
    )

    # Call write_entity
    file_path = tmp_path / "test_entity.md"
    checksum = await test_importer.write_entity(entity, file_path)

    assert file_path.exists()
    assert len(checksum) == 64  # sha256 hex digest
    assert file_path.read_text(encoding="utf-8").strip() != ""


@pytest.mark.asyncio
async def test_ensure_folder_exists(test_importer):
    """Test ensure_folder_exists method."""
    # Test with simple folder - now passes relative path to FileService
    await test_importer.ensure_folder_exists("test_folder")
    assert (test_importer.base_path / "test_folder").exists()

    # Test with nested folder - FileService handles base_path resolution
    await test_importer.ensure_folder_exists("nested/folder/path")
    assert (test_importer.base_path / "nested/folder/path").exists()


@pytest.mark.asyncio
async def test_handle_error(test_importer):
    """Test handle_error method."""
    # Test with message only
    result = test_importer.handle_error("Test error message")
    assert not result.success
    assert result.error_message == "Test error message"
    assert result.import_count == {}

    # Test with message and exception
    test_exception = ValueError("Test exception")
    result = test_importer.handle_error("Error occurred", test_exception)
    assert not result.success
    assert "Error occurred" in result.error_message
    assert "Test exception" in result.error_message
    assert result.import_count == {}

```

--------------------------------------------------------------------------------
/tests/services/test_initialization.py:
--------------------------------------------------------------------------------

```python
"""Integration-style tests for the initialization service.

Goal: avoid brittle deep mocking; assert real behavior using the existing
test config + dual-backend fixtures.
"""

from __future__ import annotations

import pytest

from basic_memory import db
from basic_memory.config import BasicMemoryConfig, DatabaseBackend
from basic_memory.repository.project_repository import ProjectRepository
from basic_memory.services.initialization import (
    ensure_initialization,
    initialize_database,
    reconcile_projects_with_config,
)


@pytest.mark.asyncio
async def test_initialize_database_creates_engine_and_allows_queries(app_config: BasicMemoryConfig):
    await db.shutdown_db()
    try:
        await initialize_database(app_config)

        engine, session_maker = await db.get_or_create_db(app_config.database_path)
        assert engine is not None
        assert session_maker is not None

        # Smoke query on the initialized DB
        async with db.scoped_session(session_maker) as session:
            result = await session.execute(db.text("SELECT 1"))
            assert result.scalar() == 1
    finally:
        await db.shutdown_db()


@pytest.mark.asyncio
async def test_initialize_database_raises_on_invalid_postgres_config(
    app_config: BasicMemoryConfig, config_manager
):
    """If config selects Postgres but has no DATABASE_URL, initialization should fail."""
    await db.shutdown_db()
    try:
        bad_config = app_config.model_copy(
            update={"database_backend": DatabaseBackend.POSTGRES, "database_url": None}
        )
        config_manager.save_config(bad_config)

        with pytest.raises(ValueError):
            await initialize_database(bad_config)
    finally:
        await db.shutdown_db()


@pytest.mark.asyncio
async def test_reconcile_projects_with_config_creates_projects_and_default(
    app_config: BasicMemoryConfig, config_manager, config_home
):
    await db.shutdown_db()
    try:
        # Ensure the configured paths exist
        proj_a = config_home / "proj-a"
        proj_b = config_home / "proj-b"
        proj_a.mkdir(parents=True, exist_ok=True)
        proj_b.mkdir(parents=True, exist_ok=True)

        updated = app_config.model_copy(
            update={
                "projects": {"proj-a": str(proj_a), "proj-b": str(proj_b)},
                "default_project": "proj-b",
            }
        )
        config_manager.save_config(updated)

        # Real DB init + reconcile
        await initialize_database(updated)
        await reconcile_projects_with_config(updated)

        _, session_maker = await db.get_or_create_db(
            updated.database_path, db_type=db.DatabaseType.FILESYSTEM
        )
        repo = ProjectRepository(session_maker)

        active = await repo.get_active_projects()
        names = {p.name for p in active}
        assert names.issuperset({"proj-a", "proj-b"})

        default = await repo.get_default_project()
        assert default is not None
        assert default.name == "proj-b"
    finally:
        await db.shutdown_db()


@pytest.mark.asyncio
async def test_reconcile_projects_with_config_swallow_errors(
    monkeypatch, app_config: BasicMemoryConfig
):
    """reconcile_projects_with_config should not raise if ProjectService sync fails."""
    await db.shutdown_db()
    try:
        await initialize_database(app_config)

        async def boom(self):  # noqa: ANN001
            raise ValueError("Project synchronization error")

        monkeypatch.setattr(
            "basic_memory.services.project_service.ProjectService.synchronize_projects",
            boom,
        )

        # Should not raise
        await reconcile_projects_with_config(app_config)
    finally:
        await db.shutdown_db()


def test_ensure_initialization_runs_and_cleans_up(app_config: BasicMemoryConfig, config_manager):
    # ensure_initialization uses asyncio.run; keep this test synchronous.
    ensure_initialization(app_config)

    # Must be cleaned up to avoid hanging processes.
    assert db._engine is None  # pyright: ignore [reportPrivateUsage]
    assert db._session_maker is None  # pyright: ignore [reportPrivateUsage]

```

--------------------------------------------------------------------------------
/src/basic_memory/api/container.py:
--------------------------------------------------------------------------------

```python
"""API composition root for Basic Memory.

This container owns reading ConfigManager and environment variables for the
API entrypoint. Downstream modules receive config/dependencies explicitly
rather than reading globals.

Design principles:
- Only this module reads ConfigManager directly
- Runtime mode (cloud/local/test) is resolved here
- Factories for services are provided, not singletons
"""

from dataclasses import dataclass
from typing import TYPE_CHECKING

from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker, AsyncSession

from basic_memory import db
from basic_memory.config import BasicMemoryConfig, ConfigManager
from basic_memory.runtime import RuntimeMode, resolve_runtime_mode

if TYPE_CHECKING:  # pragma: no cover
    from basic_memory.sync import SyncCoordinator


@dataclass
class ApiContainer:
    """Composition root for the API entrypoint.

    Holds resolved configuration and runtime context.
    Created once at app startup, then used to wire dependencies.
    """

    config: BasicMemoryConfig
    mode: RuntimeMode

    # --- Database ---
    # Cached database connections (set during lifespan startup)
    engine: AsyncEngine | None = None
    session_maker: async_sessionmaker[AsyncSession] | None = None

    @classmethod
    def create(cls) -> "ApiContainer":  # pragma: no cover
        """Create container by reading ConfigManager.

        This is the single point where API reads global config.
        """
        config = ConfigManager().config
        mode = resolve_runtime_mode(
            cloud_mode_enabled=config.cloud_mode_enabled,
            is_test_env=config.is_test_env,
        )
        return cls(config=config, mode=mode)

    # --- Runtime Mode Properties ---

    @property
    def should_sync_files(self) -> bool:
        """Whether file sync should be started.

        Sync is enabled when:
        - sync_changes is True in config
        - Not in test mode (tests manage their own sync)
        """
        return self.config.sync_changes and not self.mode.is_test

    @property
    def sync_skip_reason(self) -> str | None:  # pragma: no cover
        """Reason why sync is skipped, or None if sync should run.

        Useful for logging why sync was disabled.
        """
        if self.mode.is_test:
            return "Test environment detected"
        if not self.config.sync_changes:
            return "Sync changes disabled"
        return None

    def create_sync_coordinator(self) -> "SyncCoordinator":  # pragma: no cover
        """Create a SyncCoordinator with this container's settings.

        Returns:
            SyncCoordinator configured for this runtime environment
        """
        # Deferred import to avoid circular dependency
        from basic_memory.sync import SyncCoordinator

        return SyncCoordinator(
            config=self.config,
            should_sync=self.should_sync_files,
            skip_reason=self.sync_skip_reason,
        )

    # --- Database Factory ---

    async def init_database(  # pragma: no cover
        self,
    ) -> tuple[AsyncEngine, async_sessionmaker[AsyncSession]]:
        """Initialize and cache database connections.

        Returns:
            Tuple of (engine, session_maker)
        """
        engine, session_maker = await db.get_or_create_db(self.config.database_path)
        self.engine = engine
        self.session_maker = session_maker
        return engine, session_maker

    async def shutdown_database(self) -> None:  # pragma: no cover
        """Clean up database connections."""
        await db.shutdown_db()


# Module-level container instance (set by lifespan)
# This allows deps.py to access the container without reading ConfigManager
_container: ApiContainer | None = None


def get_container() -> ApiContainer:
    """Get the current API container.

    Raises:
        RuntimeError: If container hasn't been initialized
    """
    if _container is None:
        raise RuntimeError("API container not initialized. Call set_container() first.")
    return _container


def set_container(container: ApiContainer) -> None:
    """Set the API container (called by lifespan)."""
    global _container
    _container = container

```
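
A minimal sketch of how this composition root is meant to be wired at startup, assuming an ASGI-style lifespan; the `lifespan` function and `app` object are illustrative, not code from the repository.

```python
from contextlib import asynccontextmanager

from basic_memory.api.container import ApiContainer, set_container


@asynccontextmanager
async def lifespan(app):
    # Single point where the API reads global config and resolves runtime mode
    container = ApiContainer.create()
    set_container(container)

    # Cache engine/session maker on the container for request-time dependencies
    await container.init_database()

    if container.sync_skip_reason:
        print(f"File sync skipped: {container.sync_skip_reason}")

    try:
        yield
    finally:
        await container.shutdown_database()
```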

--------------------------------------------------------------------------------
/src/basic_memory/cli/commands/cloud/upload_command.py:
--------------------------------------------------------------------------------

```python
"""Upload CLI commands for basic-memory projects."""

from pathlib import Path

import typer
from rich.console import Console

from basic_memory.cli.app import cloud_app
from basic_memory.cli.commands.command_utils import run_with_cleanup
from basic_memory.cli.commands.cloud.cloud_utils import (
    create_cloud_project,
    project_exists,
    sync_project,
)
from basic_memory.cli.commands.cloud.upload import upload_path

console = Console()


@cloud_app.command("upload")
def upload(
    path: Path = typer.Argument(
        ...,
        help="Path to local file or directory to upload",
        exists=True,
        readable=True,
        resolve_path=True,
    ),
    project: str = typer.Option(
        ...,
        "--project",
        "-p",
        help="Cloud project name (destination)",
    ),
    create_project: bool = typer.Option(
        False,
        "--create-project",
        "-c",
        help="Create project if it doesn't exist",
    ),
    sync: bool = typer.Option(
        True,
        "--sync/--no-sync",
        help="Sync project after upload (default: true)",
    ),
    verbose: bool = typer.Option(
        False,
        "--verbose",
        "-v",
        help="Show detailed information about file filtering and upload",
    ),
    no_gitignore: bool = typer.Option(
        False,
        "--no-gitignore",
        help="Skip .gitignore patterns (still respects .bmignore)",
    ),
    dry_run: bool = typer.Option(
        False,
        "--dry-run",
        help="Show what would be uploaded without actually uploading",
    ),
) -> None:
    """Upload local files or directories to cloud project via WebDAV.

    Examples:
      bm cloud upload ~/my-notes --project research
      bm cloud upload notes.md --project research --create-project
      bm cloud upload ~/docs --project work --no-sync
      bm cloud upload ./history --project proto --verbose
      bm cloud upload ./notes --project work --no-gitignore
      bm cloud upload ./files --project test --dry-run
    """

    async def _upload():
        # Check if project exists
        if not await project_exists(project):
            if create_project:
                console.print(f"[blue]Creating cloud project '{project}'...[/blue]")
                try:
                    await create_cloud_project(project)
                    console.print(f"[green]Created project '{project}'[/green]")
                except Exception as e:
                    console.print(f"[red]Failed to create project: {e}[/red]")
                    raise typer.Exit(1)
            else:
                console.print(
                    f"[red]Project '{project}' does not exist.[/red]\n"
                    f"[yellow]Options:[/yellow]\n"
                    f"  1. Create it first: bm project add {project}\n"
                    f"  2. Use --create-project flag to create automatically"
                )
                raise typer.Exit(1)

        # Perform upload (or dry run)
        if dry_run:
            console.print(
                f"[yellow]DRY RUN: Showing what would be uploaded to '{project}'[/yellow]"
            )
        else:
            console.print(f"[blue]Uploading {path} to project '{project}'...[/blue]")

        success = await upload_path(
            path, project, verbose=verbose, use_gitignore=not no_gitignore, dry_run=dry_run
        )
        if not success:
            console.print("[red]Upload failed[/red]")
            raise typer.Exit(1)

        if dry_run:
            console.print("[yellow]DRY RUN complete - no files were uploaded[/yellow]")
        else:
            console.print(f"[green]Successfully uploaded to '{project}'[/green]")

        # Sync project if requested (skip on dry run)
        # Force full scan after upload to ensure database is up-to-date with uploaded files
        if sync and not dry_run:
            console.print(f"[blue]Syncing project '{project}'...[/blue]")
            try:
                await sync_project(project, force_full=True)
            except Exception as e:
                console.print(f"[yellow]Warning: Sync failed: {e}[/yellow]")
                console.print("[dim]Files uploaded but may not be indexed yet[/dim]")

    run_with_cleanup(_upload())

```

--------------------------------------------------------------------------------
/tests/markdown/test_observation_edge_cases.py:
--------------------------------------------------------------------------------

```python
"""Tests for edge cases in observation parsing."""

from markdown_it import MarkdownIt

from basic_memory.markdown.plugins import observation_plugin, parse_observation
from basic_memory.markdown.schemas import Observation


def test_empty_input():
    """Test handling of empty input."""
    md = MarkdownIt().use(observation_plugin)

    tokens = md.parse("")
    assert not any(t.meta and "observation" in t.meta for t in tokens)

    tokens = md.parse("   ")
    assert not any(t.meta and "observation" in t.meta for t in tokens)

    tokens = md.parse("\n")
    assert not any(t.meta and "observation" in t.meta for t in tokens)


def test_invalid_context():
    """Test handling of invalid context format."""
    md = MarkdownIt().use(observation_plugin)

    # Unclosed context
    tokens = md.parse("- [test] Content (unclosed")
    token = next(t for t in tokens if t.type == "inline")
    obs = parse_observation(token)
    assert obs["content"] == "Content (unclosed"
    assert obs["context"] is None

    # Multiple parens
    tokens = md.parse("- [test] Content (with) extra) parens)")
    token = next(t for t in tokens if t.type == "inline")
    obs = parse_observation(token)
    assert obs["content"] == "Content"
    assert obs["context"] == "with) extra) parens"


def test_complex_format():
    """Test parsing complex observation formats."""
    md = MarkdownIt().use(observation_plugin)

    # Multiple hashtags together
    tokens = md.parse("- [complex test] This is #tag1#tag2 with #tag3 content")
    token = next(t for t in tokens if t.type == "inline")

    obs = parse_observation(token)
    assert obs["category"] == "complex test"
    assert set(obs["tags"]) == {"tag1", "tag2", "tag3"}
    assert obs["content"] == "This is #tag1#tag2 with #tag3 content"

    # Pydantic model validation
    observation = Observation.model_validate(obs)
    assert observation.category == "complex test"
    assert set(observation.tags) == {"tag1", "tag2", "tag3"}
    assert observation.content == "This is #tag1#tag2 with #tag3 content"


def test_malformed_category():
    """Test handling of malformed category brackets."""
    md = MarkdownIt().use(observation_plugin)

    # Empty category
    tokens = md.parse("- [] Empty category")
    token = next(t for t in tokens if t.type == "inline")
    observation = Observation.model_validate(parse_observation(token))
    assert observation.category is None
    assert observation.content == "Empty category"

    # Missing close bracket
    tokens = md.parse("- [test Content")
    token = next(t for t in tokens if t.type == "inline")
    observation = Observation.model_validate(parse_observation(token))
    # Should treat whole thing as content
    assert observation.category is None
    assert "test Content" in observation.content


def test_no_category():
    """Test handling of malformed category brackets."""
    md = MarkdownIt().use(observation_plugin)

    # Empty category
    tokens = md.parse("- No category")
    token = next(t for t in tokens if t.type == "inline")
    observation = Observation.model_validate(parse_observation(token))
    assert observation.category is None
    assert observation.content == "No category"


def test_unicode_content():
    """Test handling of Unicode content."""
    md = MarkdownIt().use(observation_plugin)

    # Emoji
    tokens = md.parse("- [test] Emoji test 👍 #emoji #test (Testing emoji)")
    token = next(t for t in tokens if t.type == "inline")
    obs = parse_observation(token)
    assert "👍" in obs["content"]
    assert "emoji" in obs["tags"]

    # Non-Latin scripts
    tokens = md.parse("- [中文] Chinese text 测试 #language (Script test)")
    token = next(t for t in tokens if t.type == "inline")
    obs = parse_observation(token)
    assert obs["category"] == "中文"
    assert "测试" in obs["content"]

    # Mixed scripts and emoji
    tokens = md.parse("- [test] Mixed 中文 and 👍 #mixed")
    token = next(t for t in tokens if t.type == "inline")
    obs = parse_observation(token)
    assert "中文" in obs["content"]
    assert "👍" in obs["content"]

    # Model validation with Unicode
    observation = Observation.model_validate(obs)
    assert "中文" in observation.content
    assert "👍" in observation.content

```