This is page 3 of 17. Use http://codebase.md/basicmachines-co/basic-memory?page={x} to view the full context.

# Directory Structure

```
├── .claude
│   ├── agents
│   │   ├── python-developer.md
│   │   └── system-architect.md
│   └── commands
│       ├── release
│       │   ├── beta.md
│       │   ├── changelog.md
│       │   ├── release-check.md
│       │   └── release.md
│       ├── spec.md
│       └── test-live.md
├── .dockerignore
├── .github
│   ├── dependabot.yml
│   ├── ISSUE_TEMPLATE
│   │   ├── bug_report.md
│   │   ├── config.yml
│   │   ├── documentation.md
│   │   └── feature_request.md
│   └── workflows
│       ├── claude-code-review.yml
│       ├── claude-issue-triage.yml
│       ├── claude.yml
│       ├── dev-release.yml
│       ├── docker.yml
│       ├── pr-title.yml
│       ├── release.yml
│       └── test.yml
├── .gitignore
├── .python-version
├── CHANGELOG.md
├── CITATION.cff
├── CLA.md
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── docker-compose.yml
├── Dockerfile
├── docs
│   ├── ai-assistant-guide-extended.md
│   ├── character-handling.md
│   ├── cloud-cli.md
│   └── Docker.md
├── justfile
├── LICENSE
├── llms-install.md
├── pyproject.toml
├── README.md
├── SECURITY.md
├── smithery.yaml
├── specs
│   ├── SPEC-1 Specification-Driven Development Process.md
│   ├── SPEC-10 Unified Deployment Workflow and Event Tracking.md
│   ├── SPEC-11 Basic Memory API Performance Optimization.md
│   ├── SPEC-12 OpenTelemetry Observability.md
│   ├── SPEC-13 CLI Authentication with Subscription Validation.md
│   ├── SPEC-14 Cloud Git Versioning & GitHub Backup.md
│   ├── SPEC-14- Cloud Git Versioning & GitHub Backup.md
│   ├── SPEC-15 Configuration Persistence via Tigris for Cloud Tenants.md
│   ├── SPEC-16 MCP Cloud Service Consolidation.md
│   ├── SPEC-17 Semantic Search with ChromaDB.md
│   ├── SPEC-18 AI Memory Management Tool.md
│   ├── SPEC-19 Sync Performance and Memory Optimization.md
│   ├── SPEC-2 Slash Commands Reference.md
│   ├── SPEC-3 Agent Definitions.md
│   ├── SPEC-4 Notes Web UI Component Architecture.md
│   ├── SPEC-5 CLI Cloud Upload via WebDAV.md
│   ├── SPEC-6 Explicit Project Parameter Architecture.md
│   ├── SPEC-7 POC to spike Tigris Turso for local access to cloud data.md
│   ├── SPEC-8 TigrisFS Integration.md
│   ├── SPEC-9 Multi-Project Bidirectional Sync Architecture.md
│   ├── SPEC-9 Signed Header Tenant Information.md
│   └── SPEC-9-1 Follow-Ups- Conflict, Sync, and Observability.md
├── src
│   └── basic_memory
│       ├── __init__.py
│       ├── alembic
│       │   ├── alembic.ini
│       │   ├── env.py
│       │   ├── migrations.py
│       │   ├── script.py.mako
│       │   └── versions
│       │       ├── 3dae7c7b1564_initial_schema.py
│       │       ├── 502b60eaa905_remove_required_from_entity_permalink.py
│       │       ├── 5fe1ab1ccebe_add_projects_table.py
│       │       ├── 647e7a75e2cd_project_constraint_fix.py
│       │       ├── 9d9c1cb7d8f5_add_mtime_and_size_columns_to_entity_.py
│       │       ├── a1b2c3d4e5f6_fix_project_foreign_keys.py
│       │       ├── b3c3938bacdb_relation_to_name_unique_index.py
│       │       ├── cc7172b46608_update_search_index_schema.py
│       │       └── e7e1f4367280_add_scan_watermark_tracking_to_project.py
│       ├── api
│       │   ├── __init__.py
│       │   ├── app.py
│       │   ├── routers
│       │   │   ├── __init__.py
│       │   │   ├── directory_router.py
│       │   │   ├── importer_router.py
│       │   │   ├── knowledge_router.py
│       │   │   ├── management_router.py
│       │   │   ├── memory_router.py
│       │   │   ├── project_router.py
│       │   │   ├── prompt_router.py
│       │   │   ├── resource_router.py
│       │   │   ├── search_router.py
│       │   │   └── utils.py
│       │   └── template_loader.py
│       ├── cli
│       │   ├── __init__.py
│       │   ├── app.py
│       │   ├── auth.py
│       │   ├── commands
│       │   │   ├── __init__.py
│       │   │   ├── cloud
│       │   │   │   ├── __init__.py
│       │   │   │   ├── api_client.py
│       │   │   │   ├── bisync_commands.py
│       │   │   │   ├── cloud_utils.py
│       │   │   │   ├── core_commands.py
│       │   │   │   ├── mount_commands.py
│       │   │   │   ├── rclone_config.py
│       │   │   │   ├── rclone_installer.py
│       │   │   │   ├── upload_command.py
│       │   │   │   └── upload.py
│       │   │   ├── command_utils.py
│       │   │   ├── db.py
│       │   │   ├── import_chatgpt.py
│       │   │   ├── import_claude_conversations.py
│       │   │   ├── import_claude_projects.py
│       │   │   ├── import_memory_json.py
│       │   │   ├── mcp.py
│       │   │   ├── project.py
│       │   │   ├── status.py
│       │   │   ├── sync.py
│       │   │   └── tool.py
│       │   └── main.py
│       ├── config.py
│       ├── db.py
│       ├── deps.py
│       ├── file_utils.py
│       ├── ignore_utils.py
│       ├── importers
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── chatgpt_importer.py
│       │   ├── claude_conversations_importer.py
│       │   ├── claude_projects_importer.py
│       │   ├── memory_json_importer.py
│       │   └── utils.py
│       ├── markdown
│       │   ├── __init__.py
│       │   ├── entity_parser.py
│       │   ├── markdown_processor.py
│       │   ├── plugins.py
│       │   ├── schemas.py
│       │   └── utils.py
│       ├── mcp
│       │   ├── __init__.py
│       │   ├── async_client.py
│       │   ├── project_context.py
│       │   ├── prompts
│       │   │   ├── __init__.py
│       │   │   ├── ai_assistant_guide.py
│       │   │   ├── continue_conversation.py
│       │   │   ├── recent_activity.py
│       │   │   ├── search.py
│       │   │   └── utils.py
│       │   ├── resources
│       │   │   ├── ai_assistant_guide.md
│       │   │   └── project_info.py
│       │   ├── server.py
│       │   └── tools
│       │       ├── __init__.py
│       │       ├── build_context.py
│       │       ├── canvas.py
│       │       ├── chatgpt_tools.py
│       │       ├── delete_note.py
│       │       ├── edit_note.py
│       │       ├── list_directory.py
│       │       ├── move_note.py
│       │       ├── project_management.py
│       │       ├── read_content.py
│       │       ├── read_note.py
│       │       ├── recent_activity.py
│       │       ├── search.py
│       │       ├── utils.py
│       │       ├── view_note.py
│       │       └── write_note.py
│       ├── models
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── knowledge.py
│       │   ├── project.py
│       │   └── search.py
│       ├── repository
│       │   ├── __init__.py
│       │   ├── entity_repository.py
│       │   ├── observation_repository.py
│       │   ├── project_info_repository.py
│       │   ├── project_repository.py
│       │   ├── relation_repository.py
│       │   ├── repository.py
│       │   └── search_repository.py
│       ├── schemas
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── cloud.py
│       │   ├── delete.py
│       │   ├── directory.py
│       │   ├── importer.py
│       │   ├── memory.py
│       │   ├── project_info.py
│       │   ├── prompt.py
│       │   ├── request.py
│       │   ├── response.py
│       │   ├── search.py
│       │   └── sync_report.py
│       ├── services
│       │   ├── __init__.py
│       │   ├── context_service.py
│       │   ├── directory_service.py
│       │   ├── entity_service.py
│       │   ├── exceptions.py
│       │   ├── file_service.py
│       │   ├── initialization.py
│       │   ├── link_resolver.py
│       │   ├── project_service.py
│       │   ├── search_service.py
│       │   └── service.py
│       ├── sync
│       │   ├── __init__.py
│       │   ├── background_sync.py
│       │   ├── sync_service.py
│       │   └── watch_service.py
│       ├── templates
│       │   └── prompts
│       │       ├── continue_conversation.hbs
│       │       └── search.hbs
│       └── utils.py
├── test-int
│   ├── BENCHMARKS.md
│   ├── cli
│   │   ├── test_project_commands_integration.py
│   │   ├── test_sync_commands_integration.py
│   │   └── test_version_integration.py
│   ├── conftest.py
│   ├── mcp
│   │   ├── test_build_context_underscore.py
│   │   ├── test_build_context_validation.py
│   │   ├── test_chatgpt_tools_integration.py
│   │   ├── test_default_project_mode_integration.py
│   │   ├── test_delete_note_integration.py
│   │   ├── test_edit_note_integration.py
│   │   ├── test_list_directory_integration.py
│   │   ├── test_move_note_integration.py
│   │   ├── test_project_management_integration.py
│   │   ├── test_project_state_sync_integration.py
│   │   ├── test_read_content_integration.py
│   │   ├── test_read_note_integration.py
│   │   ├── test_search_integration.py
│   │   ├── test_single_project_mcp_integration.py
│   │   └── test_write_note_integration.py
│   ├── test_db_wal_mode.py
│   ├── test_disable_permalinks_integration.py
│   └── test_sync_performance_benchmark.py
├── tests
│   ├── __init__.py
│   ├── api
│   │   ├── conftest.py
│   │   ├── test_async_client.py
│   │   ├── test_continue_conversation_template.py
│   │   ├── test_directory_router.py
│   │   ├── test_importer_router.py
│   │   ├── test_knowledge_router.py
│   │   ├── test_management_router.py
│   │   ├── test_memory_router.py
│   │   ├── test_project_router_operations.py
│   │   ├── test_project_router.py
│   │   ├── test_prompt_router.py
│   │   ├── test_relation_background_resolution.py
│   │   ├── test_resource_router.py
│   │   ├── test_search_router.py
│   │   ├── test_search_template.py
│   │   ├── test_template_loader_helpers.py
│   │   └── test_template_loader.py
│   ├── cli
│   │   ├── conftest.py
│   │   ├── test_bisync_commands.py
│   │   ├── test_cli_tools.py
│   │   ├── test_cloud_authentication.py
│   │   ├── test_cloud_utils.py
│   │   ├── test_ignore_utils.py
│   │   ├── test_import_chatgpt.py
│   │   ├── test_import_claude_conversations.py
│   │   ├── test_import_claude_projects.py
│   │   ├── test_import_memory_json.py
│   │   └── test_upload.py
│   ├── conftest.py
│   ├── db
│   │   └── test_issue_254_foreign_key_constraints.py
│   ├── importers
│   │   ├── test_importer_base.py
│   │   └── test_importer_utils.py
│   ├── markdown
│   │   ├── __init__.py
│   │   ├── test_date_frontmatter_parsing.py
│   │   ├── test_entity_parser_error_handling.py
│   │   ├── test_entity_parser.py
│   │   ├── test_markdown_plugins.py
│   │   ├── test_markdown_processor.py
│   │   ├── test_observation_edge_cases.py
│   │   ├── test_parser_edge_cases.py
│   │   ├── test_relation_edge_cases.py
│   │   └── test_task_detection.py
│   ├── mcp
│   │   ├── conftest.py
│   │   ├── test_obsidian_yaml_formatting.py
│   │   ├── test_permalink_collision_file_overwrite.py
│   │   ├── test_prompts.py
│   │   ├── test_resources.py
│   │   ├── test_tool_build_context.py
│   │   ├── test_tool_canvas.py
│   │   ├── test_tool_delete_note.py
│   │   ├── test_tool_edit_note.py
│   │   ├── test_tool_list_directory.py
│   │   ├── test_tool_move_note.py
│   │   ├── test_tool_read_content.py
│   │   ├── test_tool_read_note.py
│   │   ├── test_tool_recent_activity.py
│   │   ├── test_tool_resource.py
│   │   ├── test_tool_search.py
│   │   ├── test_tool_utils.py
│   │   ├── test_tool_view_note.py
│   │   ├── test_tool_write_note.py
│   │   └── tools
│   │       └── test_chatgpt_tools.py
│   ├── Non-MarkdownFileSupport.pdf
│   ├── repository
│   │   ├── test_entity_repository_upsert.py
│   │   ├── test_entity_repository.py
│   │   ├── test_entity_upsert_issue_187.py
│   │   ├── test_observation_repository.py
│   │   ├── test_project_info_repository.py
│   │   ├── test_project_repository.py
│   │   ├── test_relation_repository.py
│   │   ├── test_repository.py
│   │   ├── test_search_repository_edit_bug_fix.py
│   │   └── test_search_repository.py
│   ├── schemas
│   │   ├── test_base_timeframe_minimum.py
│   │   ├── test_memory_serialization.py
│   │   ├── test_memory_url_validation.py
│   │   ├── test_memory_url.py
│   │   ├── test_schemas.py
│   │   └── test_search.py
│   ├── Screenshot.png
│   ├── services
│   │   ├── test_context_service.py
│   │   ├── test_directory_service.py
│   │   ├── test_entity_service_disable_permalinks.py
│   │   ├── test_entity_service.py
│   │   ├── test_file_service.py
│   │   ├── test_initialization.py
│   │   ├── test_link_resolver.py
│   │   ├── test_project_removal_bug.py
│   │   ├── test_project_service_operations.py
│   │   ├── test_project_service.py
│   │   └── test_search_service.py
│   ├── sync
│   │   ├── test_character_conflicts.py
│   │   ├── test_sync_service_incremental.py
│   │   ├── test_sync_service.py
│   │   ├── test_sync_wikilink_issue.py
│   │   ├── test_tmp_files.py
│   │   ├── test_watch_service_edge_cases.py
│   │   ├── test_watch_service_reload.py
│   │   └── test_watch_service.py
│   ├── test_config.py
│   ├── test_db_migration_deduplication.py
│   ├── test_deps.py
│   ├── test_production_cascade_delete.py
│   └── utils
│       ├── test_file_utils.py
│       ├── test_frontmatter_obsidian_compatible.py
│       ├── test_parse_tags.py
│       ├── test_permalink_formatting.py
│       ├── test_utf8_handling.py
│       └── test_validate_project_path.py
├── uv.lock
├── v0.15.0-RELEASE-DOCS.md
└── v15-docs
    ├── api-performance.md
    ├── background-relations.md
    ├── basic-memory-home.md
    ├── bug-fixes.md
    ├── chatgpt-integration.md
    ├── cloud-authentication.md
    ├── cloud-bisync.md
    ├── cloud-mode-usage.md
    ├── cloud-mount.md
    ├── default-project-mode.md
    ├── env-file-removal.md
    ├── env-var-overrides.md
    ├── explicit-project-parameter.md
    ├── gitignore-integration.md
    ├── project-root-env-var.md
    ├── README.md
    └── sqlite-performance.md
```

# Files

--------------------------------------------------------------------------------
/tests/services/test_file_service.py:
--------------------------------------------------------------------------------

```python
"""Tests for file operations service."""

from pathlib import Path
from unittest.mock import patch

import pytest

from basic_memory.services.exceptions import FileOperationError
from basic_memory.services.file_service import FileService


@pytest.mark.asyncio
async def test_exists(tmp_path: Path, file_service: FileService):
    """Test file existence checking."""
    # Test path
    test_path = tmp_path / "test.md"

    # Should not exist initially
    assert not await file_service.exists(test_path)

    # Create file
    test_path.write_text("test content")
    assert await file_service.exists(test_path)

    # Delete file
    test_path.unlink()
    assert not await file_service.exists(test_path)


@pytest.mark.asyncio
async def test_exists_error_handling(tmp_path: Path, file_service: FileService):
    """Test error handling in exists() method."""
    test_path = tmp_path / "test.md"

    # Mock Path.exists to raise an error
    with patch.object(Path, "exists") as mock_exists:
        mock_exists.side_effect = PermissionError("Access denied")

        with pytest.raises(FileOperationError) as exc_info:
            await file_service.exists(test_path)

        assert "Failed to check file existence" in str(exc_info.value)


@pytest.mark.asyncio
async def test_write_read_file(tmp_path: Path, file_service: FileService):
    """Test basic write/read operations with checksums."""
    test_path = tmp_path / "test.md"
    test_content = "test content\nwith multiple lines"

    # Write file and get checksum
    checksum = await file_service.write_file(test_path, test_content)
    assert test_path.exists()

    # Read back and verify content/checksum
    content, read_checksum = await file_service.read_file(test_path)
    assert content == test_content
    assert read_checksum == checksum


@pytest.mark.asyncio
async def test_write_creates_directories(tmp_path: Path, file_service: FileService):
    """Test directory creation on write."""
    test_path = tmp_path / "subdir" / "nested" / "test.md"
    test_content = "test content"

    # Write should create directories
    await file_service.write_file(test_path, test_content)
    assert test_path.exists()
    assert test_path.parent.is_dir()


@pytest.mark.asyncio
async def test_write_atomic(tmp_path: Path, file_service: FileService):
    """Test atomic write with no partial files."""
    test_path = tmp_path / "test.md"
    temp_path = test_path.with_suffix(".tmp")

    # Mock write_file_atomic to raise an error
    with patch("basic_memory.file_utils.write_file_atomic") as mock_write:
        mock_write.side_effect = Exception("Write failed")

        # Attempt write that will fail
        with pytest.raises(FileOperationError):
            await file_service.write_file(test_path, "test content")

        # No partial files should exist
        assert not test_path.exists()
        assert not temp_path.exists()


@pytest.mark.asyncio
async def test_delete_file(tmp_path: Path, file_service: FileService):
    """Test file deletion."""
    test_path = tmp_path / "test.md"
    test_content = "test content"

    # Create then delete
    await file_service.write_file(test_path, test_content)
    assert test_path.exists()

    await file_service.delete_file(test_path)
    assert not test_path.exists()

    # Delete non-existent file should not error
    await file_service.delete_file(test_path)


@pytest.mark.asyncio
async def test_checksum_consistency(tmp_path: Path, file_service: FileService):
    """Test checksum remains consistent."""
    test_path = tmp_path / "test.md"
    test_content = "test content\n" * 10

    # Get checksum from write
    checksum1 = await file_service.write_file(test_path, test_content)

    # Get checksum from read
    _, checksum2 = await file_service.read_file(test_path)

    # Write again and get new checksum
    checksum3 = await file_service.write_file(test_path, test_content)

    # All should match
    assert checksum1 == checksum2 == checksum3


@pytest.mark.asyncio
async def test_error_handling_missing_file(tmp_path: Path, file_service: FileService):
    """Test error handling for missing files."""
    test_path = tmp_path / "missing.md"

    with pytest.raises(FileOperationError):
        await file_service.read_file(test_path)


@pytest.mark.asyncio
async def test_error_handling_invalid_path(tmp_path: Path, file_service: FileService):
    """Test error handling for invalid paths."""
    # Try to write to a directory instead of file
    test_path = tmp_path / "test.md"
    test_path.mkdir()  # Create a directory instead of a file

    with pytest.raises(FileOperationError):
        await file_service.write_file(test_path, "test")


@pytest.mark.asyncio
async def test_write_unicode_content(tmp_path: Path, file_service: FileService):
    """Test handling of unicode content."""
    test_path = tmp_path / "test.md"
    test_content = """
    # Test Unicode
    - Emoji: 🚀 ⭐️ 🔥
    - Chinese: 你好世界
    - Arabic: مرحبا بالعالم
    - Russian: Привет, мир
    """

    # Write and read back
    await file_service.write_file(test_path, test_content)
    content, _ = await file_service.read_file(test_path)

    assert content == test_content

```

--------------------------------------------------------------------------------
/tests/api/test_search_template.py:
--------------------------------------------------------------------------------

```python
"""Tests for the search template rendering."""

import datetime
import pytest

from basic_memory.api.template_loader import TemplateLoader
from basic_memory.schemas.search import SearchItemType, SearchResult


@pytest.fixture
def template_loader():
    """Return a TemplateLoader instance for testing."""
    return TemplateLoader()


@pytest.fixture
def search_result():
    """Create a sample SearchResult for testing."""
    return SearchResult(
        title="Test Search Result",
        type=SearchItemType.ENTITY,
        permalink="test/search-result",
        score=0.95,
        content="This is a test search result with some content.",
        file_path="/path/to/test/search-result.md",
        metadata={"created_at": datetime.datetime(2023, 2, 1, 12, 0)},
    )


@pytest.fixture
def context_with_results(search_result):
    """Create a sample context with search results."""
    return {
        "query": "test query",
        "timeframe": "30d",
        "has_results": True,
        "result_count": 1,
        "results": [search_result],
    }


@pytest.fixture
def context_without_results():
    """Create a sample context without search results."""
    return {
        "query": "empty query",
        "timeframe": None,
        "has_results": False,
        "result_count": 0,
        "results": [],
    }


@pytest.mark.asyncio
async def test_search_with_results(template_loader, context_with_results):
    """Test rendering the search template with results."""
    result = await template_loader.render("prompts/search.hbs", context_with_results)

    # Check that key elements are present
    assert 'Search Results for: "test query" (after 30d)' in result
    assert "1.0. Test Search Result" in result
    assert "Type**: entity" in result
    assert "Relevance Score**: 0.95" in result
    assert "This is a test search result with some content." in result
    assert 'read_note("test/search-result")' in result
    assert "Next Steps" in result
    assert "Synthesize and Capture Knowledge" in result


@pytest.mark.asyncio
async def test_search_without_results(template_loader, context_without_results):
    """Test rendering the search template without results."""
    result = await template_loader.render("prompts/search.hbs", context_without_results)

    # Check that key elements are present
    assert 'Search Results for: "empty query"' in result
    assert "I couldn't find any results for this query." in result
    assert "Opportunity to Capture Knowledge!" in result
    assert "write_note(" in result
    assert 'title="Empty query"' in result
    assert "Other Suggestions" in result


@pytest.mark.asyncio
async def test_multiple_search_results(template_loader):
    """Test rendering the search template with multiple results."""
    # Create multiple search results
    results = []
    for i in range(1, 6):  # Create 5 results
        results.append(
            SearchResult(
                title=f"Search Result {i}",
                type=SearchItemType.ENTITY,
                permalink=f"test/result-{i}",
                score=1.0 - (i * 0.1),  # Decreasing scores
                content=f"Content for result {i}",
                file_path=f"/path/to/result-{i}.md",
                metadata={},
            )
        )

    context = {
        "query": "multiple results",
        "timeframe": None,
        "has_results": True,
        "result_count": len(results),
        "results": results,
    }

    result = await template_loader.render("prompts/search.hbs", context)

    # Check that all results are rendered
    for i in range(1, 6):
        assert f"{i}.0. Search Result {i}" in result
        assert f"Content for result {i}" in result
        assert f'read_note("test/result-{i}")' in result


@pytest.mark.asyncio
async def test_capitalization_in_write_note_template(template_loader, context_with_results):
    """Test that the query is capitalized in the write_note template."""
    result = await template_loader.render("prompts/search.hbs", context_with_results)

    # The query should be capitalized in the suggested write_note call
    assert "Synthesis of Test query Information" in result


@pytest.mark.asyncio
async def test_timeframe_display(template_loader):
    """Test that the timeframe is displayed correctly when present, and not when absent."""
    # Context with timeframe
    context_with_timeframe = {
        "query": "with timeframe",
        "timeframe": "7d",
        "has_results": True,
        "result_count": 0,
        "results": [],
    }

    result_with_timeframe = await template_loader.render(
        "prompts/search.hbs", context_with_timeframe
    )
    assert 'Search Results for: "with timeframe" (after 7d)' in result_with_timeframe

    # Context without timeframe
    context_without_timeframe = {
        "query": "without timeframe",
        "timeframe": None,
        "has_results": True,
        "result_count": 0,
        "results": [],
    }

    result_without_timeframe = await template_loader.render(
        "prompts/search.hbs", context_without_timeframe
    )
    assert 'Search Results for: "without timeframe"' in result_without_timeframe
    assert 'Search Results for: "without timeframe" (after' not in result_without_timeframe

```

--------------------------------------------------------------------------------
/test-int/test_disable_permalinks_integration.py:
--------------------------------------------------------------------------------

```python
"""Integration tests for the disable_permalinks configuration."""

import pytest

from basic_memory.config import BasicMemoryConfig
from basic_memory.markdown import EntityParser, MarkdownProcessor
from basic_memory.repository import (
    EntityRepository,
    ObservationRepository,
    RelationRepository,
    ProjectRepository,
)
from basic_memory.repository.search_repository import SearchRepository
from basic_memory.schemas import Entity as EntitySchema
from basic_memory.services import FileService
from basic_memory.services.entity_service import EntityService
from basic_memory.services.link_resolver import LinkResolver
from basic_memory.services.search_service import SearchService
from basic_memory.sync.sync_service import SyncService


@pytest.mark.asyncio
async def test_disable_permalinks_create_entity(tmp_path, engine_factory):
    """Test that entities created with disable_permalinks=True don't have permalinks."""
    engine, session_maker = engine_factory

    # Create app config with disable_permalinks=True
    app_config = BasicMemoryConfig(disable_permalinks=True)

    # Setup repositories
    entity_repository = EntityRepository(session_maker, project_id=1)
    observation_repository = ObservationRepository(session_maker, project_id=1)
    relation_repository = RelationRepository(session_maker, project_id=1)
    search_repository = SearchRepository(session_maker, project_id=1)

    # Setup services
    entity_parser = EntityParser(tmp_path)
    markdown_processor = MarkdownProcessor(entity_parser)
    file_service = FileService(tmp_path, markdown_processor)
    search_service = SearchService(search_repository, entity_repository, file_service)
    await search_service.init_search_index()
    link_resolver = LinkResolver(entity_repository, search_service)

    entity_service = EntityService(
        entity_parser=entity_parser,
        entity_repository=entity_repository,
        observation_repository=observation_repository,
        relation_repository=relation_repository,
        file_service=file_service,
        link_resolver=link_resolver,
        app_config=app_config,
    )

    # Create entity via API
    entity_data = EntitySchema(
        title="Test Note",
        folder="test",
        entity_type="note",
        content="Test content",
    )

    created = await entity_service.create_entity(entity_data)

    # Verify entity has no permalink
    assert created.permalink is None

    # Verify file has no permalink in frontmatter
    file_path = tmp_path / "test" / "Test Note.md"
    assert file_path.exists()
    content = file_path.read_text()
    assert "permalink:" not in content
    assert "Test content" in content


@pytest.mark.asyncio
async def test_disable_permalinks_sync_workflow(tmp_path, engine_factory):
    """Test full sync workflow with disable_permalinks enabled."""
    engine, session_maker = engine_factory

    # Create app config with disable_permalinks=True
    app_config = BasicMemoryConfig(disable_permalinks=True)

    # Create a test markdown file without frontmatter
    test_file = tmp_path / "test_note.md"
    test_file.write_text("# Test Note\nThis is test content.")

    # Setup repositories
    entity_repository = EntityRepository(session_maker, project_id=1)
    observation_repository = ObservationRepository(session_maker, project_id=1)
    relation_repository = RelationRepository(session_maker, project_id=1)
    search_repository = SearchRepository(session_maker, project_id=1)
    project_repository = ProjectRepository(session_maker)

    # Setup services
    entity_parser = EntityParser(tmp_path)
    markdown_processor = MarkdownProcessor(entity_parser)
    file_service = FileService(tmp_path, markdown_processor)
    search_service = SearchService(search_repository, entity_repository, file_service)
    await search_service.init_search_index()
    link_resolver = LinkResolver(entity_repository, search_service)

    entity_service = EntityService(
        entity_parser=entity_parser,
        entity_repository=entity_repository,
        observation_repository=observation_repository,
        relation_repository=relation_repository,
        file_service=file_service,
        link_resolver=link_resolver,
        app_config=app_config,
    )

    sync_service = SyncService(
        app_config=app_config,
        entity_service=entity_service,
        project_repository=project_repository,
        entity_parser=entity_parser,
        entity_repository=entity_repository,
        relation_repository=relation_repository,
        search_service=search_service,
        file_service=file_service,
    )

    # Run sync
    report = await sync_service.scan(tmp_path)
    # Note: scan may pick up database files too, so just check our file is there
    assert "test_note.md" in report.new

    # Sync the file
    await sync_service.sync_file("test_note.md", new=True)

    # Verify file has no permalink added
    content = test_file.read_text()
    assert "permalink:" not in content
    assert "# Test Note" in content

    # Verify entity in database has no permalink
    entities = await entity_repository.find_all()
    assert len(entities) == 1
    assert entities[0].permalink is None
    # Title is extracted from filename when no frontmatter, or from frontmatter when present
    assert entities[0].title in ("test_note", "Test Note")

```

--------------------------------------------------------------------------------
/src/basic_memory/importers/claude_projects_importer.py:
--------------------------------------------------------------------------------

```python
"""Claude projects import service for Basic Memory."""

import logging
from typing import Any, Dict, Optional

from basic_memory.markdown.schemas import EntityFrontmatter, EntityMarkdown
from basic_memory.importers.base import Importer
from basic_memory.schemas.importer import ProjectImportResult
from basic_memory.importers.utils import clean_filename

logger = logging.getLogger(__name__)


class ClaudeProjectsImporter(Importer[ProjectImportResult]):
    """Service for importing Claude projects."""

    async def import_data(
        self, source_data, destination_folder: str, **kwargs: Any
    ) -> ProjectImportResult:
        """Import projects from Claude JSON export.

        Args:
            source_data: Parsed JSON data from the Claude projects.json export.
            destination_folder: Base folder for imported projects within the Basic Memory project.
            **kwargs: Additional keyword arguments.

        Returns:
            ProjectImportResult containing statistics and status of the import.
        """
        try:
            # Ensure the base folder exists
            base_path = self.base_path
            if destination_folder:
                base_path = self.ensure_folder_exists(destination_folder)

            projects = source_data

            # Process each project
            docs_imported = 0
            prompts_imported = 0

            for project in projects:
                project_dir = clean_filename(project["name"])

                # Create project directories
                docs_dir = base_path / project_dir / "docs"
                docs_dir.mkdir(parents=True, exist_ok=True)

                # Import prompt template if it exists
                if prompt_entity := self._format_prompt_markdown(project):
                    file_path = base_path / f"{prompt_entity.frontmatter.metadata['permalink']}.md"
                    await self.write_entity(prompt_entity, file_path)
                    prompts_imported += 1

                # Import project documents
                for doc in project.get("docs", []):
                    entity = self._format_project_markdown(project, doc)
                    file_path = base_path / f"{entity.frontmatter.metadata['permalink']}.md"
                    await self.write_entity(entity, file_path)
                    docs_imported += 1

            return ProjectImportResult(
                import_count={"documents": docs_imported, "prompts": prompts_imported},
                success=True,
                documents=docs_imported,
                prompts=prompts_imported,
            )

        except Exception as e:  # pragma: no cover
            logger.exception("Failed to import Claude projects")
            return self.handle_error("Failed to import Claude projects", e)  # pyright: ignore [reportReturnType]

    def _format_project_markdown(
        self, project: Dict[str, Any], doc: Dict[str, Any]
    ) -> EntityMarkdown:
        """Format a project document as a Basic Memory entity.

        Args:
            project: Project data.
            doc: Document data.

        Returns:
            EntityMarkdown instance representing the document.
        """
        # Extract timestamps
        created_at = doc.get("created_at") or project["created_at"]
        modified_at = project["updated_at"]

        # Generate clean names for organization
        project_dir = clean_filename(project["name"])
        doc_file = clean_filename(doc["filename"])

        # Create entity
        entity = EntityMarkdown(
            frontmatter=EntityFrontmatter(
                metadata={
                    "type": "project_doc",
                    "title": doc["filename"],
                    "created": created_at,
                    "modified": modified_at,
                    "permalink": f"{project_dir}/docs/{doc_file}",
                    "project_name": project["name"],
                    "project_uuid": project["uuid"],
                    "doc_uuid": doc["uuid"],
                }
            ),
            content=doc["content"],
        )

        return entity

    def _format_prompt_markdown(self, project: Dict[str, Any]) -> Optional[EntityMarkdown]:
        """Format project prompt template as a Basic Memory entity.

        Args:
            project: Project data.

        Returns:
            EntityMarkdown instance representing the prompt template, or None if
            no prompt template exists.
        """
        if not project.get("prompt_template"):
            return None

        # Extract timestamps
        created_at = project["created_at"]
        modified_at = project["updated_at"]

        # Generate clean project directory name
        project_dir = clean_filename(project["name"])

        # Create entity
        entity = EntityMarkdown(
            frontmatter=EntityFrontmatter(
                metadata={
                    "type": "prompt_template",
                    "title": f"Prompt Template: {project['name']}",
                    "created": created_at,
                    "modified": modified_at,
                    "permalink": f"{project_dir}/prompt-template",
                    "project_name": project["name"],
                    "project_uuid": project["uuid"],
                }
            ),
            content=f"# Prompt Template: {project['name']}\n\n{project['prompt_template']}",
        )

        return entity

```

--------------------------------------------------------------------------------
/v15-docs/explicit-project-parameter.md:
--------------------------------------------------------------------------------

```markdown
# Explicit Project Parameter (SPEC-6)

**Status**: Breaking Change
**PR**: #298
**Affects**: All MCP tool users

## What Changed

Starting in v0.15.0, **all MCP tools require an explicit `project` parameter**. The previous implicit project context (via middleware) has been removed in favor of a stateless architecture.

### Before v0.15.0
```python
# Tools used implicit current_project from middleware
await write_note("My Note", "Content", "folder")
await search_notes("query")
```

### v0.15.0 and Later
```python
# Explicit project required
await write_note("My Note", "Content", "folder", project="main")
await search_notes("query", project="main")
```

## Why This Matters

**Benefits:**
- **Stateless Architecture**: Tools are now truly stateless - no hidden state
- **Multi-project Clarity**: Explicit about which project you're working with
- **Better for Cloud**: Enables proper multi-tenant isolation
- **Simpler Debugging**: No confusion about "current" project

**Impact:**
- Existing MCP integrations may break if they don't specify project
- LLMs need to be aware of project parameter requirement
- Configuration option available for easier migration (see below)

## How to Use

### Option 1: Specify Project Every Time (Recommended for Multi-project Users)

```python
# Always include project parameter
results = await search_notes(
    query="authentication",
    project="work-docs"
)

content = await read_note(
    identifier="Search Design",
    project="work-docs"
)

await write_note(
    title="New Feature",
    content="...",
    folder="specs",
    project="work-docs"
)
```

### Option 2: Enable default_project_mode (Recommended for Single-project Users)

Edit `~/.basic-memory/config.json`:

```json
{
  "default_project": "main",
  "default_project_mode": true,
  "projects": {
    "main": "/Users/you/basic-memory"
  }
}
```

With `default_project_mode: true`:
```python
# Project parameter is optional - uses default_project when omitted
await write_note("My Note", "Content", "folder")  # Uses "main" project
await search_notes("query")  # Uses "main" project

# Can still override with explicit project
await search_notes("query", project="other-project")
```

### Option 3: Project Discovery for New Users

If you don't know which project to use:

```python
# List available projects
projects = await list_memory_projects()
for project in projects:
    print(f"- {project.name}: {project.path}")

# Check recent activity to find active project
activity = await recent_activity()  # Shows cross-project activity
# Returns recommendations for which project to use
```

## Migration Guide

### For Claude Desktop Users

1. **Check your config**: `cat ~/.basic-memory/config.json`

2. **Single project setup** (easiest):
   ```json
   {
     "default_project_mode": true,
     "default_project": "main"
   }
   ```

3. **Multi-project setup** (explicit):
   - Keep `default_project_mode: false` (or omit it)
   - LLM will need to specify project in each call

### For MCP Server Developers

Update tool calls to include project parameter:

```python
# Old (v0.14.x)
async def my_integration():
    # Relied on middleware to set current_project
    results = await search_notes(query="test")

# New (v0.15.0+)
async def my_integration(project: str = "main"):
    # Explicitly pass project
    results = await search_notes(query="test", project=project)
```

### For API Users

If using the Basic Memory API directly:

```python
# All endpoints now require project parameter
import httpx

async with httpx.AsyncClient() as client:
    response = await client.post(
        "http://localhost:8000/notes/search",
        json={
            "query": "test",
            "project": "main"  # Required
        }
    )
```

## Technical Details

### Architecture Change

**Removed:**
- `ProjectMiddleware` - no longer maintains project context
- `get_current_project()` - removed from MCP tools
- Implicit project state in MCP server

**Added:**
- `default_project_mode` config option
- Explicit project parameter on all MCP tools
- Stateless tool architecture (SPEC-6)

### Configuration Options

| Config Key | Type | Default | Description |
|------------|------|---------|-------------|
| `default_project_mode` | bool | `false` | Auto-use default_project when project param omitted |
| `default_project` | string | `"main"` | Project to use in default_project_mode |

### Three-Tier Project Resolution

1. **CLI Constraint** (Highest Priority): `--project` flag constrains all operations
2. **Explicit Parameter** (Medium): `project="name"` in tool calls
3. **Default Mode** (Lowest): Falls back to `default_project` if `default_project_mode: true`
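
A minimal sketch of this resolution order (hypothetical function and parameter names for illustration, not Basic Memory's actual internals):

```python
from typing import Optional


def resolve_project(
    explicit_project: Optional[str],
    cli_constraint: Optional[str],
    config: dict,
) -> str:
    """Illustrative only: resolves a project name per the three tiers above."""
    # 1. CLI constraint (highest priority): --project flag pins every operation
    if cli_constraint:
        return cli_constraint
    # 2. Explicit parameter passed in the tool call
    if explicit_project:
        return explicit_project
    # 3. Default mode fallback, only when enabled in config.json
    if config.get("default_project_mode"):
        return config.get("default_project", "main")
    raise ValueError("No project specified and default_project_mode is disabled")
```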

## Common Questions

**Q: Will my existing setup break?**
A: If you use a single project and enable `default_project_mode: true`, no. Otherwise, you'll need to add project parameters.

**Q: Can I still use multiple projects?**
A: Yes! Just specify the project parameter explicitly in each call.

**Q: What if I forget the project parameter?**
A: You'll get an error unless `default_project_mode: true` is set in config.

**Q: How does this work with Claude Desktop?**
A: Claude can read your config and use default_project_mode, or it can discover projects using `list_memory_projects()`.

## Related Changes

- See `default-project-mode.md` for detailed config options
- See `cloud-mode-usage.md` for cloud API usage
- See SPEC-6 for full architectural specification

```

--------------------------------------------------------------------------------
/v0.15.0-RELEASE-DOCS.md:
--------------------------------------------------------------------------------

```markdown
# v0.15.0 Release Plan

## Release Overview

**Target Version**: v0.15.0
**Previous Version**: v0.14.4
**Release Date**: TBD
**Milestone**: [v0.15.0](https://github.com/basicmachines-co/basic-memory/milestone)

### Release Highlights

This is a **major release** with 53 merged PRs introducing:
- **Cloud Sync**: Bidirectional sync with rclone bisync
- **Authentication**: JWT-based cloud authentication with subscription validation
- **Performance**: API optimizations and background processing improvements
- **Security**: Removed .env loading vulnerability, added .gitignore support
- **Platform**: Python 3.13 support
- **Bug Fixes**: 13+ critical fixes

## Key Features by Category

### Cloud Features
- Cloud authentication with JWT and subscription validation
- Bidirectional sync with rclone bisync
- Cloud mount commands for direct file access
- Cloud project management
- Integrity verification

### Performance Improvements
- API performance optimizations (SPEC-11)
- Background relation resolution (prevents cold start blocking)
- WAL mode for SQLite
- Non-blocking sync operations

### Security Enhancements
- Removed .env file loading vulnerability
- .gitignore integration (respects gitignored files)
- Improved authentication and session management
- Better config security

### Developer Experience
- Python 3.13 support
- ChatGPT tools integration
- Improved error handling
- Better CLI output and formatting

### Bug Fixes (13+ PRs)
- Entity upsert conflict resolution (#328)
- memory:// URL underscore handling (#329)
- .env loading removed (#330)
- Minimum timeframe enforcement (#318)
- move_note file extension handling (#281)
- Project parameter handling (#310)
- And more...

---

## Documentation & Testing

- [ ] **Manual Testing - New Cloud Features**
  - [ ] `bm cloud login` authentication flow
  - [ ] `bm cloud logout` session cleanup
  - [ ] `bm cloud sync` bidirectional sync
  - [ ] `bm cloud check` integrity verification
  - [ ] Cloud mode toggle for regular commands
  - [ ] Project creation in cloud mode

- [ ] **Manual Testing - Bug Fixes**
  - [ ] Entity upsert conflict resolution (#328)
  - [ ] memory:// URL underscore normalization (#329)
  - [ ] .gitignore file filtering (#287, #285)
  - [ ] move_note with/without file extension (#281)
  - [ ] .env file loading removed (#330)

- [ ] **Platform Testing**
  - [ ] Python 3.13 compatibility (new in this release)

- [ ] **CHANGELOG.md**
  - [ ] Create comprehensive v0.15.0 entry
  - [ ] List all major features
  - [ ] Document all bug fixes with issue links
  - [ ] Include breaking changes (if any)
  - [ ] Add migration guide (if needed)
  - [ ] Credit contributors
  - [ ] `mcp/tools/chatgpt_tools.py` - ChatGPT integration

- [x] **README.md**
  - [x] Update Python version badge to 3.13+
  - [x] Add cloud features to feature list
  - [x] Add cloud CLI commands section
  - [x] Expand MCP tools list with all tools organized by category
  - [x] Add Cloud CLI documentation link

- [x] **CLAUDE.md**
  - [x] Add Python 3.13+ support note
  - [x] Add cloud commands section
  - [x] Expand MCP tools with all missing tools
  - [x] Add comprehensive "Cloud Features (v0.15.0+)" section

- [ ] **docs.basicmemory.com Updates** (Docs Site)
  - [ ] **latest-releases.mdx**: Add v0.15.0 release entry with all features
  - [ ] **cli-reference.mdx**: Add cloud commands section (login, logout, sync, check, mount, unmount)
  - [ ] **mcp-tools-reference.mdx**: Add missing tools (read_content, all project management tools)
  - [ ] **cloud-cli.mdx**: CREATE NEW - Cloud authentication, sync, rclone config, troubleshooting
  - [ ] **getting-started.mdx**: Mention Python 3.13 support
  - [ ] **whats-new.mdx**: Add v0.15.0 section with cloud features, performance, security updates

- [ ] **Cloud Documentation**
  - [ ] Review docs/cloud-cli.md for accuracy
  - [ ] Update authentication instructions
  - [ ] Document subscription requirements
  - [ ] Add troubleshooting section
  - [ ] rclone configuration

- [ ] **API Documentation**
  - [ ] Document new cloud endpoints
  - [ ] Update MCP tool documentation
  - [ ] Review schema documentation
  - [ ] Config file changes

- [ ] **New Specifications**
  - [ ] SPEC-11: API Performance Optimization
  - [ ] SPEC-13: CLI Authentication with Subscription Validation
  - [ ] SPEC-6: Explicit Project Parameter Architecture

- [ ] **Feature PRs**
  - [ ] #330: Remove .env file loading
  - [ ] #329: Normalize memory:// URLs
  - [ ] #328: Simplify entity upsert
  - [ ] #327: CLI subscription validation
  - [ ] #322: Cloud CLI rclone bisync
  - [ ] #320: Lifecycle management optimization
  - [ ] #319: Background relation resolution
  - [ ] #318: Minimum timeframe enforcement
  - [ ] #317: Cloud deployment fixes
  - [ ] #315: API performance optimizations
  - [ ] #314: .gitignore integration
  - [ ] #313: Disable permalinks config flag
  - [ ] #312: DateTime JSON schema fixes


### Phase 5: GitHub Milestone Review

- [ ] **Closed Issues** (23 total)
  - [ ] Review all closed issues for completeness
  - [ ] Verify fixes are properly tested
  - [ ] Ensure documentation updated

- [ ] **Merged PRs** (13 in milestone, 53 total since v0.14.4)
  - [ ] All critical PRs merged
  - [ ] All PRs properly tested
  - [ ] All PRs documented

- [ ] **Open Issues**
  - [ ] #326: Create user guides and demos (can defer to v0.15.1?)
  - [ ] Decision on whether to block release

## Notes

- This is a significant release with major new cloud features
- Cloud features require active subscription - ensure this is clear in docs

```

--------------------------------------------------------------------------------
/tests/api/test_memory_router.py:
--------------------------------------------------------------------------------

```python
"""Tests for memory router endpoints."""

from datetime import datetime

import pytest

from basic_memory.schemas.memory import GraphContext


@pytest.mark.asyncio
async def test_get_memory_context(client, test_graph, project_url):
    """Test getting context from memory URL."""
    response = await client.get(f"{project_url}/memory/test/root")
    assert response.status_code == 200

    context = GraphContext(**response.json())
    assert len(context.results) == 1
    assert context.results[0].primary_result.permalink == "test/root"
    assert len(context.results[0].related_results) > 0

    # Verify metadata
    assert context.metadata.uri == "test/root"
    assert context.metadata.depth == 1  # default depth
    assert isinstance(context.metadata.generated_at, datetime)
    assert context.metadata.primary_count + context.metadata.related_count > 0
    assert context.metadata.total_results is not None  # Backwards compatibility field


@pytest.mark.asyncio
async def test_get_memory_context_pagination(client, test_graph, project_url):
    """Test getting context from memory URL."""
    response = await client.get(f"{project_url}/memory/test/root?page=1&page_size=1")
    assert response.status_code == 200

    context = GraphContext(**response.json())
    assert len(context.results) == 1
    assert context.results[0].primary_result.permalink == "test/root"
    assert len(context.results[0].related_results) > 0

    # Verify metadata
    assert context.metadata.uri == "test/root"
    assert context.metadata.depth == 1  # default depth
    assert isinstance(context.metadata.generated_at, datetime)
    assert context.metadata.primary_count > 0


@pytest.mark.asyncio
async def test_get_memory_context_pattern(client, test_graph, project_url):
    """Test getting context with pattern matching."""
    response = await client.get(f"{project_url}/memory/test/*")
    assert response.status_code == 200

    context = GraphContext(**response.json())
    assert len(context.results) > 1  # Should match multiple test/* paths
    assert all("test/" in item.primary_result.permalink for item in context.results)


@pytest.mark.asyncio
async def test_get_memory_context_depth(client, test_graph, project_url):
    """Test depth parameter affects relation traversal."""
    # With depth=1, should only get immediate connections
    response = await client.get(f"{project_url}/memory/test/root?depth=1&max_results=20")
    assert response.status_code == 200
    context1 = GraphContext(**response.json())

    # With depth=3, should get deeper connections
    response = await client.get(f"{project_url}/memory/test/root?depth=3&max_results=20")
    assert response.status_code == 200
    context2 = GraphContext(**response.json())

    # Calculate total related items in all result items
    total_related1 = sum(len(item.related_results) for item in context1.results)
    total_related2 = sum(len(item.related_results) for item in context2.results)

    assert total_related2 > total_related1


@pytest.mark.asyncio
async def test_get_memory_context_timeframe(client, test_graph, project_url):
    """Test timeframe parameter filters by date."""
    # Recent timeframe
    response = await client.get(f"{project_url}/memory/test/root?timeframe=1d")
    assert response.status_code == 200
    recent = GraphContext(**response.json())

    # Longer timeframe
    response = await client.get(f"{project_url}/memory/test/root?timeframe=30d")
    assert response.status_code == 200
    older = GraphContext(**response.json())

    # Calculate total related items
    total_recent_related = (
        sum(len(item.related_results) for item in recent.results) if recent.results else 0
    )
    total_older_related = (
        sum(len(item.related_results) for item in older.results) if older.results else 0
    )

    assert total_older_related >= total_recent_related


@pytest.mark.asyncio
async def test_not_found(client, project_url):
    """Test handling of non-existent paths."""
    response = await client.get(f"{project_url}/memory/test/does-not-exist")
    assert response.status_code == 200

    context = GraphContext(**response.json())
    assert len(context.results) == 0


@pytest.mark.asyncio
async def test_recent_activity(client, test_graph, project_url):
    """Test handling of recent activity."""
    response = await client.get(f"{project_url}/memory/recent")
    assert response.status_code == 200

    context = GraphContext(**response.json())
    assert len(context.results) > 0
    assert context.metadata.primary_count > 0


@pytest.mark.asyncio
async def test_recent_activity_pagination(client, test_graph, project_url):
    """Test pagination for recent activity."""
    response = await client.get(f"{project_url}/memory/recent?page=1&page_size=1")
    assert response.status_code == 200

    context = GraphContext(**response.json())
    assert len(context.results) == 1
    assert context.page == 1
    assert context.page_size == 1


@pytest.mark.asyncio
async def test_recent_activity_by_type(client, test_graph, project_url):
    """Test filtering recent activity by type."""
    response = await client.get(f"{project_url}/memory/recent?type=relation&type=observation")
    assert response.status_code == 200

    context = GraphContext(**response.json())
    assert len(context.results) > 0

    # Check for relation and observation types in primary results
    primary_types = [item.primary_result.type for item in context.results]
    assert "relation" in primary_types or "observation" in primary_types

```

--------------------------------------------------------------------------------
/tests/mcp/test_tool_utils.py:
--------------------------------------------------------------------------------

```python
"""Tests for MCP tool utilities."""

from unittest.mock import AsyncMock

import pytest
from httpx import AsyncClient, HTTPStatusError
from mcp.server.fastmcp.exceptions import ToolError

from basic_memory.mcp.tools.utils import (
    call_get,
    call_post,
    call_put,
    call_delete,
    get_error_message,
)


@pytest.fixture
def mock_response():
    """Create a mock response."""

    class MockResponse:
        def __init__(self, status_code=200):
            self.status_code = status_code
            self.is_success = status_code < 400
            self.json = lambda: {}

        def raise_for_status(self):
            if self.status_code >= 400:
                raise HTTPStatusError(
                    message=f"HTTP Error {self.status_code}", request=None, response=self
                )

    return MockResponse


@pytest.mark.asyncio
async def test_call_get_success(mock_response):
    """Test successful GET request."""
    client = AsyncClient()
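    # Calling the AsyncMock returns a coroutine resolving to the mock response,
    # mimicking the awaitable behavior of httpx.AsyncClient.get.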
    client.get = lambda *args, **kwargs: AsyncMock(return_value=mock_response())()

    response = await call_get(client, "http://test.com")
    assert response.status_code == 200


@pytest.mark.asyncio
async def test_call_get_error(mock_response):
    """Test GET request with error."""
    client = AsyncClient()
    client.get = lambda *args, **kwargs: AsyncMock(return_value=mock_response(404))()

    with pytest.raises(ToolError) as exc:
        await call_get(client, "http://test.com")
    assert "Resource not found" in str(exc.value)


@pytest.mark.asyncio
async def test_call_post_success(mock_response):
    """Test successful POST request."""
    client = AsyncClient()
    response = mock_response()
    response.json = lambda: {"test": "data"}
    client.post = lambda *args, **kwargs: AsyncMock(return_value=response)()

    response = await call_post(client, "http://test.com", json={"test": "data"})
    assert response.status_code == 200


@pytest.mark.asyncio
async def test_call_post_error(mock_response):
    """Test POST request with error."""
    client = AsyncClient()
    response = mock_response(500)
    response.json = lambda: {"test": "error"}

    client.post = lambda *args, **kwargs: AsyncMock(return_value=response)()

    with pytest.raises(ToolError) as exc:
        await call_post(client, "http://test.com", json={"test": "data"})
    assert "Internal server error" in str(exc.value)


@pytest.mark.asyncio
async def test_call_put_success(mock_response):
    """Test successful PUT request."""
    client = AsyncClient()
    client.put = lambda *args, **kwargs: AsyncMock(return_value=mock_response())()

    response = await call_put(client, "http://test.com", json={"test": "data"})
    assert response.status_code == 200


@pytest.mark.asyncio
async def test_call_put_error(mock_response):
    """Test PUT request with error."""
    client = AsyncClient()
    client.put = lambda *args, **kwargs: AsyncMock(return_value=mock_response(400))()

    with pytest.raises(ToolError) as exc:
        await call_put(client, "http://test.com", json={"test": "data"})
    assert "Invalid request" in str(exc.value)


@pytest.mark.asyncio
async def test_call_delete_success(mock_response):
    """Test successful DELETE request."""
    client = AsyncClient()
    client.delete = lambda *args, **kwargs: AsyncMock(return_value=mock_response())()

    response = await call_delete(client, "http://test.com")
    assert response.status_code == 200


@pytest.mark.asyncio
async def test_call_delete_error(mock_response):
    """Test DELETE request with error."""
    client = AsyncClient()
    client.delete = lambda *args, **kwargs: AsyncMock(return_value=mock_response(403))()

    with pytest.raises(ToolError) as exc:
        await call_delete(client, "http://test.com")
    assert "Access denied" in str(exc.value)


@pytest.mark.asyncio
async def test_call_get_with_params(mock_response):
    """Test GET request with query parameters."""
    client = AsyncClient()
    mock_get = AsyncMock(return_value=mock_response())
    client.get = mock_get

    params = {"key": "value", "test": "data"}
    await call_get(client, "http://test.com", params=params)

    mock_get.assert_called_once()
    call_kwargs = mock_get.call_args[1]
    assert call_kwargs["params"] == params


@pytest.mark.asyncio
async def test_get_error_message():
    """Test the get_error_message function."""

    # Test 400 status code
    message = get_error_message(400, "http://test.com/resource", "GET")
    assert "Invalid request" in message
    assert "resource" in message

    # Test 404 status code
    message = get_error_message(404, "http://test.com/missing", "GET")
    assert "Resource not found" in message
    assert "missing" in message

    # Test 500 status code
    message = get_error_message(500, "http://test.com/server", "POST")
    assert "Internal server error" in message
    assert "server" in message

    # Test URL object handling
    from httpx import URL

    url = URL("http://test.com/complex/path")
    message = get_error_message(403, url, "DELETE")
    assert "Access denied" in message
    assert "path" in message


@pytest.mark.asyncio
async def test_call_post_with_json(mock_response):
    """Test POST request with JSON payload."""
    client = AsyncClient()
    response = mock_response()
    response.json = lambda: {"test": "data"}

    mock_post = AsyncMock(return_value=response)
    client.post = mock_post

    json_data = {"key": "value", "nested": {"test": "data"}}
    await call_post(client, "http://test.com", json=json_data)

    mock_post.assert_called_once()
    call_kwargs = mock_post.call_args[1]
    assert call_kwargs["json"] == json_data

```
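
The utilities under test convert raw HTTP failures into `ToolError`s with status-specific messages. Below is a minimal sketch of that wrapper pattern, reusing the `get_error_message` helper exercised above; the actual `call_get` in `basic_memory.mcp.tools.utils` may differ in details:

```python
from httpx import AsyncClient, Response
from mcp.server.fastmcp.exceptions import ToolError

from basic_memory.mcp.tools.utils import get_error_message


async def call_get_sketch(client: AsyncClient, url: str, **kwargs) -> Response:
    """Hypothetical GET wrapper: raise a friendly ToolError on HTTP errors."""
    response = await client.get(url, **kwargs)
    if response.status_code >= 400:
        # get_error_message maps status codes to the messages asserted in the
        # tests above, e.g. 404 -> "Resource not found", 403 -> "Access denied"
        raise ToolError(get_error_message(response.status_code, url, "GET"))
    return response
```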

--------------------------------------------------------------------------------
/tests/utils/test_frontmatter_obsidian_compatible.py:
--------------------------------------------------------------------------------

```python
"""Tests for Obsidian-compatible YAML frontmatter formatting."""

import frontmatter

from basic_memory.file_utils import dump_frontmatter


def test_tags_formatted_as_yaml_list():
    """Test that tags are formatted as YAML list instead of JSON array."""
    post = frontmatter.Post("Test content")
    post.metadata["title"] = "Test Note"
    post.metadata["type"] = "note"
    post.metadata["tags"] = ["system", "overview", "reference"]

    result = dump_frontmatter(post)

    # Should use YAML list format
    assert "tags:" in result
    assert "- system" in result
    assert "- overview" in result
    assert "- reference" in result

    # Should NOT use JSON array format
    assert '["system"' not in result
    assert '"overview"' not in result
    assert '"reference"]' not in result


def test_empty_tags_list():
    """Test that empty tags list is handled correctly."""
    post = frontmatter.Post("Test content")
    post.metadata["title"] = "Test Note"
    post.metadata["tags"] = []

    result = dump_frontmatter(post)

    # Should have empty list representation
    assert "tags: []" in result


def test_single_tag():
    """Test that single tag is still formatted as list."""
    post = frontmatter.Post("Test content")
    post.metadata["title"] = "Test Note"
    post.metadata["tags"] = ["single-tag"]

    result = dump_frontmatter(post)

    assert "tags:" in result
    assert "- single-tag" in result


def test_no_tags_metadata():
    """Test that posts without tags work normally."""
    post = frontmatter.Post("Test content")
    post.metadata["title"] = "Test Note"
    post.metadata["type"] = "note"

    result = dump_frontmatter(post)

    assert "title: Test Note" in result
    assert "type: note" in result
    assert "tags:" not in result


def test_no_frontmatter():
    """Test that posts with no frontmatter just return content."""
    post = frontmatter.Post("Test content only")

    result = dump_frontmatter(post)

    assert result == "Test content only"


def test_complex_tags_with_special_characters():
    """Test tags with hyphens, underscores, and other valid characters."""
    post = frontmatter.Post("Test content")
    post.metadata["title"] = "Test Note"
    post.metadata["tags"] = ["python-test", "api_integration", "v2.0", "nested/tag"]

    result = dump_frontmatter(post)

    assert "- python-test" in result
    assert "- api_integration" in result
    assert "- v2.0" in result
    assert "- nested/tag" in result


def test_tags_order_preserved():
    """Test that tag order is preserved in output."""
    post = frontmatter.Post("Test content")
    post.metadata["title"] = "Test Note"
    post.metadata["tags"] = ["zebra", "apple", "banana"]

    result = dump_frontmatter(post)

    # Find the positions of each tag in the output
    zebra_pos = result.find("- zebra")
    apple_pos = result.find("- apple")
    banana_pos = result.find("- banana")

    # They should appear in the same order as input
    assert zebra_pos < apple_pos < banana_pos


def test_non_tags_lists_also_formatted():
    """Test that other lists in metadata are also formatted properly."""
    post = frontmatter.Post("Test content")
    post.metadata["title"] = "Test Note"
    post.metadata["authors"] = ["John Doe", "Jane Smith"]
    post.metadata["keywords"] = ["AI", "machine learning"]

    result = dump_frontmatter(post)

    # Authors should be formatted as YAML list
    assert "authors:" in result
    assert "- John Doe" in result
    assert "- Jane Smith" in result

    # Keywords should be formatted as YAML list
    assert "keywords:" in result
    assert "- AI" in result
    assert "- machine learning" in result


def test_mixed_metadata_types():
    """Test that mixed metadata types are handled correctly."""
    post = frontmatter.Post("Test content")
    post.metadata["title"] = "Test Note"
    post.metadata["tags"] = ["tag1", "tag2"]
    post.metadata["created"] = "2024-01-01"
    post.metadata["priority"] = 5
    post.metadata["draft"] = True

    result = dump_frontmatter(post)

    # Lists should use YAML format
    assert "tags:" in result
    assert "- tag1" in result
    assert "- tag2" in result

    # Other types should be normal
    assert "title: Test Note" in result
    assert "created: '2024-01-01'" in result or "created: 2024-01-01" in result
    assert "priority: 5" in result
    assert "draft: true" in result or "draft: True" in result


def test_empty_content():
    """Test posts with empty content but with frontmatter."""
    post = frontmatter.Post("")
    post.metadata["title"] = "Empty Note"
    post.metadata["tags"] = ["empty", "test"]

    result = dump_frontmatter(post)

    # Should have frontmatter delimiter
    assert result.startswith("---")
    assert result.endswith("---\n")

    # Should have proper tag formatting
    assert "- empty" in result
    assert "- test" in result


def test_roundtrip_compatibility():
    """Test that the formatted output can be parsed back by frontmatter."""
    original_post = frontmatter.Post("Test content")
    original_post.metadata["title"] = "Test Note"
    original_post.metadata["tags"] = ["system", "test", "obsidian"]
    original_post.metadata["type"] = "note"

    # Format with our function
    formatted = dump_frontmatter(original_post)

    # Parse it back
    parsed_post = frontmatter.loads(formatted)

    # Should have same content and metadata
    assert parsed_post.content == original_post.content
    assert parsed_post.metadata["title"] == original_post.metadata["title"]
    assert parsed_post.metadata["tags"] == original_post.metadata["tags"]
    assert parsed_post.metadata["type"] == original_post.metadata["type"]

```
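
For reference, the Obsidian-friendly list formatting these tests assert can be produced with PyYAML's block style. A minimal sketch, assuming `dump_frontmatter` is a thin wrapper over `yaml.dump`; the real implementation in `basic_memory.file_utils` may handle edge cases such as empty content differently:

```python
import frontmatter
import yaml


def dump_frontmatter_sketch(post: frontmatter.Post) -> str:
    """Hypothetical serializer: block-style YAML lists instead of JSON arrays."""
    if not post.metadata:
        return post.content
    # default_flow_style=False yields "- item" lines rather than ["item", ...]
    yaml_text = yaml.dump(
        post.metadata, sort_keys=False, default_flow_style=False, allow_unicode=True
    ).strip()
    return f"---\n{yaml_text}\n---\n\n{post.content}"
```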

--------------------------------------------------------------------------------
/tests/markdown/test_markdown_processor.py:
--------------------------------------------------------------------------------

```python
"""Tests for MarkdownProcessor.

Tests focus on the Read -> Modify -> Write pattern and content preservation.
"""

from datetime import datetime
from pathlib import Path

import pytest

from basic_memory.markdown.markdown_processor import DirtyFileError, MarkdownProcessor
from basic_memory.markdown.schemas import (
    EntityFrontmatter,
    EntityMarkdown,
    Observation,
    Relation,
)


@pytest.mark.asyncio
async def test_write_new_minimal_file(markdown_processor: MarkdownProcessor, tmp_path: Path):
    """Test creating new file with just title."""
    path = tmp_path / "test.md"

    # Create minimal markdown schema
    metadata = {
        "title": "Test Note",
        "type": "note",
        "permalink": "test",
        "created": datetime(2024, 1, 1),
        "modified": datetime(2024, 1, 1),
        "tags": ["test"],
    }
    markdown = EntityMarkdown(
        frontmatter=EntityFrontmatter(
            metadata=metadata,
        ),
        content="",
    )

    # Write file
    await markdown_processor.write_file(path, markdown)

    # Read back and verify
    content = path.read_text(encoding="utf-8")
    assert "---" in content  # Has frontmatter
    assert "type: note" in content
    assert "permalink: test" in content
    assert "# Test Note" in content  # Added title
    assert "tags:" in content
    assert "- test" in content

    # Should not have empty sections
    assert "## Observations" not in content
    assert "## Relations" not in content


@pytest.mark.asyncio
async def test_write_new_file_with_content(markdown_processor: MarkdownProcessor, tmp_path: Path):
    """Test creating new file with content and sections."""
    path = tmp_path / "test.md"

    # Create markdown with content and sections
    markdown = EntityMarkdown(
        frontmatter=EntityFrontmatter(
            type="note",
            permalink="test",
            title="Test Note",
            created=datetime(2024, 1, 1),
            modified=datetime(2024, 1, 1),
        ),
        content="# Custom Title\n\nMy content here.\nMultiple lines.",
        observations=[
            Observation(
                content="Test observation #test",
                category="tech",
                tags=["test"],
                context="test context",
            ),
        ],
        relations=[
            Relation(
                type="relates_to",
                target="other-note",
                context="test relation",
            ),
        ],
    )

    # Write file
    await markdown_processor.write_file(path, markdown)

    # Read back and verify
    content = path.read_text(encoding="utf-8")

    # Check content preserved exactly
    assert "# Custom Title" in content
    assert "My content here." in content
    assert "Multiple lines." in content

    # Check sections formatted correctly
    assert "- [tech] Test observation #test (test context)" in content
    assert "- relates_to [[other-note]] (test relation)" in content


@pytest.mark.asyncio
async def test_update_preserves_content(markdown_processor: MarkdownProcessor, tmp_path: Path):
    """Test that updating file preserves existing content."""
    path = tmp_path / "test.md"

    # Create initial file
    initial = EntityMarkdown(
        frontmatter=EntityFrontmatter(
            type="note",
            permalink="test",
            title="Test Note",
            created=datetime(2024, 1, 1),
            modified=datetime(2024, 1, 1),
        ),
        content="# My Note\n\nOriginal content here.",
        observations=[
            Observation(content="First observation", category="note"),
        ],
    )

    checksum = await markdown_processor.write_file(path, initial)

    # Update with new observation
    updated = EntityMarkdown(
        frontmatter=initial.frontmatter,
        content=initial.content,  # Preserve original content
        observations=[
            initial.observations[0],  # Keep original observation
            Observation(content="Second observation", category="tech"),  # Add new one
        ],
    )

    # Update file
    await markdown_processor.write_file(path, updated, expected_checksum=checksum)

    # Read back and verify
    result = await markdown_processor.read_file(path)

    # Original content preserved
    assert "Original content here." in result.content

    # Both observations present
    assert len(result.observations) == 2
    assert any(o.content == "First observation" for o in result.observations)
    assert any(o.content == "Second observation" for o in result.observations)


@pytest.mark.asyncio
async def test_dirty_file_detection(markdown_processor: MarkdownProcessor, tmp_path: Path):
    """Test detection of file modifications."""
    path = tmp_path / "test.md"

    # Create initial file
    initial = EntityMarkdown(
        frontmatter=EntityFrontmatter(
            type="note",
            permalink="test",
            title="Test Note",
            created=datetime(2024, 1, 1),
            modified=datetime(2024, 1, 1),
        ),
        content="Initial content",
    )

    checksum = await markdown_processor.write_file(path, initial)

    # Modify file directly
    path.write_text(path.read_text(encoding="utf-8") + "\nModified!", encoding="utf-8")

    # Try to update with old checksum
    update = EntityMarkdown(
        frontmatter=initial.frontmatter,
        content="New content",
    )

    # Should raise DirtyFileError
    with pytest.raises(DirtyFileError):
        await markdown_processor.write_file(path, update, expected_checksum=checksum)

    # Should succeed without checksum
    new_checksum = await markdown_processor.write_file(path, update)
    assert new_checksum != checksum

```
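
The read -> modify -> write pattern these tests rely on: keep the checksum returned by `write_file` and pass it back as `expected_checksum`, so concurrent edits surface as `DirtyFileError` instead of being silently overwritten. A sketch of a caller-side helper built only from the API exercised above; the merge strategy is a naive placeholder, not the library's behavior:

```python
from pathlib import Path

from basic_memory.markdown.markdown_processor import DirtyFileError, MarkdownProcessor
from basic_memory.markdown.schemas import EntityMarkdown


async def safe_update(
    processor: MarkdownProcessor, path: Path, updated: EntityMarkdown, checksum: str
) -> str:
    """Hypothetical helper: write only if the file still matches `checksum`."""
    try:
        return await processor.write_file(path, updated, expected_checksum=checksum)
    except DirtyFileError:
        # The file changed on disk since we read it; reconcile before retrying
        current = await processor.read_file(path)
        merged = EntityMarkdown(
            frontmatter=updated.frontmatter,
            content=current.content,  # naive strategy: keep the on-disk content
            observations=updated.observations,
        )
        return await processor.write_file(path, merged)  # no checksum: force write
```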

--------------------------------------------------------------------------------
/src/basic_memory/mcp/prompts/utils.py:
--------------------------------------------------------------------------------

```python
"""Utility functions for formatting prompt responses.

These utilities help format data from various tools into consistent,
user-friendly markdown summaries.
"""

from dataclasses import dataclass
from textwrap import dedent
from typing import List

from basic_memory.schemas.base import TimeFrame
from basic_memory.schemas.memory import (
    normalize_memory_url,
    EntitySummary,
    RelationSummary,
    ObservationSummary,
)


@dataclass
class PromptContextItem:
    primary_results: List[EntitySummary]
    related_results: List[EntitySummary | RelationSummary | ObservationSummary]


@dataclass
class PromptContext:
    timeframe: TimeFrame
    topic: str
    results: List[PromptContextItem]


def format_prompt_context(context: PromptContext) -> str:
    """Format continuation context into a helpful summary.
    Returns:
        Formatted continuation summary
    """
    if not context.results:  # pragma: no cover
        return dedent(f"""
            # Continuing conversation on: {context.topic}

            This is a memory retrieval session. 
            The supplied query did not return any information specifically on this topic.
            
            ## Opportunity to Capture New Knowledge!
            
            This is an excellent chance to start documenting this topic:
            
            ```python
            await write_note(
                title="{context.topic}",
                content=f'''
                # {context.topic}
                
                ## Overview
                [Summary of what we know about {context.topic}]
                
                ## Key Points
                [Main aspects or components of {context.topic}]
                
                ## Observations
                - [category] [First important observation about {context.topic}]
                - [category] [Second observation about {context.topic}]
                
                ## Relations
                - relates_to [[Related Topic]]
                - part_of [[Broader Context]]
                '''
            )
            ```
            
            ## Other Options
            
            Please use the available basic-memory tools to gather relevant context before responding.
            You can also:
            - Try a different search term
            - Check recent activity with `recent_activity(timeframe="1w")`
            """)

    # Start building our summary with header - add knowledge capture emphasis
    summary = dedent(f"""
        # Continuing conversation on: {context.topic}

        This is a memory retrieval session. 
        
        Please use the available basic-memory tools to gather relevant context before responding. 
        Start by executing one of the suggested commands below to retrieve content.

        Here's what I found from previous conversations:
        
        > **Knowledge Capture Recommendation:** As you continue this conversation, actively look for opportunities to record new information, decisions, or insights that emerge. Use `write_note()` to document important context.
        """)

    # Track what we've added to avoid duplicates
    added_permalinks = set()
    sections = []

    # Process each context item (a distinct loop name avoids shadowing the
    # `context` argument, which is what forced the pyright ignores here)
    for item in context.results:
        for primary in item.primary_results:
            if primary.permalink not in added_permalinks:
                primary_permalink = primary.permalink

                added_permalinks.add(primary_permalink)

                # Use permalink if available, otherwise use file_path
                if primary_permalink:
                    memory_url = normalize_memory_url(primary_permalink)
                    read_command = f'read_note("{primary_permalink}")'
                else:
                    memory_url = f"file://{primary.file_path}"
                    read_command = f'read_file("{primary.file_path}")'

                section = dedent(f"""
                    --- {memory_url}

                    ## {primary.title}
                    - **Type**: {primary.type}
                    """)

                # Add creation date
                section += f"- **Created**: {primary.created_at.strftime('%Y-%m-%d %H:%M')}\n"

                # Add content snippet
                if hasattr(primary, "content") and primary.content:
                    section += f"\n**Excerpt**:\n{primary.content}\n"

                section += dedent(f"""

                    You can read this document with: `{read_command}`
                    """)
                sections.append(section)

        if item.related_results:
            related_section = dedent("""
                ## Related Context
                """)

            for related in item.related_results:
                related_section += dedent(f"""
                    - type: **{related.type}**
                    - title: {related.title}
                    """)
                if related.permalink:  # pragma: no cover
                    related_section += (
                        f'You can view this document with: `read_note("{related.permalink}")`'
                    )
                else:  # pragma: no cover
                    related_section += (
                        f'You can view this file with: `read_file("{related.file_path}")`'
                    )

            # Append the related block once, after all related items are collected
            sections.append(related_section)

    # Add all sections
    summary += "\n".join(sections)
    return summary

```
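
A hedged usage sketch of `format_prompt_context`. `SimpleNamespace` stands in for the pydantic summary models, supplying only the attributes the function reads; real callers would pass `PromptContext`/`EntitySummary` instances:

```python
from datetime import datetime
from types import SimpleNamespace

from basic_memory.mcp.prompts.utils import format_prompt_context

primary = SimpleNamespace(
    permalink="notes/search-design",
    file_path="notes/search-design.md",
    title="Search Design",
    type="note",
    created_at=datetime(2024, 1, 1, 9, 30),
    content="Initial notes on the search architecture.",
)
item = SimpleNamespace(primary_results=[primary], related_results=[])
ctx = SimpleNamespace(topic="search design", timeframe="1w", results=[item])

print(format_prompt_context(ctx))  # duck-typed call, for illustration only
```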

--------------------------------------------------------------------------------
/tests/services/test_project_removal_bug.py:
--------------------------------------------------------------------------------

```python
"""Test for project removal bug #254."""

import os
import tempfile
from datetime import timezone, datetime
from pathlib import Path

import pytest

from basic_memory.services.project_service import ProjectService


@pytest.mark.asyncio
async def test_remove_project_with_related_entities(project_service: ProjectService):
    """Test removing a project that has related entities (reproduces issue #254).

    This test verifies that projects with related entities (entities, observations, relations)
    can be properly deleted without foreign key constraint violations.

    The bug was caused by missing foreign key constraints with CASCADE DELETE after
    the project table was recreated in migration 647e7a75e2cd.
    """
    test_project_name = f"test-remove-with-entities-{os.urandom(4).hex()}"
    with tempfile.TemporaryDirectory() as temp_dir:
        test_root = Path(temp_dir)
        test_project_path = str(test_root / "test-remove-with-entities")

        # Make sure the test directory exists
        os.makedirs(test_project_path, exist_ok=True)

        try:
            # Step 1: Add the test project
            await project_service.add_project(test_project_name, test_project_path)

            # Verify project exists
            project = await project_service.get_project(test_project_name)
            assert project is not None

            # Step 2: Create related entities for this project
            from basic_memory.repository.entity_repository import EntityRepository

            entity_repo = EntityRepository(
                project_service.repository.session_maker, project_id=project.id
            )

            entity_data = {
                "title": "Test Entity for Deletion",
                "entity_type": "note",
                "content_type": "text/markdown",
                "project_id": project.id,
                "permalink": "test-deletion-entity",
                "file_path": "test-deletion-entity.md",
                "checksum": "test123",
                "created_at": datetime.now(timezone.utc),
                "updated_at": datetime.now(timezone.utc),
            }
            entity = await entity_repo.create(entity_data)
            assert entity is not None

            # Step 3: Create observations for the entity
            from basic_memory.repository.observation_repository import ObservationRepository

            obs_repo = ObservationRepository(
                project_service.repository.session_maker, project_id=project.id
            )

            observation_data = {
                "entity_id": entity.id,
                "content": "This is a test observation",
                "category": "note",
            }
            observation = await obs_repo.create(observation_data)
            assert observation is not None

            # Step 4: Create relations involving the entity
            from basic_memory.repository.relation_repository import RelationRepository

            rel_repo = RelationRepository(
                project_service.repository.session_maker, project_id=project.id
            )

            relation_data = {
                "from_id": entity.id,
                "to_name": "some-target-entity",
                "relation_type": "relates-to",
            }
            relation = await rel_repo.create(relation_data)
            assert relation is not None

            # Step 5: Attempt to remove the project
            # This should work with proper cascade delete, or fail with foreign key constraint
            await project_service.remove_project(test_project_name)

            # Step 6: Verify everything was properly deleted

            # Project should be gone
            removed_project = await project_service.get_project(test_project_name)
            assert removed_project is None, "Project should have been removed"

            # Related entities should be cascade deleted
            remaining_entity = await entity_repo.find_by_id(entity.id)
            assert remaining_entity is None, "Entity should have been cascade deleted"

            # Observations should be cascade deleted
            remaining_obs = await obs_repo.find_by_id(observation.id)
            assert remaining_obs is None, "Observation should have been cascade deleted"

            # Relations should be cascade deleted
            remaining_rel = await rel_repo.find_by_id(relation.id)
            assert remaining_rel is None, "Relation should have been cascade deleted"

        except Exception as e:
            # Check if this is the specific foreign key constraint error from the bug report
            if "FOREIGN KEY constraint failed" in str(e):
                pytest.fail(
                    f"Bug #254 reproduced: {e}. "
                    "This indicates missing foreign key constraints with CASCADE DELETE. "
                    "Run migration a1b2c3d4e5f6_fix_project_foreign_keys.py to fix this."
                )
            else:
                # Re-raise other unexpected errors
                raise e

        finally:
            # Clean up - remove project if it still exists
            if test_project_name in project_service.projects:
                try:
                    await project_service.remove_project(test_project_name)
                except Exception:
                    # Manual cleanup if remove_project fails
                    try:
                        project_service.config_manager.remove_project(test_project_name)
                    except Exception:
                        pass

                    project = await project_service.get_project(test_project_name)
                    if project:
                        await project_service.repository.delete(project.id)

```
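
The fix referenced above restores `ON DELETE CASCADE` on the project foreign keys. A minimal sketch of the pattern with illustrative model names, not the actual basic_memory models (note that SQLite also needs `PRAGMA foreign_keys=ON` per connection for the cascade to take effect):

```python
from sqlalchemy import ForeignKey
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


class Base(DeclarativeBase):
    pass


class Project(Base):
    __tablename__ = "project"
    id: Mapped[int] = mapped_column(primary_key=True)


class Entity(Base):
    __tablename__ = "entity"
    id: Mapped[int] = mapped_column(primary_key=True)
    # Deleting a project row now cascades to its entities (and, transitively,
    # to observations and relations declared the same way)
    project_id: Mapped[int] = mapped_column(ForeignKey("project.id", ondelete="CASCADE"))
```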

--------------------------------------------------------------------------------
/src/basic_memory/importers/claude_conversations_importer.py:
--------------------------------------------------------------------------------

```python
"""Claude conversations import service for Basic Memory."""

import logging
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List

from basic_memory.markdown.schemas import EntityFrontmatter, EntityMarkdown
from basic_memory.importers.base import Importer
from basic_memory.schemas.importer import ChatImportResult
from basic_memory.importers.utils import clean_filename, format_timestamp

logger = logging.getLogger(__name__)


class ClaudeConversationsImporter(Importer[ChatImportResult]):
    """Service for importing Claude conversations."""

    async def import_data(
        self, source_data, destination_folder: str, **kwargs: Any
    ) -> ChatImportResult:
        """Import conversations from Claude JSON export.

        Args:
            source_data: Path to the Claude conversations.json file.
            destination_folder: Destination folder within the project.
            **kwargs: Additional keyword arguments.

        Returns:
            ChatImportResult containing statistics and status of the import.
        """
        try:
            # Ensure the destination folder exists
            folder_path = self.ensure_folder_exists(destination_folder)

            conversations = source_data

            # Process each conversation
            messages_imported = 0
            chats_imported = 0

            for chat in conversations:
                # Convert to entity
                entity = self._format_chat_content(
                    base_path=folder_path,
                    name=chat["name"],
                    messages=chat["chat_messages"],
                    created_at=chat["created_at"],
                    modified_at=chat["updated_at"],
                )

                # Write file
                file_path = self.base_path / Path(f"{entity.frontmatter.metadata['permalink']}.md")
                await self.write_entity(entity, file_path)

                chats_imported += 1
                messages_imported += len(chat["chat_messages"])

            return ChatImportResult(
                import_count={"conversations": chats_imported, "messages": messages_imported},
                success=True,
                conversations=chats_imported,
                messages=messages_imported,
            )

        except Exception as e:  # pragma: no cover
            logger.exception("Failed to import Claude conversations")
            return self.handle_error("Failed to import Claude conversations", e)  # pyright: ignore [reportReturnType]

    def _format_chat_content(
        self,
        base_path: Path,
        name: str,
        messages: List[Dict[str, Any]],
        created_at: str,
        modified_at: str,
    ) -> EntityMarkdown:
        """Convert chat messages to Basic Memory entity format.

        Args:
            base_path: Base path for the entity.
            name: Chat name.
            messages: List of chat messages.
            created_at: Creation timestamp.
            modified_at: Modification timestamp.

        Returns:
            EntityMarkdown instance representing the conversation.
        """
        # Generate permalink
        date_prefix = datetime.fromisoformat(created_at.replace("Z", "+00:00")).strftime("%Y%m%d")
        clean_title = clean_filename(name)
        permalink = f"{base_path.name}/{date_prefix}-{clean_title}"

        # Format content
        content = self._format_chat_markdown(
            name=name,
            messages=messages,
            created_at=created_at,
            modified_at=modified_at,
            permalink=permalink,
        )

        # Create entity
        entity = EntityMarkdown(
            frontmatter=EntityFrontmatter(
                metadata={
                    "type": "conversation",
                    "title": name,
                    "created": created_at,
                    "modified": modified_at,
                    "permalink": permalink,
                }
            ),
            content=content,
        )

        return entity

    def _format_chat_markdown(
        self,
        name: str,
        messages: List[Dict[str, Any]],
        created_at: str,
        modified_at: str,
        permalink: str,
    ) -> str:
        """Format chat as clean markdown.

        Args:
            name: Chat name.
            messages: List of chat messages.
            created_at: Creation timestamp.
            modified_at: Modification timestamp.
            permalink: Permalink for the entity.

        Returns:
            Formatted markdown content.
        """
        # Start with the chat title
        lines = [
            f"# {name}\n",
        ]

        # Add messages
        for msg in messages:
            # Format timestamp
            ts = format_timestamp(msg["created_at"])

            # Add message header
            lines.append(f"### {msg['sender'].title()} ({ts})")

            # Handle message content
            content = msg.get("text", "")
            if msg.get("content"):
                # Filter out None values before joining
                content = " ".join(
                    str(c.get("text", ""))
                    for c in msg["content"]
                    if c and c.get("text") is not None
                )
            lines.append(content)

            # Handle attachments
            attachments = msg.get("attachments", [])
            for attachment in attachments:
                if "file_name" in attachment:
                    lines.append(f"\n**Attachment: {attachment['file_name']}**")
                    if "extracted_content" in attachment:
                        lines.append("```")
                        lines.append(attachment["extracted_content"])
                        lines.append("```")

            # Add spacing between messages
            lines.append("")

        return "\n".join(lines)

```
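
For orientation, here is the minimal input shape this importer consumes, inferred from the fields accessed above; a real Claude export carries additional keys, which are simply ignored:

```python
conversations = [
    {
        "name": "Project planning",
        "created_at": "2024-01-01T10:00:00Z",
        "updated_at": "2024-01-01T11:00:00Z",
        "chat_messages": [
            {
                "sender": "human",
                "created_at": "2024-01-01T10:00:00Z",
                "text": "Let's outline the plan.",
                # optional structured content; overrides "text" when present
                "content": [{"text": "Let's outline the plan."}],
                "attachments": [],
            }
        ],
    }
]
```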

--------------------------------------------------------------------------------
/tests/sync/test_tmp_files.py:
--------------------------------------------------------------------------------

```python
"""Test proper handling of .tmp files during sync."""

import asyncio
from pathlib import Path

import pytest
from watchfiles import Change


async def create_test_file(path: Path, content: str = "test content") -> None:
    """Create a test file with given content."""
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(content)


@pytest.mark.asyncio
async def test_temp_file_filter(watch_service, app_config, project_config, test_project):
    """Test that .tmp files are correctly filtered out."""
    # Test filter_changes method directly
    tmp_path = Path(test_project.path) / "test.tmp"
    assert not watch_service.filter_changes(Change.added, str(tmp_path))

    # Test with valid file
    valid_path = Path(test_project.path) / "test.md"
    assert watch_service.filter_changes(Change.added, str(valid_path))


@pytest.mark.asyncio
async def test_handle_tmp_files(watch_service, project_config, test_project, sync_service):
    """Test handling of .tmp files during sync process."""
    project_dir = Path(test_project.path)

    # Create a .tmp file - this simulates a file being written with write_file_atomic
    tmp_file = project_dir / "test.tmp"
    await create_test_file(tmp_file, "This is a temporary file")

    # Create the target final file
    final_file = project_dir / "test.md"
    await create_test_file(final_file, "This is the final file")

    # Setup changes that include both the .tmp and final file
    changes = {
        (Change.added, str(tmp_file)),
        (Change.added, str(final_file)),
    }

    # Handle changes
    await watch_service.handle_changes(test_project, changes)

    # Verify only the final file got an entity
    tmp_entity = await sync_service.entity_repository.get_by_file_path("test.tmp")
    final_entity = await sync_service.entity_repository.get_by_file_path("test.md")

    assert tmp_entity is None, "Temp file should not have an entity"
    assert final_entity is not None, "Final file should have an entity"


@pytest.mark.asyncio
async def test_atomic_write_tmp_file_handling(
    watch_service, project_config, test_project, sync_service
):
    """Test handling of file changes during atomic write operations."""
    project_dir = project_config.home

    # This test simulates the full atomic write process:
    # 1. First a .tmp file is created
    # 2. Then the .tmp file is renamed to the final file
    # 3. Both events are processed by the watch service

    # Setup file paths
    tmp_path = project_dir / "document.tmp"
    final_path = project_dir / "document.md"

    # Create mockup of the atomic write process
    await create_test_file(tmp_path, "Content for document")

    # First batch of changes - .tmp file created
    changes1 = {(Change.added, str(tmp_path))}

    # Process first batch
    await watch_service.handle_changes(test_project, changes1)

    # Now "replace" the temp file with the final file
    tmp_path.rename(final_path)

    # Second batch of changes - .tmp file deleted, final file added
    changes2 = {(Change.deleted, str(tmp_path)), (Change.added, str(final_path))}

    # Process second batch
    await watch_service.handle_changes(test_project, changes2)

    # Verify only the final file is in the database
    tmp_entity = await sync_service.entity_repository.get_by_file_path("document.tmp")
    final_entity = await sync_service.entity_repository.get_by_file_path("document.md")

    assert tmp_entity is None, "Temp file should not have an entity"
    assert final_entity is not None, "Final file should have an entity"

    # Check events
    new_events = [e for e in watch_service.state.recent_events if e.action == "new"]
    assert len(new_events) == 1
    assert new_events[0].path == "document.md"


@pytest.mark.asyncio
async def test_rapid_atomic_writes(watch_service, project_config, test_project, sync_service):
    """Test handling of rapid atomic writes to the same destination."""
    project_dir = Path(test_project.path)

    # This test simulates multiple rapid atomic writes to the same file:
    # 1. Several .tmp files are created one after another
    # 2. Each is then renamed to the same final file
    # 3. Events are batched and processed together

    # Setup file paths
    tmp1_path = project_dir / "document.1.tmp"
    tmp2_path = project_dir / "document.2.tmp"
    final_path = project_dir / "document.md"

    # Create multiple temp files that will be used in sequence
    await create_test_file(tmp1_path, "First version")
    await create_test_file(tmp2_path, "Second version")

    # Simulate the first atomic write
    tmp1_path.replace(final_path)

    # Brief pause to ensure file system registers the change
    await asyncio.sleep(0.1)

    # Read content to verify
    content1 = final_path.read_text(encoding="utf-8")
    assert content1 == "First version"

    # Simulate the second atomic write
    tmp2_path.replace(final_path)

    # Verify content was updated
    content2 = final_path.read_text(encoding="utf-8")
    assert content2 == "Second version"

    # Create a batch of changes that might arrive in mixed order
    changes = {
        (Change.added, str(tmp1_path)),
        (Change.deleted, str(tmp1_path)),
        (Change.added, str(tmp2_path)),
        (Change.deleted, str(tmp2_path)),
        (Change.added, str(final_path)),
        (Change.modified, str(final_path)),
    }

    # Process all changes
    await watch_service.handle_changes(test_project, changes)

    # Verify only the final file is in the database
    final_entity = await sync_service.entity_repository.get_by_file_path("document.md")
    assert final_entity is not None

    # Also verify no tmp entities were created
    tmp1_entity = await sync_service.entity_repository.get_by_file_path("document.1.tmp")
    tmp2_entity = await sync_service.entity_repository.get_by_file_path("document.2.tmp")
    assert tmp1_entity is None
    assert tmp2_entity is None

```
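
A minimal sketch of the suffix-based filtering these tests exercise; the real `filter_changes` on the watch service may apply additional rules (hidden files, ignore patterns):

```python
from pathlib import Path

from watchfiles import Change


def filter_changes_sketch(change: Change, path: str) -> bool:
    """Return True if a change should be processed by sync."""
    # Scratch files from write_file_atomic end in .tmp and never become entities
    return Path(path).suffix != ".tmp"
```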

--------------------------------------------------------------------------------
/src/basic_memory/mcp/tools/list_directory.py:
--------------------------------------------------------------------------------

```python
"""List directory tool for Basic Memory MCP server."""

from typing import Optional

from loguru import logger
from fastmcp import Context

from basic_memory.mcp.async_client import get_client
from basic_memory.mcp.project_context import get_active_project
from basic_memory.mcp.server import mcp
from basic_memory.mcp.tools.utils import call_get


@mcp.tool(
    description="List directory contents with filtering and depth control.",
)
async def list_directory(
    dir_name: str = "/",
    depth: int = 1,
    file_name_glob: Optional[str] = None,
    project: Optional[str] = None,
    context: Context | None = None,
) -> str:
    """List directory contents from the knowledge base with optional filtering.

    This tool provides 'ls' functionality for browsing the knowledge base directory structure.
    It can list immediate children or recursively explore subdirectories with depth control,
    and supports glob pattern filtering for finding specific files.

    Args:
        dir_name: Directory path to list (default: root "/")
                 Examples: "/", "/projects", "/research/ml"
        depth: Recursion depth (1-10, default: 1 for immediate children only)
               Higher values show subdirectory contents recursively
        file_name_glob: Optional glob pattern for filtering file names
                       Examples: "*.md", "*meeting*", "project_*"
        project: Project name to list directory from. Optional - server will resolve using hierarchy.
                If unknown, use list_memory_projects() to discover available projects.
        context: Optional FastMCP context for performance caching.

    Returns:
        Formatted listing of directory contents with file metadata

    Examples:
        # List root directory contents
        list_directory()

        # List specific folder
        list_directory(dir_name="/projects")

        # Find all markdown files
        list_directory(file_name_glob="*.md")

        # Deep exploration of research folder
        list_directory(dir_name="/research", depth=3)

        # Find meeting notes in projects folder
        list_directory(dir_name="/projects", file_name_glob="*meeting*")

        # Explicit project specification
        list_directory(project="work-docs", dir_name="/projects")

    Raises:
        ToolError: If project doesn't exist or directory path is invalid
    """
    async with get_client() as client:
        active_project = await get_active_project(client, project, context)
        project_url = active_project.project_url

        # Prepare query parameters
        params = {
            "dir_name": dir_name,
            "depth": str(depth),
        }
        if file_name_glob:
            params["file_name_glob"] = file_name_glob

        logger.debug(
            f"Listing directory '{dir_name}' in project {project} with depth={depth}, glob='{file_name_glob}'"
        )

        # Call the API endpoint
        response = await call_get(
            client,
            f"{project_url}/directory/list",
            params=params,
        )

        nodes = response.json()

        if not nodes:
            filter_desc = ""
            if file_name_glob:
                filter_desc = f" matching '{file_name_glob}'"
            return f"No files found in directory '{dir_name}'{filter_desc}"

        # Format the results
        output_lines = []
        if file_name_glob:
            output_lines.append(
                f"Files in '{dir_name}' matching '{file_name_glob}' (depth {depth}):"
            )
        else:
            output_lines.append(f"Contents of '{dir_name}' (depth {depth}):")
        output_lines.append("")

        # Group by type and sort
        directories = [n for n in nodes if n["type"] == "directory"]
        files = [n for n in nodes if n["type"] == "file"]

        # Sort by name
        directories.sort(key=lambda x: x["name"])
        files.sort(key=lambda x: x["name"])

        # Display directories first
        for node in directories:
            path_display = node["directory_path"]
            output_lines.append(f"📁 {node['name']:<30} {path_display}")

        # Add separator if we have both directories and files
        if directories and files:
            output_lines.append("")

        # Display files with metadata
        for node in files:
            path_display = node["directory_path"]
            title = node.get("title", "")
            updated = node.get("updated_at", "")

            # Strip any leading slash; read_note paths do not start with "/"
            if path_display.startswith("/"):
                path_display = path_display[1:]

            # Format date if available
            date_str = ""
            if updated:
                try:
                    from datetime import datetime

                    dt = datetime.fromisoformat(updated.replace("Z", "+00:00"))
                    date_str = dt.strftime("%Y-%m-%d")
                except Exception:  # pragma: no cover
                    date_str = updated[:10] if len(updated) >= 10 else ""

            # Create formatted line
            file_line = f"📄 {node['name']:<30} {path_display}"
            if title and title != node["name"]:
                file_line += f" | {title}"
            if date_str:
                file_line += f" | {date_str}"

            output_lines.append(file_line)

        # Add summary
        output_lines.append("")
        total_count = len(directories) + len(files)
        summary_parts = []
        if directories:
            summary_parts.append(
                f"{len(directories)} director{'y' if len(directories) == 1 else 'ies'}"
            )
        if files:
            summary_parts.append(f"{len(files)} file{'s' if len(files) != 1 else ''}")

        output_lines.append(f"Total: {total_count} items ({', '.join(summary_parts)})")

        return "\n".join(output_lines)

```
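
Illustrative shape of the nodes returned by the `/directory/list` endpoint, inferred from the fields this tool reads; the actual response schema may include more keys:

```python
nodes = [
    {"type": "directory", "name": "projects", "directory_path": "/projects"},
    {
        "type": "file",
        "name": "meeting-notes.md",
        "directory_path": "/projects/meeting-notes.md",
        "title": "Meeting Notes",
        "updated_at": "2024-01-01T10:00:00Z",
    },
]
```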

--------------------------------------------------------------------------------
/.claude/agents/system-architect.md:
--------------------------------------------------------------------------------

```markdown
---
name: system-architect
description: System architect who designs and implements architectural solutions, creates ADRs, and applies software engineering principles to solve complex system design problems.
model: sonnet
color: blue
---

You are a Senior System Architect who designs and implements architectural solutions for complex software systems. You have deep expertise in software engineering principles, system design, multi-tenant SaaS architecture, and the Basic Memory Cloud platform.

**Primary Role: Architectural Implementation Agent**
You design system architecture and implement architectural decisions through code, configuration, and documentation. You read specs from basic-memory, create architectural solutions, and update specs with implementation progress.

**Core Responsibilities:**

**Specification Implementation:**
- Read architectural specs using basic-memory MCP tools
- Design and implement system architecture solutions
- Create code scaffolding, service structure, and system interfaces
- Update specs with architectural decisions and implementation status
- Document ADRs (Architectural Decision Records) for significant choices

**Architectural Design & Implementation:**
- Design multi-service system architectures
- Implement service boundaries and communication patterns
- Create database schemas and migration strategies
- Design authentication and authorization systems
- Implement infrastructure-as-code patterns

**System Implementation Process:**
1. **Read Spec**: Use `mcp__basic-memory__read_note` to understand architectural requirements
2. **Design Solution**: Apply architectural principles and patterns
3. **Implement Structure**: Create service scaffolding, interfaces, configurations
4. **Document Decisions**: Create ADRs documenting architectural choices
5. **Update Spec**: Record implementation progress and decisions
6. **Validate**: Ensure implementation meets spec success criteria

**Architectural Principles Applied:**
- DRY (Don't Repeat Yourself) - Single sources of truth
- KISS (Keep It Simple, Stupid) - Favor simplicity over cleverness
- YAGNI (You Aren't Gonna Need It) - Build only what's needed now
- Principle of Least Astonishment - Intuitive system behavior
- Separation of Concerns - Clear boundaries and responsibilities

**Basic Memory Cloud Expertise:**

**Multi-Service Architecture:**
- **Cloud Service**: Tenant management, OAuth 2.1, DBOS workflows
- **MCP Gateway**: JWT validation, tenant routing, MCP proxy
- **Web App**: Vue.js frontend, OAuth flows, user interface
- **API Service**: Per-tenant Basic Memory instances with MCP

**Multi-Tenant SaaS Patterns:**
- **Tenant Isolation**: Infrastructure-level isolation with dedicated instances
- **Database-per-tenant**: Isolated PostgreSQL databases
- **Authentication**: JWT tokens with tenant-specific claims
- **Provisioning**: DBOS workflows for durable operations
- **Resource Management**: Fly.io machine lifecycle management

**Implementation Capabilities:**
- FastAPI service structure and middleware
- DBOS workflow implementation
- Database schema design and migrations
- JWT authentication and authorization
- Fly.io deployment configuration
- Service communication patterns

**Technical Implementation:**
- Create service scaffolding and project structure
- Implement authentication and authorization middleware
- Design database schemas and relationships
- Configure deployment and infrastructure
- Implement monitoring and health checks
- Create API interfaces and contracts

**Code Quality Standards:**
- Follow established patterns and conventions
- Implement proper error handling and logging
- Design for scalability and maintainability
- Apply security best practices
- Create comprehensive tests for architectural components
- Document system behavior and interfaces

**Decision Documentation:**
- Create ADRs for significant architectural choices
- Document trade-offs and alternative approaches considered
- Maintain decision history and rationale
- Link architectural decisions to implementation code
- Update decisions when new information becomes available

**Basic Memory Integration:**
- Use `mcp__basic-memory__read_note` to read architectural specs
- Use `mcp__basic-memory__write_note` to create ADRs and architectural documentation
- Use `mcp__basic-memory__edit_note` to update specs with implementation progress
- Document architectural patterns and anti-patterns for reuse
- Maintain searchable knowledge base of system design decisions

**Communication Style:**
- Focus on implemented solutions and concrete architectural artifacts
- Document decisions with clear rationale and trade-offs
- Provide specific implementation guidance and code examples
- Ask targeted questions about requirements and constraints
- Explain architectural choices in terms of business and technical impact

**Deliverables:**
- Working system architecture implementations
- ADRs documenting architectural decisions
- Service scaffolding and interface definitions
- Database schemas and migration scripts
- Configuration and deployment artifacts
- Updated specifications with implementation status

**Anti-Patterns to Avoid:**
- Premature optimization over correctness
- Over-engineering for current needs
- Building without clear requirements
- Creating multiple sources of truth
- Implementing solutions without understanding root causes

**Key Principles:**
- Implement architectural decisions through working code
- Document all significant decisions and trade-offs
- Build systems that teams can understand and maintain
- Apply proven patterns and avoid reinventing solutions
- Balance current needs with long-term maintainability

When handed an architectural specification via `/spec implement`, you will read the spec, design the solution applying architectural principles, implement the necessary code and configuration, document decisions through ADRs, and update the spec with completion status and architectural notes.
```

--------------------------------------------------------------------------------
/tests/utils/test_file_utils.py:
--------------------------------------------------------------------------------

```python
"""Tests for file utilities."""

from pathlib import Path

import pytest
import random
import string

from basic_memory.file_utils import (
    FileError,
    FileWriteError,
    ParseError,
    compute_checksum,
    has_frontmatter,
    parse_frontmatter,
    remove_frontmatter,
    sanitize_for_filename,
    sanitize_for_folder,
    write_file_atomic,
)


def get_random_word(length: int = 12, necessary_char: str | None = None) -> str:
    letters = string.ascii_lowercase
    word_chars = [random.choice(letters) for i in range(length)]

    if necessary_char and length > 0:
        # Replace a character at a random position with the necessary character
        random_pos = random.randint(0, length - 1)
        word_chars[random_pos] = necessary_char

    return "".join(word_chars)


@pytest.mark.asyncio
async def test_compute_checksum():
    """Test checksum computation."""
    content = "test content"
    checksum = await compute_checksum(content)
    assert isinstance(checksum, str)
    assert len(checksum) == 64  # SHA-256 produces 64 char hex string


@pytest.mark.asyncio
async def test_compute_checksum_error():
    """Test checksum error handling."""
    with pytest.raises(FileError):
        # Try to hash an object that can't be encoded
        await compute_checksum(object())  # pyright: ignore [reportArgumentType]


@pytest.mark.asyncio
async def test_write_file_atomic(tmp_path: Path):
    """Test atomic file writing."""
    test_file = tmp_path / "test.txt"
    content = "test content"

    await write_file_atomic(test_file, content)
    assert test_file.exists()
    assert test_file.read_text(encoding="utf-8") == content

    # Temp file should be cleaned up
    assert not test_file.with_suffix(".tmp").exists()


@pytest.mark.asyncio
async def test_write_file_atomic_error(tmp_path: Path):
    """Test atomic write error handling."""
    # Try to write to a directory that doesn't exist
    test_file = tmp_path / "nonexistent" / "test.txt"

    with pytest.raises(FileWriteError):
        await write_file_atomic(test_file, "test content")


def test_has_frontmatter():
    """Test frontmatter detection."""
    # Valid frontmatter
    assert has_frontmatter("""---
title: Test
---
content""")

    # Just content
    assert not has_frontmatter("Just content")

    # Empty content
    assert not has_frontmatter("")

    # Just delimiter
    assert not has_frontmatter("---")

    # Delimiter not at start
    assert not has_frontmatter("""
Some text
---
title: Test
---""")

    # Invalid format
    assert not has_frontmatter("--title: test--")


def test_parse_frontmatter():
    """Test parsing frontmatter."""
    # Valid frontmatter
    content = """---
title: Test
tags:
  - a
  - b
---
content"""

    result = parse_frontmatter(content)
    assert result == {"title": "Test", "tags": ["a", "b"]}

    # Empty frontmatter
    content = """---
---
content"""
    result = parse_frontmatter(content)
    assert result is None or result == {}  # Handle both None and empty dict cases

    # Invalid YAML syntax
    with pytest.raises(ParseError) as exc:
        parse_frontmatter("""---
[: invalid yaml syntax :]
---
content""")
    assert "Invalid YAML in frontmatter" in str(exc.value)

    # Non-dict YAML content
    with pytest.raises(ParseError) as exc:
        parse_frontmatter("""---
- just
- a
- list
---
content""")
    assert "Frontmatter must be a YAML dictionary" in str(exc.value)

    # No frontmatter
    with pytest.raises(ParseError):
        parse_frontmatter("Just content")

    # Incomplete frontmatter
    with pytest.raises(ParseError):
        parse_frontmatter("""---
title: Test""")


def test_remove_frontmatter():
    """Test removing frontmatter."""
    # With frontmatter
    content = """---
title: Test
---
test content"""
    assert remove_frontmatter(content) == "test content"

    # No frontmatter
    content = "test content"
    assert remove_frontmatter(content) == "test content"

    # Only frontmatter
    content = """---
title: Test
---
"""
    assert remove_frontmatter(content) == ""

    # Invalid frontmatter - missing closing delimiter
    with pytest.raises(ParseError) as exc:
        remove_frontmatter("""---
title: Test""")
    assert "Invalid frontmatter format" in str(exc.value)


def test_sanitize_for_filename_removes_invalid_characters():
    # Test all invalid characters listed in the regex
    invalid_chars = '<>:"|?*'

    # All invalid characters should be replaced
    for char in invalid_chars:
        text = get_random_word(length=12, necessary_char=char)
        sanitized_text = sanitize_for_filename(text)

        assert char not in sanitized_text


@pytest.mark.parametrize(
    "input_folder,expected",
    [
        ("", ""),  # Empty string
        ("   ", ""),  # Whitespace only
        ("my-folder", "my-folder"),  # Simple folder
        ("my/folder", "my/folder"),  # Nested folder
        ("my//folder", "my/folder"),  # Double slash compressed
        ("my\\\\folder", "my/folder"),  # Windows-style double backslash compressed
        ("my/folder/", "my/folder"),  # Trailing slash removed
        ("/my/folder", "my/folder"),  # Leading slash removed
        ("./my/folder", "my/folder"),  # Leading ./ removed
        ("my<>folder", "myfolder"),  # Special chars removed
        ("my:folder|test", "myfoldertest"),  # More special chars removed
        ("my_folder-1", "my_folder-1"),  # Allowed chars preserved
        ("my folder", "my folder"),  # Space preserved
        ("my/folder//sub//", "my/folder/sub"),  # Multiple compressions and trims
        ("my\\folder\\sub", "my/folder/sub"),  # Windows-style separators normalized
        ("my/folder<>:|?*sub", "my/foldersub"),  # All invalid chars removed
        ("////my////folder////", "my/folder"),  # Excessive leading/trailing/multiple slashes
    ],
)
def test_sanitize_for_folder_edge_cases(input_folder, expected):
    assert sanitize_for_folder(input_folder) == expected

```
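
One way the behavior pinned down by these parametrized cases could be implemented; a sketch only, since the real `sanitize_for_folder` in `basic_memory.file_utils` may differ in details:

```python
import re


def sanitize_for_folder_sketch(folder: str) -> str:
    """Hypothetical re-implementation matching the cases above."""
    folder = folder.replace("\\", "/")  # normalize Windows separators
    folder = re.sub(r'[<>:"|?*]', "", folder)  # drop invalid filename chars
    folder = re.sub(r"/+", "/", folder)  # collapse repeated slashes
    folder = folder.strip().strip("/")  # trim whitespace and edge slashes
    if folder.startswith("./"):  # drop a leading ./
        folder = folder[2:]
    return folder
```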

--------------------------------------------------------------------------------
/tests/repository/test_repository.py:
--------------------------------------------------------------------------------

```python
"""Test repository implementation."""

from datetime import datetime
import pytest
from sqlalchemy import String, DateTime
from sqlalchemy.orm import Mapped, mapped_column

from basic_memory.models import Base
from basic_memory.repository.repository import Repository


class ModelTest(Base):
    """Test model for repository tests."""

    __tablename__ = "test_model"

    id: Mapped[str] = mapped_column(String(255), primary_key=True)
    name: Mapped[str] = mapped_column(String(255))
    description: Mapped[str | None] = mapped_column(String(255), nullable=True)
    created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)
    updated_at: Mapped[datetime] = mapped_column(
        DateTime, default=datetime.utcnow, onupdate=datetime.utcnow
    )


@pytest.fixture
def repository(session_maker):
    """Create a test repository."""
    return Repository(session_maker, ModelTest)


@pytest.mark.asyncio
async def test_add(repository):
    """Test bulk creation of entities."""
    # Create test instances
    instance = ModelTest(id="test_add", name="Test Add")
    await repository.add(instance)

    # Verify we can find in db
    found = await repository.find_by_id("test_add")
    assert found is not None
    assert found.name == "Test Add"


@pytest.mark.asyncio
async def test_add_all(repository):
    """Test bulk creation of entities."""
    # Create test instances
    instances = [ModelTest(id=f"test_{i}", name=f"Test {i}") for i in range(3)]
    await repository.add_all(instances)

    # Verify we can find them in db
    found = await repository.find_by_id("test_0")
    assert found is not None
    assert found.name == "Test 0"


@pytest.mark.asyncio
async def test_bulk_create(repository):
    """Test bulk creation of entities."""
    # Create test instances
    instances = [ModelTest(id=f"test_{i}", name=f"Test {i}") for i in range(3)]

    # Bulk create
    await repository.create_all([instance.__dict__ for instance in instances])

    # Verify we can find them in db
    found = await repository.find_by_id("test_0")
    assert found is not None
    assert found.name == "Test 0"


@pytest.mark.asyncio
async def test_find_all(repository):
    """Test finding multiple entities by IDs."""
    # Create test data
    instances = [ModelTest(id=f"test_{i}", name=f"Test {i}") for i in range(5)]
    await repository.create_all([instance.__dict__ for instance in instances])

    found = await repository.find_all(limit=3)
    assert len(found) == 3


@pytest.mark.asyncio
async def test_find_by_ids(repository):
    """Test finding multiple entities by IDs."""
    # Create test data
    instances = [ModelTest(id=f"test_{i}", name=f"Test {i}") for i in range(5)]
    await repository.create_all([instance.__dict__ for instance in instances])

    # Test finding subset of entities
    ids_to_find = ["test_0", "test_2", "test_4"]
    found = await repository.find_by_ids(ids_to_find)
    assert len(found) == 3
    assert sorted([e.id for e in found]) == sorted(ids_to_find)

    # Test finding with some non-existent IDs
    mixed_ids = ["test_0", "nonexistent", "test_4"]
    partial_found = await repository.find_by_ids(mixed_ids)
    assert len(partial_found) == 2
    assert sorted([e.id for e in partial_found]) == ["test_0", "test_4"]

    # Test with empty list
    empty_found = await repository.find_by_ids([])
    assert len(empty_found) == 0

    # Test with all non-existent IDs
    not_found = await repository.find_by_ids(["fake1", "fake2"])
    assert len(not_found) == 0


@pytest.mark.asyncio
async def test_delete_by_ids(repository):
    """Test finding multiple entities by IDs."""
    # Create test data
    instances = [ModelTest(id=f"test_{i}", name=f"Test {i}") for i in range(5)]
    await repository.create_all([instance.__dict__ for instance in instances])

    # Test delete subset of entities
    ids_to_delete = ["test_0", "test_2", "test_4"]
    deleted_count = await repository.delete_by_ids(ids_to_delete)
    assert deleted_count == 3

    # Test finding subset of entities
    ids_to_find = ["test_1", "test_3"]
    found = await repository.find_by_ids(ids_to_find)
    assert len(found) == 2
    assert sorted([e.id for e in found]) == sorted(ids_to_find)

    assert await repository.find_by_id(ids_to_delete[0]) is None
    assert await repository.find_by_id(ids_to_delete[1]) is None
    assert await repository.find_by_id(ids_to_delete[2]) is None


@pytest.mark.asyncio
async def test_update(repository):
    """Test finding entities modified since a timestamp."""
    # Create initial test data
    instance = ModelTest(id="test_add", name="Test Add")
    await repository.add(instance)

    instance = ModelTest(id="test_add", name="Updated")

    # Find recently modified
    modified = await repository.update(instance.id, {"name": "Updated"})
    assert modified is not None
    assert modified.name == "Updated"


@pytest.mark.asyncio
async def test_update_model(repository):
    """Test finding entities modified since a timestamp."""
    # Create initial test data
    instance = ModelTest(id="test_add", name="Test Add")
    await repository.add(instance)

    instance.name = "Updated"

    # Update using the modified model
    modified = await repository.update(instance.id, instance)
    assert modified is not None
    assert modified.name == "Updated"


@pytest.mark.asyncio
async def test_update_model_not_found(repository):
    """Test finding entities modified since a timestamp."""
    # Create initial test data
    instance = ModelTest(id="test_add", name="Test Add")
    await repository.add(instance)

    modified = await repository.update(0, {})
    assert modified is None


@pytest.mark.asyncio
async def test_count(repository):
    """Test bulk creation of entities."""
    # Create test instances
    instance = ModelTest(id="test_add", name="Test Add")
    await repository.add(instance)

    # Verify we can count in db
    count = await repository.count()
    assert count == 1

```

--------------------------------------------------------------------------------
/test-int/mcp/test_build_context_underscore.py:
--------------------------------------------------------------------------------

```python
"""Integration test for build_context with underscore in memory:// URLs."""

import pytest
from fastmcp import Client


@pytest.mark.asyncio
async def test_build_context_underscore_normalization(mcp_server, app, test_project):
    """Test that build_context normalizes underscores in relation types."""

    async with Client(mcp_server) as client:
        # Create parent note
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Parent Entity",
                "folder": "testing",
                "content": "# Parent Entity\n\nMain entity for testing underscore relations.",
                "tags": "test,parent",
            },
        )

        # Create child notes with different relation formats
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Child with Underscore",
                "folder": "testing",
                "content": """# Child with Underscore

- part_of [[Parent Entity]]
- related_to [[Parent Entity]]
                """,
                "tags": "test,child",
            },
        )

        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Child with Hyphen",
                "folder": "testing",
                "content": """# Child with Hyphen

- part-of [[Parent Entity]]
- related-to [[Parent Entity]]
                """,
                "tags": "test,child",
            },
        )

        # Test 1: Search with underscore format should return results
        # Relation permalinks are: source/relation_type/target
        # So child-with-underscore/part-of/parent-entity
        result_underscore = await client.call_tool(
            "build_context",
            {
                "project": test_project.name,
                "url": "memory://testing/*/part_of/*parent*",  # Using underscore
            },
        )

        # Parse response
        assert len(result_underscore.content) == 1
        response_text = result_underscore.content[0].text  # pyright: ignore
        assert '"results"' in response_text

        # Both relations should be found since they both connect to parent-entity
        # The system should normalize the underscore to hyphen internally
        assert "part-of" in response_text.lower()

        # Test 2: Search with hyphen format should also return results
        result_hyphen = await client.call_tool(
            "build_context",
            {
                "project": test_project.name,
                "url": "memory://testing/*/part-of/*parent*",  # Using hyphen
            },
        )

        response_text_hyphen = result_hyphen.content[0].text  # pyright: ignore
        assert '"results"' in response_text_hyphen
        assert "part-of" in response_text_hyphen.lower()

        # Test 3: Test with related_to/related-to as well
        result_related = await client.call_tool(
            "build_context",
            {
                "project": test_project.name,
                "url": "memory://testing/*/related_to/*parent*",  # Using underscore
            },
        )

        response_text_related = result_related.content[0].text  # pyright: ignore
        assert '"results"' in response_text_related
        assert "related-to" in response_text_related.lower()

        # Test 4: Test exact path (non-wildcard) with underscore
        # Exact relation permalink would be child/relation/target
        result_exact = await client.call_tool(
            "build_context",
            {
                "project": test_project.name,
                "url": "memory://testing/child-with-underscore/part_of/testing/parent-entity",
            },
        )

        response_text_exact = result_exact.content[0].text  # pyright: ignore
        assert '"results"' in response_text_exact
        assert "part-of" in response_text_exact.lower()


@pytest.mark.asyncio
async def test_build_context_complex_underscore_paths(mcp_server, app, test_project):
    """Test build_context with complex paths containing underscores."""

    async with Client(mcp_server) as client:
        # Create notes with underscores in titles and relations
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "workflow_manager_agent",
                "folder": "specs",
                "content": """# Workflow Manager Agent

Specification for the workflow manager agent.
                """,
                "tags": "spec,workflow",
            },
        )

        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "task_parser",
                "folder": "components",
                "content": """# Task Parser

- part_of [[workflow_manager_agent]]
- implements_for [[workflow_manager_agent]]
                """,
                "tags": "component,parser",
            },
        )

        # Test with underscores in all parts of the path
        # Relations are created as: task-parser/part-of/workflow-manager-agent
        # So search for */part_of/* or */part-of/* to find them
        test_cases = [
            "memory://components/*/part_of/*workflow*",
            "memory://components/*/part-of/*workflow*",
            "memory://*/task*/part_of/*",
            "memory://*/task*/part-of/*",
        ]

        for url in test_cases:
            result = await client.call_tool(
                "build_context", {"project": test_project.name, "url": url}
            )

            # All variations should work and find the related content
            assert len(result.content) == 1
            response = result.content[0].text  # pyright: ignore
            assert '"results"' in response
            # The relation should be found showing part-of connection
            assert "part-of" in response.lower(), f"Failed for URL: {url}"

```

--------------------------------------------------------------------------------
/tests/test_db_migration_deduplication.py:
--------------------------------------------------------------------------------

```python
"""Tests for database migration deduplication functionality."""

import pytest
from unittest.mock import patch, AsyncMock, MagicMock

from basic_memory import db


@pytest.fixture
def mock_alembic_config():
    """Mock Alembic config to avoid actual migration runs."""
    with patch("basic_memory.db.Config") as mock_config_class:
        mock_config = MagicMock()
        mock_config_class.return_value = mock_config
        yield mock_config


@pytest.fixture
def mock_alembic_command():
    """Mock Alembic command to avoid actual migration runs."""
    with patch("basic_memory.db.command") as mock_command:
        yield mock_command


@pytest.fixture
def mock_search_repository():
    """Mock SearchRepository to avoid database dependencies."""
    with patch("basic_memory.db.SearchRepository") as mock_repo_class:
        mock_repo = AsyncMock()
        mock_repo_class.return_value = mock_repo
        yield mock_repo


# Use the app_config fixture from conftest.py
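
# For orientation, the dedup contract these tests exercise looks roughly like
# this (a hedged sketch inferred from the assertions below, not copied from
# basic_memory.db):
#
#   _migrations_completed = False
#
#   async def run_migrations(app_config, force=False):
#       global _migrations_completed
#       if _migrations_completed and not force:
#           return
#       command.upgrade(Config(...), "head")
#       await SearchRepository(...).init_search_index()
#       _migrations_completed = True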


@pytest.mark.asyncio
async def test_migration_deduplication_single_call(
    app_config, mock_alembic_config, mock_alembic_command, mock_search_repository
):
    """Test that migrations are only run once when called multiple times."""
    # Reset module state
    db._migrations_completed = False
    db._engine = None
    db._session_maker = None

    # First call should run migrations
    await db.run_migrations(app_config)

    # Verify migrations were called
    mock_alembic_command.upgrade.assert_called_once_with(mock_alembic_config, "head")
    mock_search_repository.init_search_index.assert_called_once()

    # Reset mocks for second call
    mock_alembic_command.reset_mock()
    mock_search_repository.reset_mock()

    # Second call should skip migrations
    await db.run_migrations(app_config)

    # Verify migrations were NOT called again
    mock_alembic_command.upgrade.assert_not_called()
    mock_search_repository.init_search_index.assert_not_called()


@pytest.mark.asyncio
async def test_migration_force_parameter(
    app_config, mock_alembic_config, mock_alembic_command, mock_search_repository
):
    """Test that migrations can be forced to run even if already completed."""
    # Reset module state
    db._migrations_completed = False
    db._engine = None
    db._session_maker = None

    # First call should run migrations
    await db.run_migrations(app_config)

    # Verify migrations were called
    mock_alembic_command.upgrade.assert_called_once_with(mock_alembic_config, "head")
    mock_search_repository.init_search_index.assert_called_once()

    # Reset mocks for forced call
    mock_alembic_command.reset_mock()
    mock_search_repository.reset_mock()

    # Forced call should run migrations again
    await db.run_migrations(app_config, force=True)

    # Verify migrations were called again
    mock_alembic_command.upgrade.assert_called_once_with(mock_alembic_config, "head")
    mock_search_repository.init_search_index.assert_called_once()


@pytest.mark.asyncio
async def test_migration_state_reset_on_shutdown():
    """Test that migration state is reset when database is shut down."""
    # Set up completed state
    db._migrations_completed = True
    db._engine = AsyncMock()
    db._session_maker = AsyncMock()

    # Shutdown should reset state
    await db.shutdown_db()

    # Verify state was reset
    assert db._migrations_completed is False
    assert db._engine is None
    assert db._session_maker is None


@pytest.mark.asyncio
async def test_get_or_create_db_runs_migrations_automatically(
    app_config, mock_alembic_config, mock_alembic_command, mock_search_repository
):
    """Test that get_or_create_db runs migrations automatically."""
    # Reset module state
    db._migrations_completed = False
    db._engine = None
    db._session_maker = None

    # First call should create engine and run migrations
    engine, session_maker = await db.get_or_create_db(app_config.database_path)

    # Verify we got valid objects
    assert engine is not None
    assert session_maker is not None

    # Verify migrations were called
    mock_alembic_command.upgrade.assert_called_once_with(mock_alembic_config, "head")
    mock_search_repository.init_search_index.assert_called_once()


@pytest.mark.asyncio
async def test_get_or_create_db_skips_migrations_when_disabled(
    app_config, mock_alembic_config, mock_alembic_command, mock_search_repository
):
    """Test that get_or_create_db can skip migrations when ensure_migrations=False."""
    # Reset module state
    db._migrations_completed = False
    db._engine = None
    db._session_maker = None

    # Call with ensure_migrations=False should skip migrations
    engine, session_maker = await db.get_or_create_db(
        app_config.database_path, ensure_migrations=False
    )

    # Verify we got valid objects
    assert engine is not None
    assert session_maker is not None

    # Verify migrations were NOT called
    mock_alembic_command.upgrade.assert_not_called()
    mock_search_repository.init_search_index.assert_not_called()


@pytest.mark.asyncio
async def test_multiple_get_or_create_db_calls_deduplicated(
    app_config, mock_alembic_config, mock_alembic_command, mock_search_repository
):
    """Test that multiple get_or_create_db calls only run migrations once."""
    # Reset module state
    db._migrations_completed = False
    db._engine = None
    db._session_maker = None

    # First call should create engine and run migrations
    await db.get_or_create_db(app_config.database_path)

    # Verify migrations were called
    mock_alembic_command.upgrade.assert_called_once_with(mock_alembic_config, "head")
    mock_search_repository.init_search_index.assert_called_once()

    # Reset mocks for subsequent calls
    mock_alembic_command.reset_mock()
    mock_search_repository.reset_mock()

    # Subsequent calls should not run migrations again
    await db.get_or_create_db(app_config.database_path)
    await db.get_or_create_db(app_config.database_path)

    # Verify migrations were NOT called again
    mock_alembic_command.upgrade.assert_not_called()
    mock_search_repository.init_search_index.assert_not_called()

```

--------------------------------------------------------------------------------
/tests/api/test_management_router.py:
--------------------------------------------------------------------------------

```python
"""Tests for management router API endpoints."""

from unittest.mock import AsyncMock, MagicMock, patch

import pytest
from fastapi import FastAPI

from basic_memory.api.routers.management_router import (
    WatchStatusResponse,
    get_watch_status,
    start_watch_service,
    stop_watch_service,
)


class MockRequest:
    """Mock FastAPI request with app state."""

    def __init__(self, app):
        self.app = app


@pytest.fixture
def mock_app():
    """Create a mock FastAPI app with state."""
    app = MagicMock(spec=FastAPI)
    app.state = MagicMock()
    app.state.watch_task = None
    return app


@pytest.mark.asyncio
async def test_get_watch_status_not_running(mock_app):
    """Test getting watch status when watch service is not running."""
    # Set up app state
    mock_app.state.watch_task = None

    # Create mock request
    mock_request = MockRequest(mock_app)

    # Call endpoint directly
    response = await get_watch_status(mock_request)

    # Verify response
    assert isinstance(response, WatchStatusResponse)
    assert response.running is False


@pytest.mark.asyncio
async def test_get_watch_status_running(mock_app):
    """Test getting watch status when watch service is running."""
    # Create a mock task that is running
    mock_task = MagicMock()
    mock_task.done.return_value = False

    # Set up app state
    mock_app.state.watch_task = mock_task

    # Create mock request
    mock_request = MockRequest(mock_app)

    # Call endpoint directly
    response = await get_watch_status(mock_request)

    # Verify response
    assert isinstance(response, WatchStatusResponse)
    assert response.running is True


@pytest.fixture
def mock_sync_service():
    """Create a mock SyncService."""
    mock_service = AsyncMock()
    mock_service.entity_service = MagicMock()
    mock_service.entity_service.file_service = MagicMock()
    return mock_service


@pytest.fixture
def mock_project_repository():
    """Create a mock ProjectRepository."""
    mock_repository = AsyncMock()
    return mock_repository


@pytest.mark.asyncio
async def test_start_watch_service_when_not_running(
    mock_app, mock_sync_service, mock_project_repository
):
    """Test starting watch service when it's not running."""
    # Set up app state
    mock_app.state.watch_task = None

    # Create mock request
    mock_request = MockRequest(mock_app)

    # Mock the create_background_sync_task function
    with (
        patch("basic_memory.sync.WatchService") as mock_watch_service_class,
        patch("basic_memory.sync.background_sync.create_background_sync_task") as mock_create_task,
    ):
        # Create a mock task
        mock_task = MagicMock()
        mock_task.done.return_value = False
        mock_create_task.return_value = mock_task

        # Setup mock watch service
        mock_watch_service = MagicMock()
        mock_watch_service_class.return_value = mock_watch_service

        # Call endpoint directly
        response = await start_watch_service(
            mock_request, mock_project_repository, mock_sync_service
        )  # pyright: ignore [reportCallIssue]

        # Verify response
        assert isinstance(response, WatchStatusResponse)
        assert response.running is True

        # Verify that the task was created
        assert mock_create_task.called


@pytest.mark.asyncio
async def test_start_watch_service_already_running(
    mock_app, mock_sync_service, mock_project_repository
):
    """Test starting watch service when it's already running."""
    # Create a mock task that reports as running
    mock_task = MagicMock()
    mock_task.done.return_value = False

    # Set up app state with a "running" task
    mock_app.state.watch_task = mock_task

    # Create mock request
    mock_request = MockRequest(mock_app)

    with patch("basic_memory.sync.background_sync.create_background_sync_task") as mock_create_task:
        # Call endpoint directly
        response = await start_watch_service(
            mock_request, mock_project_repository, mock_sync_service
        )

        # Verify response
        assert isinstance(response, WatchStatusResponse)
        assert response.running is True

        # Verify that no new task was created
        assert not mock_create_task.called

        # Verify app state was not changed
        assert mock_app.state.watch_task is mock_task


@pytest.mark.asyncio
async def test_stop_watch_service_when_running():
    """Test stopping the watch service when it's running.

    This test directly tests parts of the code without actually awaiting the task.
    """
    from basic_memory.api.routers.management_router import WatchStatusResponse

    # Create a response object directly
    response = WatchStatusResponse(running=False)

    # We're just testing that the response model works correctly
    assert isinstance(response, WatchStatusResponse)
    assert response.running is False

    # The actual functionality is simple enough that other tests
    # indirectly cover the basic behavior, and the error paths
    # are directly tested in the other test cases


@pytest.mark.asyncio
async def test_stop_watch_service_not_running(mock_app):
    """Test stopping the watch service when it's not running."""
    # Set up app state with no task
    mock_app.state.watch_task = None

    # Create mock request
    mock_request = MockRequest(mock_app)

    # Call endpoint directly
    response = await stop_watch_service(mock_request)

    # Verify response
    assert isinstance(response, WatchStatusResponse)
    assert response.running is False


@pytest.mark.asyncio
async def test_stop_watch_service_already_done(mock_app):
    """Test stopping the watch service when it's already done."""
    # Create a mock task that reports as done
    mock_task = MagicMock()
    mock_task.done.return_value = True

    # Set up app state
    mock_app.state.watch_task = mock_task

    # Create mock request
    mock_request = MockRequest(mock_app)

    # Call endpoint directly
    response = await stop_watch_service(mock_request)  # pyright: ignore [reportArgumentType]

    # Verify response
    assert isinstance(response, WatchStatusResponse)
    assert response.running is False

```

--------------------------------------------------------------------------------
/tests/mcp/test_obsidian_yaml_formatting.py:
--------------------------------------------------------------------------------

```python
"""Integration tests for Obsidian-compatible YAML formatting in write_note tool."""

import pytest

from basic_memory.mcp.tools import write_note


@pytest.mark.asyncio
async def test_write_note_tags_yaml_format(app, project_config, test_project):
    """Test that write_note creates files with proper YAML list format for tags."""
    # Create a note with tags using write_note
    result = await write_note.fn(
        project=test_project.name,
        title="YAML Format Test",
        folder="test",
        content="Testing YAML tag formatting",
        tags=["system", "overview", "reference"],
    )

    # Verify the note was created successfully
    assert "Created note" in result
    assert "file_path: test/YAML Format Test.md" in result

    # Read the file directly to check YAML formatting
    file_path = project_config.home / "test" / "YAML Format Test.md"
    content = file_path.read_text(encoding="utf-8")

    # Should use YAML list format
    assert "tags:" in content
    assert "- system" in content
    assert "- overview" in content
    assert "- reference" in content

    # Should NOT use JSON array format
    assert '["system"' not in content
    assert '"overview"' not in content
    assert '"reference"]' not in content


@pytest.mark.asyncio
async def test_write_note_stringified_json_tags(app, project_config, test_project):
    """Test that stringified JSON arrays are handled correctly."""
    # This simulates the issue where AI assistants pass tags as stringified JSON
    result = await write_note.fn(
        project=test_project.name,
        title="Stringified JSON Test",
        folder="test",
        content="Testing stringified JSON tag input",
        tags='["python", "testing", "json"]',  # Stringified JSON array
    )

    # Verify the note was created successfully
    assert "Created note" in result

    # Read the file to check formatting
    file_path = project_config.home / "test" / "Stringified JSON Test.md"
    content = file_path.read_text(encoding="utf-8")

    # Should properly parse the JSON and format as YAML list
    assert "tags:" in content
    assert "- python" in content
    assert "- testing" in content
    assert "- json" in content

    # Should NOT have the original stringified format issues
    assert '["python"' not in content
    assert '"testing"' not in content
    assert '"json"]' not in content


@pytest.mark.asyncio
async def test_write_note_single_tag_yaml_format(app, project_config, test_project):
    """Test that single tags are still formatted as YAML lists."""
    await write_note.fn(
        project=test_project.name,
        title="Single Tag Test",
        folder="test",
        content="Testing single tag formatting",
        tags=["solo-tag"],
    )

    file_path = project_config.home / "test" / "Single Tag Test.md"
    content = file_path.read_text(encoding="utf-8")

    # Single tag should still use list format
    assert "tags:" in content
    assert "- solo-tag" in content


@pytest.mark.asyncio
async def test_write_note_no_tags(app, project_config, test_project):
    """Test that notes without tags work normally."""
    await write_note.fn(
        project=test_project.name,
        title="No Tags Test",
        folder="test",
        content="Testing note without tags",
        tags=None,
    )

    file_path = project_config.home / "test" / "No Tags Test.md"
    content = file_path.read_text(encoding="utf-8")

    # Should not have tags field in frontmatter
    assert "tags:" not in content
    assert "title: No Tags Test" in content


@pytest.mark.asyncio
async def test_write_note_empty_tags_list(app, project_config, test_project):
    """Test that empty tag lists are handled properly."""
    await write_note.fn(
        project=test_project.name,
        title="Empty Tags Test",
        folder="test",
        content="Testing empty tag list",
        tags=[],
    )

    file_path = project_config.home / "test" / "Empty Tags Test.md"
    content = file_path.read_text(encoding="utf-8")

    # Should not add tags field to frontmatter for empty lists
    assert "tags:" not in content


@pytest.mark.asyncio
async def test_write_note_update_preserves_yaml_format(app, project_config, test_project):
    """Test that updating a note preserves the YAML list format."""
    # First, create the note
    await write_note.fn(
        project=test_project.name,
        title="Update Format Test",
        folder="test",
        content="Initial content",
        tags=["initial", "tag"],
    )

    # Then update it with new tags
    result = await write_note.fn(
        project=test_project.name,
        title="Update Format Test",
        folder="test",
        content="Updated content",
        tags=["updated", "new-tag", "format"],
    )

    # Should be an update, not a new creation
    assert "Updated note" in result

    # Check the file format
    file_path = project_config.home / "test" / "Update Format Test.md"
    content = file_path.read_text(encoding="utf-8")

    # Should have proper YAML formatting for updated tags
    assert "tags:" in content
    assert "- updated" in content
    assert "- new-tag" in content
    assert "- format" in content

    # Old tags should be gone
    assert "- initial" not in content
    assert "- tag" not in content

    # Content should be updated
    assert "Updated content" in content
    assert "Initial content" not in content


@pytest.mark.asyncio
async def test_complex_tags_yaml_format(app, project_config, test_project):
    """Test that complex tags with special characters format correctly."""
    await write_note.fn(
        project=test_project.name,
        title="Complex Tags Test",
        folder="test",
        content="Testing complex tag formats",
        tags=["python-3.9", "api_integration", "v2.0", "nested/category", "under_score"],
    )

    file_path = project_config.home / "test" / "Complex Tags Test.md"
    content = file_path.read_text(encoding="utf-8")

    # All complex tags should format correctly
    assert "- python-3.9" in content
    assert "- api_integration" in content
    assert "- v2.0" in content
    assert "- nested/category" in content
    assert "- under_score" in content

```

--------------------------------------------------------------------------------
/specs/SPEC-11 Basic Memory API Performance Optimization.md:
--------------------------------------------------------------------------------

```markdown
---
title: 'SPEC-11: Basic Memory API Performance Optimization'
type: spec
permalink: specs/spec-11-basic-memory-api-performance-optimization
tags:
- performance
- api
- mcp
- database
- cloud
---

# SPEC-11: Basic Memory API Performance Optimization

## Why

The Basic Memory API experiences significant performance issues in cloud environments due to expensive per-request initialization. MCP tools making
HTTP requests to the API suffer from 350ms-2.6s latency overhead **before** any actual operation occurs.

**Root Cause Analysis:**
- GitHub Issue #82 shows repeated initialization sequences in logs (16:29:35 and 16:49:58)
- Each MCP tool call triggers full database initialization + project reconciliation
- `get_engine_factory()` dependency calls `db.get_or_create_db()` on every request
- `reconcile_projects_with_config()` runs expensive sync operations repeatedly

**Performance Impact:**
- Database connection setup: ~50-100ms per request
- Migration checks: ~100-500ms per request
- Project reconciliation: ~200ms-2s per request
- **Total overhead**: ~350ms-2.6s per MCP tool call

This creates compounding effects with tenant auto-start delays and increases timeout risk in cloud deployments.
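
To make the cost concrete, the pre-optimization request path has roughly this shape (an illustrative sketch of the pattern described above; import paths and the exact signature are assumptions, not the repository's code):

```python
# Illustrative only: every request re-enters the expensive setup path.
from basic_memory import db
from basic_memory.config import ConfigManager


async def get_engine_factory():  # resolved as a FastAPI dependency, per request
    app_config = ConfigManager().config
    # ~50-100ms connection setup + ~100-500ms migration checks on each call
    return await db.get_or_create_db(app_config.database_path)
```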

## What

This optimization affects the **core basic-memory repository** components:

1. **API Lifespan Management** (`src/basic_memory/api/app.py`)
 - Cache database connections in app state during startup
 - Avoid repeated expensive initialization

2. **Dependency Injection** (`src/basic_memory/deps.py`)
 - Modify `get_engine_factory()` to use cached connections
 - Eliminate per-request database setup

3. **Initialization Service** (`src/basic_memory/services/initialization.py`)
 - Add caching/throttling to project reconciliation
 - Skip expensive operations when appropriate

4. **Configuration** (`src/basic_memory/config.py`)
 - Add optional performance flags for cloud environments

**Backwards Compatibility**: All changes must be backwards compatible with existing CLI and non-cloud usage.

## How (High Level)

### Phase 1: Cache Database Connections (Critical - 80% of gains)

**Problem**: `get_engine_factory()` calls `db.get_or_create_db()` per request
**Solution**: Cache database engine/session in app state during lifespan

1. **Modify API Lifespan** (`api/app.py`):
 ```python
 @asynccontextmanager
 async def lifespan(app: FastAPI):
     app_config = ConfigManager().config
     await initialize_app(app_config)

     # Cache database connection in app state
     engine, session_maker = await db.get_or_create_db(app_config.database_path)
     app.state.engine = engine
     app.state.session_maker = session_maker

     # ... rest of startup logic
```

2. **Modify Dependency Injection** (`deps.py`):
```python
async def get_engine_factory(
  request: Request
) -> tuple[AsyncEngine, async_sessionmaker[AsyncSession]]:
  """Get cached engine and session maker from app state."""
  return request.app.state.engine, request.app.state.session_maker
```
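
For illustration, a handler-level dependency could then hand out sessions from the cached factory. The `get_session` helper below is an assumption for this sketch, not an existing function in the repository:

```python
from fastapi import Request
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker


async def get_session(request: Request):
    """Yield a session from the factory cached in app.state at startup."""
    session_maker: async_sessionmaker[AsyncSession] = request.app.state.session_maker
    async with session_maker() as session:
        yield session
```

Because the engine and session maker are created once in the lifespan, each request pays only the cost of checking out a pooled connection.
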
### Phase 2: Optimize Project Reconciliation (Secondary - 20% of gains)

**Problem**: `reconcile_projects_with_config()` runs expensive sync repeatedly
**Solution**: Add module-level caching with time-based throttling

1. **Add Reconciliation Cache** (`services/initialization.py`):
```python
import time

_project_reconciliation_completed = False
_last_reconciliation_time = 0.0

async def reconcile_projects_with_config(app_config, force=False):
    global _project_reconciliation_completed, _last_reconciliation_time
    # Skip if recently completed (within 60 seconds) unless forced
    recently_completed = (
        _project_reconciliation_completed
        and time.monotonic() - _last_reconciliation_time < 60
    )
    if recently_completed and not force:
        return
    # ... existing logic
    _project_reconciliation_completed = True
    _last_reconciliation_time = time.monotonic()
```

### Phase 3: Cloud Environment Flags (Optional)

**Problem**: Expensive initialization always runs, even in production environments
**Solution**: Add skip flags for cloud/stateless deployments

1. **Add Config Flag** (`config.py`):
   ```python
   skip_initialization_sync: bool = Field(default=False)
   ```
2. **Configure in Cloud** (basic-memory-cloud integration):
   ```bash
   BASIC_MEMORY_SKIP_INITIALIZATION_SYNC=true
   ```
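
A minimal sketch of how the flag could gate startup work; the attribute name comes from this spec, while the surrounding function body is illustrative:

```python
from loguru import logger


async def initialize_app(app_config) -> None:
    if app_config.skip_initialization_sync:
        # Cloud/stateless deployments opt out of expensive startup sync
        logger.info("Skipping initialization sync")
        return
    # ... run migrations and project reconciliation as usual
```

Assuming the config reads environment variables with a `BASIC_MEMORY_` prefix, setting `BASIC_MEMORY_SKIP_INITIALIZATION_SYNC=true` maps onto the field with no extra wiring.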

## How to Evaluate

### Success Criteria

1. **Performance Metrics** (Primary):
   - MCP tool response time reduced by 50%+ (measure before/after)
   - Database connection overhead eliminated (0ms vs 50-100ms)
   - Migration check overhead eliminated (0ms vs 100-500ms)
   - Project reconciliation overhead reduced by 90%+
2. **Load Testing**:
   - Concurrent MCP tool calls maintain performance
   - No memory leaks in cached connections
   - Database connection pool behaves correctly
3. **Functional Correctness**:
   - All existing API endpoints work identically
   - MCP tools maintain full functionality
   - CLI operations unaffected
   - Database migrations still execute properly
4. **Backwards Compatibility**:
   - No breaking changes to existing APIs
   - Config changes are optional with safe defaults
   - Non-cloud deployments work unchanged

### Testing Strategy

**Performance Testing:**
```bash
# Before optimization
time basic-memory-mcp-tools write_note "test" "content" "folder"
# Measure: ~1-3 seconds

# After optimization
time basic-memory-mcp-tools write_note "test" "content" "folder"
# Target: <500ms
```

**Load Testing:**
```bash
# Multiple concurrent MCP tool calls
for i in {1..10}; do
  basic-memory-mcp-tools search "test" &
done
wait
# Verify: No degradation, consistent response times
```

**Regression Testing:**
```bash
# Full basic-memory test suite
just test
# All tests must pass

# Integration tests with cloud deployment
# Verify MCP gateway → API → database flow works
```

### Validation Checklist

- [ ] **Phase 1 Complete**: Database connections cached, dependency injection optimized
- [ ] **Performance Benchmark**: 50%+ improvement in MCP tool response times
- [ ] **Memory Usage**: No leaks in cached connections over 24h+ periods
- [ ] **Stress Testing**: 100+ concurrent requests maintain performance
- [ ] **Backwards Compatibility**: All existing functionality preserved
- [ ] **Documentation**: Performance optimization documented in README
- [ ] **Cloud Integration**: basic-memory-cloud sees performance benefits

## Notes

**Implementation Priority:**
- Phase 1 provides 80% of performance gains and should be implemented first
- Phase 2 provides the remaining 20% and addresses edge cases
- Phase 3 is optional for maximum cloud optimization

**Risk Mitigation:**
- All changes backwards compatible
- Gradual rollout possible (Phase 1 → 2 → 3)
- Easy rollback via configuration flags

**Cloud Integration:**
- This optimization directly addresses basic-memory-cloud issue #82
- Changes in core basic-memory will benefit all cloud tenants
- No changes needed in basic-memory-cloud itself

```

--------------------------------------------------------------------------------
/specs/SPEC-1 Specification-Driven Development Process.md:
--------------------------------------------------------------------------------

```markdown
---
title: 'SPEC-1: Specification-Driven Development Process'
type: spec
permalink: specs/spec-1-specification-driven-development-process
tags:
- process
- specification
- development
- meta
---

# SPEC-1: Specification-Driven Development Process

## Why
We're implementing specification-driven development to solve the complexity and circular refactoring issues in our web development process. 
Instead of getting lost in framework details and type gymnastics, we start with clear specifications that drive implementation.

The default approach of adhoc development with AI agents tends to result in:
- Circular refactoring cycles
- Fighting framework complexity
- Lost context between sessions
- Unclear requirements and scope

## What
This spec defines our process for using basic-memory as the specification engine to build basic-memory-cloud. 
We're creating a recursive development pattern where basic-memory manages the specs that drive the development of basic-memory-cloud.

**Affected Areas:**
- All future component development
- Architecture decisions
- Agent collaboration workflows
- Knowledge management and context preservation

## How (High Level)

### Specification Structure

Name: Spec names are numbered sequentially, followed by a short description, e.g. `SPEC-X - Simple Description.md`.
See: [[Spec-2: Slash Commands Reference]]

Every spec is a complete thought containing:
- **Why**: The reasoning and problem being solved
- **What**: What is affected or changed
- **How**: High-level approach to implementation
- **How to Evaluate**: Testing/validation procedure
- Additional context as needed

### Living Specification Format

Specifications are **living documents** that evolve throughout implementation:

**Progress Tracking:**
- **Completed items**: Use ✅ checkmark emoji for implemented features
- **Pending items**: Use `- [ ]` GitHub-style checkboxes for remaining tasks
- **In-progress items**: Use `- [x]` when work is actively underway

**Status Philosophy:**
- **Avoid static status headers** like "COMPLETE" or "IN PROGRESS" that become stale
- **Use checklists within content** to show granular implementation progress
- **Keep specs informative** while providing clear progress visibility
- **Update continuously** as understanding and implementation evolve

**Example Format:**
```markdown
### ComponentName
- ✅ Basic functionality implemented
- ✅ Props and events defined
- [ ] Add sorting controls
- [ ] Improve accessibility
- [x] Currently implementing responsive design
```

This creates **git-friendly progress tracking** where `[ ]` easily becomes `[x]` or ✅ when completed, and specs remain valuable throughout the development lifecycle.


## Claude Code 

We will leverage Claude Code capabilities to make the process semi-automated. 

- Slash commands: define repeatable steps in the process (create spec, implement, review, etc)
- Agents: define roles to carry out instructions (front end developer, backend developer, etc)
- MCP tools: enable agents to implement specs via actions (write code, test, etc)

### Workflow
1. **Create**: Write spec as complete thought in `/specs` folder
2. **Discuss**: Iterate and refine through agent collaboration
3. **Implement**: Hand spec to appropriate specialist agent
4. **Validate**: Review implementation against spec criteria
5. **Document**: Update spec with learnings and decisions

### Slash Commands

Claude slash commands are used to manage the flow.
These are simple instructions to help make the process uniform. 
They can be updated and refined as needed. 

- `/spec create [name]` - Create new specification
- `/spec status` - Show current spec states
- `/spec implement [name]` - Hand to appropriate agent
- `/spec review [name]` - Validate implementation

### Agent Orchestration

Agents are defined with clear roles, for instance:

- **system-architect**: Creates high-level specs, ADRs, architectural decisions
- **vue-developer**: Component specs, UI patterns, frontend architecture
- **python-developer**: Implementation specs, technical details, backend logic

Each agent reads/updates specs through basic-memory tools.

## How to Evaluate

### Success Criteria
- Specs provide clear, actionable guidance for implementation
- Reduced circular refactoring and scope creep
- Persistent context across development sessions
- Clean separation between "what/why" and implementation details
- Specs record a history of what happened and why for historical context

### Testing Procedure
1. Create a spec for an existing problematic component
2. Have an agent implement following only the spec
3. Compare result quality and development speed vs. ad-hoc approach
4. Measure context preservation across sessions
5. Evaluate spec clarity and completeness

### Metrics
- Time from spec to working implementation
- Number of refactoring cycles required
- Agent understanding of requirements
- Spec reusability for similar components

## Notes
- Start simple: specs are just complete thoughts, not heavy processes
- Use basic-memory's knowledge graph to link specs, decisions, components
- Let the process evolve naturally based on what works
- Focus on solving the actual problem: Manage complexity in development

## Observations

- [problem] Web development without clear goals and documentation leads to circular refactoring cycles #complexity
- [solution] Specification-driven development reduces scope creep and context loss #process-improvement  
- [pattern] basic-memory as specification engine creates recursive development loop #meta-development
- [workflow] Five-step process: Create → Discuss → Implement → Validate → Document #methodology
- [tool] Slash commands provide uniform process automation #automation
- [agent-pattern] Three specialized agents handle different implementation domains #specialization
- [success-metric] Time from spec to working implementation measures process efficiency #measurement
- [learning] Process should evolve naturally based on what works in practice #adaptation
- [format] Living specifications use checklists for progress tracking instead of static status headers #documentation
- [evolution] Specs evolve throughout implementation maintaining value as working documents #continuous-improvement

## Relations

- spec [[Spec-2: Slash Commands Reference]]
- spec [[Spec-3: Agent Definitions]]

```

--------------------------------------------------------------------------------
/tests/api/test_search_router.py:
--------------------------------------------------------------------------------

```python
"""Tests for search router."""

from datetime import datetime, timezone

import pytest
import pytest_asyncio
from sqlalchemy import text

from basic_memory import db
from basic_memory.schemas import Entity as EntitySchema
from basic_memory.schemas.search import SearchItemType, SearchResponse


@pytest_asyncio.fixture
async def indexed_entity(init_search_index, full_entity, search_service):
    """Create an entity and index it."""
    await search_service.index_entity(full_entity)
    return full_entity


@pytest.mark.asyncio
async def test_search_basic(client, indexed_entity, project_url):
    """Test basic text search."""
    response = await client.post(f"{project_url}/search/", json={"text": "search"})
    assert response.status_code == 200
    search_results = SearchResponse.model_validate(response.json())
    assert len(search_results.results) == 3

    found = False
    for r in search_results.results:
        if r.type == SearchItemType.ENTITY.value:
            assert r.permalink == indexed_entity.permalink
            found = True

    assert found, "Expected to find indexed entity in results"


@pytest.mark.asyncio
async def test_search_basic_pagination(client, indexed_entity, project_url):
    """Test basic text search."""
    response = await client.post(
        f"{project_url}/search/?page=3&page_size=1", json={"text": "search"}
    )
    assert response.status_code == 200
    search_results = SearchResponse.model_validate(response.json())
    assert len(search_results.results) == 1

    assert search_results.current_page == 3
    assert search_results.page_size == 1


@pytest.mark.asyncio
async def test_search_with_entity_type_filter(client, indexed_entity, project_url):
    """Test search with type filter."""
    # Should find with correct type
    response = await client.post(
        f"{project_url}/search/",
        json={"text": "test", "entity_types": [SearchItemType.ENTITY.value]},
    )
    assert response.status_code == 200
    search_results = SearchResponse.model_validate(response.json())
    assert len(search_results.results) > 0

    # Should find with relation type
    response = await client.post(
        f"{project_url}/search/",
        json={"text": "test", "entity_types": [SearchItemType.RELATION.value]},
    )
    assert response.status_code == 200
    search_results = SearchResponse.model_validate(response.json())
    assert len(search_results.results) == 2


@pytest.mark.asyncio
async def test_search_with_type_filter(client, indexed_entity, project_url):
    """Test search with entity type filter."""
    # Should find with correct entity type
    response = await client.post(f"{project_url}/search/", json={"text": "test", "types": ["test"]})
    assert response.status_code == 200
    search_results = SearchResponse.model_validate(response.json())
    assert len(search_results.results) == 1

    # Should not find with wrong entity type
    response = await client.post(f"{project_url}/search/", json={"text": "test", "types": ["note"]})
    assert response.status_code == 200
    search_results = SearchResponse.model_validate(response.json())
    assert len(search_results.results) == 0


@pytest.mark.asyncio
async def test_search_with_date_filter(client, indexed_entity, project_url):
    """Test search with date filter."""
    # Should find with past date
    past_date = datetime(2020, 1, 1, tzinfo=timezone.utc)
    response = await client.post(
        f"{project_url}/search/", json={"text": "test", "after_date": past_date.isoformat()}
    )
    assert response.status_code == 200
    search_results = SearchResponse.model_validate(response.json())
    assert len(search_results.results) > 0

    # Should not find with future date
    future_date = datetime(2030, 1, 1, tzinfo=timezone.utc)
    response = await client.post(
        f"{project_url}/search/", json={"text": "test", "after_date": future_date.isoformat()}
    )
    assert response.status_code == 200
    search_results = SearchResponse.model_validate(response.json())
    assert len(search_results.results) == 0


@pytest.mark.asyncio
async def test_search_empty(search_service, client, project_url):
    """Test search with no matches."""
    response = await client.post(f"{project_url}/search/", json={"text": "nonexistent"})
    assert response.status_code == 200
    search_result = SearchResponse.model_validate(response.json())
    assert len(search_result.results) == 0


@pytest.mark.asyncio
async def test_reindex(client, search_service, entity_service, session_maker, project_url):
    """Test reindex endpoint."""
    # Create test entity and document
    await entity_service.create_entity(
        EntitySchema(
            title="TestEntity1",
            folder="test",
            entity_type="test",
        ),
    )

    # Clear search index
    async with db.scoped_session(session_maker) as session:
        await session.execute(text("DELETE FROM search_index"))
        await session.commit()

    # Verify nothing is searchable
    response = await client.post(f"{project_url}/search/", json={"text": "test"})
    search_results = SearchResponse.model_validate(response.json())
    assert len(search_results.results) == 0

    # Trigger reindex
    reindex_response = await client.post(f"{project_url}/search/reindex")
    assert reindex_response.status_code == 200
    assert reindex_response.json()["status"] == "ok"

    # Verify content is searchable again
    search_response = await client.post(f"{project_url}/search/", json={"text": "test"})
    search_results = SearchResponse.model_validate(search_response.json())
    assert len(search_results.results) == 1


@pytest.mark.asyncio
async def test_multiple_filters(client, indexed_entity, project_url):
    """Test search with multiple filters combined."""
    response = await client.post(
        f"{project_url}/search/",
        json={
            "text": "test",
            "entity_types": [SearchItemType.ENTITY.value],
            "types": ["test"],
            "after_date": datetime(2020, 1, 1, tzinfo=timezone.utc).isoformat(),
        },
    )
    assert response.status_code == 200
    search_result = SearchResponse.model_validate(response.json())
    assert len(search_result.results) == 1
    result = search_result.results[0]
    assert result.permalink == indexed_entity.permalink
    assert result.type == SearchItemType.ENTITY.value
    assert result.metadata["entity_type"] == "test"

```

--------------------------------------------------------------------------------
/src/basic_memory/schemas/response.py:
--------------------------------------------------------------------------------

```python
"""Response schemas for knowledge graph operations.

This module defines the response formats for all knowledge graph operations.
Each response includes complete information about the affected entities,
including IDs that can be used in subsequent operations.

Key Features:
1. Every created/updated object gets an ID
2. Relations are included with their parent entities
3. Responses include everything needed for next operations
4. Bulk operations return all affected items
"""

from datetime import datetime
from typing import List, Optional, Dict

from pydantic import BaseModel, ConfigDict, Field, AliasPath, AliasChoices

from basic_memory.schemas.base import Relation, Permalink, EntityType, ContentType, Observation


class SQLAlchemyModel(BaseModel):
    """Base class for models that read from SQLAlchemy attributes.

    This base class handles conversion of SQLAlchemy model attributes
    to Pydantic model fields. All response models extend this to ensure
    proper handling of database results.
    """

    model_config = ConfigDict(from_attributes=True)


class ObservationResponse(Observation, SQLAlchemyModel):
    """Schema for observation data returned from the service.

    Each observation gets a unique ID that can be used for later
    reference or deletion.

    Example Response:
    {
        "category": "feature",
        "content": "Added support for async operations",
        "context": "Initial database design meeting"
    }
    """

    permalink: Permalink


class RelationResponse(Relation, SQLAlchemyModel):
    """Response schema for relation operations.

    Extends the base Relation model with a unique ID that can be
    used for later modification or deletion.

    Example Response:
    {
        "from_id": "test/memory_test",
        "to_id": "component/memory-service",
        "relation_type": "validates",
        "context": "Comprehensive test suite"
    }
    """

    permalink: Permalink

    from_id: Permalink = Field(
        # use the permalink from the associated Entity
        # or the from_id value
        validation_alias=AliasChoices(
            AliasPath("from_entity", "permalink"),
            "from_id",
        )
    )
    to_id: Optional[Permalink] = Field(  # pyright: ignore
        # use the permalink from the associated Entity
        # or the to_id value
        validation_alias=AliasChoices(
            AliasPath("to_entity", "permalink"),
            "to_id",
        ),
        default=None,
    )
    to_name: Optional[Permalink] = Field(
        # use the permalink from the associated Entity
        # or the to_id value
        validation_alias=AliasChoices(
            AliasPath("to_entity", "title"),
            "to_name",
        ),
        default=None,
    )
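
# Illustration (a hedged sketch, not part of the original module): with
# from_attributes=True, AliasPath("from_entity", "permalink") lets the same
# model validate either an ORM Relation row (reading the nested entity's
# permalink) or a plain dict that already carries "from_id", e.g.:
#
#   RelationResponse.model_validate(
#       {"permalink": "a/validates/b", "from_id": "a", "relation_type": "validates"}
#   )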


class EntityResponse(SQLAlchemyModel):
    """Complete entity data returned from the service.

    This is the most comprehensive entity view, including:
    1. Basic entity details (id, name, type)
    2. All observations with their IDs
    3. All relations with their IDs
    4. Optional description

    Example Response:
    {
        "permalink": "component/memory-service",
        "file_path": "MemoryService",
        "entity_type": "component",
        "entity_metadata": {}
        "content_type: "text/markdown"
        "observations": [
            {
                "category": "feature",
                "content": "Uses SQLite storage"
                "context": "Initial design"
            },
            {
                "category": "feature",
                "content": "Implements async operations"
                "context": "Initial design"
            }
        ],
        "relations": [
            {
                "from_id": "test/memory-test",
                "to_id": "component/memory-service",
                "relation_type": "validates",
                "context": "Main test suite"
            }
        ]
    }
    """

    permalink: Optional[Permalink]
    title: str
    file_path: str
    entity_type: EntityType
    entity_metadata: Optional[Dict] = None
    checksum: Optional[str] = None
    content_type: ContentType
    observations: List[ObservationResponse] = []
    relations: List[RelationResponse] = []
    created_at: datetime
    updated_at: datetime


class EntityListResponse(SQLAlchemyModel):
    """Response for create_entities operation.

    Returns complete information about entities returned from the service,
    including their permalinks, observations,
    and any established relations.

    Example Response:
    {
        "entities": [
            {
                "permalink": "component/search_service",
                "title": "SearchService",
                "entity_type": "component",
                "description": "Knowledge graph search",
                "observations": [
                    {
                        "content": "Implements full-text search"
                    }
                ],
                "relations": []
            },
            {
                "permalink": "document/api_docs",
                "title": "API_Documentation",
                "entity_type": "document",
                "description": "API Reference",
                "observations": [
                    {
                        "content": "Documents REST endpoints"
                    }
                ],
                "relations": []
            }
        ]
    }
    """

    entities: List[EntityResponse]


class SearchNodesResponse(SQLAlchemyModel):
    """Response for search operation.

    Returns matching entities with their complete information,
    plus the original query for reference.

    Example Response:
    {
        "matches": [
            {
                "permalink": "component/memory-service",
                "title": "MemoryService",
                "entity_type": "component",
                "description": "Core service",
                "observations": [...],
                "relations": [...]
            }
        ],
        "query": "memory"
    }

    Note: each entity in matches includes full details, just like EntityResponse.
    """

    matches: List[EntityResponse]
    query: str


class DeleteEntitiesResponse(SQLAlchemyModel):
    """Response indicating successful entity deletion.

    A simple boolean response confirming the delete operation
    completed successfully.

    Example Response:
    {
        "deleted": true
    }
    """

    deleted: bool

```
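
The `validation_alias` choices above let `RelationResponse` accept either a nested ORM-style object or a plain ID. A minimal sketch (not from the repo; the permalink value and the `relation_type` field are assumptions based on the docstring example):

```python
# Hypothetical payloads illustrating AliasChoices/AliasPath resolution.
nested = RelationResponse.model_validate(
    {
        "permalink": "test/memory-test/validates/component/memory-service",  # assumed format
        "relation_type": "validates",
        # AliasPath("from_entity", "permalink") pulls the ID from the nested object
        "from_entity": {"permalink": "test/memory-test"},
        "to_entity": {"permalink": "component/memory-service", "title": "MemoryService"},
    }
)
assert nested.from_id == "test/memory-test"
assert nested.to_name == "MemoryService"

flat = RelationResponse.model_validate(
    {
        "permalink": "test/memory-test/validates/component/memory-service",
        "relation_type": "validates",
        "from_id": "test/memory-test",  # plain field names work as fallback aliases
        "to_id": "component/memory-service",
    }
)
assert flat.to_id == "component/memory-service"
```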

--------------------------------------------------------------------------------
/src/basic_memory/mcp/prompts/recent_activity.py:
--------------------------------------------------------------------------------

```python
"""Recent activity prompts for Basic Memory MCP server.

These prompts help users see what has changed in their knowledge base recently.
"""

from typing import Annotated, Optional

from loguru import logger
from pydantic import Field

from basic_memory.mcp.prompts.utils import format_prompt_context, PromptContext, PromptContextItem
from basic_memory.mcp.server import mcp
from basic_memory.mcp.tools.recent_activity import recent_activity
from basic_memory.schemas.base import TimeFrame
from basic_memory.schemas.memory import GraphContext, ProjectActivitySummary
from basic_memory.schemas.search import SearchItemType


@mcp.prompt(
    name="recent_activity",
    description="Get recent activity from a specific project or across all projects",
)
async def recent_activity_prompt(
    timeframe: Annotated[
        TimeFrame,
        Field(description="How far back to look for activity (e.g. '1d', '1 week')"),
    ] = "7d",
    project: Annotated[
        Optional[str],
        Field(
            description="Specific project to get activity from (None for discovery across all projects)"
        ),
    ] = None,
) -> str:
    """Get recent activity from a specific project or across all projects.

    This prompt helps you see what's changed recently in the knowledge base.
    In discovery mode (project=None), it shows activity across all projects.
    In project-specific mode, it shows detailed activity for one project.

    Args:
        timeframe: How far back to look for activity (e.g. '1d', '1 week')
        project: Specific project to get activity from (None for discovery across all projects)

    Returns:
        Formatted summary of recent activity
    """
    logger.info(f"Getting recent activity, timeframe: {timeframe}, project: {project}")

    recent = await recent_activity.fn(
        project=project, timeframe=timeframe, type=[SearchItemType.ENTITY]
    )

    # Extract primary results from the hierarchical structure
    primary_results = []
    related_results = []

    if isinstance(recent, ProjectActivitySummary):
        # Discovery mode - extract results from all projects
        for _, project_activity in recent.projects.items():
            if project_activity.activity.results:
                # Take up to 2 primary results per project
                for item in project_activity.activity.results[:2]:
                    primary_results.append(item.primary_result)
                    # Add up to 1 related result per primary item
                    if item.related_results:
                        related_results.extend(item.related_results[:1])

        # Limit total results for readability
        primary_results = primary_results[:8]
        related_results = related_results[:6]

    elif isinstance(recent, GraphContext):
        # Project-specific mode - use existing logic
        if recent.results:
            # Take up to 5 primary results
            for item in recent.results[:5]:
                primary_results.append(item.primary_result)
                # Add up to 2 related results per primary item
                if item.related_results:
                    related_results.extend(item.related_results[:2])

    # Set topic based on mode
    if project:
        topic = f"Recent Activity in {project} ({timeframe})"
    else:
        topic = f"Recent Activity Across All Projects ({timeframe})"

    prompt_context = format_prompt_context(
        PromptContext(
            topic=topic,
            timeframe=timeframe,
            results=[
                PromptContextItem(
                    primary_results=primary_results,
                    related_results=related_results[:10],  # Limit total related results
                )
            ],
        )
    )

    # Add mode-specific suggestions
    first_title = "Recent Topic"
    if primary_results:
        first_title = primary_results[0].title

    if project:
        # Project-specific suggestions
        capture_suggestions = f"""
    ## Opportunity to Capture Activity Summary

    Consider creating a summary note of recent activity in {project}:

    ```python
    await write_note(
        "{project}",
        title="Activity Summary {timeframe}",
        content='''
        # Activity Summary for {project} ({timeframe})

        ## Overview
        [Summary of key changes and developments in this project over this period]

        ## Key Updates
        [List main updates and their significance within this project]

        ## Observations
        - [trend] [Observation about patterns in recent activity]
        - [insight] [Connection between different activities]

        ## Relations
        - summarizes [[{first_title}]]
        - relates_to [[{project} Overview]]
        ''',
        folder="summaries"
    )
    ```

    Summarizing periodic activity helps create high-level insights and connections within the project.
    """
    else:
        # Discovery mode suggestions
        project_count = len(recent.projects) if isinstance(recent, ProjectActivitySummary) else 0
        most_active = (
            getattr(recent.summary, "most_active_project", "Unknown")
            if isinstance(recent, ProjectActivitySummary)
            else "Unknown"
        )

        capture_suggestions = f"""
    ## Cross-Project Activity Discovery

    Found activity across {project_count} projects. Most active: **{most_active}**

    Consider creating a cross-project summary:

    ```python
    await write_note(
        "{most_active if most_active != "Unknown" else "main"}",
        title="Cross-Project Activity Summary {timeframe}",
        content='''
        # Cross-Project Activity Summary ({timeframe})

        ## Overview
        Activity found across {project_count} projects, with {most_active} showing the most activity.

        ## Key Developments
        [Summarize important changes across all projects]

        ## Project Insights
        [Note patterns or connections between projects]

        ## Observations
        - [trend] [Cross-project patterns observed]
        - [insight] [Connections between different project activities]

        ## Relations
        - summarizes [[{first_title}]]
        - relates_to [[Project Portfolio Overview]]
        ''',
        folder="summaries"
    )
    ```

    Cross-project summaries help identify broader trends and project interconnections.
    """

    return prompt_context + capture_suggestions

```
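
For quick local experimentation, the prompt can be driven directly. Depending on the FastMCP version, the decorator may wrap the coroutine in a prompt object, so the sketch below reaches it through `.fn`, mirroring how `recent_activity.fn` is invoked above; the project name "main" is hypothetical:

```python
# Discovery mode across all projects (assumes an initialized MCP environment):
summary = await recent_activity_prompt.fn(timeframe="3d")

# Project-specific mode for a hypothetical project named "main":
detail = await recent_activity_prompt.fn(timeframe="1 week", project="main")
print(detail)  # formatted prompt context followed by the capture suggestions
```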

--------------------------------------------------------------------------------
/src/basic_memory/cli/commands/status.py:
--------------------------------------------------------------------------------

```python
"""Status command for basic-memory CLI."""

import asyncio
from typing import Annotated, Dict, Optional, Set

from mcp.server.fastmcp.exceptions import ToolError
import typer
from loguru import logger
from rich.console import Console
from rich.panel import Panel
from rich.tree import Tree

from basic_memory.cli.app import app
from basic_memory.mcp.async_client import get_client
from basic_memory.mcp.tools.utils import call_post
from basic_memory.schemas import SyncReportResponse
from basic_memory.mcp.project_context import get_active_project

# Create rich console
console = Console()


def add_files_to_tree(
    tree: Tree, paths: Set[str], style: str, checksums: Dict[str, str] | None = None
):
    """Add files to tree, grouped by directory."""
    # Group by directory
    by_dir = {}
    for path in sorted(paths):
        parts = path.split("/", 1)
        dir_name = parts[0] if len(parts) > 1 else ""
        file_name = parts[1] if len(parts) > 1 else parts[0]
        by_dir.setdefault(dir_name, []).append((file_name, path))

    # Add to tree
    for dir_name, files in sorted(by_dir.items()):
        if dir_name:
            branch = tree.add(f"[bold]{dir_name}/[/bold]")
        else:
            branch = tree

        for file_name, full_path in sorted(files):
            if checksums and full_path in checksums:
                checksum_short = checksums[full_path][:8]
                branch.add(f"[{style}]{file_name}[/{style}] ({checksum_short})")
            else:
                branch.add(f"[{style}]{file_name}[/{style}]")


def group_changes_by_directory(changes: SyncReportResponse) -> Dict[str, Dict[str, int]]:
    """Group changes by directory for summary view."""
    by_dir = {}
    for change_type, paths in [
        ("new", changes.new),
        ("modified", changes.modified),
        ("deleted", changes.deleted),
    ]:
        for path in paths:
            dir_name = path.split("/", 1)[0]
            by_dir.setdefault(dir_name, {"new": 0, "modified": 0, "deleted": 0, "moved": 0})
            by_dir[dir_name][change_type] += 1

    # Handle moves - count in both source and destination directories
    for old_path, new_path in changes.moves.items():
        old_dir = old_path.split("/", 1)[0]
        new_dir = new_path.split("/", 1)[0]
        by_dir.setdefault(old_dir, {"new": 0, "modified": 0, "deleted": 0, "moved": 0})
        by_dir.setdefault(new_dir, {"new": 0, "modified": 0, "deleted": 0, "moved": 0})
        by_dir[old_dir]["moved"] += 1
        if old_dir != new_dir:
            by_dir[new_dir]["moved"] += 1

    return by_dir


def build_directory_summary(counts: Dict[str, int]) -> str:
    """Build summary string for directory changes."""
    parts = []
    if counts["new"]:
        parts.append(f"[green]+{counts['new']} new[/green]")
    if counts["modified"]:
        parts.append(f"[yellow]~{counts['modified']} modified[/yellow]")
    if counts["moved"]:
        parts.append(f"[blue]↔{counts['moved']} moved[/blue]")
    if counts["deleted"]:
        parts.append(f"[red]-{counts['deleted']} deleted[/red]")
    return " ".join(parts)


def display_changes(
    project_name: str, title: str, changes: SyncReportResponse, verbose: bool = False
):
    """Display changes using Rich for better visualization."""
    tree = Tree(f"{project_name}: {title}")

    if changes.total == 0 and not changes.skipped_files:
        tree.add("No changes")
        console.print(Panel(tree, expand=False))
        return

    if verbose:
        # Full file listing with checksums
        if changes.new:
            new_branch = tree.add("[green]New Files[/green]")
            add_files_to_tree(new_branch, changes.new, "green", changes.checksums)
        if changes.modified:
            mod_branch = tree.add("[yellow]Modified[/yellow]")
            add_files_to_tree(mod_branch, changes.modified, "yellow", changes.checksums)
        if changes.moves:
            move_branch = tree.add("[blue]Moved[/blue]")
            for old_path, new_path in sorted(changes.moves.items()):
                move_branch.add(f"[blue]{old_path}[/blue] → [blue]{new_path}[/blue]")
        if changes.deleted:
            del_branch = tree.add("[red]Deleted[/red]")
            add_files_to_tree(del_branch, changes.deleted, "red")
        if changes.skipped_files:
            skip_branch = tree.add("[red]⚠️  Skipped (Circuit Breaker)[/red]")
            for skipped in sorted(changes.skipped_files, key=lambda x: x.path):
                skip_branch.add(
                    f"[red]{skipped.path}[/red] "
                    f"(failures: {skipped.failure_count}, reason: {skipped.reason})"
                )
    else:
        # Show directory summaries
        by_dir = group_changes_by_directory(changes)
        for dir_name, counts in sorted(by_dir.items()):
            summary = build_directory_summary(counts)
            if summary:  # Only show directories with changes
                tree.add(f"[bold]{dir_name}/[/bold] {summary}")

        # Show skipped files summary in non-verbose mode
        if changes.skipped_files:
            skip_count = len(changes.skipped_files)
            tree.add(
                f"[red]⚠️  {skip_count} file{'s' if skip_count != 1 else ''} "
                f"skipped due to repeated failures[/red]"
            )

    console.print(Panel(tree, expand=False))


async def run_status(project: Optional[str] = None, verbose: bool = False):  # pragma: no cover
    """Check sync status of files vs database."""

    try:
        async with get_client() as client:
            project_item = await get_active_project(client, project, None)
            response = await call_post(client, f"{project_item.project_url}/project/status")
            sync_report = SyncReportResponse.model_validate(response.json())

            display_changes(project_item.name, "Status", sync_report, verbose)

    except (ValueError, ToolError) as e:
        console.print(f"[red]✗ Error: {e}[/red]")
        raise typer.Exit(1)


@app.command()
def status(
    project: Annotated[
        Optional[str],
        typer.Option(help="The project name."),
    ] = None,
    verbose: bool = typer.Option(False, "--verbose", "-v", help="Show detailed file information"),
):
    """Show sync status between files and database."""
    try:
        asyncio.run(run_status(project, verbose))  # pragma: no cover
    except Exception as e:
        logger.error(f"Error checking status: {e}")
        typer.echo(f"Error checking status: {e}", err=True)
        raise typer.Exit(code=1)  # pragma: no cover

```
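
The grouping helpers are easy to exercise on their own. A sketch using a `SimpleNamespace` stand-in for `SyncReportResponse` (only the fields the helpers read are mimicked; the real schema lives elsewhere in the package):

```python
from types import SimpleNamespace

# Stand-in for SyncReportResponse; paths are hypothetical.
changes = SimpleNamespace(
    new={"notes/todo.md"},
    modified={"notes/ideas.md", "specs/spec-1.md"},
    deleted=set(),
    moves={"drafts/old.md": "notes/old.md"},
)

by_dir = group_changes_by_directory(changes)  # type: ignore[arg-type]
for dir_name, counts in sorted(by_dir.items()):
    print(f"{dir_name}/ {build_directory_summary(counts)}")
# drafts/ ↔1 moved
# notes/ +1 new ~1 modified ↔1 moved
# specs/ ~1 modified   (rich markup tags omitted in these comments)
```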

--------------------------------------------------------------------------------
/tests/cli/test_import_chatgpt.py:
--------------------------------------------------------------------------------

```python
"""Tests for import_chatgpt command."""

import json

import pytest
from typer.testing import CliRunner

from basic_memory.cli.app import app, import_app
from basic_memory.cli.commands import import_chatgpt  # noqa
from basic_memory.config import get_project_config

# Set up CLI runner
runner = CliRunner()


@pytest.fixture
def sample_conversation():
    """Sample ChatGPT conversation data for testing."""
    return {
        "title": "Test Conversation",
        "create_time": 1736616594.24054,  # Example timestamp
        "update_time": 1736616603.164995,
        "mapping": {
            "root": {"id": "root", "message": None, "parent": None, "children": ["msg1"]},
            "msg1": {
                "id": "msg1",
                "message": {
                    "id": "msg1",
                    "author": {"role": "user", "name": None, "metadata": {}},
                    "create_time": 1736616594.24054,
                    "content": {"content_type": "text", "parts": ["Hello, this is a test message"]},
                    "status": "finished_successfully",
                    "metadata": {},
                },
                "parent": "root",
                "children": ["msg2"],
            },
            "msg2": {
                "id": "msg2",
                "message": {
                    "id": "msg2",
                    "author": {"role": "assistant", "name": None, "metadata": {}},
                    "create_time": 1736616603.164995,
                    "content": {"content_type": "text", "parts": ["This is a test response"]},
                    "status": "finished_successfully",
                    "metadata": {},
                },
                "parent": "msg1",
                "children": [],
            },
        },
    }


@pytest.fixture
def sample_conversation_with_code():
    """Sample conversation with code block."""
    conversation = {
        "title": "Code Test",
        "create_time": 1736616594.24054,
        "update_time": 1736616603.164995,
        "mapping": {
            "root": {"id": "root", "message": None, "parent": None, "children": ["msg1"]},
            "msg1": {
                "id": "msg1",
                "message": {
                    "id": "msg1",
                    "author": {"role": "assistant", "name": None, "metadata": {}},
                    "create_time": 1736616594.24054,
                    "content": {
                        "content_type": "code",
                        "language": "python",
                        "text": "def hello():\n    print('Hello world!')",
                    },
                    "status": "finished_successfully",
                    "metadata": {},
                },
                "parent": "root",
                "children": [],
            },
            "msg2": {
                "id": "msg2",
                "message": {
                    "id": "msg2",
                    "author": {"role": "assistant", "name": None, "metadata": {}},
                    "create_time": 1736616594.24054,
                    "status": "finished_successfully",
                    "metadata": {},
                },
                "parent": "root",
                "children": [],
            },
        },
    }
    return conversation


@pytest.fixture
def sample_conversation_with_hidden():
    """Sample conversation with hidden messages."""
    conversation = {
        "title": "Hidden Test",
        "create_time": 1736616594.24054,
        "update_time": 1736616603.164995,
        "mapping": {
            "root": {
                "id": "root",
                "message": None,
                "parent": None,
                "children": ["visible", "hidden"],
            },
            "visible": {
                "id": "visible",
                "message": {
                    "id": "visible",
                    "author": {"role": "user", "name": None, "metadata": {}},
                    "create_time": 1736616594.24054,
                    "content": {"content_type": "text", "parts": ["Visible message"]},
                    "status": "finished_successfully",
                    "metadata": {},
                },
                "parent": "root",
                "children": [],
            },
            "hidden": {
                "id": "hidden",
                "message": {
                    "id": "hidden",
                    "author": {"role": "system", "name": None, "metadata": {}},
                    "create_time": 1736616594.24054,
                    "content": {"content_type": "text", "parts": ["Hidden message"]},
                    "status": "finished_successfully",
                    "metadata": {"is_visually_hidden_from_conversation": True},
                },
                "parent": "root",
                "children": [],
            },
        },
    }
    return conversation


@pytest.fixture
def sample_chatgpt_json(tmp_path, sample_conversation):
    """Create a sample ChatGPT JSON file."""
    json_file = tmp_path / "conversations.json"
    with open(json_file, "w", encoding="utf-8") as f:
        json.dump([sample_conversation], f)
    return json_file


def test_import_chatgpt_command_success(tmp_path, sample_chatgpt_json, monkeypatch):
    """Test successful conversation import via command."""
    # Set up test environment
    monkeypatch.setenv("HOME", str(tmp_path))

    # Run import
    result = runner.invoke(import_app, ["chatgpt", str(sample_chatgpt_json)])
    assert result.exit_code == 0
    assert "Import complete" in result.output
    assert "Imported 1 conversations" in result.output
    assert "Containing 2 messages" in result.output


def test_import_chatgpt_command_invalid_json(tmp_path):
    """Test error handling for invalid JSON."""
    # Create invalid JSON file
    invalid_file = tmp_path / "invalid.json"
    invalid_file.write_text("not json")

    result = runner.invoke(import_app, ["chatgpt", str(invalid_file)])
    assert result.exit_code == 1
    assert "Error during import" in result.output


def test_import_chatgpt_with_custom_folder(tmp_path, sample_chatgpt_json, monkeypatch):
    """Test import with custom conversations folder."""
    # Set up test environment

    config = get_project_config()
    config.home = tmp_path
    conversations_folder = "chats"

    # Run import
    result = runner.invoke(
        app,
        [
            "import",
            "chatgpt",
            str(sample_chatgpt_json),
            "--folder",
            conversations_folder,
        ],
    )
    assert result.exit_code == 0

    # Check files in custom folder
    conv_path = tmp_path / conversations_folder / "20250111-Test_Conversation.md"
    assert conv_path.exists()

```
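
The fixtures above encode ChatGPT's export format: a flat `mapping` of nodes linked by parent/children IDs, with a synthetic root and an `is_visually_hidden_from_conversation` metadata flag. A sketch (not the repo's importer) of walking that structure:

```python
def iter_visible_messages(mapping: dict):
    """Depth-first walk from the synthetic 'root' node, skipping hidden messages."""
    stack = ["root"]
    while stack:
        node = mapping[stack.pop()]
        message = node.get("message")
        if message is not None:
            hidden = message.get("metadata", {}).get("is_visually_hidden_from_conversation")
            if not hidden:
                yield message
        # reversed() keeps children in document order on the LIFO stack
        stack.extend(reversed(node.get("children", [])))
```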

--------------------------------------------------------------------------------
/tests/services/test_initialization.py:
--------------------------------------------------------------------------------

```python
"""Tests for the initialization service."""

from unittest.mock import patch, MagicMock, AsyncMock

import pytest

from basic_memory.services.initialization import (
    ensure_initialization,
    initialize_database,
    reconcile_projects_with_config,
    initialize_file_sync,
)


@pytest.mark.asyncio
@patch("basic_memory.services.initialization.db.get_or_create_db")
async def test_initialize_database(mock_get_or_create_db, app_config):
    """Test initializing the database."""
    mock_get_or_create_db.return_value = (MagicMock(), MagicMock())
    await initialize_database(app_config)
    mock_get_or_create_db.assert_called_once_with(app_config.database_path)


@pytest.mark.asyncio
@patch("basic_memory.services.initialization.db.get_or_create_db")
async def test_initialize_database_error(mock_get_or_create_db, app_config):
    """Test handling errors during database initialization."""
    mock_get_or_create_db.side_effect = Exception("Test error")
    await initialize_database(app_config)
    mock_get_or_create_db.assert_called_once_with(app_config.database_path)


@patch("basic_memory.services.initialization.asyncio.run")
def test_ensure_initialization(mock_run, app_config):
    """Test synchronous initialization wrapper."""
    ensure_initialization(app_config)
    mock_run.assert_called_once()


@pytest.mark.asyncio
@patch("basic_memory.services.initialization.db.get_or_create_db")
async def test_reconcile_projects_with_config(mock_get_db, app_config):
    """Test reconciling projects from config with database using ProjectService."""
    # Setup mocks
    mock_session_maker = AsyncMock()
    mock_get_db.return_value = (None, mock_session_maker)

    mock_repository = AsyncMock()
    mock_project_service = AsyncMock()
    mock_project_service.synchronize_projects = AsyncMock()

    # Mock the repository and project service
    with (
        patch("basic_memory.services.initialization.ProjectRepository") as mock_repo_class,
        patch(
            "basic_memory.services.project_service.ProjectService",
            return_value=mock_project_service,
        ),
    ):
        mock_repo_class.return_value = mock_repository

        # Set up app_config projects as a dictionary
        app_config.projects = {"test_project": "/path/to/project", "new_project": "/path/to/new"}
        app_config.default_project = "test_project"

        # Run the function
        await reconcile_projects_with_config(app_config)

        # Assertions
        mock_get_db.assert_called_once()
        mock_repo_class.assert_called_once_with(mock_session_maker)
        mock_project_service.synchronize_projects.assert_called_once()

        # We should no longer be calling these directly since we're using the service
        mock_repository.find_all.assert_not_called()
        mock_repository.set_as_default.assert_not_called()


@pytest.mark.asyncio
@patch("basic_memory.services.initialization.db.get_or_create_db")
async def test_reconcile_projects_with_error_handling(mock_get_db, app_config):
    """Test error handling during project synchronization."""
    # Setup mocks
    mock_session_maker = AsyncMock()
    mock_get_db.return_value = (None, mock_session_maker)

    mock_repository = AsyncMock()
    mock_project_service = AsyncMock()
    mock_project_service.synchronize_projects = AsyncMock(
        side_effect=ValueError("Project synchronization error")
    )

    # Mock the repository and project service
    with (
        patch("basic_memory.services.initialization.ProjectRepository") as mock_repo_class,
        patch(
            "basic_memory.services.project_service.ProjectService",
            return_value=mock_project_service,
        ),
        patch("basic_memory.services.initialization.logger") as mock_logger,
    ):
        mock_repo_class.return_value = mock_repository

        # Set up app_config projects as a dictionary
        app_config.projects = {"test_project": "/path/to/project"}
        app_config.default_project = "missing_project"

        # Run the function which now has error handling
        await reconcile_projects_with_config(app_config)

        # Assertions
        mock_get_db.assert_called_once()
        mock_repo_class.assert_called_once_with(mock_session_maker)
        mock_project_service.synchronize_projects.assert_called_once()

        # Verify error was logged
        mock_logger.error.assert_called_once_with(
            "Error during project synchronization: Project synchronization error"
        )
        mock_logger.info.assert_any_call(
            "Continuing with initialization despite synchronization error"
        )


@pytest.mark.asyncio
@patch("basic_memory.services.initialization.db.get_or_create_db")
@patch("basic_memory.sync.sync_service.get_sync_service")
@patch("basic_memory.sync.WatchService")
@patch("basic_memory.services.initialization.asyncio.create_task")
async def test_initialize_file_sync_background_tasks(
    mock_create_task, mock_watch_service_class, mock_get_sync_service, mock_get_db, app_config
):
    """Test file sync initialization with background task processing."""
    # Setup mocks
    mock_session_maker = AsyncMock()
    mock_get_db.return_value = (None, mock_session_maker)

    mock_watch_service = AsyncMock()
    mock_watch_service.run = AsyncMock()
    mock_watch_service_class.return_value = mock_watch_service

    mock_repository = AsyncMock()
    mock_project1 = MagicMock()
    mock_project1.name = "project1"
    mock_project1.path = "/path/to/project1"
    mock_project1.id = 1

    mock_project2 = MagicMock()
    mock_project2.name = "project2"
    mock_project2.path = "/path/to/project2"
    mock_project2.id = 2

    mock_sync_service = AsyncMock()
    mock_sync_service.sync = AsyncMock()
    mock_get_sync_service.return_value = mock_sync_service

    # Mock background tasks
    mock_task1 = MagicMock()
    mock_task2 = MagicMock()
    mock_create_task.side_effect = [mock_task1, mock_task2]

    # Mock the repository
    with patch("basic_memory.services.initialization.ProjectRepository") as mock_repo_class:
        mock_repo_class.return_value = mock_repository
        mock_repository.get_active_projects.return_value = [mock_project1, mock_project2]

        # Run the function
        result = await initialize_file_sync(app_config)

        # Assertions
        mock_repository.get_active_projects.assert_called_once()

        # Should create background tasks for each project (non-blocking)
        assert mock_create_task.call_count == 2

        # Verify tasks were created but not awaited (function returns immediately)
        assert result is None

        # Watch service should still be started
        mock_watch_service.run.assert_called_once()

```
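
The last test pins down a non-blocking startup pattern: one background task per active project, with the watch service started afterwards. Roughly, and only as a sketch of what the assertions imply (not the actual initialization module):

```python
import asyncio

async def initialize_file_sync_sketch(projects, get_sync_service, watch_service):
    # One background task per active project; sync() is not awaited here,
    # so startup does not block on full project syncs.
    for project in projects:
        sync_service = await get_sync_service(project)
        asyncio.create_task(sync_service.sync())
    # The watch service is started only after the tasks are scheduled.
    await watch_service.run()
```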

--------------------------------------------------------------------------------
/src/basic_memory/schemas/project_info.py:
--------------------------------------------------------------------------------

```python
"""Schema for project info response."""

import os
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Any

from pydantic import Field, BaseModel

from basic_memory.utils import generate_permalink


class ProjectStatistics(BaseModel):
    """Statistics about the current project."""

    # Basic counts
    total_entities: int = Field(description="Total number of entities in the knowledge base")
    total_observations: int = Field(description="Total number of observations across all entities")
    total_relations: int = Field(description="Total number of relations between entities")
    total_unresolved_relations: int = Field(
        description="Number of relations with unresolved targets"
    )

    # Entity counts by type
    entity_types: Dict[str, int] = Field(
        description="Count of entities by type (e.g., note, conversation)"
    )

    # Observation counts by category
    observation_categories: Dict[str, int] = Field(
        description="Count of observations by category (e.g., tech, decision)"
    )

    # Relation counts by type
    relation_types: Dict[str, int] = Field(
        description="Count of relations by type (e.g., implements, relates_to)"
    )

    # Graph metrics
    most_connected_entities: List[Dict[str, Any]] = Field(
        description="Entities with the most relations, including their titles and permalinks"
    )
    isolated_entities: int = Field(description="Number of entities with no relations")


class ActivityMetrics(BaseModel):
    """Activity metrics for the current project."""

    # Recent activity
    recently_created: List[Dict[str, Any]] = Field(
        description="Recently created entities with timestamps"
    )
    recently_updated: List[Dict[str, Any]] = Field(
        description="Recently updated entities with timestamps"
    )

    # Growth over time (last 6 months)
    monthly_growth: Dict[str, Dict[str, int]] = Field(
        description="Monthly growth statistics for entities, observations, and relations"
    )


class SystemStatus(BaseModel):
    """System status information."""

    # Version information
    version: str = Field(description="Basic Memory version")

    # Database status
    database_path: str = Field(description="Path to the SQLite database")
    database_size: str = Field(description="Size of the database in human-readable format")

    # Watch service status
    watch_status: Optional[Dict[str, Any]] = Field(
        default=None, description="Watch service status information (if running)"
    )

    # System information
    timestamp: datetime = Field(description="Timestamp when the information was collected")


class ProjectInfoResponse(BaseModel):
    """Response for the project_info tool."""

    # Project configuration
    project_name: str = Field(description="Name of the current project")
    project_path: str = Field(description="Path to the current project files")
    available_projects: Dict[str, Dict[str, Any]] = Field(
        description="Map of configured project names to detailed project information"
    )
    default_project: str = Field(description="Name of the default project")

    # Statistics
    statistics: ProjectStatistics = Field(description="Statistics about the knowledge base")

    # Activity metrics
    activity: ActivityMetrics = Field(description="Activity and growth metrics")

    # System status
    system: SystemStatus = Field(description="System and service status information")


class ProjectInfoRequest(BaseModel):
    """Request model for switching projects."""

    name: str = Field(..., description="Name of the project to switch to")
    path: str = Field(..., description="Path to the project directory")
    set_default: bool = Field(..., description="Set the project as the default")


class WatchEvent(BaseModel):
    timestamp: datetime
    path: str
    action: str  # new, delete, etc
    status: str  # success, error
    checksum: Optional[str]
    error: Optional[str] = None


class WatchServiceState(BaseModel):
    # Service status
    running: bool = False
    start_time: datetime = Field(default_factory=datetime.now)  # evaluated per instance, not at import
    pid: int = Field(default_factory=os.getpid)  # evaluated per instance, not at import

    # Stats
    error_count: int = 0
    last_error: Optional[datetime] = None
    last_scan: Optional[datetime] = None

    # File counts
    synced_files: int = 0

    # Recent activity
    recent_events: List[WatchEvent] = Field(default_factory=list)

    def add_event(
        self,
        path: str,
        action: str,
        status: str,
        checksum: Optional[str] = None,
        error: Optional[str] = None,
    ) -> WatchEvent:  # pragma: no cover
        event = WatchEvent(
            timestamp=datetime.now(),
            path=path,
            action=action,
            status=status,
            checksum=checksum,
            error=error,
        )
        self.recent_events.insert(0, event)
        self.recent_events = self.recent_events[:100]  # Keep last 100
        return event

    def record_error(self, error: str):  # pragma: no cover
        self.error_count += 1
        self.add_event(path="", action="sync", status="error", error=error)
        self.last_error = datetime.now()


class ProjectWatchStatus(BaseModel):
    """Project with its watch status."""

    name: str = Field(..., description="Name of the project")
    path: str = Field(..., description="Path to the project")
    watch_status: Optional[WatchServiceState] = Field(
        None, description="Watch status information for the project"
    )


class ProjectItem(BaseModel):
    """Simple representation of a project."""

    name: str
    path: str
    is_default: bool = False

    @property
    def permalink(self) -> str:  # pragma: no cover
        return generate_permalink(self.name)

    @property
    def home(self) -> Path:  # pragma: no cover
        return Path(self.path).expanduser()

    @property
    def project_url(self) -> str:  # pragma: no cover
        return f"/{generate_permalink(self.name)}"


class ProjectList(BaseModel):
    """Response model for listing projects."""

    projects: List[ProjectItem]
    default_project: str


class ProjectStatusResponse(BaseModel):
    """Response model for switching projects."""

    message: str = Field(..., description="Status message about the project switch")
    status: str = Field(..., description="Status of the switch (success or error)")
    default: bool = Field(..., description="True if the project was set as the default")
    old_project: Optional[ProjectItem] = Field(
        None, description="Information about the project being switched from"
    )
    new_project: Optional[ProjectItem] = Field(
        None, description="Information about the project being switched to"
    )

```
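
A small usage sketch for the watch-state model above, showing the newest-first event buffer and the error bookkeeping (the path and checksum values are arbitrary):

```python
state = WatchServiceState(running=True)
state.add_event(path="notes/todo.md", action="new", status="success", checksum="abc123de")
state.record_error("permission denied")

assert state.error_count == 1
assert state.recent_events[0].status == "error"  # events are inserted newest-first
assert len(state.recent_events) <= 100           # history is capped at 100 entries
```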

--------------------------------------------------------------------------------
/src/basic_memory/mcp/tools/chatgpt_tools.py:
--------------------------------------------------------------------------------

```python
"""ChatGPT-compatible MCP tools for Basic Memory.

These adapters expose Basic Memory's search/fetch functionality using the exact
tool names and response structure OpenAI's MCP clients expect: each call returns
a list containing a single `{"type": "text", "text": "{...json...}"}` item.
"""

import json
from typing import Any, Dict, List, Optional
from loguru import logger
from fastmcp import Context

from basic_memory.mcp.server import mcp
from basic_memory.mcp.tools.search import search_notes
from basic_memory.mcp.tools.read_note import read_note
from basic_memory.schemas.search import SearchResponse
from basic_memory.config import ConfigManager


def _format_search_results_for_chatgpt(results: SearchResponse) -> List[Dict[str, Any]]:
    """Format search results according to ChatGPT's expected schema.

    Returns a list of result objects with id, title, and url fields.
    """
    formatted_results = []

    for result in results.results:
        formatted_result = {
            "id": result.permalink or f"doc-{len(formatted_results)}",
            "title": result.title if result.title and result.title.strip() else "Untitled",
            "url": result.permalink or "",
        }
        formatted_results.append(formatted_result)

    return formatted_results


def _format_document_for_chatgpt(
    content: str, identifier: str, title: Optional[str] = None
) -> Dict[str, Any]:
    """Format document content according to ChatGPT's expected schema.

    Returns a document object with id, title, text, url, and metadata fields.
    """
    # Extract title from markdown content if not provided
    if not title and isinstance(content, str):
        lines = content.split("\n")
        if lines and lines[0].startswith("# "):
            title = lines[0][2:].strip()
        else:
            title = identifier.split("/")[-1].replace("-", " ").title()

    # Ensure title is never None
    if not title:
        title = "Untitled Document"

    # Handle error cases
    if isinstance(content, str) and content.startswith("# Note Not Found"):
        return {
            "id": identifier,
            "title": title or "Document Not Found",
            "text": content,
            "url": identifier,
            "metadata": {"error": "Document not found"},
        }

    return {
        "id": identifier,
        "title": title or "Untitled Document",
        "text": content,
        "url": identifier,
        "metadata": {"format": "markdown"},
    }


@mcp.tool(description="Search for content across the knowledge base")
async def search(
    query: str,
    context: Context | None = None,
) -> List[Dict[str, Any]]:
    """ChatGPT/OpenAI MCP search adapter returning a single text content item.

    Args:
        query: Search query (full-text syntax supported by `search_notes`)
        context: Optional FastMCP context passed through for auth/session data

    Returns:
        List with one dict: `{ "type": "text", "text": "{...JSON...}" }`
        where the JSON body contains `results`, `total_count`, and echo of `query`.
    """
    logger.info(f"ChatGPT search request: query='{query}'")

    try:
        # ChatGPT tools don't expose project parameter, so use default project
        config = ConfigManager().config
        default_project = config.default_project

        # Call underlying search_notes with sensible defaults for ChatGPT
        results = await search_notes.fn(
            query=query,
            project=default_project,  # Use default project for ChatGPT
            page=1,
            page_size=10,  # Reasonable default for ChatGPT consumption
            search_type="text",  # Default to full-text search
            context=context,
        )

        # Handle string error responses from search_notes
        if isinstance(results, str):
            logger.warning(f"Search failed with error: {results[:100]}...")
            search_results = {
                "results": [],
                "error": "Search failed",
                "error_details": results[:500],  # Truncate long error messages
            }
        else:
            # Format successful results for ChatGPT
            formatted_results = _format_search_results_for_chatgpt(results)
            search_results = {
                "results": formatted_results,
                "total_count": len(results.results),  # Use actual count from results
                "query": query,
            }
            logger.info(f"Search completed: {len(formatted_results)} results returned")

        # Return in MCP content array format as required by OpenAI
        return [{"type": "text", "text": json.dumps(search_results, ensure_ascii=False)}]

    except Exception as e:
        logger.error(f"ChatGPT search failed for query '{query}': {e}")
        error_results = {
            "results": [],
            "error": "Internal search error",
            "error_message": str(e)[:200],
        }
        return [{"type": "text", "text": json.dumps(error_results, ensure_ascii=False)}]


@mcp.tool(description="Fetch the full contents of a search result document")
async def fetch(
    id: str,
    context: Context | None = None,
) -> List[Dict[str, Any]]:
    """ChatGPT/OpenAI MCP fetch adapter returning a single text content item.

    Args:
        id: Document identifier (permalink, title, or memory URL)
        context: Optional FastMCP context passed through for auth/session data

    Returns:
        List with one dict: `{ "type": "text", "text": "{...JSON...}" }`
        where the JSON body includes `id`, `title`, `text`, `url`, and metadata.
    """
    logger.info(f"ChatGPT fetch request: id='{id}'")

    try:
        # ChatGPT tools don't expose project parameter, so use default project
        config = ConfigManager().config
        default_project = config.default_project

        # Call underlying read_note function
        content = await read_note.fn(
            identifier=id,
            project=default_project,  # Use default project for ChatGPT
            page=1,
            page_size=10,  # Default pagination
            context=context,
        )

        # Format the document for ChatGPT
        document = _format_document_for_chatgpt(content, id)

        logger.info(f"Fetch completed: id='{id}', content_length={len(document.get('text', ''))}")

        # Return in MCP content array format as required by OpenAI
        return [{"type": "text", "text": json.dumps(document, ensure_ascii=False)}]

    except Exception as e:
        logger.error(f"ChatGPT fetch failed for id '{id}': {e}")
        error_document = {
            "id": id,
            "title": "Fetch Error",
            "text": f"Failed to fetch document: {str(e)[:200]}",
            "url": id,
            "metadata": {"error": "Fetch failed"},
        }
        return [{"type": "text", "text": json.dumps(error_document, ensure_ascii=False)}]

```
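
Both adapters return a one-element content array whose `text` field is itself a JSON document, so callers decode twice: once for the MCP content item, once for the payload. A sketch (the tool wrappers are reached via `.fn`, as with the other tools in this codebase):

```python
import json

items = await search.fn(query="memory service")
payload = json.loads(items[0]["text"])
for hit in payload.get("results", []):
    doc_items = await fetch.fn(id=hit["id"])
    doc = json.loads(doc_items[0]["text"])
    print(doc["title"], len(doc["text"]))
```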

--------------------------------------------------------------------------------
/tests/cli/test_import_claude_conversations.py:
--------------------------------------------------------------------------------

```python
"""Tests for import_claude command (chat conversations)."""

import json

import pytest
from typer.testing import CliRunner

from basic_memory.cli.app import app
from basic_memory.cli.commands import import_claude_conversations  # noqa
from basic_memory.config import get_project_config

# Set up CLI runner
runner = CliRunner()


@pytest.fixture
def sample_conversation():
    """Sample conversation data for testing."""
    return {
        "uuid": "test-uuid",
        "name": "Test Conversation",
        "created_at": "2025-01-05T20:55:32.499880+00:00",
        "updated_at": "2025-01-05T20:56:39.477600+00:00",
        "chat_messages": [
            {
                "uuid": "msg-1",
                "text": "Hello, this is a test",
                "sender": "human",
                "created_at": "2025-01-05T20:55:32.499880+00:00",
                "content": [{"type": "text", "text": "Hello, this is a test"}],
            },
            {
                "uuid": "msg-2",
                "text": "Response to test",
                "sender": "assistant",
                "created_at": "2025-01-05T20:55:40.123456+00:00",
                "content": [{"type": "text", "text": "Response to test"}],
            },
        ],
    }


@pytest.fixture
def sample_conversations_json(tmp_path, sample_conversation):
    """Create a sample conversations.json file."""
    json_file = tmp_path / "conversations.json"
    with open(json_file, "w", encoding="utf-8") as f:
        json.dump([sample_conversation], f)
    return json_file


def test_import_conversations_command_file_not_found(tmp_path):
    """Test error handling for nonexistent file."""
    nonexistent = tmp_path / "nonexistent.json"
    result = runner.invoke(app, ["import", "claude", "conversations", str(nonexistent)])
    assert result.exit_code == 1
    assert "File not found" in result.output


def test_import_conversations_command_success(tmp_path, sample_conversations_json, monkeypatch):
    """Test successful conversation import via command."""
    # Set up test environment
    monkeypatch.setenv("HOME", str(tmp_path))

    # Run import
    result = runner.invoke(
        app, ["import", "claude", "conversations", str(sample_conversations_json)]
    )
    assert result.exit_code == 0
    assert "Import complete" in result.output
    assert "Imported 1 conversations" in result.output
    assert "Containing 2 messages" in result.output


def test_import_conversations_command_invalid_json(tmp_path):
    """Test error handling for invalid JSON."""
    # Create invalid JSON file
    invalid_file = tmp_path / "invalid.json"
    invalid_file.write_text("not json")

    result = runner.invoke(app, ["import", "claude", "conversations", str(invalid_file)])
    assert result.exit_code == 1
    assert "Error during import" in result.output


def test_import_conversations_with_custom_folder(tmp_path, sample_conversations_json, monkeypatch):
    """Test import with custom conversations folder."""
    # Set up test environment
    config = get_project_config()
    config.home = tmp_path
    conversations_folder = "chats"

    # Run import
    result = runner.invoke(
        app,
        [
            "import",
            "claude",
            "conversations",
            str(sample_conversations_json),
            "--folder",
            conversations_folder,
        ],
    )
    assert result.exit_code == 0

    # Check files in custom folder
    conv_path = tmp_path / conversations_folder / "20250105-Test_Conversation.md"
    assert conv_path.exists()


def test_import_conversation_with_attachments(tmp_path):
    """Test importing conversation with attachments."""
    # Create conversation with attachments
    conversation = {
        "uuid": "test-uuid",
        "name": "Test With Attachments",
        "created_at": "2025-01-05T20:55:32.499880+00:00",
        "updated_at": "2025-01-05T20:56:39.477600+00:00",
        "chat_messages": [
            {
                "uuid": "msg-1",
                "text": "Here's a file",
                "sender": "human",
                "created_at": "2025-01-05T20:55:32.499880+00:00",
                "content": [{"type": "text", "text": "Here's a file"}],
                "attachments": [
                    {"file_name": "test.txt", "extracted_content": "Test file content"}
                ],
            }
        ],
    }

    json_file = tmp_path / "with_attachments.json"
    with open(json_file, "w", encoding="utf-8") as f:
        json.dump([conversation], f)

    config = get_project_config()
    # Set up environment
    config.home = tmp_path

    # Run import
    result = runner.invoke(app, ["import", "claude", "conversations", str(json_file)])
    assert result.exit_code == 0

    # Check attachment formatting
    conv_path = tmp_path / "conversations/20250105-Test_With_Attachments.md"
    content = conv_path.read_text(encoding="utf-8")
    assert "**Attachment: test.txt**" in content
    assert "```" in content
    assert "Test file content" in content


def test_import_conversation_with_none_text_values(tmp_path):
    """Test importing conversation with None text values in content array (issue #236)."""
    # Create conversation with None text values
    conversation = {
        "uuid": "test-uuid",
        "name": "Test With None Text",
        "created_at": "2025-01-05T20:55:32.499880+00:00",
        "updated_at": "2025-01-05T20:56:39.477600+00:00",
        "chat_messages": [
            {
                "uuid": "msg-1",
                "text": None,
                "sender": "human",
                "created_at": "2025-01-05T20:55:32.499880+00:00",
                "content": [
                    {"type": "text", "text": "Valid text here"},
                    {"type": "text", "text": None},  # This caused the TypeError
                    {"type": "text", "text": "More valid text"},
                ],
            },
            {
                "uuid": "msg-2",
                "text": None,
                "sender": "assistant",
                "created_at": "2025-01-05T20:55:40.123456+00:00",
                "content": [
                    {"type": "text", "text": None},  # All None case
                    {"type": "text", "text": None},
                ],
            },
        ],
    }

    json_file = tmp_path / "with_none_text.json"
    with open(json_file, "w", encoding="utf-8") as f:
        json.dump([conversation], f)

    config = get_project_config()
    config.home = tmp_path

    # Run import - should not fail with TypeError
    result = runner.invoke(app, ["import", "claude", "conversations", str(json_file)])
    assert result.exit_code == 0

    # Check that valid text is preserved and None values are filtered out
    conv_path = tmp_path / "conversations/20250105-Test_With_None_Text.md"
    assert conv_path.exists()
    content = conv_path.read_text(encoding="utf-8")
    assert "Valid text here" in content
    assert "More valid text" in content

```
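
The None-text regression test above (issue #236) implies a join guard in the importer roughly like the following sketch (a hypothetical helper, not the repo's actual code):

```python
def message_text(message: dict) -> str:
    """Join the text parts of a Claude message, dropping None/empty entries."""
    parts = (c.get("text") for c in message.get("content", []))
    return "\n".join(p for p in parts if p)
```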

--------------------------------------------------------------------------------
/src/basic_memory/file_utils.py:
--------------------------------------------------------------------------------

```python
"""Utilities for file operations."""

import hashlib
from pathlib import Path
import re
from typing import Any, Dict, Union

import aiofiles
import yaml
import frontmatter
from loguru import logger

from basic_memory.utils import FilePath


class FileError(Exception):
    """Base exception for file operations."""

    pass


class FileWriteError(FileError):
    """Raised when file operations fail."""

    pass


class ParseError(FileError):
    """Raised when parsing file content fails."""

    pass


async def compute_checksum(content: Union[str, bytes]) -> str:
    """
    Compute SHA-256 checksum of content.

    Args:
        content: Content to hash (either text string or bytes)

    Returns:
        SHA-256 hex digest

    Raises:
        FileError: If checksum computation fails
    """
    try:
        if isinstance(content, str):
            content = content.encode()
        return hashlib.sha256(content).hexdigest()
    except Exception as e:  # pragma: no cover
        logger.error(f"Failed to compute checksum: {e}")
        raise FileError(f"Failed to compute checksum: {e}")


async def write_file_atomic(path: FilePath, content: str) -> None:
    """
    Write file with atomic operation using temporary file.

    Uses aiofiles for true async I/O (non-blocking).

    Args:
        path: Target file path (Path or string)
        content: Content to write

    Raises:
        FileWriteError: If write operation fails
    """
    # Convert string to Path if needed
    path_obj = Path(path) if isinstance(path, str) else path
    temp_path = path_obj.with_suffix(".tmp")

    try:
        # Use aiofiles for non-blocking write
        async with aiofiles.open(temp_path, mode="w", encoding="utf-8") as f:
            await f.write(content)

        # Atomic rename (this is fast, doesn't need async)
        temp_path.replace(path_obj)
        logger.debug("Wrote file atomically", path=str(path_obj), content_length=len(content))
    except Exception as e:  # pragma: no cover
        temp_path.unlink(missing_ok=True)
        logger.error("Failed to write file", path=str(path_obj), error=str(e))
        raise FileWriteError(f"Failed to write file {path}: {e}")


def has_frontmatter(content: str) -> bool:
    """
    Check if content contains valid YAML frontmatter.

    Args:
        content: Content to check

    Returns:
        True if content has valid frontmatter markers (---), False otherwise
    """
    if not content:
        return False

    content = content.strip()
    if not content.startswith("---"):
        return False

    return "---" in content[3:]


def parse_frontmatter(content: str) -> Dict[str, Any]:
    """
    Parse YAML frontmatter from content.

    Args:
        content: Content with YAML frontmatter

    Returns:
        Dictionary of frontmatter values

    Raises:
        ParseError: If frontmatter is invalid or parsing fails
    """
    try:
        if not content.strip().startswith("---"):
            raise ParseError("Content has no frontmatter")

        # Split on first two occurrences of ---
        parts = content.split("---", 2)
        if len(parts) < 3:
            raise ParseError("Invalid frontmatter format")

        # Parse YAML
        try:
            metadata = yaml.safe_load(parts[1])
            # Handle empty frontmatter (None from yaml.safe_load)
            if metadata is None:
                return {}
            if not isinstance(metadata, dict):
                raise ParseError("Frontmatter must be a YAML dictionary")
            return metadata

        except yaml.YAMLError as e:
            raise ParseError(f"Invalid YAML in frontmatter: {e}")

    except Exception as e:  # pragma: no cover
        if not isinstance(e, ParseError):
            logger.error(f"Failed to parse frontmatter: {e}")
            raise ParseError(f"Failed to parse frontmatter: {e}")
        raise


def remove_frontmatter(content: str) -> str:
    """
    Remove YAML frontmatter from content.

    Args:
        content: Content with frontmatter

    Returns:
        Content with frontmatter removed, or original content if no frontmatter

    Raises:
        ParseError: If content starts with frontmatter marker but is malformed
    """
    content = content.strip()

    # Return as-is if no frontmatter marker
    if not content.startswith("---"):
        return content

    # Split on first two occurrences of ---
    parts = content.split("---", 2)
    if len(parts) < 3:
        raise ParseError("Invalid frontmatter format")

    return parts[2].strip()


def dump_frontmatter(post: frontmatter.Post) -> str:
    """
    Serialize frontmatter.Post to markdown with Obsidian-compatible YAML format.

    This function ensures that tags are formatted as YAML lists instead of JSON arrays:

    Good (Obsidian compatible):
    ---
    tags:
    - system
    - overview
    - reference
    ---

    Bad (current behavior):
    ---
    tags: ["system", "overview", "reference"]
    ---

    Args:
        post: frontmatter.Post object to serialize

    Returns:
        String containing markdown with properly formatted YAML frontmatter
    """
    if not post.metadata:
        # No frontmatter, just return content
        return post.content

    # Serialize YAML with block style for lists
    yaml_str = yaml.dump(
        post.metadata, sort_keys=False, allow_unicode=True, default_flow_style=False
    )

    # Construct the final markdown with frontmatter
    if post.content:
        return f"---\n{yaml_str}---\n\n{post.content}"
    else:
        return f"---\n{yaml_str}---\n"


def sanitize_for_filename(text: str, replacement: str = "-") -> str:
    """
    Sanitize string to be safe for use as a note title
    Replaces path separators and other problematic characters
    with hyphens.
    """
    # replace both POSIX and Windows path separators
    text = re.sub(r"[/\\]", replacement, text)

    # replace some other problematic chars
    text = re.sub(r'[<>:"|?*]', replacement, text)

    # compress multiple, repeated replacements
    text = re.sub(f"{re.escape(replacement)}+", replacement, text)

    return text.strip(replacement)


def sanitize_for_folder(folder: str) -> str:
    """
    Sanitize folder path to be safe for use in file system paths.
    Removes leading/trailing whitespace, compresses multiple slashes,
    and removes special characters except for /, -, and _.
    """
    if not folder:
        return ""

    sanitized = folder.strip()

    if sanitized.startswith("./"):
        sanitized = sanitized[2:]

    # ensure no special characters (except for a few that are allowed)
    sanitized = "".join(
        c for c in sanitized if c.isalnum() or c in (".", " ", "-", "_", "\\", "/")
    ).rstrip()

    # compress multiple, repeated instances of path separators
    sanitized = re.sub(r"[\\/]+", "/", sanitized)

    # trim any leading/trailing path separators
    sanitized = sanitized.strip("\\/")

    return sanitized

```
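
To see the Obsidian-friendly output `dump_frontmatter` produces, here is a short illustrative run (the metadata keys are arbitrary):

```python
import frontmatter

post = frontmatter.Post("Note body", title="Overview", tags=["system", "overview"])
print(dump_frontmatter(post))
# ---
# title: Overview
# tags:
# - system
# - overview
# ---
#
# Note body
```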
Page 3/17