#
tokens: 46939/50000 14/347 files (page 8/17)
lines: off (toggle) GitHub
raw markdown copy
This is page 8 of 17. Use http://codebase.md/basicmachines-co/basic-memory?lines=false&page={x} to view the full context.

# Directory Structure

```
├── .claude
│   ├── agents
│   │   ├── python-developer.md
│   │   └── system-architect.md
│   └── commands
│       ├── release
│       │   ├── beta.md
│       │   ├── changelog.md
│       │   ├── release-check.md
│       │   └── release.md
│       ├── spec.md
│       └── test-live.md
├── .dockerignore
├── .github
│   ├── dependabot.yml
│   ├── ISSUE_TEMPLATE
│   │   ├── bug_report.md
│   │   ├── config.yml
│   │   ├── documentation.md
│   │   └── feature_request.md
│   └── workflows
│       ├── claude-code-review.yml
│       ├── claude-issue-triage.yml
│       ├── claude.yml
│       ├── dev-release.yml
│       ├── docker.yml
│       ├── pr-title.yml
│       ├── release.yml
│       └── test.yml
├── .gitignore
├── .python-version
├── CHANGELOG.md
├── CITATION.cff
├── CLA.md
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── docker-compose.yml
├── Dockerfile
├── docs
│   ├── ai-assistant-guide-extended.md
│   ├── character-handling.md
│   ├── cloud-cli.md
│   └── Docker.md
├── justfile
├── LICENSE
├── llms-install.md
├── pyproject.toml
├── README.md
├── SECURITY.md
├── smithery.yaml
├── specs
│   ├── SPEC-1 Specification-Driven Development Process.md
│   ├── SPEC-10 Unified Deployment Workflow and Event Tracking.md
│   ├── SPEC-11 Basic Memory API Performance Optimization.md
│   ├── SPEC-12 OpenTelemetry Observability.md
│   ├── SPEC-13 CLI Authentication with Subscription Validation.md
│   ├── SPEC-14 Cloud Git Versioning & GitHub Backup.md
│   ├── SPEC-14- Cloud Git Versioning & GitHub Backup.md
│   ├── SPEC-15 Configuration Persistence via Tigris for Cloud Tenants.md
│   ├── SPEC-16 MCP Cloud Service Consolidation.md
│   ├── SPEC-17 Semantic Search with ChromaDB.md
│   ├── SPEC-18 AI Memory Management Tool.md
│   ├── SPEC-19 Sync Performance and Memory Optimization.md
│   ├── SPEC-2 Slash Commands Reference.md
│   ├── SPEC-20 Simplified Project-Scoped Rclone Sync.md
│   ├── SPEC-3 Agent Definitions.md
│   ├── SPEC-4 Notes Web UI Component Architecture.md
│   ├── SPEC-5 CLI Cloud Upload via WebDAV.md
│   ├── SPEC-6 Explicit Project Parameter Architecture.md
│   ├── SPEC-7 POC to spike Tigris Turso for local access to cloud data.md
│   ├── SPEC-8 TigrisFS Integration.md
│   ├── SPEC-9 Multi-Project Bidirectional Sync Architecture.md
│   ├── SPEC-9 Signed Header Tenant Information.md
│   └── SPEC-9-1 Follow-Ups- Conflict, Sync, and Observability.md
├── src
│   └── basic_memory
│       ├── __init__.py
│       ├── alembic
│       │   ├── alembic.ini
│       │   ├── env.py
│       │   ├── migrations.py
│       │   ├── script.py.mako
│       │   └── versions
│       │       ├── 3dae7c7b1564_initial_schema.py
│       │       ├── 502b60eaa905_remove_required_from_entity_permalink.py
│       │       ├── 5fe1ab1ccebe_add_projects_table.py
│       │       ├── 647e7a75e2cd_project_constraint_fix.py
│       │       ├── 9d9c1cb7d8f5_add_mtime_and_size_columns_to_entity_.py
│       │       ├── a1b2c3d4e5f6_fix_project_foreign_keys.py
│       │       ├── b3c3938bacdb_relation_to_name_unique_index.py
│       │       ├── cc7172b46608_update_search_index_schema.py
│       │       └── e7e1f4367280_add_scan_watermark_tracking_to_project.py
│       ├── api
│       │   ├── __init__.py
│       │   ├── app.py
│       │   ├── routers
│       │   │   ├── __init__.py
│       │   │   ├── directory_router.py
│       │   │   ├── importer_router.py
│       │   │   ├── knowledge_router.py
│       │   │   ├── management_router.py
│       │   │   ├── memory_router.py
│       │   │   ├── project_router.py
│       │   │   ├── prompt_router.py
│       │   │   ├── resource_router.py
│       │   │   ├── search_router.py
│       │   │   └── utils.py
│       │   └── template_loader.py
│       ├── cli
│       │   ├── __init__.py
│       │   ├── app.py
│       │   ├── auth.py
│       │   ├── commands
│       │   │   ├── __init__.py
│       │   │   ├── cloud
│       │   │   │   ├── __init__.py
│       │   │   │   ├── api_client.py
│       │   │   │   ├── bisync_commands.py
│       │   │   │   ├── cloud_utils.py
│       │   │   │   ├── core_commands.py
│       │   │   │   ├── rclone_commands.py
│       │   │   │   ├── rclone_config.py
│       │   │   │   ├── rclone_installer.py
│       │   │   │   ├── upload_command.py
│       │   │   │   └── upload.py
│       │   │   ├── command_utils.py
│       │   │   ├── db.py
│       │   │   ├── import_chatgpt.py
│       │   │   ├── import_claude_conversations.py
│       │   │   ├── import_claude_projects.py
│       │   │   ├── import_memory_json.py
│       │   │   ├── mcp.py
│       │   │   ├── project.py
│       │   │   ├── status.py
│       │   │   └── tool.py
│       │   └── main.py
│       ├── config.py
│       ├── db.py
│       ├── deps.py
│       ├── file_utils.py
│       ├── ignore_utils.py
│       ├── importers
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── chatgpt_importer.py
│       │   ├── claude_conversations_importer.py
│       │   ├── claude_projects_importer.py
│       │   ├── memory_json_importer.py
│       │   └── utils.py
│       ├── markdown
│       │   ├── __init__.py
│       │   ├── entity_parser.py
│       │   ├── markdown_processor.py
│       │   ├── plugins.py
│       │   ├── schemas.py
│       │   └── utils.py
│       ├── mcp
│       │   ├── __init__.py
│       │   ├── async_client.py
│       │   ├── project_context.py
│       │   ├── prompts
│       │   │   ├── __init__.py
│       │   │   ├── ai_assistant_guide.py
│       │   │   ├── continue_conversation.py
│       │   │   ├── recent_activity.py
│       │   │   ├── search.py
│       │   │   └── utils.py
│       │   ├── resources
│       │   │   ├── ai_assistant_guide.md
│       │   │   └── project_info.py
│       │   ├── server.py
│       │   └── tools
│       │       ├── __init__.py
│       │       ├── build_context.py
│       │       ├── canvas.py
│       │       ├── chatgpt_tools.py
│       │       ├── delete_note.py
│       │       ├── edit_note.py
│       │       ├── list_directory.py
│       │       ├── move_note.py
│       │       ├── project_management.py
│       │       ├── read_content.py
│       │       ├── read_note.py
│       │       ├── recent_activity.py
│       │       ├── search.py
│       │       ├── utils.py
│       │       ├── view_note.py
│       │       └── write_note.py
│       ├── models
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── knowledge.py
│       │   ├── project.py
│       │   └── search.py
│       ├── repository
│       │   ├── __init__.py
│       │   ├── entity_repository.py
│       │   ├── observation_repository.py
│       │   ├── project_info_repository.py
│       │   ├── project_repository.py
│       │   ├── relation_repository.py
│       │   ├── repository.py
│       │   └── search_repository.py
│       ├── schemas
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── cloud.py
│       │   ├── delete.py
│       │   ├── directory.py
│       │   ├── importer.py
│       │   ├── memory.py
│       │   ├── project_info.py
│       │   ├── prompt.py
│       │   ├── request.py
│       │   ├── response.py
│       │   ├── search.py
│       │   └── sync_report.py
│       ├── services
│       │   ├── __init__.py
│       │   ├── context_service.py
│       │   ├── directory_service.py
│       │   ├── entity_service.py
│       │   ├── exceptions.py
│       │   ├── file_service.py
│       │   ├── initialization.py
│       │   ├── link_resolver.py
│       │   ├── project_service.py
│       │   ├── search_service.py
│       │   └── service.py
│       ├── sync
│       │   ├── __init__.py
│       │   ├── background_sync.py
│       │   ├── sync_service.py
│       │   └── watch_service.py
│       ├── templates
│       │   └── prompts
│       │       ├── continue_conversation.hbs
│       │       └── search.hbs
│       └── utils.py
├── test-int
│   ├── BENCHMARKS.md
│   ├── cli
│   │   ├── test_project_commands_integration.py
│   │   └── test_version_integration.py
│   ├── conftest.py
│   ├── mcp
│   │   ├── test_build_context_underscore.py
│   │   ├── test_build_context_validation.py
│   │   ├── test_chatgpt_tools_integration.py
│   │   ├── test_default_project_mode_integration.py
│   │   ├── test_delete_note_integration.py
│   │   ├── test_edit_note_integration.py
│   │   ├── test_list_directory_integration.py
│   │   ├── test_move_note_integration.py
│   │   ├── test_project_management_integration.py
│   │   ├── test_project_state_sync_integration.py
│   │   ├── test_read_content_integration.py
│   │   ├── test_read_note_integration.py
│   │   ├── test_search_integration.py
│   │   ├── test_single_project_mcp_integration.py
│   │   └── test_write_note_integration.py
│   ├── test_db_wal_mode.py
│   ├── test_disable_permalinks_integration.py
│   └── test_sync_performance_benchmark.py
├── tests
│   ├── __init__.py
│   ├── api
│   │   ├── conftest.py
│   │   ├── test_async_client.py
│   │   ├── test_continue_conversation_template.py
│   │   ├── test_directory_router.py
│   │   ├── test_importer_router.py
│   │   ├── test_knowledge_router.py
│   │   ├── test_management_router.py
│   │   ├── test_memory_router.py
│   │   ├── test_project_router_operations.py
│   │   ├── test_project_router.py
│   │   ├── test_prompt_router.py
│   │   ├── test_relation_background_resolution.py
│   │   ├── test_resource_router.py
│   │   ├── test_search_router.py
│   │   ├── test_search_template.py
│   │   ├── test_template_loader_helpers.py
│   │   └── test_template_loader.py
│   ├── cli
│   │   ├── conftest.py
│   │   ├── test_cli_tools.py
│   │   ├── test_cloud_authentication.py
│   │   ├── test_ignore_utils.py
│   │   ├── test_import_chatgpt.py
│   │   ├── test_import_claude_conversations.py
│   │   ├── test_import_claude_projects.py
│   │   ├── test_import_memory_json.py
│   │   ├── test_project_add_with_local_path.py
│   │   └── test_upload.py
│   ├── conftest.py
│   ├── db
│   │   └── test_issue_254_foreign_key_constraints.py
│   ├── importers
│   │   ├── test_importer_base.py
│   │   └── test_importer_utils.py
│   ├── markdown
│   │   ├── __init__.py
│   │   ├── test_date_frontmatter_parsing.py
│   │   ├── test_entity_parser_error_handling.py
│   │   ├── test_entity_parser.py
│   │   ├── test_markdown_plugins.py
│   │   ├── test_markdown_processor.py
│   │   ├── test_observation_edge_cases.py
│   │   ├── test_parser_edge_cases.py
│   │   ├── test_relation_edge_cases.py
│   │   └── test_task_detection.py
│   ├── mcp
│   │   ├── conftest.py
│   │   ├── test_obsidian_yaml_formatting.py
│   │   ├── test_permalink_collision_file_overwrite.py
│   │   ├── test_prompts.py
│   │   ├── test_resources.py
│   │   ├── test_tool_build_context.py
│   │   ├── test_tool_canvas.py
│   │   ├── test_tool_delete_note.py
│   │   ├── test_tool_edit_note.py
│   │   ├── test_tool_list_directory.py
│   │   ├── test_tool_move_note.py
│   │   ├── test_tool_read_content.py
│   │   ├── test_tool_read_note.py
│   │   ├── test_tool_recent_activity.py
│   │   ├── test_tool_resource.py
│   │   ├── test_tool_search.py
│   │   ├── test_tool_utils.py
│   │   ├── test_tool_view_note.py
│   │   ├── test_tool_write_note.py
│   │   └── tools
│   │       └── test_chatgpt_tools.py
│   ├── Non-MarkdownFileSupport.pdf
│   ├── repository
│   │   ├── test_entity_repository_upsert.py
│   │   ├── test_entity_repository.py
│   │   ├── test_entity_upsert_issue_187.py
│   │   ├── test_observation_repository.py
│   │   ├── test_project_info_repository.py
│   │   ├── test_project_repository.py
│   │   ├── test_relation_repository.py
│   │   ├── test_repository.py
│   │   ├── test_search_repository_edit_bug_fix.py
│   │   └── test_search_repository.py
│   ├── schemas
│   │   ├── test_base_timeframe_minimum.py
│   │   ├── test_memory_serialization.py
│   │   ├── test_memory_url_validation.py
│   │   ├── test_memory_url.py
│   │   ├── test_schemas.py
│   │   └── test_search.py
│   ├── Screenshot.png
│   ├── services
│   │   ├── test_context_service.py
│   │   ├── test_directory_service.py
│   │   ├── test_entity_service_disable_permalinks.py
│   │   ├── test_entity_service.py
│   │   ├── test_file_service.py
│   │   ├── test_initialization.py
│   │   ├── test_link_resolver.py
│   │   ├── test_project_removal_bug.py
│   │   ├── test_project_service_operations.py
│   │   ├── test_project_service.py
│   │   └── test_search_service.py
│   ├── sync
│   │   ├── test_character_conflicts.py
│   │   ├── test_sync_service_incremental.py
│   │   ├── test_sync_service.py
│   │   ├── test_sync_wikilink_issue.py
│   │   ├── test_tmp_files.py
│   │   ├── test_watch_service_edge_cases.py
│   │   ├── test_watch_service_reload.py
│   │   └── test_watch_service.py
│   ├── test_config.py
│   ├── test_db_migration_deduplication.py
│   ├── test_deps.py
│   ├── test_production_cascade_delete.py
│   ├── test_rclone_commands.py
│   └── utils
│       ├── test_file_utils.py
│       ├── test_frontmatter_obsidian_compatible.py
│       ├── test_parse_tags.py
│       ├── test_permalink_formatting.py
│       ├── test_utf8_handling.py
│       └── test_validate_project_path.py
├── uv.lock
├── v0.15.0-RELEASE-DOCS.md
└── v15-docs
    ├── api-performance.md
    ├── background-relations.md
    ├── basic-memory-home.md
    ├── bug-fixes.md
    ├── chatgpt-integration.md
    ├── cloud-authentication.md
    ├── cloud-bisync.md
    ├── cloud-mode-usage.md
    ├── cloud-mount.md
    ├── default-project-mode.md
    ├── env-file-removal.md
    ├── env-var-overrides.md
    ├── explicit-project-parameter.md
    ├── gitignore-integration.md
    ├── project-root-env-var.md
    ├── README.md
    └── sqlite-performance.md
```

# Files

--------------------------------------------------------------------------------
/test-int/mcp/test_single_project_mcp_integration.py:
--------------------------------------------------------------------------------

```python
"""
Integration tests for single project mode MCP functionality.

Tests the --project constraint feature that restricts MCP server to a single project,
covering project override behavior, project management tool restrictions, and
content tool functionality in constrained mode.
"""

import os
import pytest
from fastmcp import Client


@pytest.mark.asyncio
async def test_project_constraint_override_content_tools(mcp_server, app, test_project):
    """Test that content tools use constrained project even when different project specified."""

    # Constrain the MCP server to a single project via the environment.
    os.environ["BASIC_MEMORY_MCP_PROJECT"] = test_project.name

    try:
        async with Client(mcp_server) as client:
            # The explicit "project" argument below should be silently overridden.
            result = await client.call_tool(
                "write_note",
                {
                    "project": "some-other-project",  # Should be ignored
                    "title": "Constraint Test Note",
                    "folder": "test",
                    "content": "# Constraint Test\n\nThis should go to the constrained project.",
                    "tags": "constraint,test",
                },
            )

            assert len(result.content) == 1
            response_text = result.content[0].text  # pyright: ignore [reportAttributeAccessIssue]

            # The note must land in the constrained project, not the requested one.
            for fragment in (
                f"project: {test_project.name}",
                "# Created note",
                "file_path: test/Constraint Test Note.md",
                f"[Session: Using project '{test_project.name}']",
            ):
                assert fragment in response_text
    finally:
        # Always drop the constraint so later tests run unconstrained.
        os.environ.pop("BASIC_MEMORY_MCP_PROJECT", None)


@pytest.mark.asyncio
async def test_project_constraint_read_note_override(mcp_server, app, test_project):
    """Test that read_note also respects project constraint."""

    # Constrain the server to the test project.
    os.environ["BASIC_MEMORY_MCP_PROJECT"] = test_project.name

    try:
        async with Client(mcp_server) as client:
            # Seed a note in the constrained project.
            await client.call_tool(
                "write_note",
                {
                    "project": test_project.name,
                    "title": "Read Test Note",
                    "folder": "test",
                    "content": "# Read Test\n\nContent for reading test.",
                },
            )

            # Ask to read from a different project; the constraint should win.
            result = await client.call_tool(
                "read_note",
                {
                    "project": "wrong-project",  # Should be ignored
                    "identifier": "Read Test Note",
                },
            )

            assert len(result.content) == 1
            response_text = result.content[0].text  # pyright: ignore [reportAttributeAccessIssue]

            # Finding the note proves the lookup hit the constrained project.
            # (read_note returns raw note content, not a summary with project info.)
            assert "# Read Test" in response_text
    finally:
        os.environ.pop("BASIC_MEMORY_MCP_PROJECT", None)


@pytest.mark.asyncio
async def test_project_constraint_search_notes_override(mcp_server, app, test_project):
    """Test that search_notes respects project constraint."""

    # Constrain the server to the test project.
    os.environ["BASIC_MEMORY_MCP_PROJECT"] = test_project.name

    try:
        async with Client(mcp_server) as client:
            # Seed a note with distinctive content to search for.
            await client.call_tool(
                "write_note",
                {
                    "project": test_project.name,
                    "title": "Searchable Note",
                    "folder": "test",
                    "content": "# Searchable\n\nThis content has unique searchable terms.",
                },
            )

            # Search "in" a different project; the constraint should win.
            result = await client.call_tool(
                "search_notes",
                {
                    "project": "different-project",  # Should be ignored
                    "query": "searchable terms",
                },
            )

            assert len(result.content) == 1
            response_text = result.content[0].text  # pyright: ignore [reportAttributeAccessIssue]

            # A hit proves the search ran against the constrained project.
            assert "Searchable Note" in response_text
    finally:
        os.environ.pop("BASIC_MEMORY_MCP_PROJECT", None)


@pytest.mark.asyncio
async def test_list_projects_constrained_mode(mcp_server, app, test_project):
    """Test that list_memory_projects shows only constrained project."""

    os.environ["BASIC_MEMORY_MCP_PROJECT"] = test_project.name

    try:
        async with Client(mcp_server) as client:
            listing = await client.call_tool("list_memory_projects", {})

            assert len(listing.content) == 1
            listing_text = listing.content[0].text  # pyright: ignore [reportAttributeAccessIssue]

            # Constrained mode announces itself instead of listing every project.
            assert "MCP server is constrained to a single project" in listing_text
    finally:
        os.environ.pop("BASIC_MEMORY_MCP_PROJECT", None)


@pytest.mark.asyncio
async def test_create_project_disabled_in_constrained_mode(mcp_server, app, test_project):
    """Test that create_memory_project is disabled when server is constrained."""

    os.environ["BASIC_MEMORY_MCP_PROJECT"] = test_project.name

    try:
        async with Client(mcp_server) as client:
            result = await client.call_tool(
                "create_memory_project",
                {"project_name": "new-project", "project_path": "/tmp/new-project"},
            )

            assert len(result.content) == 1
            response_text = result.content[0].text  # pyright: ignore [reportAttributeAccessIssue]

            # The tool should refuse and point the user at the CLI instead.
            for fragment in (
                "# Error",
                "Project creation disabled",
                f"constrained to project '{test_project.name}'",
                "Use the CLI to create projects:",
                'basic-memory project add "new-project" "/tmp/new-project"',
            ):
                assert fragment in response_text
    finally:
        os.environ.pop("BASIC_MEMORY_MCP_PROJECT", None)


@pytest.mark.asyncio
async def test_delete_project_disabled_in_constrained_mode(mcp_server, app, test_project):
    """Test that delete_project is disabled when server is constrained."""

    os.environ["BASIC_MEMORY_MCP_PROJECT"] = test_project.name

    try:
        async with Client(mcp_server) as client:
            result = await client.call_tool("delete_project", {"project_name": "some-project"})

            assert len(result.content) == 1
            response_text = result.content[0].text  # pyright: ignore [reportAttributeAccessIssue]

            # The tool should refuse and point the user at the CLI instead.
            for fragment in (
                "# Error",
                "Project deletion disabled",
                f"constrained to project '{test_project.name}'",
                "Use the CLI to delete projects:",
                'basic-memory project remove "some-project"',
            ):
                assert fragment in response_text
    finally:
        os.environ.pop("BASIC_MEMORY_MCP_PROJECT", None)


@pytest.mark.asyncio
async def test_normal_mode_without_constraint(mcp_server, app, test_project):
    """Test that tools work normally when no constraint is set."""

    # Make sure no constraint leaks in from a previous test.
    os.environ.pop("BASIC_MEMORY_MCP_PROJECT", None)

    async with Client(mcp_server) as client:
        # write_note honors an explicitly supplied project.
        write_result = await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Normal Mode Note",
                "folder": "test",
                "content": "# Normal Mode\n\nThis should work normally.",
            },
        )

        assert len(write_result.content) == 1
        write_text = write_result.content[0].text  # pyright: ignore [reportAttributeAccessIssue]
        assert f"project: {test_project.name}" in write_text
        assert "# Created note" in write_text

        # list_memory_projects should behave as a normal multi-project listing.
        list_result = await client.call_tool("list_memory_projects", {})
        list_text = list_result.content[0].text  # pyright: ignore [reportAttributeAccessIssue]
        assert "MCP server is constrained to a single project" not in list_text


@pytest.mark.asyncio
async def test_constraint_with_multiple_content_tools(mcp_server, app, test_project):
    """Test that constraint works across multiple different content tools."""

    os.environ["BASIC_MEMORY_MCP_PROJECT"] = test_project.name

    try:
        async with Client(mcp_server) as client:
            # write_note: the wrong project is overridden at creation time.
            write_result = await client.call_tool(
                "write_note",
                {
                    "project": "wrong-project",
                    "title": "Multi Tool Test",
                    "folder": "test",
                    "content": "# Multi Tool Test\n\n- [note] Testing multiple tools",
                },
            )
            assert f"project: {test_project.name}" in write_result.content[0].text  # pyright: ignore [reportAttributeAccessIssue]

            # read_note: locating the note proves the constrained project was read.
            read_result = await client.call_tool(
                "read_note", {"project": "another-wrong-project", "identifier": "Multi Tool Test"}
            )
            assert "# Multi Tool Test" in read_result.content[0].text  # pyright: ignore [reportAttributeAccessIssue]

            # search_notes: a hit proves the constrained project was searched.
            search_result = await client.call_tool(
                "search_notes", {"project": "yet-another-wrong-project", "query": "multiple tools"}
            )
            assert "Multi Tool Test" in search_result.content[0].text  # pyright: ignore [reportAttributeAccessIssue]
    finally:
        os.environ.pop("BASIC_MEMORY_MCP_PROJECT", None)


@pytest.mark.asyncio
async def test_constraint_environment_cleanup(mcp_server, app, test_project):
    """Test that removing constraint restores normal behavior.

    Sets BASIC_MEMORY_MCP_PROJECT, verifies constrained mode, then deletes the
    variable mid-session and verifies normal listing behavior returns.
    """

    # Set constraint
    os.environ["BASIC_MEMORY_MCP_PROJECT"] = test_project.name

    try:
        async with Client(mcp_server) as client:
            # Verify constraint is active
            constrained_result = await client.call_tool("list_memory_projects", {})
            constrained_text = constrained_result.content[0].text  # pyright: ignore [reportAttributeAccessIssue]
            assert "MCP server is constrained to a single project" in constrained_text

            # Remove constraint mid-session
            del os.environ["BASIC_MEMORY_MCP_PROJECT"]

            # Verify normal behavior is restored
            normal_result = await client.call_tool("list_memory_projects", {})
            normal_text = normal_result.content[0].text  # pyright: ignore [reportAttributeAccessIssue]

            assert "constrained to a single project" not in normal_text
    finally:
        # Bug fix: the original had no try/finally, so a failure before the
        # mid-test `del` leaked BASIC_MEMORY_MCP_PROJECT into subsequent tests.
        # pop() is a no-op when the happy path already removed it.
        os.environ.pop("BASIC_MEMORY_MCP_PROJECT", None)


@pytest.mark.asyncio
async def test_constraint_with_invalid_project_override(mcp_server, app, test_project):
    """Test constraint behavior when trying to override with invalid project names."""

    os.environ["BASIC_MEMORY_MCP_PROJECT"] = test_project.name

    try:
        async with Client(mcp_server) as client:
            # None of these should matter — the constraint always wins.
            invalid_projects = [
                "non-existent-project",
                "",
                "project-with-special-chars!@#",
                "a" * 100,  # Very long name
            ]

            for i, invalid_project in enumerate(invalid_projects):
                result = await client.call_tool(
                    "write_note",
                    {
                        "project": invalid_project,
                        "title": f"Test Invalid {i} {invalid_project[:5]}",
                        "folder": "test",
                        "content": f"Testing with invalid project: {invalid_project}",
                    },
                )

                response_text = result.content[0].text  # pyright: ignore [reportAttributeAccessIssue]
                # Written into the constrained project regardless of the input,
                # and the write itself succeeds either as create or update.
                assert f"project: {test_project.name}" in response_text
                assert "# Created note" in response_text or "# Updated note" in response_text
    finally:
        os.environ.pop("BASIC_MEMORY_MCP_PROJECT", None)

```

--------------------------------------------------------------------------------
/tests/services/test_link_resolver.py:
--------------------------------------------------------------------------------

```python
"""Tests for link resolution service."""

from datetime import datetime, timezone

import pytest

import pytest_asyncio

from basic_memory.schemas.base import Entity as EntitySchema
from basic_memory.services.link_resolver import LinkResolver
from basic_memory.models.knowledge import Entity as EntityModel


@pytest_asyncio.fixture
async def test_entities(entity_service, file_service):
    """Create a set of test entities for link-resolution tests.

    ├── components
    │   ├── Auth Service.md
    │   └── Core Service.md
    ├── components2
    │   └── Core Service.md        (duplicate title)
    ├── config
    │   └── Service Config.md
    ├── specs
    │   ├── Core Features.md
    │   └── subspec
    │       ├── Sub Features 1.md
    │       └── Sub Features 2.md
    └── Image.png                  (non-markdown file entity)

    Returns the entities as a list; callers index it positionally, so order
    is part of the fixture's contract.
    """
    # file_service is unused here but kept as a dependency — presumably it
    # performs setup needed for entity file writes; verify before removing.
    project_id = entity_service.repository.project_id

    # (title, entity_type, folder) for the markdown entities, in return order.
    markdown_specs = [
        ("Core Service", "component", "components"),
        ("Service Config", "config", "config"),
        ("Auth Service", "component", "components"),
        ("Core Features", "specs", "specs"),
        ("Sub Features 1", "specs", "specs/subspec"),
        ("Sub Features 2", "specs", "specs/subspec"),
    ]
    entities = []
    for title, entity_type, folder in markdown_specs:
        entity, _ = await entity_service.create_or_update_entity(
            EntitySchema(
                title=title,
                entity_type=entity_type,
                folder=folder,
                project=project_id,
            )
        )
        entities.append(entity)

    # Non-markdown entity added directly through the repository.
    now = datetime.now(timezone.utc)
    image = await entity_service.repository.add(
        EntityModel(
            title="Image.png",
            entity_type="file",
            content_type="image/png",
            file_path="Image.png",
            created_at=now,
            updated_at=now,
            project_id=project_id,
        )
    )
    entities.append(image)

    # Duplicate title in a different folder (exercises ambiguous resolution).
    duplicate = await entity_service.create_entity(
        EntitySchema(
            title="Core Service",
            entity_type="component",
            folder="components2",
            project=project_id,
        )
    )
    entities.append(duplicate)

    return entities


@pytest_asyncio.fixture
async def link_resolver(entity_repository, search_service, test_entities):
    """Create LinkResolver instance with indexed test data."""
    # Make every fixture entity searchable before resolution is exercised.
    for item in test_entities:
        await search_service.index_entity(item)
    return LinkResolver(entity_repository, search_service)


@pytest.mark.asyncio
async def test_exact_permalink_match(link_resolver, test_entities):
    """Test resolving a link that exactly matches a permalink."""
    resolved = await link_resolver.resolve_link("components/core-service")
    assert resolved.permalink == "components/core-service"


@pytest.mark.asyncio
async def test_exact_title_match(link_resolver, test_entities):
    """Test resolving a link that matches an entity title."""
    resolved = await link_resolver.resolve_link("Core Service")
    assert resolved.permalink == "components/core-service"


@pytest.mark.asyncio
async def test_duplicate_title_match(link_resolver, test_entities):
    """Test resolving a title shared by two entities.

    The fixture creates "Core Service" in both components/ and components2/;
    resolution should deterministically return the components/ entity.
    """
    entity = await link_resolver.resolve_link("Core Service")
    assert entity.permalink == "components/core-service"


@pytest.mark.asyncio
async def test_fuzzy_title_partial_match(link_resolver):
    """Resolve a truncated title via fuzzy matching."""
    match = await link_resolver.resolve_link("Auth Serv")
    assert match is not None, "Did not find partial match"
    assert match.permalink == "components/auth-service"


@pytest.mark.asyncio
async def test_fuzzy_title_exact_match(link_resolver):
    # "auth-service" is only the permalink basename (not the full
    # "components/auth-service" permalink), so this exercises the search
    # path rather than an exact permalink match — see also
    # test_fuzzy_matching_blocked_in_strict_mode, where the same query
    # returns None under strict=True.
    result = await link_resolver.resolve_link("auth-service")
    assert result.permalink == "components/auth-service"


@pytest.mark.asyncio
async def test_link_text_normalization(link_resolver):
    """Test link text normalization."""
    # (raw link text, expected normalized text, expected alias)
    cases = [
        # Basic normalization
        ("[[Core Service]]", "Core Service", None),
        # With alias
        ("[[Core Service|Main Service]]", "Core Service", "Main Service"),
        # Extra whitespace
        ("  [[  Core Service  |  Main Service  ]]  ", "Core Service", "Main Service"),
    ]
    for raw, expected_text, expected_alias in cases:
        text, alias = link_resolver._normalize_link_text(raw)
        assert text == expected_text
        assert alias == expected_alias


@pytest.mark.asyncio
async def test_resolve_none(link_resolver):
    """Resolving a link with no matching entity yields None."""
    missing = await link_resolver.resolve_link("New Feature")
    assert missing is None


@pytest.mark.asyncio
async def test_resolve_file(link_resolver):
    """Test resolving a non-markdown file entity by its exact file name."""
    # Image.png is indexed as a plain file entity (see the test_entities fixture)
    resolved = await link_resolver.resolve_link("Image.png")
    assert resolved is not None
    assert resolved.entity_type == "file"
    assert resolved.title == "Image.png"


@pytest.mark.asyncio
async def test_folder_title_pattern_with_md_extension(link_resolver, test_entities):
    """Test resolving folder/title patterns that need .md extension added.

    This tests the new logic added in step 4 of resolve_link that handles
    patterns like 'folder/title' by trying 'folder/title.md' as file path.
    Covers positive cases (flat and nested folders), the already-suffixed
    case, and two negative guards (no slash, exact permalink unaffected).
    """
    # Test folder/title pattern for markdown entities
    # "components/Core Service" should resolve to file path "components/Core Service.md"
    entity = await link_resolver.resolve_link("components/Core Service")
    assert entity is not None
    assert entity.permalink == "components/core-service"
    assert entity.file_path == "components/Core Service.md"

    # Test with different entity
    entity = await link_resolver.resolve_link("config/Service Config")
    assert entity is not None
    assert entity.permalink == "config/service-config"
    assert entity.file_path == "config/Service Config.md"

    # Test with nested folder structure
    entity = await link_resolver.resolve_link("specs/subspec/Sub Features 1")
    assert entity is not None
    assert entity.permalink == "specs/subspec/sub-features-1"
    assert entity.file_path == "specs/subspec/Sub Features 1.md"

    # Test that it doesn't try to add .md to things that already have it
    entity = await link_resolver.resolve_link("components/Core Service.md")
    assert entity is not None
    assert entity.permalink == "components/core-service"

    # Test that it doesn't try to add .md to single words (no slash)
    entity = await link_resolver.resolve_link("NonExistent")
    assert entity is None

    # Test that it doesn't interfere with exact permalink matches
    entity = await link_resolver.resolve_link("components/core-service")
    assert entity is not None
    assert entity.permalink == "components/core-service"


# Tests for strict mode parameter combinations
@pytest.mark.asyncio
async def test_strict_mode_parameter_combinations(link_resolver, test_entities):
    """Test all combinations of use_search and strict parameters."""

    # Test queries
    exact_match = "Auth Service"  # Should always work (unique title)
    fuzzy_match = "Auth Serv"  # Should only work with fuzzy search enabled
    non_existent = "Does Not Exist"  # Should never work

    # Case 1: use_search=True, strict=False (default behavior - fuzzy matching allowed)
    result = await link_resolver.resolve_link(exact_match, use_search=True, strict=False)
    assert result is not None
    assert result.permalink == "components/auth-service"

    result = await link_resolver.resolve_link(fuzzy_match, use_search=True, strict=False)
    assert result is not None  # Should find "Auth Service" via fuzzy matching
    assert result.permalink == "components/auth-service"

    result = await link_resolver.resolve_link(non_existent, use_search=True, strict=False)
    assert result is None

    # Case 2: use_search=True, strict=True (exact matches only, even with search enabled)
    result = await link_resolver.resolve_link(exact_match, use_search=True, strict=True)
    assert result is not None
    assert result.permalink == "components/auth-service"

    result = await link_resolver.resolve_link(fuzzy_match, use_search=True, strict=True)
    assert result is None  # Should NOT find via fuzzy matching in strict mode

    result = await link_resolver.resolve_link(non_existent, use_search=True, strict=True)
    assert result is None

    # Case 3: use_search=False, strict=False (no search, exact repository matches only)
    result = await link_resolver.resolve_link(exact_match, use_search=False, strict=False)
    assert result is not None
    assert result.permalink == "components/auth-service"

    result = await link_resolver.resolve_link(fuzzy_match, use_search=False, strict=False)
    assert result is None  # No search means no fuzzy matching

    result = await link_resolver.resolve_link(non_existent, use_search=False, strict=False)
    assert result is None

    # Case 4: use_search=False, strict=True (redundant but should work same as case 3)
    result = await link_resolver.resolve_link(exact_match, use_search=False, strict=True)
    assert result is not None
    assert result.permalink == "components/auth-service"

    result = await link_resolver.resolve_link(fuzzy_match, use_search=False, strict=True)
    assert result is None  # No search means no fuzzy matching

    result = await link_resolver.resolve_link(non_existent, use_search=False, strict=True)
    assert result is None


@pytest.mark.asyncio
async def test_exact_match_types_in_strict_mode(link_resolver, test_entities):
    """Test that all types of exact matches work in strict mode."""

    # Every exact-match form for the core-service entity must resolve:
    exact_forms = (
        "components/core-service",  # 1. exact permalink
        "Core Service",  # 2. exact title
        "components/Core Service.md",  # 3. exact file path
        "components/Core Service",  # 4. folder/title with .md extension added
    )
    for query in exact_forms:
        hit = await link_resolver.resolve_link(query, strict=True)
        assert hit is not None
        assert hit.permalink == "components/core-service"

    # 5. Non-markdown file (Image.png) resolved by its exact file name
    image = await link_resolver.resolve_link("Image.png", strict=True)
    assert image is not None
    assert image.title == "Image.png"


@pytest.mark.asyncio
async def test_fuzzy_matching_blocked_in_strict_mode(link_resolver, test_entities):
    """Test that various fuzzy matching scenarios are blocked in strict mode."""

    # Each of these would resolve via fuzzy search in normal mode:
    # a partial title, a lowercase permalink basename, single words from a
    # title, a common word, and a partial word.
    for query in ("Auth Serv", "auth-service", "Core", "Service", "Serv"):
        strict_result = await link_resolver.resolve_link(query, strict=True)
        assert strict_result is None, f"Query '{query}' should return None in strict mode"


@pytest.mark.asyncio
async def test_link_normalization_with_strict_mode(link_resolver, test_entities):
    """Test that link normalization still works in strict mode."""

    # Brackets, aliases, and surrounding whitespace are stripped before
    # lookup, so each variant resolves to the same entity.
    for query in (
        "[[Core Service]]",
        "[[Core Service|Main]]",  # alias should be ignored
        "  [[  Core Service  ]]  ",  # extra whitespace
    ):
        result = await link_resolver.resolve_link(query, strict=True)
        assert result is not None, f"Query '{query}' should find entity in strict mode"
        assert result.permalink == "components/core-service"


@pytest.mark.asyncio
async def test_duplicate_title_handling_in_strict_mode(link_resolver, test_entities):
    """Test how duplicate titles are handled in strict mode."""

    # "Core Service" appears twice in the test data (components/ and
    # components2/). Strict mode treats duplicate exact title matches the
    # same way normal mode does: the first match wins, which is
    # components/core-service given the fixture creation order.
    winner = await link_resolver.resolve_link("Core Service", strict=True)
    assert winner is not None
    assert winner.permalink == "components/core-service"

```

--------------------------------------------------------------------------------
/tests/api/test_directory_router.py:
--------------------------------------------------------------------------------

```python
"""Tests for the directory router API endpoints."""

from unittest.mock import patch

import pytest

from basic_memory.schemas.directory import DirectoryNode


@pytest.mark.asyncio
async def test_get_directory_tree_endpoint(test_graph, client, project_url):
    """Test the get_directory_tree endpoint returns correctly structured data."""
    response = await client.get(f"{project_url}/directory/tree")
    assert response.status_code == 200

    tree = response.json()

    # The root node must expose the full DirectoryNode shape
    for key in ("name", "directory_path", "children", "type"):
        assert key in tree

    # children is a list, the root has a non-empty name, and the path is a string
    assert isinstance(tree["children"], list)
    assert tree["name"]
    assert isinstance(tree["directory_path"], str)


@pytest.mark.asyncio
async def test_get_directory_tree_structure(test_graph, client, project_url):
    """Test the structure of the directory tree returned by the endpoint."""
    response = await client.get(f"{project_url}/directory/tree")
    assert response.status_code == 200

    # Walk the entire tree iteratively, validating every node's shape
    pending = [response.json()]
    while pending:
        node = pending.pop()
        for key in ("name", "directory_path", "children", "type"):
            assert key in node
        assert isinstance(node["children"], list)
        pending.extend(node["children"])


@pytest.mark.asyncio
async def test_get_directory_tree_mocked(client, project_url):
    """Test the get_directory_tree endpoint with a mocked service.

    DirectoryService.get_directory_tree is patched to return a fixed tree,
    so this verifies the endpoint's JSON serialization of nested
    DirectoryNode objects independently of database contents.
    """
    # Create a mock directory tree
    mock_tree = DirectoryNode(
        name="root",
        directory_path="/test",
        type="directory",
        children=[
            DirectoryNode(
                name="folder1",
                directory_path="/test/folder1",
                type="directory",
                children=[
                    DirectoryNode(
                        name="subfolder",
                        directory_path="/test/folder1/subfolder",
                        type="directory",
                        children=[],
                    )
                ],
            ),
            DirectoryNode(
                name="folder2", directory_path="/test/folder2", type="directory", children=[]
            ),
        ],
    )

    # Patch the directory service
    with patch(
        "basic_memory.services.directory_service.DirectoryService.get_directory_tree",
        return_value=mock_tree,
    ):
        # Call the endpoint
        response = await client.get(f"{project_url}/directory/tree")

        # Verify response
        assert response.status_code == 200
        data = response.json()

        # Check structure matches our mock
        assert data["name"] == "root"
        assert data["directory_path"] == "/test"
        assert data["type"] == "directory"
        assert len(data["children"]) == 2

        # Check first child
        folder1 = data["children"][0]
        assert folder1["name"] == "folder1"
        assert folder1["directory_path"] == "/test/folder1"
        assert folder1["type"] == "directory"
        assert len(folder1["children"]) == 1

        # Check subfolder
        subfolder = folder1["children"][0]
        assert subfolder["name"] == "subfolder"
        assert subfolder["directory_path"] == "/test/folder1/subfolder"
        assert subfolder["type"] == "directory"
        assert subfolder["children"] == []

        # Check second child
        folder2 = data["children"][1]
        assert folder2["name"] == "folder2"
        assert folder2["directory_path"] == "/test/folder2"
        assert folder2["type"] == "directory"
        assert folder2["children"] == []


@pytest.mark.asyncio
async def test_list_directory_endpoint_default(test_graph, client, project_url):
    """Test the list_directory endpoint with default parameters."""
    response = await client.get(f"{project_url}/directory/list")
    assert response.status_code == 200

    listing = response.json()
    assert isinstance(listing, list)

    # With test_graph, the root holds a single "test" directory
    assert len(listing) == 1
    (entry,) = listing
    assert entry["name"] == "test"
    assert entry["type"] == "directory"


@pytest.mark.asyncio
async def test_list_directory_endpoint_specific_path(test_graph, client, project_url):
    """Test the list_directory endpoint with specific directory path."""
    response = await client.get(f"{project_url}/directory/list?dir_name=/test")
    assert response.status_code == 200

    entries = response.json()
    assert isinstance(entries, list)
    assert len(entries) == 5

    # test_graph holds only markdown files here — no subdirectories
    assert all(entry["type"] == "file" for entry in entries)
    assert all(entry["name"].endswith(".md") for entry in entries)


@pytest.mark.asyncio
async def test_list_directory_endpoint_with_glob(test_graph, client, project_url):
    """Test the list_directory endpoint with glob filtering."""
    url = f"{project_url}/directory/list?dir_name=/test&file_name_glob=*Connected*"
    response = await client.get(url)
    assert response.status_code == 200

    matches = response.json()
    assert isinstance(matches, list)
    assert len(matches) == 2

    # Only the two "Connected Entity" notes match the glob
    assert {m["name"] for m in matches} == {"Connected Entity 1.md", "Connected Entity 2.md"}


@pytest.mark.asyncio
async def test_list_directory_endpoint_with_depth(test_graph, client, project_url):
    """Test the list_directory endpoint with depth control."""
    # depth=1 (the default) returns only the top-level "test" directory
    shallow = await client.get(f"{project_url}/directory/list?dir_name=/&depth=1")
    assert shallow.status_code == 200
    assert len(shallow.json()) == 1

    # depth=2 additionally includes the five files inside the test directory
    deep = await client.get(f"{project_url}/directory/list?dir_name=/&depth=2")
    assert deep.status_code == 200
    assert len(deep.json()) == 6


@pytest.mark.asyncio
async def test_list_directory_endpoint_nonexistent_path(test_graph, client, project_url):
    """Test the list_directory endpoint with nonexistent directory."""
    response = await client.get(f"{project_url}/directory/list?dir_name=/nonexistent")
    assert response.status_code == 200

    # A missing directory is not an error; it simply yields no entries
    contents = response.json()
    assert isinstance(contents, list)
    assert contents == []


@pytest.mark.asyncio
async def test_list_directory_endpoint_validation_errors(client, project_url):
    """Test the list_directory endpoint with invalid parameters."""
    # depth below and above the accepted range must be rejected
    for bad_depth in (0, 11):
        response = await client.get(f"{project_url}/directory/list?depth={bad_depth}")
        assert response.status_code == 422  # Validation error


@pytest.mark.asyncio
async def test_list_directory_endpoint_mocked(client, project_url):
    """Test the list_directory endpoint with mocked service.

    DirectoryService.list_directory is patched to return one directory node
    and one file node, verifying that the endpoint serializes both node
    kinds — including the file-only fields (file_path, title, permalink).
    """
    # Create mock directory nodes
    mock_nodes = [
        DirectoryNode(
            name="folder1",
            directory_path="/folder1",
            type="directory",
        ),
        DirectoryNode(
            name="file1.md",
            directory_path="/file1.md",
            file_path="file1.md",
            type="file",
            title="File 1",
            permalink="file-1",
        ),
    ]

    # Patch the directory service
    with patch(
        "basic_memory.services.directory_service.DirectoryService.list_directory",
        return_value=mock_nodes,
    ):
        # Call the endpoint
        response = await client.get(f"{project_url}/directory/list?dir_name=/test")

        # Verify response
        assert response.status_code == 200
        data = response.json()

        # Check structure matches our mock
        assert isinstance(data, list)
        assert len(data) == 2

        # Check directory
        folder = next(item for item in data if item["type"] == "directory")
        assert folder["name"] == "folder1"
        assert folder["directory_path"] == "/folder1"

        # Check file
        file_item = next(item for item in data if item["type"] == "file")
        assert file_item["name"] == "file1.md"
        assert file_item["directory_path"] == "/file1.md"
        assert file_item["file_path"] == "file1.md"
        assert file_item["title"] == "File 1"
        assert file_item["permalink"] == "file-1"


@pytest.mark.asyncio
async def test_get_directory_structure_endpoint(test_graph, client, project_url):
    """Test the get_directory_structure endpoint returns folders only.

    Unlike /directory/tree, /directory/structure must omit file nodes and
    all file-specific metadata.
    """
    # Call the endpoint
    response = await client.get(f"{project_url}/directory/structure")

    # Verify response
    assert response.status_code == 200
    data = response.json()

    # Check that the response is a valid directory tree
    assert "name" in data
    assert "directory_path" in data
    assert "children" in data
    assert "type" in data
    assert data["type"] == "directory"

    # Root should be present
    assert data["name"] == "Root"
    assert data["directory_path"] == "/"

    # Should have the test directory
    assert len(data["children"]) == 1
    test_dir = data["children"][0]
    assert test_dir["name"] == "test"
    assert test_dir["type"] == "directory"
    assert test_dir["directory_path"] == "/test"

    # Should NOT have any files (test_graph has files but no subdirectories)
    assert len(test_dir["children"]) == 0

    # Verify no file metadata is present in directory nodes
    assert test_dir.get("entity_id") is None
    assert test_dir.get("content_type") is None
    assert test_dir.get("title") is None
    assert test_dir.get("permalink") is None


@pytest.mark.asyncio
async def test_get_directory_structure_empty(client, project_url):
    """Test the get_directory_structure endpoint with empty database."""
    response = await client.get(f"{project_url}/directory/structure")
    assert response.status_code == 200

    root = response.json()

    # With no entities indexed, only the bare root node is returned
    assert root["name"] == "Root"
    assert root["directory_path"] == "/"
    assert root["type"] == "directory"
    assert root["children"] == []


@pytest.mark.asyncio
async def test_get_directory_structure_mocked(client, project_url):
    """Test the get_directory_structure endpoint with mocked service.

    DirectoryService.get_directory_structure is patched to return a fixed
    folders-only tree, verifying serialization of nested directory nodes
    independently of database contents.
    """
    # Create a mock directory structure (folders only, no files)
    mock_structure = DirectoryNode(
        name="Root",
        directory_path="/",
        type="directory",
        children=[
            DirectoryNode(
                name="docs",
                directory_path="/docs",
                type="directory",
                children=[
                    DirectoryNode(
                        name="guides",
                        directory_path="/docs/guides",
                        type="directory",
                        children=[],
                    ),
                    DirectoryNode(
                        name="api",
                        directory_path="/docs/api",
                        type="directory",
                        children=[],
                    ),
                ],
            ),
            DirectoryNode(name="specs", directory_path="/specs", type="directory", children=[]),
        ],
    )

    # Patch the directory service
    with patch(
        "basic_memory.services.directory_service.DirectoryService.get_directory_structure",
        return_value=mock_structure,
    ):
        # Call the endpoint
        response = await client.get(f"{project_url}/directory/structure")

        # Verify response
        assert response.status_code == 200
        data = response.json()

        # Check structure matches our mock (folders only)
        assert data["name"] == "Root"
        assert data["directory_path"] == "/"
        assert data["type"] == "directory"
        assert len(data["children"]) == 2

        # Check docs directory
        docs = data["children"][0]
        assert docs["name"] == "docs"
        assert docs["directory_path"] == "/docs"
        assert docs["type"] == "directory"
        assert len(docs["children"]) == 2

        # Check subdirectories
        guides = docs["children"][0]
        assert guides["name"] == "guides"
        assert guides["directory_path"] == "/docs/guides"
        assert guides["type"] == "directory"
        assert guides["children"] == []

        api = docs["children"][1]
        assert api["name"] == "api"
        assert api["directory_path"] == "/docs/api"
        assert api["type"] == "directory"
        assert api["children"] == []

        # Check specs directory
        specs = data["children"][1]
        assert specs["name"] == "specs"
        assert specs["directory_path"] == "/specs"
        assert specs["type"] == "directory"
        assert specs["children"] == []

```

--------------------------------------------------------------------------------
/test-int/mcp/test_default_project_mode_integration.py:
--------------------------------------------------------------------------------

```python
"""
Integration tests for default project mode functionality.

Tests the default_project_mode configuration that allows tools to automatically
use the default_project when no project parameter is specified, covering
parameter resolution hierarchy and mode-specific behavior.
"""

import os
from pathlib import Path

import pytest
from fastmcp import Client
from unittest.mock import patch

from basic_memory.config import ConfigManager, BasicMemoryConfig


@pytest.mark.asyncio
async def test_default_project_mode_enabled_write_note(mcp_server, app, test_project):
    """Test that write_note uses default project when default_project_mode=true and no project specified.

    ConfigManager.config is patched so the tool resolves the project from
    default_project rather than an explicit parameter.
    """

    # Mock config with default_project_mode enabled
    mock_config = BasicMemoryConfig(
        default_project=test_project.name,
        default_project_mode=True,
        projects={test_project.name: test_project.path},
    )

    with patch.object(ConfigManager, "config", mock_config):
        async with Client(mcp_server) as client:
            # Call write_note without project parameter
            result = await client.call_tool(
                "write_note",
                {
                    "title": "Default Mode Test",
                    "folder": "test",
                    "content": "# Default Mode Test\n\nThis should use the default project automatically.",
                    "tags": "default,mode,test",
                },
            )

            assert len(result.content) == 1
            response_text = result.content[0].text  # pyright: ignore [reportAttributeAccessIssue]

            # Should use the default project
            assert f"project: {test_project.name}" in response_text
            assert "# Created note" in response_text
            assert "file_path: test/Default Mode Test.md" in response_text
            assert f"[Session: Using project '{test_project.name}']" in response_text


@pytest.mark.asyncio
async def test_default_project_mode_explicit_override(
    mcp_server, app, test_project, config_home, engine_factory
):
    """Test that explicit project parameter overrides default_project_mode.

    A second project is created directly via the repository; passing its
    name explicitly must win over the configured default_project.
    """

    # Create a second project for testing override
    engine, session_maker = engine_factory
    from basic_memory.repository.project_repository import ProjectRepository

    project_repository = ProjectRepository(session_maker)

    other_project = await project_repository.create(
        {
            "name": "other-project",
            "description": "Second project for testing",
            "path": str(config_home / "other-project"),
            "is_active": True,
            "is_default": False,
        }
    )

    # Mock config with default_project_mode enabled pointing to test_project
    mock_config = BasicMemoryConfig(
        default_project=test_project.name,
        default_project_mode=True,
        projects={test_project.name: test_project.path, other_project.name: other_project.path},
    )

    with patch.object(ConfigManager, "config", mock_config):
        async with Client(mcp_server) as client:
            # Call write_note with explicit project parameter (should override default)
            result = await client.call_tool(
                "write_note",
                {
                    "title": "Override Test",
                    "folder": "test",
                    "content": "# Override Test\n\nThis should go to the explicitly specified project.",
                    "project": other_project.name,  # Explicit override
                },
            )

            assert len(result.content) == 1
            response_text = result.content[0].text  # pyright: ignore [reportAttributeAccessIssue]

            # Should use the explicitly specified project, not default
            assert f"project: {other_project.name}" in response_text
            assert "# Created note" in response_text
            assert f"[Session: Using project '{other_project.name}']" in response_text


@pytest.mark.asyncio
async def test_default_project_mode_disabled_requires_project(mcp_server, app, test_project):
    """Test that tools require project parameter when default_project_mode=false.

    With the mode disabled, a write_note call without a project must raise
    an error mentioning the missing project parameter.
    """

    # Mock config with default_project_mode disabled
    mock_config = BasicMemoryConfig(
        default_project=test_project.name,
        default_project_mode=False,  # Disabled
        projects={test_project.name: test_project.path},
    )

    with patch.object(ConfigManager, "config", mock_config):
        async with Client(mcp_server) as client:
            # Call write_note without project parameter - should fail
            with pytest.raises(Exception) as exc_info:
                await client.call_tool(
                    "write_note",
                    {
                        "title": "Should Fail",
                        "folder": "test",
                        "content": "# Should Fail\n\nThis should fail because no project specified.",
                    },
                )

            # Should get an error about missing project
            error_message = str(exc_info.value)
            assert (
                "No project specified" in error_message
                or "project parameter" in error_message.lower()
            )


@pytest.mark.asyncio
async def test_cli_constraint_overrides_default_project_mode(
    mcp_server, app, test_project, config_home, engine_factory
):
    """Test that CLI --project constraint overrides default_project_mode.

    The BASIC_MEMORY_MCP_PROJECT environment variable (set by the CLI's
    --project flag) takes priority over default_project_mode/default_project
    from the config.
    """

    # Create a different project for CLI constraint
    engine, session_maker = engine_factory
    from basic_memory.repository.project_repository import ProjectRepository

    project_repository = ProjectRepository(session_maker)

    other_project = await project_repository.create(
        {
            "name": "cli-project",
            "description": "Project for CLI constraint testing",
            "path": str(config_home / "cli-project"),
            "is_active": True,
            "is_default": False,
        }
    )

    # Mock config with default_project_mode enabled pointing to test_project
    mock_config = BasicMemoryConfig(
        default_project=test_project.name,
        default_project_mode=True,
        projects={test_project.name: test_project.path, other_project.name: other_project.path},
    )

    # Set up CLI project constraint (highest priority). patch.dict restores
    # the prior environment on any exit path — unlike a manual try/finally
    # with `del`, it also restores a pre-existing value of the variable.
    with patch.dict(os.environ, {"BASIC_MEMORY_MCP_PROJECT": other_project.name}):
        with patch.object(ConfigManager, "config", mock_config):
            async with Client(mcp_server) as client:
                # Call write_note without project parameter
                result = await client.call_tool(
                    "write_note",
                    {
                        "title": "CLI Constraint Test",
                        "folder": "test",
                        "content": "# CLI Constraint Test\n\nThis should use CLI constrained project.",
                    },
                )

                assert len(result.content) == 1
                response_text = result.content[0].text  # pyright: ignore [reportAttributeAccessIssue]

                # Should use CLI constrained project, not default project
                assert f"project: {other_project.name}" in response_text
                assert "# Created note" in response_text
                assert f"[Session: Using project '{other_project.name}']" in response_text


@pytest.mark.asyncio
async def test_default_project_mode_read_note(mcp_server, app, test_project):
    """Test that read_note works with default_project_mode.

    Writes a note (also without a project parameter) and then reads it back
    by title, both calls relying on the configured default project.
    """

    # Mock config with default_project_mode enabled
    mock_config = BasicMemoryConfig(
        default_project=test_project.name,
        default_project_mode=True,
        projects={test_project.name: test_project.path},
    )

    with patch.object(ConfigManager, "config", mock_config):
        async with Client(mcp_server) as client:
            # First create a note
            await client.call_tool(
                "write_note",
                {
                    "title": "Read Test Note",
                    "folder": "test",
                    "content": "# Read Test Note\n\nThis note will be read back.",
                },
            )

            # Now read it back without specifying project
            result = await client.call_tool(
                "read_note",
                {
                    "identifier": "Read Test Note",
                },
            )

            assert len(result.content) == 1
            response_text = result.content[0].text  # pyright: ignore [reportAttributeAccessIssue]

            # Should successfully read the note
            assert "# Read Test Note" in response_text
            assert "This note will be read back." in response_text


@pytest.mark.asyncio
async def test_default_project_mode_edit_note(mcp_server, app, test_project):
    """Test that edit_note works with default_project_mode."""

    # Enable default_project_mode so tools fall back to the configured default
    config = BasicMemoryConfig(
        default_project=test_project.name,
        default_project_mode=True,
        projects={test_project.name: test_project.path},
    )

    with patch.object(ConfigManager, "config", config):
        async with Client(mcp_server) as client:
            # Seed a note that the edit will target
            await client.call_tool(
                "write_note",
                {
                    "title": "Edit Test Note",
                    "folder": "test",
                    "content": "# Edit Test Note\n\nOriginal content.",
                },
            )

            # Append to it without naming a project; the default should be used
            edit_result = await client.call_tool(
                "edit_note",
                {
                    "identifier": "Edit Test Note",
                    "operation": "append",
                    "content": "\n\n## Added Content\n\nThis was added via edit_note.",
                },
            )

            assert len(edit_result.content) == 1
            text = edit_result.content[0].text  # pyright: ignore [reportAttributeAccessIssue]

            # The tool should report a successful append
            assert "# Edited note" in text
            assert "operation: Added" in text


@pytest.mark.asyncio
async def test_project_resolution_hierarchy(
    mcp_server, app, test_project, config_home, engine_factory
):
    """Test the complete three-tier project resolution hierarchy.

    Resolution priority (highest wins):
      1. CLI constraint via the BASIC_MEMORY_MCP_PROJECT env var
      2. Explicit "project" argument on the tool call
      3. Config default project (when default_project_mode is enabled)
    """

    # Create projects for testing (the engine itself is unused here)
    _engine, session_maker = engine_factory
    from basic_memory.repository.project_repository import ProjectRepository

    project_repository = ProjectRepository(session_maker)

    default_project = test_project
    cli_project = await project_repository.create(
        {
            "name": "cli-hierarchy-project",
            "description": "Project for CLI hierarchy testing",
            "path": str(config_home / "cli-hierarchy-project"),
            "is_active": True,
            "is_default": False,
        }
    )
    explicit_project = await project_repository.create(
        {
            "name": "explicit-hierarchy-project",
            "description": "Project for explicit hierarchy testing",
            "path": str(config_home / "explicit-hierarchy-project"),
            "is_active": True,
            "is_default": False,
        }
    )

    # Mock config with default_project_mode enabled and all three projects known
    mock_config = BasicMemoryConfig(
        default_project=default_project.name,
        default_project_mode=True,
        projects={
            default_project.name: Path(default_project.path).as_posix(),
            cli_project.name: Path(cli_project.path).as_posix(),
            explicit_project.name: Path(explicit_project.path).as_posix(),
        },
    )

    # Test 1: CLI constraint (highest priority) beats an explicit project argument
    os.environ["BASIC_MEMORY_MCP_PROJECT"] = cli_project.name

    try:
        with patch.object(ConfigManager, "config", mock_config):
            async with Client(mcp_server) as client:
                result = await client.call_tool(
                    "write_note",
                    {
                        "title": "CLI Priority Test",
                        "folder": "test",
                        "content": "# CLI Priority Test",
                        "project": explicit_project.name,  # Should be ignored
                    },
                )
                response_text = result.content[0].text  # pyright: ignore [reportAttributeAccessIssue]
                assert f"project: {cli_project.name}" in response_text

    finally:
        # pop() never raises if the variable is already gone, unlike `del`;
        # matches the guarded cleanup style used by the sibling CLI test.
        os.environ.pop("BASIC_MEMORY_MCP_PROJECT", None)

    # Test 2: Explicit project (medium priority) is honored when no CLI constraint
    with patch.object(ConfigManager, "config", mock_config):
        async with Client(mcp_server) as client:
            result = await client.call_tool(
                "write_note",
                {
                    "title": "Explicit Priority Test",
                    "folder": "test",
                    "content": "# Explicit Priority Test",
                    "project": explicit_project.name,
                },
            )
            response_text = result.content[0].text  # pyright: ignore [reportAttributeAccessIssue]
            assert f"project: {explicit_project.name}" in response_text

    # Test 3: Default project (lowest priority) is used when nothing else is given
    with patch.object(ConfigManager, "config", mock_config):
        async with Client(mcp_server) as client:
            result = await client.call_tool(
                "write_note",
                {
                    "title": "Default Priority Test",
                    "folder": "test",
                    "content": "# Default Priority Test",
                    # No project specified
                },
            )
            response_text = result.content[0].text  # pyright: ignore [reportAttributeAccessIssue]
            assert f"project: {default_project.name}" in response_text

```

--------------------------------------------------------------------------------
/specs/SPEC-9-1 Follow-Ups- Conflict, Sync, and Observability.md:
--------------------------------------------------------------------------------

```markdown
---
title: 'SPEC-9-1 Follow-Ups: Conflict, Sync, and Observability'
type: tasklist
permalink: specs/spec-9-follow-ups-conflict-sync-and-observability
related: specs/spec-9-multi-project-bisync
status: revised
revision_date: 2025-10-03
---

# SPEC-9-1 Follow-Ups: Conflict, Sync, and Observability

**REVISED 2025-10-03:** Simplified to leverage rclone built-ins instead of custom conflict handling.

**Context:** SPEC-9 delivered multi-project bidirectional sync and a unified CLI. This follow-up focuses on **observability and safety** using rclone's built-in capabilities rather than reinventing conflict handling.

**Design Philosophy: "Be Dumb Like Git"**
- Let rclone bisync handle conflict detection (it already does this)
- Make conflicts visible and recoverable, don't prevent them
- Cloud is always the winner on conflict (cloud-primary model)
- Users who want version history can just use Git locally in their sync directory

**What Changed from Original Version:**
- **Replaced:** Custom `.bmmeta` sidecars → Use rclone's `.bisync/` state tracking
- **Replaced:** Custom conflict detection → Use rclone bisync 3-way merge
- **Replaced:** Tombstone files → rclone delete tracking handles this
- **Replaced:** Distributed lease → Local process lock only (document multi-device warning)
- **Replaced:** S3 versioning service → Users just use Git locally if they want history
- **Deferred:** SPEC-14 Git integration → Postponed to teams/multi-user features

## ✅ Now 
- [ ] **Local process lock**: Prevent concurrent bisync runs on same device (`~/.basic-memory/sync.lock`)
- [ ] **Structured sync reports**: Parse rclone bisync output into JSON reports (creates/updates/deletes/conflicts, bytes, duration); `bm sync --report`
- [ ] **Multi-device warning**: Document that users should not run `--watch` on multiple devices simultaneously
- [ ] **Version control guidance**: Document pattern for users to use Git locally in their sync directory if they want version history
- [ ] **Docs polish**: cloud-mode toggle, mount↔bisync directory isolation, conflict semantics, quick start, migration guide, short demo clip/GIF

## 🔜 Next
- [ ] **Observability commands**: `bm conflicts list`, `bm sync history` to view sync reports and conflicts
- [ ] **Conflict resolution UI**: `bm conflicts resolve <file>` to interactively pick winner from conflict files
- [ ] **Selective sync**: allow include/exclude by project; per-project profile (safe/balanced/fast)

## 🧭 Later
- [ ] **Near real-time sync**: File watcher → targeted `rclone copy` for individual files (keep bisync as backstop)
- [ ] **Sharing / scoped tokens**: cross-tenant/project access
- [ ] **Bandwidth controls & backpressure**: policy for large repos
- [ ] **Client-side encryption (optional)**: with clear trade-offs

## 📏 Acceptance criteria (for "Now" items)
- [ ] Local process lock prevents concurrent bisync runs on same device
- [ ] rclone bisync conflict files visible and documented (`file.conflict1.md`, `file.conflict2.md`)
- [ ] `bm sync --report` generates parsable JSON with sync statistics
- [ ] Documentation clearly warns about multi-device `--watch` mode
- [ ] Documentation shows users how to use Git locally for version history

## What We're NOT Building (Deferred to rclone)
- ❌ Custom `.bmmeta` sidecars (rclone tracks state in `.bisync/` workdir)
- ❌ Custom conflict detection (rclone bisync already does 3-way merge detection)
- ❌ Tombstone files (S3 versioning + rclone delete tracking handles this)
- ❌ Distributed lease (low probability issue, rclone detects state divergence)
- ❌ Rename/move tracking (rclone has size+modtime heuristics built-in)

## Implementation Summary

**Current State (SPEC-9):**
- ✅ rclone bisync with 3 profiles (safe/balanced/fast)
- ✅ `--max-delete` safety limits (10/25/50 files)
- ✅ `--conflict-resolve=newer` for auto-resolution
- ✅ Watch mode: `bm sync --watch` (60s intervals)
- ✅ Integrity checking: `bm cloud check`
- ✅ Mount vs bisync directory isolation

**What's Needed (This Spec):**
1. **Process lock** - Simple file-based lock in `~/.basic-memory/sync.lock`
2. **Sync reports** - Parse rclone output, save to `~/.basic-memory/sync-history/`
3. **Documentation** - Multi-device warnings, conflict resolution workflow, Git usage pattern

**User Model:**
- Cloud is always the winner on conflict (cloud-primary)
- rclone creates `.conflict` files for divergent edits
- Users who want version history just use Git in their local sync directory
- Users warned: don't run `--watch` on multiple devices

## Decision Rationale & Trade-offs

### Why Trust rclone Instead of Custom Conflict Handling?

**rclone bisync already provides:**
- 3-way merge detection (compares local, remote, and last-known state)
- File state tracking in `.bisync/` workdir (hashes, modtimes)
- Automatic conflict file creation: `file.conflict1.md`, `file.conflict2.md`
- Rename detection via size+modtime heuristics
- Delete tracking (prevents resurrection of deleted files)
- Battle-tested with extensive edge case handling

**What we'd have to build with custom approach:**
- Per-file metadata tracking (`.bmmeta` sidecars)
- 3-way diff algorithm
- Conflict detection logic
- Tombstone files for deletes
- Rename/move detection
- Testing for all edge cases

**Decision:** Use what rclone already does well. Don't reinvent the wheel.

### Why Let Users Use Git Locally Instead of Building Versioning?

**The simplest solution: Just use Git**

Users who want version history can literally just use Git in their sync directory:

```bash
cd ~/basic-memory-cloud-sync/
git init
git add .
git commit -m "backup"

# Push to their own GitHub if they want
git remote add origin git@github.com:user/my-knowledge.git
git push
```

**Why this is perfect:**
- ✅ We build nothing
- ✅ Users who want Git... just use Git
- ✅ Users who don't care... don't need to
- ✅ rclone bisync already handles sync conflicts
- ✅ Users own their data, they can version it however they want (Git, Time Machine, etc.)

**What we'd have to build for S3 versioning:**
- API to enable versioning on Tigris buckets
  - **Problem**: Tigris doesn't support S3 bucket versioning
- Restore commands: `bm cloud restore --version-id`
- Version listing: `bm cloud versions <path>`
- Lifecycle policies for version retention
- Documentation and user education

**What we'd have to build for SPEC-14 Git integration:**
- Committer service (daemon watching `/app/data/`)
- Puller service (webhook handler for GitHub pushes)
- Git LFS for large files
- Loop prevention between Git ↔ bisync ↔ local
- Merge conflict handling at TWO layers (rclone + Git)
- Webhook infrastructure and monitoring

**Decision:** Don't build version control. Document the pattern. "The easiest problem to solve is the one you avoid."

**When to revisit:** Teams/multi-user features where server-side version control becomes necessary for collaboration.

### Why No Distributed Lease?

**Low probability issue:**
- Requires user to manually run `bm sync` on multiple devices at exact same time
- Most users run `--watch` on one primary device
- rclone bisync detects state divergence and fails safely

**Safety nets in place:**
- Local process lock prevents concurrent runs on same device
- rclone bisync aborts if bucket state changed during sync
- S3 versioning recovers from any overwrites
- Documentation warns against multi-device `--watch`

**Failure mode:**
```bash
# Device A and B sync simultaneously
Device A: bm sync → succeeds
Device B: bm sync → "Error: path has changed, run --resync"

# User fixes with resync
Device B: bm sync --resync → establishes new baseline
```

**Decision:** Document the issue, add local lock, defer distributed coordination until users report actual problems.

### Cloud-Primary Conflict Model

**User mental model:**
- Cloud is the source of truth (like Dropbox/iCloud)
- Local is working copy
- On conflict: cloud wins, local edits → `.conflict` file
- User manually picks winner

**Why this works:**
- Simpler than bidirectional merge (no automatic resolution risk)
- Matches user expectations from Dropbox
- S3 versioning provides safety net for overwrites
- Clear recovery path: restore from S3 version if needed

**Example workflow:**
```bash
# Edit file on Device A and Device B while offline
# Both devices come online and sync

Device A: bm sync
# → Pushes to cloud first, becomes canonical version

Device B: bm sync
# → Detects conflict
# → Cloud version: work/notes.md
# → Local version: work/notes.md.conflict1
# → User manually merges or picks winner

# Restore if needed
bm cloud restore work/notes.md --version-id abc123
```

## Implementation Details

### 1. Local Process Lock

```python
# ~/.basic-memory/sync.lock
import os
import psutil
from pathlib import Path

class SyncLock:
    def __init__(self):
        self.lock_file = Path.home() / '.basic-memory' / 'sync.lock'

    def acquire(self):
        if self.lock_file.exists():
            pid = int(self.lock_file.read_text())
            if psutil.pid_exists(pid):
                raise BisyncError(
                    f"Sync already running (PID {pid}). "
                    f"Wait for completion or kill stale process."
                )
            # Stale lock, remove it
            self.lock_file.unlink()

        self.lock_file.write_text(str(os.getpid()))

    def release(self):
        if self.lock_file.exists():
            self.lock_file.unlink()

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, *args):
        self.release()

# Usage
with SyncLock():
    run_rclone_bisync()
```

### 3. Sync Report Parsing

```python
# Parse rclone bisync output
import json
from datetime import datetime
from pathlib import Path

def parse_sync_report(rclone_output: str, duration: float, exit_code: int) -> dict:
    """Parse rclone bisync output into structured report."""

    # rclone bisync outputs lines like:
    # "Synching Path1 /local/path with Path2 remote:bucket"
    # "- Path1    File was copied to Path2"
    # "Bisync successful"

    report = {
        "timestamp": datetime.now().isoformat(),
        "duration_seconds": duration,
        "exit_code": exit_code,
        "success": exit_code == 0,
        "files_created": 0,
        "files_updated": 0,
        "files_deleted": 0,
        "conflicts": [],
        "errors": []
    }

    for line in rclone_output.split('\n'):
        if 'was copied to' in line:
            report['files_created'] += 1
        elif 'was updated in' in line:
            report['files_updated'] += 1
        elif 'was deleted from' in line:
            report['files_deleted'] += 1
        elif '.conflict' in line:
            report['conflicts'].append(line.strip())
        elif 'ERROR' in line:
            report['errors'].append(line.strip())

    return report

def save_sync_report(report: dict):
    """Save sync report to history."""
    history_dir = Path.home() / '.basic-memory' / 'sync-history'
    history_dir.mkdir(parents=True, exist_ok=True)

    timestamp = datetime.now().strftime('%Y%m%d-%H%M%S')
    report_file = history_dir / f'{timestamp}.json'

    report_file.write_text(json.dumps(report, indent=2))

# Usage in run_bisync()
start_time = time.time()
result = subprocess.run(bisync_cmd, capture_output=True, text=True)
duration = time.time() - start_time

report = parse_sync_report(result.stdout, duration, result.returncode)
save_sync_report(report)

if report['conflicts']:
    console.print(f"[yellow]⚠ {len(report['conflicts'])} conflict(s) detected[/yellow]")
    console.print("[dim]Run 'bm conflicts list' to view[/dim]")
```

### 4. User Commands

```bash
# View sync history
bm sync history
# → Lists recent syncs from ~/.basic-memory/sync-history/*.json
# → Shows: timestamp, duration, files changed, conflicts, errors

# View current conflicts
bm conflicts list
# → Scans sync directory for *.conflict* files
# → Shows: file path, conflict versions, timestamps

# Restore from S3 version
bm cloud restore work/notes.md --version-id abc123
# → Uses aws s3api get-object with version-id
# → Downloads to original path

bm cloud restore work/notes.md --timestamp "2025-10-03 14:30"
# → Lists versions, finds closest to timestamp
# → Downloads that version

# List file versions
bm cloud versions work/notes.md
# → Uses aws s3api list-object-versions
# → Shows: version-id, timestamp, size, author

# Interactive conflict resolution
bm conflicts resolve work/notes.md
# → Shows both versions side-by-side
# → Prompts: Keep local, keep cloud, merge manually, restore from S3 version
# → Cleans up .conflict files after resolution
```

## Success Metrics & Monitoring

**Phase 1 (v1) - Basic Safety:**
- [ ] Conflict detection rate < 5% of syncs (measure in telemetry)
- [ ] User can resolve conflicts within 5 minutes (UX testing)
- [ ] Documentation prevents 90% of multi-device issues

**Phase 2 (v2) - Observability:**
- [ ] 80% of users check `bm sync history` when troubleshooting
- [ ] Average time to restore from S3 version < 2 minutes
- [ ] Conflict resolution success rate > 95%

**What to measure:**
```python
# Telemetry in sync reports
{
    "conflict_rate": conflicts / total_syncs,
    "multi_device_collisions": count_state_divergence_errors,
    "version_restores": count_restore_operations,
    "avg_sync_duration": sum(durations) / count,
    "max_delete_trips": count_max_delete_aborts
}
```

**When to add distributed lease:**
- Multi-device collision rate > 5% of syncs
- User complaints about state divergence errors
- Evidence that local lock isn't sufficient

**When to revisit Git (SPEC-14):**
- Teams feature launches (multi-user collaboration)
- Users request commit messages / audit trail
- PR-based review workflow becomes valuable

## Links
- SPEC-9: `specs/spec-9-multi-project-bisync`
- SPEC-14: `specs/spec-14-cloud-git-versioning` (deferred in favor of S3 versioning)
- rclone bisync docs: https://rclone.org/bisync/
- Tigris S3 versioning: https://www.tigrisdata.com/docs/buckets/versioning/

---
**Owner:** <assign>  |  **Review cadence:** weekly in standup  |  **Last updated:** 2025-10-03

```

--------------------------------------------------------------------------------
/tests/api/test_resource_router.py:
--------------------------------------------------------------------------------

```python
"""Tests for resource router endpoints."""

import json
from datetime import datetime, timezone
from pathlib import Path

import pytest

from basic_memory.schemas import EntityResponse
from basic_memory.utils import normalize_newlines


@pytest.mark.asyncio
async def test_get_resource_content(client, project_config, entity_repository, project_url):
    """Test getting content by permalink."""
    # Write a markdown file under the project home
    content = "# Test Content\n\nThis is a test file."
    file_on_disk = Path(project_config.home) / "test" / "test.md"
    file_on_disk.parent.mkdir(parents=True, exist_ok=True)
    file_on_disk.write_text(content)

    # Register an entity pointing at that file
    entity = await entity_repository.create(
        {
            "title": "Test Entity",
            "entity_type": "test",
            "permalink": "test/test",
            "file_path": "test/test.md",  # Relative to config.home
            "content_type": "text/markdown",
            "created_at": datetime.now(timezone.utc),
            "updated_at": datetime.now(timezone.utc),
        }
    )

    # Fetch the resource by its permalink and verify the raw content
    response = await client.get(f"{project_url}/resource/{entity.permalink}")
    assert response.status_code == 200
    assert response.headers["content-type"] == "text/markdown; charset=utf-8"
    assert response.text == normalize_newlines(content)


@pytest.mark.asyncio
async def test_get_resource_pagination(client, project_config, entity_repository, project_url):
    """Test getting content by permalink with pagination."""
    # Write a markdown file under the project home
    content = "# Test Content\n\nThis is a test file."
    file_on_disk = Path(project_config.home) / "test" / "test.md"
    file_on_disk.parent.mkdir(parents=True, exist_ok=True)
    file_on_disk.write_text(content)

    # Register an entity pointing at that file
    entity = await entity_repository.create(
        {
            "title": "Test Entity",
            "entity_type": "test",
            "permalink": "test/test",
            "file_path": "test/test.md",  # Relative to config.home
            "content_type": "text/markdown",
            "created_at": datetime.now(timezone.utc),
            "updated_at": datetime.now(timezone.utc),
        }
    )

    # A single entity fits on page 1, so the full content comes back
    response = await client.get(
        f"{project_url}/resource/{entity.permalink}", params={"page": 1, "page_size": 1}
    )
    assert response.status_code == 200
    assert response.headers["content-type"] == "text/markdown; charset=utf-8"
    assert response.text == normalize_newlines(content)


@pytest.mark.asyncio
async def test_get_resource_by_title(client, project_config, entity_repository, project_url):
    """Test getting content by permalink."""
    # Write a markdown file under the project home
    content = "# Test Content\n\nThis is a test file."
    file_on_disk = Path(project_config.home) / "test" / "test.md"
    file_on_disk.parent.mkdir(parents=True, exist_ok=True)
    file_on_disk.write_text(content)

    # Register an entity pointing at that file
    entity = await entity_repository.create(
        {
            "title": "Test Entity",
            "entity_type": "test",
            "permalink": "test/test",
            "file_path": "test/test.md",  # Relative to config.home
            "content_type": "text/markdown",
            "created_at": datetime.now(timezone.utc),
            "updated_at": datetime.now(timezone.utc),
        }
    )

    # The entity title is also accepted as a resource identifier
    response = await client.get(f"{project_url}/resource/{entity.title}")
    assert response.status_code == 200


@pytest.mark.asyncio
async def test_get_resource_missing_entity(client, project_url):
    """Test 404 when entity doesn't exist."""
    # No entity was created for this permalink, so the lookup must fail
    resp = await client.get(f"{project_url}/resource/does/not/exist")
    assert resp.status_code == 404
    assert "Resource not found" in resp.json()["detail"]


@pytest.mark.asyncio
async def test_get_resource_missing_file(client, project_config, entity_repository, project_url):
    """Test 404 when file doesn't exist."""
    # Register an entity whose file_path has no backing file on disk
    orphan = await entity_repository.create(
        {
            "title": "Missing File",
            "entity_type": "test",
            "permalink": "test/missing",
            "file_path": "test/missing.md",
            "content_type": "text/markdown",
            "created_at": datetime.now(timezone.utc),
            "updated_at": datetime.now(timezone.utc),
        }
    )

    # Entity resolves, but reading its file should produce a 404
    resp = await client.get(f"{project_url}/resource/{orphan.permalink}")
    assert resp.status_code == 404
    assert "File not found" in resp.json()["detail"]


@pytest.mark.asyncio
async def test_get_resource_observation(client, project_config, entity_repository, project_url):
    """Test getting content by observation permalink.

    An observation's permalink should resolve to the full markdown document
    (frontmatter + body) of the entity that contains the observation.
    """
    # Create entity whose body contains one "[note]" observation
    content = "# Test Content\n\n- [note] an observation."
    data = {
        "title": "Test Entity",
        "folder": "test",
        "entity_type": "test",
        "content": f"{content}",
    }
    response = await client.post(f"{project_url}/knowledge/entities", json=data)
    entity_response = response.json()
    entity = EntityResponse(**entity_response)

    # The "- [note] ..." line should have been parsed into exactly one observation
    assert len(entity.observations) == 1
    observation = entity.observations[0]

    # Test getting the content via the observation permalink; expect the
    # whole serialized document (frontmatter included), not just the one line
    response = await client.get(f"{project_url}/resource/{observation.permalink}")
    assert response.status_code == 200
    assert response.headers["content-type"] == "text/markdown; charset=utf-8"
    assert (
        normalize_newlines(
            """
---
title: Test Entity
type: test
permalink: test/test-entity
---

# Test Content

- [note] an observation.
    """.strip()
        )
        in response.text
    )


@pytest.mark.asyncio
async def test_get_resource_entities(client, project_config, entity_repository, project_url):
    """Test getting content by permalink match.

    A wildcard permalink ("test/*") should return all matching entities
    concatenated, each preceded by a "--- memory://..." delimiter line that
    carries the entity's updated_at timestamp and checksum prefix.
    """
    # Create first entity
    content1 = "# Test Content\n"
    data = {
        "title": "Test Entity",
        "folder": "test",
        "entity_type": "test",
        "content": f"{content1}",
    }
    response = await client.post(f"{project_url}/knowledge/entities", json=data)
    entity_response = response.json()
    entity1 = EntityResponse(**entity_response)

    # Second entity links to the first via a [[wikilink]]
    content2 = "# Related Content\n- links to [[Test Entity]]"
    data = {
        "title": "Related Entity",
        "folder": "test",
        "entity_type": "test",
        "content": f"{content2}",
    }
    response = await client.post(f"{project_url}/knowledge/entities", json=data)
    entity_response = response.json()
    entity2 = EntityResponse(**entity_response)

    # The wikilink should have produced exactly one relation
    assert len(entity2.relations) == 1

    # Test getting both entities via the wildcard permalink
    response = await client.get(f"{project_url}/resource/test/*")
    assert response.status_code == 200
    assert response.headers["content-type"] == "text/markdown; charset=utf-8"
    # Expected output: delimiter line (timestamp + 8-char checksum) per entity
    assert (
        normalize_newlines(
            f"""
--- memory://test/test-entity {entity1.updated_at.isoformat()} {entity1.checksum[:8]}

# Test Content

--- memory://test/related-entity {entity2.updated_at.isoformat()} {entity2.checksum[:8]}

# Related Content
- links to [[Test Entity]]

    """.strip()
        )
        in response.text
    )


@pytest.mark.asyncio
async def test_get_resource_entities_pagination(
    client, project_config, entity_repository, project_url
):
    """Test getting content by permalink match with pagination.

    With page_size=1, page 2 of a wildcard match should return only the
    second entity's full document (frontmatter + body).
    """
    # Create first entity
    content1 = "# Test Content\n"
    data = {
        "title": "Test Entity",
        "folder": "test",
        "entity_type": "test",
        "content": f"{content1}",
    }
    response = await client.post(f"{project_url}/knowledge/entities", json=data)
    entity_response = response.json()
    entity1 = EntityResponse(**entity_response)
    assert entity1

    # Second entity links to the first via a [[wikilink]]
    content2 = "# Related Content\n- links to [[Test Entity]]"
    data = {
        "title": "Related Entity",
        "folder": "test",
        "entity_type": "test",
        "content": f"{content2}",
    }
    response = await client.post(f"{project_url}/knowledge/entities", json=data)
    entity_response = response.json()
    entity2 = EntityResponse(**entity_response)

    # The wikilink should have produced exactly one relation
    assert len(entity2.relations) == 1

    # Test getting second result only (page 2, one item per page)
    response = await client.get(
        f"{project_url}/resource/test/*", params={"page": 2, "page_size": 1}
    )
    assert response.status_code == 200
    assert response.headers["content-type"] == "text/markdown; charset=utf-8"
    # Only the second entity's document should be present
    assert (
        normalize_newlines(
            """
---
title: Related Entity
type: test
permalink: test/related-entity
---

# Related Content
- links to [[Test Entity]]
""".strip()
        )
        in response.text
    )


@pytest.mark.asyncio
async def test_get_resource_relation(client, project_config, entity_repository, project_url):
    """Test getting content by relation permalink.

    A relation's permalink should resolve to both endpoints of the relation,
    concatenated with "--- memory://..." delimiter lines.
    """
    # Create first entity (relation target)
    content1 = "# Test Content\n"
    data = {
        "title": "Test Entity",
        "folder": "test",
        "entity_type": "test",
        "content": f"{content1}",
    }
    response = await client.post(f"{project_url}/knowledge/entities", json=data)
    entity_response = response.json()
    entity1 = EntityResponse(**entity_response)

    # Second entity links to the first via a [[wikilink]] (relation source)
    content2 = "# Related Content\n- links to [[Test Entity]]"
    data = {
        "title": "Related Entity",
        "folder": "test",
        "entity_type": "test",
        "content": f"{content2}",
    }
    response = await client.post(f"{project_url}/knowledge/entities", json=data)
    entity_response = response.json()
    entity2 = EntityResponse(**entity_response)

    # The wikilink should have produced exactly one relation
    assert len(entity2.relations) == 1
    relation = entity2.relations[0]

    # Test getting both endpoints via the relation permalink
    response = await client.get(f"{project_url}/resource/{relation.permalink}")
    assert response.status_code == 200
    assert response.headers["content-type"] == "text/markdown; charset=utf-8"
    assert (
        normalize_newlines(
            f"""
--- memory://test/test-entity {entity1.updated_at.isoformat()} {entity1.checksum[:8]}

# Test Content

--- memory://test/related-entity {entity2.updated_at.isoformat()} {entity2.checksum[:8]}

# Related Content
- links to [[Test Entity]]
    
    """.strip()
        )
        in response.text
    )


@pytest.mark.asyncio
async def test_put_resource_new_file(
    client, project_config, entity_repository, search_repository, project_url
):
    """Test creating a new file via PUT."""
    # Canvas payload to write
    file_path = "visualizations/test.canvas"
    payload = {
        "nodes": [
            {
                "id": "node1",
                "type": "text",
                "text": "Test node content",
                "x": 100,
                "y": 200,
                "width": 400,
                "height": 300,
            }
        ],
        "edges": [],
    }

    # Ensure a clean slate: the target file must not exist beforehand
    target = Path(project_config.home) / file_path
    if target.exists():
        target.unlink()

    # PUT the serialized canvas document
    response = await client.put(
        f"{project_url}/resource/{file_path}", json=json.dumps(payload, indent=2)
    )

    # Response should report creation with path, checksum and size
    assert response.status_code == 201
    body = response.json()
    assert body["file_path"] == file_path
    assert "checksum" in body
    assert "size" in body

    # File must now exist on disk...
    target = Path(project_config.home) / file_path
    assert target.exists()

    # ...and round-trip back to the same JSON structure
    written = target.read_text(encoding="utf-8")
    assert json.loads(written) == payload

    # A canvas entity should have been created in the DB
    entity = await entity_repository.get_by_file_path(file_path)
    assert entity is not None
    assert entity.entity_type == "canvas"
    assert entity.content_type == "application/json"

    # And the new file should be discoverable via search
    hits = await search_repository.search(title="test.canvas")
    assert len(hits) > 0


@pytest.mark.asyncio
async def test_put_resource_update_existing(client, project_config, entity_repository, project_url):
    """Test updating an existing file via PUT.

    Seeds a file on disk plus its entity row, then PUTs new content and
    verifies a 200 response, the rewritten file contents, and that the
    same entity row was updated in place (same id, new checksum).
    """
    # Create an initial file and entity
    file_path = "visualizations/update-test.canvas"
    full_path = Path(project_config.home) / file_path
    full_path.parent.mkdir(parents=True, exist_ok=True)

    initial_data = {
        "nodes": [
            {
                "id": "initial",
                "type": "text",
                "text": "Initial content",
                "x": 0,
                "y": 0,
                "width": 200,
                "height": 100,
            }
        ],
        "edges": [],
    }
    full_path.write_text(json.dumps(initial_data))

    # Create the initial entity
    # (checksum is a placeholder; the PUT below should replace it)
    initial_entity = await entity_repository.create(
        {
            "title": "update-test.canvas",
            "entity_type": "canvas",
            "file_path": file_path,
            "content_type": "application/json",
            "checksum": "initial123",
            "created_at": datetime.now(timezone.utc),
            "updated_at": datetime.now(timezone.utc),
        }
    )

    # New data for update
    updated_data = {
        "nodes": [
            {
                "id": "updated",
                "type": "text",
                "text": "Updated content",
                "x": 100,
                "y": 100,
                "width": 300,
                "height": 200,
            }
        ],
        "edges": [],
    }

    # Execute PUT request to update
    response = await client.put(
        f"{project_url}/resource/{file_path}", json=json.dumps(updated_data, indent=2)
    )

    # Verify response - 200 (not 201) because the resource already existed
    assert response.status_code == 200

    # Verify file was updated
    updated_content = full_path.read_text(encoding="utf-8")
    assert json.loads(updated_content) == updated_data

    # Verify entity was updated
    updated_entity = await entity_repository.get_by_file_path(file_path)
    assert updated_entity.id == initial_entity.id  # Same entity, updated
    assert updated_entity.checksum != initial_entity.checksum  # Checksum changed

```

--------------------------------------------------------------------------------
/tests/mcp/test_permalink_collision_file_overwrite.py:
--------------------------------------------------------------------------------

```python
"""Tests for permalink collision file overwrite bug discovered in live testing.

This test reproduces a critical data loss bug where creating notes with
titles that normalize to different permalinks but resolve to the same
file location causes silent file overwrites without warning.

Related to GitHub Issue #139 but tests a different aspect - not database
UNIQUE constraints, but actual file overwrite behavior.

Example scenario from live testing:
1. Create "Node A" → file: edge-cases/Node A.md, permalink: edge-cases/node-a
2. Create "Node C" → file: edge-cases/Node C.md, permalink: edge-cases/node-c
3. BUG: Node C creation overwrites edge-cases/Node A.md file content
4. Result: File "Node A.md" exists but contains "Node C" content
"""

import pytest
from pathlib import Path
from textwrap import dedent

from basic_memory.mcp.tools import write_note, read_note
from basic_memory.sync.sync_service import SyncService
from basic_memory.config import ProjectConfig
from basic_memory.services import EntityService


async def force_full_scan(sync_service: SyncService) -> None:
    """Clear the sync watermark so the next sync performs a full scan.

    Used by tests that create files faster than the watermark's timestamp
    resolution; resetting `last_scan_timestamp`/`last_file_count` guarantees
    moves, deletions, and freshly written files are detected on the next
    sync pass.
    """
    project_id = sync_service.entity_repository.project_id
    if project_id is None:
        return
    repo = sync_service.project_repository
    project = await repo.find_by_id(project_id)
    if not project:
        return
    await repo.update(
        project.id,
        {
            "last_scan_timestamp": None,
            "last_file_count": None,
        },
    )


@pytest.mark.asyncio
async def test_permalink_collision_should_not_overwrite_different_file(app, test_project):
    """Test that creating notes with different titles doesn't overwrite existing files.

    This test reproduces the critical bug discovered in Phase 4 of live testing where:
    - Creating "Node A" worked fine
    - Creating "Node C" silently overwrote Node A.md's content
    - No warning or error was shown to the user
    - Original Node A content was permanently lost

    Expected behavior:
    - Each note with a different title should create/update its own file
    - No silent overwrites should occur
    - Files should maintain their distinct content

    Current behavior (BUG):
    - Second note creation sometimes overwrites first note's file
    - File "Node A.md" contains "Node C" content after creating Node C
    - Data loss occurs without user warning
    """
    # Step 1: Create first note "Node A"
    result_a = await write_note.fn(
        project=test_project.name,
        title="Node A",
        folder="edge-cases",
        content="# Node A\n\nOriginal content for Node A\n\n## Relations\n- links_to [[Node B]]",
    )

    # write_note reports the created file path and permalink in its summary
    assert "# Created note" in result_a
    assert "file_path: edge-cases/Node A.md" in result_a
    assert "permalink: edge-cases/node-a" in result_a

    # Verify Node A content via read
    content_a = await read_note.fn("edge-cases/node-a", project=test_project.name)
    assert "Node A" in content_a
    assert "Original content for Node A" in content_a

    # Step 2: Create second note "Node B" (should be independent)
    result_b = await write_note.fn(
        project=test_project.name,
        title="Node B",
        folder="edge-cases",
        content="# Node B\n\nContent for Node B",
    )

    assert "# Created note" in result_b
    assert "file_path: edge-cases/Node B.md" in result_b
    assert "permalink: edge-cases/node-b" in result_b

    # Step 3: Create third note "Node C" (this is where the bug occurs)
    result_c = await write_note.fn(
        project=test_project.name,
        title="Node C",
        folder="edge-cases",
        content="# Node C\n\nContent for Node C\n\n## Relations\n- links_to [[Node A]]",
    )

    assert "# Created note" in result_c
    assert "file_path: edge-cases/Node C.md" in result_c
    assert "permalink: edge-cases/node-c" in result_c

    # CRITICAL CHECK: Verify Node A still has its original content
    # This is where the bug manifests - Node A.md gets overwritten with Node C content
    content_a_after = await read_note.fn("edge-cases/node-a", project=test_project.name)
    assert "Node A" in content_a_after, "Node A title should still be 'Node A'"
    assert "Original content for Node A" in content_a_after, (
        "Node A file should NOT be overwritten by Node C creation"
    )
    assert "Content for Node C" not in content_a_after, "Node A should NOT contain Node C's content"

    # Verify Node C has its own content
    content_c = await read_note.fn("edge-cases/node-c", project=test_project.name)
    assert "Node C" in content_c
    assert "Content for Node C" in content_c
    assert "Original content for Node A" not in content_c, (
        "Node C should not contain Node A's content"
    )

    # Verify files physically exist with correct content
    # (read_note goes through the API; this checks the filesystem directly)
    project_path = Path(test_project.path)
    node_a_file = project_path / "edge-cases" / "Node A.md"
    node_c_file = project_path / "edge-cases" / "Node C.md"

    assert node_a_file.exists(), "Node A.md file should exist"
    assert node_c_file.exists(), "Node C.md file should exist"

    # Read actual file contents to verify no overwrite occurred
    node_a_file_content = node_a_file.read_text()
    node_c_file_content = node_c_file.read_text()

    assert "Node A" in node_a_file_content, "Physical file Node A.md should contain Node A title"
    assert "Original content for Node A" in node_a_file_content, (
        "Physical file Node A.md should contain original Node A content"
    )
    assert "Content for Node C" not in node_a_file_content, (
        "Physical file Node A.md should NOT contain Node C content"
    )

    assert "Node C" in node_c_file_content, "Physical file Node C.md should contain Node C title"
    assert "Content for Node C" in node_c_file_content, (
        "Physical file Node C.md should contain Node C content"
    )


@pytest.mark.asyncio
async def test_notes_with_similar_titles_maintain_separate_files(app, test_project):
    """Test that notes with similar titles that normalize differently don't collide.

    Tests additional edge cases around permalink normalization to ensure
    we don't have collision issues with various title patterns.
    """
    # Create notes with titles that could potentially cause issues
    titles_and_folders = [
        ("My Note", "test"),
        ("My-Note", "test"),  # Different title, similar permalink
        ("My_Note", "test"),  # Underscore vs hyphen
        ("my note", "test"),  # Case variation
    ]

    created_permalinks = []

    for title, folder in titles_and_folders:
        result = await write_note.fn(
            project=test_project.name,
            title=title,
            folder=folder,
            content=f"# {title}\n\nUnique content for {title}",
        )

        # Extract permalink from the tool's summary output
        permalink = None
        for line in result.split("\n"):
            if line.startswith("permalink:"):
                permalink = line.split(":", 1)[1].strip()
                created_permalinks.append((title, permalink))
                break

        # Fail fast with a clear message instead of passing None to
        # read_note below, which would raise an obscure error.
        assert permalink is not None, (
            f"write_note result for '{title}' did not report a permalink:\n{result}"
        )

        # Verify each note can be read back with its own content
        content = await read_note.fn(permalink, project=test_project.name)
        assert f"Unique content for {title}" in content, (
            f"Note with title '{title}' should maintain its unique content"
        )

    # Verify all created permalinks are tracked
    assert len(created_permalinks) == len(titles_and_folders), (
        "All notes should be created successfully"
    )


@pytest.mark.asyncio
async def test_sequential_note_creation_preserves_all_files(app, test_project):
    """Test that rapid sequential note creation doesn't cause file overwrites.

    This test creates multiple notes in sequence to ensure that file
    creation/update logic doesn't have race conditions or state issues
    that could cause overwrites.
    """
    # (title, full markdown content) pairs; bodies follow "# Title\n\n<body>"
    notes_data = [
        ("Alpha", "# Alpha\n\nAlpha content"),
        ("Beta", "# Beta\n\nBeta content"),
        ("Gamma", "# Gamma\n\nGamma content"),
        ("Delta", "# Delta\n\nDelta content"),
        ("Epsilon", "# Epsilon\n\nEpsilon content"),
    ]

    # Create all notes
    for title, content in notes_data:
        result = await write_note.fn(
            project=test_project.name,
            title=title,
            folder="sequence-test",
            content=content,
        )
        assert "# Created note" in result or "# Updated note" in result

    # Verify all notes still exist with correct content
    for title, expected_content in notes_data:
        # Normalize title to permalink format
        permalink = f"sequence-test/{title.lower()}"
        content = await read_note.fn(permalink, project=test_project.name)

        assert title in content, f"Note '{title}' should still have its title"
        # split("\n\n")[1] isolates the body text after the "# Title" heading
        assert expected_content.split("\n\n")[1] in content, (
            f"Note '{title}' should still have its original content"
        )

    # Verify physical files exist (filesystem check, not just API reads)
    project_path = Path(test_project.path)
    sequence_dir = project_path / "sequence-test"

    for title, _ in notes_data:
        file_path = sequence_dir / f"{title}.md"
        assert file_path.exists(), f"File for '{title}' should exist"

        file_content = file_path.read_text()
        assert title in file_content, f"Physical file for '{title}' should contain correct title"


@pytest.mark.asyncio
async def test_sync_permalink_collision_file_overwrite_bug(
    sync_service: SyncService,
    project_config: ProjectConfig,
    entity_service: EntityService,
):
    """Test that reproduces the permalink collision file overwrite bug via sync.

    This test directly creates files and runs sync to reproduce the exact bug
    discovered in live testing where Node C overwrote Node A.md.

    The bug occurs when:
    1. File "Node A.md" exists with permalink "edge-cases/node-a"
    2. File "Node C.md" is created with permalink "edge-cases/node-c"
    3. During sync, somehow Node C content overwrites Node A.md
    4. Result: File "Node A.md" contains Node C content (data loss!)
    """
    project_dir = project_config.home
    edge_cases_dir = project_dir / "edge-cases"
    edge_cases_dir.mkdir(parents=True, exist_ok=True)

    # Step 1: Create Node A file
    node_a_content = dedent("""
        ---
        title: Node A
        type: note
        tags:
        - circular-test
        ---

        # Node A

        Original content for Node A

        ## Relations
        - links_to [[Node B]]
        - references [[Node C]]
    """).strip()

    node_a_file = edge_cases_dir / "Node A.md"
    node_a_file.write_text(node_a_content)

    # Sync to create Node A in database
    await sync_service.sync(project_dir)

    # Verify Node A is in database
    node_a = await entity_service.get_by_permalink("edge-cases/node-a")
    assert node_a is not None
    assert node_a.title == "Node A"

    # Verify Node A file has correct content
    assert node_a_file.exists()
    node_a_file_content = node_a_file.read_text()
    assert "title: Node A" in node_a_file_content
    assert "Original content for Node A" in node_a_file_content

    # Step 2: Create Node B file
    node_b_content = dedent("""
        ---
        title: Node B
        type: note
        tags:
        - circular-test
        ---

        # Node B

        Content for Node B

        ## Relations
        - links_to [[Node C]]
        - part_of [[Node A]]
    """).strip()

    node_b_file = edge_cases_dir / "Node B.md"
    node_b_file.write_text(node_b_content)

    # Force full scan to detect the new file
    # (file just created may not be newer than watermark due to timing precision)
    await force_full_scan(sync_service)

    # Sync to create Node B
    await sync_service.sync(project_dir)

    # Step 3: Create Node C file (this is where the bug might occur)
    node_c_content = dedent("""
        ---
        title: Node C
        type: note
        tags:
        - circular-test
        ---

        # Node C

        Content for Node C

        ## Relations
        - links_to [[Node A]]
        - references [[Node B]]
    """).strip()

    node_c_file = edge_cases_dir / "Node C.md"
    node_c_file.write_text(node_c_content)

    # Force full scan to detect the new file
    # (file just created may not be newer than watermark due to timing precision)
    await force_full_scan(sync_service)

    # Sync to create Node C - THIS IS WHERE THE BUG OCCURS
    await sync_service.sync(project_dir)

    # CRITICAL VERIFICATION: Check if Node A file was overwritten
    assert node_a_file.exists(), "Node A.md file should still exist"

    # Read Node A file content to check for overwrite bug
    node_a_after_sync = node_a_file.read_text()

    # The bug: Node A.md contains Node C content instead of Node A content
    assert "title: Node A" in node_a_after_sync, (
        "Node A.md file should still have title: Node A in frontmatter"
    )
    assert "Node A" in node_a_after_sync, "Node A.md file should still contain 'Node A' title"
    assert "Original content for Node A" in node_a_after_sync, (
        f"Node A.md file should NOT be overwritten! Content: {node_a_after_sync[:200]}"
    )
    assert "Content for Node C" not in node_a_after_sync, (
        f"Node A.md should NOT contain Node C content! Content: {node_a_after_sync[:200]}"
    )

    # Verify Node C file exists with correct content
    assert node_c_file.exists(), "Node C.md file should exist"
    node_c_after_sync = node_c_file.read_text()
    assert "Node C" in node_c_after_sync
    assert "Content for Node C" in node_c_after_sync

    # Verify database has both entities correctly
    node_a_db = await entity_service.get_by_permalink("edge-cases/node-a")
    node_c_db = await entity_service.get_by_permalink("edge-cases/node-c")

    assert node_a_db is not None, "Node A should exist in database"
    assert node_a_db.title == "Node A", "Node A database entry should have correct title"

    assert node_c_db is not None, "Node C should exist in database"
    assert node_c_db.title == "Node C", "Node C database entry should have correct title"

```

--------------------------------------------------------------------------------
/src/basic_memory/api/routers/project_router.py:
--------------------------------------------------------------------------------

```python
"""Router for project management."""

import os
from fastapi import APIRouter, HTTPException, Path, Body, BackgroundTasks, Response, Query
from typing import Optional
from loguru import logger

from basic_memory.deps import (
    ProjectConfigDep,
    ProjectServiceDep,
    ProjectPathDep,
    SyncServiceDep,
)
from basic_memory.schemas import ProjectInfoResponse, SyncReportResponse
from basic_memory.schemas.project_info import (
    ProjectList,
    ProjectItem,
    ProjectInfoRequest,
    ProjectStatusResponse,
)
from basic_memory.utils import normalize_project_path

# Router for resources in a specific project
# The ProjectPathDep is used in the path as a prefix, so the request path is like /{project}/project/info
project_router = APIRouter(prefix="/project", tags=["project"])

# Router for managing project resources (the project collection itself:
# list/add/remove/set-default), mounted without a per-project prefix.
project_resource_router = APIRouter(prefix="/projects", tags=["project_management"])


@project_router.get("/info", response_model=ProjectInfoResponse)
async def get_project_info(
    project_service: ProjectServiceDep,
    project: ProjectPathDep,
) -> ProjectInfoResponse:
    """Get comprehensive information about the specified Basic Memory project.

    Thin wrapper that delegates directly to the project service.
    """
    return await project_service.get_project_info(project)


@project_router.get("/item", response_model=ProjectItem)
async def get_project(
    project_service: ProjectServiceDep,
    project: ProjectPathDep,
) -> ProjectItem:
    """Get basic info about the specified Basic Memory project.

    Raises:
        HTTPException: 404 if no project with the given name exists.
    """
    found_project = await project_service.get_project(project)
    if not found_project:
        raise HTTPException(
            status_code=404, detail=f"Project: '{project}' does not exist"
        )  # pragma: no cover

    return ProjectItem(
        name=found_project.name,
        # Normalize the stored path before returning it to clients
        path=normalize_project_path(found_project.path),
        is_default=found_project.is_default or False,
    )


# Update a project
@project_router.patch("/{name}", response_model=ProjectStatusResponse)
async def update_project(
    project_service: ProjectServiceDep,
    name: str = Path(..., description="Name of the project to update"),
    path: Optional[str] = Body(None, description="New absolute path for the project"),
    is_active: Optional[bool] = Body(None, description="Status of the project (active/inactive)"),
) -> ProjectStatusResponse:
    """Update a project's information in configuration and database.

    Note: `path` takes precedence — when both `path` and `is_active` are
    supplied, only the move is performed (see the if/elif below).

    Args:
        name: The name of the project to update
        path: Optional new absolute path for the project
        is_active: Optional status update for the project

    Returns:
        Response confirming the project was updated

    Raises:
        HTTPException: 400 if the path is not absolute, or if the service
            rejects the update with a ValueError.
    """
    try:
        # Validate that path is absolute if provided
        if path and not os.path.isabs(path):
            raise HTTPException(status_code=400, detail="Path must be absolute")

        # Get original project info for the response
        # (falls back to "" if the name is unknown to the configuration)
        old_project_info = ProjectItem(
            name=name,
            path=project_service.projects.get(name, ""),
        )

        # A single PATCH applies either a move or a status change, not both
        if path:
            await project_service.move_project(name, path)
        elif is_active is not None:
            await project_service.update_project(name, is_active=is_active)

        # Get updated project info
        updated_path = path if path else project_service.projects.get(name, "")

        return ProjectStatusResponse(
            message=f"Project '{name}' updated successfully",
            status="success",
            default=(name == project_service.default_project),
            old_project=old_project_info,
            new_project=ProjectItem(name=name, path=updated_path),
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))


# Kick off a filesystem sync for one project
@project_router.post("/sync")
async def sync_project(
    background_tasks: BackgroundTasks,
    sync_service: SyncServiceDep,
    project_config: ProjectConfigDep,
    force_full: bool = Query(
        False, description="Force full scan, bypassing watermark optimization"
    ),
):
    """Force project filesystem sync to database.

    Schedules a background scan of the project directory that updates the
    database with any new or modified files, then returns immediately.

    Args:
        background_tasks: FastAPI background tasks
        sync_service: Sync service for this project
        project_config: Project configuration
        force_full: If True, force a full scan even if watermark exists

    Returns:
        Response confirming sync was initiated
    """
    project_name = project_config.name

    # Run the sync after the response is sent so the caller isn't blocked.
    background_tasks.add_task(
        sync_service.sync, project_config.home, project_name, force_full=force_full
    )
    logger.info(f"Filesystem sync initiated for project: {project_name} (force_full={force_full})")

    return {
        "status": "sync_started",
        "message": f"Filesystem sync initiated for project '{project_name}'",
    }


@project_router.post("/status", response_model=SyncReportResponse)
async def project_sync_status(
    sync_service: SyncServiceDep,
    project_config: ProjectConfigDep,
) -> SyncReportResponse:
    """Report pending filesystem changes without applying them.

    Compares the project directory against the database state and returns
    a report describing the files that need syncing.

    Args:
        sync_service: Sync service for this project
        project_config: Project configuration

    Returns:
        Scan report with details on files that need syncing
    """
    logger.info(f"Scanning filesystem for project: {project_config.name}")
    report = await sync_service.scan(project_config.home)
    return SyncReportResponse.from_sync_report(report)


# Enumerate every configured project
@project_resource_router.get("/projects", response_model=ProjectList)
async def list_projects(
    project_service: ProjectServiceDep,
) -> ProjectList:
    """List all configured projects.

    Returns:
        A list of all projects with metadata, plus the default project name.
    """
    all_projects = await project_service.list_projects()

    items = []
    for project in all_projects:
        items.append(
            ProjectItem(
                name=project.name,
                path=normalize_project_path(project.path),
                is_default=project.is_default or False,
            )
        )

    return ProjectList(
        projects=items,
        default_project=project_service.default_project,
    )


# Add a new project
@project_resource_router.post("/projects", response_model=ProjectStatusResponse, status_code=201)
async def add_project(
    response: Response,
    project_data: ProjectInfoRequest,
    project_service: ProjectServiceDep,
) -> ProjectStatusResponse:
    """Add a new project to configuration and database.

    Idempotent on (name, path): re-adding the same project returns 200
    instead of 201; the same name with a different path is a 400 error.

    Args:
        project_data: The project name and path, with option to set as default

    Returns:
        Response confirming the project was added

    Raises:
        HTTPException: 400 if the name exists with a different path, or if
            the service rejects the project with a ValueError.
    """
    # Check if project already exists before attempting to add
    existing_project = await project_service.get_project(project_data.name)
    if existing_project:
        # Project exists - check if paths match for true idempotency
        # Normalize paths for comparison (resolve symlinks, etc.)
        # Local import: the module-level `Path` name is fastapi's Path
        # parameter helper, so pathlib's Path is imported here instead.
        from pathlib import Path

        requested_path = Path(project_data.path).resolve()
        existing_path = Path(existing_project.path).resolve()

        if requested_path == existing_path:
            # Same name, same path - return 200 OK (idempotent)
            response.status_code = 200
            return ProjectStatusResponse(  # pyright: ignore [reportCallIssue]
                message=f"Project '{project_data.name}' already exists",
                status="success",
                default=existing_project.is_default or False,
                new_project=ProjectItem(
                    name=existing_project.name,
                    path=existing_project.path,
                    is_default=existing_project.is_default or False,
                ),
            )
        else:
            # Same name, different path - this is an error
            raise HTTPException(
                status_code=400,
                detail=f"Project '{project_data.name}' already exists with different path. Existing: {existing_project.path}, Requested: {project_data.path}",
            )

    try:  # pragma: no cover
        # The service layer now handles cloud mode validation and path sanitization
        await project_service.add_project(
            project_data.name, project_data.path, set_default=project_data.set_default
        )

        return ProjectStatusResponse(  # pyright: ignore [reportCallIssue]
            message=f"Project '{project_data.name}' added successfully",
            status="success",
            default=project_data.set_default,
            new_project=ProjectItem(
                name=project_data.name, path=project_data.path, is_default=project_data.set_default
            ),
        )
    except ValueError as e:  # pragma: no cover
        raise HTTPException(status_code=400, detail=str(e))


# Remove a project
@project_resource_router.delete("/{name}", response_model=ProjectStatusResponse)
async def remove_project(
    project_service: ProjectServiceDep,
    name: str = Path(..., description="Name of the project to remove"),
    delete_notes: bool = Query(
        False, description="If True, delete project directory from filesystem"
    ),
) -> ProjectStatusResponse:
    """Remove a project from configuration and database.

    Args:
        name: The name of the project to remove
        delete_notes: If True, delete the project directory from the filesystem

    Returns:
        Response confirming the project was removed

    Raises:
        HTTPException: 404 if the project does not exist; 400 if it is the
            current default project or the service raises a ValueError.
    """
    try:
        old_project = await project_service.get_project(name)
        if not old_project:  # pragma: no cover
            raise HTTPException(
                status_code=404, detail=f"Project: '{name}' does not exist"
            )  # pragma: no cover

        # Check if trying to delete the default project
        # The error message suggests alternatives when other projects exist.
        if name == project_service.default_project:
            available_projects = await project_service.list_projects()
            other_projects = [p.name for p in available_projects if p.name != name]
            detail = f"Cannot delete default project '{name}'. "
            if other_projects:
                detail += (
                    f"Set another project as default first. Available: {', '.join(other_projects)}"
                )
            else:
                detail += "This is the only project in your configuration."
            raise HTTPException(status_code=400, detail=detail)

        await project_service.remove_project(name, delete_notes=delete_notes)

        return ProjectStatusResponse(
            message=f"Project '{name}' removed successfully",
            status="success",
            default=False,
            old_project=ProjectItem(name=old_project.name, path=old_project.path),
            new_project=None,
        )
    except ValueError as e:  # pragma: no cover
        raise HTTPException(status_code=400, detail=str(e))


# Set a project as default
@project_resource_router.put("/{name}/default", response_model=ProjectStatusResponse)
async def set_default_project(
    project_service: ProjectServiceDep,
    name: str = Path(..., description="Name of the project to set as default"),
) -> ProjectStatusResponse:
    """Set a project as the default project.

    Args:
        name: The name of the project to set as default

    Returns:
        Response confirming the project was set as default, including
        both the previous and the new default project.

    Raises:
        HTTPException: 404 if either the current default or the named
            project cannot be found; 400 on a service ValueError.
    """
    try:
        # Get the old default project (needed for the response payload)
        default_name = project_service.default_project
        default_project = await project_service.get_project(default_name)
        if not default_project:  # pragma: no cover
            raise HTTPException(  # pragma: no cover
                status_code=404, detail=f"Default Project: '{default_name}' does not exist"
            )

        # get the new project
        new_default_project = await project_service.get_project(name)
        if not new_default_project:  # pragma: no cover
            raise HTTPException(
                status_code=404, detail=f"Project: '{name}' does not exist"
            )  # pragma: no cover

        await project_service.set_default_project(name)

        return ProjectStatusResponse(
            message=f"Project '{name}' set as default successfully",
            status="success",
            default=True,
            old_project=ProjectItem(name=default_name, path=default_project.path),
            new_project=ProjectItem(
                name=name,
                path=new_default_project.path,
                is_default=True,
            ),
        )
    except ValueError as e:  # pragma: no cover
        raise HTTPException(status_code=400, detail=str(e))


# Look up the current default project
@project_resource_router.get("/default", response_model=ProjectItem)
async def get_default_project(
    project_service: ProjectServiceDep,
) -> ProjectItem:
    """Get the default project.

    Returns:
        The default project's name and path.

    Raises:
        HTTPException: 404 if the configured default project is missing.
    """
    name = project_service.default_project
    project = await project_service.get_project(name)
    if not project:  # pragma: no cover
        raise HTTPException(  # pragma: no cover
            status_code=404, detail=f"Default Project: '{name}' does not exist"
        )

    return ProjectItem(name=project.name, path=project.path, is_default=True)


# Synchronize projects between config and database
@project_resource_router.post("/config/sync", response_model=ProjectStatusResponse)
async def synchronize_projects(
    project_service: ProjectServiceDep,
) -> ProjectStatusResponse:
    """Reconcile project definitions between the config file and the database.

    Ensures that all projects in the configuration file exist in the database
    and vice versa.

    Returns:
        Response confirming synchronization was completed

    Raises:
        HTTPException: 400 if the service rejects the synchronization.
    """
    try:  # pragma: no cover
        await project_service.synchronize_projects()
        response = ProjectStatusResponse(  # pyright: ignore [reportCallIssue]
            message="Projects synchronized successfully between configuration and database",
            status="success",
            default=False,
        )
    except ValueError as e:  # pragma: no cover
        raise HTTPException(status_code=400, detail=str(e))
    return response

```

--------------------------------------------------------------------------------
/test-int/mcp/test_delete_note_integration.py:
--------------------------------------------------------------------------------

```python
"""
Integration tests for delete_note MCP tool.

Tests the complete delete note workflow: MCP client -> MCP server -> FastAPI -> database
"""

import pytest
from fastmcp import Client


@pytest.mark.asyncio
async def test_delete_note_by_title(mcp_server, app, test_project):
    """Test deleting a note by its title.

    Covers the full round trip: create a note, confirm it is readable,
    delete it by title, then confirm reads report "Note Not Found".
    """

    async with Client(mcp_server) as client:
        # First create a note
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Note to Delete",
                "folder": "test",
                "content": "# Note to Delete\n\nThis note will be deleted.",
                "tags": "test,delete",
            },
        )

        # Verify the note exists by reading it
        read_result = await client.call_tool(
            "read_note",
            {
                "project": test_project.name,
                "identifier": "Note to Delete",
            },
        )
        assert len(read_result.content) == 1
        assert "Note to Delete" in read_result.content[0].text

        # Delete the note by title (the tool resolves titles as identifiers)
        delete_result = await client.call_tool(
            "delete_note",
            {
                "project": test_project.name,
                "identifier": "Note to Delete",
            },
        )

        # Should return True for successful deletion
        assert len(delete_result.content) == 1
        assert delete_result.content[0].type == "text"
        assert "true" in delete_result.content[0].text.lower()

        # Verify the note no longer exists
        read_after_delete = await client.call_tool(
            "read_note",
            {
                "project": test_project.name,
                "identifier": "Note to Delete",
            },
        )

        # Should return helpful "Note Not Found" message instead of the actual note
        assert len(read_after_delete.content) == 1
        result_text = read_after_delete.content[0].text
        assert "Note Not Found" in result_text
        assert "Note to Delete" in result_text


@pytest.mark.asyncio
async def test_delete_note_by_permalink(mcp_server, app, test_project):
    """Test deleting a note by its permalink."""

    async with Client(mcp_server) as client:
        # Set up a note whose generated permalink we can target directly.
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Permalink Delete Test",
                "folder": "tests",
                "content": "# Permalink Delete Test\n\nTesting deletion by permalink.",
                "tags": "test,permalink",
            },
        )

        # Address the note via its folder/slug permalink rather than its title.
        deletion = await client.call_tool(
            "delete_note",
            {"project": test_project.name, "identifier": "tests/permalink-delete-test"},
        )

        # A single boolean payload signalling success.
        assert len(deletion.content) == 1
        assert "true" in deletion.content[0].text.lower()

        # A full-text search for the title should now come back empty.
        search = await client.call_tool(
            "search_notes",
            {"project": test_project.name, "query": "Permalink Delete Test"},
        )
        body = search.content[0].text
        assert '"results": []' in body or '"results":[]' in body


@pytest.mark.asyncio
async def test_delete_note_with_observations_and_relations(mcp_server, app, test_project):
    """Test deleting a note that has observations and relations.

    Deletion is verified indirectly: after deleting, reading the note
    reports "Note Not Found" (associated observations/relations are
    expected to be removed with the entity — verified via the read path).
    """

    async with Client(mcp_server) as client:
        # Create a complex note with observations and relations
        complex_content = """# Project Management System

This is a comprehensive project management system.

## Observations
- [feature] Task tracking functionality
- [feature] User authentication system
- [tech] Built with Python and Flask
- [status] Currently in development

## Relations
- depends_on [[Database Schema]]
- implements [[User Stories]]
- part_of [[Main Application]]

The system handles multiple projects and users."""

        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Project Management System",
                "folder": "projects",
                "content": complex_content,
                "tags": "project,management,system",
            },
        )

        # Verify the note exists and has content
        read_result = await client.call_tool(
            "read_note",
            {
                "project": test_project.name,
                "identifier": "Project Management System",
            },
        )
        assert len(read_result.content) == 1
        result_text = read_result.content[0].text
        assert "Task tracking functionality" in result_text
        assert "depends_on" in result_text

        # Delete the complex note (by permalink this time)
        delete_result = await client.call_tool(
            "delete_note",
            {
                "project": test_project.name,
                "identifier": "projects/project-management-system",
            },
        )

        # Should return True for successful deletion
        assert "true" in delete_result.content[0].text.lower()

        # Verify the note and all its components are deleted
        read_after_delete_2 = await client.call_tool(
            "read_note",
            {
                "project": test_project.name,
                "identifier": "Project Management System",
            },
        )

        # Should return "Note Not Found" message
        assert len(read_after_delete_2.content) == 1
        result_text = read_after_delete_2.content[0].text
        assert "Note Not Found" in result_text
        assert "Project Management System" in result_text


@pytest.mark.asyncio
async def test_delete_note_special_characters_in_title(mcp_server, app, test_project):
    """Test deleting notes with special characters in the title."""

    async with Client(mcp_server) as client:
        titles = [
            "Note with spaces",
            "Note-with-dashes",
            "Note_with_underscores",
            "Note (with parentheses)",
            "Note & Symbols!",
        ]

        # Create one note per special-character title.
        for note_title in titles:
            await client.call_tool(
                "write_note",
                {
                    "project": test_project.name,
                    "title": note_title,
                    "folder": "special",
                    "content": f"# {note_title}\n\nContent for {note_title}",
                    "tags": "special,characters",
                },
            )

        # Each title should delete successfully and then be unreadable.
        for note_title in titles:
            deletion = await client.call_tool(
                "delete_note",
                {"project": test_project.name, "identifier": note_title},
            )
            assert "true" in deletion.content[0].text.lower(), (
                f"Failed to delete note: {note_title}"
            )

            lookup = await client.call_tool(
                "read_note",
                {"project": test_project.name, "identifier": note_title},
            )
            assert len(lookup.content) == 1
            message = lookup.content[0].text
            assert "Note Not Found" in message
            assert note_title in message


@pytest.mark.asyncio
async def test_delete_nonexistent_note(mcp_server, app, test_project):
    """Test attempting to delete a note that doesn't exist."""

    async with Client(mcp_server) as client:
        # Deleting an identifier that was never created should not error;
        # the tool reports the failure via a boolean payload instead.
        result = await client.call_tool(
            "delete_note",
            {"project": test_project.name, "identifier": "Nonexistent Note"},
        )

        # Exactly one content item, signalling an unsuccessful deletion.
        assert len(result.content) == 1
        assert "false" in result.content[0].text.lower()


@pytest.mark.asyncio
async def test_delete_note_by_file_path(mcp_server, app, test_project):
    """Test deleting a note using its file path.

    The delete tool accepts a relative file path (including the .md
    extension) as an identifier, not just titles and permalinks.
    """

    async with Client(mcp_server) as client:
        # Create a note
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "File Path Delete",
                "folder": "docs",
                "content": "# File Path Delete\n\nTesting deletion by file path.",
                "tags": "test,filepath",
            },
        )

        # Try to delete using the file path (should work as an identifier)
        delete_result = await client.call_tool(
            "delete_note",
            {
                "project": test_project.name,
                "identifier": "docs/File Path Delete.md",
            },
        )

        # Should return True for successful deletion
        assert "true" in delete_result.content[0].text.lower()

        # Verify deletion by reading back with the title identifier
        read_after_delete = await client.call_tool(
            "read_note",
            {
                "project": test_project.name,
                "identifier": "File Path Delete",
            },
        )

        # Should return "Note Not Found" message
        assert len(read_after_delete.content) == 1
        result_text = read_after_delete.content[0].text
        assert "Note Not Found" in result_text
        assert "File Path Delete" in result_text


@pytest.mark.asyncio
async def test_delete_note_case_insensitive(mcp_server, app, test_project):
    """Test that note deletion is case insensitive for titles."""

    async with Client(mcp_server) as client:
        # Create a note whose title mixes upper and lower case.
        note_args = {
            "project": test_project.name,
            "title": "CamelCase Note Title",
            "folder": "test",
            "content": "# CamelCase Note Title\n\nTesting case sensitivity.",
            "tags": "test,case",
        }
        await client.call_tool("write_note", note_args)

        # Identifier lookup should match regardless of letter case.
        result = await client.call_tool(
            "delete_note",
            {"project": test_project.name, "identifier": "camelcase note title"},
        )

        # Lower-cased identifier still resolves and deletes successfully.
        assert "true" in result.content[0].text.lower()


@pytest.mark.asyncio
async def test_delete_multiple_notes_sequentially(mcp_server, app, test_project):
    """Test deleting multiple notes in sequence.

    Ensures repeated delete calls within one client session all succeed
    and that no notes tagged for the batch remain searchable afterwards.
    """

    async with Client(mcp_server) as client:
        # Create multiple notes
        note_titles = [
            "First Note",
            "Second Note",
            "Third Note",
            "Fourth Note",
            "Fifth Note",
        ]

        for title in note_titles:
            await client.call_tool(
                "write_note",
                {
                    "project": test_project.name,
                    "title": title,
                    "folder": "batch",
                    "content": f"# {title}\n\nContent for {title}",
                    "tags": "batch,test",
                },
            )

        # Delete all notes sequentially
        for title in note_titles:
            delete_result = await client.call_tool(
                "delete_note",
                {
                    "project": test_project.name,
                    "identifier": title,
                },
            )

            # Each deletion should be successful
            assert "true" in delete_result.content[0].text.lower(), f"Failed to delete {title}"

        # Verify all notes are deleted by searching for the shared tag
        search_result = await client.call_tool(
            "search_notes",
            {
                "project": test_project.name,
                "query": "batch",
            },
        )

        # Should have no results (accept either JSON spacing variant)
        assert (
            '"results": []' in search_result.content[0].text
            or '"results":[]' in search_result.content[0].text
        )


@pytest.mark.asyncio
async def test_delete_note_with_unicode_content(mcp_server, app, test_project):
    """Test deleting notes with Unicode content.

    Exercises emoji, CJK text, and symbol-heavy markdown to confirm
    deletion works regardless of the note body's character set.
    """

    async with Client(mcp_server) as client:
        # Create a note with Unicode content
        unicode_content = """# Unicode Test Note 🚀

This note contains various Unicode characters:
- Emojis: 🎉 🔥 ⚡ 💡
- Languages: 测试中文 Tëst Übër
- Symbols: ♠♣♥♦ ←→↑↓ ∞≠≤≥
- Math: ∑∏∂∇∆Ω

## Observations
- [test] Unicode characters preserved ✓
- [note] Emoji support working 🎯

## Relations  
- supports [[Unicode Standards]]
- tested_with [[Various Languages]]"""

        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Unicode Test Note",
                "folder": "unicode",
                "content": unicode_content,
                "tags": "unicode,test,emoji",
            },
        )

        # Delete the Unicode note by title
        delete_result = await client.call_tool(
            "delete_note",
            {
                "project": test_project.name,
                "identifier": "Unicode Test Note",
            },
        )

        # Should return True for successful deletion
        assert "true" in delete_result.content[0].text.lower()

        # Verify deletion
        read_after_delete = await client.call_tool(
            "read_note",
            {
                "project": test_project.name,
                "identifier": "Unicode Test Note",
            },
        )

        # Should return "Note Not Found" message
        assert len(read_after_delete.content) == 1
        result_text = read_after_delete.content[0].text
        assert "Note Not Found" in result_text
        assert "Unicode Test Note" in result_text

```

--------------------------------------------------------------------------------
/src/basic_memory/utils.py:
--------------------------------------------------------------------------------

```python
"""Utility functions for basic-memory."""

import os

import logging
import re
import sys
from datetime import datetime
from pathlib import Path
from typing import Optional, Protocol, Union, runtime_checkable, List

from loguru import logger
from unidecode import unidecode


def normalize_project_path(path: str) -> str:
    """Strip the cloud mount-point prefix from a project path.

    In cloud deployments, the S3 bucket is mounted at /app/data. That prefix
    is removed so stored paths match the bucket structure and don't leak
    deployment details. Windows-style absolute paths pass through untouched.

    Args:
        path: Project path (e.g., "/app/data/basic-memory-llc" or "C:\\Users\\...")

    Returns:
        Normalized path (e.g., "/basic-memory-llc" or "C:\\Users\\...")

    Examples:
        >>> normalize_project_path("/app/data/my-project")
        '/my-project'
        >>> normalize_project_path("/my-project")
        '/my-project'
        >>> normalize_project_path("app/data/my-project")
        '/my-project'
        >>> normalize_project_path("C:\\\\Users\\\\project")
        'C:\\\\Users\\\\project'
    """
    # Windows absolute paths carry a drive letter + colon; leave them alone.
    if len(path) >= 2 and path[1] == ":":
        return path

    # Drop any leading slashes, then the mount prefix if present.
    stripped = path.lstrip("/")
    if stripped.startswith("app/data/"):
        stripped = stripped.removeprefix("app/data/")

    # lstrip guarantees no leading slash remains, so re-add exactly one
    # to yield a Unix absolute path.
    return "/" + stripped


@runtime_checkable
class PathLike(Protocol):
    """Protocol for objects that can be used as paths."""

    # Anything convertible to str (e.g. pathlib.Path) satisfies this protocol;
    # @runtime_checkable additionally permits isinstance() checks against it.
    def __str__(self) -> str: ...


# In type annotations, use Union[Path, str] instead of FilePath for now
# This preserves compatibility with existing code while we migrate
FilePath = Union[Path, str]

# Disable the "Queue is full" warning emitted by the OpenTelemetry metrics
# SDK when its internal export queue overflows (noise, not actionable here).
logging.getLogger("opentelemetry.sdk.metrics._internal.instrument").setLevel(logging.ERROR)


def generate_permalink(file_path: Union[Path, str, PathLike], split_extension: bool = True) -> str:
    """Generate a stable permalink from a file path.

    Args:
        file_path: Original file path (str, Path, or PathLike)
        split_extension: If True (default), the file extension is dropped from
            the permalink; if False, the original extension is re-appended
            unchanged after normalization.

    Returns:
        Normalized permalink that matches validation rules. Converts spaces and underscores
        to hyphens for consistency. Preserves non-ASCII characters like Chinese.

    Examples:
        >>> generate_permalink("docs/My Feature.md")
        'docs/my-feature'
        >>> generate_permalink("specs/API (v2).md")
        'specs/api-v2'
        >>> generate_permalink("design/unified_model_refactor.md")
        'design/unified-model-refactor'
        >>> generate_permalink("中文/测试文档.md")
        '中文/测试文档'
    """
    # Convert Path to string if needed; as_posix() normalizes separators to "/"
    path_str = Path(str(file_path)).as_posix()

    # Remove extension (for now, possibly re-appended below if split_extension=False)
    (base, extension) = os.path.splitext(path_str)

    # Check if we have CJK characters that should be preserved
    # CJK ranges: \u4e00-\u9fff (CJK Unified Ideographs), \u3000-\u303f (CJK symbols),
    # \u3400-\u4dbf (CJK Extension A), \uff00-\uffef (Fullwidth forms)
    has_cjk_chars = any(
        "\u4e00" <= char <= "\u9fff"
        or "\u3000" <= char <= "\u303f"
        or "\u3400" <= char <= "\u4dbf"
        or "\uff00" <= char <= "\uffef"
        for char in base
    )

    if has_cjk_chars:
        # For text with CJK characters, selectively transliterate only Latin accented chars
        result = ""
        for char in base:
            if (
                "\u4e00" <= char <= "\u9fff"
                or "\u3000" <= char <= "\u303f"
                or "\u3400" <= char <= "\u4dbf"
            ):
                # Preserve CJK ideographs and symbols
                result += char
            elif "\uff00" <= char <= "\uffef":
                # Remove Chinese fullwidth punctuation entirely (like ,!?)
                continue
            else:
                # Transliterate Latin accented characters to ASCII
                result += unidecode(char)

        # Insert hyphens between CJK and Latin character transitions
        # Match: CJK followed by Latin letter/digit, or Latin letter/digit followed by CJK
        result = re.sub(
            r"([\u4e00-\u9fff\u3000-\u303f\u3400-\u4dbf])([a-zA-Z0-9])", r"\1-\2", result
        )
        result = re.sub(
            r"([a-zA-Z0-9])([\u4e00-\u9fff\u3000-\u303f\u3400-\u4dbf])", r"\1-\2", result
        )

        # Insert dash between camelCase
        result = re.sub(r"([a-z0-9])([A-Z])", r"\1-\2", result)

        # Convert ASCII letters to lowercase, preserve CJK
        lower_text = "".join(c.lower() if c.isascii() and c.isalpha() else c for c in result)

        # Replace underscores with hyphens
        text_with_hyphens = lower_text.replace("_", "-")

        # Remove apostrophes entirely (don't replace with hyphens)
        text_no_apostrophes = text_with_hyphens.replace("'", "")

        # Replace unsafe chars with hyphens, but preserve CJK characters
        clean_text = re.sub(
            r"[^a-z0-9\u4e00-\u9fff\u3000-\u303f\u3400-\u4dbf/\-]", "-", text_no_apostrophes
        )
    else:
        # Original ASCII-only processing for backward compatibility
        # Transliterate unicode to ascii
        ascii_text = unidecode(base)

        # Insert dash between camelCase
        ascii_text = re.sub(r"([a-z0-9])([A-Z])", r"\1-\2", ascii_text)

        # Convert to lowercase
        lower_text = ascii_text.lower()

        # replace underscores with hyphens
        text_with_hyphens = lower_text.replace("_", "-")

        # Remove apostrophes entirely (don't replace with hyphens)
        text_no_apostrophes = text_with_hyphens.replace("'", "")

        # Replace remaining invalid chars with hyphens
        clean_text = re.sub(r"[^a-z0-9/\-]", "-", text_no_apostrophes)

    # Collapse multiple hyphens
    clean_text = re.sub(r"-+", "-", clean_text)

    # Clean each path segment: strip hyphens left at segment boundaries
    segments = clean_text.split("/")
    clean_segments = [s.strip("-") for s in segments]

    return_val = "/".join(clean_segments)

    # Append file extension back, if necessary
    if not split_extension and extension:
        return_val += extension

    return return_val


def setup_logging(
    env: str,
    home_dir: Path,
    log_file: Optional[str] = None,
    log_level: str = "INFO",
    console: bool = True,
) -> None:  # pragma: no cover
    """
    Configure loguru sinks and third-party log levels for the application.

    Args:
        env: The environment name (dev, test, prod)
        home_dir: The root directory for the application
        log_file: The name of the log file to write to
        log_level: The logging level to use
        console: Whether to log to the console
    """
    # Start from a clean slate: drop the default handler and any prior sinks.
    logger.remove()

    # File sink with rotation/retention; skipped entirely while running tests.
    if log_file and env != "test":
        logger.add(
            str(home_dir / log_file),
            level=log_level,
            rotation="10 MB",
            retention="10 days",
            backtrace=True,
            diagnose=True,
            enqueue=True,
            colorize=False,
        )

    # Console sink: always on in test mode, otherwise only when requested.
    if console or env == "test":
        logger.add(sys.stderr, level=log_level, backtrace=True, diagnose=True, colorize=True)

    logger.info(f"ENV: '{env}' Log level: '{log_level}' Logging to {log_file}")

    # Bind environment context for structured logging; the Fly/tenant variables
    # fall back to "local" outside cloud deployments.
    logger.configure(
        extra={
            "tenant_id": os.getenv("BASIC_MEMORY_TENANT_ID", "local"),
            "fly_app_name": os.getenv("FLY_APP_NAME", "local"),
            "fly_machine_id": os.getenv("FLY_MACHINE_ID", "local"),
            "fly_region": os.getenv("FLY_REGION", "local"),
        }
    )

    # Reduce noise from chatty third-party libraries (HTTP client, file watcher).
    for noisy_name, noisy_level in (
        ("httpx", logging.WARNING),
        ("watchfiles.main", logging.WARNING),
    ):
        logging.getLogger(noisy_name).setLevel(noisy_level)


def parse_tags(tags: Union[List[str], str, None]) -> List[str]:
    """Parse tags from various input formats into a consistent list.

    Args:
        tags: Can be a list of strings, a comma-separated string, or None

    Returns:
        A list of tag strings, or an empty list if no tags

    Note:
        This function strips leading '#' characters from tags to prevent
        their accumulation when tags are processed multiple times.
    """
    if tags is None:
        return []

    # Process list of tags
    if isinstance(tags, list):
        # Coerce items to str before stripping: a JSON payload like '[1, 2]'
        # parses to a list of ints, which previously raised AttributeError on
        # tag.strip(). Falsy items (None, "", 0) are still dropped, matching
        # the prior behavior for string inputs.
        return [str(tag).strip().lstrip("#") for tag in tags if tag and str(tag).strip()]

    # Process string input
    if isinstance(tags, str):
        # Check if it's a JSON array string (common issue from AI assistants)
        import json

        if tags.strip().startswith("[") and tags.strip().endswith("]"):
            try:
                # Try to parse as JSON array
                parsed_json = json.loads(tags)
                if isinstance(parsed_json, list):
                    # Recursively parse the JSON array as a list
                    return parse_tags(parsed_json)
            except json.JSONDecodeError:
                # Not valid JSON, fall through to comma-separated parsing
                pass

        # Split by comma, strip whitespace, then strip leading '#' characters
        return [tag.strip().lstrip("#") for tag in tags.split(",") if tag and tag.strip()]

    # For any other type, try to convert to string and parse
    try:  # pragma: no cover
        return parse_tags(str(tags))
    except (ValueError, TypeError):  # pragma: no cover
        logger.warning(f"Couldn't parse tags from input of type {type(tags)}: {tags}")
        return []


def normalize_newlines(multiline: str) -> str:
    """Convert every newline style (\\r\\n, \\r, or \\n) to the platform-native one.

    Args:
        multiline: String containing any mixture of newlines.

    Returns:
        A string with normalized newlines native to the platform.
    """
    # Split on any newline flavor, then rejoin with os.linesep — equivalent
    # to substituting each match, but makes the "one separator" intent explicit.
    return os.linesep.join(re.split(r"\r\n?|\n", multiline))


def normalize_file_path_for_comparison(file_path: str) -> str:
    """Produce a canonical form of a file path for conflict detection.

    Normalization steps, in order:
    - lowercase (case-insensitive comparison)
    - Unicode NFD decomposition
    - backslashes unified to forward slashes
    - runs of slashes collapsed to one

    Args:
        file_path: The file path to normalize

    Returns:
        Normalized file path for comparison purposes
    """
    import unicodedata

    decomposed = unicodedata.normalize("NFD", file_path.lower())
    unified = decomposed.replace("\\", "/")
    return re.sub(r"/+", "/", unified)


def detect_potential_file_conflicts(file_path: str, existing_paths: List[str]) -> List[str]:
    """Find existing paths that could collide with *file_path*.

    A conflict is either a case/Unicode/separator-insensitive path match or
    two different paths that generate the same permalink. Identical paths
    are never reported as conflicts.

    Args:
        file_path: The file path to check
        existing_paths: List of existing file paths to check against

    Returns:
        List of existing paths that might conflict with the given file path
    """
    normalized_input = normalize_file_path_for_comparison(file_path)
    input_permalink = generate_permalink(file_path)

    def _is_conflict(candidate: str) -> bool:
        # The path itself is not its own conflict.
        if candidate == file_path:
            return False
        # Case-insensitive / normalized path collision.
        if normalize_file_path_for_comparison(candidate) == normalized_input:
            return True
        # Same generated permalink → would shadow each other.
        return generate_permalink(candidate) == input_permalink

    return [existing for existing in existing_paths if _is_conflict(existing)]


def valid_project_path_value(path: str):
    """Return True if *path* is an acceptable relative project path value."""
    # Empty strings resolve to the project root and are allowed.
    if not path:
        return True

    # Traversal / home-expansion attempts (covers "\.." via ".." too).
    has_traversal = ".." in path or "~" in path

    # Absolute paths are rejected: Unix "/...", bare backslash roots,
    # and Windows drive-letter forms like "C:...".
    is_absolute = (
        path.startswith("/")
        or path.startswith("\\")
        or (len(path) >= 2 and path[1] == ":")
    )

    if has_traversal or is_absolute:
        return False

    # Control characters are rejected, except space/tab which get stripped later.
    if path.strip():
        for ch in path:
            if ord(ch) < 32 and ch not in (" ", "\t"):
                return False

    return True


def validate_project_path(path: str, project_path: Path) -> bool:
    """Return True if *path* is valid and resolves inside *project_path*."""

    # Cheap lexical checks first (traversal, absolute paths, control chars).
    if not valid_project_path_value(path):
        return False

    # Then confirm the resolved filesystem location stays within the project.
    try:
        candidate = (project_path / path).resolve()
        return candidate.is_relative_to(project_path.resolve())
    except (ValueError, OSError):
        return False


def ensure_timezone_aware(dt: datetime) -> datetime:
    """Return *dt* as a timezone-aware datetime.

    Naive datetimes are interpreted as system-local time and tagged with the
    local timezone; already-aware datetimes are returned unchanged.

    Args:
        dt: The datetime to ensure is timezone-aware

    Returns:
        A timezone-aware datetime
    """
    # astimezone() on a naive datetime attaches the local zone without
    # shifting the wall-clock fields.
    return dt if dt.tzinfo is not None else dt.astimezone()

```

--------------------------------------------------------------------------------
/test-int/mcp/test_chatgpt_tools_integration.py:
--------------------------------------------------------------------------------

```python
"""
Integration tests for ChatGPT-compatible MCP tools.

Tests the complete flow of search and fetch tools designed for ChatGPT integration,
ensuring they properly wrap Basic Memory's MCP tools and return OpenAI-compatible
MCP content array format.
"""

import json
import pytest
from fastmcp import Client


def extract_mcp_json_content(mcp_result):
    """
    Helper to extract JSON content from MCP CallToolResult.

    FastMCP auto-serializes our List[Dict[str, Any]] return values, so two
    decode steps are needed: first parse the serialized content-array list,
    then parse the JSON payload carried in that array's first "text" field.
    """
    outer_text = mcp_result.content[0].text
    # First decode: FastMCP's serialization of the returned list.
    mcp_items = json.loads(outer_text)
    # Second decode: the actual JSON embedded in the MCP content item.
    return json.loads(mcp_items[0]["text"])


@pytest.mark.asyncio
async def test_chatgpt_search_basic(mcp_server, app, test_project):
    """Test basic ChatGPT search functionality with MCP content array format."""

    async with Client(mcp_server) as client:
        # Create test notes for searching: two ML-related notes plus one
        # unrelated note so we can verify relevance filtering below.
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Machine Learning Fundamentals",
                "folder": "ai",
                "content": (
                    "# Machine Learning Fundamentals\n\nIntroduction to ML concepts and algorithms."
                ),
                "tags": "ml,ai,fundamentals",
            },
        )

        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Deep Learning with PyTorch",
                "folder": "ai",
                "content": (
                    "# Deep Learning with PyTorch\n\n"
                    "Building neural networks using PyTorch framework."
                ),
                "tags": "pytorch,deep-learning,ai",
            },
        )

        # This note should NOT match the "Machine Learning" query below.
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Data Visualization Guide",
                "folder": "data",
                "content": (
                    "# Data Visualization Guide\n\nCreating charts and graphs for data analysis."
                ),
                "tags": "visualization,data,charts",
            },
        )

        # Test ChatGPT search tool (note: no project argument — the tool
        # resolves the project itself).
        search_result = await client.call_tool(
            "search",
            {
                "query": "Machine Learning",
            },
        )

        # Extract JSON content from MCP result
        results_json = extract_mcp_json_content(search_result)
        assert "results" in results_json
        assert len(results_json["results"]) > 0

        # Check result structure: OpenAI expects id/title/url on each hit.
        first_result = results_json["results"][0]
        assert "id" in first_result
        assert "title" in first_result
        assert "url" in first_result

        # Verify correct content found — the relevant note is present and
        # the unrelated note is excluded.
        titles = [r["title"] for r in results_json["results"]]
        assert "Machine Learning Fundamentals" in titles
        assert "Data Visualization Guide" not in titles


@pytest.mark.asyncio
async def test_chatgpt_search_empty_results(mcp_server, app, test_project):
    """Test ChatGPT search with no matching results."""

    async with Client(mcp_server) as client:
        # Query a term that no note contains; the tool should still succeed.
        response = await client.call_tool(
            "search",
            {"query": "NonExistentTopic12345"},
        )

        # Even with zero hits, the payload echoes the query alongside an
        # empty results list.
        payload = extract_mcp_json_content(response)
        assert "results" in payload
        assert len(payload["results"]) == 0
        assert payload["query"] == "NonExistentTopic12345"


@pytest.mark.asyncio
async def test_chatgpt_search_with_boolean_operators(mcp_server, app, test_project):
    """Test ChatGPT search with boolean operators."""

    async with Client(mcp_server) as client:
        # Create two notes that share the word "Frameworks" but differ on
        # "Python" so an AND query can discriminate between them.
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Python Web Frameworks",
                "folder": "dev",
                "content": (
                    "# Python Web Frameworks\n\nComparing Django and Flask for web development."
                ),
                "tags": "python,web,frameworks",
            },
        )

        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "JavaScript Frameworks",
                "folder": "dev",
                "content": "# JavaScript Frameworks\n\nReact, Vue, and Angular comparison.",
                "tags": "javascript,web,frameworks",
            },
        )

        # Test with AND operator — only documents matching BOTH terms should
        # come back.
        search_result = await client.call_tool(
            "search",
            {
                "query": "Python AND frameworks",
            },
        )

        results_json = extract_mcp_json_content(search_result)
        titles = [r["title"] for r in results_json["results"]]
        assert "Python Web Frameworks" in titles
        assert "JavaScript Frameworks" not in titles


@pytest.mark.asyncio
async def test_chatgpt_fetch_document(mcp_server, app, test_project):
    """Test ChatGPT fetch tool for retrieving full document content."""

    async with Client(mcp_server) as client:
        # Create a test note with headings and a fenced code block so we can
        # verify the full body (including code) round-trips through fetch.
        note_content = """# Advanced Python Techniques

## Overview
This document covers advanced Python programming techniques.

## Topics Covered
- Decorators
- Context Managers
- Metaclasses
- Async/Await patterns

## Code Examples
```python
def my_decorator(func):
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper
```
"""

        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Advanced Python Techniques",
                "folder": "programming",
                "content": note_content,
                "tags": "python,advanced,programming",
            },
        )

        # Fetch the document using its title (fetch accepts title as well as
        # permalink identifiers).
        fetch_result = await client.call_tool(
            "fetch",
            {
                "id": "Advanced Python Techniques",
            },
        )

        # Extract JSON content from MCP result and check the OpenAI document
        # shape: id/title/text/url/metadata.
        document_json = extract_mcp_json_content(fetch_result)
        assert "id" in document_json
        assert "title" in document_json
        assert "text" in document_json
        assert "url" in document_json
        assert "metadata" in document_json

        # Verify content survived intact, including the code example.
        assert document_json["title"] == "Advanced Python Techniques"
        assert "Decorators" in document_json["text"]
        assert "Context Managers" in document_json["text"]
        assert "def my_decorator" in document_json["text"]


@pytest.mark.asyncio
async def test_chatgpt_fetch_by_permalink(mcp_server, app, test_project):
    """Test ChatGPT fetch using permalink identifier."""

    async with Client(mcp_server) as client:
        # Seed a note whose permalink we will discover via search.
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Test Document",
                "folder": "test",
                "content": "# Test Document\n\nThis is test content for permalink fetching.",
                "tags": "test",
            },
        )

        # Search first so we can pull the generated permalink out of the hits.
        search_response = await client.call_tool(
            "search",
            {"query": "Test Document"},
        )
        hits = extract_mcp_json_content(search_response)
        assert len(hits["results"]) > 0
        permalink = hits["results"][0]["id"]

        # Now fetch by that permalink, exactly as ChatGPT would chain calls.
        fetch_response = await client.call_tool(
            "fetch",
            {"id": permalink},
        )

        # The fetched document echoes the permalink and carries the content.
        document = extract_mcp_json_content(fetch_response)
        assert document["id"] == permalink
        assert "Test Document" in document["title"]
        assert "test content for permalink fetching" in document["text"]


@pytest.mark.asyncio
async def test_chatgpt_fetch_nonexistent_document(mcp_server, app, test_project):
    """Test ChatGPT fetch with non-existent document ID."""

    async with Client(mcp_server) as client:
        # Fetching an unknown id must not raise; it returns an error document.
        response = await client.call_tool(
            "fetch",
            {"id": "NonExistentDocument12345"},
        )

        document = extract_mcp_json_content(response)

        # The error payload still carries the standard document fields.
        for field in ("id", "title", "text"):
            assert field in document

        # The id is echoed back and the text flags the missing document.
        assert document["id"] == "NonExistentDocument12345"
        assert "Not Found" in document["text"] or "not found" in document["text"]


@pytest.mark.asyncio
async def test_chatgpt_fetch_with_empty_title(mcp_server, app, test_project):
    """Test ChatGPT fetch handles documents with empty or missing titles."""

    async with Client(mcp_server) as client:
        # Create a note without a markdown H1 heading in the body — the title
        # must then come from note metadata rather than the content.
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "untitled-note",
                "folder": "misc",
                "content": "This is content without a markdown header.\n\nJust plain text.",
                "tags": "misc",
            },
        )

        # Fetch the document by its title identifier.
        fetch_result = await client.call_tool(
            "fetch",
            {
                "id": "untitled-note",
            },
        )

        # Parse JSON response
        document_json = extract_mcp_json_content(fetch_result)

        # Should have a non-empty title even if the content lacks one.
        assert "title" in document_json
        assert document_json["title"] != ""
        assert document_json["title"] is not None
        assert "content without a markdown header" in document_json["text"]


@pytest.mark.asyncio
async def test_chatgpt_search_pagination_default(mcp_server, app, test_project):
    """Test that ChatGPT search uses reasonable pagination defaults."""

    async with Client(mcp_server) as client:
        # Write 15 notes so the matching set exceeds one default page.
        for index in range(15):
            await client.call_tool(
                "write_note",
                {
                    "project": test_project.name,
                    "title": f"Test Note {index}",
                    "folder": "bulk",
                    "content": f"# Test Note {index}\n\nThis is test content number {index}.",
                    "tags": "test,bulk",
                },
            )

        # A plain search should be capped at the default page size of 10.
        response = await client.call_tool(
            "search",
            {"query": "Test Note"},
        )

        payload = extract_mcp_json_content(response)
        assert len(payload["results"]) <= 10
        assert payload["total_count"] <= 10


@pytest.mark.asyncio
async def test_chatgpt_tools_error_handling(mcp_server, app, test_project):
    """Test error handling in ChatGPT tools returns proper MCP format."""

    async with Client(mcp_server) as client:
        # An empty query is the easiest way to poke the error path while
        # still exercising the normal call shape.
        response = await client.call_tool(
            "search",
            {"query": ""},
        )

        # Regardless of errors, the result must be an MCP content array
        # holding exactly one text item.
        assert hasattr(response, "content")
        items = response.content
        assert isinstance(items, list)
        assert len(items) == 1
        assert items[0].type == "text"

        # And that text item must parse as JSON carrying a results key.
        payload = extract_mcp_json_content(response)
        assert "results" in payload


@pytest.mark.asyncio
async def test_chatgpt_integration_workflow(mcp_server, app, test_project):
    """Test complete workflow: search then fetch, as ChatGPT would use it."""

    async with Client(mcp_server) as client:
        # Step 1: Create multiple documents — two API-related plus one
        # database note so the "API" search below has real discrimination.
        docs = [
            {
                "title": "API Design Best Practices",
                "content": (
                    "# API Design Best Practices\n\nRESTful API design principles and patterns."
                ),
                "tags": "api,rest,design",
            },
            {
                "title": "GraphQL vs REST",
                "content": "# GraphQL vs REST\n\nComparing GraphQL and REST API architectures.",
                "tags": "api,graphql,rest",
            },
            {
                "title": "Database Design Patterns",
                "content": (
                    "# Database Design Patterns\n\n"
                    "Common database design patterns and anti-patterns."
                ),
                "tags": "database,design,patterns",
            },
        ]

        for doc in docs:
            await client.call_tool(
                "write_note",
                {
                    "project": test_project.name,
                    "title": doc["title"],
                    "folder": "architecture",
                    "content": doc["content"],
                    "tags": doc["tags"],
                },
            )

        # Step 2: Search for API-related content (as ChatGPT would)
        search_result = await client.call_tool(
            "search",
            {
                "query": "API",
            },
        )

        results_json = extract_mcp_json_content(search_result)
        assert len(results_json["results"]) >= 2

        # Step 3: Fetch one of the search results (as ChatGPT would), using
        # the id returned by search as the fetch identifier.
        first_result_id = results_json["results"][0]["id"]
        fetch_result = await client.call_tool(
            "fetch",
            {
                "id": first_result_id,
            },
        )

        document_json = extract_mcp_json_content(fetch_result)

        # Verify the fetched document matches the search result it came from.
        assert document_json["id"] == first_result_id
        assert "API" in document_json["text"] or "api" in document_json["text"].lower()

        # Verify document has expected structure (metadata reports markdown).
        assert document_json["metadata"]["format"] == "markdown"

```

--------------------------------------------------------------------------------
/tests/cli/test_cli_tools.py:
--------------------------------------------------------------------------------

```python
"""Tests for the Basic Memory CLI tools.

These tests use real MCP tools with the test environment instead of mocks.
"""

# Import for testing

import io
from datetime import datetime, timedelta
import json
from textwrap import dedent
from typing import AsyncGenerator
from unittest.mock import patch

import pytest_asyncio
from typer.testing import CliRunner

from basic_memory.cli.commands.tool import tool_app
from basic_memory.schemas.base import Entity as EntitySchema

runner = CliRunner()


@pytest_asyncio.fixture
async def setup_test_note(entity_service, search_service) -> AsyncGenerator[dict, None]:
    """Create a test note for CLI tests.

    Writes a note containing observations and a relation, indexes it for
    search, and yields a dict with the note's title, permalink, and raw
    content for assertions.
    """
    note_content = dedent("""
        # Test Note
        
        This is a test note for CLI commands.
        
        ## Observations
        - [tech] Test observation #test
        - [note] Another observation
        
        ## Relations
        - connects_to [[Another Note]]
    """)

    entity, created = await entity_service.create_or_update_entity(
        EntitySchema(
            title="Test Note",
            folder="test",
            entity_type="note",
            content=note_content,
        )
    )

    # Index the entity for search so search-notes/build-context can find it.
    await search_service.index_entity(entity)

    yield {
        "title": entity.title,
        "permalink": entity.permalink,
        "content": note_content,
    }


def test_write_note(cli_env, project_config, test_project):
    """Test write_note command with basic arguments."""
    args = [
        "write-note",
        "--title",
        "CLI Test Note",
        "--content",
        "This is a CLI test note",
        "--folder",
        "test",
        "--project",
        test_project.name,
    ]
    outcome = runner.invoke(tool_app, args)
    assert outcome.exit_code == 0

    # The CLI echoes the title, a Created/Updated status, and the permalink.
    assert "CLI Test Note" in outcome.stdout
    assert "Created" in outcome.stdout or "Updated" in outcome.stdout
    assert "permalink" in outcome.stdout


def test_write_note_with_project_arg(cli_env, project_config, test_project):
    """Test write_note with --project supplied before the other options."""
    cli_args = [
        "write-note",
        "--project",
        test_project.name,
        "--title",
        "CLI Test Note",
        "--content",
        "This is a CLI test note",
        "--folder",
        "test",
    ]
    outcome = runner.invoke(tool_app, cli_args)
    assert outcome.exit_code == 0

    # Same success output as the basic case: title, status, permalink.
    assert "CLI Test Note" in outcome.stdout
    assert "Created" in outcome.stdout or "Updated" in outcome.stdout
    assert "permalink" in outcome.stdout


def test_write_note_with_tags(cli_env, project_config):
    """Test write_note command with repeated --tags options."""
    outcome = runner.invoke(
        tool_app,
        [
            "write-note",
            "--title",
            "Tagged CLI Test Note",
            "--content",
            "This is a test note with tags",
            "--folder",
            "test",
            "--tags",
            "tag1",
            "--tags",
            "tag2",
        ],
    )
    assert outcome.exit_code == 0

    # Both tags should appear in the output, either joined or individually.
    assert "Tagged CLI Test Note" in outcome.stdout
    joined = "tag1, tag2" in outcome.stdout
    separate = "tag1" in outcome.stdout and "tag2" in outcome.stdout
    assert joined or separate


def test_write_note_from_stdin(cli_env, project_config, monkeypatch):
    """Test write_note command reading from stdin.

    This test requires minimal mocking of stdin to simulate piped input.
    """
    test_content = "This is content from stdin for testing"

    # Mock stdin using monkeypatch, which works better with typer's CliRunner
    monkeypatch.setattr("sys.stdin", io.StringIO(test_content))
    monkeypatch.setattr("sys.stdin.isatty", lambda: False)  # Simulate piped input

    # Use runner.invoke with input parameter as a fallback — CliRunner may
    # replace sys.stdin itself, so both paths supply the same content.
    result = runner.invoke(
        tool_app,
        [
            "write-note",
            "--title",
            "Stdin Test Note",
            "--folder",
            "test",
        ],
        input=test_content,  # Provide input as a fallback
    )

    assert result.exit_code == 0

    # Check for expected success message: title, status, and permalink.
    assert "Stdin Test Note" in result.stdout
    assert "Created" in result.stdout or "Updated" in result.stdout
    assert "permalink" in result.stdout


def test_write_note_content_param_priority(cli_env, project_config):
    """Test that content parameter has priority over stdin."""
    stdin_content = "This content from stdin should NOT be used"
    param_content = "This explicit content parameter should be used"

    # Mock stdin but provide explicit content parameter — the command should
    # prefer --content and ignore the piped input entirely.
    with (
        patch("sys.stdin", io.StringIO(stdin_content)),
        patch("sys.stdin.isatty", return_value=False),
    ):  # Simulate piped input
        result = runner.invoke(
            tool_app,
            [
                "write-note",
                "--title",
                "Priority Test Note",
                "--content",
                param_content,
                "--folder",
                "test",
            ],
        )

        assert result.exit_code == 0

        # Check the note was created with the content from parameter, not stdin
        # We can't directly check file contents in this test approach
        # but we can verify the command succeeded
        assert "Priority Test Note" in result.stdout
        assert "Created" in result.stdout or "Updated" in result.stdout


def test_write_note_no_content(cli_env, project_config):
    """Test error handling when no content is provided."""
    # Pretend stdin is an interactive terminal so no piped content exists.
    with patch("sys.stdin.isatty", return_value=True):
        outcome = runner.invoke(
            tool_app,
            [
                "write-note",
                "--title",
                "No Content Note",
                "--folder",
                "test",
            ],
        )

        # With neither --content nor piped stdin, the command must fail.
        assert outcome.exit_code == 1


def test_read_note(cli_env, setup_test_note):
    """Test read_note command."""
    permalink = setup_test_note["permalink"]

    outcome = runner.invoke(tool_app, ["read-note", permalink])
    assert outcome.exit_code == 0

    # The rendered note should include the title, body, and both sections.
    for fragment in (
        "Test Note",
        "This is a test note for CLI commands",
        "## Observations",
        "Test observation",
        "## Relations",
        "connects_to [[Another Note]]",
    ):
        assert fragment in outcome.stdout

    # Note: square brackets like [tech] are stripped in CLI output, so the
    # observation category markers are deliberately not asserted here.


def test_search_basic(cli_env, setup_test_note, test_project):
    """Test basic search command."""
    outcome = runner.invoke(
        tool_app,
        ["search-notes", "test observation", "--project", test_project.name],
    )
    assert outcome.exit_code == 0

    # Output is JSON; the query should produce at least one hit.
    payload = json.loads(outcome.stdout)
    assert len(payload["results"]) > 0

    # At least one hit should be the test observation (both words appear in
    # its permalink).
    found = any(
        "test" in item["permalink"].lower() and "observation" in item["permalink"].lower()
        for item in payload["results"]
    )
    assert found, "Search did not find the test observation"


def test_search_permalink(cli_env, setup_test_note):
    """Test search with permalink flag."""
    permalink = setup_test_note["permalink"]

    outcome = runner.invoke(
        tool_app,
        ["search-notes", permalink, "--permalink"],
    )
    assert outcome.exit_code == 0

    # Output is JSON; permalink search should yield at least one hit.
    payload = json.loads(outcome.stdout)
    assert len(payload["results"]) > 0

    # An exactly-matching permalink must appear among the hits.
    matches = [item for item in payload["results"] if item["permalink"] == permalink]
    assert matches, "Search did not find the note by permalink"


def test_build_context(cli_env, setup_test_note):
    """Test build_context command."""
    permalink = setup_test_note["permalink"]

    outcome = runner.invoke(
        tool_app,
        ["build-context", f"memory://{permalink}"],
    )
    assert outcome.exit_code == 0

    # The command emits JSON with a non-empty results list.
    payload = json.loads(outcome.stdout)
    assert "results" in payload
    assert len(payload["results"]) > 0

    # The test note must appear as a primary result in the context.
    found = any(
        item["primary_result"]["permalink"] == permalink for item in payload["results"]
    )
    assert found, "Context did not include the test note"


def test_build_context_with_options(cli_env, setup_test_note):
    """Test build_context command with all options."""
    permalink = setup_test_note["permalink"]

    result = runner.invoke(
        tool_app,
        [
            "build-context",
            f"memory://{permalink}",
            "--depth",
            "2",
            "--timeframe",
            "1d",
            "--page",
            "1",
            "--page-size",
            "5",
            "--max-related",
            "20",
        ],
    )
    assert result.exit_code == 0

    # Result should be JSON containing our test note
    context_result = json.loads(result.stdout)

    # Check that metadata reflects our options: depth echoed back, and the
    # "1d" timeframe resolved to a recent aware timestamp (2-day tolerance).
    assert context_result["metadata"]["depth"] == 2
    timeframe = datetime.fromisoformat(context_result["metadata"]["timeframe"])
    assert datetime.now().astimezone() - timeframe <= timedelta(
        days=2
    )  # Compare timezone-aware datetimes

    # Results should include our test note as a primary result
    found = False
    for item in context_result["results"]:
        if item["primary_result"]["permalink"] == permalink:
            found = True
            break

    assert found, "Context did not include the test note"


def test_build_context_string_depth_parameter(cli_env, setup_test_note):
    """Test build_context command handles string depth parameter correctly."""
    permalink = setup_test_note["permalink"]

    # Test valid string depth parameter - Typer should convert it to int
    result = runner.invoke(
        tool_app,
        [
            "build-context",
            f"memory://{permalink}",
            "--depth",
            "2",  # This is always a string from CLI
        ],
    )
    assert result.exit_code == 0

    # Result should be JSON containing our test note with correct depth
    context_result = json.loads(result.stdout)
    assert context_result["metadata"]["depth"] == 2

    # Test invalid string depth parameter - should fail with Typer validation error
    result = runner.invoke(
        tool_app,
        [
            "build-context",
            f"memory://{permalink}",
            "--depth",
            "invalid",
        ],
    )
    assert result.exit_code == 2  # Typer exits with code 2 for parameter validation errors
    # Typer should show a usage error for invalid integer on stderr
    assert (
        "invalid" in result.stderr
        and "is not a valid" in result.stderr
        and "integer" in result.stderr
    )


# The get-entity CLI command was removed when tools were refactored
# into separate files with improved error handling


def test_recent_activity(cli_env, setup_test_note, test_project):
    """Test recent_activity command with defaults."""
    outcome = runner.invoke(tool_app, ["recent-activity"])
    assert outcome.exit_code == 0

    # Output is a human-readable summary rather than JSON.
    text = outcome.stdout
    assert "Recent Activity Summary" in text
    assert "Most Active Project:" in text or "Other Active Projects:" in text

    # The freshly created note should be mentioned somewhere in the summary.
    assert setup_test_note["permalink"] in text or setup_test_note["title"] in text


def test_recent_activity_with_options(cli_env, setup_test_note, test_project):
    """Test recent_activity command with options."""
    result = runner.invoke(
        tool_app,
        [
            "recent-activity",
            "--type",
            "entity",
            "--depth",
            "2",
            "--timeframe",
            "7d",
        ],
    )
    assert result.exit_code == 0

    # Result should be human-readable string containing recent activity
    output = result.stdout
    assert "Recent Activity Summary" in output
    assert "Most Active Project:" in output or "Other Active Projects:" in output

    # Should include information about entities since we requested entity type
    assert setup_test_note["permalink"] in output or setup_test_note["title"] in output


def test_continue_conversation(cli_env, setup_test_note):
    """Test continue_conversation command."""
    permalink = setup_test_note["permalink"]

    # Run the CLI command against the seeded note's topic.
    outcome = runner.invoke(
        tool_app,
        ["continue-conversation", "--topic", "Test Note"],
    )
    assert outcome.exit_code == 0

    # The prompt should frame the session and point back at the note.
    for fragment in (
        "Continuing conversation on: Test Note",
        "This is a memory retrieval session",
        "read_note",
        permalink,
    ):
        assert fragment in outcome.stdout


def test_continue_conversation_no_results(cli_env):
    """Test continue_conversation command with no results."""
    # A topic that matches nothing should still produce a helpful prompt.
    outcome = runner.invoke(
        tool_app,
        ["continue-conversation", "--topic", "NonexistentTopic"],
    )
    assert outcome.exit_code == 0

    # The prompt acknowledges the topic and reports the empty result set.
    assert "Continuing conversation on: NonexistentTopic" in outcome.stdout
    assert "The supplied query did not return any information" in outcome.stdout


@patch("basic_memory.services.initialization.initialize_database")
def test_ensure_migrations_functionality(mock_initialize_database, app_config, monkeypatch):
    """Test the database initialization functionality."""
    # Import kept local to the test; the @patch above replaces the module's
    # initialize_database attribute for the duration of the call.
    from basic_memory.services.initialization import ensure_initialization

    # Call the function
    ensure_initialization(app_config)

    # The underlying asyncio.run should call our mocked function
    mock_initialize_database.assert_called_once()


@patch("basic_memory.services.initialization.initialize_database")
def test_ensure_migrations_handles_errors(mock_initialize_database, app_config, monkeypatch):
    """Test that initialization handles errors gracefully."""
    # Import kept local to the test, mirroring the success-path test above.
    from basic_memory.services.initialization import ensure_initialization

    # Configure mock to raise an exception
    mock_initialize_database.side_effect = Exception("Test error")

    # Call the function - should not raise exception
    ensure_initialization(app_config)

    # We're just making sure it doesn't crash by calling it

```

--------------------------------------------------------------------------------
/tests/conftest.py:
--------------------------------------------------------------------------------

```python
"""Common test fixtures."""

from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from textwrap import dedent
from typing import AsyncGenerator

import os
import pytest
import pytest_asyncio
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker

from basic_memory import db
from basic_memory.config import ProjectConfig, BasicMemoryConfig, ConfigManager
from basic_memory.db import DatabaseType
from basic_memory.markdown import EntityParser
from basic_memory.markdown.markdown_processor import MarkdownProcessor
from basic_memory.models import Base
from basic_memory.models.knowledge import Entity
from basic_memory.models.project import Project
from basic_memory.repository.entity_repository import EntityRepository
from basic_memory.repository.observation_repository import ObservationRepository
from basic_memory.repository.project_repository import ProjectRepository
from basic_memory.repository.relation_repository import RelationRepository
from basic_memory.repository.search_repository import SearchRepository
from basic_memory.schemas.base import Entity as EntitySchema
from basic_memory.services import (
    EntityService,
    ProjectService,
)
from basic_memory.services.directory_service import DirectoryService
from basic_memory.services.file_service import FileService
from basic_memory.services.link_resolver import LinkResolver
from basic_memory.services.search_service import SearchService
from basic_memory.sync.sync_service import SyncService
from basic_memory.sync.watch_service import WatchService


@pytest.fixture
def anyio_backend():
    """Run anyio-based async tests on the asyncio backend."""
    return "asyncio"


@pytest.fixture
def project_root() -> Path:
    """Return the repository root (two levels up from this conftest)."""
    return Path(__file__).parent.parent


@pytest.fixture
def config_home(tmp_path, monkeypatch) -> Path:
    """Redirect all home-directory lookups to a per-test temp directory.

    Ensures tests never read or write the developer's real Basic Memory
    configuration or data.
    """
    # Patch HOME environment variable for the duration of the test
    monkeypatch.setenv("HOME", str(tmp_path))
    # On Windows, also set USERPROFILE
    if os.name == "nt":
        monkeypatch.setenv("USERPROFILE", str(tmp_path))
    # Set BASIC_MEMORY_HOME to the test directory
    monkeypatch.setenv("BASIC_MEMORY_HOME", str(tmp_path / "basic-memory"))
    return tmp_path


@pytest.fixture(scope="function", autouse=True)
def app_config(config_home, tmp_path, monkeypatch) -> BasicMemoryConfig:
    """Build the app-level test configuration with a single test project.

    Deliberately does not depend on the test_project fixture, which would
    create a circular dependency.
    """
    return BasicMemoryConfig(
        env="test",
        projects={"test-project": str(config_home)},
        default_project="test-project",
        update_permalinks_on_move=True,
    )


@pytest.fixture(autouse=True)
def config_manager(
    app_config: BasicMemoryConfig, project_config: ProjectConfig, config_home: Path, monkeypatch
) -> ConfigManager:
    """Provide a ConfigManager rooted in the test home dir, with config persisted.

    NOTE(review): project_config and monkeypatch are not used in the body;
    presumably kept to force fixture ordering — confirm before removing.
    """
    # Invalidate config cache to ensure clean state for each test
    from basic_memory import config as config_module

    config_module._CONFIG_CACHE = None

    # Create a new ConfigManager that uses the test home directory
    config_manager = ConfigManager()
    # Update its paths to use the test directory
    config_manager.config_dir = config_home / ".basic-memory"
    config_manager.config_file = config_manager.config_dir / "config.json"
    config_manager.config_dir.mkdir(parents=True, exist_ok=True)

    # Ensure the config file is written to disk
    config_manager.save_config(app_config)
    return config_manager


@pytest.fixture(scope="function", autouse=True)
def project_config(test_project):
    """Project-level config mirroring the test_project database record."""
    return ProjectConfig(name=test_project.name, home=Path(test_project.path))


@dataclass
class TestConfig:
    """Bundle of every configuration fixture for one-stop injection.

    NOTE(review): the Test* name may trigger a pytest collection warning
    (class has an __init__) — confirm this is intentional.
    """

    # Patched home directory (the per-test tmp_path)
    config_home: Path
    # Per-project configuration (name + home)
    project_config: ProjectConfig
    # Application-level configuration
    app_config: BasicMemoryConfig
    # Manager that persisted app_config to disk
    config_manager: ConfigManager


@pytest.fixture
def test_config(config_home, project_config, app_config, config_manager) -> TestConfig:
    """Aggregate all configuration fixtures into a single TestConfig object."""
    return TestConfig(
        config_home=config_home,
        project_config=project_config,
        app_config=app_config,
        config_manager=config_manager,
    )


@pytest_asyncio.fixture(scope="function")
async def engine_factory(
    app_config,
) -> AsyncGenerator[tuple[AsyncEngine, async_sessionmaker[AsyncSession]], None]:
    """Create an engine and session factory using an in-memory SQLite database.

    Yields an (engine, session_maker) pair after creating the full schema;
    teardown is handled by the db.engine_session_factory context manager.
    """
    async with db.engine_session_factory(
        db_path=app_config.database_path, db_type=DatabaseType.MEMORY
    ) as (engine, session_maker):
        # Create all tables for the DB the engine is connected to
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)

        yield engine, session_maker


@pytest_asyncio.fixture
async def session_maker(engine_factory) -> async_sessionmaker[AsyncSession]:
    """Expose only the session maker half of the engine factory tuple."""
    return engine_factory[1]


## Repositories


@pytest_asyncio.fixture(scope="function")
async def entity_repository(
    session_maker: async_sessionmaker[AsyncSession], test_project: Project
) -> EntityRepository:
    """EntityRepository scoped to the test project's id."""
    repository = EntityRepository(session_maker, project_id=test_project.id)
    return repository


@pytest_asyncio.fixture(scope="function")
async def observation_repository(
    session_maker: async_sessionmaker[AsyncSession], test_project: Project
) -> ObservationRepository:
    """ObservationRepository scoped to the test project's id."""
    repository = ObservationRepository(session_maker, project_id=test_project.id)
    return repository


@pytest_asyncio.fixture(scope="function")
async def relation_repository(
    session_maker: async_sessionmaker[AsyncSession], test_project: Project
) -> RelationRepository:
    """RelationRepository scoped to the test project's id."""
    repository = RelationRepository(session_maker, project_id=test_project.id)
    return repository


@pytest_asyncio.fixture(scope="function")
async def project_repository(
    session_maker: async_sessionmaker[AsyncSession],
) -> ProjectRepository:
    """ProjectRepository over the shared test session maker (no project scope)."""
    repository = ProjectRepository(session_maker)
    return repository


@pytest_asyncio.fixture(scope="function")
async def test_project(config_home, engine_factory) -> Project:
    """Create a test project to be used as context for other repositories.

    Depends on engine_factory so the database schema exists before the insert.
    """
    # Only the session maker is needed; the engine itself is unused here.
    _, session_maker = engine_factory
    project_repository = ProjectRepository(session_maker)
    project_data = {
        "name": "test-project",
        "description": "Project used as context for tests",
        "path": str(config_home),
        "is_active": True,
        "is_default": True,  # Explicitly set as the default project (for cli operations)
    }
    return await project_repository.create(project_data)


## Services


@pytest_asyncio.fixture
async def entity_service(
    entity_repository: EntityRepository,
    observation_repository: ObservationRepository,
    relation_repository: RelationRepository,
    entity_parser: EntityParser,
    file_service: FileService,
    link_resolver: LinkResolver,
    app_config: BasicMemoryConfig,
) -> EntityService:
    """Assemble an EntityService from its repository/parser/service collaborators."""
    service = EntityService(
        app_config=app_config,
        entity_repository=entity_repository,
        observation_repository=observation_repository,
        relation_repository=relation_repository,
        entity_parser=entity_parser,
        file_service=file_service,
        link_resolver=link_resolver,
    )
    return service


@pytest.fixture
def file_service(
    project_config: ProjectConfig, markdown_processor: MarkdownProcessor
) -> FileService:
    """FileService rooted at the test project's home directory."""
    service = FileService(project_config.home, markdown_processor)
    return service


@pytest.fixture
def markdown_processor(entity_parser: EntityParser) -> MarkdownProcessor:
    """Markdown writer built around the shared entity parser."""
    processor = MarkdownProcessor(entity_parser)
    return processor


@pytest.fixture
def link_resolver(entity_repository: EntityRepository, search_service: SearchService):
    """LinkResolver combining direct entity lookup with search fallback."""
    resolver = LinkResolver(entity_repository, search_service)
    return resolver


@pytest.fixture
def entity_parser(project_config):
    """EntityParser rooted at the test project's home directory."""
    parser = EntityParser(project_config.home)
    return parser


@pytest_asyncio.fixture
async def sync_service(
    app_config: BasicMemoryConfig,
    entity_service: EntityService,
    entity_parser: EntityParser,
    project_repository: ProjectRepository,
    entity_repository: EntityRepository,
    relation_repository: RelationRepository,
    search_service: SearchService,
    file_service: FileService,
) -> SyncService:
    """Wire up a SyncService with the full set of test collaborators."""
    service = SyncService(
        app_config=app_config,
        entity_service=entity_service,
        entity_parser=entity_parser,
        entity_repository=entity_repository,
        relation_repository=relation_repository,
        project_repository=project_repository,
        search_service=search_service,
        file_service=file_service,
    )
    return service


@pytest_asyncio.fixture
async def directory_service(entity_repository, project_config) -> DirectoryService:
    """Create directory service for testing.

    NOTE(review): project_config is not used in the body; presumably kept to
    force fixture ordering — confirm before removing it.
    """
    return DirectoryService(
        entity_repository=entity_repository,
    )


@pytest_asyncio.fixture
async def search_repository(session_maker, test_project: Project):
    """SearchRepository scoped to the test project's id."""
    repository = SearchRepository(session_maker, project_id=test_project.id)
    return repository


@pytest_asyncio.fixture(autouse=True)
async def init_search_index(search_service):
    """Ensure the search index exists for every test (autouse).

    NOTE(review): the search_service fixture already calls init_search_index()
    itself, so this second call looks redundant — presumably idempotent; confirm.
    """
    await search_service.init_search_index()


@pytest_asyncio.fixture
async def search_service(
    search_repository: SearchRepository,
    entity_repository: EntityRepository,
    file_service: FileService,
) -> SearchService:
    """Build a SearchService and initialize its index before handing it out."""
    search = SearchService(search_repository, entity_repository, file_service)
    await search.init_search_index()
    return search


@pytest_asyncio.fixture(scope="function")
async def sample_entity(entity_repository: EntityRepository) -> Entity:
    """Persist a single minimal entity for tests that need existing data."""
    return await entity_repository.create(
        {
            "project_id": entity_repository.project_id,
            "title": "Test Entity",
            "entity_type": "test",
            "permalink": "test/test-entity",
            "file_path": "test/test_entity.md",
            "content_type": "text/markdown",
            "created_at": datetime.now(timezone.utc),
            "updated_at": datetime.now(timezone.utc),
        }
    )


@pytest_asyncio.fixture
async def project_service(
    project_repository: ProjectRepository,
) -> ProjectService:
    """ProjectService backed by the test project repository."""
    service = ProjectService(repository=project_repository)
    return service


@pytest_asyncio.fixture
async def full_entity(sample_entity, entity_repository, file_service, entity_service) -> Entity:
    """Create a search test entity with observations and outgoing relations.

    Depends on sample_entity so the [[Test Entity]] wikilinks presumably
    resolve to an existing entity.
    """
    # The "created" flag from create_or_update_entity is irrelevant here.
    entity, _ = await entity_service.create_or_update_entity(
        EntitySchema(
            title="Search_Entity",
            folder="test",
            entity_type="test",
            content=dedent("""
                ## Observations
                - [tech] Tech note
                - [design] Design note

                ## Relations
                - out1 [[Test Entity]]
                - out2 [[Test Entity]]
                """),
        )
    )
    return entity


@pytest_asyncio.fixture
async def test_graph(
    entity_repository,
    relation_repository,
    observation_repository,
    search_service,
    file_service,
    entity_service,
):
    """Create a test knowledge graph with entities, relations and observations.

    Builds a chain Root -> Connected 1 -> Connected 2 -> Deep -> Deeper and
    indexes every entity for search. Returns a dict keyed by role name.
    """

    # Create some test entities in reverse order so they will be linked
    # (each [[wikilink]] target must exist before the entity referencing it).
    deeper, _ = await entity_service.create_or_update_entity(
        EntitySchema(
            title="Deeper Entity",
            entity_type="deeper",
            folder="test",
            content=dedent("""
                # Deeper Entity
                """),
        )
    )

    deep, _ = await entity_service.create_or_update_entity(
        EntitySchema(
            title="Deep Entity",
            entity_type="deep",
            folder="test",
            content=dedent("""
                # Deep Entity
                - deeper_connection [[Deeper Entity]]
                """),
        )
    )

    connected_2, _ = await entity_service.create_or_update_entity(
        EntitySchema(
            title="Connected Entity 2",
            entity_type="test",
            folder="test",
            content=dedent("""
                # Connected Entity 2
                - deep_connection [[Deep Entity]]
                """),
        )
    )

    connected_1, _ = await entity_service.create_or_update_entity(
        EntitySchema(
            title="Connected Entity 1",
            entity_type="test",
            folder="test",
            content=dedent("""
                # Connected Entity 1
                - [note] Connected 1 note
                - connected_to [[Connected Entity 2]]
                """),
        )
    )

    root, _ = await entity_service.create_or_update_entity(
        EntitySchema(
            title="Root",
            entity_type="test",
            folder="test",
            content=dedent("""
                # Root Entity
                - [note] Root note 1
                - [tech] Root tech note
                - connects_to [[Connected Entity 1]]
                """),
        )
    )

    # get latest
    entities = await entity_repository.find_all()
    relations = await relation_repository.find_all()

    # Index everything for search
    for entity in entities:
        await search_service.index_entity(entity)

    # NOTE: "deeper" is intentionally not in the returned dict — only the roles
    # tests reference directly.
    return {
        "root": root,
        "connected1": connected_1,
        "connected2": connected_2,
        "deep": deep,
        "observations": [e.observations for e in entities],
        "relations": relations,
    }


@pytest.fixture
def watch_service(app_config: BasicMemoryConfig, project_repository) -> WatchService:
    """WatchService wired to the test config and project repository."""
    return WatchService(
        app_config=app_config,
        project_repository=project_repository,
    )


@pytest.fixture
def test_files(project_config, project_root) -> dict[str, Path]:
    """Copy test files into the project directory.

    Returns a dict mapping file names to their paths in the project dir.
    """
    # Source files relative to tests directory
    sources = {
        "pdf": project_root / "tests/Non-MarkdownFileSupport.pdf",
        "image": project_root / "tests/Screenshot.png",
    }

    # Copy each source into the temp project directory.
    copies: dict[str, Path] = {}
    for name, src_path in sources.items():
        dest_path = project_config.home / src_path.name
        dest_path.parent.mkdir(parents=True, exist_ok=True)
        dest_path.write_bytes(src_path.read_bytes())
        copies[name] = dest_path

    return copies


@pytest_asyncio.fixture
async def synced_files(sync_service, project_config, test_files):
    """Run an initial sync over the copied test files, then return them."""
    # Initial sync - should create forward reference
    await sync_service.sync(project_config.home)
    return test_files

```

--------------------------------------------------------------------------------
/tests/utils/test_validate_project_path.py:
--------------------------------------------------------------------------------

```python
"""Tests for the validate_project_path security function."""

import pytest
from pathlib import Path

from basic_memory.utils import validate_project_path


class TestValidateProjectPathSafety:
    """Verify that validate_project_path accepts legitimate paths."""

    def test_valid_relative_paths(self, tmp_path):
        """Test that legitimate relative paths are allowed."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        for candidate in (
            "notes/meeting.md",
            "docs/readme.txt",
            "folder/subfolder/file.txt",
            "simple-file.md",
            "research/findings-2025.md",
            "projects/basic-memory/docs.md",
            "deep/nested/directory/structure/file.txt",
            "file-with-hyphens.md",
            "file_with_underscores.txt",
            "file123.md",
            "UPPERCASE.MD",
            "MixedCase.txt",
        ):
            assert validate_project_path(candidate, project_path), (
                f"Safe path '{candidate}' should be allowed"
            )

    def test_empty_and_current_directory(self, tmp_path):
        """Test handling of empty paths and current directory references."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        # "." and "./file" both stay inside the project root.
        assert validate_project_path(".", project_path)
        assert validate_project_path("./file.txt", project_path)

    def test_nested_safe_paths(self, tmp_path):
        """Test deeply nested but safe paths."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        for candidate in (
            "level1/level2/level3/level4/file.txt",
            "very/deeply/nested/directory/structure/with/many/levels/file.md",
            "a/b/c/d/e/f/g/h/i/j/file.txt",
        ):
            assert validate_project_path(candidate, project_path), (
                f"Nested path '{candidate}' should be allowed"
            )


class TestValidateProjectPathAttacks:
    """Verify that validate_project_path rejects path traversal attacks."""

    def test_unix_path_traversal(self, tmp_path):
        """Test that Unix-style path traversal is blocked."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        for attack in (
            "../",
            "../../",
            "../../../",
            "../etc/passwd",
            "../../etc/passwd",
            "../../../etc/passwd",
            "../../../../etc/passwd",
            "../../.env",
            "../../../home/user/.ssh/id_rsa",
            "../../../../var/log/auth.log",
            "../../.bashrc",
            "../../../etc/shadow",
        ):
            assert not validate_project_path(attack, project_path), (
                f"Attack path '{attack}' should be blocked"
            )

    def test_windows_path_traversal(self, tmp_path):
        """Test that Windows-style path traversal is blocked."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        for attack in (
            "..\\",
            "..\\..\\",
            "..\\..\\..\\",
            "..\\..\\..\\Windows\\System32\\config\\SAM",
            "..\\..\\..\\Users\\user\\.env",
            "..\\..\\..\\Windows\\System32\\drivers\\etc\\hosts",
            "..\\..\\Boot.ini",
            "\\Windows\\System32",
            "\\..\\..\\Windows",
        ):
            assert not validate_project_path(attack, project_path), (
                f"Windows attack path '{attack}' should be blocked"
            )

    def test_mixed_traversal_patterns(self, tmp_path):
        """Test paths that mix legitimate content with traversal."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        for attack in (
            "notes/../../../etc/passwd",
            "docs/../../.env",
            "folder/subfolder/../../../etc/passwd",
            "legitimate/path/../../.ssh/id_rsa",
            "notes/../../../home/user/.bashrc",
            "documents/../../Windows/System32/config/SAM",
        ):
            assert not validate_project_path(attack, project_path), (
                f"Mixed attack path '{attack}' should be blocked"
            )

    def test_home_directory_access(self, tmp_path):
        """Test that home directory access patterns are blocked."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        for attack in (
            "~/",
            "~/.env",
            "~/.ssh/id_rsa",
            "~/secrets.txt",
            "~/Documents/passwords.txt",
            "~\\AppData\\secrets",
            "~\\Desktop\\config.ini",
        ):
            assert not validate_project_path(attack, project_path), (
                f"Home directory attack '{attack}' should be blocked"
            )

    def test_unc_and_network_paths(self, tmp_path):
        """Test that UNC and network paths are blocked."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        for attack in (
            "\\\\server\\share",
            "\\\\192.168.1.100\\c$",
            "\\\\evil-server\\malicious-share\\file.exe",
            "\\\\localhost\\c$\\Windows\\System32",
        ):
            assert not validate_project_path(attack, project_path), (
                f"Network path attack '{attack}' should be blocked"
            )

    def test_absolute_paths(self, tmp_path):
        """Test that absolute paths are blocked (if they contain traversal)."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        # Note: Some absolute paths might be allowed by pathlib resolution,
        # but our function should catch traversal patterns first
        for attack in (
            "/etc/passwd",
            "/home/user/.env",
            "/var/log/auth.log",
            "/root/.ssh/id_rsa",
            "C:\\Windows\\System32\\config\\SAM",
            "C:\\Users\\user\\.env",
            "D:\\secrets\\config.json",
        ):
            # Blocked either by traversal detection or by pathlib resolution.
            assert not validate_project_path(attack, project_path), (
                f"Absolute path '{attack}' should be blocked"
            )


class TestValidateProjectPathEdgeCases:
    """Exercise edge cases and error conditions."""

    def test_malformed_paths(self, tmp_path):
        """Test handling of malformed or unusual paths."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        malformed = (
            "",  # Empty string
            "   ",  # Whitespace only
            "\n",  # Newline
            "\t",  # Tab
            "\r\n",  # Windows line ending
            "file\x00name",  # Null byte (if it gets this far)
            "file\x01name",  # Other control characters
        )

        for candidate in malformed:
            # Either blocked outright, or raising is acceptable.
            try:
                allowed = validate_project_path(candidate, project_path)
            except (ValueError, OSError):
                continue
            if candidate.strip():  # Non-empty paths with control chars should be blocked
                assert not allowed, f"Malformed path '{repr(candidate)}' should be blocked"

    def test_very_long_paths(self, tmp_path):
        """Test handling of very long paths."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        # A very long but legitimate path: the same long segment repeated.
        long_path = "/".join(["verylongdirectoryname" * 10] * 10)

        try:
            result = validate_project_path(long_path, project_path)
        except (ValueError, OSError):
            # It's acceptable for very long paths to raise exceptions
            pass
        else:
            # Result can be True or False, just shouldn't crash
            assert isinstance(result, bool)

    def test_nonexistent_project_path(self):
        """Test behavior when project path doesn't exist."""
        missing_project = Path("/this/path/does/not/exist")

        # Relative paths can still be validated against a nonexistent root.
        assert validate_project_path("notes/file.txt", missing_project)
        assert not validate_project_path("../../../etc/passwd", missing_project)

    def test_unicode_and_special_characters(self, tmp_path):
        """Test paths with Unicode and special characters."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        for candidate in (
            "notes/文档.md",  # Chinese characters
            "docs/résumé.txt",  # Accented characters
            "files/naïve.md",  # Diaeresis
            "notes/café.txt",  # Acute accent
            "docs/日本語.md",  # Japanese
            "files/αβγ.txt",  # Greek
            "notes/файл.md",  # Cyrillic
        ):
            try:
                allowed = validate_project_path(candidate, project_path)
            except (UnicodeError, OSError):
                # Some unicode handling issues might be acceptable
                continue
            assert isinstance(allowed, bool), f"Unicode path '{candidate}' should return boolean"
            # No traversal involved, so these should generally be allowed.
            assert allowed, f"Unicode path '{candidate}' should be allowed"

    def test_case_sensitivity(self, tmp_path):
        """Test case sensitivity of traversal detection."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        # Blocked regardless of case.
        for candidate in (
            "../file.txt",
            "../FILE.TXT",
            "~/file.txt",
            "~/FILE.TXT",
        ):
            assert not validate_project_path(candidate, project_path), (
                f"Case variation '{candidate}' should be blocked"
            )

    def test_symbolic_link_behavior(self, tmp_path):
        """Test behavior with symbolic links (if supported by filesystem)."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        # A directory outside the project that a symlink will point to.
        outside_dir = tmp_path / "outside"
        outside_dir.mkdir()

        try:
            # Create a symlink inside the project that points outside it.
            (project_path / "symlink").symlink_to(outside_dir)

            # Paths through symlinks should be handled safely. The exact result
            # depends on how pathlib resolves symlinks; it must be a boolean.
            result = validate_project_path("symlink/file.txt", project_path)
            assert isinstance(result, bool)

        except (OSError, NotImplementedError):
            # Symlinks might not be supported on this filesystem
            pytest.skip("Symbolic links not supported on this filesystem")

    def test_relative_path_edge_cases(self, tmp_path):
        """Test edge cases in relative path handling."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        for candidate in (
            ".",  # Current directory
            "./",  # Current directory with slash
            "./file.txt",  # File in current directory
            "./folder/file.txt",  # Nested file through current directory
            "folder/./file.txt",  # Current directory in middle of path
            "folder/subfolder/.",  # Current directory at end
        ):
            # None of these escape the project root, so all should be allowed.
            assert validate_project_path(candidate, project_path), (
                f"Relative path edge case '{candidate}' should be allowed"
            )


class TestValidateProjectPathPerformance:
    """Test performance characteristics of path validation."""

    def test_performance_with_many_paths(self, tmp_path):
        """Test that validation performs reasonably with many paths."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        # A mix of safe paths followed by dangerous ones.
        safe = [f"folder{i}/file{i}.txt" for i in range(100)]
        dangerous = [f"../../../etc/passwd{i}" for i in range(100)]
        test_paths = safe + dangerous

        import time

        start_time = time.time()
        for candidate in test_paths:
            assert isinstance(validate_project_path(candidate, project_path), bool)
        end_time = time.time()

        # Should complete reasonably quickly (adjust threshold as needed)
        assert end_time - start_time < 1.0, "Path validation should be fast"


class TestValidateProjectPathIntegration:
    """Integration tests with real filesystem scenarios."""

    def test_with_actual_filesystem_structure(self, tmp_path):
        """Test validation with actual files and directories."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        # Build a small real directory tree with content.
        (project_path / "notes").mkdir()
        (project_path / "docs").mkdir()
        (project_path / "notes" / "meeting.md").write_text("# Meeting Notes")
        (project_path / "docs" / "readme.txt").write_text("README")

        # Existing files are accessible.
        assert validate_project_path("notes/meeting.md", project_path)
        assert validate_project_path("docs/readme.txt", project_path)

        # Non-existent but safe paths are accessible too.
        assert validate_project_path("notes/new-file.md", project_path)
        assert validate_project_path("new-folder/file.txt", project_path)

        # Attacks remain blocked even with a real filesystem in place.
        assert not validate_project_path("../../../etc/passwd", project_path)
        assert not validate_project_path("notes/../../../etc/passwd", project_path)

    def test_project_path_resolution_accuracy(self, tmp_path):
        """Test that path resolution works correctly with real paths."""
        # Two sibling projects under a shared workspace.
        workspace = tmp_path / "workspace"
        project_path = workspace / "my-project"
        sibling_path = workspace / "other-project"
        for directory in (workspace, project_path, sibling_path):
            directory.mkdir()

        # A sensitive file in the sibling directory.
        (sibling_path / "secrets.txt").write_text("secret data")

        # Traversal into the sibling must be rejected...
        assert not validate_project_path("../other-project/secrets.txt", project_path)

        # ...while legitimate access within the project still works.
        assert validate_project_path("my-file.txt", project_path)
        assert validate_project_path("subdir/my-file.txt", project_path)

```
Page 8/17FirstPrevNextLast