This is page 10 of 17. Use http://codebase.md/basicmachines-co/basic-memory?lines=false&page={x} to view the full context.
# Directory Structure
```
├── .claude
│   ├── agents
│   │   ├── python-developer.md
│   │   └── system-architect.md
│   └── commands
│       ├── release
│       │   ├── beta.md
│       │   ├── changelog.md
│       │   ├── release-check.md
│       │   └── release.md
│       ├── spec.md
│       └── test-live.md
├── .dockerignore
├── .github
│   ├── dependabot.yml
│   ├── ISSUE_TEMPLATE
│   │   ├── bug_report.md
│   │   ├── config.yml
│   │   ├── documentation.md
│   │   └── feature_request.md
│   └── workflows
│       ├── claude-code-review.yml
│       ├── claude-issue-triage.yml
│       ├── claude.yml
│       ├── dev-release.yml
│       ├── docker.yml
│       ├── pr-title.yml
│       ├── release.yml
│       └── test.yml
├── .gitignore
├── .python-version
├── CHANGELOG.md
├── CITATION.cff
├── CLA.md
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── docker-compose.yml
├── Dockerfile
├── docs
│   ├── ai-assistant-guide-extended.md
│   ├── character-handling.md
│   ├── cloud-cli.md
│   └── Docker.md
├── justfile
├── LICENSE
├── llms-install.md
├── pyproject.toml
├── README.md
├── SECURITY.md
├── smithery.yaml
├── specs
│   ├── SPEC-1 Specification-Driven Development Process.md
│   ├── SPEC-10 Unified Deployment Workflow and Event Tracking.md
│   ├── SPEC-11 Basic Memory API Performance Optimization.md
│   ├── SPEC-12 OpenTelemetry Observability.md
│   ├── SPEC-13 CLI Authentication with Subscription Validation.md
│   ├── SPEC-14 Cloud Git Versioning & GitHub Backup.md
│   ├── SPEC-14- Cloud Git Versioning & GitHub Backup.md
│   ├── SPEC-15 Configuration Persistence via Tigris for Cloud Tenants.md
│   ├── SPEC-16 MCP Cloud Service Consolidation.md
│   ├── SPEC-17 Semantic Search with ChromaDB.md
│   ├── SPEC-18 AI Memory Management Tool.md
│   ├── SPEC-19 Sync Performance and Memory Optimization.md
│   ├── SPEC-2 Slash Commands Reference.md
│   ├── SPEC-20 Simplified Project-Scoped Rclone Sync.md
│   ├── SPEC-3 Agent Definitions.md
│   ├── SPEC-4 Notes Web UI Component Architecture.md
│   ├── SPEC-5 CLI Cloud Upload via WebDAV.md
│   ├── SPEC-6 Explicit Project Parameter Architecture.md
│   ├── SPEC-7 POC to spike Tigris Turso for local access to cloud data.md
│   ├── SPEC-8 TigrisFS Integration.md
│   ├── SPEC-9 Multi-Project Bidirectional Sync Architecture.md
│   ├── SPEC-9 Signed Header Tenant Information.md
│   └── SPEC-9-1 Follow-Ups- Conflict, Sync, and Observability.md
├── src
│   └── basic_memory
│       ├── __init__.py
│       ├── alembic
│       │   ├── alembic.ini
│       │   ├── env.py
│       │   ├── migrations.py
│       │   ├── script.py.mako
│       │   └── versions
│       │       ├── 3dae7c7b1564_initial_schema.py
│       │       ├── 502b60eaa905_remove_required_from_entity_permalink.py
│       │       ├── 5fe1ab1ccebe_add_projects_table.py
│       │       ├── 647e7a75e2cd_project_constraint_fix.py
│       │       ├── 9d9c1cb7d8f5_add_mtime_and_size_columns_to_entity_.py
│       │       ├── a1b2c3d4e5f6_fix_project_foreign_keys.py
│       │       ├── b3c3938bacdb_relation_to_name_unique_index.py
│       │       ├── cc7172b46608_update_search_index_schema.py
│       │       └── e7e1f4367280_add_scan_watermark_tracking_to_project.py
│       ├── api
│       │   ├── __init__.py
│       │   ├── app.py
│       │   ├── routers
│       │   │   ├── __init__.py
│       │   │   ├── directory_router.py
│       │   │   ├── importer_router.py
│       │   │   ├── knowledge_router.py
│       │   │   ├── management_router.py
│       │   │   ├── memory_router.py
│       │   │   ├── project_router.py
│       │   │   ├── prompt_router.py
│       │   │   ├── resource_router.py
│       │   │   ├── search_router.py
│       │   │   └── utils.py
│       │   └── template_loader.py
│       ├── cli
│       │   ├── __init__.py
│       │   ├── app.py
│       │   ├── auth.py
│       │   ├── commands
│       │   │   ├── __init__.py
│       │   │   ├── cloud
│       │   │   │   ├── __init__.py
│       │   │   │   ├── api_client.py
│       │   │   │   ├── bisync_commands.py
│       │   │   │   ├── cloud_utils.py
│       │   │   │   ├── core_commands.py
│       │   │   │   ├── rclone_commands.py
│       │   │   │   ├── rclone_config.py
│       │   │   │   ├── rclone_installer.py
│       │   │   │   ├── upload_command.py
│       │   │   │   └── upload.py
│       │   │   ├── command_utils.py
│       │   │   ├── db.py
│       │   │   ├── import_chatgpt.py
│       │   │   ├── import_claude_conversations.py
│       │   │   ├── import_claude_projects.py
│       │   │   ├── import_memory_json.py
│       │   │   ├── mcp.py
│       │   │   ├── project.py
│       │   │   ├── status.py
│       │   │   └── tool.py
│       │   └── main.py
│       ├── config.py
│       ├── db.py
│       ├── deps.py
│       ├── file_utils.py
│       ├── ignore_utils.py
│       ├── importers
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── chatgpt_importer.py
│       │   ├── claude_conversations_importer.py
│       │   ├── claude_projects_importer.py
│       │   ├── memory_json_importer.py
│       │   └── utils.py
│       ├── markdown
│       │   ├── __init__.py
│       │   ├── entity_parser.py
│       │   ├── markdown_processor.py
│       │   ├── plugins.py
│       │   ├── schemas.py
│       │   └── utils.py
│       ├── mcp
│       │   ├── __init__.py
│       │   ├── async_client.py
│       │   ├── project_context.py
│       │   ├── prompts
│       │   │   ├── __init__.py
│       │   │   ├── ai_assistant_guide.py
│       │   │   ├── continue_conversation.py
│       │   │   ├── recent_activity.py
│       │   │   ├── search.py
│       │   │   └── utils.py
│       │   ├── resources
│       │   │   ├── ai_assistant_guide.md
│       │   │   └── project_info.py
│       │   ├── server.py
│       │   └── tools
│       │       ├── __init__.py
│       │       ├── build_context.py
│       │       ├── canvas.py
│       │       ├── chatgpt_tools.py
│       │       ├── delete_note.py
│       │       ├── edit_note.py
│       │       ├── list_directory.py
│       │       ├── move_note.py
│       │       ├── project_management.py
│       │       ├── read_content.py
│       │       ├── read_note.py
│       │       ├── recent_activity.py
│       │       ├── search.py
│       │       ├── utils.py
│       │       ├── view_note.py
│       │       └── write_note.py
│       ├── models
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── knowledge.py
│       │   ├── project.py
│       │   └── search.py
│       ├── repository
│       │   ├── __init__.py
│       │   ├── entity_repository.py
│       │   ├── observation_repository.py
│       │   ├── project_info_repository.py
│       │   ├── project_repository.py
│       │   ├── relation_repository.py
│       │   ├── repository.py
│       │   └── search_repository.py
│       ├── schemas
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── cloud.py
│       │   ├── delete.py
│       │   ├── directory.py
│       │   ├── importer.py
│       │   ├── memory.py
│       │   ├── project_info.py
│       │   ├── prompt.py
│       │   ├── request.py
│       │   ├── response.py
│       │   ├── search.py
│       │   └── sync_report.py
│       ├── services
│       │   ├── __init__.py
│       │   ├── context_service.py
│       │   ├── directory_service.py
│       │   ├── entity_service.py
│       │   ├── exceptions.py
│       │   ├── file_service.py
│       │   ├── initialization.py
│       │   ├── link_resolver.py
│       │   ├── project_service.py
│       │   ├── search_service.py
│       │   └── service.py
│       ├── sync
│       │   ├── __init__.py
│       │   ├── background_sync.py
│       │   ├── sync_service.py
│       │   └── watch_service.py
│       ├── templates
│       │   └── prompts
│       │       ├── continue_conversation.hbs
│       │       └── search.hbs
│       └── utils.py
├── test-int
│   ├── BENCHMARKS.md
│   ├── cli
│   │   ├── test_project_commands_integration.py
│   │   └── test_version_integration.py
│   ├── conftest.py
│   ├── mcp
│   │   ├── test_build_context_underscore.py
│   │   ├── test_build_context_validation.py
│   │   ├── test_chatgpt_tools_integration.py
│   │   ├── test_default_project_mode_integration.py
│   │   ├── test_delete_note_integration.py
│   │   ├── test_edit_note_integration.py
│   │   ├── test_list_directory_integration.py
│   │   ├── test_move_note_integration.py
│   │   ├── test_project_management_integration.py
│   │   ├── test_project_state_sync_integration.py
│   │   ├── test_read_content_integration.py
│   │   ├── test_read_note_integration.py
│   │   ├── test_search_integration.py
│   │   ├── test_single_project_mcp_integration.py
│   │   └── test_write_note_integration.py
│   ├── test_db_wal_mode.py
│   ├── test_disable_permalinks_integration.py
│   └── test_sync_performance_benchmark.py
├── tests
│   ├── __init__.py
│   ├── api
│   │   ├── conftest.py
│   │   ├── test_async_client.py
│   │   ├── test_continue_conversation_template.py
│   │   ├── test_directory_router.py
│   │   ├── test_importer_router.py
│   │   ├── test_knowledge_router.py
│   │   ├── test_management_router.py
│   │   ├── test_memory_router.py
│   │   ├── test_project_router_operations.py
│   │   ├── test_project_router.py
│   │   ├── test_prompt_router.py
│   │   ├── test_relation_background_resolution.py
│   │   ├── test_resource_router.py
│   │   ├── test_search_router.py
│   │   ├── test_search_template.py
│   │   ├── test_template_loader_helpers.py
│   │   └── test_template_loader.py
│   ├── cli
│   │   ├── conftest.py
│   │   ├── test_cli_tools.py
│   │   ├── test_cloud_authentication.py
│   │   ├── test_ignore_utils.py
│   │   ├── test_import_chatgpt.py
│   │   ├── test_import_claude_conversations.py
│   │   ├── test_import_claude_projects.py
│   │   ├── test_import_memory_json.py
│   │   ├── test_project_add_with_local_path.py
│   │   └── test_upload.py
│   ├── conftest.py
│   ├── db
│   │   └── test_issue_254_foreign_key_constraints.py
│   ├── importers
│   │   ├── test_importer_base.py
│   │   └── test_importer_utils.py
│   ├── markdown
│   │   ├── __init__.py
│   │   ├── test_date_frontmatter_parsing.py
│   │   ├── test_entity_parser_error_handling.py
│   │   ├── test_entity_parser.py
│   │   ├── test_markdown_plugins.py
│   │   ├── test_markdown_processor.py
│   │   ├── test_observation_edge_cases.py
│   │   ├── test_parser_edge_cases.py
│   │   ├── test_relation_edge_cases.py
│   │   └── test_task_detection.py
│   ├── mcp
│   │   ├── conftest.py
│   │   ├── test_obsidian_yaml_formatting.py
│   │   ├── test_permalink_collision_file_overwrite.py
│   │   ├── test_prompts.py
│   │   ├── test_resources.py
│   │   ├── test_tool_build_context.py
│   │   ├── test_tool_canvas.py
│   │   ├── test_tool_delete_note.py
│   │   ├── test_tool_edit_note.py
│   │   ├── test_tool_list_directory.py
│   │   ├── test_tool_move_note.py
│   │   ├── test_tool_read_content.py
│   │   ├── test_tool_read_note.py
│   │   ├── test_tool_recent_activity.py
│   │   ├── test_tool_resource.py
│   │   ├── test_tool_search.py
│   │   ├── test_tool_utils.py
│   │   ├── test_tool_view_note.py
│   │   ├── test_tool_write_note.py
│   │   └── tools
│   │       └── test_chatgpt_tools.py
│   ├── Non-MarkdownFileSupport.pdf
│   ├── repository
│   │   ├── test_entity_repository_upsert.py
│   │   ├── test_entity_repository.py
│   │   ├── test_entity_upsert_issue_187.py
│   │   ├── test_observation_repository.py
│   │   ├── test_project_info_repository.py
│   │   ├── test_project_repository.py
│   │   ├── test_relation_repository.py
│   │   ├── test_repository.py
│   │   ├── test_search_repository_edit_bug_fix.py
│   │   └── test_search_repository.py
│   ├── schemas
│   │   ├── test_base_timeframe_minimum.py
│   │   ├── test_memory_serialization.py
│   │   ├── test_memory_url_validation.py
│   │   ├── test_memory_url.py
│   │   ├── test_schemas.py
│   │   └── test_search.py
│   ├── Screenshot.png
│   ├── services
│   │   ├── test_context_service.py
│   │   ├── test_directory_service.py
│   │   ├── test_entity_service_disable_permalinks.py
│   │   ├── test_entity_service.py
│   │   ├── test_file_service.py
│   │   ├── test_initialization.py
│   │   ├── test_link_resolver.py
│   │   ├── test_project_removal_bug.py
│   │   ├── test_project_service_operations.py
│   │   ├── test_project_service.py
│   │   └── test_search_service.py
│   ├── sync
│   │   ├── test_character_conflicts.py
│   │   ├── test_sync_service_incremental.py
│   │   ├── test_sync_service.py
│   │   ├── test_sync_wikilink_issue.py
│   │   ├── test_tmp_files.py
│   │   ├── test_watch_service_edge_cases.py
│   │   ├── test_watch_service_reload.py
│   │   └── test_watch_service.py
│   ├── test_config.py
│   ├── test_db_migration_deduplication.py
│   ├── test_deps.py
│   ├── test_production_cascade_delete.py
│   ├── test_rclone_commands.py
│   └── utils
│       ├── test_file_utils.py
│       ├── test_frontmatter_obsidian_compatible.py
│       ├── test_parse_tags.py
│       ├── test_permalink_formatting.py
│       ├── test_utf8_handling.py
│       └── test_validate_project_path.py
├── uv.lock
├── v0.15.0-RELEASE-DOCS.md
└── v15-docs
    ├── api-performance.md
    ├── background-relations.md
    ├── basic-memory-home.md
    ├── bug-fixes.md
    ├── chatgpt-integration.md
    ├── cloud-authentication.md
    ├── cloud-bisync.md
    ├── cloud-mode-usage.md
    ├── cloud-mount.md
    ├── default-project-mode.md
    ├── env-file-removal.md
    ├── env-var-overrides.md
    ├── explicit-project-parameter.md
    ├── gitignore-integration.md
    ├── project-root-env-var.md
    ├── README.md
    └── sqlite-performance.md
```
# Files
--------------------------------------------------------------------------------
/tests/schemas/test_schemas.py:
--------------------------------------------------------------------------------
```python
"""Tests for Pydantic schema validation and conversion."""
import os
import pytest
from datetime import datetime, timedelta
from pydantic import ValidationError, BaseModel
from basic_memory.schemas import (
    Entity,
    EntityResponse,
    Relation,
    SearchNodesRequest,
    GetEntitiesRequest,
    RelationResponse,
)
from basic_memory.schemas.request import EditEntityRequest
from basic_memory.schemas.base import to_snake_case, TimeFrame, parse_timeframe, validate_timeframe
def test_entity_project_name():
    """Entity built from title/folder derives its file path and permalink."""
    entity = Entity.model_validate(
        {"title": "Test Entity", "folder": "test", "entity_type": "knowledge"}
    )
    # file_path joins folder and "<title>.md"; permalink is kebab-cased.
    assert entity.file_path == os.path.join("test", "Test Entity.md")
    assert entity.permalink == "test/test-entity"
    assert entity.entity_type == "knowledge"
def test_entity_project_id():
    """Test creating Entity with an explicit numeric project id plus required fields."""
    data = {"project": 2, "title": "Test Entity", "folder": "test", "entity_type": "knowledge"}
    entity = Entity.model_validate(data)
    # The extra "project" key must not change the derived path/permalink.
    assert entity.file_path == os.path.join("test", "Test Entity.md")
    assert entity.permalink == "test/test-entity"
    assert entity.entity_type == "knowledge"
def test_entity_non_markdown():
    """A non-markdown entity keeps its original extension in file_path."""
    payload = {
        "title": "Test Entity.txt",
        "folder": "test",
        "entity_type": "file",
        "content_type": "text/plain",
    }
    entity = Entity.model_validate(payload)
    # No ".md" is appended; the permalink drops the extension.
    assert entity.file_path == os.path.join("test", "Test Entity.txt")
    assert entity.permalink == "test/test-entity"
    assert entity.entity_type == "file"
def test_entity_in_validation():
    """Entity validation rejects payloads missing required fields."""
    incomplete = {"entity_type": "test"}  # no title/folder
    with pytest.raises(ValidationError):
        Entity.model_validate(incomplete)
def test_relation_in_validation():
    """Relation accepts required fields, an optional context, and rejects partial input."""
    payload = {"from_id": "test/123", "to_id": "test/456", "relation_type": "test"}
    relation = Relation.model_validate(payload)
    assert relation.from_id == "test/123"
    assert relation.to_id == "test/456"
    assert relation.relation_type == "test"
    # context defaults to None when omitted.
    assert relation.context is None
    # Optional context round-trips when supplied.
    payload["context"] = "test context"
    assert Relation.model_validate(payload).context == "test context"
    # Omitting relation_type must fail validation.
    with pytest.raises(ValidationError):
        Relation.model_validate({"from_id": "123", "to_id": "456"})
def test_relation_response():
    """RelationResponse validates from data that includes nested entity refs."""
    response = RelationResponse.model_validate(
        {
            "permalink": "test/123/relates_to/test/456",
            "from_id": "test/123",
            "to_id": "test/456",
            "relation_type": "relates_to",
            "from_entity": {"permalink": "test/123"},
            "to_entity": {"permalink": "test/456"},
        }
    )
    assert response.from_id == "test/123"
    assert response.to_id == "test/456"
    assert response.relation_type == "relates_to"
    # context was not supplied, so it stays None.
    assert response.context is None
def test_entity_out_from_attributes():
    """EntityResponse validates from a dict shaped like database model attributes."""
    observation = {
        "id": 1,
        "permalink": "permalink",
        "category": "note",
        "content": "test obs",
        "context": None,
    }
    relation = {
        "id": 1,
        "permalink": "test/test/relates_to/test/test",
        "from_id": "test/test",
        "to_id": "test/test",
        "relation_type": "relates_to",
        "context": None,
    }
    entity = EntityResponse.model_validate(
        {
            "title": "Test Entity",
            "permalink": "test/test",
            "file_path": "test",
            "entity_type": "knowledge",
            "content_type": "text/markdown",
            "observations": [observation],
            "relations": [relation],
            "created_at": "2023-01-01T00:00:00",
            "updated_at": "2023-01-01T00:00:00",
        }
    )
    assert entity.permalink == "test/test"
    # Nested observations/relations are hydrated, one of each.
    assert len(entity.observations) == 1
    assert len(entity.relations) == 1
def test_entity_response_with_none_permalink():
    """EntityResponse accepts a None permalink (regression test for issue #170).

    Markdown files without an explicit permalink in their frontmatter can
    yield entities whose permalink is None during edit operations; that must
    not trip validation.
    """
    entity = EntityResponse.model_validate(
        {
            "title": "Test Entity",
            "permalink": None,  # This should not cause validation errors
            "file_path": "test/test-entity.md",
            "entity_type": "note",
            "content_type": "text/markdown",
            "observations": [],
            "relations": [],
            "created_at": "2023-01-01T00:00:00",
            "updated_at": "2023-01-01T00:00:00",
        }
    )
    assert entity.permalink is None
    assert entity.title == "Test Entity"
    assert entity.file_path == "test/test-entity.md"
    assert entity.entity_type == "note"
    assert len(entity.observations) == 0
    assert len(entity.relations) == 0
def test_search_nodes_input():
    """SearchNodesRequest requires a query string."""
    request = SearchNodesRequest.model_validate({"query": "test query"})
    assert request.query == "test query"
    # An empty payload (no query) is a validation error.
    with pytest.raises(ValidationError):
        SearchNodesRequest.model_validate({})
def test_open_nodes_input():
    """GetEntitiesRequest needs a non-empty list of permalinks."""
    request = GetEntitiesRequest.model_validate({"permalinks": ["test/test", "test/test2"]})
    assert len(request.permalinks) == 2
    # An empty permalink list is rejected.
    with pytest.raises(ValidationError):
        GetEntitiesRequest.model_validate({"permalinks": []})
def test_path_sanitization():
    """to_snake_case() normalizes assorted naming styles to snake_case."""
    cases = {
        "BasicMemory": "basic_memory",  # CamelCase
        "Memory Service": "memory_service",  # spaces
        "memory-service": "memory_service",  # hyphens
        "Memory_Service": "memory_service",  # existing underscore
        "API2Service": "api2_service",  # digits
        "  Spaces  ": "spaces",  # surrounding whitespace
        "mixedCase": "mixed_case",  # mixed case
        "snake_case_already": "snake_case_already",  # already snake case
        "ALLCAPS": "allcaps",  # all caps
        "with.dots": "with_dots",  # dots
    }
    for raw, expected in cases.items():
        assert to_snake_case(raw) == expected, f"Failed for input: {raw}"
def test_permalink_generation():
    """Entity.permalink combines the folder with a kebab-cased title."""
    cases = [
        ({"title": "BasicMemory", "folder": "test"}, "test/basic-memory"),
        ({"title": "Memory Service", "folder": "test"}, "test/memory-service"),
        ({"title": "API Gateway", "folder": "test"}, "test/api-gateway"),
        ({"title": "TestCase1", "folder": "test"}, "test/test-case1"),
        ({"title": "TestCaseRoot", "folder": ""}, "test-case-root"),  # root folder: no prefix
    ]
    for payload, expected in cases:
        entity = Entity.model_validate(payload)
        assert entity.permalink == expected, f"Failed for input: {payload}"
@pytest.mark.parametrize(
    "timeframe,expected_valid",
    [
        ("7d", True),
        ("yesterday", True),
        ("2 days ago", True),
        ("last week", True),
        ("3 weeks ago", True),
        ("invalid", False),
        # NOTE: "tomorrow" and "next week" now return 1 day ago due to timezone safety
        # They no longer raise errors - this is intentional for remote MCP
        ("tomorrow", True),  # Now valid - returns 1 day ago
        ("next week", True),  # Now valid - returns 1 day ago
        ("", False),
        ("0d", True),
        ("366d", False),  # beyond the 1-year maximum
        (1, False),  # non-string input is rejected
    ],
)
def test_timeframe_validation(timeframe: str, expected_valid: bool):
    """Validate TimeFrame-annotated model fields against a matrix of inputs.

    Valid inputs must produce a string value; invalid ones must raise
    ValueError (pydantic's ValidationError subclasses ValueError, so both
    error paths are covered by the single `raises`).
    """

    class TimeFrameModel(BaseModel):
        timeframe: TimeFrame

    if expected_valid:
        try:
            tf = TimeFrameModel.model_validate({"timeframe": timeframe})
            assert isinstance(tf.timeframe, str)
        except ValueError as e:
            pytest.fail(f"TimeFrame failed to validate '{timeframe}' with error: {e}")
    else:
        with pytest.raises(ValueError):
            # No binding needed: validation is expected to raise before returning.
            TimeFrameModel.model_validate({"timeframe": timeframe})
def test_edit_entity_request_validation():
    """EditEntityRequest accepts each operation with its required parameters."""
    # append only needs content.
    appended = EditEntityRequest.model_validate(
        {"operation": "append", "content": "New content to append"}
    )
    assert appended.operation == "append"
    assert appended.content == "New content to append"
    # find_replace additionally requires find_text.
    replaced = EditEntityRequest.model_validate(
        {"operation": "find_replace", "content": "replacement text", "find_text": "text to find"}
    )
    assert replaced.operation == "find_replace"
    assert replaced.find_text == "text to find"
    # replace_section additionally requires section.
    sectioned = EditEntityRequest.model_validate(
        {"operation": "replace_section", "content": "new section content", "section": "## Header"}
    )
    assert sectioned.operation == "replace_section"
    assert sectioned.section == "## Header"
    # Validators must pass accepted values through unchanged — this exercises
    # the `return v` statements in both field validators.
    passthrough = EditEntityRequest.model_validate(
        {
            "operation": "find_replace",
            "content": "replacement",
            "find_text": "valid text",
            "section": "## Valid Section",
        }
    )
    assert passthrough.find_text == "valid text"  # Covers line 88 (return v)
    assert passthrough.section == "## Valid Section"  # Covers line 80 (return v)
def test_edit_entity_request_find_replace_empty_find_text():
    """find_replace with an empty find_text is rejected."""
    payload = {
        "operation": "find_replace",
        "content": "replacement text",
        "find_text": "",  # Empty string triggers validation
    }
    with pytest.raises(
        ValueError, match="find_text parameter is required for find_replace operation"
    ):
        EditEntityRequest.model_validate(payload)
def test_edit_entity_request_replace_section_empty_section():
    """replace_section with an empty section is rejected."""
    payload = {
        "operation": "replace_section",
        "content": "new content",
        "section": "",  # Empty string triggers validation
    }
    with pytest.raises(
        ValueError, match="section parameter is required for replace_section operation"
    ):
        EditEntityRequest.model_validate(payload)
# New tests for timeframe parsing functions
class TestTimeframeParsing:
    """Test cases for parse_timeframe() and validate_timeframe() functions.

    parse_timeframe() yields a timezone-aware datetime; validate_timeframe()
    normalizes human-readable timeframes to "<N>d" duration strings while
    preserving the special-cased 'today'. Time comparisons below use loose
    tolerances (seconds up to an hour) so they stay stable across DST
    transitions and slow test machines — keep the tolerances when editing.
    """
    def test_parse_timeframe_today(self):
        """Test that parse_timeframe('today') returns 1 day ago for remote MCP timezone safety."""
        result = parse_timeframe("today")
        now = datetime.now()
        one_day_ago = now - timedelta(days=1)
        # Should be approximately 1 day ago (within a second for test tolerance)
        diff = abs((result.replace(tzinfo=None) - one_day_ago).total_seconds())
        assert diff < 2, f"Expected ~1 day ago for 'today', got {result}"
        # Result must be timezone-aware.
        assert result.tzinfo is not None
    def test_parse_timeframe_today_case_insensitive(self):
        """Test that parse_timeframe handles 'today' case-insensitively."""
        test_cases = ["today", "TODAY", "Today", "ToDay"]
        now = datetime.now()
        one_day_ago = now - timedelta(days=1)
        for case in test_cases:
            result = parse_timeframe(case)
            # Should be approximately 1 day ago (within a second for test tolerance)
            diff = abs((result.replace(tzinfo=None) - one_day_ago).total_seconds())
            assert diff < 2, f"Expected ~1 day ago for '{case}', got {result}"
    def test_parse_timeframe_other_formats(self):
        """Test that parse_timeframe works with other dateparser formats."""
        # Aware "now" so subtraction against aware parse results is valid.
        now = datetime.now().astimezone()
        # Test 1d ago - should be approximately 24 hours ago
        result_1d = parse_timeframe("1d")
        expected_1d = now - timedelta(days=1)
        diff = abs((result_1d - expected_1d).total_seconds())
        assert diff < 3600  # Within 1 hour tolerance (accounts for DST transitions)
        assert result_1d.tzinfo is not None
        # Test yesterday - should be yesterday at same time
        result_yesterday = parse_timeframe("yesterday")
        # dateparser returns yesterday at current time, not start of yesterday
        assert result_yesterday.date() == (now.date() - timedelta(days=1))
        assert result_yesterday.tzinfo is not None
        # Test 1 week ago
        result_week = parse_timeframe("1 week ago")
        expected_week = now - timedelta(weeks=1)
        diff = abs((result_week - expected_week).total_seconds())
        assert diff < 3600  # Within 1 hour tolerance
        assert result_week.tzinfo is not None
    def test_parse_timeframe_invalid(self):
        """Test that parse_timeframe raises ValueError for invalid input."""
        # The error message embeds the offending input verbatim.
        with pytest.raises(ValueError, match="Could not parse timeframe: invalid-timeframe"):
            parse_timeframe("invalid-timeframe")
        with pytest.raises(ValueError, match="Could not parse timeframe: not-a-date"):
            parse_timeframe("not-a-date")
    def test_validate_timeframe_preserves_special_cases(self):
        """Test that validate_timeframe preserves special timeframe strings."""
        # Should preserve 'today' as-is
        result = validate_timeframe("today")
        assert result == "today"
        # Should preserve case-normalized version
        result = validate_timeframe("TODAY")
        assert result == "today"
        result = validate_timeframe("Today")
        assert result == "today"
    def test_validate_timeframe_converts_regular_formats(self):
        """Test that validate_timeframe converts regular formats to duration."""
        # Test 1d format (should return as-is since it's already in standard format)
        result = validate_timeframe("1d")
        assert result == "1d"
        # Test other formats get converted to days
        result = validate_timeframe("yesterday")
        assert result == "1d"  # Yesterday is 1 day ago
        # Test week format
        result = validate_timeframe("1 week ago")
        assert result == "7d"  # 1 week = 7 days
    def test_validate_timeframe_error_cases(self):
        """Test that validate_timeframe raises appropriate errors."""
        # Invalid type
        with pytest.raises(ValueError, match="Timeframe must be a string"):
            validate_timeframe(123)  # type: ignore
        # NOTE: Future timeframes no longer raise errors due to 1-day minimum enforcement
        # "tomorrow" and "next week" now return 1 day ago for timezone safety
        # This is intentional for remote MCP deployments
        # Too far in past (>365 days)
        with pytest.raises(ValueError, match="Timeframe should be <= 1 year"):
            validate_timeframe("2 years ago")
        # Invalid format that can't be parsed
        with pytest.raises(ValueError, match="Could not parse timeframe"):
            validate_timeframe("not-a-real-timeframe")
    def test_timeframe_annotation_with_today(self):
        """Test that TimeFrame annotation works correctly with 'today'."""
        class TestModel(BaseModel):
            timeframe: TimeFrame
        # Should preserve 'today'
        model = TestModel(timeframe="today")
        assert model.timeframe == "today"
        # Should work with other formats
        model = TestModel(timeframe="1d")
        assert model.timeframe == "1d"
        model = TestModel(timeframe="yesterday")
        assert model.timeframe == "1d"
    def test_timeframe_integration_today_vs_1d(self):
        """Test that 'today' and '1d' both return 1 day ago due to timezone safety minimum."""
        class TestModel(BaseModel):
            timeframe: TimeFrame
        # 'today' should be preserved as special case in validation
        today_model = TestModel(timeframe="today")
        assert today_model.timeframe == "today"
        # '1d' should also be preserved (it's already in standard format)
        oneday_model = TestModel(timeframe="1d")
        assert oneday_model.timeframe == "1d"
        # When parsed by parse_timeframe, both should return approximately 1 day ago
        # due to the 1-day minimum enforcement for remote MCP timezone safety
        today_parsed = parse_timeframe("today")
        oneday_parsed = parse_timeframe("1d")
        now = datetime.now()
        one_day_ago = now - timedelta(days=1)
        # Both should be approximately 1 day ago
        today_diff = abs((today_parsed.replace(tzinfo=None) - one_day_ago).total_seconds())
        assert today_diff < 60, f"'today' should be ~1 day ago, got {today_parsed}"
        oneday_diff = abs((oneday_parsed.replace(tzinfo=None) - one_day_ago).total_seconds())
        assert oneday_diff < 60, f"'1d' should be ~1 day ago, got {oneday_parsed}"
        # They should be approximately the same time (within an hour due to parsing differences)
        time_diff = abs((today_parsed - oneday_parsed).total_seconds())
        assert time_diff < 3600, f"'today' and '1d' should be similar times, diff: {time_diff}s"
```
--------------------------------------------------------------------------------
/test-int/mcp/test_write_note_integration.py:
--------------------------------------------------------------------------------
```python
"""
Integration tests for write_note MCP tool.
Comprehensive tests covering all scenarios including note creation, content formatting,
tag handling, error conditions, and edge cases from bug reports.
"""
from textwrap import dedent
import pytest
from fastmcp import Client
from unittest.mock import patch
from basic_memory.config import ConfigManager
@pytest.mark.asyncio
async def test_write_note_basic_creation(mcp_server, app, test_project):
    """Test creating a simple note with basic content.

    Verifies the summary returned by the write_note tool: creation header,
    project/file_path/permalink lines, tag listing, and session footer.
    """
    async with Client(mcp_server) as client:
        result = await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Simple Note",
                "folder": "basic",
                "content": "# Simple Note\n\nThis is a simple note for testing.",
                "tags": "simple,test",
            },
        )
        # Tool responds with a single text content item summarizing the write.
        assert len(result.content) == 1
        assert result.content[0].type == "text"
        response_text = result.content[0].text
        assert "# Created note" in response_text
        assert f"project: {test_project.name}" in response_text
        # file_path keeps the original title; permalink is the slugified form.
        assert "file_path: basic/Simple Note.md" in response_text
        assert "permalink: basic/simple-note" in response_text
        assert "## Tags" in response_text
        assert "- simple, test" in response_text
        assert f"[Session: Using project '{test_project.name}']" in response_text
@pytest.mark.asyncio
async def test_write_note_no_tags(mcp_server, app, test_project):
    """Test creating a note without tags."""
    async with Client(mcp_server) as client:
        result = await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "No Tags Note",
                "folder": "test",
                "content": "Just some plain text without tags.",
            },
        )
        assert len(result.content) == 1
        assert result.content[0].type == "text"
        response_text = result.content[0].text
        assert "# Created note" in response_text
        assert "file_path: test/No Tags Note.md" in response_text
        assert "permalink: test/no-tags-note" in response_text
        # Should not have tags section when no tags provided
        # NOTE(review): the claim above is not actually asserted — consider
        # adding `assert "## Tags" not in response_text` to verify it.
@pytest.mark.asyncio
async def test_write_note_update_existing(mcp_server, app, test_project):
    """Test updating an existing note.

    Writes the same project/folder/title twice: the first call must report
    '# Created note', the second '# Updated note' with the new tags.
    """
    async with Client(mcp_server) as client:
        # Create initial note
        result1 = await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Update Test",
                "folder": "test",
                "content": "# Update Test\n\nOriginal content.",
                "tags": "original",
            },
        )
        assert "# Created note" in result1.content[0].text  # pyright: ignore [reportAttributeAccessIssue]
        # Update the same note (same project/folder/title identifies it)
        result2 = await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Update Test",
                "folder": "test",
                "content": "# Update Test\n\nUpdated content with changes.",
                "tags": "updated,modified",
            },
        )
        assert len(result2.content) == 1
        assert result2.content[0].type == "text"
        response_text = result2.content[0].text
        assert "# Updated note" in response_text
        assert f"project: {test_project.name}" in response_text
        assert "file_path: test/Update Test.md" in response_text
        assert "permalink: test/update-test" in response_text
        # Tags from the second write replace the originals.
        assert "- updated, modified" in response_text
        assert f"[Session: Using project '{test_project.name}']" in response_text
@pytest.mark.asyncio
async def test_write_note_tag_array(mcp_server, app, test_project):
    """Test creating a note with tag array (Issue #38 regression test).

    Passing tags as a list (instead of a comma-separated string) previously
    failed; this verifies the list form is accepted and reported.
    """
    async with Client(mcp_server) as client:
        # This reproduces the exact bug from Issue #38
        result = await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Array Tags Test",
                "folder": "test",
                "content": "Testing tag array handling",
                "tags": ["python", "testing", "integration", "mcp"],
            },
        )
        assert len(result.content) == 1
        assert result.content[0].type == "text"
        response_text = result.content[0].text
        assert "# Created note" in response_text
        assert f"project: {test_project.name}" in response_text
        assert "file_path: test/Array Tags Test.md" in response_text
        assert "permalink: test/array-tags-test" in response_text
        assert "## Tags" in response_text
        assert "python" in response_text
        assert f"[Session: Using project '{test_project.name}']" in response_text
@pytest.mark.asyncio
async def test_write_note_custom_permalink(mcp_server, app, test_project):
    """Test custom permalink handling (Issue #93 regression test).

    A permalink declared in the note's frontmatter must win over the
    auto-generated folder/title slug.
    """
    async with Client(mcp_server) as client:
        content_with_custom_permalink = dedent("""
            ---
            permalink: custom/my-special-permalink
            ---
            # Custom Permalink Note
            This note has a custom permalink in frontmatter.
            - [note] Testing custom permalink preservation
        """).strip()
        result = await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Custom Permalink Note",
                "folder": "notes",
                "content": content_with_custom_permalink,
            },
        )
        assert len(result.content) == 1
        assert result.content[0].type == "text"
        response_text = result.content[0].text
        assert "# Created note" in response_text
        assert f"project: {test_project.name}" in response_text
        assert "file_path: notes/Custom Permalink Note.md" in response_text
        # Frontmatter permalink is preserved, not replaced by notes/custom-permalink-note.
        assert "permalink: custom/my-special-permalink" in response_text
        assert f"[Session: Using project '{test_project.name}']" in response_text
@pytest.mark.asyncio
async def test_write_note_unicode_content(mcp_server, app, test_project):
    """Test handling Unicode content including emojis.

    Title emoji survives in the file name, but the permalink is sanitized
    to ASCII slug form.
    """
    async with Client(mcp_server) as client:
        unicode_content = "# Unicode Test 🚀\n\nThis note has emoji 🎉 and unicode ♠♣♥♦\n\n- [note] Testing unicode handling 测试"
        result = await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Unicode Test 🌟",
                "folder": "test",
                "content": unicode_content,
                "tags": "unicode,emoji,测试",
            },
        )
        assert len(result.content) == 1
        assert result.content[0].type == "text"
        response_text = result.content[0].text
        assert "# Created note" in response_text
        assert f"project: {test_project.name}" in response_text
        assert "file_path: test/Unicode Test 🌟.md" in response_text
        # Permalink should be sanitized
        assert "permalink: test/unicode-test" in response_text
        assert "## Tags" in response_text
        assert f"[Session: Using project '{test_project.name}']" in response_text
@pytest.mark.asyncio
async def test_write_note_complex_content_with_observations_relations(
    mcp_server, app, test_project
):
    """Test creating note with complex content including observations and relations.

    The response summary must include per-category observation counts and a
    Relations section for the [[wikilink]] relations in the content.
    """
    async with Client(mcp_server) as client:
        complex_content = dedent("""
            # Complex Note
            This note demonstrates the full knowledge format.
            ## Observations
            - [tech] Uses Python and FastAPI
            - [design] Follows MCP protocol specification
            - [note] Integration tests are comprehensive
            ## Relations
            - implements [[MCP Protocol]]
            - depends_on [[FastAPI Framework]]
            - tested_by [[Integration Tests]]
            ## Additional Content
            Some more regular markdown content here.
        """).strip()
        result = await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Complex Knowledge Note",
                "folder": "knowledge",
                "content": complex_content,
                "tags": "complex,knowledge,relations",
            },
        )
        assert len(result.content) == 1
        assert result.content[0].type == "text"
        response_text = result.content[0].text
        assert "# Created note" in response_text
        assert f"project: {test_project.name}" in response_text
        assert "file_path: knowledge/Complex Knowledge Note.md" in response_text
        assert "permalink: knowledge/complex-knowledge-note" in response_text
        # Should show observation and relation counts
        assert "## Observations" in response_text
        assert "tech: 1" in response_text
        assert "design: 1" in response_text
        assert "note: 1" in response_text
        assert "## Relations" in response_text
        # Should show outgoing relations
        assert "## Tags" in response_text
        assert "complex, knowledge, relations" in response_text
        assert f"[Session: Using project '{test_project.name}']" in response_text
@pytest.mark.asyncio
async def test_write_note_preserve_frontmatter(mcp_server, app, test_project):
    """Test that custom frontmatter is preserved when updating notes.

    NOTE(review): this test only performs a single create and checks the
    response summary; it never updates the note or re-reads the file, so
    frontmatter preservation across updates is not actually exercised here.
    """
    async with Client(mcp_server) as client:
        content_with_frontmatter = dedent("""
            ---
            title: Frontmatter Note
            type: note
            version: 1.0
            author: Test Author
            status: draft
            ---
            # Frontmatter Note
            This note has custom frontmatter that should be preserved.
        """).strip()
        result = await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Frontmatter Note",
                "folder": "test",
                "content": content_with_frontmatter,
                "tags": "frontmatter,preservation",
            },
        )
        assert len(result.content) == 1
        assert result.content[0].type == "text"
        response_text = result.content[0].text
        assert "# Created note" in response_text
        assert f"project: {test_project.name}" in response_text
        assert "file_path: test/Frontmatter Note.md" in response_text
        assert "permalink: test/frontmatter-note" in response_text
        assert f"[Session: Using project '{test_project.name}']" in response_text
@pytest.mark.asyncio
async def test_write_note_kebab_filenames_basic(mcp_server, test_project):
    """Test note creation with kebab_filenames=True and invalid filename characters.

    Temporarily flips the global kebab_filenames flag and verifies the
    generated file path and permalink are kebab-case with unsafe characters
    stripped.
    """
    config = ConfigManager().config
    curr_config_val = config.kebab_filenames
    config.kebab_filenames = True
    # Fix: restore the flag in a finally block. Previously restoration only ran
    # on success, so a failed assertion leaked kebab_filenames=True into
    # subsequent tests.
    try:
        with patch.object(ConfigManager, "config", config):
            async with Client(mcp_server) as client:
                result = await client.call_tool(
                    "write_note",
                    {
                        "project": test_project.name,
                        "title": "My Note: With/Invalid|Chars?",
                        "folder": "my-folder",
                        "content": "Testing kebab-case and invalid characters.",
                        "tags": "kebab,invalid,filename",
                    },
                )
                assert len(result.content) == 1
                response_text = result.content[0].text  # pyright: ignore [reportAttributeAccessIssue]
                # File path and permalink should be kebab-case and sanitized
                assert f"project: {test_project.name}" in response_text
                assert "file_path: my-folder/my-note-with-invalid-chars.md" in response_text
                assert "permalink: my-folder/my-note-with-invalid-chars" in response_text
                assert f"[Session: Using project '{test_project.name}']" in response_text
    finally:
        # Restore original config value
        config.kebab_filenames = curr_config_val
@pytest.mark.asyncio
async def test_write_note_kebab_filenames_repeat_invalid(mcp_server, test_project):
    """Test note creation with multiple invalid and repeated characters.

    Runs of forbidden filesystem characters must collapse into single hyphens.
    """
    config = ConfigManager().config
    curr_config_val = config.kebab_filenames
    config.kebab_filenames = True
    # Fix: restore the flag in a finally block. Previously restoration only ran
    # on success, so a failed assertion leaked kebab_filenames=True into
    # subsequent tests.
    try:
        with patch.object(ConfigManager, "config", config):
            async with Client(mcp_server) as client:
                result = await client.call_tool(
                    "write_note",
                    {
                        "project": test_project.name,
                        "title": 'Crazy<>:"|?*Note/Name',
                        "folder": "my-folder",
                        "content": "Should be fully kebab-case and safe.",
                        "tags": "crazy,filename,test",
                    },
                )
                assert len(result.content) == 1
                response_text = result.content[0].text  # pyright: ignore [reportAttributeAccessIssue]
                assert f"project: {test_project.name}" in response_text
                assert "file_path: my-folder/crazy-note-name.md" in response_text
                assert "permalink: my-folder/crazy-note-name" in response_text
                assert f"[Session: Using project '{test_project.name}']" in response_text
    finally:
        # Restore original config value
        config.kebab_filenames = curr_config_val
@pytest.mark.asyncio
async def test_write_note_file_path_os_path_join(mcp_server, test_project):
    """Test that os.path.join logic in Entity.file_path works for various folder/title combinations.

    Table-driven: each case checks the reported file_path and permalink for a
    folder/title pair, including empty, "/", space-containing, and doubled
    separator folders.
    """
    config = ConfigManager().config
    curr_config_val = config.kebab_filenames
    config.kebab_filenames = True
    test_cases = [
        # (folder, title, expected file_path, expected permalink)
        ("my-folder", "Test Note", "my-folder/test-note.md", "my-folder/test-note"),
        (
            "nested/folder",
            "Another Note",
            "nested/folder/another-note.md",
            "nested/folder/another-note",
        ),
        # Empty and "/" folders both resolve to the project root.
        ("", "Root Note", "root-note.md", "root-note"),
        ("/", "Root Slash Note", "root-slash-note.md", "root-slash-note"),
        (
            "folder with spaces",
            "Note Title",
            "folder with spaces/note-title.md",
            "folder-with-spaces/note-title",
        ),
        # Doubled separators are collapsed.
        ("folder//subfolder", "Note", "folder/subfolder/note.md", "folder/subfolder/note"),
    ]
    # Fix: restore the flag in a finally block (previously only restored on
    # success, leaking kebab_filenames=True if any case failed). Also dropped
    # a leftover debug print(response_text).
    try:
        with patch.object(ConfigManager, "config", config):
            async with Client(mcp_server) as client:
                for folder, title, expected_path, expected_permalink in test_cases:
                    result = await client.call_tool(
                        "write_note",
                        {
                            "project": test_project.name,
                            "title": title,
                            "folder": folder,
                            "content": "Testing os.path.join logic.",
                            "tags": "integration,ospath",
                        },
                    )
                    assert len(result.content) == 1
                    response_text = result.content[0].text  # pyright: ignore [reportAttributeAccessIssue]
                    assert f"project: {test_project.name}" in response_text
                    assert f"file_path: {expected_path}" in response_text
                    assert f"permalink: {expected_permalink}" in response_text
                    assert f"[Session: Using project '{test_project.name}']" in response_text
    finally:
        # Restore original config value
        config.kebab_filenames = curr_config_val
@pytest.mark.asyncio
async def test_write_note_project_path_validation(mcp_server, test_project):
    """Test that ProjectItem.home uses expanded path, not name (Issue #340).
    Regression test verifying that:
    1. ProjectItem.home returns Path(self.path).expanduser()
    2. Not Path(self.name) which was the bug
    This test verifies the fix works correctly even though in the test environment
    the project name and path happen to be the same. The fix in src/basic_memory/schemas/project_info.py:186
    ensures .expanduser() is called, which is critical for paths with ~ like "~/Documents/Test BiSync".
    """
    from basic_memory.schemas.project_info import ProjectItem
    from pathlib import Path
    # Test the fix directly: ProjectItem.home should expand tilde paths
    project_with_tilde = ProjectItem(
        id=1,
        name="Test BiSync",  # Name differs from path structure
        description="Test",
        path="~/Documents/Test BiSync",  # Path with tilde
        is_active=True,
        is_default=False,
    )
    # Before fix: Path("Test BiSync") - wrong!
    # After fix: Path("~/Documents/Test BiSync").expanduser() - correct!
    home_path = project_with_tilde.home
    # Verify it's a Path object
    assert isinstance(home_path, Path)
    # Verify tilde was expanded (won't contain ~)
    assert "~" not in str(home_path)
    # Verify it ends with the expected structure (use Path.parts for cross-platform)
    assert home_path.parts[-2:] == ("Documents", "Test BiSync")
    # Also test that write_note works with regular project
    async with Client(mcp_server) as client:
        result = await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Validation Test",
                "folder": "documents",
                "content": "Testing path validation",
                "tags": "test",
            },
        )
        response_text = result.content[0].text  # pyright: ignore [reportAttributeAccessIssue]
        # Should successfully create without path validation errors
        assert "# Created note" in response_text
        assert "not allowed" not in response_text
```
--------------------------------------------------------------------------------
/src/basic_memory/config.py:
--------------------------------------------------------------------------------
```python
"""Configuration management for basic-memory."""
import json
import os
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Literal, Optional, List, Tuple
from loguru import logger
from pydantic import BaseModel, Field, field_validator
from pydantic_settings import BaseSettings, SettingsConfigDict
import basic_memory
from basic_memory.utils import setup_logging, generate_permalink
DATABASE_NAME = "memory.db"
APP_DATABASE_NAME = "memory.db"  # Using the same name but in the app directory
DATA_DIR_NAME = ".basic-memory"
CONFIG_FILE_NAME = "config.json"
WATCH_STATUS_JSON = "watch-status.json"
Environment = Literal["test", "dev", "user"]
@dataclass
class ProjectConfig:
    """Configuration for a specific basic-memory project.

    Attributes:
        name: Human-readable project name.
        home: Filesystem root directory of the project.
    """
    name: str
    home: Path
    @property
    def project(self):
        # Alias for ``name``, kept for backward compatibility.
        return self.name
    @property
    def project_url(self) -> str:  # pragma: no cover
        # URL path segment derived from the project name's permalink.
        return f"/{generate_permalink(self.name)}"
class CloudProjectConfig(BaseModel):
    """Sync configuration for a cloud project.
    This tracks the local working directory and sync state for a project
    that is synced with Basic Memory Cloud.
    """
    # Where the cloud project's files live on the local machine.
    local_path: str = Field(description="Local working directory path for this cloud project")
    # None until the first successful sync completes.
    last_sync: Optional[datetime] = Field(
        default=None, description="Timestamp of last successful sync operation"
    )
    bisync_initialized: bool = Field(
        default=False, description="Whether rclone bisync baseline has been established"
    )
class BasicMemoryConfig(BaseSettings):
    """Pydantic model for Basic Memory global configuration.

    Values are read from environment variables with the ``BASIC_MEMORY_``
    prefix (see ``model_config``) and overlaid with the JSON config file by
    ``ConfigManager.load_config``.
    """
    env: Environment = Field(default="dev", description="Environment name")
    # Project name -> absolute filesystem path (stored in POSIX form).
    projects: Dict[str, str] = Field(
        default_factory=lambda: {
            "main": Path(os.getenv("BASIC_MEMORY_HOME", Path.home() / "basic-memory")).as_posix()
        },
        description="Mapping of project names to their filesystem paths",
    )
    default_project: str = Field(
        default="main",
        description="Name of the default project to use",
    )
    default_project_mode: bool = Field(
        default=False,
        description="When True, MCP tools automatically use default_project when no project parameter is specified. Enables simplified UX for single-project workflows.",
    )
    # overridden by ~/.basic-memory/config.json
    log_level: str = "INFO"
    # Watch service configuration
    sync_delay: int = Field(
        default=1000, description="Milliseconds to wait after changes before syncing", gt=0
    )
    watch_project_reload_interval: int = Field(
        default=30, description="Seconds between reloading project list in watch service", gt=0
    )
    # update permalinks on move
    update_permalinks_on_move: bool = Field(
        default=False,
        description="Whether to update permalinks when files are moved or renamed. default (False)",
    )
    sync_changes: bool = Field(
        default=True,
        description="Whether to sync changes in real time. default (True)",
    )
    sync_thread_pool_size: int = Field(
        default=4,
        description="Size of thread pool for file I/O operations in sync service. Default of 4 is optimized for cloud deployments with 1-2GB RAM.",
        gt=0,
    )
    sync_max_concurrent_files: int = Field(
        default=10,
        description="Maximum number of files to process concurrently during sync. Limits memory usage on large projects (2000+ files). Lower values reduce memory consumption.",
        gt=0,
    )
    kebab_filenames: bool = Field(
        default=False,
        description="Format for generated filenames. False preserves spaces and special chars, True converts them to hyphens for consistency with permalinks",
    )
    disable_permalinks: bool = Field(
        default=False,
        description="Disable automatic permalink generation in frontmatter. When enabled, new notes won't have permalinks added and sync won't update permalinks. Existing permalinks will still work for reading.",
    )
    skip_initialization_sync: bool = Field(
        default=False,
        description="Skip expensive initialization synchronization. Useful for cloud/stateless deployments where project reconciliation is not needed.",
    )
    # Project path constraints
    project_root: Optional[str] = Field(
        default=None,
        description="If set, all projects must be created underneath this directory. Paths will be sanitized and constrained to this root. If not set, projects can be created anywhere (default behavior).",
    )
    # Cloud configuration
    cloud_client_id: str = Field(
        default="client_01K6KWQPW6J1M8VV7R3TZP5A6M",
        description="OAuth client ID for Basic Memory Cloud",
    )
    cloud_domain: str = Field(
        default="https://eloquent-lotus-05.authkit.app",
        description="AuthKit domain for Basic Memory Cloud",
    )
    cloud_host: str = Field(
        default_factory=lambda: os.getenv(
            "BASIC_MEMORY_CLOUD_HOST", "https://cloud.basicmemory.com"
        ),
        description="Basic Memory Cloud host URL",
    )
    cloud_mode: bool = Field(
        default=False,
        description="Enable cloud mode - all requests go to cloud instead of local (config file value)",
    )
    cloud_projects: Dict[str, CloudProjectConfig] = Field(
        default_factory=dict,
        description="Cloud project sync configuration mapping project names to their local paths and sync state",
    )
    @property
    def cloud_mode_enabled(self) -> bool:
        """Check if cloud mode is enabled.
        Priority:
        1. BASIC_MEMORY_CLOUD_MODE environment variable
        2. Config file value (cloud_mode)
        """
        env_value = os.environ.get("BASIC_MEMORY_CLOUD_MODE", "").lower()
        if env_value in ("true", "1", "yes"):
            return True
        elif env_value in ("false", "0", "no"):
            return False
        # Fall back to config file value
        return self.cloud_mode
    model_config = SettingsConfigDict(
        env_prefix="BASIC_MEMORY_",
        extra="ignore",
    )
    def get_project_path(self, project_name: Optional[str] = None) -> Path:  # pragma: no cover
        """Get the path for a specific project or the default project.

        Raises:
            ValueError: If the project is not configured.
        """
        name = project_name or self.default_project
        if name not in self.projects:
            raise ValueError(f"Project '{name}' not found in configuration")
        return Path(self.projects[name])
    def model_post_init(self, __context: Any) -> None:
        """Ensure configuration is valid after initialization."""
        # Ensure main project exists
        if "main" not in self.projects:  # pragma: no cover
            self.projects["main"] = (
                Path(os.getenv("BASIC_MEMORY_HOME", Path.home() / "basic-memory"))
            ).as_posix()
        # Ensure default project is valid
        if self.default_project not in self.projects:  # pragma: no cover
            self.default_project = "main"
    @property
    def app_database_path(self) -> Path:
        """Get the path to the app-level database.
        This is the single database that will store all knowledge data
        across all projects.

        Side effect: creates the database file (and parent directory) if it
        does not exist yet.
        """
        database_path = Path.home() / DATA_DIR_NAME / APP_DATABASE_NAME
        if not database_path.exists():  # pragma: no cover
            database_path.parent.mkdir(parents=True, exist_ok=True)
            database_path.touch()
        return database_path
    @property
    def database_path(self) -> Path:
        """Get SQLite database path.
        Returns the app-level database path
        for backward compatibility in the codebase.
        """
        # Load the app-level database path from the global config
        config_manager = ConfigManager()
        config = config_manager.load_config()  # pragma: no cover
        return config.app_database_path  # pragma: no cover
    @property
    def project_list(self) -> List[ProjectConfig]:  # pragma: no cover
        """Get all configured projects as ProjectConfig objects."""
        return [ProjectConfig(name=name, home=Path(path)) for name, path in self.projects.items()]
    @field_validator("projects")
    @classmethod
    def ensure_project_paths_exists(cls, v: Dict[str, str]) -> Dict[str, str]:  # pragma: no cover
        """Ensure project path exists.

        Side effect: creates any missing project directories on validation.
        """
        for name, path_value in v.items():
            path = Path(path_value)
            if not Path(path).exists():
                try:
                    path.mkdir(parents=True)
                except Exception as e:
                    logger.error(f"Failed to create project path: {e}")
                    raise e
        return v
    @property
    def data_dir_path(self):
        # ~/.basic-memory — holds config.json and the app database.
        return Path.home() / DATA_DIR_NAME
# Module-level cache for configuration, shared across ConfigManager instances.
_CONFIG_CACHE: Optional[BasicMemoryConfig] = None
class ConfigManager:
    """Manages Basic Memory configuration.

    Resolves the config directory (BASIC_MEMORY_CONFIG_DIR env var, falling
    back to ~/.basic-memory) and provides load/save plus project CRUD helpers
    on top of the JSON config file.
    """
    def __init__(self) -> None:
        """Initialize the configuration manager."""
        home = os.getenv("HOME", Path.home())
        if isinstance(home, str):
            home = Path(home)
        # Allow override via environment variable
        if config_dir := os.getenv("BASIC_MEMORY_CONFIG_DIR"):
            self.config_dir = Path(config_dir)
        else:
            self.config_dir = home / DATA_DIR_NAME
        self.config_file = self.config_dir / CONFIG_FILE_NAME
        # Ensure config directory exists
        self.config_dir.mkdir(parents=True, exist_ok=True)
    @property
    def config(self) -> BasicMemoryConfig:
        """Get configuration, loading it lazily if needed."""
        return self.load_config()
    def load_config(self) -> BasicMemoryConfig:
        """Load configuration from file or create default.
        Environment variables take precedence over file config values,
        following Pydantic Settings best practices.
        Uses module-level cache for performance across ConfigManager instances.
        """
        global _CONFIG_CACHE
        # Return cached config if available
        if _CONFIG_CACHE is not None:
            return _CONFIG_CACHE
        if self.config_file.exists():
            try:
                file_data = json.loads(self.config_file.read_text(encoding="utf-8"))
                # Build a config purely from env vars/defaults, then overlay file
                # data, then put back any field explicitly set via a
                # BASIC_MEMORY_* env var so the environment wins.
                env_config = BasicMemoryConfig()
                env_dict = env_config.model_dump()
                merged_data = file_data.copy()
                # For fields that have env var overrides, use those instead of file values
                # The env_prefix is "BASIC_MEMORY_" so we check those
                for field_name in BasicMemoryConfig.model_fields.keys():
                    env_var_name = f"BASIC_MEMORY_{field_name.upper()}"
                    if env_var_name in os.environ:
                        # Environment variable is set, use it
                        merged_data[field_name] = env_dict[field_name]
                _CONFIG_CACHE = BasicMemoryConfig(**merged_data)
                return _CONFIG_CACHE
            except Exception as e:  # pragma: no cover
                logger.exception(f"Failed to load config: {e}")
                raise e
        else:
            config = BasicMemoryConfig()
            self.save_config(config)
            # Fix: cache the freshly created default config as well. Previously
            # only the file-load branch populated the cache, so after first-run
            # config creation every load_config() call re-read the file. Set the
            # cache *after* save_config, which clears it.
            _CONFIG_CACHE = config
            return config
    def save_config(self, config: BasicMemoryConfig) -> None:
        """Save configuration to file and invalidate cache."""
        global _CONFIG_CACHE
        save_basic_memory_config(self.config_file, config)
        # Invalidate cache so next load_config() reads fresh data
        _CONFIG_CACHE = None
    @property
    def projects(self) -> Dict[str, str]:
        """Get all configured projects (defensive copy)."""
        return self.config.projects.copy()
    @property
    def default_project(self) -> str:
        """Get the default project name."""
        return self.config.default_project
    def add_project(self, name: str, path: str) -> ProjectConfig:
        """Add a new project to the configuration.

        Raises:
            ValueError: If a project with this name (by permalink) already exists.
        """
        project_name, _ = self.get_project(name)
        if project_name:  # pragma: no cover
            raise ValueError(f"Project '{name}' already exists")
        # Ensure the path exists
        project_path = Path(path)
        project_path.mkdir(parents=True, exist_ok=True)  # pragma: no cover
        # Load config, modify it, and save it
        config = self.load_config()
        config.projects[name] = project_path.as_posix()
        self.save_config(config)
        return ProjectConfig(name=name, home=project_path)
    def remove_project(self, name: str) -> None:
        """Remove a project from the configuration.

        Raises:
            ValueError: If the project is unknown or is the default project.
        """
        project_name, path = self.get_project(name)
        if not project_name:  # pragma: no cover
            raise ValueError(f"Project '{name}' not found")
        # Load config, check, modify, and save
        config = self.load_config()
        if project_name == config.default_project:  # pragma: no cover
            raise ValueError(f"Cannot remove the default project '{name}'")
        # Use the found project_name (which may differ from input name due to permalink matching)
        del config.projects[project_name]
        self.save_config(config)
    def set_default_project(self, name: str) -> None:
        """Set the default project.

        Raises:
            ValueError: If the project is not configured.
        """
        project_name, path = self.get_project(name)
        if not project_name:  # pragma: no cover
            raise ValueError(f"Project '{name}' not found")
        # Load config, modify, and save
        config = self.load_config()
        config.default_project = project_name
        self.save_config(config)
    def get_project(self, name: str) -> Tuple[str, str] | Tuple[None, None]:
        """Look up a project from the configuration by name or permalink.

        Returns (canonical_name, path) on a match, (None, None) otherwise.
        """
        project_permalink = generate_permalink(name)
        app_config = self.config
        for project_name, path in app_config.projects.items():
            if project_permalink == generate_permalink(project_name):
                return project_name, path
        return None, None
def get_project_config(project_name: Optional[str] = None) -> ProjectConfig:
    """
    Get the project configuration for the current session.

    If project_name is provided, it will be used instead of the default project.

    Raises:
        ValueError: if the resolved project name is not present in the config.
    """
    # load the config from file
    config_manager = ConfigManager()
    app_config = config_manager.load_config()
    # Legacy BASIC_MEMORY_PROJECT support: warn, and fall back to the env value
    # only when the caller did not pass an explicit project_name.
    os_project_name = os.environ.get("BASIC_MEMORY_PROJECT", None)
    if os_project_name:  # pragma: no cover
        logger.warning(
            f"BASIC_MEMORY_PROJECT is not supported anymore. Set the default project in the config instead. Setting default project to {os_project_name}"
        )
        # Bug fix: the original assigned `project_name` here, which is None when
        # the caller relied on the env var alone, tripping the assertion below.
        actual_project_name = project_name or os_project_name
    elif not project_name:
        # no explicit project requested: use the configured default
        actual_project_name = app_config.default_project
    else:  # pragma: no cover
        # explicit project_name wins
        actual_project_name = project_name
    # the config contains a dict[str,str] of project names and absolute paths
    assert actual_project_name is not None, "actual_project_name cannot be None"
    # Match on permalink so "My Project" and "my-project" resolve identically.
    project_permalink = generate_permalink(actual_project_name)
    for name, path in app_config.projects.items():
        if project_permalink == generate_permalink(name):
            return ProjectConfig(name=name, home=Path(path))
    # otherwise raise error
    raise ValueError(f"Project '{actual_project_name}' not found")  # pragma: no cover
def save_basic_memory_config(file_path: Path, config: BasicMemoryConfig) -> None:
    """Save configuration to file.

    Errors are logged rather than raised (best-effort persistence).
    """
    try:
        # mode="json" serializes datetime objects into JSON-safe strings.
        serialized = config.model_dump(mode="json")
        file_path.write_text(json.dumps(serialized, indent=2))
    except Exception as e:  # pragma: no cover
        logger.error(f"Failed to save config: {e}")
# setup logging to a single log file in user home directory
user_home = Path.home()
# Log directory (e.g. ~/<DATA_DIR_NAME>) is created eagerly at import time.
log_dir = user_home / DATA_DIR_NAME
log_dir.mkdir(parents=True, exist_ok=True)
# Process info for logging
def get_process_name():  # pragma: no cover
    """Return the type of process ("sync", "mcp", "cli" or "api") for logging.

    Inspects sys.argv for known subcommand names, checked in priority order;
    anything unrecognized is treated as the API process.
    """
    import sys

    for candidate in ("sync", "mcp", "cli"):
        if candidate in sys.argv:
            return candidate
    return "api"
# Resolved once at import time; used to name this process's log file.
process_name = get_process_name()
# Global flag to track if logging has been set up
_LOGGING_SETUP = False
# Logging
def setup_basic_memory_logging():  # pragma: no cover
    """Set up logging for basic-memory, ensuring it only happens once.

    Log level and console logging are driven by environment variables
    (BASIC_MEMORY_LOG_LEVEL, BASIC_MEMORY_CONSOLE_LOGGING), falling back
    to the persisted config for the level.
    """
    global _LOGGING_SETUP
    if _LOGGING_SETUP:
        # We can't log before logging is set up
        # print("Skipping duplicate logging setup")
        return
    # Check for console logging environment variable - accept more truthy values
    console_logging_env = os.getenv("BASIC_MEMORY_CONSOLE_LOGGING", "false").lower()
    console_logging = console_logging_env in ("true", "1", "yes", "on")
    # Single ConfigManager instance (the original constructed a redundant second one).
    config_manager = ConfigManager()
    # Environment variable takes precedence; empty/unset falls back to config.
    log_level = os.getenv("BASIC_MEMORY_LOG_LEVEL") or config_manager.config.log_level
    config = get_project_config()
    setup_logging(
        env=config_manager.config.env,
        home_dir=user_home,  # Use user home for logs
        log_level=log_level,
        log_file=f"{DATA_DIR_NAME}/basic-memory-{process_name}.log",
        console=console_logging,
    )
    logger.info(f"Basic Memory {basic_memory.__version__} (Project: {config.project})")
    _LOGGING_SETUP = True
# Set up logging at import time so every entry point gets it without extra calls.
setup_basic_memory_logging()
```
--------------------------------------------------------------------------------
/tests/mcp/test_tool_read_note.py:
--------------------------------------------------------------------------------
```python
"""Tests for note tools that exercise the full stack with SQLite."""
from textwrap import dedent
import pytest
from basic_memory.mcp.tools import write_note, read_note
import pytest_asyncio
from unittest.mock import MagicMock, patch
from basic_memory.schemas.search import SearchResponse
from basic_memory.utils import normalize_newlines
@pytest_asyncio.fixture
async def mock_call_get():
    """Mock for call_get to simulate different responses."""
    with patch("basic_memory.mcp.tools.read_note.call_get") as patched:
        # Default behaviour: pretend the note does not exist (HTTP 404).
        not_found = MagicMock()
        not_found.status_code = 404
        patched.return_value = not_found
        yield patched
@pytest_asyncio.fixture
async def mock_search():
    """Mock for search tool."""
    with patch("basic_memory.mcp.tools.read_note.search_notes.fn") as patched:
        # Default behaviour: a search that finds nothing.
        patched.return_value = SearchResponse(results=[], current_page=1, page_size=1)
        yield patched
@pytest.mark.asyncio
async def test_read_note_by_title(app, test_project):
    """Test reading a note by its title."""
    # Create a note whose title we will look up.
    await write_note.fn(
        project=test_project.name, title="Special Note", folder="test", content="Note content here"
    )
    # Title-based lookup should resolve to the same note.
    fetched = await read_note.fn("Special Note", project=test_project.name)
    assert "Note content here" in fetched
@pytest.mark.asyncio
async def test_note_unicode_content(app, test_project):
    """Test handling of unicode content in note bodies."""
    # Mixed emoji and non-ASCII symbols exercise UTF-8 round-tripping.
    content = "# Test 🚀\nThis note has emoji 🎉 and unicode ♠♣♥♦"
    result = await write_note.fn(
        project=test_project.name, title="Unicode Test", folder="test", content=content
    )
    # The write summary echoes file path, permalink and a pinned content checksum.
    assert (
        dedent(f"""
        # Created note
        project: {test_project.name}
        file_path: test/Unicode Test.md
        permalink: test/unicode-test
        checksum: 272389cd
        """).strip()
        in result
    )
    # Read back should preserve unicode
    result = await read_note.fn("test/unicode-test", project=test_project.name)
    assert normalize_newlines(content) in result
@pytest.mark.asyncio
async def test_multiple_notes(app, test_project):
    """Test creating and reading multiple notes, including wildcard reads."""
    # Seed three notes in the same folder with varying tags.
    notes_data = [
        ("test/note-1", "Note 1", "test", "Content 1", ["tag1"]),
        ("test/note-2", "Note 2", "test", "Content 2", ["tag1", "tag2"]),
        ("test/note-3", "Note 3", "test", "Content 3", []),
    ]
    for _, title, folder, body, tags in notes_data:
        await write_note.fn(
            project=test_project.name, title=title, folder=folder, content=body, tags=tags
        )
    # Each note is individually readable by its permalink.
    for permalink, _title, _folder, body, _tags in notes_data:
        fetched = await read_note.fn(permalink, project=test_project.name)
        assert body in fetched
    # A wildcard identifier returns all notes in one response.
    combined = await read_note.fn("test/*", project=test_project.name)
    # note we can't compare times
    for index in (1, 2, 3):
        assert f"--- memory://test/note-{index}" in combined
        assert f"Content {index}" in combined
@pytest.mark.asyncio
async def test_multiple_notes_pagination(app, test_project):
    """Test wildcard reads of multiple notes with pagination."""
    # Seed three notes in the same folder with varying tags.
    notes_data = [
        ("test/note-1", "Note 1", "test", "Content 1", ["tag1"]),
        ("test/note-2", "Note 2", "test", "Content 2", ["tag1", "tag2"]),
        ("test/note-3", "Note 3", "test", "Content 3", []),
    ]
    for _, title, folder, body, tags in notes_data:
        await write_note.fn(
            project=test_project.name, title=title, folder=folder, content=body, tags=tags
        )
    # Each note is individually readable by its permalink.
    for permalink, _title, _folder, body, _tags in notes_data:
        fetched = await read_note.fn(permalink, project=test_project.name)
        assert body in fetched
    # Page 1 with page_size=2 should contain the first two notes.
    paged = await read_note.fn("test/*", page=1, page_size=2, project=test_project.name)
    # note we can't compare times
    for index in (1, 2):
        assert f"--- memory://test/note-{index}" in paged
        assert f"Content {index}" in paged
@pytest.mark.asyncio
async def test_read_note_memory_url(app, test_project):
    """Test reading a note using a memory:// URL.
    Should:
    - Handle memory:// URLs correctly
    - Normalize the URL before resolving
    - Return the note content
    """
    created = await write_note.fn(
        project=test_project.name,
        title="Memory URL Test",
        folder="test",
        content="Testing memory:// URL handling",
    )
    assert created
    # The memory:// scheme should be normalized away before lookup.
    fetched = await read_note.fn("memory://test/memory-url-test", project=test_project.name)
    assert "Testing memory:// URL handling" in fetched
class TestReadNoteSecurityValidation:
    """Test read_note security validation features."""

    # Fragment of the error message produced when path validation rejects input.
    BOUNDARY_MSG = "paths must stay within project boundaries"

    async def _assert_blocked(self, project_name, identifier, expect_echo=True):
        """Call read_note with a hostile identifier and assert it is rejected."""
        result = await read_note.fn(identifier=identifier, project=project_name)
        assert isinstance(result, str)
        assert "# Error" in result
        assert self.BOUNDARY_MSG in result
        if expect_echo:
            # The offending identifier is echoed back in the error report.
            assert identifier in result

    @pytest.mark.asyncio
    async def test_read_note_blocks_path_traversal_unix(self, app, test_project):
        """Test that Unix-style path traversal attacks are blocked in identifier parameter."""
        for identifier in (
            "../secrets.txt",
            "../../etc/passwd",
            "../../../root/.ssh/id_rsa",
            "notes/../../../etc/shadow",
            "folder/../../outside/file.md",
            "../../../../etc/hosts",
            "../../../home/user/.env",
        ):
            await self._assert_blocked(test_project.name, identifier)

    @pytest.mark.asyncio
    async def test_read_note_blocks_path_traversal_windows(self, app, test_project):
        """Test that Windows-style path traversal attacks are blocked in identifier parameter."""
        for identifier in (
            "..\\secrets.txt",
            "..\\..\\Windows\\System32\\config\\SAM",
            "notes\\..\\..\\..\\Windows\\System32",
            "\\\\server\\share\\file.txt",
            "..\\..\\Users\\user\\.env",
            "\\\\..\\..\\Windows",
            "..\\..\\..\\Boot.ini",
        ):
            await self._assert_blocked(test_project.name, identifier)

    @pytest.mark.asyncio
    async def test_read_note_blocks_absolute_paths(self, app, test_project):
        """Test that absolute paths are blocked in identifier parameter."""
        for identifier in (
            "/etc/passwd",
            "/home/user/.env",
            "/var/log/auth.log",
            "/root/.ssh/id_rsa",
            "C:\\Windows\\System32\\config\\SAM",
            "C:\\Users\\user\\.env",
            "D:\\secrets\\config.json",
            "/tmp/malicious.txt",
            "/usr/local/bin/evil",
        ):
            await self._assert_blocked(test_project.name, identifier)

    @pytest.mark.asyncio
    async def test_read_note_blocks_home_directory_access(self, app, test_project):
        """Test that home directory access patterns are blocked in identifier parameter."""
        for identifier in (
            "~/secrets.txt",
            "~/.env",
            "~/.ssh/id_rsa",
            "~/Documents/passwords.txt",
            "~\\AppData\\secrets",
            "~\\Desktop\\config.ini",
            "~/.bashrc",
            "~/Library/Preferences/secret.plist",
        ):
            await self._assert_blocked(test_project.name, identifier)

    @pytest.mark.asyncio
    async def test_read_note_blocks_memory_url_attacks(self, app, test_project):
        """Test that memory URLs with path traversal are blocked."""
        for identifier in (
            "memory://../../etc/passwd",
            "memory://../../../root/.ssh/id_rsa",
            "memory://~/.env",
            "memory:///etc/passwd",
            "memory://notes/../../../etc/shadow",
            "memory://..\\..\\Windows\\System32",
        ):
            # The raw URL is not necessarily echoed back, so skip the echo check.
            await self._assert_blocked(test_project.name, identifier, expect_echo=False)

    @pytest.mark.asyncio
    async def test_read_note_blocks_mixed_attack_patterns(self, app, test_project):
        """Test that mixed legitimate/attack patterns are blocked in identifier parameter."""
        for identifier in (
            "notes/../../../etc/passwd",
            "docs/../../.env",
            "legitimate/path/../../.ssh/id_rsa",
            "project/folder/../../../Windows/System32",
            "valid/folder/../../home/user/.bashrc",
            "assets/../../../tmp/evil.exe",
        ):
            await self._assert_blocked(test_project.name, identifier, expect_echo=False)

    @pytest.mark.asyncio
    async def test_read_note_allows_safe_identifiers(self, app, test_project):
        """Test that legitimate identifiers are still allowed."""
        safe_identifiers = (
            "notes/meeting",
            "docs/readme",
            "projects/2025/planning",
            "archive/old-notes/backup",
            "folder/subfolder/document",
            "research/ml/algorithms",
            "meeting-notes",
            "test/simple-note",
        )
        for identifier in safe_identifiers:
            result = await read_note.fn(project=test_project.name, identifier=identifier)
            assert isinstance(result, str)
            # A safe identifier may legitimately fail (e.g. not found), but it
            # must never trip the security validation.
            assert "# Error" not in result or self.BOUNDARY_MSG not in result

    @pytest.mark.asyncio
    async def test_read_note_allows_legitimate_titles(self, app, test_project):
        """Test that legitimate note titles work normally."""
        await write_note.fn(
            project=test_project.name,
            title="Security Test Note",
            folder="security-tests",
            content="# Security Test Note\nThis is a legitimate note for security testing.",
        )
        # Reading by title should not be reported as a boundary violation.
        result = await read_note.fn("Security Test Note", project=test_project.name)
        assert isinstance(result, str)
        assert "# Error" not in result or self.BOUNDARY_MSG not in result

    @pytest.mark.asyncio
    async def test_read_note_empty_identifier_security(self, app, test_project):
        """Test that empty identifier is handled securely."""
        # An empty identifier may yield search results or an ordinary error,
        # but must not be reported as a boundary violation.
        result = await read_note.fn(identifier="", project=test_project.name)
        assert isinstance(result, str)
        assert "# Error" not in result or self.BOUNDARY_MSG not in result

    @pytest.mark.asyncio
    async def test_read_note_security_with_all_parameters(self, app, test_project):
        """Test security validation works with all read_note parameters."""
        # Security validation applies even when pagination parameters are given.
        result = await read_note.fn(
            project=test_project.name,
            identifier="../../../etc/malicious",
            page=1,
            page_size=5,
        )
        assert isinstance(result, str)
        assert "# Error" in result
        assert self.BOUNDARY_MSG in result
        assert "../../../etc/malicious" in result

    @pytest.mark.asyncio
    async def test_read_note_security_logging(self, app, caplog, test_project):
        """Test that security violations are properly logged."""
        result = await read_note.fn(identifier="../../../etc/passwd", project=test_project.name)
        assert "# Error" in result
        assert self.BOUNDARY_MSG in result
        # Check that security violation was logged
        # Note: This test may need adjustment based on the actual logging setup
        # The security validation should generate a warning log entry

    @pytest.mark.asyncio
    async def test_read_note_preserves_functionality_with_security(self, app, test_project):
        """Test that security validation doesn't break normal note reading functionality."""
        # A feature-rich note ensures validation does not interfere with parsing.
        await write_note.fn(
            project=test_project.name,
            title="Full Feature Security Test Note",
            folder="security-tests",
            content=dedent("""
                # Full Feature Security Test Note
                
                This note tests that security validation doesn't break normal functionality.
                
                ## Observations
                - [security] Path validation working correctly #security
                - [feature] All features still functional #test
                
                ## Relations
                - relates_to [[Security Implementation]]
                - depends_on [[Path Validation]]
                
                Additional content with various formatting.
            """).strip(),
            tags=["security", "test", "full-feature"],
            entity_type="guide",
        )
        # Reading by permalink should succeed normally (not a security error).
        result = await read_note.fn(
            "security-tests/full-feature-security-test-note", project=test_project.name
        )
        assert isinstance(result, str)
        assert "# Error" not in result or self.BOUNDARY_MSG not in result
class TestReadNoteSecurityEdgeCases:
    """Test edge cases for read_note security validation."""

    @pytest.mark.asyncio
    async def test_read_note_unicode_identifier_attacks(self, app, test_project):
        """Test that Unicode-based path traversal attempts are blocked."""
        # Non-ASCII path segments must not bypass traversal detection.
        for hostile in (
            "notes/文档/../../../etc/passwd",  # Chinese characters
            "docs/café/../../.env",  # Accented characters
            "files/αβγ/../../../secret.txt",  # Greek characters
        ):
            result = await read_note.fn(hostile, project=test_project.name)
            assert isinstance(result, str)
            assert "# Error" in result
            assert "paths must stay within project boundaries" in result

    @pytest.mark.asyncio
    async def test_read_note_very_long_attack_identifier(self, app, test_project):
        """Test handling of very long attack identifiers."""
        # 1000 levels of "../" followed by a target path.
        hostile = "../" * 1000 + "etc/malicious"
        result = await read_note.fn(hostile, project=test_project.name)
        assert isinstance(result, str)
        assert "# Error" in result
        assert "paths must stay within project boundaries" in result

    @pytest.mark.asyncio
    async def test_read_note_case_variations_attacks(self, app, test_project):
        """Test that case variations don't bypass security."""
        # Case sensitivity depends on the filesystem; blocking must not.
        for hostile in (
            "../ETC/passwd",
            "../Etc/PASSWD",
            "..\\WINDOWS\\system32",
            "~/.SSH/id_rsa",
        ):
            result = await read_note.fn(hostile, project=test_project.name)
            assert isinstance(result, str)
            assert "# Error" in result
            assert "paths must stay within project boundaries" in result

    @pytest.mark.asyncio
    async def test_read_note_whitespace_in_attack_identifiers(self, app, test_project):
        """Test that whitespace doesn't help bypass security."""
        for hostile in (
            " ../../../etc/passwd ",
            "\t../../../secrets\t",
            " ..\\..\\Windows ",
            "notes/ ../../ malicious",
        ):
            result = await read_note.fn(hostile, project=test_project.name)
            assert isinstance(result, str)
            # The attack is still blocked once surrounding whitespace is stripped.
            stripped = hostile.strip()
            if ".." in stripped or "~" in stripped:
                assert "# Error" in result
                assert "paths must stay within project boundaries" in result
```
--------------------------------------------------------------------------------
/test-int/mcp/test_list_directory_integration.py:
--------------------------------------------------------------------------------
```python
"""
Integration tests for list_directory MCP tool.
Tests the complete list directory workflow: MCP client -> MCP server -> FastAPI -> database -> file system
"""
import pytest
from fastmcp import Client
@pytest.mark.asyncio
async def test_list_directory_basic_operation(mcp_server, app, test_project):
    """Test basic list_directory operation showing root contents."""
    async with Client(mcp_server) as client:
        # Seed a root-level note plus notes in two subdirectories.
        seed_notes = [
            ("Root Note", "", "# Root Note\n\nThis is in the root directory.", "test,root"),
            (
                "Project Planning",
                "projects",
                "# Project Planning\n\nPlanning document for projects.",
                "planning,project",
            ),
            (
                "Meeting Notes",
                "meetings",
                "# Meeting Notes\n\nNotes from the meeting.",
                "meeting,notes",
            ),
        ]
        for title, folder, content, tags in seed_notes:
            await client.call_tool(
                "write_note",
                {
                    "project": test_project.name,
                    "title": title,
                    "folder": folder,
                    "content": content,
                    "tags": tags,
                },
            )
        # List root directory
        list_result = await client.call_tool(
            "list_directory",
            {"project": test_project.name, "dir_name": "/", "depth": 1},
        )
        # Should return a single formatted directory listing.
        assert len(list_result.content) == 1
        listing = list_result.content[0].text
        for fragment in (
            "Contents of '/' (depth 1):",
            "📁 meetings",
            "📁 projects",
            "📄 Root Note.md",
            "Root Note",  # title should be shown
            "Total:",
            "directories",
            "file",
        ):
            assert fragment in listing
@pytest.mark.asyncio
async def test_list_directory_specific_folder(mcp_server, app, test_project):
    """Test listing contents of a specific folder."""
    async with Client(mcp_server) as client:
        # Build a small tree under /work.
        seed_notes = [
            ("Task List", "work", "# Task List\n\nWork tasks for today.", "work,tasks"),
            (
                "Project Alpha",
                "work/projects",
                "# Project Alpha\n\nAlpha project documentation.",
                "project,alpha",
            ),
            (
                "Daily Standup",
                "work/meetings",
                "# Daily Standup\n\nStandup meeting notes.",
                "meeting,standup",
            ),
        ]
        for title, folder, content, tags in seed_notes:
            await client.call_tool(
                "write_note",
                {
                    "project": test_project.name,
                    "title": title,
                    "folder": folder,
                    "content": content,
                    "tags": tags,
                },
            )
        # List only the /work folder.
        list_result = await client.call_tool(
            "list_directory",
            {"project": test_project.name, "dir_name": "/work", "depth": 1},
        )
        assert len(list_result.content) == 1
        listing = list_result.content[0].text
        for fragment in (
            "Contents of '/work' (depth 1):",
            "📁 meetings",
            "📁 projects",
            "📄 Task List.md",
            "work/Task List.md",  # path shown without leading slash
        ):
            assert fragment in listing
@pytest.mark.asyncio
async def test_list_directory_with_depth(mcp_server, app, test_project):
    """Test recursive directory listing with depth control."""
    async with Client(mcp_server) as client:
        # Build a tree four levels deep under /research.
        seed_notes = [
            (
                "Deep Note",
                "research/ml/algorithms/neural-networks",
                "# Deep Note\n\nDeep learning research.",
                "research,ml,deep",
            ),
            (
                "ML Overview",
                "research/ml",
                "# ML Overview\n\nMachine learning overview.",
                "research,ml,overview",
            ),
            (
                "Research Index",
                "research",
                "# Research Index\n\nIndex of research topics.",
                "research,index",
            ),
        ]
        for title, folder, content, tags in seed_notes:
            await client.call_tool(
                "write_note",
                {
                    "project": test_project.name,
                    "title": title,
                    "folder": folder,
                    "content": content,
                    "tags": tags,
                },
            )
        # Depth 3 should surface nested directories and their files.
        list_result = await client.call_tool(
            "list_directory",
            {"project": test_project.name, "dir_name": "/research", "depth": 3},
        )
        assert len(list_result.content) == 1
        listing = list_result.content[0].text
        for fragment in (
            "Contents of '/research' (depth 3):",
            "📁 ml",
            "📄 Research Index.md",
            "📄 ML Overview.md",
            "📁 algorithms",  # nested dirs shown within depth
        ):
            assert fragment in listing
@pytest.mark.asyncio
async def test_list_directory_with_glob_pattern(mcp_server, app, test_project):
    """Test directory listing with glob pattern filtering."""
    async with Client(mcp_server) as client:
        # Two "Meeting ..." notes and one that should be filtered out.
        seed_notes = [
            (
                "Meeting 2025-01-15",
                "# Meeting 2025-01-15\n\nMonday meeting notes.",
                "meeting,january",
            ),
            (
                "Meeting 2025-01-22",
                "# Meeting 2025-01-22\n\nMonday meeting notes.",
                "meeting,january",
            ),
            (
                "Project Status",
                "# Project Status\n\nProject status update.",
                "meeting,project",
            ),
        ]
        for title, content, tags in seed_notes:
            await client.call_tool(
                "write_note",
                {
                    "project": test_project.name,
                    "title": title,
                    "folder": "meetings",
                    "content": content,
                    "tags": tags,
                },
            )
        # Filter to files whose names start with "Meeting".
        list_result = await client.call_tool(
            "list_directory",
            {
                "project": test_project.name,
                "dir_name": "/meetings",
                "depth": 1,
                "file_name_glob": "Meeting*",
            },
        )
        assert len(list_result.content) == 1
        listing = list_result.content[0].text
        assert "Files in '/meetings' matching 'Meeting*' (depth 1):" in listing
        assert "📄 Meeting 2025-01-15.md" in listing
        assert "📄 Meeting 2025-01-22.md" in listing
        # The non-matching note is filtered out of the listing.
        assert "Project Status" not in listing
@pytest.mark.asyncio
async def test_list_directory_empty_directory(mcp_server, app, test_project):
    """Test listing an empty directory."""
    async with Client(mcp_server) as client:
        # Nothing was ever written under /empty.
        list_result = await client.call_tool(
            "list_directory",
            {"project": test_project.name, "dir_name": "/empty", "depth": 1},
        )
        assert len(list_result.content) == 1
        # Should indicate no files were found.
        assert "No files found in directory '/empty'" in list_result.content[0].text
@pytest.mark.asyncio
async def test_list_directory_glob_no_matches(mcp_server, app, test_project):
    """A glob that matches nothing yields a 'no files found' message, not an error."""
    async with Client(mcp_server) as client:
        # Seed the directory with a single markdown note.
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Document One",
                "folder": "docs",
                "content": "# Document One\n\nFirst document.",
                "tags": "doc",
            },
        )

        # Filter with a pattern no file can satisfy (there are no Python files).
        result = await client.call_tool(
            "list_directory",
            {
                "project": test_project.name,
                "dir_name": "/docs",
                "depth": 1,
                "file_name_glob": "*.py",
            },
        )

        assert len(result.content) == 1
        output = result.content[0].text

        # The message names both the directory and the unmatched pattern.
        assert "No files found in directory '/docs' matching '*.py'" in output
@pytest.mark.asyncio
async def test_list_directory_various_file_types(mcp_server, app, test_project):
    """Listings show file names, relative paths, and a correct item count."""
    async with Client(mcp_server) as client:
        # Seed the directory with a short-titled and a long-titled note.
        notes = [
            ("Simple Note", "# Simple Note\n\nA simple note.", "simple"),
            (
                "Complex Document with Long Title",
                "# Complex Document with Long Title\n\nA more complex document.",
                "complex,long",
            ),
        ]
        for title, content, tags in notes:
            await client.call_tool(
                "write_note",
                {
                    "project": test_project.name,
                    "title": title,
                    "folder": "mixed",
                    "content": content,
                    "tags": tags,
                },
            )

        # List the seeded directory.
        result = await client.call_tool(
            "list_directory",
            {"project": test_project.name, "dir_name": "/mixed", "depth": 1},
        )

        assert len(result.content) == 1
        output = result.content[0].text

        # Each note appears with both its file name and its relative path.
        for title, _, _ in notes:
            assert f"📄 {title}.md" in output
            assert f"mixed/{title}.md" in output
        assert "Total: 2 items (2 files)" in output
@pytest.mark.asyncio
async def test_list_directory_default_parameters(mcp_server, app, test_project):
    """Omitting dir_name and depth falls back to listing the root at depth 1."""
    async with Client(mcp_server) as client:
        # Create one note so the root directory is not empty.
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Default Test",
                "folder": "default-test",
                "content": "# Default Test\n\nTesting default parameters.",
                "tags": "default",
            },
        )

        # Only the required project argument is supplied; everything else defaults.
        result = await client.call_tool("list_directory", {"project": test_project.name})

        assert len(result.content) == 1
        output = result.content[0].text

        # Defaults resolve to the root directory at depth 1.
        assert "Contents of '/' (depth 1):" in output
        assert "📁 default-test" in output
        assert "Total:" in output
@pytest.mark.asyncio
async def test_list_directory_deep_recursion(mcp_server, app, test_project):
    """A depth-10 listing reaches notes buried several directory levels down."""
    async with Client(mcp_server) as client:
        # Build a nested tree: one note five levels deep, one at level three.
        fixtures = [
            (
                "Level 5 Note",
                "level1/level2/level3/level4/level5",
                "# Level 5 Note\n\nVery deep note.",
                "deep,level5",
            ),
            (
                "Level 3 Note",
                "level1/level2/level3",
                "# Level 3 Note\n\nMid-level note.",
                "medium,level3",
            ),
        ]
        for title, folder, content, tags in fixtures:
            await client.call_tool(
                "write_note",
                {
                    "project": test_project.name,
                    "title": title,
                    "folder": folder,
                    "content": content,
                    "tags": tags,
                },
            )

        # Request the maximum allowed traversal depth.
        result = await client.call_tool(
            "list_directory",
            {"project": test_project.name, "dir_name": "/level1", "depth": 10},
        )

        assert len(result.content) == 1
        output = result.content[0].text

        # Both notes and the intermediate directory are visible at depth 10.
        assert "Contents of '/level1' (depth 10):" in output
        assert "📁 level2" in output
        assert "📄 Level 3 Note.md" in output
        assert "📄 Level 5 Note.md" in output
@pytest.mark.asyncio
async def test_list_directory_complex_glob_patterns(mcp_server, app, test_project):
    """A prefix glob keeps matching files and drops everything else."""
    async with Client(mcp_server) as client:
        # Two "Project ..." notes plus one unrelated meeting note.
        fixtures = [
            ("Project Alpha Plan", "# Project Alpha Plan\n\nAlpha planning.", "project,alpha"),
            ("Project Beta Plan", "# Project Beta Plan\n\nBeta planning.", "project,beta"),
            ("Meeting Minutes", "# Meeting Minutes\n\nMeeting notes.", "meeting"),
        ]
        for title, content, tags in fixtures:
            await client.call_tool(
                "write_note",
                {
                    "project": test_project.name,
                    "title": title,
                    "folder": "patterns",
                    "content": content,
                    "tags": tags,
                },
            )

        # Filter the directory with a prefix wildcard pattern.
        result = await client.call_tool(
            "list_directory",
            {
                "project": test_project.name,
                "dir_name": "/patterns",
                "file_name_glob": "Project*",
            },
        )

        assert len(result.content) == 1
        output = result.content[0].text

        # Only the Project-prefixed notes survive the glob filter.
        assert "Project Alpha Plan.md" in output
        assert "Project Beta Plan.md" in output
        assert "Meeting Minutes" not in output
        assert "matching 'Project*'" in output
@pytest.mark.asyncio
async def test_list_directory_dot_slash_prefix_paths(mcp_server, app, test_project):
    """'./'-prefixed paths list the same files as plain paths (regression test)."""
    async with Client(mcp_server) as client:
        # Put two notes in an "artifacts" subdirectory.
        for title, content in [
            ("Artifact One", "# Artifact One\n\nFirst artifact document."),
            ("Artifact Two", "# Artifact Two\n\nSecond artifact document."),
        ]:
            await client.call_tool(
                "write_note",
                {
                    "project": test_project.name,
                    "title": title,
                    "folder": "artifacts",
                    "content": content,
                    "tags": "artifact,test",
                },
            )

        # Every spelling of the path must resolve to the same listing: the
        # plain form, the "./" prefix (the originally reported failure), and
        # the "./" prefix with a trailing slash.
        for dir_name in ("artifacts", "./artifacts", "./artifacts/"):
            result = await client.call_tool(
                "list_directory",
                {"project": test_project.name, "dir_name": dir_name, "depth": 1},
            )
            assert len(result.content) == 1
            output = result.content[0].text
            assert "Artifact One.md" in output
            assert "Artifact Two.md" in output
            assert "2 files" in output
```
--------------------------------------------------------------------------------
/tests/cli/test_upload.py:
--------------------------------------------------------------------------------
```python
"""Tests for upload module."""
from unittest.mock import AsyncMock, Mock, patch
import httpx
import pytest
from basic_memory.cli.commands.cloud.upload import _get_files_to_upload, upload_path
class TestGetFilesToUpload:
    """Tests for _get_files_to_upload()."""

    def test_collects_files_from_directory(self, tmp_path):
        """Every file is discovered, including files nested in subdirectories."""
        # Lay out a small tree with three files.
        (tmp_path / "file1.txt").write_text("content1")
        (tmp_path / "file2.md").write_text("content2")
        (tmp_path / "subdir").mkdir()
        (tmp_path / "subdir" / "file3.py").write_text("content3")

        # Exercise the real ignore handling (nothing mocked).
        files = _get_files_to_upload(tmp_path, verbose=False, use_gitignore=True)

        assert len(files) == 3
        remote_paths = {remote for _, remote in files}
        assert {"file1.txt", "file2.md", "subdir/file3.py"} <= remote_paths

    def test_respects_gitignore_patterns(self, tmp_path):
        """Files matched by .gitignore patterns are excluded from the upload set."""
        (tmp_path / "keep.txt").write_text("keep")
        (tmp_path / "ignore.pyc").write_text("ignore")
        (tmp_path / ".gitignore").write_text("*.pyc\n")

        remote_paths = [remote for _, remote in _get_files_to_upload(tmp_path)]

        # keep.txt survives; the *.pyc file is filtered out.
        assert "keep.txt" in remote_paths
        assert "ignore.pyc" not in remote_paths

    def test_handles_empty_directory(self, tmp_path):
        """An empty directory yields an empty upload list."""
        empty_dir = tmp_path / "empty"
        empty_dir.mkdir()
        assert _get_files_to_upload(empty_dir) == []

    def test_converts_windows_paths_to_forward_slashes(self, tmp_path):
        """Remote paths always use forward slashes, even for nested files."""
        nested = tmp_path / "dir1" / "dir2"
        nested.mkdir(parents=True)
        (nested / "file.txt").write_text("content")

        _, remote_path = _get_files_to_upload(tmp_path)[0]

        assert "\\" not in remote_path  # no Windows-style backslashes
        assert remote_path == "dir1/dir2/file.txt"
class TestUploadPath:
    """Tests for upload_path().

    The network boundary is mocked throughout: ``get_client`` (an async
    context manager yielding the HTTP client), ``call_put`` (the WebDAV PUT
    helper), and ``aiofiles.open`` (async file reads) are patched so no real
    network or file I/O happens inside upload_path() itself.
    """
    @pytest.mark.asyncio
    async def test_uploads_single_file(self, tmp_path):
        """Test uploading a single file."""
        test_file = tmp_path / "test.txt"
        test_file.write_text("test content")
        # Mock the client and HTTP response
        mock_client = AsyncMock()
        mock_response = Mock()
        mock_response.raise_for_status = Mock()
        with patch("basic_memory.cli.commands.cloud.upload.get_client") as mock_get_client:
            with patch("basic_memory.cli.commands.cloud.upload.call_put") as mock_put:
                with patch("aiofiles.open", create=True) as mock_aiofiles_open:
                    # Setup mocks
                    mock_get_client.return_value.__aenter__.return_value = mock_client
                    mock_get_client.return_value.__aexit__.return_value = None
                    mock_put.return_value = mock_response
                    # Mock file reading
                    mock_file = AsyncMock()
                    mock_file.read.return_value = b"test content"
                    mock_aiofiles_open.return_value.__aenter__.return_value = mock_file
                    result = await upload_path(test_file, "test-project")
        # Verify success
        assert result is True
        # Verify PUT was called with correct path
        # call_args[0] are positional args (client, path); content is passed by keyword.
        mock_put.assert_called_once()
        call_args = mock_put.call_args
        assert call_args[0][0] == mock_client
        assert call_args[0][1] == "/webdav/test-project/test.txt"
        assert call_args[1]["content"] == b"test content"
    @pytest.mark.asyncio
    async def test_uploads_directory(self, tmp_path):
        """Test uploading a directory with multiple files."""
        # Create test files
        (tmp_path / "file1.txt").write_text("content1")
        (tmp_path / "file2.txt").write_text("content2")
        mock_client = AsyncMock()
        mock_response = Mock()
        mock_response.raise_for_status = Mock()
        with patch("basic_memory.cli.commands.cloud.upload.get_client") as mock_get_client:
            with patch("basic_memory.cli.commands.cloud.upload.call_put") as mock_put:
                with patch(
                    "basic_memory.cli.commands.cloud.upload._get_files_to_upload"
                ) as mock_get_files:
                    with patch("aiofiles.open", create=True) as mock_aiofiles_open:
                        # Setup mocks
                        mock_get_client.return_value.__aenter__.return_value = mock_client
                        mock_get_client.return_value.__aexit__.return_value = None
                        mock_put.return_value = mock_response
                        # Mock file listing
                        mock_get_files.return_value = [
                            (tmp_path / "file1.txt", "file1.txt"),
                            (tmp_path / "file2.txt", "file2.txt"),
                        ]
                        # Mock file reading
                        # side_effect yields one payload per read, in upload order.
                        mock_file = AsyncMock()
                        mock_file.read.side_effect = [b"content1", b"content2"]
                        mock_aiofiles_open.return_value.__aenter__.return_value = mock_file
                        result = await upload_path(tmp_path, "test-project")
        # Verify success
        assert result is True
        # Verify PUT was called twice
        assert mock_put.call_count == 2
    @pytest.mark.asyncio
    async def test_handles_nonexistent_path(self, tmp_path):
        """Test handling of nonexistent path."""
        nonexistent = tmp_path / "does-not-exist"
        result = await upload_path(nonexistent, "test-project")
        # Should return False
        assert result is False
    @pytest.mark.asyncio
    async def test_handles_http_error(self, tmp_path):
        """Test handling of HTTP errors during upload."""
        test_file = tmp_path / "test.txt"
        test_file.write_text("test content")
        mock_client = AsyncMock()
        # Simulate a 403 response whose raise_for_status() raises.
        mock_response = Mock()
        mock_response.status_code = 403
        mock_response.text = "Forbidden"
        mock_response.raise_for_status.side_effect = httpx.HTTPStatusError(
            "Forbidden", request=Mock(), response=mock_response
        )
        with patch("basic_memory.cli.commands.cloud.upload.get_client") as mock_get_client:
            with patch("basic_memory.cli.commands.cloud.upload.call_put") as mock_put:
                with patch("aiofiles.open", create=True) as mock_aiofiles_open:
                    # Setup mocks
                    mock_get_client.return_value.__aenter__.return_value = mock_client
                    mock_get_client.return_value.__aexit__.return_value = None
                    mock_put.return_value = mock_response
                    # Mock file reading
                    mock_file = AsyncMock()
                    mock_file.read.return_value = b"test content"
                    mock_aiofiles_open.return_value.__aenter__.return_value = mock_file
                    result = await upload_path(test_file, "test-project")
        # Should return False on error
        assert result is False
    @pytest.mark.asyncio
    async def test_handles_empty_directory(self, tmp_path):
        """Test uploading an empty directory."""
        empty_dir = tmp_path / "empty"
        empty_dir.mkdir()
        with patch("basic_memory.cli.commands.cloud.upload._get_files_to_upload") as mock_get_files:
            mock_get_files.return_value = []
            result = await upload_path(empty_dir, "test-project")
        # Should return True (no-op success)
        assert result is True
    @pytest.mark.asyncio
    async def test_formats_file_size_bytes(self, tmp_path, capsys):
        """Test file size formatting for small files (bytes)."""
        test_file = tmp_path / "small.txt"
        test_file.write_text("hi")  # 2 bytes
        mock_client = AsyncMock()
        mock_response = Mock()
        mock_response.raise_for_status = Mock()
        with patch("basic_memory.cli.commands.cloud.upload.get_client") as mock_get_client:
            with patch("basic_memory.cli.commands.cloud.upload.call_put") as mock_put:
                with patch("aiofiles.open", create=True) as mock_aiofiles_open:
                    mock_get_client.return_value.__aenter__.return_value = mock_client
                    mock_get_client.return_value.__aexit__.return_value = None
                    mock_put.return_value = mock_response
                    mock_file = AsyncMock()
                    mock_file.read.return_value = b"hi"
                    mock_aiofiles_open.return_value.__aenter__.return_value = mock_file
                    await upload_path(test_file, "test-project")
        # Check output contains "bytes"
        captured = capsys.readouterr()
        assert "bytes" in captured.out
    @pytest.mark.asyncio
    async def test_formats_file_size_kilobytes(self, tmp_path, capsys):
        """Test file size formatting for medium files (KB)."""
        test_file = tmp_path / "medium.txt"
        # Create file with 2KB of content
        test_file.write_text("x" * 2048)
        mock_client = AsyncMock()
        mock_response = Mock()
        mock_response.raise_for_status = Mock()
        with patch("basic_memory.cli.commands.cloud.upload.get_client") as mock_get_client:
            with patch("basic_memory.cli.commands.cloud.upload.call_put") as mock_put:
                with patch("aiofiles.open", create=True) as mock_aiofiles_open:
                    mock_get_client.return_value.__aenter__.return_value = mock_client
                    mock_get_client.return_value.__aexit__.return_value = None
                    mock_put.return_value = mock_response
                    mock_file = AsyncMock()
                    mock_file.read.return_value = b"x" * 2048
                    mock_aiofiles_open.return_value.__aenter__.return_value = mock_file
                    await upload_path(test_file, "test-project")
        # Check output contains "KB"
        captured = capsys.readouterr()
        assert "KB" in captured.out
    @pytest.mark.asyncio
    async def test_formats_file_size_megabytes(self, tmp_path, capsys):
        """Test file size formatting for large files (MB)."""
        test_file = tmp_path / "large.txt"
        # Create file with 2MB of content
        test_file.write_text("x" * (2 * 1024 * 1024))
        mock_client = AsyncMock()
        mock_response = Mock()
        mock_response.raise_for_status = Mock()
        with patch("basic_memory.cli.commands.cloud.upload.get_client") as mock_get_client:
            with patch("basic_memory.cli.commands.cloud.upload.call_put") as mock_put:
                with patch("aiofiles.open", create=True) as mock_aiofiles_open:
                    mock_get_client.return_value.__aenter__.return_value = mock_client
                    mock_get_client.return_value.__aexit__.return_value = None
                    mock_put.return_value = mock_response
                    mock_file = AsyncMock()
                    mock_file.read.return_value = b"x" * (2 * 1024 * 1024)
                    mock_aiofiles_open.return_value.__aenter__.return_value = mock_file
                    await upload_path(test_file, "test-project")
        # Check output contains "MB"
        captured = capsys.readouterr()
        assert "MB" in captured.out
    @pytest.mark.asyncio
    async def test_builds_correct_webdav_path(self, tmp_path):
        """Test that WebDAV path is correctly constructed."""
        # Create nested structure
        (tmp_path / "subdir").mkdir()
        test_file = tmp_path / "subdir" / "file.txt"
        test_file.write_text("content")
        mock_client = AsyncMock()
        mock_response = Mock()
        mock_response.raise_for_status = Mock()
        with patch("basic_memory.cli.commands.cloud.upload.get_client") as mock_get_client:
            with patch("basic_memory.cli.commands.cloud.upload.call_put") as mock_put:
                with patch(
                    "basic_memory.cli.commands.cloud.upload._get_files_to_upload"
                ) as mock_get_files:
                    with patch("aiofiles.open", create=True) as mock_aiofiles_open:
                        mock_get_client.return_value.__aenter__.return_value = mock_client
                        mock_get_client.return_value.__aexit__.return_value = None
                        mock_put.return_value = mock_response
                        # Mock file listing with relative path
                        mock_get_files.return_value = [(test_file, "subdir/file.txt")]
                        mock_file = AsyncMock()
                        mock_file.read.return_value = b"content"
                        mock_aiofiles_open.return_value.__aenter__.return_value = mock_file
                        await upload_path(tmp_path, "my-project")
        # Verify WebDAV path format: /webdav/{project_name}/{relative_path}
        mock_put.assert_called_once()
        call_args = mock_put.call_args
        assert call_args[0][1] == "/webdav/my-project/subdir/file.txt"
    # NOTE(review): the following synchronous tests exercise
    # _get_files_to_upload() directly, not upload_path(); consider moving
    # them into TestGetFilesToUpload for consistency.
    def test_no_gitignore_skips_gitignore_patterns(self, tmp_path):
        """Test that --no-gitignore flag skips .gitignore patterns."""
        # Create test files
        (tmp_path / "keep.txt").write_text("keep")
        (tmp_path / "secret.bak").write_text("secret")  # Use .bak instead of .pyc
        # Create .gitignore file that ignores .bak files
        gitignore_file = tmp_path / ".gitignore"
        gitignore_file.write_text("*.bak\n")
        # With use_gitignore=False, should include .bak files
        result = _get_files_to_upload(tmp_path, verbose=False, use_gitignore=False)
        # Extract relative paths
        relative_paths = [rel_path for _, rel_path in result]
        # Both files should be included when gitignore is disabled
        assert "keep.txt" in relative_paths
        assert "secret.bak" in relative_paths
    def test_no_gitignore_still_respects_bmignore(self, tmp_path):
        """Test that --no-gitignore still respects .bmignore patterns."""
        # Create test files
        (tmp_path / "keep.txt").write_text("keep")
        (tmp_path / ".hidden").write_text(
            "hidden"
        )  # Should be ignored by .bmignore default pattern
        # Create .gitignore that would allow .hidden
        gitignore_file = tmp_path / ".gitignore"
        gitignore_file.write_text("# Allow all\n")
        # With use_gitignore=False, should still filter hidden files via .bmignore
        result = _get_files_to_upload(tmp_path, verbose=False, use_gitignore=False)
        # Extract relative paths
        relative_paths = [rel_path for _, rel_path in result]
        # keep.txt should be included, .hidden should be filtered by .bmignore
        assert "keep.txt" in relative_paths
        assert ".hidden" not in relative_paths
    def test_verbose_shows_filtering_info(self, tmp_path, capsys):
        """Test that verbose mode shows filtering information."""
        # Create test files
        (tmp_path / "keep.txt").write_text("keep")
        (tmp_path / "ignore.pyc").write_text("ignore")
        # Create .gitignore
        gitignore_file = tmp_path / ".gitignore"
        gitignore_file.write_text("*.pyc\n")
        # Run with verbose=True
        _get_files_to_upload(tmp_path, verbose=True, use_gitignore=True)
        # Capture output
        captured = capsys.readouterr()
        # Should show scanning information
        assert "Scanning directory:" in captured.out
        assert "Using .bmignore: Yes" in captured.out
        assert "Using .gitignore:" in captured.out
        assert "Ignore patterns loaded:" in captured.out
        # Should show file status
        assert "[INCLUDE]" in captured.out or "[IGNORED]" in captured.out
        # Should show summary
        assert "Summary:" in captured.out
        assert "Files to upload:" in captured.out
        assert "Files ignored:" in captured.out
    def test_wildcard_gitignore_filters_all_files(self, tmp_path):
        """Test that a wildcard * in .gitignore filters all files."""
        # Create test files
        (tmp_path / "file1.txt").write_text("content1")
        (tmp_path / "file2.md").write_text("content2")
        # Create .gitignore with wildcard
        gitignore_file = tmp_path / ".gitignore"
        gitignore_file.write_text("*\n")
        # Should filter all files
        result = _get_files_to_upload(tmp_path, verbose=False, use_gitignore=True)
        assert len(result) == 0
        # With use_gitignore=False, should include files
        result = _get_files_to_upload(tmp_path, verbose=False, use_gitignore=False)
        assert len(result) == 2
    @pytest.mark.asyncio
    async def test_dry_run_shows_files_without_uploading(self, tmp_path, capsys):
        """Test that --dry-run shows what would be uploaded without uploading."""
        # Create test files
        (tmp_path / "file1.txt").write_text("content1")
        (tmp_path / "file2.txt").write_text("content2")
        # Don't mock anything - we want to verify no actual upload happens
        result = await upload_path(tmp_path, "test-project", dry_run=True)
        # Should return success
        assert result is True
        # Check output shows dry run info
        captured = capsys.readouterr()
        assert "Found 2 file(s) to upload" in captured.out
        assert "Files that would be uploaded:" in captured.out
        assert "file1.txt" in captured.out
        assert "file2.txt" in captured.out
        assert "Total:" in captured.out
    @pytest.mark.asyncio
    async def test_dry_run_with_verbose(self, tmp_path, capsys):
        """Test that --dry-run works with --verbose."""
        # Create test files
        (tmp_path / "keep.txt").write_text("keep")
        (tmp_path / "ignore.pyc").write_text("ignore")
        # Create .gitignore
        gitignore_file = tmp_path / ".gitignore"
        gitignore_file.write_text("*.pyc\n")
        result = await upload_path(tmp_path, "test-project", verbose=True, dry_run=True)
        # Should return success
        assert result is True
        # Check output shows both verbose and dry run info
        captured = capsys.readouterr()
        assert "Scanning directory:" in captured.out
        assert "[INCLUDE] keep.txt" in captured.out
        assert "[IGNORED] ignore.pyc" in captured.out
        assert "Files that would be uploaded:" in captured.out
        assert "keep.txt" in captured.out
```
--------------------------------------------------------------------------------
/docs/cloud-cli.md:
--------------------------------------------------------------------------------
```markdown
# Basic Memory Cloud CLI Guide
The Basic Memory Cloud CLI provides seamless integration between local and cloud knowledge bases using **project-scoped synchronization**. Each project can optionally sync with the cloud, giving you fine-grained control over what syncs and where.
## Overview
The cloud CLI enables you to:
- **Toggle cloud mode** - All regular `bm` commands work with cloud when enabled
- **Project-scoped sync** - Each project independently manages its sync configuration
- **Explicit operations** - Sync only what you want, when you want
- **Bidirectional sync** - Keep local and cloud in sync with rclone bisync
- **Offline access** - Work locally, sync when ready
## Prerequisites
Before using Basic Memory Cloud, you need:
- **Active Subscription**: An active Basic Memory Cloud subscription is required to access cloud features
- **Subscribe**: Visit [https://basicmemory.com/subscribe](https://basicmemory.com/subscribe) to sign up
If you attempt to log in without an active subscription, you'll receive a "Subscription Required" error with a link to subscribe.
## Architecture: Project-Scoped Sync
### The Problem
**Old approach (SPEC-8):** All projects lived in a single `~/basic-memory-cloud-sync/` directory. This caused:
- ❌ Directory conflicts between mount and bisync
- ❌ Auto-discovery creating phantom projects
- ❌ Confusion about what syncs and when
- ❌ All-or-nothing sync (couldn't sync just one project)
**New approach (SPEC-20):** Each project independently configures sync.
### How It Works
**Projects can exist in three states:**
1. **Cloud-only** - Project exists on cloud, no local copy
2. **Cloud + Local (synced)** - Project has a local working directory that syncs
3. **Local-only** - Project exists locally (when cloud mode is disabled)
**Example:**
```bash
# You have 3 projects on cloud:
# - research: wants local sync at ~/Documents/research
# - work: wants local sync at ~/work-notes
# - temp: cloud-only, no local sync needed
bm project add research --local-path ~/Documents/research
bm project add work --local-path ~/work-notes
bm project add temp  # No local sync
# Now you can sync individually (after initial --resync):
bm project bisync --name research
bm project bisync --name work
# temp stays cloud-only
```
**What happens under the covers:**
- Config stores `cloud_projects` dict mapping project names to local paths
- Each project gets its own bisync state in `~/.basic-memory/bisync-state/{project}/`
- Rclone syncs using single remote: `basic-memory-cloud`
- Projects can live anywhere on your filesystem, not forced into sync directory
## Quick Start
### 1. Enable Cloud Mode
Authenticate and enable cloud mode:
```bash
bm cloud login
```
**What this does:**
1. Opens browser to Basic Memory Cloud authentication page
2. Stores authentication token in `~/.basic-memory/basic-memory-cloud.json`
3. **Enables cloud mode** - all CLI commands now work against cloud
4. Validates your subscription status
**Result:** All `bm project`, `bm tools` commands now work with cloud.
### 2. Set Up Sync
Install rclone and configure credentials:
```bash
bm cloud setup
```
**What this does:**
1. Installs rclone automatically (if needed)
2. Fetches your tenant information from cloud
3. Generates scoped S3 credentials for sync
4. Configures single rclone remote: `basic-memory-cloud`
**Result:** You're ready to sync projects. No sync directories created yet - those come with project setup.
### 3. Add Projects with Sync
Create projects with optional local sync paths:
```bash
# Create cloud project without local sync
bm project add research
# Create cloud project WITH local sync
bm project add research --local-path ~/Documents/research
# Or configure sync for existing project
bm project sync-setup research ~/Documents/research
```
**What happens under the covers:**
When you add a project with `--local-path`:
1. Project created on cloud at `/app/data/research`
2. Local path stored in config: `cloud_projects.research.local_path = "~/Documents/research"`
3. Local directory created if it doesn't exist
4. Bisync state directory created at `~/.basic-memory/bisync-state/research/`
**Result:** Project is ready to sync, but no files synced yet.
### 4. Sync Your Project
Establish the initial sync baseline. **Best practice:** Always preview with `--dry-run` first:
```bash
# Step 1: Preview the initial sync (recommended)
bm project bisync --name research --resync --dry-run
# Step 2: If all looks good, run the actual sync
bm project bisync --name research --resync
```
**What happens under the covers:**
1. Rclone reads from `~/Documents/research` (local)
2. Connects to `basic-memory-cloud:bucket-name/app/data/research` (remote)
3. Creates bisync state files in `~/.basic-memory/bisync-state/research/`
4. Syncs files bidirectionally with settings:
   - `conflict_resolve=newer` (most recent wins)
   - `max_delete=25` (safety limit)
   - Respects `.bmignore` patterns
**Result:** Local and cloud are in sync. Baseline established.
**Why `--resync`?** This is an rclone requirement for the first bisync run. It establishes the initial state that future syncs will compare against. After the first sync, never use `--resync` unless you need to force a new baseline.
See: https://rclone.org/bisync/#resync
```
--resync
This will effectively make both Path1 and Path2 filesystems contain a matching superset of all files. By default, Path2 files that do not exist in Path1 will be copied to Path1, and the process will then copy the Path1 tree to Path2.
```
### 5. Subsequent Syncs
After the first sync, just run bisync without `--resync`:
```bash
bm project bisync --name research
```
**What happens:**
1. Rclone compares local and cloud states
2. Syncs changes in both directions
3. Auto-resolves conflicts (newer file wins)
4. Updates `last_sync` timestamp in config
**Result:** Changes flow both ways - edit locally or in cloud, both stay in sync.
### 6. Verify Setup
Check status:
```bash
bm cloud status
```
You should see:
- `Mode: Cloud (enabled)`
- `Cloud instance is healthy`
- Instructions for project sync commands
## Working with Projects
### Understanding Project Commands
**Key concept:** When cloud mode is enabled, use regular `bm project` commands (not `bm cloud project`).
```bash
# In cloud mode:
bm project list              # Lists cloud projects
bm project add research      # Creates cloud project
# In local mode:
bm project list              # Lists local projects
bm project add research ~/Documents/research  # Creates local project
```
### Creating Projects
**Use case 1: Cloud-only project (no local sync)**
```bash
bm project add temp-notes
```
**What this does:**
- Creates project on cloud at `/app/data/temp-notes`
- No local directory created
- No sync configuration
**Result:** Project exists on cloud, accessible via MCP tools, but no local copy.
**Use case 2: Cloud project with local sync**
```bash
bm project add research --local-path ~/Documents/research
```
**What this does:**
- Creates project on cloud at `/app/data/research`
- Creates local directory `~/Documents/research`
- Stores sync config in `~/.basic-memory/config.json`
- Prepares for bisync (but doesn't sync yet)
**Result:** Project ready to sync. Run `bm project bisync --name research --resync` to establish baseline.
**Use case 3: Add sync to existing cloud project**
```bash
# Project already exists on cloud
bm project sync-setup research ~/Documents/research
```
**What this does:**
- Updates existing project's sync configuration
- Creates local directory
- Prepares for bisync
**Result:** Existing cloud project now has local sync path. Run bisync to pull files down.
### Listing Projects
View all projects:
```bash
bm project list
```
**What you see:**
- All projects in cloud (when cloud mode enabled)
- Default project marked
- Project paths shown
**Future:** Will show sync status (synced/not synced, last sync time).
## File Synchronization
### Understanding the Sync Commands
**There are three sync-related commands:**
1. `bm project sync` - One-way: local → cloud (make cloud match local)
2. `bm project bisync` - Two-way: local ↔ cloud (recommended)
3. `bm project check` - Verify files match (no changes)
### One-Way Sync: Local → Cloud
**Use case:** You made changes locally and want to push to cloud (overwrite cloud).
```bash
bm project sync --name research
```
**What happens:**
1. Reads files from `~/Documents/research` (local)
2. Uses rclone sync to make cloud identical to local
3. Respects `.bmignore` patterns
4. Shows progress bar
**Result:** Cloud now matches local exactly. Any cloud-only changes are overwritten.
**When to use:**
- You know local is the source of truth
- You want to force cloud to match local
- You don't care about cloud changes
### Two-Way Sync: Local ↔ Cloud (Recommended)
**Use case:** You edit files both locally and in cloud UI, want both to stay in sync.
```bash
# First time - establish baseline
bm project bisync --name research --resync
# Subsequent syncs
bm project bisync --name research
```
**What happens:**
1. Compares local and cloud states using bisync metadata
2. Syncs changes in both directions
3. Auto-resolves conflicts (newer file wins)
4. Detects excessive deletes and fails safely (max 25 files)
**Conflict resolution example:**
```bash
# Edit locally
echo "Local change" > ~/Documents/research/notes.md
# Edit same file in cloud UI
# Cloud now has: "Cloud change"
# Run bisync
bm project bisync --name research
# Result: Newer file wins (based on modification time)
# If cloud was more recent, cloud version kept
# If local was more recent, local version kept
```
**When to use:**
- Default workflow for most users
- You edit in multiple places
- You want automatic conflict resolution
### Verify Sync Integrity
**Use case:** Check if local and cloud match without making changes.
```bash
bm project check --name research
```
**What happens:**
1. Compares file checksums between local and cloud
2. Reports differences
3. No files transferred
**Result:** Shows which files differ. Run bisync to sync them.
```bash
# One-way check (faster)
bm project check --name research --one-way
```
### Preview Changes (Dry Run)
**Use case:** See what would change without actually syncing.
```bash
bm project bisync --name research --dry-run
```
**What happens:**
1. Runs bisync logic
2. Shows what would be transferred/deleted
3. No actual changes made
**Result:** Safe preview of sync operations.
### Advanced: List Remote Files
**Use case:** See what files exist on cloud without syncing.
```bash
# List all files in project
bm project ls --name research
# List files in subdirectory
bm project ls --name research --path subfolder
```
**What happens:**
1. Connects to cloud via rclone
2. Lists files in remote project path
3. No files transferred
**Result:** See cloud file listing.
## Multiple Projects
### Syncing Multiple Projects
**Use case:** You have several projects with local sync, want to sync all at once.
```bash
# Setup multiple projects
bm project add research --local-path ~/Documents/research
bm project add work --local-path ~/work-notes
bm project add personal --local-path ~/personal
# Establish baselines
bm project bisync --name research --resync
bm project bisync --name work --resync
bm project bisync --name personal --resync
# Daily workflow: sync everything
bm project bisync --name research
bm project bisync --name work
bm project bisync --name personal
```
**Future:** `--all` flag will sync all configured projects:
```bash
bm project bisync --all  # Coming soon
```
### Mixed Usage
**Use case:** Some projects sync, some stay cloud-only.
```bash
# Projects with sync
bm project add research --local-path ~/Documents/research
bm project add work --local-path ~/work
# Cloud-only projects
bm project add archive
bm project add temp-notes
# Sync only the configured ones
bm project bisync --name research
bm project bisync --name work
# Archive and temp-notes stay cloud-only
```
**Result:** Fine-grained control over what syncs.
## Disable Cloud Mode
Return to local mode:
```bash
bm cloud logout
```
**What this does:**
1. Disables cloud mode in config
2. All commands now work locally
3. Auth token remains (can re-enable with login)
**Result:** All `bm` commands work with local projects again.
## Filter Configuration
### Understanding .bmignore
**The problem:** You don't want to sync everything (e.g., `.git`, `node_modules`, database files).
**The solution:** `.bmignore` file with gitignore-style patterns.
**Location:** `~/.basic-memory/.bmignore`
**Default patterns:**
```gitignore
# Version control
.git/**
# Python
__pycache__/**
*.pyc
.venv/**
venv/**
# Node.js
node_modules/**
# Basic Memory internals
memory.db/**
memory.db-shm/**
memory.db-wal/**
config.json/**
watch-status.json/**
.bmignore.rclone/**
# OS files
.DS_Store/**
Thumbs.db/**
# Environment files
.env/**
.env.local/**
```
**How it works:**
1. On first sync, `.bmignore` created with defaults
2. Patterns converted to rclone filter format (`.bmignore.rclone`)
3. Rclone uses filters during sync
4. Same patterns used by all projects
**Customizing:**
```bash
# Edit patterns
code ~/.basic-memory/.bmignore
# Add custom patterns
echo "*.tmp/**" >> ~/.basic-memory/.bmignore
# Next sync uses updated patterns
bm project bisync --name research
```
## Troubleshooting
### Authentication Issues
**Problem:** "Authentication failed" or "Invalid token"
**Solution:** Re-authenticate:
```bash
bm cloud logout
bm cloud login
```
### Subscription Issues
**Problem:** "Subscription Required" error
**Solution:**
1. Visit subscribe URL shown in error
2. Sign up for subscription
3. Run `bm cloud login` again
**Note:** Access is immediate when subscription becomes active.
### Bisync Initialization
**Problem:** "First bisync requires --resync"
**Explanation:** Bisync needs a baseline state before it can sync changes.
**Solution:**
```bash
bm project bisync --name research --resync
```
**What this does:**
- Establishes initial sync state
- Creates baseline in `~/.basic-memory/bisync-state/research/`
- Syncs all files bidirectionally
**Result:** Future syncs work without `--resync`.
### Empty Directory Issues
**Problem:** "Empty prior Path1 listing. Cannot sync to an empty directory"
**Explanation:** Rclone bisync doesn't work well with completely empty directories. It needs at least one file to establish a baseline.
**Solution:** Add at least one file before running `--resync`:
```bash
# Create a placeholder file
echo "# Research Notes" > ~/Documents/research/README.md
# Now run bisync
bm project bisync --name research --resync
```
**Why this happens:** Bisync creates listing files that track the state of each side. When both directories are completely empty, these listing files are considered invalid by rclone.
**Best practice:** Always have at least one file (like a README.md) in your project directory before setting up sync.
### Bisync State Corruption
**Problem:** Bisync fails with errors about corrupted state or listing files
**Explanation:** Sometimes bisync state can become inconsistent (e.g., after mixing dry-run and actual runs, or after manual file operations).
**Solution:** Clear bisync state and re-establish baseline:
```bash
# Clear bisync state
bm project bisync-reset research
# Re-establish baseline
bm project bisync --name research --resync
```
**What this does:**
- Removes all bisync metadata from `~/.basic-memory/bisync-state/research/`
- Forces fresh baseline on next `--resync`
- Safe operation (doesn't touch your files)
**Note:** This command also runs automatically when you remove a project to clean up state directories.
### Too Many Deletes
**Problem:** "Error: max delete limit (25) exceeded"
**Explanation:** Bisync detected you're about to delete more than 25 files. This is a safety check to prevent accidents.
**Solution 1:** Review what you're deleting, then force resync:
```bash
# Check what would be deleted
bm project bisync --name research --dry-run
# If correct, establish new baseline
bm project bisync --name research --resync
```
**Solution 2:** Use one-way sync if you know local is correct:
```bash
bm project sync --name research
```
### Project Not Configured for Sync
**Problem:** "Project research has no local_sync_path configured"
**Explanation:** Project exists on cloud but has no local sync path.
**Solution:**
```bash
bm project sync-setup research ~/Documents/research
bm project bisync --name research --resync
```
### Connection Issues
**Problem:** "Cannot connect to cloud instance"
**Solution:** Check status:
```bash
bm cloud status
```
If instance is down, wait a few minutes and retry.
## Security
- **Authentication**: OAuth 2.1 with PKCE flow
- **Tokens**: Stored securely in `~/.basic-memory/basic-memory-cloud.json`
- **Transport**: All data encrypted in transit (HTTPS)
- **Credentials**: Scoped S3 credentials (read-write to your tenant only)
- **Isolation**: Your data isolated from other tenants
- **Ignore patterns**: Sensitive files automatically excluded via `.bmignore`
## Command Reference
### Cloud Mode Management
```bash
bm cloud login              # Authenticate and enable cloud mode
bm cloud logout             # Disable cloud mode
bm cloud status             # Check cloud mode and instance health
```
### Setup
```bash
bm cloud setup              # Install rclone and configure credentials
```
### Project Management
When cloud mode is enabled:
```bash
bm project list                           # List cloud projects
bm project add <name>                     # Create cloud project (no sync)
bm project add <name> --local-path <path> # Create with local sync
bm project sync-setup <name> <path>       # Add sync to existing project
bm project rm <name>                      # Delete project
```
### File Synchronization
```bash
# One-way sync (local → cloud)
bm project sync --name <project>
bm project sync --name <project> --dry-run
bm project sync --name <project> --verbose
# Two-way sync (local ↔ cloud) - Recommended
bm project bisync --name <project>          # After first --resync
bm project bisync --name <project> --resync # First time / force baseline
bm project bisync --name <project> --dry-run
bm project bisync --name <project> --verbose
# Integrity check
bm project check --name <project>
bm project check --name <project> --one-way
# List remote files
bm project ls --name <project>
bm project ls --name <project> --path <subpath>
```
## Summary
**Basic Memory Cloud uses project-scoped sync:**
1. **Enable cloud mode** - `bm cloud login`
2. **Install rclone** - `bm cloud setup`
3. **Add projects with sync** - `bm project add research --local-path ~/Documents/research`
4. **Preview first sync** - `bm project bisync --name research --resync --dry-run`
5. **Establish baseline** - `bm project bisync --name research --resync`
6. **Daily workflow** - `bm project bisync --name research`
**Key benefits:**
- ✅ Each project independently syncs (or doesn't)
- ✅ Projects can live anywhere on disk
- ✅ Explicit sync operations (no magic)
- ✅ Safe by design (max delete limits, conflict resolution)
- ✅ Full offline access (work locally, sync when ready)
**Future enhancements:**
- `--all` flag to sync all configured projects
- Project list showing sync status
- Watch mode for automatic sync
```
--------------------------------------------------------------------------------
/.claude/commands/test-live.md:
--------------------------------------------------------------------------------
```markdown
# /project:test-live - Live Basic Memory Testing Suite
Execute comprehensive real-world testing of Basic Memory using the installed version. 
All test results are recorded as notes in a dedicated test project.
## Usage
```
/project:test-live [phase]
```
**Parameters:**
- `phase` (optional): Specific test phase to run (`recent`, `core`, `features`, `edge`, `workflows`, `stress`, or `all`)
- `recent` - Focus on recent changes and new features (recommended for regular testing)
- `core` - Essential tools only (Tier 1: write_note, read_note, search_notes, edit_note, list_memory_projects, recent_activity)
- `features` - Core + important workflows (Tier 1 + Tier 2)
- `edge` - Boundary conditions, error scenarios, and performance edge cases (Phase 4)
- `workflows` - Real-world multi-step workflow scenarios (Phase 5)
- `stress` - Creative exploration and bulk stress testing (Phase 8)
- `all` - Comprehensive testing of all tools and scenarios
## Implementation
You are an expert QA engineer conducting live testing of Basic Memory. 
When the user runs `/project:test-live`, execute comprehensive test plan:
## Tool Testing Priority
### **Tier 1: Critical Core (Always Test)**
1. **write_note** - Foundation of all knowledge creation
2. **read_note** - Primary knowledge retrieval mechanism
3. **search_notes** - Essential for finding information
4. **edit_note** - Core content modification capability
5. **list_memory_projects** - Project discovery and session guidance
6. **recent_activity** - Project discovery mode and activity analysis
### **Tier 2: Important Workflows (Usually Test)**
7. **build_context** - Conversation continuity via memory:// URLs
8. **create_memory_project** - Essential for project setup
9. **move_note** - Knowledge organization
10. **sync_status** - Understanding system state
11. **delete_project** - Project lifecycle management
### **Tier 3: Enhanced Functionality (Sometimes Test)**
12. **view_note** - Claude Desktop artifact display
13. **read_content** - Raw content access
14. **delete_note** - Content removal
15. **list_directory** - File system exploration
16. **edit_note** (advanced modes) - Complex find/replace operations
### **Tier 4: Specialized (Rarely Test)**
17. **canvas** - Obsidian visualization (specialized use case)
18. **MCP Prompts** - Enhanced UX tools (ai_assistant_guide, continue_conversation)
## Stateless Architecture Testing
### **Project Discovery Workflow (CRITICAL)**
Test the new stateless project selection flow:
1. **Initial Discovery**
   - Call `list_memory_projects()` without knowing which project to use
   - Verify clear session guidance appears: "Next: Ask which project to use"
   - Confirm removal of CLI-specific references
2. **Activity-Based Discovery**
   - Call `recent_activity()` without project parameter (discovery mode)
   - Verify intelligent project suggestions based on activity
   - Test guidance: "Should I use [most-active-project] for this task?"
3. **Session Tracking Validation**
   - Verify all tool responses include `[Session: Using project 'name']`
   - Confirm guidance reminds about session-wide project tracking
4. **Single Project Constraint Mode**
   - Test MCP server with `--project` parameter
   - Verify all operations constrained to specified project
   - Test project override behavior in constrained mode
### **Explicit Project Parameters (CRITICAL)**
All tools must require explicit project parameters:
1. **Parameter Validation**
   - Test all Tier 1 tools require `project` parameter
   - Verify clear error messages for missing project
   - Test invalid project name handling
2. **No Session State Dependencies**
   - Confirm no tool relies on "current project" concept
   - Test rapid project switching within conversation
   - Verify each call is truly independent
### Pre-Test Setup
1. **Environment Verification**
   - Verify basic-memory is installed and accessible via MCP
   - Check version and confirm it's the expected release
   - Test MCP connection and tool availability
2. **Recent Changes Analysis** (if phase includes 'recent' or 'all')
   - Run `git log --oneline -20` to examine recent commits
   - Identify new features, bug fixes, and enhancements
   - Generate targeted test scenarios for recent changes
   - Prioritize regression testing for recently fixed issues
3. **Test Project Creation**
Run the bash `date` command to get the current date/time. 
   ```
   Create project: "basic-memory-testing-[timestamp]"
   Location: ~/basic-memory-testing-[timestamp]
   Purpose: Record all test observations and results
   ```
Make sure to use the newly created project for all subsequent test operations by specifying it in the `project` parameter of each tool call.
4. **Baseline Documentation**
   Create initial test session note with:
   - Test environment details
   - Version being tested
   - Recent changes identified (if applicable)
   - Test objectives and scope
   - Start timestamp
### Phase 0: Recent Changes Validation (if 'recent' or 'all' phase)
Based on recent commit analysis, create targeted test scenarios:
**Recent Changes Test Protocol:**
1. **Feature Addition Tests** - For each new feature identified:
   - Test basic functionality
   - Test integration with existing tools
   - Verify documentation accuracy
   - Test edge cases and error handling
2. **Bug Fix Regression Tests** - For each recent fix:
   - Recreate the original problem scenario
   - Verify the fix works as expected
   - Test related functionality isn't broken
   - Document the verification in test notes
3. **Performance/Enhancement Validation** - For optimizations:
   - Establish baseline timing
   - Compare with expected improvements
   - Test under various load conditions
   - Document performance observations
**Example Recent Changes (Update based on actual git log):**
- Watch Service Restart (#156): Test project creation → file modification → automatic restart
- Cross-Project Moves (#161): Test move_note with cross-project detection
- Docker Environment Support (#174): Test BASIC_MEMORY_HOME behavior
- MCP Server Logging (#164): Verify log level configurations
### Phase 1: Core Functionality Validation (Tier 1 Tools)
Test essential MCP tools that form the foundation of Basic Memory:
**1. write_note Tests (Critical):**
- ✅ Basic note creation with frontmatter
- ✅ Special characters and Unicode in titles
- ✅ Various content types (lists, headings, code blocks)
- ✅ Empty notes and minimal content edge cases
- ⚠️ Error handling for invalid parameters
**2. read_note Tests (Critical):**
- ✅ Read by title, permalink, memory:// URLs
- ✅ Non-existent notes (error handling)
- ✅ Notes with complex markdown formatting
- ⚠️ Performance with large notes (>10MB)
**3. search_notes Tests (Critical):**
- ✅ Simple text queries across content
- ✅ Tag-based searches with multiple tags
- ✅ Boolean operators (AND, OR, NOT)
- ✅ Empty/no results scenarios
- ⚠️ Performance with 100+ notes
**4. edit_note Tests (Critical):**
- ✅ Append operations preserving frontmatter
- ✅ Prepend operations
- ✅ Find/replace with validation
- ✅ Section replacement under headers
- ⚠️ Error scenarios (invalid operations)
**5. list_memory_projects Tests (Critical):**
- ✅ Display all projects with clear session guidance
- ✅ Project discovery workflow prompts
- ✅ Removal of CLI-specific references
- ✅ Empty project list handling
- ✅ Single project constraint mode display
**6. recent_activity Tests (Critical - Discovery Mode):**
- ✅ Discovery mode without project parameter
- ✅ Intelligent project suggestions based on activity
- ✅ Guidance prompts for project selection
- ✅ Session tracking reminders in responses
- ⚠️ Performance with multiple projects
### Phase 2: Important Workflows (Tier 2 Tools)
**7. build_context Tests (Important):**
- ✅ Different depth levels (1, 2, 3+)
- ✅ Various timeframes for context
- ✅ memory:// URL navigation
- ⚠️ Performance with complex relation graphs
**8. create_memory_project Tests (Important):**
- ✅ Create projects dynamically
- ✅ Set default during creation
- ✅ Path validation and creation
- ⚠️ Invalid paths and names
- ✅ Integration with existing projects
**9. move_note Tests (Important):**
- ✅ Move within same project
- ✅ Cross-project moves with detection (#161)
- ✅ Automatic folder creation
- ✅ Database consistency validation
- ⚠️ Special characters in paths
**10. sync_status Tests (Important):**
- ✅ Background operation monitoring
- ✅ File synchronization status
- ✅ Project sync state reporting
- ⚠️ Error state handling
### Phase 3: Enhanced Functionality (Tier 3 Tools)
**11. view_note Tests (Enhanced):**
- ✅ Claude Desktop artifact display
- ✅ Title extraction from frontmatter
- ✅ Unicode and emoji content rendering
- ⚠️ Error handling for non-existent notes
**12. read_content Tests (Enhanced):**
- ✅ Raw file content access
- ✅ Binary file handling
- ✅ Image file reading
- ⚠️ Large file performance
**13. delete_note Tests (Enhanced):**
- ✅ Single note deletion
- ✅ Database consistency after deletion
- ⚠️ Non-existent note handling
- ✅ Confirmation of successful deletion
**14. list_directory Tests (Enhanced):**
- ✅ Directory content listing
- ✅ Depth control and filtering
- ✅ File name globbing
- ⚠️ Empty directory handling
**15. delete_project Tests (Enhanced):**
- ✅ Project removal from config
- ✅ Database cleanup
- ⚠️ Default project protection
- ⚠️ Non-existent project handling
### Phase 4: Edge Case Exploration
**Boundary Testing:**
- Very long titles and content (stress limits)
- Empty projects and notes
- Unicode, emojis, special symbols
- Deeply nested folder structures
- Circular relations and self-references
- Maximum relation depths
**Error Scenarios:**
- Invalid memory:// URLs
- Missing files referenced in database
- Invalid project names and paths
- Malformed note structures
- Concurrent operation conflicts
**Performance Testing:**
- Create 100+ notes rapidly
- Complex search queries
- Deep relation chains (5+ levels)
- Rapid successive operations
- Memory usage monitoring
### Phase 5: Real-World Workflow Scenarios
**Meeting Notes Pipeline:**
1. Create meeting notes with action items
2. Extract action items using edit_note
3. Build relations to project documents
4. Update progress incrementally
5. Search and track completion
**Research Knowledge Building:**
1. Create research topic hierarchy
2. Build complex relation networks
3. Add incremental findings over time
4. Search for connections and patterns
5. Reorganize as knowledge evolves
**Multi-Project Workflow:**
1. Technical documentation project
2. Personal recipe collection project
3. Learning/course notes project
4. Specify different projects for different operations
5. Cross-reference related concepts
**Content Evolution:**
1. Start with basic notes
2. Enhance with relations and observations
3. Reorganize file structure using moves
4. Update content with edit operations
5. Validate knowledge graph integrity
### Phase 6: Specialized Tools Testing (Tier 4)
**16. canvas Tests (Specialized):**
- ✅ JSON Canvas generation
- ✅ Node and edge creation
- ✅ Obsidian compatibility
- ⚠️ Complex graph handling
**17. MCP Prompts Tests (Specialized):**
- ✅ ai_assistant_guide output
- ✅ continue_conversation functionality
- ✅ Formatted search results
- ✅ Enhanced activity reports
### Phase 7: Integration & File Watching Tests
**File System Integration:**
- ✅ Watch service behavior with file changes
- ✅ Project creation → watch restart (#156)
- ✅ Multi-project synchronization
- ⚠️ MCP→API→DB→File stack validation
**Real Integration Testing:**
- ✅ End-to-end file watching vs manual operations
- ✅ Cross-session persistence
- ✅ Database consistency across operations
- ⚠️ Performance under real file system changes
### Phase 8: Creative Stress Testing
**Creative Exploration:**
- Rapid project creation/switching patterns
- Unusual but valid markdown structures
- Creative observation categories
- Novel relation types and patterns
- Unexpected tool combinations
**Stress Scenarios:**
- Bulk operations (many notes quickly)
- Complex nested moves and edits
- Deep context building
- Complex boolean search expressions
- Resource constraint testing
## Test Execution Guidelines
### Quick Testing (core/features phases)
- Focus on Tier 1 tools (core) or Tier 1+2 (features)
- Test essential functionality and common edge cases
- Record critical issues immediately
- Complete in 15-20 minutes
### Comprehensive Testing (all phase)
- Cover all tiers systematically
- Include specialized tools and stress testing
- Document performance baselines
- Complete in 45-60 minutes
### Recent Changes Focus (recent phase)
- Analyze git log for recent commits
- Generate targeted test scenarios
- Focus on regression testing for fixes
- Validate new features thoroughly
## Test Observation Format
Record ALL observations immediately as Basic Memory notes:
```markdown
---
title: Test Session [Phase] YYYY-MM-DD HH:MM
tags: [testing, v0.13.0, live-testing, [phase]]
permalink: test-session-[phase]-[timestamp]
---
# Test Session [Phase] - [Date/Time]
## Environment
- Basic Memory version: [version]
- MCP connection: [status]
- Test project: [name]
- Phase focus: [description]
## Test Results
### ✅ Successful Operations
- [timestamp] ✅ write_note: Created note with emoji title 📝 #tier1 #functionality
- [timestamp] ✅ search_notes: Boolean query returned 23 results in 0.4s #tier1 #performance  
- [timestamp] ✅ edit_note: Append operation preserved frontmatter #tier1 #reliability
### ⚠️ Issues Discovered
- [timestamp] ⚠️ move_note: Slow with deep folder paths (2.1s) #tier2 #performance
- [timestamp] 🚨 search_notes: Unicode query returned unexpected results #tier1 #bug #critical
- [timestamp] ⚠️ build_context: Context lost for memory:// URLs #tier2 #issue
### 🚀 Enhancements Identified
- edit_note could benefit from preview mode #ux-improvement
- search_notes needs fuzzy matching for typos #feature-idea
- move_note could auto-suggest folder creation #usability
### 📊 Performance Metrics
- Average write_note time: 0.3s
- Search with 100+ notes: 0.6s
- Project parameter overhead: <0.1s
- Memory usage: [observed levels]
## Relations
- tests [[Basic Memory v0.13.0]]
- part_of [[Live Testing Suite]]
- found_issues [[Bug Report: Unicode Search]]
- discovered [[Performance Optimization Opportunities]]
```
## Quality Assessment Areas
**User Experience & Usability:**
- Tool instruction clarity and examples
- Error message actionability
- Response time acceptability
- Tool consistency and discoverability
- Learning curve and intuitiveness
**System Behavior:**
- Stateless operation independence
- memory:// URL navigation reliability
- Multi-step workflow cohesion
- Edge case graceful handling
- Recovery from user errors
**Documentation Alignment:**
- Tool output clarity and helpfulness
- Behavior vs. documentation accuracy
- Example validity and usefulness
- Real-world vs. documented workflows
**Mental Model Validation:**
- Natural user expectation alignment
- Surprising behavior identification
- Mistake recovery ease
- Knowledge graph concept naturalness
**Performance & Reliability:**
- Operation completion times
- Consistency across sessions
- Scaling behavior with growth
- Unexpected slowness identification
## Error Documentation Protocol
For each error discovered:
1. **Immediate Recording**
   - Create dedicated error note
   - Include exact reproduction steps
   - Capture error messages verbatim
   - Note system state when error occurred
2. **Error Note Format**
   ```markdown
   ---
   title: Bug Report - [Short Description]
   tags: [bug, testing, v0.13.0, [severity]]
   ---
   
   # Bug Report: [Description]
   
   ## Reproduction Steps
   1. [Exact steps to reproduce]
   2. [Include all parameters used]
   3. [Note any special conditions]
   
   ## Expected Behavior
   [What should have happened]
   
   ## Actual Behavior  
   [What actually happened]
   
   ## Error Messages
   ```
   [Exact error text]
   ```
   
   ## Environment
   - Version: [version]
   - Project: [name]
   - Timestamp: [when]
   
   ## Severity
   - [ ] Critical (blocks major functionality)
   - [ ] High (impacts user experience)
   - [ ] Medium (workaround available)
   - [ ] Low (minor inconvenience)
   
   ## Relations
   - discovered_during [[Test Session [Phase]]]
   - affects [[Feature Name]]
   ```
## Success Metrics Tracking
**Quantitative Measures:**
- Test scenario completion rate
- Bug discovery count with severity
- Performance benchmark establishment
- Tool coverage completeness
**Qualitative Measures:**
- Conversation flow naturalness
- Knowledge graph quality
- User experience insights
- System reliability assessment
## Test Execution Flow
1. **Setup Phase** (5 minutes)
   - Verify environment and create test project
   - Record baseline system state
   - Establish performance benchmarks
2. **Core Testing** (15-20 minutes per phase)
   - Execute test scenarios systematically
   - Record observations immediately
   - Note timestamps for performance tracking
   - Explore variations when interesting behaviors occur
3. **Documentation** (5 minutes per phase)
   - Create phase summary note
   - Link related test observations
   - Update running issues list
   - Record enhancement ideas
4. **Analysis Phase** (10 minutes)
   - Review all observations across phases
   - Identify patterns and trends
   - Create comprehensive summary report
   - Generate development recommendations
## Testing Success Criteria
### Core Testing (Tier 1) - Must Pass
- All 6 critical tools function correctly
- No critical bugs in essential workflows
- Acceptable performance for basic operations
- Error handling works as expected
### Feature Testing (Tier 1+2) - Should Pass
- All 11 core + important tools function
- Workflow scenarios complete successfully
- Performance meets baseline expectations
- Integration points work correctly
### Comprehensive Testing (All Tiers) - Complete Coverage
- All tools tested across all scenarios
- Edge cases and stress testing completed
- Performance baselines established
- Full documentation of issues and enhancements
## Expected Outcomes
**System Validation:**
- Feature verification prioritized by tier importance
- Recent changes validated for regression
- Performance baseline establishment
- Bug identification with severity assessment
**Knowledge Base Creation:**
- Prioritized testing documentation
- Real usage examples for user guides
- Recent changes validation records
- Performance insights for optimization
**Development Insights:**
- Tier-based bug priority list
- Recent changes impact assessment
- Enhancement ideas from real usage
- User experience improvement areas
## Post-Test Deliverables
1. **Test Summary Note**
   - Overall results and findings
   - Critical issues requiring immediate attention
   - Enhancement opportunities discovered
   - System readiness assessment
2. **Bug Report Collection**
   - All discovered issues with reproduction steps
   - Severity and impact assessments
   - Suggested fixes where applicable
3. **Performance Baseline**
   - Timing data for all operations
   - Scaling behavior observations
   - Resource usage patterns
4. **UX Improvement Recommendations**
   - Usability enhancement suggestions
   - Documentation improvement areas
   - Tool design optimization ideas
5. **Updated TESTING.md**
   - Incorporate new test scenarios discovered
   - Update based on real execution experience
   - Add performance benchmarks and targets
## Context
- Uses the real installed basic-memory version
- Tests complete MCP→API→DB→File stack
- Creates living documentation in Basic Memory itself
- Follows integration over isolation philosophy
- Prioritizes testing by tool importance and usage frequency
- Adapts to recent development changes dynamically
- Focuses on real usage patterns over checklist validation
- Generates actionable insights prioritized by impact
```
--------------------------------------------------------------------------------
/specs/SPEC-6 Explicit Project Parameter Architecture.md:
--------------------------------------------------------------------------------
```markdown
---
title: 'SPEC-6: Explicit Project Parameter Architecture'
type: spec
permalink: specs/spec-6-explicit-project-parameter-architecture
tags:
- architecture
- mcp
- project-management
- stateless
---
# SPEC-6: Explicit Project Parameter Architecture
## Why
The current session-based project management system has critical reliability issues:
1. **Session State Fragility**: Claude iOS mobile client fails to maintain consistent session IDs across MCP tool calls, causing project switching to silently fail (Issue #74)
2. **Scaling Limitations**: Redis-backed session state creates single-point-of-failure and prevents horizontal scaling
3. **Client Compatibility**: Session tracking works inconsistently across different MCP clients (web, mobile, API)
4. **Hidden Complexity**: Users cannot see or understand "current project" state, leading to confusion when operations execute in wrong projects
5. **Silent Failures**: Operations appear successful but execute in unintended projects, risking data integrity
Evidence from production logs shows each MCP tool call from mobile client receives different session IDs:
```
create_memory_project: session_id=12cdfc24913b48f8b680ed4b2bfdb7ba
switch_project:       session_id=050a69275d98498cbdd227cdb74d9740
list_directory:       session_id=85f3483014af4136a5d435c76ded212f
```
Related Github issue: https://github.com/basicmachines-co/basic-memory-cloud/issues/75
## Status
**Current Status**: **ALL PHASES COMPLETE** ✅ **PRODUCTION DEPLOYED**
**Target**: Fix Claude iOS session ID consistency issues ✅ **ACHIEVED**
**Draft PR**: https://github.com/basicmachines-co/basic-memory/pull/298 ✅ **MERGED & DEPLOYED**
### 🎉 **COMPLETE SUCCESS - PRODUCTION READY**
**ALL PHASES OF SPEC-6 IMPLEMENTATION COMPLETE!** The stateless architecture has been successfully implemented across both Basic Memory core and Basic Memory Cloud, representing a **fundamental architectural improvement** that completely solves the Claude iOS compatibility issue while providing superior scalability and reliability.
#### Implementation Summary:
- **16 files modified** with 582 additions and 550 deletions
- **All 17 MCP tools** converted to stateless architecture
- **147 tests updated** across 5 test files (100% passing)
- **Complete session state removal** from core MCP tools
- **Enhanced error handling** and security validations preserved
### Progress Summary
✅ **Complete Stateless Architecture Implementation (All 17 tools)** - **PRODUCTION DEPLOYED**
- Stateless `get_active_project()` function implemented and deployed ✅
- All session state dependencies removed across entire MCP server ✅
- All MCP tools require explicit `project` parameter as first argument ✅
- **Cloud Service**: Redis removed, stateless HTTP enabled ✅
- **Production Validation**: Comprehensive testing completed with 100% success ✅
✅ **Content Management Tools Complete (6/6 tools)**
- `write_note`, `read_note`, `delete_note`, `edit_note` ✅
- `view_note`, `read_content` ✅
✅ **Knowledge Graph Navigation Tools Complete (3/3 tools)**
- `build_context`, `recent_activity`, `list_directory` ✅
✅ **Search & Discovery Tools Complete (1/1 tools)**
- `search_notes` ✅
✅ **Visualization Tools Complete (1/1 tools)**
- `canvas` ✅
✅ **Project Management Cleanup Complete**
- Removed `switch_project` and `get_current_project` tools ✅
- Updated `set_default_project` to remove activate parameter ✅
✅ **Comprehensive Testing Complete (157 tests)**
- All test suites updated to use stateless architecture (147 existing tests)
- Single project constraint mode integration tests (10 new tests)
- 100% test pass rate across all tool test files
- Security validations preserved and working
- Error handling comprehensive and user-friendly
✅ **Documentation & Examples Complete**
- All tool docstrings updated with stateless examples
- Project parameter usage clearly documented
- Error handling and security behavior documented
✅ **Enhanced Discovery Mode Complete**
- `recent_activity` tool supports dual-mode operation (discovery vs project-specific)
- ProjectActivitySummary schema provides cross-project insights
- Recent activity prompt updated to support both modes
- Comprehensive project distribution statistics and most active project tracking
✅ **Single Project Constraint Mode Complete**
- `--project` CLI parameter for MCP server constraint
- Environment variable control (`BASIC_MEMORY_MCP_PROJECT`)
- Automatic project override in `get_active_project()` function
- Project management tools disabled in constrained mode with helpful CLI guidance
- Comprehensive integration test suite (10 tests covering all constraint scenarios)
## What
Transform Basic Memory from stateful session-based to stateless explicit project parameter architecture:
### Core Changes
1. **Mandatory Project Parameter**: All MCP tools require explicit `project` parameter
2. **Remove Session State**: Eliminate Redis, session middleware, and `switch_project` tool
3. **Stateless HTTP**: Enable `stateless_http=True` for horizontal scaling
4. **Enhanced Context Discovery**: Improve `recent_activity` to show project distribution
5. **Clear Response Format**: All tool responses display target project information
### Implementation Approach
- Each tool will directly accept the project parameter
- Remove all calls to context-based project retrieval
- Validate project exists before operations
- Clear error messages when project not found
- Backward compatibility: Initially keep optional parameter, then make required
### Affected MCP Tools
**Content Management** (require project parameter):
- `write_note(project, title, content, folder)`
- `read_note(project, identifier)`
- `edit_note(project, identifier, operation, content)`
- `delete_note(project, identifier)`
- `view_note(project, identifier)`
- `read_content(project, path)`
**Knowledge Graph Navigation** (require project parameter):
- `build_context(project, url, timeframe, depth, max_related)`
- `list_directory(project, dir_name, depth, file_name_glob)`
- `search_notes(project, query, search_type, types, entity_types)`
**Search & Discovery** (use project parameter for specific project or none for discovery):
- `recent_activity(project, timeframe, depth, max_related)`
**Visualization** (require project parameter):
- `canvas(project, nodes, edges, title, folder)`
**Project Management** (already stateless; removals and parameter changes listed below):
- `list_memory_projects()`
- `create_memory_project(project_name, project_path, set_default)`
- `delete_project(project_name)`
- `get_current_project()` - Remove this tool
- `switch_project(project_name)` - Remove this tool
- `set_default_project(project_name, activate)` - Remove activate parameter
## How (High Level)
### Phase 1: Basic Memory Core (basic-memory repository)
#### MCP Tool Updates
Phase 1: Core Changes 
1. Update project_context.py
- [x] Make project parameter mandatory for get_active_project()
- [x] Remove session state handling
2. Update Content Management Tools (6 tools)
- [x] write_note: Make project parameter required, not optional
- [x] read_note: Make project parameter required
- [x] edit_note: Add required project parameter
- [x] delete_note: Add required project parameter
- [x] view_note: Add required project parameter
- [x] read_content: Add required project parameter
3. Update Knowledge Graph Navigation Tools (3 tools)
- [x] build_context: Add required project parameter
- [x] recent_activity: Make project parameter required
- [x] list_directory: Add required project parameter
4. Update Search & Visualization Tools (2 tools)
- [x] search_notes: Add required project parameter
- [x] canvas: Add required project parameter
5. Update Project Management Tools
- [x] Remove switch_project tool completely
- [x] Remove get_current_project tool completely
- [x] Update set_default_project to remove activate parameter
- [x] Keep list_memory_projects, create_memory_project, delete_project unchanged
    
6. Enhance recent_activity Response
- [x] Add project distribution info showing activity across all projects
- [x] Include project usage stats in response
- [x] Implement ProjectActivitySummary for discovery mode
- [x] Add dual-mode functionality (discovery vs project-specific)
7. Update Tool Documentation
- [x] Update write_note docstring with stateless architecture examples
- [x] Update read_note docstring with project parameter examples
- [x] Update delete_note docstring with comprehensive usage guidance
- [x] Update all remaining tool docstrings with project parameter examples
8. Update Tool Responses
- [x] Add clear project indicator to all tool responses across all tools
- [x] Format: "project: {project_name}" in response metadata
- [x] Add project metadata footer for LLM awareness
- [x] Update all tool responses to include project indicators
9. Comprehensive Testing
- [x] Update all write_note tests to use stateless architecture (34 tests passing)
- [x] Update all edit_note tests to use stateless architecture (17 tests passing)
- [x] Update all view_note tests to use stateless architecture (12 tests passing)
- [x] Update all search_notes tests to use stateless architecture (16 tests passing)
- [x] Update all move_note tests to use stateless architecture (31 tests passing)
- [x] Update all delete_note tests to use stateless architecture
- [x] Verify direct function call compatibility (bypassing MCP layer)
- [x] Test security validation with project parameters
- [x] Validate error handling for non-existent projects
- [x] **Total: 157 tests updated and passing (100% success rate)**
  - [x] **147 existing tests** updated for stateless architecture
  - [x] **10 new tests** for single project constraint mode                                                                   
                                                                                                                             
### Phase 1.5: Default Project Mode Enhancement
#### Problem
While the stateless architecture solves reliability issues, it introduces UX friction for single-project users (estimated 80% of usage) who must specify the project parameter in every tool call.
#### Solution: Default Project Mode
Add optional `default_project_mode` configuration that allows single-project users to have the simplicity of implicit project selection while maintaining the reliability of stateless architecture.
#### Configuration
```json
{
  "default_project": "main",
  "default_project_mode": true  // NEW: Auto-use default_project when not specified
}
```
#### Implementation Details
1. **Config Enhancement** (`src/basic_memory/config.py`)
   - Add `default_project_mode: bool = Field(default=False)`
   - Preserves backward compatibility (defaults to false)
2. **Project Resolution Logic** (`src/basic_memory/mcp/project_context.py`)
   Three-tier resolution hierarchy:
   - Priority 1: CLI `--project` constraint (BASIC_MEMORY_MCP_PROJECT env var)
   - Priority 2: Explicit project parameter in tool call
   - Priority 3: `default_project` if `default_project_mode=true` and no project specified
3. **Assistant Guide Updates** (`src/basic_memory/mcp/resources/ai_assistant_guide.md`)
   - Detect `default_project_mode` at runtime
   - Provide mode-specific instructions to LLMs
   - In default mode: "All operations use project 'main' automatically"
   - In regular mode: Current project discovery guidance
4. **Tool Parameter Handling** (all MCP tools)
   - Make project parameter Optional[str] = None
   - Add resolution logic: `project = project or get_default_project()`
   - Maintain explicit project override capability
#### Usage Modes Summary
- **Regular Mode**: Multi-project users, assistant tracks project per conversation
- **Default Project Mode**: Single-project users, automatic default project
- **Constrained Mode**: CLI --project flag, locked to specific project
#### Testing Requirements
- Integration test for default_project_mode=true with missing parameters
- Test explicit project override in default_project_mode
- Test mode=false requires explicit parameters
- Test CLI constraint overrides default_project_mode
Phase 2: Testing & Validation
8. Update Tests
- [x] Modify all MCP tool tests to pass required project parameter
- [x] Remove tests for deleted tools (switch_project, get_current_project)
- [x] Add tests for project parameter validation
- [x] **Complete: All 147 tests across 5 test files updated and passing**
#### Enhanced recent_activity Response
```json
{
  "recent_notes": [...],
  "project_activity": {
    "research-project": {
      "operations": 5,
      "last_used": "30 minutes ago",
      "recent_folders": ["experiments", "findings"]
    },
    "work-notes": {
      "operations": 2,
      "last_used": "2 hours ago",
      "recent_folders": ["meetings", "planning"]
    }
  },
  "total_projects": 3
}
```
#### Response Format Updates
```
✓ Note created successfully
Project: research-project
File: experiments/Neural Network Results.md
Permalink: research-project/neural-network-results
```
### Phase 2: Cloud Service Simplification (basic-memory-cloud repository) ✅ **COMPLETE**
#### ✅ Remove Session Infrastructure **COMPLETE**
1. ✅ Delete `apps/mcp/src/basic_memory_cloud_mcp/middleware/session_state.py`
2. ✅ Delete `apps/mcp/src/basic_memory_cloud_mcp/middleware/session_logging.py`
3. ✅ Update `apps/mcp/src/basic_memory_cloud_mcp/main.py`:
   ```python
   # Remove session middleware
   # server.add_middleware(SessionStateMiddleware)
   # Enable stateless HTTP
   mcp = FastMCP(name="basic-memory-mcp", stateless_http=True)
   ```
#### ✅ Deployment Simplification **COMPLETE**
1. ✅ Remove Redis from `fly.toml`
2. ✅ Remove Redis environment variables
3. ✅ Update health checks to not depend on Redis
4. ✅ Production deployment verified working with stateless architecture
### Phase 3: Conversational Project Management ✅ **COMPLETE**
#### ✅ Claude Behavior Pattern **VERIFIED WORKING**
1. ✅ **Project Discovery**:
   ```
   Claude: Let me check your recent activity...
   [calls recent_activity() - no project needed for discovery]
   I see you've been working in:
   - research-project (5 operations, 30 min ago)
   - work-notes (2 operations, 2 hours ago)
   Which project should I use for this operation?
   ```
2. ✅ **Context Maintenance**:
   ```
   User: Use research-project
   Claude: Working in research-project.
   [All subsequent operations use project="research-project"]
   ```
3. ✅ **Explicit Project Switching**:
   ```
   User: Check work-notes for that meeting summary
   Claude: Let me search work-notes for the meeting summary.
   [Uses project="work-notes" for specific operation]
   ```
**Validation**: Comprehensive testing confirmed all conversational patterns work naturally with the stateless architecture.
## How to Evaluate
### Success Criteria
#### 1. Functional Completeness
- [x] All MCP tools accept required `project` parameter
- [x] All MCP tools validate project exists before execution
- [x] `switch_project` and `get_current_project` tools removed
- [x] All responses display target project clearly
- [x] No Redis dependencies in deployment (Phase 2: Cloud Service) ✅ **COMPLETE**
- [x] `recent_activity` shows project distribution with ProjectActivitySummary
#### 2. Cross-Client Compatibility Testing ✅ **COMPLETE**
Test identical operations across all clients:
- [x] **Claude Desktop**: All operations work with explicit projects ✅
- [x] **Claude Code**: All operations work with explicit projects ✅
- [x] **Claude Mobile iOS**: All operations work with explicit projects ✅ **CRITICAL SUCCESS**
- [x] **API clients**: All operations work with explicit projects ✅
- [x] **CLI tools**: All operations work with explicit projects ✅
**Critical Achievement**: Claude iOS mobile client session tracking issues completely eliminated through stateless architecture.
#### 3. Session Independence Verification ✅ **COMPLETE**
- [x] Operations work identically with/without session tracking ✅
- [x] No behavioral differences between clients ✅
- [x] Mobile client session ID changes do not affect operations ✅
- [x] Redis can be completely removed without functional impact ✅
**Production Validation**: Redis removed from production deployment with zero functional impact.
#### 4. Performance & Scaling ✅ **COMPLETE**
- [x] `stateless_http=True` enabled successfully ✅
- [x] No Redis memory usage ✅
- [x] Horizontal scaling possible (multiple MCP instances) ✅
- [x] Response times unchanged or improved ✅
#### 5. User Experience Testing
**Project Discovery Flow**:
- [x] `recent_activity()` provides useful project context
- [x] Claude can intelligently suggest projects based on activity
- [x] Project switching is explicit and clear in conversation
**Error Handling**:
- [x] Clear error messages for non-existent projects
- [x] Helpful suggestions when project parameter missing
- [x] No silent failures or wrong-project operations
**Response Clarity**:
- [x] Every operation clearly shows target project
- [x] Users always know which project is being operated on
- [x] No confusion about "current project" state
#### 6. Migration Safety ✅ **COMPLETE**
- [x] Backward compatibility period with optional project parameter ✅
- [x] Clear migration documentation for existing users ✅
- [x] Data integrity maintained during transition ✅
- [x] No data loss during migration ✅
**Production Migration**: Successfully deployed to production with zero data loss and maintained system integrity.
### Test Scenarios
#### Core Functionality Test
```bash
# Test all tools work with explicit project
write_note(project="test-proj", title="Test", content="Content", folder="docs")
read_note(project="test-proj", identifier="Test")
edit_note(project="test-proj", identifier="Test", operation="append", content="More")
search_notes(project="test-proj", query="Content")
list_directory(project="test-proj", dir_name="docs")
delete_note(project="test-proj", identifier="Test")
```
#### Cross-Client Consistency Test
Run identical test sequence on:
1. Claude Desktop
2. Claude Code
3. Claude Mobile iOS
4. API client
5. CLI tools
Verify all clients:
- Accept explicit project parameters
- Return identical responses
- Show same project information
- Have no session dependencies
#### Session Independence Test
1. Monitor session IDs during operations
2. Verify operations work with changing session IDs
3. Confirm Redis removal doesn't affect functionality
4. Test with multiple concurrent clients
### Acceptance Criteria
**Must Have**:
- All MCP tools require and use explicit project parameter
- No session state dependencies remain
- Universal client compatibility achieved
- Clear project information in all responses
**Should Have**:
- Enhanced `recent_activity` with project distribution
- Smooth migration path for existing users
- Improved performance with stateless architecture
**Could Have**:
- Smart project suggestions based on content/context
- Project shortcuts for common operations
- Advanced project analytics in responses
## Notes
### Breaking Changes
This is a **breaking change** that requires:
- All MCP clients to pass project parameter
- Migration of existing workflows
- Update of all documentation and examples
### Implementation Order
1. **basic-memory core** - Update MCP tools to accept project parameter (optional initially)
2. **Testing** - Verify all clients work with explicit projects
3. **Cloud service** - Remove session infrastructure
4. **Migration** - Make project parameter mandatory
5. **Cleanup** - Remove deprecated tools and middleware
### Related Issues
- Fixes #74 (Claude iOS session state bug)
- Implements #75 (Mandatory project parameter architecture)
- Enables future horizontal scaling
- Simplifies multi-tenant architecture
### Dependencies
- Requires coordination between basic-memory and basic-memory-cloud repositories
- Needs client-side updates for smooth transition
- Documentation updates across all materials
```
--------------------------------------------------------------------------------
/tests/mcp/test_tool_read_content.py:
--------------------------------------------------------------------------------
```python
"""Tests for the read_content MCP tool security validation."""
import pytest
from unittest.mock import patch, MagicMock
from pathlib import Path
from basic_memory.mcp.tools.read_content import read_content
from basic_memory.mcp.tools.write_note import write_note
class TestReadContentSecurityValidation:
    """Test read_content security validation features.

    Every test funnels its assertions through the two private helpers below so
    the expected error contract ("paths must stay within project boundaries")
    is defined in exactly one place.
    """

    # Error text emitted by read_content when a path escapes the project root.
    SECURITY_ERROR = "paths must stay within project boundaries"

    def _assert_blocked(self, result, attack_path=None):
        """Assert that read_content rejected a path with a security error.

        Args:
            result: Dict returned by read_content.fn.
            attack_path: When given, also assert the offending path is echoed
                back in the error message (helps debugging which case failed).
        """
        assert isinstance(result, dict)
        assert result["type"] == "error"
        assert self.SECURITY_ERROR in result["error"]
        if attack_path is not None:
            assert attack_path in result["error"]

    def _assert_not_blocked(self, result):
        """Assert a result is NOT a security-boundary error.

        The result may still be some other kind of error (e.g. file not
        found); these tests only care that path validation did not trigger.
        """
        assert isinstance(result, dict)
        assert result["type"] != "error" or self.SECURITY_ERROR not in result.get("error", "")

    @pytest.mark.asyncio
    async def test_read_content_blocks_path_traversal_unix(self, client, test_project):
        """Test that Unix-style path traversal attacks are blocked."""
        # Test various Unix-style path traversal patterns
        attack_paths = [
            "../secrets.txt",
            "../../etc/passwd",
            "../../../root/.ssh/id_rsa",
            "notes/../../../etc/shadow",
            "folder/../../outside/file.md",
            "../../../../etc/hosts",
            "../../../home/user/.env",
        ]
        for attack_path in attack_paths:
            result = await read_content.fn(project=test_project.name, path=attack_path)
            self._assert_blocked(result, attack_path)

    @pytest.mark.asyncio
    async def test_read_content_blocks_path_traversal_windows(self, client, test_project):
        """Test that Windows-style path traversal attacks are blocked."""
        # Test various Windows-style path traversal patterns
        attack_paths = [
            "..\\secrets.txt",
            "..\\..\\Windows\\System32\\config\\SAM",
            "notes\\..\\..\\..\\Windows\\System32",
            "\\\\server\\share\\file.txt",
            "..\\..\\Users\\user\\.env",
            "\\\\..\\..\\Windows",
            "..\\..\\..\\Boot.ini",
        ]
        for attack_path in attack_paths:
            result = await read_content.fn(project=test_project.name, path=attack_path)
            self._assert_blocked(result, attack_path)

    @pytest.mark.asyncio
    async def test_read_content_blocks_absolute_paths(self, client, test_project):
        """Test that absolute paths are blocked."""
        # Test various absolute path patterns (POSIX and Windows drive roots)
        attack_paths = [
            "/etc/passwd",
            "/home/user/.env",
            "/var/log/auth.log",
            "/root/.ssh/id_rsa",
            "C:\\Windows\\System32\\config\\SAM",
            "C:\\Users\\user\\.env",
            "D:\\secrets\\config.json",
            "/tmp/malicious.txt",
            "/usr/local/bin/evil",
        ]
        for attack_path in attack_paths:
            result = await read_content.fn(project=test_project.name, path=attack_path)
            self._assert_blocked(result, attack_path)

    @pytest.mark.asyncio
    async def test_read_content_blocks_home_directory_access(self, client, test_project):
        """Test that home directory access patterns are blocked."""
        # Test various home directory access patterns
        attack_paths = [
            "~/secrets.txt",
            "~/.env",
            "~/.ssh/id_rsa",
            "~/Documents/passwords.txt",
            "~\\AppData\\secrets",
            "~\\Desktop\\config.ini",
            "~/.bashrc",
            "~/Library/Preferences/secret.plist",
        ]
        for attack_path in attack_paths:
            result = await read_content.fn(project=test_project.name, path=attack_path)
            self._assert_blocked(result, attack_path)

    @pytest.mark.asyncio
    async def test_read_content_blocks_mixed_attack_patterns(self, client, test_project):
        """Test that mixed legitimate/attack patterns are blocked."""
        # Paths that start with a legitimate-looking prefix but traverse out
        attack_paths = [
            "notes/../../../etc/passwd",
            "docs/../../.env",
            "legitimate/path/../../.ssh/id_rsa",
            "project/folder/../../../Windows/System32",
            "valid/folder/../../home/user/.bashrc",
            "assets/../../../tmp/evil.exe",
        ]
        for attack_path in attack_paths:
            result = await read_content.fn(project=test_project.name, path=attack_path)
            self._assert_blocked(result)

    @pytest.mark.asyncio
    async def test_read_content_allows_safe_paths_with_mocked_api(self, client, test_project):
        """Test that legitimate paths are still allowed with mocked API responses."""
        # Test various safe path patterns with mocked API responses
        safe_paths = [
            "notes/meeting.md",
            "docs/readme.txt",
            "projects/2025/planning.md",
            "archive/old-notes/backup.md",
            "assets/diagram.png",
            "folder/subfolder/document.md",
        ]
        for safe_path in safe_paths:
            # Mock the API call to simulate a successful response
            with patch("basic_memory.mcp.tools.read_content.call_get") as mock_call_get:
                mock_response = MagicMock()
                mock_response.headers = {"content-type": "text/markdown", "content-length": "100"}
                mock_response.text = f"# Content for {safe_path}\nThis is test content."
                mock_call_get.return_value = mock_response
                result = await read_content.fn(project=test_project.name, path=safe_path)
                # Should succeed (not a security error)
                self._assert_not_blocked(result)

    @pytest.mark.asyncio
    async def test_read_content_memory_url_processing(self, client, test_project):
        """Test that memory URLs are processed correctly for security validation."""
        # The memory:// scheme must not bypass path validation
        attack_paths = [
            "memory://../../etc/passwd",
            "memory://../../../root/.ssh/id_rsa",
            "memory://~/.env",
            "memory:///etc/passwd",
        ]
        for attack_path in attack_paths:
            result = await read_content.fn(project=test_project.name, path=attack_path)
            self._assert_blocked(result)

    @pytest.mark.asyncio
    async def test_read_content_security_logging(self, client, caplog, test_project):
        """Test that security violations are properly logged."""
        # Attempt path traversal attack
        result = await read_content.fn(project=test_project.name, path="../../../etc/passwd")
        self._assert_blocked(result)
        # Check that security violation was logged
        # Note: This test may need adjustment based on the actual logging setup
        # The security validation should generate a warning log entry

    @pytest.mark.asyncio
    async def test_read_content_empty_path_security(self, client, test_project):
        """Test that empty path is handled securely."""
        # Mock the API call since empty path should be allowed (resolves to project root)
        with patch("basic_memory.mcp.tools.read_content.call_get") as mock_call_get:
            mock_response = MagicMock()
            mock_response.headers = {"content-type": "text/markdown", "content-length": "50"}
            mock_response.text = "# Root content"
            mock_call_get.return_value = mock_response
            result = await read_content.fn(project=test_project.name, path="")
            # Empty path should not trigger security error (it's handled as project root)
            self._assert_not_blocked(result)

    @pytest.mark.asyncio
    async def test_read_content_current_directory_references_security(self, client, test_project):
        """Test that current directory references are handled securely."""
        # Test current directory references (should be safe)
        safe_paths = [
            "./notes/file.md",
            "folder/./file.md",
            "./folder/subfolder/file.md",
        ]
        for safe_path in safe_paths:
            # Mock the API call for these safe paths
            with patch("basic_memory.mcp.tools.read_content.call_get") as mock_call_get:
                mock_response = MagicMock()
                mock_response.headers = {"content-type": "text/markdown", "content-length": "100"}
                mock_response.text = f"# Content for {safe_path}"
                mock_call_get.return_value = mock_response
                result = await read_content.fn(project=test_project.name, path=safe_path)
                # Should NOT contain security error message
                self._assert_not_blocked(result)
class TestReadContentFunctionality:
    """Test read_content basic functionality with security validation in place."""

    @pytest.mark.asyncio
    async def test_read_content_text_file_success(self, client, test_project):
        """Test reading a text file works correctly with security validation."""
        # Create a note first so a real file exists behind the mocked API.
        await write_note.fn(
            project=test_project.name,
            title="Test Document",
            folder="docs",
            content="# Test Document\nThis is test content for reading.",
        )
        # Stub the HTTP layer to return the file's markdown content.
        with patch("basic_memory.mcp.tools.read_content.call_get") as fake_get:
            response_stub = MagicMock()
            response_stub.headers = {"content-type": "text/markdown", "content-length": "100"}
            response_stub.text = "# Test Document\nThis is test content for reading."
            fake_get.return_value = response_stub
            result = await read_content.fn(project=test_project.name, path="docs/test-document.md")
            assert isinstance(result, dict)
            assert result["type"] == "text"
            assert "Test Document" in result["text"]
            assert result["content_type"] == "text/markdown"
            assert result["encoding"] == "utf-8"

    @pytest.mark.asyncio
    async def test_read_content_image_file_handling(self, client, test_project):
        """Test reading an image file with security validation."""
        # Minimal valid PNG byte stream used as the fake response payload.
        png_bytes = b"\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x01\x00\x00\x00\x01\x08\x06\x00\x00\x00\x1f\x15\xc4\x89\x00\x00\x00\rIDATx\x9cc\x00\x01\x00\x00\x05\x00\x01\r\n-\xdb\x00\x00\x00\x00IEND\xaeB`\x82"
        with patch("basic_memory.mcp.tools.read_content.call_get") as fake_get:
            http_stub = MagicMock()
            http_stub.headers = {
                "content-type": "image/png",
                "content-length": str(len(png_bytes)),
            }
            http_stub.content = png_bytes
            fake_get.return_value = http_stub
            # Stub PIL so no real image decoding happens.
            with patch("basic_memory.mcp.tools.read_content.PILImage") as fake_pil:
                image_stub = MagicMock(width=100, height=100, mode="RGB")
                image_stub.getbands.return_value = ["R", "G", "B"]
                fake_pil.open.return_value = image_stub
                with patch("basic_memory.mcp.tools.read_content.optimize_image") as fake_optimize:
                    fake_optimize.return_value = b"optimized_image_data"
                    result = await read_content.fn(
                        project=test_project.name, path="assets/safe-image.png"
                    )
                    assert isinstance(result, dict)
                    assert result["type"] == "image"
                    assert "source" in result
                    source = result["source"]
                    assert source["type"] == "base64"
                    assert source["media_type"] == "image/jpeg"

    @pytest.mark.asyncio
    async def test_read_content_with_project_parameter(self, client, test_project):
        """Test reading content with explicit project parameter."""
        # Stub both the HTTP layer and the project lookup.
        with patch("basic_memory.mcp.tools.read_content.call_get") as fake_get:
            with patch(
                "basic_memory.mcp.tools.read_content.get_active_project"
            ) as fake_lookup:
                project_stub = MagicMock()
                project_stub.project_url = "http://test"
                project_stub.home = Path("/test/project")
                fake_lookup.return_value = project_stub
                http_stub = MagicMock()
                http_stub.headers = {"content-type": "text/plain", "content-length": "50"}
                http_stub.text = "Project-specific content"
                fake_get.return_value = http_stub
                result = await read_content.fn(
                    path="notes/project-file.txt", project="specific-project"
                )
                assert isinstance(result, dict)
                assert result["type"] == "text"
                assert "Project-specific content" in result["text"]

    @pytest.mark.asyncio
    async def test_read_content_nonexistent_file_handling(self, client, test_project):
        """Test handling of nonexistent files (after security validation)."""
        # Simulate the API failing for a missing file.
        with patch("basic_memory.mcp.tools.read_content.call_get") as fake_get:
            fake_get.side_effect = Exception("File not found")
            # The path passes security validation but the API call fails.
            try:
                outcome = await read_content.fn(
                    project=test_project.name, path="docs/nonexistent-file.md"
                )
            except Exception as exc:
                # A propagated API failure is acceptable for this test.
                assert "File not found" in str(exc)
            else:
                # Otherwise the tool should have returned a dict result.
                assert isinstance(outcome, dict)

    @pytest.mark.asyncio
    async def test_read_content_binary_file_handling(self, client, test_project):
        """Test reading binary files with security validation."""
        payload = b"Binary file content with special bytes: \x00\x01\x02\x03"
        # Stub the HTTP layer to return raw binary content.
        with patch("basic_memory.mcp.tools.read_content.call_get") as fake_get:
            response_stub = MagicMock()
            response_stub.headers = {
                "content-type": "application/octet-stream",
                "content-length": str(len(payload)),
            }
            response_stub.content = payload
            fake_get.return_value = response_stub
            result = await read_content.fn(project=test_project.name, path="files/safe-binary.bin")
            assert isinstance(result, dict)
            assert result["type"] == "document"
            assert "source" in result
            assert result["source"]["type"] == "base64"
            assert result["source"]["media_type"] == "application/octet-stream"
class TestReadContentEdgeCases:
    """Test edge cases for read_content security validation."""

    @pytest.mark.asyncio
    async def test_read_content_unicode_path_attacks(self, client, test_project):
        """Test that Unicode-based path traversal attempts are blocked."""
        # Non-ASCII segments must not let ".." components escape the project.
        traversal_candidates = (
            "notes/文档/../../../etc/passwd",  # Chinese characters
            "docs/café/../../.env",  # Accented characters
            "files/αβγ/../../../secret.txt",  # Greek characters
        )
        for candidate in traversal_candidates:
            outcome = await read_content.fn(project=test_project.name, path=candidate)
            assert isinstance(outcome, dict)
            assert outcome["type"] == "error"
            assert "paths must stay within project boundaries" in outcome["error"]

    @pytest.mark.asyncio
    async def test_read_content_url_encoded_attacks(self, client, test_project):
        """Test that URL-encoded path traversal attempts are handled safely."""
        # The implementation may not URL-decode before validating; either a
        # blocked result or an API-level failure is acceptable, as long as no
        # actual traversal occurs.
        encoded_candidates = (
            "notes%2f..%2f..%2f..%2fetc%2fpasswd",
            "docs%2f%2e%2e%2f%2e%2e%2f.env",
        )
        for candidate in encoded_candidates:
            try:
                outcome = await read_content.fn(project=test_project.name, path=candidate)
            except Exception:
                # Failure at the API layer (or elsewhere) is acceptable here.
                continue
            assert isinstance(outcome, dict)

    @pytest.mark.asyncio
    async def test_read_content_null_byte_injection(self, client, test_project):
        """Test that null byte injection attempts are blocked."""
        injection_candidates = (
            "notes/file.txt\x00../../etc/passwd",
            "docs/document.md\x00../../../.env",
        )
        for candidate in injection_candidates:
            outcome = await read_content.fn(project=test_project.name, path=candidate)
            assert isinstance(outcome, dict)
            # Either a security-validation error or a failure caused by the
            # invalid characters is acceptable for these inputs.

    @pytest.mark.asyncio
    async def test_read_content_very_long_attack_path(self, client, test_project):
        """Test handling of very long attack paths."""
        # 1000 parent-directory hops followed by a sensitive target.
        deep_traversal = "../" * 1000 + "etc/passwd"
        outcome = await read_content.fn(project=test_project.name, path=deep_traversal)
        assert isinstance(outcome, dict)
        assert outcome["type"] == "error"
        assert "paths must stay within project boundaries" in outcome["error"]

    @pytest.mark.asyncio
    async def test_read_content_case_variations_attacks(self, client, test_project):
        """Test that case variations don't bypass security."""
        # Case sensitivity depends on the filesystem, but none of these
        # variations should slip past validation.
        cased_candidates = (
            "../ETC/passwd",
            "../Etc/PASSWD",
            "..\\WINDOWS\\system32",
            "~/.SSH/id_rsa",
        )
        for candidate in cased_candidates:
            outcome = await read_content.fn(project=test_project.name, path=candidate)
            assert isinstance(outcome, dict)
            assert outcome["type"] == "error"
            assert "paths must stay within project boundaries" in outcome["error"]
```
--------------------------------------------------------------------------------
/src/basic_memory/sync/watch_service.py:
--------------------------------------------------------------------------------
```python
"""Watch service for Basic Memory."""
import asyncio
import os
import time
from collections import defaultdict
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Sequence, Set

from loguru import logger
from pydantic import BaseModel, Field
from rich.console import Console
from watchfiles import awatch
from watchfiles.main import Change, FileChange

from basic_memory.config import BasicMemoryConfig, WATCH_STATUS_JSON
from basic_memory.ignore_utils import load_gitignore_patterns, should_ignore_path
from basic_memory.models import Project
from basic_memory.repository import ProjectRepository
class WatchEvent(BaseModel):
    """A single recorded file-sync event, kept in the recent-activity list."""

    timestamp: datetime
    path: str
    action: str  # new, delete, etc
    status: str  # success, error
    checksum: Optional[str]  # checksum of the synced file; None when not supplied
    error: Optional[str] = None  # error message, set when status is "error"
class WatchServiceState(BaseModel):
    """Mutable status of the watch service, serialized to the status file."""

    # Service status
    running: bool = False
    # Use default_factory so each instance gets its own fresh timestamp/pid.
    # A plain `datetime.now()` / `os.getpid()` default would be evaluated once
    # at import time and shared by every instance created afterwards.
    start_time: datetime = Field(default_factory=datetime.now)
    pid: int = Field(default_factory=os.getpid)
    # Stats
    error_count: int = 0
    last_error: Optional[datetime] = None
    last_scan: Optional[datetime] = None
    # File counts
    synced_files: int = 0
    # Recent activity, most recent first, capped at 100 entries by add_event
    recent_events: List[WatchEvent] = Field(default_factory=list)

    def add_event(
        self,
        path: str,
        action: str,
        status: str,
        checksum: Optional[str] = None,
        error: Optional[str] = None,
    ) -> WatchEvent:
        """Record an event at the head of recent_events and return it.

        Args:
            path: Project-relative path the event refers to ("" for service-level events).
            action: What happened, e.g. "new", "modified", "deleted", "moved", "sync".
            status: "success" or "error".
            checksum: Checksum of the synced file, when available.
            error: Error message, when status is "error".
        """
        event = WatchEvent(
            timestamp=datetime.now(),
            path=path,
            action=action,
            status=status,
            checksum=checksum,
            error=error,
        )
        self.recent_events.insert(0, event)
        self.recent_events = self.recent_events[:100]  # Keep last 100
        return event

    def record_error(self, error: str):
        """Increment the error counter and record an error event with a timestamp."""
        self.error_count += 1
        self.add_event(path="", action="sync", status="error", error=error)
        self.last_error = datetime.now()
class WatchService:
    """Watch active project directories and sync file changes.

    Uses ``watchfiles.awatch`` to receive debounced filesystem events, groups
    them per project, filters them with gitignore-style patterns, and hands
    the surviving changes to each project's sync service. Status is persisted
    as JSON to ``~/.basic-memory``.
    """

    def __init__(
        self,
        app_config: BasicMemoryConfig,
        project_repository: ProjectRepository,
        quiet: bool = False,
    ):
        """Initialize service state and ensure the status file directory exists.

        Args:
            app_config: Global config (sync debounce delay, project reload interval).
            project_repository: Source of the active projects to watch.
            quiet: Suppress console output (used under MCP to keep stdout clean).
        """
        self.app_config = app_config
        self.project_repository = project_repository
        self.state = WatchServiceState()
        self.status_path = Path.home() / ".basic-memory" / WATCH_STATUS_JSON
        self.status_path.parent.mkdir(parents=True, exist_ok=True)
        # Per-project gitignore pattern cache; cleared at the start of each cycle.
        self._ignore_patterns_cache: dict[Path, Set[str]] = {}
        # quiet mode for mcp so it doesn't mess up stdout
        self.console = Console(quiet=quiet)

    async def _schedule_restart(self, stop_event: asyncio.Event):
        """Schedule a restart of the watch service after the configured interval."""
        await asyncio.sleep(self.app_config.watch_project_reload_interval)
        stop_event.set()

    def _get_ignore_patterns(self, project_path: Path) -> Set[str]:
        """Get or load (and cache) ignore patterns for a project path."""
        if project_path not in self._ignore_patterns_cache:
            self._ignore_patterns_cache[project_path] = load_gitignore_patterns(project_path)
        return self._ignore_patterns_cache[project_path]

    async def _watch_projects_cycle(self, projects: Sequence[Project], stop_event: asyncio.Event):
        """Run one cycle of watching the given projects until stop_event is set."""
        project_paths = [project.path for project in projects]
        async for changes in awatch(
            *project_paths,
            debounce=self.app_config.sync_delay,
            watch_filter=self.filter_changes,
            recursive=True,
            stop_event=stop_event,
        ):
            # Group changes by the project they belong to, dropping anything
            # that matches the project's gitignore-style patterns.
            project_changes = defaultdict(list)
            for change, path in changes:
                for project in projects:
                    if self.is_project_path(project, path):
                        project_path = Path(project.path)
                        file_path = Path(path)
                        ignore_patterns = self._get_ignore_patterns(project_path)
                        if should_ignore_path(file_path, project_path, ignore_patterns):
                            logger.trace(
                                f"Ignoring watched file change: {file_path.relative_to(project_path)}"
                            )
                            continue
                        project_changes[project].append((change, path))
                        break
            # Process each project's batch of changes concurrently.
            change_handlers = [
                self.handle_changes(project, changes)  # pyright: ignore
                for project, changes in project_changes.items()
            ]
            await asyncio.gather(*change_handlers)

    async def run(self):  # pragma: no cover
        """Watch for file changes and sync them until stopped."""
        self.state.running = True
        self.state.start_time = datetime.now()
        await self.write_status()
        # NOTE: loguru formats messages with str.format, which silently ignores
        # extra positional arguments, so details must live in the message itself.
        logger.info(
            f"Watch service started: debounce_ms={self.app_config.sync_delay}, pid={os.getpid()}"
        )
        try:
            while self.state.running:
                # Clear ignore patterns cache to pick up any .gitignore changes
                self._ignore_patterns_cache.clear()
                # Reload projects to catch any new/removed projects
                projects = await self.project_repository.get_active_projects()
                project_paths = [project.path for project in projects]
                logger.debug(f"Starting watch cycle for directories: {project_paths}")
                # Each cycle runs until the timer fires, then restarts so the
                # project list is re-read.
                stop_event = asyncio.Event()
                timer_task = asyncio.create_task(self._schedule_restart(stop_event))
                try:
                    await self._watch_projects_cycle(projects, stop_event)
                except Exception as e:
                    logger.exception(f"Watch service error during cycle: {e}")
                    self.state.record_error(str(e))
                    await self.write_status()
                    # Continue to next cycle instead of exiting
                    await asyncio.sleep(5)  # Brief pause before retry
                finally:
                    # Cancel timer task if it's still running
                    if not timer_task.done():
                        timer_task.cancel()
                        try:
                            await timer_task
                        except asyncio.CancelledError:
                            pass
        except Exception as e:
            logger.exception(f"Watch service error: {e}")
            self.state.record_error(str(e))
            await self.write_status()
            raise
        finally:
            logger.info(
                "Watch service stopped, "
                f"runtime_seconds={int((datetime.now() - self.state.start_time).total_seconds())}"
            )
            self.state.running = False
            await self.write_status()

    def filter_changes(self, change: Change, path: str) -> bool:  # pragma: no cover
        """Filter to only watch non-hidden files and directories.

        Returns:
            True if the file should be watched, False if it should be ignored
        """
        # Skip hidden directories and files (any path component starting with ".")
        path_parts = Path(path).parts
        for part in path_parts:
            if part.startswith("."):
                return False
        # Skip temp files used in atomic operations
        if path.endswith(".tmp"):
            return False
        return True

    async def write_status(self):
        """Write current state as JSON to the status file."""
        # Idiomatic Pydantic v2: call model_dump_json on the instance rather
        # than through the class.
        self.status_path.write_text(self.state.model_dump_json(indent=2))

    def is_project_path(self, project: Project, path) -> bool:
        """Check whether `path` is a file or subdirectory within `project`.

        Note: the project root itself is not considered "within" the project.
        """
        project_path = Path(project.path).resolve()
        sub_path = Path(path).resolve()
        return project_path in sub_path.parents

    async def handle_changes(self, project: Project, changes: Set[FileChange]) -> None:
        """Process a batch of file changes for a single project.

        Classifies changes as adds/deletes/modifies, detects moves by comparing
        checksums, handles vim-style atomic writes, syncs each file, and
        records every outcome in the service state.
        """
        # avoid circular imports
        from basic_memory.sync.sync_service import get_sync_service

        # Check if project still exists in configuration before processing
        # This prevents deleted projects from being recreated by background sync
        from basic_memory.config import ConfigManager

        config_manager = ConfigManager()
        if (
            project.name not in config_manager.projects
            and project.permalink not in config_manager.projects
        ):
            logger.info(
                f"Skipping sync for deleted project: {project.name}, change_count={len(changes)}"
            )
            return
        sync_service = await get_sync_service(project)
        file_service = sync_service.file_service
        start_time = time.time()
        directory = Path(project.path).resolve()
        logger.info(
            f"Processing project: {project.name} changes, change_count={len(changes)}, directory={directory}"
        )
        # Group changes by type
        adds: List[str] = []
        deletes: List[str] = []
        modifies: List[str] = []
        for change, path in changes:
            # convert to project-relative posix path
            relative_path = Path(path).relative_to(directory).as_posix()
            # Skip .tmp files - they're temporary and shouldn't be synced
            if relative_path.endswith(".tmp"):
                continue
            if change == Change.added:
                adds.append(relative_path)
            elif change == Change.deleted:
                deletes.append(relative_path)
            elif change == Change.modified:
                modifies.append(relative_path)
        logger.debug(
            f"Grouped file changes, added={len(adds)}, deleted={len(deletes)}, modified={len(modifies)}"
        )
        # Because of our atomic writes on updates, an "add" may be an existing
        # file. Iterate over a snapshot: removing from `adds` while iterating
        # it directly would skip the element following each removal.
        for added_path in list(adds):  # pragma: no cover TODO add test
            entity = await sync_service.entity_repository.get_by_file_path(added_path)
            if entity is not None:
                logger.debug(f"Existing file will be processed as modified, path={added_path}")
                adds.remove(added_path)
                modifies.append(added_path)
        # Track processed files to avoid duplicates
        processed: Set[str] = set()
        # First handle potential moves (a deleted path and an added path with
        # the same checksum are treated as one move).
        for added_path in adds:
            if added_path in processed:
                continue  # pragma: no cover
            # Skip directories for added paths
            # We don't need to process directories, only the files inside them
            # This prevents errors when trying to compute checksums or read directories as files
            added_full_path = directory / added_path
            if not added_full_path.exists() or added_full_path.is_dir():
                logger.debug(f"Skipping non-existent or directory path, path={added_path}")
                processed.add(added_path)
                continue
            for deleted_path in deletes:
                if deleted_path in processed:
                    continue  # pragma: no cover
                # Deleted directories leave no entity row, so they can't be moves
                deleted_entity = await sync_service.entity_repository.get_by_file_path(deleted_path)
                if deleted_entity is None:
                    # If this was a directory, it wouldn't have an entity
                    logger.debug(f"Skipping unknown path for move detection, path={deleted_path}")
                    continue
                if added_path != deleted_path:
                    # Compare checksums to detect moves
                    try:
                        added_checksum = await file_service.compute_checksum(added_path)
                        if deleted_entity and deleted_entity.checksum == added_checksum:
                            await sync_service.handle_move(deleted_path, added_path)
                            self.state.add_event(
                                path=f"{deleted_path} -> {added_path}",
                                action="moved",
                                status="success",
                            )
                            self.console.print(f"[blue]→[/blue] {deleted_path} → {added_path}")
                            logger.info(f"move: {deleted_path} -> {added_path}")
                            processed.add(added_path)
                            processed.add(deleted_path)
                            break
                    except Exception as e:  # pragma: no cover
                        logger.warning(
                            f"Error checking for move: old_path={deleted_path}, "
                            f"new_path={added_path}, error={str(e)}"
                        )
        # Each detected move marked both its old and new path as processed;
        # count only the deleted half so one move is reported once.
        moved_count = len([p for p in processed if p in deletes])
        delete_count = 0
        add_count = 0
        modify_count = 0
        # Process deletes
        for path in deletes:
            if path not in processed:
                # Check if file still exists on disk (vim atomic write edge case)
                full_path = directory / path
                if full_path.exists() and full_path.is_file():
                    # File still exists despite DELETE event - treat as modification
                    logger.debug(
                        f"File exists despite DELETE event, treating as modification, path={path}"
                    )
                    entity, checksum = await sync_service.sync_file(path, new=False)
                    self.state.add_event(
                        path=path, action="modified", status="success", checksum=checksum
                    )
                    self.console.print(f"[yellow]✎[/yellow] {path} (atomic write)")
                    logger.info(f"atomic write detected: {path}")
                    processed.add(path)
                    modify_count += 1
                else:
                    # Check if this was a directory - skip if so
                    # (we can't tell if the deleted path was a directory since it no longer exists,
                    # so we check if there's an entity in the database for it)
                    entity = await sync_service.entity_repository.get_by_file_path(path)
                    if entity is None:
                        # No entity means this was likely a directory - skip it
                        logger.debug(
                            f"Skipping deleted path with no entity (likely directory), path={path}"
                        )
                        processed.add(path)
                        continue
                    # File truly deleted
                    logger.debug(f"Processing deleted file, path={path}")
                    await sync_service.handle_delete(path)
                    self.state.add_event(path=path, action="deleted", status="success")
                    self.console.print(f"[red]✕[/red] {path}")
                    logger.info(f"deleted: {path}")
                    processed.add(path)
                    delete_count += 1
        # Process adds
        for path in adds:
            if path not in processed:
                # Skip directories - only process files
                full_path = directory / path
                if not full_path.exists() or full_path.is_dir():
                    logger.debug(
                        f"Skipping non-existent or directory path, path={path}"
                    )  # pragma: no cover
                    processed.add(path)  # pragma: no cover
                    continue  # pragma: no cover
                logger.debug(f"Processing new file, path={path}")
                entity, checksum = await sync_service.sync_file(path, new=True)
                if checksum:
                    self.state.add_event(
                        path=path, action="new", status="success", checksum=checksum
                    )
                    self.console.print(f"[green]✓[/green] {path}")
                    logger.info(f"new file processed: path={path}, checksum={checksum}")
                    processed.add(path)
                    add_count += 1
                else:  # pragma: no cover
                    logger.warning(f"Error syncing new file, path={path}")  # pragma: no cover
                    self.console.print(
                        f"[orange]?[/orange] Error syncing: {path}"
                    )  # pragma: no cover
        # Process modifies - collapse console spam from repeated changes to one file
        last_modified_path = None
        repeat_count = 0
        for path in modifies:
            if path not in processed:
                # Skip directories - only process files
                full_path = directory / path
                if not full_path.exists() or full_path.is_dir():
                    logger.debug(f"Skipping non-existent or directory path, path={path}")
                    processed.add(path)
                    continue
                logger.debug(f"Processing modified file: path={path}")
                entity, checksum = await sync_service.sync_file(path, new=False)
                self.state.add_event(
                    path=path, action="modified", status="success", checksum=checksum
                )
                # Check if this is a repeat of the last modified file
                if path == last_modified_path:  # pragma: no cover
                    repeat_count += 1  # pragma: no cover
                    # Only show a message for the first repeat
                    if repeat_count == 1:  # pragma: no cover
                        self.console.print(
                            f"[yellow]...[/yellow] Repeated changes to {path}"
                        )  # pragma: no cover
                else:
                    # haven't processed this file
                    self.console.print(f"[yellow]✎[/yellow] {path}")
                    logger.info(f"modified: {path}")
                    last_modified_path = path
                    repeat_count = 0
                    modify_count += 1
                logger.debug(  # pragma: no cover
                    "Modified file processed, "
                    f"path={path} "
                    f"entity_id={entity.id if entity else None} "
                    f"checksum={checksum}",
                )
                processed.add(path)
        # Print a concise one-line summary instead of a divider.
        # (Named summary_parts so it doesn't shadow the `changes` parameter.)
        if processed:
            summary_parts: List[str] = []
            if add_count > 0:
                summary_parts.append(f"[green]{add_count} added[/green]")
            if modify_count > 0:
                summary_parts.append(f"[yellow]{modify_count} modified[/yellow]")
            if moved_count > 0:
                summary_parts.append(f"[blue]{moved_count} moved[/blue]")
            if delete_count > 0:
                summary_parts.append(f"[red]{delete_count} deleted[/red]")
            if summary_parts:
                self.console.print(f"{', '.join(summary_parts)}", style="dim")
                logger.info(f"changes: {len(summary_parts)}")
        duration_ms = int((time.time() - start_time) * 1000)
        self.state.last_scan = datetime.now()
        self.state.synced_files += len(processed)
        logger.info(
            "File change processing completed, "
            f"processed_files={len(processed)}, "
            f"total_synced_files={self.state.synced_files}, "
            f"duration_ms={duration_ms}"
        )
        await self.write_status()
```