This is page 12 of 17. Use http://codebase.md/basicmachines-co/basic-memory?lines=false&page={x} to view the full context.
# Directory Structure
```
├── .claude
│   ├── agents
│   │   ├── python-developer.md
│   │   └── system-architect.md
│   └── commands
│       ├── release
│       │   ├── beta.md
│       │   ├── changelog.md
│       │   ├── release-check.md
│       │   └── release.md
│       ├── spec.md
│       └── test-live.md
├── .dockerignore
├── .github
│   ├── dependabot.yml
│   ├── ISSUE_TEMPLATE
│   │   ├── bug_report.md
│   │   ├── config.yml
│   │   ├── documentation.md
│   │   └── feature_request.md
│   └── workflows
│       ├── claude-code-review.yml
│       ├── claude-issue-triage.yml
│       ├── claude.yml
│       ├── dev-release.yml
│       ├── docker.yml
│       ├── pr-title.yml
│       ├── release.yml
│       └── test.yml
├── .gitignore
├── .python-version
├── CHANGELOG.md
├── CITATION.cff
├── CLA.md
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── docker-compose.yml
├── Dockerfile
├── docs
│   ├── ai-assistant-guide-extended.md
│   ├── character-handling.md
│   ├── cloud-cli.md
│   └── Docker.md
├── justfile
├── LICENSE
├── llms-install.md
├── pyproject.toml
├── README.md
├── SECURITY.md
├── smithery.yaml
├── specs
│   ├── SPEC-1 Specification-Driven Development Process.md
│   ├── SPEC-10 Unified Deployment Workflow and Event Tracking.md
│   ├── SPEC-11 Basic Memory API Performance Optimization.md
│   ├── SPEC-12 OpenTelemetry Observability.md
│   ├── SPEC-13 CLI Authentication with Subscription Validation.md
│   ├── SPEC-14 Cloud Git Versioning & GitHub Backup.md
│   ├── SPEC-14- Cloud Git Versioning & GitHub Backup.md
│   ├── SPEC-15 Configuration Persistence via Tigris for Cloud Tenants.md
│   ├── SPEC-16 MCP Cloud Service Consolidation.md
│   ├── SPEC-17 Semantic Search with ChromaDB.md
│   ├── SPEC-18 AI Memory Management Tool.md
│   ├── SPEC-19 Sync Performance and Memory Optimization.md
│   ├── SPEC-2 Slash Commands Reference.md
│   ├── SPEC-20 Simplified Project-Scoped Rclone Sync.md
│   ├── SPEC-3 Agent Definitions.md
│   ├── SPEC-4 Notes Web UI Component Architecture.md
│   ├── SPEC-5 CLI Cloud Upload via WebDAV.md
│   ├── SPEC-6 Explicit Project Parameter Architecture.md
│   ├── SPEC-7 POC to spike Tigris Turso for local access to cloud data.md
│   ├── SPEC-8 TigrisFS Integration.md
│   ├── SPEC-9 Multi-Project Bidirectional Sync Architecture.md
│   ├── SPEC-9 Signed Header Tenant Information.md
│   └── SPEC-9-1 Follow-Ups- Conflict, Sync, and Observability.md
├── src
│   └── basic_memory
│       ├── __init__.py
│       ├── alembic
│       │   ├── alembic.ini
│       │   ├── env.py
│       │   ├── migrations.py
│       │   ├── script.py.mako
│       │   └── versions
│       │       ├── 3dae7c7b1564_initial_schema.py
│       │       ├── 502b60eaa905_remove_required_from_entity_permalink.py
│       │       ├── 5fe1ab1ccebe_add_projects_table.py
│       │       ├── 647e7a75e2cd_project_constraint_fix.py
│       │       ├── 9d9c1cb7d8f5_add_mtime_and_size_columns_to_entity_.py
│       │       ├── a1b2c3d4e5f6_fix_project_foreign_keys.py
│       │       ├── b3c3938bacdb_relation_to_name_unique_index.py
│       │       ├── cc7172b46608_update_search_index_schema.py
│       │       └── e7e1f4367280_add_scan_watermark_tracking_to_project.py
│       ├── api
│       │   ├── __init__.py
│       │   ├── app.py
│       │   ├── routers
│       │   │   ├── __init__.py
│       │   │   ├── directory_router.py
│       │   │   ├── importer_router.py
│       │   │   ├── knowledge_router.py
│       │   │   ├── management_router.py
│       │   │   ├── memory_router.py
│       │   │   ├── project_router.py
│       │   │   ├── prompt_router.py
│       │   │   ├── resource_router.py
│       │   │   ├── search_router.py
│       │   │   └── utils.py
│       │   └── template_loader.py
│       ├── cli
│       │   ├── __init__.py
│       │   ├── app.py
│       │   ├── auth.py
│       │   ├── commands
│       │   │   ├── __init__.py
│       │   │   ├── cloud
│       │   │   │   ├── __init__.py
│       │   │   │   ├── api_client.py
│       │   │   │   ├── bisync_commands.py
│       │   │   │   ├── cloud_utils.py
│       │   │   │   ├── core_commands.py
│       │   │   │   ├── rclone_commands.py
│       │   │   │   ├── rclone_config.py
│       │   │   │   ├── rclone_installer.py
│       │   │   │   ├── upload_command.py
│       │   │   │   └── upload.py
│       │   │   ├── command_utils.py
│       │   │   ├── db.py
│       │   │   ├── import_chatgpt.py
│       │   │   ├── import_claude_conversations.py
│       │   │   ├── import_claude_projects.py
│       │   │   ├── import_memory_json.py
│       │   │   ├── mcp.py
│       │   │   ├── project.py
│       │   │   ├── status.py
│       │   │   └── tool.py
│       │   └── main.py
│       ├── config.py
│       ├── db.py
│       ├── deps.py
│       ├── file_utils.py
│       ├── ignore_utils.py
│       ├── importers
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── chatgpt_importer.py
│       │   ├── claude_conversations_importer.py
│       │   ├── claude_projects_importer.py
│       │   ├── memory_json_importer.py
│       │   └── utils.py
│       ├── markdown
│       │   ├── __init__.py
│       │   ├── entity_parser.py
│       │   ├── markdown_processor.py
│       │   ├── plugins.py
│       │   ├── schemas.py
│       │   └── utils.py
│       ├── mcp
│       │   ├── __init__.py
│       │   ├── async_client.py
│       │   ├── project_context.py
│       │   ├── prompts
│       │   │   ├── __init__.py
│       │   │   ├── ai_assistant_guide.py
│       │   │   ├── continue_conversation.py
│       │   │   ├── recent_activity.py
│       │   │   ├── search.py
│       │   │   └── utils.py
│       │   ├── resources
│       │   │   ├── ai_assistant_guide.md
│       │   │   └── project_info.py
│       │   ├── server.py
│       │   └── tools
│       │       ├── __init__.py
│       │       ├── build_context.py
│       │       ├── canvas.py
│       │       ├── chatgpt_tools.py
│       │       ├── delete_note.py
│       │       ├── edit_note.py
│       │       ├── list_directory.py
│       │       ├── move_note.py
│       │       ├── project_management.py
│       │       ├── read_content.py
│       │       ├── read_note.py
│       │       ├── recent_activity.py
│       │       ├── search.py
│       │       ├── utils.py
│       │       ├── view_note.py
│       │       └── write_note.py
│       ├── models
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── knowledge.py
│       │   ├── project.py
│       │   └── search.py
│       ├── repository
│       │   ├── __init__.py
│       │   ├── entity_repository.py
│       │   ├── observation_repository.py
│       │   ├── project_info_repository.py
│       │   ├── project_repository.py
│       │   ├── relation_repository.py
│       │   ├── repository.py
│       │   └── search_repository.py
│       ├── schemas
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── cloud.py
│       │   ├── delete.py
│       │   ├── directory.py
│       │   ├── importer.py
│       │   ├── memory.py
│       │   ├── project_info.py
│       │   ├── prompt.py
│       │   ├── request.py
│       │   ├── response.py
│       │   ├── search.py
│       │   └── sync_report.py
│       ├── services
│       │   ├── __init__.py
│       │   ├── context_service.py
│       │   ├── directory_service.py
│       │   ├── entity_service.py
│       │   ├── exceptions.py
│       │   ├── file_service.py
│       │   ├── initialization.py
│       │   ├── link_resolver.py
│       │   ├── project_service.py
│       │   ├── search_service.py
│       │   └── service.py
│       ├── sync
│       │   ├── __init__.py
│       │   ├── background_sync.py
│       │   ├── sync_service.py
│       │   └── watch_service.py
│       ├── templates
│       │   └── prompts
│       │       ├── continue_conversation.hbs
│       │       └── search.hbs
│       └── utils.py
├── test-int
│   ├── BENCHMARKS.md
│   ├── cli
│   │   ├── test_project_commands_integration.py
│   │   └── test_version_integration.py
│   ├── conftest.py
│   ├── mcp
│   │   ├── test_build_context_underscore.py
│   │   ├── test_build_context_validation.py
│   │   ├── test_chatgpt_tools_integration.py
│   │   ├── test_default_project_mode_integration.py
│   │   ├── test_delete_note_integration.py
│   │   ├── test_edit_note_integration.py
│   │   ├── test_list_directory_integration.py
│   │   ├── test_move_note_integration.py
│   │   ├── test_project_management_integration.py
│   │   ├── test_project_state_sync_integration.py
│   │   ├── test_read_content_integration.py
│   │   ├── test_read_note_integration.py
│   │   ├── test_search_integration.py
│   │   ├── test_single_project_mcp_integration.py
│   │   └── test_write_note_integration.py
│   ├── test_db_wal_mode.py
│   ├── test_disable_permalinks_integration.py
│   └── test_sync_performance_benchmark.py
├── tests
│   ├── __init__.py
│   ├── api
│   │   ├── conftest.py
│   │   ├── test_async_client.py
│   │   ├── test_continue_conversation_template.py
│   │   ├── test_directory_router.py
│   │   ├── test_importer_router.py
│   │   ├── test_knowledge_router.py
│   │   ├── test_management_router.py
│   │   ├── test_memory_router.py
│   │   ├── test_project_router_operations.py
│   │   ├── test_project_router.py
│   │   ├── test_prompt_router.py
│   │   ├── test_relation_background_resolution.py
│   │   ├── test_resource_router.py
│   │   ├── test_search_router.py
│   │   ├── test_search_template.py
│   │   ├── test_template_loader_helpers.py
│   │   └── test_template_loader.py
│   ├── cli
│   │   ├── conftest.py
│   │   ├── test_cli_tools.py
│   │   ├── test_cloud_authentication.py
│   │   ├── test_ignore_utils.py
│   │   ├── test_import_chatgpt.py
│   │   ├── test_import_claude_conversations.py
│   │   ├── test_import_claude_projects.py
│   │   ├── test_import_memory_json.py
│   │   ├── test_project_add_with_local_path.py
│   │   └── test_upload.py
│   ├── conftest.py
│   ├── db
│   │   └── test_issue_254_foreign_key_constraints.py
│   ├── importers
│   │   ├── test_importer_base.py
│   │   └── test_importer_utils.py
│   ├── markdown
│   │   ├── __init__.py
│   │   ├── test_date_frontmatter_parsing.py
│   │   ├── test_entity_parser_error_handling.py
│   │   ├── test_entity_parser.py
│   │   ├── test_markdown_plugins.py
│   │   ├── test_markdown_processor.py
│   │   ├── test_observation_edge_cases.py
│   │   ├── test_parser_edge_cases.py
│   │   ├── test_relation_edge_cases.py
│   │   └── test_task_detection.py
│   ├── mcp
│   │   ├── conftest.py
│   │   ├── test_obsidian_yaml_formatting.py
│   │   ├── test_permalink_collision_file_overwrite.py
│   │   ├── test_prompts.py
│   │   ├── test_resources.py
│   │   ├── test_tool_build_context.py
│   │   ├── test_tool_canvas.py
│   │   ├── test_tool_delete_note.py
│   │   ├── test_tool_edit_note.py
│   │   ├── test_tool_list_directory.py
│   │   ├── test_tool_move_note.py
│   │   ├── test_tool_read_content.py
│   │   ├── test_tool_read_note.py
│   │   ├── test_tool_recent_activity.py
│   │   ├── test_tool_resource.py
│   │   ├── test_tool_search.py
│   │   ├── test_tool_utils.py
│   │   ├── test_tool_view_note.py
│   │   ├── test_tool_write_note.py
│   │   └── tools
│   │       └── test_chatgpt_tools.py
│   ├── Non-MarkdownFileSupport.pdf
│   ├── repository
│   │   ├── test_entity_repository_upsert.py
│   │   ├── test_entity_repository.py
│   │   ├── test_entity_upsert_issue_187.py
│   │   ├── test_observation_repository.py
│   │   ├── test_project_info_repository.py
│   │   ├── test_project_repository.py
│   │   ├── test_relation_repository.py
│   │   ├── test_repository.py
│   │   ├── test_search_repository_edit_bug_fix.py
│   │   └── test_search_repository.py
│   ├── schemas
│   │   ├── test_base_timeframe_minimum.py
│   │   ├── test_memory_serialization.py
│   │   ├── test_memory_url_validation.py
│   │   ├── test_memory_url.py
│   │   ├── test_schemas.py
│   │   └── test_search.py
│   ├── Screenshot.png
│   ├── services
│   │   ├── test_context_service.py
│   │   ├── test_directory_service.py
│   │   ├── test_entity_service_disable_permalinks.py
│   │   ├── test_entity_service.py
│   │   ├── test_file_service.py
│   │   ├── test_initialization.py
│   │   ├── test_link_resolver.py
│   │   ├── test_project_removal_bug.py
│   │   ├── test_project_service_operations.py
│   │   ├── test_project_service.py
│   │   └── test_search_service.py
│   ├── sync
│   │   ├── test_character_conflicts.py
│   │   ├── test_sync_service_incremental.py
│   │   ├── test_sync_service.py
│   │   ├── test_sync_wikilink_issue.py
│   │   ├── test_tmp_files.py
│   │   ├── test_watch_service_edge_cases.py
│   │   ├── test_watch_service_reload.py
│   │   └── test_watch_service.py
│   ├── test_config.py
│   ├── test_db_migration_deduplication.py
│   ├── test_deps.py
│   ├── test_production_cascade_delete.py
│   ├── test_rclone_commands.py
│   └── utils
│       ├── test_file_utils.py
│       ├── test_frontmatter_obsidian_compatible.py
│       ├── test_parse_tags.py
│       ├── test_permalink_formatting.py
│       ├── test_utf8_handling.py
│       └── test_validate_project_path.py
├── uv.lock
├── v0.15.0-RELEASE-DOCS.md
└── v15-docs
    ├── api-performance.md
    ├── background-relations.md
    ├── basic-memory-home.md
    ├── bug-fixes.md
    ├── chatgpt-integration.md
    ├── cloud-authentication.md
    ├── cloud-bisync.md
    ├── cloud-mode-usage.md
    ├── cloud-mount.md
    ├── default-project-mode.md
    ├── env-file-removal.md
    ├── env-var-overrides.md
    ├── explicit-project-parameter.md
    ├── gitignore-integration.md
    ├── project-root-env-var.md
    ├── README.md
    └── sqlite-performance.md
```
# Files
--------------------------------------------------------------------------------
/tests/api/test_project_router.py:
--------------------------------------------------------------------------------
```python
"""Tests for the project router API endpoints."""
import tempfile
from pathlib import Path
import pytest
from basic_memory.schemas.project_info import ProjectItem
@pytest.mark.asyncio
async def test_get_project_item(test_graph, client, project_config, test_project, project_url):
    """Test the project item endpoint returns correctly structured data."""
    # Set up some test data in the database
    # Call the endpoint
    response = await client.get(f"{project_url}/project/item")
    # Verify response
    assert response.status_code == 200
    project_info = ProjectItem.model_validate(response.json())
    assert project_info.name == test_project.name
    assert project_info.path == test_project.path
    assert project_info.is_default == test_project.is_default
@pytest.mark.asyncio
async def test_get_project_item_not_found(
    test_graph, client, project_config, test_project, project_url
):
    """Test the project item endpoint returns correctly structured data."""
    # Set up some test data in the database
    # Call the endpoint
    response = await client.get("/not-found/project/item")
    # Verify response
    assert response.status_code == 404
@pytest.mark.asyncio
async def test_get_default_project(test_graph, client, project_config, test_project, project_url):
    """Test the default project item endpoint returns the default project."""
    # Set up some test data in the database
    # Call the endpoint
    response = await client.get("/projects/default")
    # Verify response
    assert response.status_code == 200
    project_info = ProjectItem.model_validate(response.json())
    assert project_info.name == test_project.name
    assert project_info.path == test_project.path
    assert project_info.is_default == test_project.is_default
@pytest.mark.asyncio
async def test_get_project_info_endpoint(test_graph, client, project_config, project_url):
    """Test the project-info endpoint returns correctly structured data."""
    # Set up some test data in the database
    # Call the endpoint
    response = await client.get(f"{project_url}/project/info")
    # Verify response
    assert response.status_code == 200
    data = response.json()
    # Check top-level keys
    assert "project_name" in data
    assert "project_path" in data
    assert "available_projects" in data
    assert "default_project" in data
    assert "statistics" in data
    assert "activity" in data
    assert "system" in data
    # Check statistics
    stats = data["statistics"]
    assert "total_entities" in stats
    assert stats["total_entities"] >= 0
    assert "total_observations" in stats
    assert stats["total_observations"] >= 0
    assert "total_relations" in stats
    assert stats["total_relations"] >= 0
    # Check activity
    activity = data["activity"]
    assert "recently_created" in activity
    assert "recently_updated" in activity
    assert "monthly_growth" in activity
    # Check system
    system = data["system"]
    assert "version" in system
    assert "database_path" in system
    assert "database_size" in system
    assert "timestamp" in system
@pytest.mark.asyncio
async def test_get_project_info_content(test_graph, client, project_config, project_url):
    """Test that project-info contains actual data from the test database."""
    # Call the endpoint
    response = await client.get(f"{project_url}/project/info")
    # Verify response
    assert response.status_code == 200
    data = response.json()
    # Check that test_graph content is reflected in statistics
    stats = data["statistics"]
    # Our test graph should have at least a few entities
    assert stats["total_entities"] > 0
    # It should also have some observations
    assert stats["total_observations"] > 0
    # And relations
    assert stats["total_relations"] > 0
    # Check that entity types include 'test'
    assert "test" in stats["entity_types"] or "entity" in stats["entity_types"]
@pytest.mark.asyncio
async def test_list_projects_endpoint(test_config, test_graph, client, project_config, project_url):
    """Test the list projects endpoint returns correctly structured data."""
    # Call the endpoint
    response = await client.get("/projects/projects")
    # Verify response
    assert response.status_code == 200
    data = response.json()
    # Check that the response contains expected fields
    assert "projects" in data
    assert "default_project" in data
    # Check that projects is a list
    assert isinstance(data["projects"], list)
    # There should be at least one project (the test project)
    assert len(data["projects"]) > 0
    # Verify project item structure
    if data["projects"]:
        project = data["projects"][0]
        assert "name" in project
        assert "path" in project
        assert "is_default" in project
        # Default project should be marked
        default_project = next((p for p in data["projects"] if p["is_default"]), None)
        assert default_project is not None
        assert default_project["name"] == data["default_project"]
@pytest.mark.asyncio
async def test_remove_project_endpoint(test_config, client, project_service):
    """Test the remove project endpoint."""
    # First create a test project to remove
    test_project_name = "test-remove-project"
    await project_service.add_project(test_project_name, "/tmp/test-remove-project")
    # Verify it exists
    project = await project_service.get_project(test_project_name)
    assert project is not None
    # Remove the project
    response = await client.delete(f"/projects/{test_project_name}")
    # Verify response
    assert response.status_code == 200
    data = response.json()
    # Check response structure
    assert "message" in data
    assert "status" in data
    assert data["status"] == "success"
    assert "old_project" in data
    assert data["old_project"]["name"] == test_project_name
    # Verify project is actually removed
    removed_project = await project_service.get_project(test_project_name)
    assert removed_project is None
@pytest.mark.asyncio
async def test_set_default_project_endpoint(test_config, client, project_service):
    """Test the set default project endpoint."""
    # Create a test project to set as default
    test_project_name = "test-default-project"
    await project_service.add_project(test_project_name, "/tmp/test-default-project")
    # Set it as default
    response = await client.put(f"/projects/{test_project_name}/default")
    # Verify response
    assert response.status_code == 200
    data = response.json()
    # Check response structure
    assert "message" in data
    assert "status" in data
    assert data["status"] == "success"
    assert "new_project" in data
    assert data["new_project"]["name"] == test_project_name
    # Verify it's actually set as default
    assert project_service.default_project == test_project_name
@pytest.mark.asyncio
async def test_update_project_path_endpoint(test_config, client, project_service, project_url):
    """Test the update project endpoint for changing project path."""
    # Create a test project to update
    test_project_name = "test-update-project"
    with tempfile.TemporaryDirectory() as temp_dir:
        test_root = Path(temp_dir)
        old_path = (test_root / "old-location").as_posix()
        new_path = (test_root / "new-location").as_posix()
        await project_service.add_project(test_project_name, old_path)
        try:
            # Verify initial state
            project = await project_service.get_project(test_project_name)
            assert project is not None
            assert project.path == old_path
            # Update the project path
            response = await client.patch(
                f"{project_url}/project/{test_project_name}", json={"path": new_path}
            )
            # Verify response
            assert response.status_code == 200
            data = response.json()
            # Check response structure
            assert "message" in data
            assert "status" in data
            assert data["status"] == "success"
            assert "old_project" in data
            assert "new_project" in data
            # Check old project data
            assert data["old_project"]["name"] == test_project_name
            assert data["old_project"]["path"] == old_path
            # Check new project data
            assert data["new_project"]["name"] == test_project_name
            assert data["new_project"]["path"] == new_path
            # Verify project was actually updated in database
            updated_project = await project_service.get_project(test_project_name)
            assert updated_project is not None
            assert updated_project.path == new_path
        finally:
            # Clean up
            try:
                await project_service.remove_project(test_project_name)
            except Exception:
                pass
@pytest.mark.asyncio
async def test_update_project_is_active_endpoint(test_config, client, project_service, project_url):
    """Test the update project endpoint for changing is_active status."""
    # Create a test project to update
    test_project_name = "test-update-active-project"
    test_path = "/tmp/test-update-active"
    await project_service.add_project(test_project_name, test_path)
    try:
        # Update the project is_active status
        response = await client.patch(
            f"{project_url}/project/{test_project_name}", json={"is_active": False}
        )
        # Verify response
        assert response.status_code == 200
        data = response.json()
        # Check response structure
        assert "message" in data
        assert "status" in data
        assert data["status"] == "success"
        assert f"Project '{test_project_name}' updated successfully" == data["message"]
    finally:
        # Clean up
        try:
            await project_service.remove_project(test_project_name)
        except Exception:
            pass
@pytest.mark.asyncio
async def test_update_project_both_params_endpoint(
    test_config, client, project_service, project_url
):
    """Test the update project endpoint with both path and is_active parameters."""
    # Create a test project to update
    test_project_name = "test-update-both-project"
    with tempfile.TemporaryDirectory() as temp_dir:
        test_root = Path(temp_dir)
        old_path = (test_root / "old-location").as_posix()
        new_path = (test_root / "new-location").as_posix()
        await project_service.add_project(test_project_name, old_path)
        try:
            # Update both path and is_active (path should take precedence)
            response = await client.patch(
                f"{project_url}/project/{test_project_name}",
                json={"path": new_path, "is_active": False},
            )
            # Verify response
            assert response.status_code == 200
            data = response.json()
            # Check that path update was performed (takes precedence)
            assert data["new_project"]["path"] == new_path
            # Verify project was actually updated in database
            updated_project = await project_service.get_project(test_project_name)
            assert updated_project is not None
            assert updated_project.path == new_path
        finally:
            # Clean up
            try:
                await project_service.remove_project(test_project_name)
            except Exception:
                pass
@pytest.mark.asyncio
async def test_update_project_nonexistent_endpoint(client, project_url):
    """Test the update project endpoint with a nonexistent project."""
    # Try to update a project that doesn't exist
    response = await client.patch(
        f"{project_url}/project/nonexistent-project", json={"path": "/tmp/new-path"}
    )
    # Should return 400 error
    assert response.status_code == 400
    data = response.json()
    assert "detail" in data
    assert "not found in configuration" in data["detail"]
@pytest.mark.asyncio
async def test_update_project_relative_path_error_endpoint(
    test_config, client, project_service, project_url
):
    """Test the update project endpoint with relative path (should fail)."""
    # Create a test project to update
    test_project_name = "test-update-relative-project"
    test_path = "/tmp/test-update-relative"
    await project_service.add_project(test_project_name, test_path)
    try:
        # Try to update with relative path
        response = await client.patch(
            f"{project_url}/project/{test_project_name}", json={"path": "./relative-path"}
        )
        # Should return 400 error
        assert response.status_code == 400
        data = response.json()
        assert "detail" in data
        assert "Path must be absolute" in data["detail"]
    finally:
        # Clean up
        try:
            await project_service.remove_project(test_project_name)
        except Exception:
            pass
@pytest.mark.asyncio
async def test_update_project_no_params_endpoint(test_config, client, project_service, project_url):
    """Test the update project endpoint with no parameters (should fail)."""
    # Create a test project to update
    test_project_name = "test-update-no-params-project"
    test_path = "/tmp/test-update-no-params"
    await project_service.add_project(test_project_name, test_path)
    proj_info = await project_service.get_project(test_project_name)
    assert proj_info.name == test_project_name
    # On Windows the path is prepended with a drive letter
    assert test_path in proj_info.path
    try:
        # Try to update with no parameters
        response = await client.patch(f"{project_url}/project/{test_project_name}", json={})
        # Should return 200 (no-op)
        assert response.status_code == 200
        proj_info = await project_service.get_project(test_project_name)
        assert proj_info.name == test_project_name
        # On Windows the path is prepended with a drive letter
        assert test_path in proj_info.path
    finally:
        # Clean up
        try:
            await project_service.remove_project(test_project_name)
        except Exception:
            pass
@pytest.mark.asyncio
async def test_update_project_empty_path_endpoint(
    test_config, client, project_service, project_url
):
    """Test the update project endpoint with empty path parameter."""
    # Create a test project to update
    test_project_name = "test-update-empty-path-project"
    test_path = "/tmp/test-update-empty-path"
    await project_service.add_project(test_project_name, test_path)
    try:
        # Try to update with empty/null path - should be treated as no path update
        response = await client.patch(
            f"{project_url}/project/{test_project_name}", json={"path": None, "is_active": True}
        )
        # Should succeed and perform is_active update
        assert response.status_code == 200
        data = response.json()
        assert data["status"] == "success"
    finally:
        # Clean up
        try:
            await project_service.remove_project(test_project_name)
        except Exception:
            pass
@pytest.mark.asyncio
async def test_sync_project_endpoint(test_graph, client, project_url):
    """Test the project sync endpoint initiates background sync."""
    # Call the sync endpoint
    response = await client.post(f"{project_url}/project/sync")
    # Verify response
    assert response.status_code == 200
    data = response.json()
    # Check response structure
    assert "status" in data
    assert "message" in data
    assert data["status"] == "sync_started"
    assert "Filesystem sync initiated" in data["message"]
@pytest.mark.asyncio
async def test_sync_project_endpoint_with_force_full(test_graph, client, project_url):
    """Test the project sync endpoint with force_full parameter."""
    # Call the sync endpoint with force_full=true
    response = await client.post(f"{project_url}/project/sync?force_full=true")
    # Verify response
    assert response.status_code == 200
    data = response.json()
    # Check response structure
    assert "status" in data
    assert "message" in data
    assert data["status"] == "sync_started"
    assert "Filesystem sync initiated" in data["message"]
@pytest.mark.asyncio
async def test_sync_project_endpoint_with_force_full_false(test_graph, client, project_url):
    """Test the project sync endpoint with force_full=false."""
    # Call the sync endpoint with force_full=false
    response = await client.post(f"{project_url}/project/sync?force_full=false")
    # Verify response
    assert response.status_code == 200
    data = response.json()
    # Check response structure
    assert "status" in data
    assert "message" in data
    assert data["status"] == "sync_started"
    assert "Filesystem sync initiated" in data["message"]
@pytest.mark.asyncio
async def test_sync_project_endpoint_not_found(client):
    """Test the project sync endpoint with nonexistent project."""
    # Call the sync endpoint for a project that doesn't exist
    response = await client.post("/nonexistent-project/project/sync")
    # Should return 404
    assert response.status_code == 404
@pytest.mark.asyncio
async def test_remove_default_project_fails(test_config, client, project_service):
    """Test that removing the default project returns an error."""
    # Get the current default project
    default_project_name = project_service.default_project
    # Try to remove the default project
    response = await client.delete(f"/projects/{default_project_name}")
    # Should return 400 with helpful error message
    assert response.status_code == 400
    data = response.json()
    assert "detail" in data
    assert "Cannot delete default project" in data["detail"]
    assert default_project_name in data["detail"]
@pytest.mark.asyncio
async def test_remove_default_project_with_alternatives(test_config, client, project_service):
    """Test that error message includes alternative projects when trying to delete default."""
    # Get the current default project
    default_project_name = project_service.default_project
    # Create another project so there are alternatives
    test_project_name = "test-alternative-project"
    await project_service.add_project(test_project_name, "/tmp/test-alternative")
    try:
        # Try to remove the default project
        response = await client.delete(f"/projects/{default_project_name}")
        # Should return 400 with helpful error message including alternatives
        assert response.status_code == 400
        data = response.json()
        assert "detail" in data
        assert "Cannot delete default project" in data["detail"]
        assert "Set another project as default first" in data["detail"]
        assert test_project_name in data["detail"]
    finally:
        # Clean up
        try:
            await project_service.remove_project(test_project_name)
        except Exception:
            pass
@pytest.mark.asyncio
async def test_remove_non_default_project_succeeds(test_config, client, project_service):
    """Test that removing a non-default project succeeds."""
    # Create a test project to remove
    test_project_name = "test-remove-non-default"
    await project_service.add_project(test_project_name, "/tmp/test-remove-non-default")
    # Verify it's not the default
    assert project_service.default_project != test_project_name
    # Remove the project
    response = await client.delete(f"/projects/{test_project_name}")
    # Should succeed
    assert response.status_code == 200
    data = response.json()
    assert data["status"] == "success"
    # Verify project is removed
    removed_project = await project_service.get_project(test_project_name)
    assert removed_project is None
@pytest.mark.asyncio
async def test_set_nonexistent_project_as_default_fails(test_config, client, project_service):
    """Test that setting a non-existent project as default returns 404."""
    # Try to set a project that doesn't exist as default
    response = await client.put("/projects/nonexistent-project/default")
    # Should return 404
    assert response.status_code == 404
    data = response.json()
    assert "detail" in data
    assert "does not exist" in data["detail"]
@pytest.mark.asyncio
async def test_create_project_idempotent_same_path(test_config, client, project_service):
    """Test that creating a project with same name and same path is idempotent."""
    # Create a project with platform-independent path
    test_project_name = "test-idempotent"
    with tempfile.TemporaryDirectory() as temp_dir:
        test_project_path = (Path(temp_dir) / "test-idempotent").as_posix()
        response1 = await client.post(
            "/projects/projects",
            json={"name": test_project_name, "path": test_project_path, "set_default": False},
        )
        # Should succeed with 201 Created
        assert response1.status_code == 201
        data1 = response1.json()
        assert data1["status"] == "success"
        assert data1["new_project"]["name"] == test_project_name
        # Try to create the same project again with same name and path
        response2 = await client.post(
            "/projects/projects",
            json={"name": test_project_name, "path": test_project_path, "set_default": False},
        )
        # Should also succeed (idempotent)
        assert response2.status_code == 200
        data2 = response2.json()
        assert data2["status"] == "success"
        assert "already exists" in data2["message"]
        assert data2["new_project"]["name"] == test_project_name
        # Normalize paths for cross-platform comparison
        assert Path(data2["new_project"]["path"]).resolve() == Path(test_project_path).resolve()
        # Clean up
        try:
            await project_service.remove_project(test_project_name)
        except Exception:
            pass
@pytest.mark.asyncio
async def test_create_project_fails_different_path(test_config, client, project_service):
    """Test that creating a project with same name but different path fails."""
    # Create a project
    test_project_name = "test-path-conflict"
    test_project_path1 = "/tmp/test-path-conflict-1"
    response1 = await client.post(
        "/projects/projects",
        json={"name": test_project_name, "path": test_project_path1, "set_default": False},
    )
    # Should succeed with 201 Created
    assert response1.status_code == 201
    # Try to create the same project with different path
    test_project_path2 = "/tmp/test-path-conflict-2"
    response2 = await client.post(
        "/projects/projects",
        json={"name": test_project_name, "path": test_project_path2, "set_default": False},
    )
    # Should fail with 400
    assert response2.status_code == 400
    data2 = response2.json()
    assert "detail" in data2
    assert "already exists with different path" in data2["detail"]
    assert test_project_path1 in data2["detail"]
    assert test_project_path2 in data2["detail"]
    # Clean up
    try:
        await project_service.remove_project(test_project_name)
    except Exception:
        pass
@pytest.mark.asyncio
async def test_remove_project_with_delete_notes_false(test_config, client, project_service):
    """Test that removing a project with delete_notes=False leaves directory intact."""
    # Create a test project with actual directory
    test_project_name = "test-remove-keep-files"
    with tempfile.TemporaryDirectory() as temp_dir:
        test_path = Path(temp_dir) / "test-project"
        test_path.mkdir()
        test_file = test_path / "test.md"
        test_file.write_text("# Test Note")
        await project_service.add_project(test_project_name, str(test_path))
        # Remove the project without deleting files (default)
        response = await client.delete(f"/projects/{test_project_name}")
        # Verify response
        assert response.status_code == 200
        data = response.json()
        assert data["status"] == "success"
        # Verify project is removed from config/db
        removed_project = await project_service.get_project(test_project_name)
        assert removed_project is None
        # Verify directory still exists
        assert test_path.exists()
        assert test_file.exists()
@pytest.mark.asyncio
async def test_remove_project_with_delete_notes_true(test_config, client, project_service):
    """Test that removing a project with delete_notes=True deletes the directory."""
    # Create a test project with actual directory
    test_project_name = "test-remove-delete-files"
    with tempfile.TemporaryDirectory() as temp_dir:
        test_path = Path(temp_dir) / "test-project"
        test_path.mkdir()
        test_file = test_path / "test.md"
        test_file.write_text("# Test Note")
        await project_service.add_project(test_project_name, str(test_path))
        # Remove the project with delete_notes=True
        response = await client.delete(f"/projects/{test_project_name}?delete_notes=true")
        # Verify response
        assert response.status_code == 200
        data = response.json()
        assert data["status"] == "success"
        # Verify project is removed from config/db
        removed_project = await project_service.get_project(test_project_name)
        assert removed_project is None
        # Verify directory is deleted
        assert not test_path.exists()
@pytest.mark.asyncio
async def test_remove_project_delete_notes_nonexistent_directory(
    test_config, client, project_service
):
    """Test that removing a project with delete_notes=True handles missing directory gracefully."""
    # Create a project pointing to a non-existent path
    test_project_name = "test-remove-missing-dir"
    test_path = "/tmp/this-directory-does-not-exist-12345"
    await project_service.add_project(test_project_name, test_path)
    # Remove the project with delete_notes=True (should not fail even if dir doesn't exist)
    response = await client.delete(f"/projects/{test_project_name}?delete_notes=true")
    # Should succeed
    assert response.status_code == 200
    data = response.json()
    assert data["status"] == "success"
    # Verify project is removed
    removed_project = await project_service.get_project(test_project_name)
    assert removed_project is None
```
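The tests above double as documentation for the project router's HTTP surface. As a rough sketch only (the base URL, the project name "main", and running the API standalone are assumptions, not facts taken from this file), the same routes could be exercised directly with httpx:
```python
# Hypothetical usage sketch of the endpoints the tests above cover.
# Assumes the basic-memory API is reachable at http://localhost:8000 and that a
# project named "main" exists; both are illustrative assumptions.
import asyncio

import httpx


async def main() -> None:
    async with httpx.AsyncClient(base_url="http://localhost:8000") as client:
        # Project listing and default project (test_list_projects_endpoint,
        # test_get_default_project).
        projects = (await client.get("/projects/projects")).json()
        default = (await client.get("/projects/default")).json()
        print(default["name"], [p["name"] for p in projects["projects"]])

        # Per-project statistics and activity (test_get_project_info_endpoint).
        info = (await client.get("/main/project/info")).json()
        print(info["statistics"]["total_entities"])

        # Kick off a background filesystem sync, optionally forcing a full scan
        # (test_sync_project_endpoint_with_force_full).
        resp = await client.post("/main/project/sync", params={"force_full": "true"})
        print(resp.json()["status"])  # the tests expect "sync_started"


asyncio.run(main())
```
The response shapes used here (projects/default_project keys, the statistics block, the sync status message) mirror what the tests assert; everything else is illustrative.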
--------------------------------------------------------------------------------
/tests/sync/test_sync_service_incremental.py:
--------------------------------------------------------------------------------
```python
"""Tests for incremental scan watermark optimization (Phase 1.5).
These tests verify the scan watermark feature that dramatically improves sync
performance on large projects by:
- Using find -newermt for incremental scans (only changed files)
- Tracking last_scan_timestamp and last_file_count
- Falling back to full scan when deletions detected
Expected performance improvements:
- No changes: 225x faster (2s vs 450s for 1,460 files)
- Few changes: 84x faster (5s vs 420s)
"""
import time
from pathlib import Path
from textwrap import dedent
import pytest
from basic_memory.config import ProjectConfig
from basic_memory.sync.sync_service import SyncService
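# --- Editorial sketch, not part of the original module ----------------------------
# The module docstring above describes the watermark heuristic these tests exercise.
# Assuming a project record that exposes last_scan_timestamp and last_file_count
# (the fields asserted on below), the scan-strategy choice might reduce to roughly
# this; the function name and signature are hypothetical, not the SyncService API.
def _sketch_choose_scan_strategy(
    last_scan_timestamp: float | None,
    last_file_count: int | None,
    current_file_count: int,
    force_full: bool = False,
) -> str:
    """Return "full" or "incremental" per the heuristic described above."""
    if force_full or last_scan_timestamp is None:
        return "full"  # first sync, or caller suspects external changes (see issue #407)
    if last_file_count is not None and current_file_count < last_file_count:
        return "full"  # deletions are invisible to an mtime-only incremental scan
    return "incremental"  # only files with mtime newer than the watermark are rescanned
# -----------------------------------------------------------------------------------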
async def create_test_file(path: Path, content: str = "test content") -> None:
    """Create a test file with given content."""
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(content)
async def sleep_past_watermark(duration: float = 1.1) -> None:
    """Sleep long enough to ensure mtime is newer than watermark.
    Args:
        duration: Sleep duration in seconds (default 1.1s for filesystem precision)
    """
    time.sleep(duration)
# ==============================================================================
# Scan Strategy Selection Tests
# ==============================================================================
@pytest.mark.asyncio
async def test_first_sync_uses_full_scan(sync_service: SyncService, project_config: ProjectConfig):
    """Test that first sync (no watermark) triggers full scan."""
    project_dir = project_config.home
    # Create test files
    await create_test_file(project_dir / "file1.md", "# File 1\nContent 1")
    await create_test_file(project_dir / "file2.md", "# File 2\nContent 2")
    # First sync - should use full scan (no watermark exists)
    report = await sync_service.sync(project_dir)
    assert len(report.new) == 2
    assert "file1.md" in report.new
    assert "file2.md" in report.new
    # Verify watermark was set
    project = await sync_service.project_repository.find_by_id(
        sync_service.entity_repository.project_id
    )
    assert project.last_scan_timestamp is not None
    assert project.last_file_count >= 2  # May include config files
@pytest.mark.asyncio
async def test_file_count_decreased_triggers_full_scan(
    sync_service: SyncService, project_config: ProjectConfig
):
    """Test that file deletion (count decreased) triggers full scan."""
    project_dir = project_config.home
    # Create initial files
    await create_test_file(project_dir / "file1.md", "# File 1")
    await create_test_file(project_dir / "file2.md", "# File 2")
    await create_test_file(project_dir / "file3.md", "# File 3")
    # First sync
    await sync_service.sync(project_dir)
    # Delete a file
    (project_dir / "file2.md").unlink()
    # Sleep to ensure file operations complete
    await sleep_past_watermark()
    # Second sync - should detect deletion via full scan (file count decreased)
    report = await sync_service.sync(project_dir)
    assert len(report.deleted) == 1
    assert "file2.md" in report.deleted
@pytest.mark.asyncio
async def test_file_count_same_uses_incremental_scan(
    sync_service: SyncService, project_config: ProjectConfig
):
    """Test that same file count uses incremental scan."""
    project_dir = project_config.home
    # Create initial files
    await create_test_file(project_dir / "file1.md", "# File 1\nOriginal")
    await create_test_file(project_dir / "file2.md", "# File 2\nOriginal")
    # First sync
    await sync_service.sync(project_dir)
    # Sleep to ensure mtime will be newer than watermark
    await sleep_past_watermark()
    # Modify one file (file count stays the same)
    await create_test_file(project_dir / "file1.md", "# File 1\nModified")
    # Second sync - should use incremental scan (same file count)
    report = await sync_service.sync(project_dir)
    assert len(report.modified) == 1
    assert "file1.md" in report.modified
@pytest.mark.asyncio
async def test_file_count_increased_uses_incremental_scan(
    sync_service: SyncService, project_config: ProjectConfig
):
    """Test that increased file count still uses incremental scan."""
    project_dir = project_config.home
    # Create initial files
    await create_test_file(project_dir / "file1.md", "# File 1")
    await create_test_file(project_dir / "file2.md", "# File 2")
    # First sync
    await sync_service.sync(project_dir)
    # Sleep to ensure mtime will be newer than watermark
    await sleep_past_watermark()
    # Add a new file (file count increased)
    await create_test_file(project_dir / "file3.md", "# File 3")
    # Second sync - should use incremental scan and detect new file
    report = await sync_service.sync(project_dir)
    assert len(report.new) == 1
    assert "file3.md" in report.new
@pytest.mark.asyncio
async def test_force_full_bypasses_watermark_optimization(
    sync_service: SyncService, project_config: ProjectConfig
):
    """Test that force_full=True bypasses watermark optimization and scans all files.
    This is critical for detecting changes made by external tools like rclone bisync
    that don't update mtimes detectably. See issue #407.
    """
    project_dir = project_config.home
    # Create initial files
    await create_test_file(project_dir / "file1.md", "# File 1\nOriginal")
    await create_test_file(project_dir / "file2.md", "# File 2\nOriginal")
    # First sync - establishes watermark
    report = await sync_service.sync(project_dir)
    assert len(report.new) == 2
    # Verify watermark was set
    project = await sync_service.project_repository.find_by_id(
        sync_service.entity_repository.project_id
    )
    assert project.last_scan_timestamp is not None
    initial_timestamp = project.last_scan_timestamp
    # Sleep to ensure time passes
    await sleep_past_watermark()
    # Modify a file WITHOUT updating mtime (simulates external tool like rclone)
    # We set mtime to be BEFORE the watermark to ensure incremental scan won't detect it
    file_path = project_dir / "file1.md"
    file_path.stat()
    await create_test_file(file_path, "# File 1\nModified by external tool")
    # Set mtime to be before the watermark (use time from before first sync)
    # This simulates rclone bisync which may preserve original timestamps
    import os
    old_time = initial_timestamp - 10  # 10 seconds before watermark
    os.utime(file_path, (old_time, old_time))
    # Normal incremental sync should NOT detect the change (mtime before watermark)
    report = await sync_service.sync(project_dir)
    assert len(report.modified) == 0, (
        "Incremental scan should not detect changes with mtime older than watermark"
    )
    # Force full scan should detect the change via checksum comparison
    report = await sync_service.sync(project_dir, force_full=True)
    assert len(report.modified) == 1, "Force full scan should detect changes via checksum"
    assert "file1.md" in report.modified
    # Verify watermark was still updated after force_full
    project = await sync_service.project_repository.find_by_id(
        sync_service.entity_repository.project_id
    )
    assert project.last_scan_timestamp is not None
    assert project.last_scan_timestamp > initial_timestamp
# ==============================================================================
# Incremental Scan Base Cases
# ==============================================================================
@pytest.mark.asyncio
async def test_incremental_scan_no_changes(
    sync_service: SyncService, project_config: ProjectConfig
):
    """Test that incremental scan with no changes returns empty report."""
    project_dir = project_config.home
    # Create initial files
    await create_test_file(project_dir / "file1.md", "# File 1")
    await create_test_file(project_dir / "file2.md", "# File 2")
    # First sync
    await sync_service.sync(project_dir)
    # Sleep to ensure time passes
    await sleep_past_watermark()
    # Second sync - no changes
    report = await sync_service.sync(project_dir)
    assert len(report.new) == 0
    assert len(report.modified) == 0
    assert len(report.deleted) == 0
    assert len(report.moves) == 0
@pytest.mark.asyncio
async def test_incremental_scan_detects_new_file(
    sync_service: SyncService, project_config: ProjectConfig
):
    """Test that incremental scan detects newly created files."""
    project_dir = project_config.home
    # Create initial file
    await create_test_file(project_dir / "file1.md", "# File 1")
    # First sync
    await sync_service.sync(project_dir)
    # Sleep to ensure mtime will be newer than watermark
    await sleep_past_watermark()
    # Create new file
    await create_test_file(project_dir / "file2.md", "# File 2")
    # Second sync - should detect new file via incremental scan
    report = await sync_service.sync(project_dir)
    assert len(report.new) == 1
    assert "file2.md" in report.new
    assert len(report.modified) == 0
@pytest.mark.asyncio
async def test_incremental_scan_detects_modified_file(
    sync_service: SyncService, project_config: ProjectConfig
):
    """Test that incremental scan detects modified files."""
    project_dir = project_config.home
    # Create initial files
    file_path = project_dir / "file1.md"
    await create_test_file(file_path, "# File 1\nOriginal content")
    # First sync
    await sync_service.sync(project_dir)
    # Sleep to ensure mtime will be newer than watermark
    await sleep_past_watermark()
    # Modify the file
    await create_test_file(file_path, "# File 1\nModified content")
    # Second sync - should detect modification via incremental scan
    report = await sync_service.sync(project_dir)
    assert len(report.modified) == 1
    assert "file1.md" in report.modified
    assert len(report.new) == 0
@pytest.mark.asyncio
async def test_incremental_scan_detects_multiple_changes(
    sync_service: SyncService, project_config: ProjectConfig
):
    """Test that incremental scan detects multiple file changes."""
    project_dir = project_config.home
    # Create initial files
    await create_test_file(project_dir / "file1.md", "# File 1\nOriginal")
    await create_test_file(project_dir / "file2.md", "# File 2\nOriginal")
    await create_test_file(project_dir / "file3.md", "# File 3\nOriginal")
    # First sync
    await sync_service.sync(project_dir)
    # Sleep to ensure mtime will be newer than watermark
    await sleep_past_watermark()
    # Modify multiple files
    await create_test_file(project_dir / "file1.md", "# File 1\nModified")
    await create_test_file(project_dir / "file3.md", "# File 3\nModified")
    await create_test_file(project_dir / "file4.md", "# File 4\nNew")
    # Second sync - should detect all changes via incremental scan
    report = await sync_service.sync(project_dir)
    assert len(report.modified) == 2
    assert "file1.md" in report.modified
    assert "file3.md" in report.modified
    assert len(report.new) == 1
    assert "file4.md" in report.new
# ==============================================================================
# Deletion Detection Tests
# ==============================================================================
@pytest.mark.asyncio
async def test_deletion_triggers_full_scan_single_file(
    sync_service: SyncService, project_config: ProjectConfig
):
    """Test that deleting a single file triggers full scan."""
    project_dir = project_config.home
    # Create initial files
    await create_test_file(project_dir / "file1.md", "# File 1")
    await create_test_file(project_dir / "file2.md", "# File 2")
    await create_test_file(project_dir / "file3.md", "# File 3")
    # First sync
    report1 = await sync_service.sync(project_dir)
    assert len(report1.new) == 3
    # Delete one file
    (project_dir / "file2.md").unlink()
    # Sleep to ensure file operations complete
    await sleep_past_watermark()
    # Second sync - should trigger full scan due to decreased file count
    report2 = await sync_service.sync(project_dir)
    assert len(report2.deleted) == 1
    assert "file2.md" in report2.deleted
@pytest.mark.asyncio
async def test_deletion_triggers_full_scan_multiple_files(
    sync_service: SyncService, project_config: ProjectConfig
):
    """Test that deleting multiple files triggers full scan."""
    project_dir = project_config.home
    # Create initial files
    await create_test_file(project_dir / "file1.md", "# File 1")
    await create_test_file(project_dir / "file2.md", "# File 2")
    await create_test_file(project_dir / "file3.md", "# File 3")
    await create_test_file(project_dir / "file4.md", "# File 4")
    # First sync
    await sync_service.sync(project_dir)
    # Delete multiple files
    (project_dir / "file2.md").unlink()
    (project_dir / "file4.md").unlink()
    # Sleep to ensure file operations complete
    await sleep_past_watermark()
    # Second sync - should trigger full scan and detect both deletions
    report = await sync_service.sync(project_dir)
    assert len(report.deleted) == 2
    assert "file2.md" in report.deleted
    assert "file4.md" in report.deleted
# ==============================================================================
# Move Detection Tests
# ==============================================================================
@pytest.mark.asyncio
async def test_move_detection_requires_full_scan(
    sync_service: SyncService, project_config: ProjectConfig
):
    """Test that file moves require full scan to be detected (cannot detect in incremental).
    Moves (renames) don't update mtime, so incremental scans can't detect them.
    To trigger a full scan for move detection, we need file count to decrease.
    This test verifies moves are detected when combined with a deletion.
    """
    project_dir = project_config.home
    # Create initial files - include extra file to delete later
    old_path = project_dir / "old" / "file.md"
    content = dedent(
        """
        ---
        title: Test File
        type: note
        ---
        # Test File
        Distinctive content for move detection
        """
    ).strip()
    await create_test_file(old_path, content)
    await create_test_file(project_dir / "other.md", "# Other\nContent")
    # First sync
    await sync_service.sync(project_dir)
    # Sleep to ensure operations complete and watermark is in the past
    await sleep_past_watermark()
    # Move file AND delete another to trigger full scan
    # Move alone won't work because file count stays same (no full scan)
    new_path = project_dir / "new" / "moved.md"
    new_path.parent.mkdir(parents=True, exist_ok=True)
    old_path.rename(new_path)
    (project_dir / "other.md").unlink()  # Delete to trigger full scan
    # Second sync - full scan due to deletion, move detected via checksum
    report = await sync_service.sync(project_dir)
    assert len(report.moves) == 1
    assert "old/file.md" in report.moves
    assert report.moves["old/file.md"] == "new/moved.md"
    assert len(report.deleted) == 1
    assert "other.md" in report.deleted
@pytest.mark.asyncio
async def test_move_detection_in_full_scan(
    sync_service: SyncService, project_config: ProjectConfig
):
    """Test that file moves are detected via checksum in full scan."""
    project_dir = project_config.home
    # Create initial files
    old_path = project_dir / "old" / "file.md"
    content = dedent(
        """
        ---
        title: Test File
        type: note
        ---
        # Test File
        Distinctive content for move detection
        """
    ).strip()
    await create_test_file(old_path, content)
    await create_test_file(project_dir / "other.md", "# Other\nContent")
    # First sync
    await sync_service.sync(project_dir)
    # Sleep to ensure operations complete
    await sleep_past_watermark()
    # Move file AND delete another to trigger full scan
    new_path = project_dir / "new" / "moved.md"
    new_path.parent.mkdir(parents=True, exist_ok=True)
    old_path.rename(new_path)
    (project_dir / "other.md").unlink()
    # Second sync - full scan due to deletion, should still detect move
    report = await sync_service.sync(project_dir)
    assert len(report.moves) == 1
    assert "old/file.md" in report.moves
    assert report.moves["old/file.md"] == "new/moved.md"
    assert len(report.deleted) == 1
    assert "other.md" in report.deleted
# ==============================================================================
# Watermark Update Tests
# ==============================================================================
@pytest.mark.asyncio
async def test_watermark_updated_after_successful_sync(
    sync_service: SyncService, project_config: ProjectConfig
):
    """Test that watermark is updated after each successful sync."""
    project_dir = project_config.home
    # Create initial file
    await create_test_file(project_dir / "file1.md", "# File 1")
    # Get project before sync
    project_before = await sync_service.project_repository.find_by_id(
        sync_service.entity_repository.project_id
    )
    assert project_before.last_scan_timestamp is None
    assert project_before.last_file_count is None
    # First sync
    sync_start = time.time()
    await sync_service.sync(project_dir)
    sync_end = time.time()
    # Verify watermark was set
    project_after = await sync_service.project_repository.find_by_id(
        sync_service.entity_repository.project_id
    )
    assert project_after.last_scan_timestamp is not None
    assert project_after.last_file_count >= 1  # May include config files
    # Watermark should be between sync start and end
    assert sync_start <= project_after.last_scan_timestamp <= sync_end
@pytest.mark.asyncio
async def test_watermark_uses_sync_start_time(
    sync_service: SyncService, project_config: ProjectConfig
):
    """Test that watermark uses sync start time, not end time."""
    project_dir = project_config.home
    # Create initial file
    await create_test_file(project_dir / "file1.md", "# File 1")
    # First sync - capture timestamps
    sync_start = time.time()
    await sync_service.sync(project_dir)
    sync_end = time.time()
    # Get watermark
    project = await sync_service.project_repository.find_by_id(
        sync_service.entity_repository.project_id
    )
    # Watermark should be closer to start than end
    # (In practice, watermark == sync_start_timestamp captured in sync())
    time_from_start = abs(project.last_scan_timestamp - sync_start)
    time_from_end = abs(project.last_scan_timestamp - sync_end)
    assert time_from_start < time_from_end
@pytest.mark.asyncio
async def test_watermark_file_count_accurate(
    sync_service: SyncService, project_config: ProjectConfig
):
    """Test that watermark file count accurately reflects synced files."""
    project_dir = project_config.home
    # Create initial files
    await create_test_file(project_dir / "file1.md", "# File 1")
    await create_test_file(project_dir / "file2.md", "# File 2")
    await create_test_file(project_dir / "file3.md", "# File 3")
    # First sync
    await sync_service.sync(project_dir)
    # Verify file count
    project1 = await sync_service.project_repository.find_by_id(
        sync_service.entity_repository.project_id
    )
    initial_count = project1.last_file_count
    assert initial_count >= 3  # May include config files
    # Add more files
    await sleep_past_watermark()
    await create_test_file(project_dir / "file4.md", "# File 4")
    await create_test_file(project_dir / "file5.md", "# File 5")
    # Second sync
    await sync_service.sync(project_dir)
    # Verify updated count increased by 2
    project2 = await sync_service.project_repository.find_by_id(
        sync_service.entity_repository.project_id
    )
    assert project2.last_file_count == initial_count + 2
# ==============================================================================
# Edge Cases and Error Handling
# ==============================================================================
@pytest.mark.asyncio
async def test_concurrent_file_changes_handled_gracefully(
    sync_service: SyncService, project_config: ProjectConfig
):
    """Test that files created/modified during sync are handled correctly.
    Files created during sync (between start and file processing) should be
    caught in the next sync, not cause errors in the current sync.
    """
    project_dir = project_config.home
    # Create initial file
    await create_test_file(project_dir / "file1.md", "# File 1")
    # First sync
    await sync_service.sync(project_dir)
    # Sleep to ensure mtime will be newer
    await sleep_past_watermark()
    # Create file that will have mtime very close to watermark
    # In real scenarios, this could be created during sync
    await create_test_file(project_dir / "concurrent.md", "# Concurrent")
    # Should be caught in next sync without errors
    report = await sync_service.sync(project_dir)
    assert "concurrent.md" in report.new
@pytest.mark.asyncio
async def test_empty_directory_handles_incremental_scan(
    sync_service: SyncService, project_config: ProjectConfig
):
    """Test that incremental scan handles empty directories correctly."""
    project_dir = project_config.home
    # First sync with empty directory (no user files)
    report1 = await sync_service.sync(project_dir)
    assert len(report1.new) == 0
    # Verify watermark was set even for empty directory
    project = await sync_service.project_repository.find_by_id(
        sync_service.entity_repository.project_id
    )
    assert project.last_scan_timestamp is not None
    # May have config files, so just check it's set
    assert project.last_file_count is not None
    # Second sync - still empty (no new user files)
    report2 = await sync_service.sync(project_dir)
    assert len(report2.new) == 0
@pytest.mark.asyncio
async def test_incremental_scan_respects_gitignore(
    sync_service: SyncService, project_config: ProjectConfig
):
    """Test that incremental scan respects .gitignore patterns."""
    project_dir = project_config.home
    # Create .gitignore
    (project_dir / ".gitignore").write_text("*.ignored\n.hidden/\n")
    # Reload ignore patterns
    from basic_memory.ignore_utils import load_gitignore_patterns
    sync_service._ignore_patterns = load_gitignore_patterns(project_dir)
    # Create files - some should be ignored
    await create_test_file(project_dir / "included.md", "# Included")
    await create_test_file(project_dir / "excluded.ignored", "# Excluded")
    # First sync
    report1 = await sync_service.sync(project_dir)
    assert "included.md" in report1.new
    assert "excluded.ignored" not in report1.new
    # Sleep and add more files
    await sleep_past_watermark()
    await create_test_file(project_dir / "included2.md", "# Included 2")
    await create_test_file(project_dir / "excluded2.ignored", "# Excluded 2")
    # Second sync - incremental scan should also respect ignore patterns
    report2 = await sync_service.sync(project_dir)
    assert "included2.md" in report2.new
    assert "excluded2.ignored" not in report2.new
# ==============================================================================
# Relation Resolution Optimization Tests
# ==============================================================================
@pytest.mark.asyncio
async def test_relation_resolution_skipped_when_no_changes(
    sync_service: SyncService, project_config: ProjectConfig
):
    """Test that relation resolution is skipped when no file changes detected.
    This optimization prevents wasting time resolving relations when there are
    no changes, dramatically improving sync performance for large projects.
    """
    project_dir = project_config.home
    # Create initial file with wikilink
    content = dedent(
        """
        ---
        title: File with Link
        type: note
        ---
        # File with Link
        This links to [[Target File]]
        """
    ).strip()
    await create_test_file(project_dir / "file1.md", content)
    # First sync - will resolve relations (or leave unresolved)
    report1 = await sync_service.sync(project_dir)
    assert len(report1.new) == 1
    # Check that there are unresolved relations (target doesn't exist)
    unresolved = await sync_service.relation_repository.find_unresolved_relations()
    unresolved_count_before = len(unresolved)
    assert unresolved_count_before > 0  # Should have unresolved relation to [[Target File]]
    # Sleep to ensure time passes
    await sleep_past_watermark()
    # Second sync - no changes, should skip relation resolution
    report2 = await sync_service.sync(project_dir)
    assert report2.total == 0  # No changes detected
    # Verify unresolved relations count unchanged (resolution was skipped)
    unresolved_after = await sync_service.relation_repository.find_unresolved_relations()
    assert len(unresolved_after) == unresolved_count_before
@pytest.mark.asyncio
async def test_relation_resolution_runs_when_files_modified(
    sync_service: SyncService, project_config: ProjectConfig
):
    """Test that relation resolution runs when files are actually modified."""
    project_dir = project_config.home
    # Create file with unresolved wikilink
    content1 = dedent(
        """
        ---
        title: File with Link
        type: note
        ---
        # File with Link
        This links to [[Target File]]
        """
    ).strip()
    await create_test_file(project_dir / "file1.md", content1)
    # First sync
    await sync_service.sync(project_dir)
    # Verify unresolved relation exists
    unresolved_before = await sync_service.relation_repository.find_unresolved_relations()
    assert len(unresolved_before) > 0
    # Sleep to ensure mtime will be newer
    await sleep_past_watermark()
    # Create the target file (should resolve the relation)
    content2 = dedent(
        """
        ---
        title: Target File
        type: note
        ---
        # Target File
        This is the target.
        """
    ).strip()
    await create_test_file(project_dir / "target.md", content2)
    # Second sync - should detect new file and resolve relations
    report = await sync_service.sync(project_dir)
    assert len(report.new) == 1
    assert "target.md" in report.new
    # Verify relation was resolved (unresolved count decreased)
    unresolved_after = await sync_service.relation_repository.find_unresolved_relations()
    assert len(unresolved_after) < len(unresolved_before)
```
--------------------------------------------------------------------------------
/specs/SPEC-10 Unified Deployment Workflow and Event Tracking.md:
--------------------------------------------------------------------------------
```markdown
---
title: 'SPEC-10: Unified Deployment Workflow and Event Tracking'
type: spec
permalink: specs/spec-10-unified-deployment-workflow-event-tracking
tags:
- workflow
- deployment
- event-sourcing
- architecture
- simplification
---
# SPEC-10: Unified Deployment Workflow and Event Tracking
## Why
We replaced a complex multi-workflow system built on DBOS orchestration that was proving to be more trouble than it was worth. The previous architecture had four separate workflows (`tenant_provisioning`, `tenant_update`, `tenant_deployment`, `tenant_undeploy`) with overlapping logic, complex state management, and fragmented event tracking. DBOS added unnecessary complexity without providing sufficient value, leading to harder debugging and maintenance.
**Problems Solved:**
- **Framework Complexity**: DBOS configuration overhead and fighting framework limitations
- **Code Duplication**: Multiple workflows implementing similar operations with duplicate logic
- **Poor Observability**: Fragmented event tracking across workflow boundaries
- **Maintenance Overhead**: Complex orchestration for fundamentally simple operations
- **Debugging Difficulty**: Framework abstractions hiding simple Python stack traces
## What
This spec documents the architectural simplification that consolidates tenant lifecycle management into a unified system with comprehensive event tracking.
**Affected Areas:**
- Tenant deployment workflows (provisioning, updates, undeploying)
- Event sourcing and workflow tracking infrastructure
- API endpoints for tenant operations
- Database schema for workflow and event correlation
- Integration testing for tenant lifecycle operations
**Key Changes:**
- **Removed DBOS entirely** - eliminated framework dependency and complexity
- **Consolidated 4 workflows → 2 unified deployment workflows (deploy/undeploy)**
- **Added workflow tracking system** with complete event correlation
- **Simplified API surface** - single `/deploy` endpoint handles all scenarios
- **Enhanced observability** through event sourcing with workflow grouping
## How (High Level)
### Architectural Philosophy
**Embrace simplicity over framework complexity** - use well-structured Python with proper database design instead of complex orchestration frameworks.
### Core Components
#### 1. Unified Deployment Workflow
```python
class TenantDeploymentWorkflow:
    async def deploy_tenant_workflow(self, tenant_id: str, workflow_id: UUID, image_tag: str | None = None):
        # Single workflow handles both initial provisioning AND updates
        # Each step is idempotent and handles its own error recovery
        # Database transactions provide the durability we need
        await self.start_deployment_step(workflow_id, tenant_uuid, image_tag)
        await self.create_fly_app_step(workflow_id, tenant_uuid)
        await self.create_bucket_step(workflow_id, tenant_uuid)
        await self.deploy_machine_step(workflow_id, tenant_uuid, image_tag)
        await self.complete_deployment_step(workflow_id, tenant_uuid, image_tag, deployment_time)
```
**Key Benefits:**
- **Handles both provisioning and updates** in single workflow
- **Idempotent operations** - safe to retry any step
- **Clean error handling** via simple Python exceptions
- **Resumable** - can restart from any failed step
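To make the idempotency claim concrete, here is a minimal sketch of one step. The `fly_client` with `get_app`/`create_app` methods and the `basic-memory-{tenant_id}` app-naming convention are illustrative assumptions, not taken from the actual implementation:
```python
from uuid import UUID


class TenantDeploymentWorkflow:
    def __init__(self, fly_client) -> None:
        # `fly_client` is an illustrative placeholder for the Fly.io client.
        self.fly_client = fly_client

    async def create_fly_app_step(self, workflow_id: UUID, tenant_id: UUID) -> None:
        """Idempotent step: creating an app that already exists is a no-op."""
        app_name = f"basic-memory-{tenant_id}"
        if await self.fly_client.get_app(app_name) is None:
            await self.fly_client.create_app(app_name)
        # The step converges to the same end state whether or not the app
        # already existed, so retrying after a partial failure is safe.
```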
#### 2. Workflow Tracking System
**Database Schema:**
```sql
CREATE TABLE workflow (
    id UUID PRIMARY KEY,
    workflow_type VARCHAR(50) NOT NULL,  -- 'tenant_deployment', 'tenant_undeploy'
    tenant_id UUID REFERENCES tenant(id),
    status VARCHAR(20) DEFAULT 'running', -- 'running', 'completed', 'failed'
    workflow_metadata JSONB DEFAULT '{}'  -- image_tag, etc.
);
ALTER TABLE event ADD COLUMN workflow_id UUID REFERENCES workflow(id);
```
**Event Correlation:**
- Every workflow operation generates events tagged with `workflow_id`
- Complete audit trail from workflow start to completion
- Events grouped by workflow for easy reconstruction of operations
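For example, reconstructing the audit trail of a single operation reduces to one query against the schema above. A minimal sketch using SQLAlchemy, assuming the `event` table also carries `event_type` and `created_at` columns:
```python
from uuid import UUID

from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession


async def get_workflow_events(session: AsyncSession, workflow_id: UUID) -> list[dict]:
    """Return all events for one workflow, oldest first."""
    result = await session.execute(
        text(
            "SELECT event_type, created_at "
            "FROM event WHERE workflow_id = :workflow_id "
            "ORDER BY created_at"
        ),
        {"workflow_id": str(workflow_id)},
    )
    return [dict(row) for row in result.mappings()]
```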
#### 3. Parameter Standardization
All workflow methods follow consistent signature pattern:
```python
async def method_name(self, session: AsyncSession, workflow_id: UUID | None, tenant_id: UUID, ...)
```
**Benefits:**
- **Consistent event tagging** - all events properly correlated
- **Clear method contracts** - `workflow_id` always comes first among the domain parameters, immediately after the session
- **Type safety** - proper UUID handling throughout
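A hedged sketch of a step following this convention; `self.storage` and `record_event` are illustrative placeholders rather than the actual service methods:
```python
from uuid import UUID

from sqlalchemy.ext.asyncio import AsyncSession


class TenantService:
    async def create_bucket_step(
        self,
        session: AsyncSession,
        workflow_id: UUID | None,
        tenant_id: UUID,
    ) -> None:
        """Standard signature: session, then workflow_id, then tenant_id."""
        bucket_name = f"tenant-{tenant_id}"
        await self.storage.create_bucket(bucket_name)  # placeholder storage client
        # Every event carries the workflow_id so the audit trail stays complete.
        await self.record_event(
            session,
            workflow_id=workflow_id,
            tenant_id=tenant_id,
            event_type="bucket_created",
            data={"bucket": bucket_name},
        )
```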
### Implementation Strategy
#### Phase 1: Workflow Consolidation ✅ COMPLETED
- [x] **Remove DBOS dependency** - eliminated dbos_config.py and all DBOS imports
- [x] **Create unified TenantDeploymentWorkflow** - handles both provisioning and updates
- [x] **Remove legacy workflows** - deleted tenant_provisioning.py, tenant_update.py
- [x] **Simplify API endpoints** - consolidated to single `/deploy` endpoint
- [x] **Update integration tests** - comprehensive edge case testing
#### Phase 2: Workflow Tracking System ✅ COMPLETED
- [x] **Database migration** - added workflow table and event.workflow_id foreign key
- [x] **Workflow repository** - CRUD operations for workflow records
- [x] **Event correlation** - all workflow events tagged with workflow_id
- [x] **Comprehensive testing** - workflow lifecycle and event grouping tests
#### Phase 3: Parameter Standardization ✅ COMPLETED
- [x] **Standardize method signatures** - workflow_id as first parameter pattern
- [x] **Fix event tagging** - ensure all workflow events properly correlated
- [x] **Update service methods** - consistent parameter order across tenant_service
- [x] **Integration test validation** - verify complete event sequences
### Architectural Benefits
#### Code Simplification
- **39 files changed**: 2,247 additions, 3,256 deletions (net -1,009 lines)
- **Eliminated framework complexity** - no more DBOS configuration or abstractions
- **Consolidated logic** - single deployment workflow vs 4 separate workflows
- **Cleaner API surface** - unified endpoint vs multiple workflow-specific endpoints
#### Enhanced Observability
- **Complete event correlation** - every workflow event tagged with workflow_id
- **Audit trail reconstruction** - can trace entire tenant lifecycle through events
- **Workflow status tracking** - running/completed/failed states in database
- **Comprehensive testing** - edge cases covered with real infrastructure
#### Operational Benefits
- **Simpler debugging** - plain Python stack traces vs framework abstractions
- **Reduced dependencies** - one less complex framework to maintain
- **Better error handling** - explicit exception handling vs framework magic
- **Easier maintenance** - straightforward Python code vs orchestration complexity
## How to Evaluate
### Success Criteria
#### Functional Completeness ✅ VERIFIED
- [x] **Unified deployment workflow** handles both initial provisioning and updates
- [x] **Undeploy workflow** properly integrated with event tracking
- [x] **All operations idempotent** - safe to retry any step without duplication
- [x] **Complete tenant lifecycle** - provision → active → update → undeploy
#### Event Tracking and Correlation ✅ VERIFIED
- [x] **All workflow events tagged** with proper workflow_id
- [x] **Event sequence verification** - tests assert exact event order and content
- [x] **Workflow grouping** - events can be queried by workflow_id for complete audit trail
- [x] **Cross-workflow isolation** - deployment vs undeploy events properly separated
#### Database Schema and Performance ✅ VERIFIED
- [x] **Migration applied** - workflow table and event.workflow_id column created
- [x] **Proper indexing** - performance optimized queries on workflow_type, tenant_id, status
- [x] **Foreign key constraints** - referential integrity between workflows and events
- [x] **Database triggers** - updated_at timestamp automation
#### Test Coverage ✅ COMPREHENSIVE
- [x] **Unit tests**: 4 workflow tracking tests covering lifecycle and event grouping
- [x] **Integration tests**: Real infrastructure testing with Fly.io resources
- [x] **Edge case coverage**: Failed deployments, partial state recovery, resource conflicts
- [x] **Event sequence verification**: Exact event order and content validation
### Testing Procedure
#### Unit Test Validation ✅ PASSING
```bash
cd apps/cloud && pytest tests/test_workflow_tracking.py -v
# 4/4 tests passing - workflow lifecycle and event grouping
```
#### Integration Test Validation ✅ PASSING
```bash
cd apps/cloud && pytest tests/integration/test_tenant_workflow_deployment_integration.py -v
cd apps/cloud && pytest tests/integration/test_tenant_workflow_undeploy_integration.py -v
# Comprehensive real infrastructure testing with actual Fly.io resources
# Tests provision → deploy → update → undeploy → cleanup cycles
```
### Performance Metrics
#### Code Metrics ✅ ACHIEVED
- **Net code reduction**: -1,009 lines (3,256 deletions, 2,247 additions)
- **Workflow consolidation**: 4 workflows → 1 unified deployment workflow
- **Dependency reduction**: Removed DBOS framework dependency entirely
- **API simplification**: Multiple endpoints → single `/deploy` endpoint
#### Operational Metrics ✅ VERIFIED
- **Event correlation**: 100% of workflow events properly tagged with workflow_id
- **Audit trail completeness**: Full tenant lifecycle traceable through event sequences
- **Error handling**: Clean Python exceptions vs framework abstractions
- **Debugging simplicity**: Direct stack traces vs orchestration complexity
### Implementation Status: ✅ COMPLETE
All phases completed successfully with comprehensive testing and verification:
**Phase 1 - Workflow Consolidation**: ✅ COMPLETE
- Removed DBOS dependency and consolidated workflows
- Unified deployment workflow handles all scenarios
- Comprehensive integration testing with real infrastructure
**Phase 2 - Workflow Tracking**: ✅ COMPLETE
- Database schema implemented with proper indexing
- Event correlation system fully functional
- Complete audit trail capability verified
**Phase 3 - Parameter Standardization**: ✅ COMPLETE
- Consistent method signatures across all workflow methods
- All events properly tagged with workflow_id
- Type safety verified across entire codebase
**Phase 4 - Asynchronous Job Queuing**:
**Goal**: Transform synchronous deployment workflows into background jobs for better user experience and system reliability.
**Current Problem**:
- Deployment API calls are synchronous - users wait for entire tenant provisioning (30-60 seconds)
- No retry mechanism for failed operations
- HTTP timeouts on long-running deployments
- Poor user experience during infrastructure provisioning
**Solution**: Redis-backed job queue with arq for reliable background processing
#### Architecture Overview
```python
# API Layer: Return immediately with job tracking
@router.post("/{tenant_id}/deploy")
async def deploy_tenant(tenant_id: UUID):
    # Create workflow record in Postgres
    workflow = await workflow_repo.create_workflow("tenant_deployment", tenant_id)
    # Enqueue job in Redis
    job = await arq_pool.enqueue_job('deploy_tenant_task', tenant_id, workflow.id)
    # Return job ID immediately
    return {"job_id": job.job_id, "workflow_id": workflow.id, "status": "queued"}
# Background Worker: Process via existing unified workflow
async def deploy_tenant_task(ctx, tenant_id: str, workflow_id: str):
    # Existing workflow logic - zero changes needed!
    await workflow_manager.deploy_tenant(UUID(tenant_id), workflow_id=UUID(workflow_id))
```
#### Implementation Tasks
**Phase 4.1: Core Job Queue Setup** ✅ COMPLETED
- [x] **Add arq dependency** - integrated Redis job queue with existing infrastructure
- [x] **Create job definitions** - wrapped existing deployment/undeploy workflows as arq tasks
- [x] **Update API endpoints** - updated provisioning endpoints to return job IDs instead of waiting for completion
- [x] **JobQueueService implementation** - service layer for job enqueueing and status tracking
- [x] **Job status tracking** - integrated with existing workflow table for status updates
- [x] **Comprehensive testing** - 18 tests covering positive, negative, and edge cases
**Phase 4.2: Background Worker Implementation** ✅ COMPLETED
- [x] **Job status API** - GET /jobs/{job_id}/status endpoint integrated with JobQueueService
- [x] **Background worker process** - arq worker to process queued jobs with proper settings and Redis configuration
- [x] **Worker settings and configuration** - WorkerSettings class with proper timeouts, max jobs, and error handling
- [x] **Fix API endpoints** - updated job status API to use JobQueueService instead of direct Redis access
- [x] **Integration testing** - comprehensive end-to-end testing with real ARQ workers and Fly.io infrastructure
- [x] **Worker entry points** - dual-purpose entrypoint.sh script and __main__.py module support for both API and worker processes
- [x] **Test fixture updates** - fixed all API and service test fixtures to work with job queue dependencies
- [x] **AsyncIO event loop fixes** - resolved event loop issues in integration tests for subprocess worker compatibility
- [x] **Complete test coverage** - all 46 tests passing across unit, integration, and API test suites
- [x] **Type safety verification** - 0 type checking errors across entire ARQ job queue implementation
#### Phase 4.2 Implementation Summary ✅ COMPLETE
**Core ARQ Job Queue System:**
- **JobQueueService** - Centralized service for job enqueueing, status tracking, and Redis pool management
- **deployment_jobs.py** - ARQ job functions that wrap existing deployment/undeploy workflows
- **Worker Settings** - Production-ready ARQ configuration with proper timeouts and error handling (sketched below)
- **Dual-Process Architecture** - Single Docker image with entrypoint.sh supporting both API and worker modes
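A minimal sketch of what such a `WorkerSettings` class might look like, based on arq's documented settings; the import path, job function list, and numeric values are assumptions rather than the project's actual configuration:
```python
from arq.connections import RedisSettings

# Assumed import path; the real module layout may differ.
from basic_memory_cloud.jobs.deployment_jobs import deploy_tenant_job, undeploy_tenant_job


class WorkerSettings:
    """ARQ worker configuration; the values below are illustrative defaults."""

    functions = [deploy_tenant_job, undeploy_tenant_job]
    redis_settings = RedisSettings.from_dsn("redis://localhost:6379")
    max_jobs = 10        # mirrors the ARQ_MAX_JOBS environment variable
    job_timeout = 600    # allow up to 10 minutes per deployment job
    keep_result = 3600   # mirrors ARQ_KEEP_RESULT (seconds)
```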
**Key Files Added:**
- `apps/cloud/src/basic_memory_cloud/jobs/` - Complete job queue implementation (7 files)
- `apps/cloud/entrypoint.sh` - Dual-purpose Docker container entry point
- `apps/cloud/tests/integration/test_worker_integration.py` - Real infrastructure integration tests
- `apps/cloud/src/basic_memory_cloud/schemas/job_responses.py` - API response schemas
**API Integration:**
- Provisioning endpoints return job IDs immediately instead of blocking for 60+ seconds
- Job status API endpoints for real-time monitoring of deployment progress
- Proper error handling and job failure scenarios with detailed error messages
**Testing Achievement:**
- **46 total tests passing** across all test suites (unit, integration, API, services)
- **Real infrastructure testing** - ARQ workers process actual Fly.io deployments
- **Event loop safety** - Fixed asyncio issues for subprocess worker compatibility
- **Test fixture updates** - All fixtures properly support job queue dependencies
- **Type checking** - 0 errors across entire codebase
**Technical Metrics:**
- **38 files changed** - +1,736 insertions, -334 deletions
- **Integration test runtime** - ~18 seconds with real ARQ workers and Fly.io verification
- **Event loop isolation** - Proper async session management for subprocess compatibility
- **Redis integration** - Production-ready Redis configuration with connection pooling
**Phase 4.3: Production Hardening** ✅ COMPLETED
- [x] **Configure Upstash Redis** - production Redis setup on Fly.io
- [x] **Retry logic for external APIs** - exponential backoff for flaky Tigris IAM operations
- [x] **Monitoring and observability** - comprehensive Redis queue monitoring with CLI tools
- [x] **Error handling improvements** - graceful handling of expected API errors with appropriate log levels
- [x] **CLI tooling enhancements** - bulk update commands for CI/CD automation
- [x] **Documentation improvements** - comprehensive monitoring guide with Redis patterns
- [x] **Job uniqueness** - ARQ-based duplicate prevention for tenant operations
- [ ] **Worker scaling** - multiple arq workers for parallel job processing
- [ ] **Job persistence** - ensure jobs survive Redis/worker restarts
- [ ] **Error alerting** - notifications for failed deployment jobs
**Phase 4.4: Advanced Features** (Future)
- [ ] **Job scheduling** - deploy tenants at specific times
- [ ] **Priority queues** - urgent deployments processed first
- [ ] **Batch operations** - bulk tenant deployments
- [ ] **Job dependencies** - deployment → configuration → activation chains
#### Benefits Achieved ✅ REALIZED
**User Experience Improvements:**
- **Immediate API responses** - users get job ID instantly vs waiting 60+ seconds for deployment completion
- **Real-time job tracking** - status API provides live updates on deployment progress
- **Better error visibility** - detailed error messages and job failure tracking
- **CI/CD automation ready** - bulk update commands for automated tenant deployments
**System Reliability:**
- **Redis persistence** - jobs survive Redis/worker restarts with proper queue durability
- **Idempotent job processing** - jobs can be safely retried without side effects
- **Event loop isolation** - worker processes operate independently from API server
- **Retry resilience** - exponential backoff for flaky external API calls (3 attempts, 1s/2s delays; see the sketch after this list)
- **Graceful error handling** - expected API errors logged at INFO level, unexpected at ERROR level
- **Job uniqueness** - prevent duplicate tenant operations with ARQ's built-in uniqueness feature
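The retry behavior described above (3 attempts with 1s/2s delays) can be captured in a small helper; this is a hedged sketch rather than the project's actual retry utility:
```python
import asyncio
import logging
from typing import Awaitable, Callable, TypeVar

logger = logging.getLogger(__name__)
T = TypeVar("T")


async def retry_with_backoff(
    operation: Callable[[], Awaitable[T]],
    attempts: int = 3,
    base_delay: float = 1.0,
) -> T:
    """Retry a flaky async call with exponential backoff (1s, 2s, ...)."""
    for attempt in range(1, attempts + 1):
        try:
            return await operation()
        except Exception as exc:  # the real helper would catch a narrower set
            if attempt == attempts:
                raise
            delay = base_delay * (2 ** (attempt - 1))
            logger.info("Attempt %d failed (%s); retrying in %.0fs", attempt, exc, delay)
            await asyncio.sleep(delay)
    raise RuntimeError("unreachable")  # keeps type checkers satisfied
```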
**Operational Benefits:**
- **Horizontal scaling ready** - architecture supports adding more workers for parallel processing
- **Comprehensive testing** - real infrastructure integration tests ensure production reliability
- **Type safety** - full type checking prevents runtime errors in job processing
- **Clean separation** - API and worker processes use same codebase with different entry points
- **Queue monitoring** - Redis CLI integration for real-time queue activity monitoring
- **Comprehensive documentation** - detailed monitoring guide with Redis pattern explanations
**Development Benefits:**
- **Zero workflow changes** - existing deployment/undeploy workflows work unchanged as background jobs
- **Async/await native** - modern Python asyncio patterns throughout the implementation
- **Event correlation preserved** - all existing workflow tracking and event sourcing continues to work
- **Enhanced CLI tooling** - unified tenant commands with proper endpoint routing
- **Database integrity** - proper foreign key constraint handling in tenant deletion
#### Infrastructure Requirements
- **Local**: Redis via docker-compose (already exists) ✅
- **Production**: Upstash Redis on Fly.io (already configured) ✅
- **Workers**: arq worker processes (new deployment target)
- **Monitoring**: Job status dashboard (simple web interface)
#### API Evolution
```python
# Before: Synchronous (blocks for 60+ seconds)
POST /tenant/{id}/deploy → {status: "active", machine_id: "..."}
# After: Asynchronous (returns immediately)
POST /tenant/{id}/deploy → {job_id: "uuid", workflow_id: "uuid", status: "queued"}
GET  /jobs/{job_id}/status → {status: "running", progress: "deploying_machine", workflow_id: "uuid"}
GET  /workflows/{workflow_id}/events → [...] # Existing event tracking works unchanged
```
**Technology Choice**: **arq (Redis)** over pgqueuer
- **Existing Redis infrastructure** - Upstash + docker-compose already configured
- **Better ecosystem** - monitoring tools, documentation, community
- **Made by the pydantic team** - aligns with existing Python stack
- **Hybrid approach** - Redis for queue operations + Postgres for workflow state
#### Job Uniqueness Implementation
**Problem**: Multiple concurrent deployment requests for the same tenant could create duplicate jobs, wasting resources and potentially causing conflicts.
**Solution**: Leverage ARQ's built-in job uniqueness feature using predictable job IDs:
```python
# JobQueueService implementation
async def enqueue_deploy_job(self, tenant_id: UUID, image_tag: str | None = None) -> str:
    unique_job_id = f"deploy-{tenant_id}"
    job = await self.redis_pool.enqueue_job(
        "deploy_tenant_job",
        str(tenant_id),
        image_tag,
        _job_id=unique_job_id,  # ARQ prevents duplicates
    )
    if job is None:
        # Job already exists - return existing job ID
        return unique_job_id
    else:
        # New job created - return ARQ job ID
        return job.job_id
```
**Key Features:**
- **Predictable Job IDs**: `deploy-{tenant_id}`, `undeploy-{tenant_id}`
- **Duplicate Prevention**: ARQ returns `None` for duplicate job IDs
- **Graceful Handling**: Return existing job ID instead of raising errors
- **Idempotent Operations**: Safe to retry deployment requests
- **Clear Logging**: Distinguish "Enqueued new" vs "Found existing" jobs
**Benefits:**
- Prevents resource waste from duplicate deployments
- Eliminates race conditions from concurrent requests
- Makes job monitoring more predictable with consistent IDs
- Provides natural deduplication without complex locking mechanisms
## Notes
### Design Philosophy Lessons
- **Simplicity beats framework magic** - removing DBOS made the system more reliable and debuggable
- **Event sourcing > complex orchestration** - database-backed event tracking provides better observability than framework abstractions
- **Idempotent operations > resumable workflows** - each step handling its own retry logic is simpler than framework-managed resumability
- **Explicit error handling > framework exception handling** - Python exceptions are clearer than orchestration framework error states
### Future Considerations
- **Monitoring integration** - workflow tracking events could feed into observability systems
- **Performance optimization** - event querying patterns may benefit from additional indexing
- **Audit compliance** - complete event trail supports regulatory requirements
- **Operational dashboards** - workflow status could drive tenant health monitoring
### Related Specifications
- **SPEC-8**: TigrisFS Integration - bucket provisioning integrated with deployment workflow
- **SPEC-1**: Specification-Driven Development Process - this spec follows the established format
## Observations
- [architecture] Removing framework complexity led to more maintainable system #simplification
- [workflow] Single unified deployment workflow handles both provisioning and updates #consolidation
- [observability] Event sourcing with workflow correlation provides complete audit trail #event-tracking
- [database] Foreign key relationships between workflows and events enable powerful queries #schema-design
- [testing] Integration tests with real infrastructure catch edge cases that unit tests miss #testing-strategy
- [parameters] Consistent method signatures (workflow_id first) reduce cognitive overhead #api-design
- [maintenance] Fewer workflows and dependencies reduce long-term maintenance burden #operational-excellence
- [debugging] Plain Python exceptions are clearer than framework abstraction layers #developer-experience
- [resilience] Exponential backoff retry patterns handle flaky external API calls gracefully #error-handling
- [monitoring] Redis queue monitoring provides real-time operational visibility #observability
- [ci-cd] Bulk update commands enable automated tenant deployments in continuous delivery pipelines #automation
- [documentation] Comprehensive monitoring guides reduce operational learning curve #knowledge-management
- [error-logging] Context-aware log levels (INFO for expected errors, ERROR for unexpected) improve signal-to-noise ratio #logging-strategy
- [job-uniqueness] ARQ job uniqueness with predictable tenant-based IDs prevents duplicate operations and resource waste #deduplication
## Implementation Notes
### Configuration Integration
- **Redis Configuration**: Add Redis settings to the existing `apps/cloud/src/basic_memory_cloud/config.py` (see the sketch below)
- **Local Development**: Leverage existing Redis setup from `docker-compose.yml`
- **Production**: Use Upstash Redis configuration for production environments
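A minimal sketch of how the Redis settings could be layered into the config module with pydantic-settings; the field names and defaults below are assumptions that happen to line up with the `REDIS_URL`, `ARQ_MAX_JOBS`, and `ARQ_KEEP_RESULT` variables used later in `fly.toml`:
```python
from pydantic_settings import BaseSettings


class RedisConfig(BaseSettings):
    """Redis/ARQ settings read from the environment; field names are assumptions."""

    redis_url: str = "redis://localhost:6379"  # local docker-compose default
    arq_max_jobs: int = 10
    arq_keep_result: int = 3600  # seconds to keep completed job results
```
pydantic-settings maps `redis_url` to the `REDIS_URL` environment variable by default, so the same class works for local docker-compose and Upstash on Fly.io.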
### Docker Entrypoint Strategy
Create an `entrypoint.sh` script to switch between the API server and worker processes using a single Docker image:
```bash
#!/bin/bash
# Entrypoint script for Basic Memory Cloud service
# Supports multiple process types: api, worker
set -e
case "$1" in
  "api")
    echo "Starting Basic Memory Cloud API server..."
    exec uvicorn basic_memory_cloud.main:app \
      --host 0.0.0.0 \
      --port 8000 \
      --log-level info
    ;;
  "worker")
    echo "Starting Basic Memory Cloud ARQ worker..."
    # For ARQ worker implementation
    exec python -m arq basic_memory_cloud.jobs.settings.WorkerSettings
    ;;
  *)
    echo "Usage: $0 {api|worker}"
    echo "  api    - Start the FastAPI server"
    echo "  worker - Start the ARQ worker"
    exit 1
    ;;
esac
```
### Fly.io Process Groups Configuration
Use separate machine groups for API and worker processes with independent scaling:
```toml
# fly.toml app configuration for basic-memory-cloud
app = 'basic-memory-cloud-dev-basic-machines'
primary_region = 'dfw'
org = 'basic-machines'
kill_signal = 'SIGINT'
kill_timeout = '5s'
[build]
# Process groups for API server and worker
[processes]
  api = "api"
  worker = "worker"
# Machine scaling configuration
[[machine]]
  size = 'shared-cpu-1x'
  processes = ['api']
  min_machines_running = 1
  auto_stop_machines = false
  auto_start_machines = true
[[machine]]
  size = 'shared-cpu-1x'
  processes = ['worker']
  min_machines_running = 1
  auto_stop_machines = false
  auto_start_machines = true
[env]
  # Python configuration
  PYTHONUNBUFFERED = '1'
  PYTHONPATH = '/app'
  # Logging configuration
  LOG_LEVEL = 'DEBUG'
  # Redis configuration for ARQ
  REDIS_URL = 'redis://basic-memory-cloud-redis.upstash.io'
  # Database configuration
  DATABASE_HOST = 'basic-memory-cloud-db-dev-basic-machines.internal'
  DATABASE_PORT = '5432'
  DATABASE_NAME = 'basic_memory_cloud'
  DATABASE_USER = 'postgres'
  DATABASE_SSL = 'true'
  # Worker configuration
  ARQ_MAX_JOBS = '10'
  ARQ_KEEP_RESULT = '3600'
  # Fly.io configuration
  FLY_ORG = 'basic-machines'
  FLY_REGION = 'dfw'
# Internal service - no external HTTP exposure for worker
# API accessible via basic-memory-cloud-dev-basic-machines.flycast:8000
[[vm]]
  size = 'shared-cpu-1x'
```
### Benefits of This Architecture
- **Single Docker Image**: Both API and worker use same container with different entrypoints
- **Independent Scaling**: Scale API and worker processes separately based on demand
- **Clean Separation**: Web traffic handling separate from background job processing
- **Existing Infrastructure**: Leverages current PostgreSQL + Redis setup without complexity
- **Hybrid State Management**: Redis for queue operations, PostgreSQL for persistent workflow tracking
## Relations
- implements [[SPEC-8 TigrisFS Integration]]
- follows [[SPEC-1 Specification-Driven Development Process]]
- supersedes previous multi-workflow architecture
```
--------------------------------------------------------------------------------
/tests/repository/test_entity_repository.py:
--------------------------------------------------------------------------------
```python
"""Tests for the EntityRepository."""
from datetime import datetime, timezone
import pytest
import pytest_asyncio
from sqlalchemy import select
from basic_memory import db
from basic_memory.models import Entity, Observation, Relation, Project
from basic_memory.repository.entity_repository import EntityRepository
from basic_memory.utils import generate_permalink
@pytest_asyncio.fixture
async def entity_with_observations(session_maker, sample_entity):
    """Create an entity with observations."""
    async with db.scoped_session(session_maker) as session:
        observations = [
            Observation(
                entity_id=sample_entity.id,
                content="First observation",
            ),
            Observation(
                entity_id=sample_entity.id,
                content="Second observation",
            ),
        ]
        session.add_all(observations)
        return sample_entity
@pytest_asyncio.fixture
async def related_results(session_maker, test_project: Project):
    """Create entities with relations between them."""
    async with db.scoped_session(session_maker) as session:
        source = Entity(
            project_id=test_project.id,
            title="source",
            entity_type="test",
            permalink="source/source",
            file_path="source/source.md",
            content_type="text/markdown",
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
        )
        target = Entity(
            project_id=test_project.id,
            title="target",
            entity_type="test",
            permalink="target/target",
            file_path="target/target.md",
            content_type="text/markdown",
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
        )
        session.add(source)
        session.add(target)
        await session.flush()
        relation = Relation(
            from_id=source.id,
            to_id=target.id,
            to_name=target.title,
            relation_type="connects_to",
        )
        session.add(relation)
        return source, target, relation
@pytest.mark.asyncio
async def test_create_entity(entity_repository: EntityRepository):
    """Test creating a new entity"""
    entity_data = {
        "project_id": entity_repository.project_id,
        "title": "Test",
        "entity_type": "test",
        "permalink": "test/test",
        "file_path": "test/test.md",
        "content_type": "text/markdown",
        "created_at": datetime.now(timezone.utc),
        "updated_at": datetime.now(timezone.utc),
    }
    entity = await entity_repository.create(entity_data)
    # Verify returned object
    assert entity.id is not None
    assert entity.title == "Test"
    assert isinstance(entity.created_at, datetime)
    assert isinstance(entity.updated_at, datetime)
    # Verify in database
    found = await entity_repository.find_by_id(entity.id)
    assert found is not None
    assert found.id is not None
    assert found.id == entity.id
    assert found.title == entity.title
    # assert relations are eagerly loaded
    assert len(entity.observations) == 0
    assert len(entity.relations) == 0
@pytest.mark.asyncio
async def test_create_all(entity_repository: EntityRepository):
    """Test creating a new entity"""
    entity_data = [
        {
            "project_id": entity_repository.project_id,
            "title": "Test_1",
            "entity_type": "test",
            "permalink": "test/test-1",
            "file_path": "test/test_1.md",
            "content_type": "text/markdown",
            "created_at": datetime.now(timezone.utc),
            "updated_at": datetime.now(timezone.utc),
        },
        {
            "project_id": entity_repository.project_id,
            "title": "Test-2",
            "entity_type": "test",
            "permalink": "test/test-2",
            "file_path": "test/test_2.md",
            "content_type": "text/markdown",
            "created_at": datetime.now(timezone.utc),
            "updated_at": datetime.now(timezone.utc),
        },
    ]
    entities = await entity_repository.create_all(entity_data)
    assert len(entities) == 2
    entity = entities[0]
    # Verify in database
    found = await entity_repository.find_by_id(entity.id)
    assert found is not None
    assert found.id is not None
    assert found.id == entity.id
    assert found.title == entity.title
    # assert relations are eagerly loaded
    assert len(entity.observations) == 0
    assert len(entity.relations) == 0
@pytest.mark.asyncio
async def test_find_by_id(entity_repository: EntityRepository, sample_entity: Entity):
    """Test finding an entity by ID"""
    found = await entity_repository.find_by_id(sample_entity.id)
    assert found is not None
    assert found.id == sample_entity.id
    assert found.title == sample_entity.title
    # Verify against direct database query
    async with db.scoped_session(entity_repository.session_maker) as session:
        stmt = select(Entity).where(Entity.id == sample_entity.id)
        result = await session.execute(stmt)
        db_entity = result.scalar_one()
        assert db_entity.id == found.id
        assert db_entity.title == found.title
@pytest.mark.asyncio
async def test_update_entity(entity_repository: EntityRepository, sample_entity: Entity):
    """Test updating an entity"""
    updated = await entity_repository.update(sample_entity.id, {"title": "Updated title"})
    assert updated is not None
    assert updated.title == "Updated title"
    # Verify in database
    async with db.scoped_session(entity_repository.session_maker) as session:
        stmt = select(Entity).where(Entity.id == sample_entity.id)
        result = await session.execute(stmt)
        db_entity = result.scalar_one()
        assert db_entity.title == "Updated title"
@pytest.mark.asyncio
async def test_delete_entity(entity_repository: EntityRepository, sample_entity):
    """Test deleting an entity."""
    result = await entity_repository.delete(sample_entity.id)
    assert result is True
    # Verify deletion
    deleted = await entity_repository.find_by_id(sample_entity.id)
    assert deleted is None
@pytest.mark.asyncio
async def test_delete_entity_with_observations(
    entity_repository: EntityRepository, entity_with_observations
):
    """Test deleting an entity cascades to its observations."""
    entity = entity_with_observations
    result = await entity_repository.delete(entity.id)
    assert result is True
    # Verify entity deletion
    deleted = await entity_repository.find_by_id(entity.id)
    assert deleted is None
    # Verify observations were cascaded
    async with db.scoped_session(entity_repository.session_maker) as session:
        query = select(Observation).filter(Observation.entity_id == entity.id)
        result = await session.execute(query)
        remaining_observations = result.scalars().all()
        assert len(remaining_observations) == 0
@pytest.mark.asyncio
async def test_delete_entities_by_type(entity_repository: EntityRepository, sample_entity):
    """Test deleting entities by type."""
    result = await entity_repository.delete_by_fields(entity_type=sample_entity.entity_type)
    assert result is True
    # Verify deletion
    async with db.scoped_session(entity_repository.session_maker) as session:
        query = select(Entity).filter(Entity.entity_type == sample_entity.entity_type)
        result = await session.execute(query)
        remaining = result.scalars().all()
        assert len(remaining) == 0
@pytest.mark.asyncio
async def test_delete_entity_with_relations(entity_repository: EntityRepository, related_results):
    """Test deleting an entity cascades to its relations."""
    source, target, relation = related_results
    # Delete source entity
    result = await entity_repository.delete(source.id)
    assert result is True
    # Verify relation was cascaded
    async with db.scoped_session(entity_repository.session_maker) as session:
        query = select(Relation).filter(Relation.from_id == source.id)
        result = await session.execute(query)
        remaining_relations = result.scalars().all()
        assert len(remaining_relations) == 0
        # Verify target entity still exists
        target_exists = await entity_repository.find_by_id(target.id)
        assert target_exists is not None
@pytest.mark.asyncio
async def test_delete_nonexistent_entity(entity_repository: EntityRepository):
    """Test deleting an entity that doesn't exist."""
    result = await entity_repository.delete(0)
    assert result is False
@pytest_asyncio.fixture
async def test_entities(session_maker, test_project: Project):
    """Create multiple test entities."""
    async with db.scoped_session(session_maker) as session:
        entities = [
            Entity(
                project_id=test_project.id,
                title="entity1",
                entity_type="test",
                permalink="type1/entity1",
                file_path="type1/entity1.md",
                content_type="text/markdown",
                created_at=datetime.now(timezone.utc),
                updated_at=datetime.now(timezone.utc),
            ),
            Entity(
                project_id=test_project.id,
                title="entity2",
                entity_type="test",
                permalink="type1/entity2",
                file_path="type1/entity2.md",
                content_type="text/markdown",
                created_at=datetime.now(timezone.utc),
                updated_at=datetime.now(timezone.utc),
            ),
            Entity(
                project_id=test_project.id,
                title="entity3",
                entity_type="test",
                permalink="type2/entity3",
                file_path="type2/entity3.md",
                content_type="text/markdown",
                created_at=datetime.now(timezone.utc),
                updated_at=datetime.now(timezone.utc),
            ),
        ]
        session.add_all(entities)
        return entities
@pytest.mark.asyncio
async def test_find_by_permalinks(entity_repository: EntityRepository, test_entities):
    """Test finding multiple entities by their type/name pairs."""
    # Test finding multiple entities
    permalinks = [e.permalink for e in test_entities]
    found = await entity_repository.find_by_permalinks(permalinks)
    assert len(found) == 3
    names = {e.title for e in found}
    assert names == {"entity1", "entity2", "entity3"}
    # Test finding subset of entities
    permalinks = [e.permalink for e in test_entities if e.title != "entity2"]
    found = await entity_repository.find_by_permalinks(permalinks)
    assert len(found) == 2
    names = {e.title for e in found}
    assert names == {"entity1", "entity3"}
    # Test with non-existent entities
    permalinks = ["type1/entity1", "type3/nonexistent"]
    found = await entity_repository.find_by_permalinks(permalinks)
    assert len(found) == 1
    assert found[0].title == "entity1"
    # Test empty input
    found = await entity_repository.find_by_permalinks([])
    assert len(found) == 0
@pytest.mark.asyncio
async def test_generate_permalink_from_file_path():
    """Test permalink generation from different file paths."""
    test_cases = [
        ("docs/My Feature.md", "docs/my-feature"),
        ("specs/API (v2).md", "specs/api-v2"),
        ("notes/2024/Q1 Planning!!!.md", "notes/2024/q1-planning"),
        ("test/Über File.md", "test/uber-file"),
        ("docs/my_feature_name.md", "docs/my-feature-name"),
        ("specs/multiple--dashes.md", "specs/multiple-dashes"),
        ("notes/trailing/space/ file.md", "notes/trailing/space/file"),
    ]
    for input_path, expected in test_cases:
        result = generate_permalink(input_path)
        assert result == expected, f"Failed for {input_path}"
        # Verify the result passes validation
        Entity(
            title="test",
            entity_type="test",
            permalink=result,
            file_path=input_path,
            content_type="text/markdown",
        )  # This will raise ValueError if invalid
@pytest.mark.asyncio
async def test_get_by_title(entity_repository: EntityRepository, session_maker):
    """Test getting an entity by title."""
    # Create test entities
    async with db.scoped_session(session_maker) as session:
        entities = [
            Entity(
                project_id=entity_repository.project_id,
                title="Unique Title",
                entity_type="test",
                permalink="test/unique-title",
                file_path="test/unique-title.md",
                content_type="text/markdown",
                created_at=datetime.now(timezone.utc),
                updated_at=datetime.now(timezone.utc),
            ),
            Entity(
                project_id=entity_repository.project_id,
                title="Another Title",
                entity_type="test",
                permalink="test/another-title",
                file_path="test/another-title.md",
                content_type="text/markdown",
                created_at=datetime.now(timezone.utc),
                updated_at=datetime.now(timezone.utc),
            ),
            Entity(
                project_id=entity_repository.project_id,
                title="Another Title",
                entity_type="test",
                permalink="test/another-title-1",
                file_path="test/another-title-1.md",
                content_type="text/markdown",
                created_at=datetime.now(timezone.utc),
                updated_at=datetime.now(timezone.utc),
            ),
        ]
        session.add_all(entities)
        await session.flush()
    # Test getting by exact title
    found = await entity_repository.get_by_title("Unique Title")
    assert found is not None
    assert len(found) == 1
    assert found[0].title == "Unique Title"
    # Test case sensitivity
    found = await entity_repository.get_by_title("unique title")
    assert not found  # Should be case-sensitive
    # Test non-existent title
    found = await entity_repository.get_by_title("Non Existent")
    assert not found
    # Test multiple rows found
    found = await entity_repository.get_by_title("Another Title")
    assert len(found) == 2
@pytest.mark.asyncio
async def test_get_by_file_path(entity_repository: EntityRepository, session_maker):
    """Test getting an entity by title."""
    # Create test entities
    async with db.scoped_session(session_maker) as session:
        entities = [
            Entity(
                project_id=entity_repository.project_id,
                title="Unique Title",
                entity_type="test",
                permalink="test/unique-title",
                file_path="test/unique-title.md",
                content_type="text/markdown",
                created_at=datetime.now(timezone.utc),
                updated_at=datetime.now(timezone.utc),
            ),
        ]
        session.add_all(entities)
        await session.flush()
    # Test getting by file_path
    found = await entity_repository.get_by_file_path("test/unique-title.md")
    assert found is not None
    assert found.title == "Unique Title"
    # Test non-existent file_path
    found = await entity_repository.get_by_file_path("not/a/real/file.md")
    assert found is None
@pytest.mark.asyncio
async def test_get_distinct_directories(entity_repository: EntityRepository, session_maker):
    """Test getting distinct directory paths from entity file paths."""
    # Create test entities with various directory structures
    async with db.scoped_session(session_maker) as session:
        entities = [
            Entity(
                project_id=entity_repository.project_id,
                title="File 1",
                entity_type="test",
                permalink="docs/guides/file1",
                file_path="docs/guides/file1.md",
                content_type="text/markdown",
                created_at=datetime.now(timezone.utc),
                updated_at=datetime.now(timezone.utc),
            ),
            Entity(
                project_id=entity_repository.project_id,
                title="File 2",
                entity_type="test",
                permalink="docs/guides/file2",
                file_path="docs/guides/file2.md",
                content_type="text/markdown",
                created_at=datetime.now(timezone.utc),
                updated_at=datetime.now(timezone.utc),
            ),
            Entity(
                project_id=entity_repository.project_id,
                title="File 3",
                entity_type="test",
                permalink="docs/api/file3",
                file_path="docs/api/file3.md",
                content_type="text/markdown",
                created_at=datetime.now(timezone.utc),
                updated_at=datetime.now(timezone.utc),
            ),
            Entity(
                project_id=entity_repository.project_id,
                title="File 4",
                entity_type="test",
                permalink="specs/file4",
                file_path="specs/file4.md",
                content_type="text/markdown",
                created_at=datetime.now(timezone.utc),
                updated_at=datetime.now(timezone.utc),
            ),
            Entity(
                project_id=entity_repository.project_id,
                title="File 5",
                entity_type="test",
                permalink="notes/2024/q1/file5",
                file_path="notes/2024/q1/file5.md",
                content_type="text/markdown",
                created_at=datetime.now(timezone.utc),
                updated_at=datetime.now(timezone.utc),
            ),
        ]
        session.add_all(entities)
        await session.flush()
    # Get distinct directories
    directories = await entity_repository.get_distinct_directories()
    # Verify directories are extracted correctly
    assert isinstance(directories, list)
    assert len(directories) > 0
    # Should include all parent directories but not filenames
    expected_dirs = {
        "docs",
        "docs/guides",
        "docs/api",
        "notes",
        "notes/2024",
        "notes/2024/q1",
        "specs",
    }
    assert set(directories) == expected_dirs
    # Verify results are sorted
    assert directories == sorted(directories)
    # Verify no file paths are included
    for dir_path in directories:
        assert not dir_path.endswith(".md")
@pytest.mark.asyncio
async def test_get_distinct_directories_empty_db(entity_repository: EntityRepository):
    """Test getting distinct directories when database is empty."""
    directories = await entity_repository.get_distinct_directories()
    assert directories == []
@pytest.mark.asyncio
async def test_find_by_directory_prefix(entity_repository: EntityRepository, session_maker):
    """Test finding entities by directory prefix."""
    # Create test entities in various directories
    async with db.scoped_session(session_maker) as session:
        entities = [
            Entity(
                project_id=entity_repository.project_id,
                title="File 1",
                entity_type="test",
                permalink="docs/file1",
                file_path="docs/file1.md",
                content_type="text/markdown",
                created_at=datetime.now(timezone.utc),
                updated_at=datetime.now(timezone.utc),
            ),
            Entity(
                project_id=entity_repository.project_id,
                title="File 2",
                entity_type="test",
                permalink="docs/guides/file2",
                file_path="docs/guides/file2.md",
                content_type="text/markdown",
                created_at=datetime.now(timezone.utc),
                updated_at=datetime.now(timezone.utc),
            ),
            Entity(
                project_id=entity_repository.project_id,
                title="File 3",
                entity_type="test",
                permalink="docs/api/file3",
                file_path="docs/api/file3.md",
                content_type="text/markdown",
                created_at=datetime.now(timezone.utc),
                updated_at=datetime.now(timezone.utc),
            ),
            Entity(
                project_id=entity_repository.project_id,
                title="File 4",
                entity_type="test",
                permalink="specs/file4",
                file_path="specs/file4.md",
                content_type="text/markdown",
                created_at=datetime.now(timezone.utc),
                updated_at=datetime.now(timezone.utc),
            ),
        ]
        session.add_all(entities)
        await session.flush()
    # Test finding all entities in "docs" directory and subdirectories
    docs_entities = await entity_repository.find_by_directory_prefix("docs")
    assert len(docs_entities) == 3
    file_paths = {e.file_path for e in docs_entities}
    assert file_paths == {"docs/file1.md", "docs/guides/file2.md", "docs/api/file3.md"}
    # Test finding entities in "docs/guides" subdirectory
    guides_entities = await entity_repository.find_by_directory_prefix("docs/guides")
    assert len(guides_entities) == 1
    assert guides_entities[0].file_path == "docs/guides/file2.md"
    # Test finding entities in "specs" directory
    specs_entities = await entity_repository.find_by_directory_prefix("specs")
    assert len(specs_entities) == 1
    assert specs_entities[0].file_path == "specs/file4.md"
    # Test with root directory (empty string)
    all_entities = await entity_repository.find_by_directory_prefix("")
    assert len(all_entities) == 4
    # Test with root directory (slash)
    all_entities = await entity_repository.find_by_directory_prefix("/")
    assert len(all_entities) == 4
    # Test with non-existent directory
    nonexistent = await entity_repository.find_by_directory_prefix("nonexistent")
    assert len(nonexistent) == 0
@pytest.mark.asyncio
async def test_find_by_directory_prefix_basic_fields_only(
    entity_repository: EntityRepository, session_maker
):
    """Test that find_by_directory_prefix returns basic entity fields.
    Note: This method uses use_query_options=False for performance,
    so it doesn't eager load relationships. Directory trees only need
    basic entity fields.
    """
    # Create test entity
    async with db.scoped_session(session_maker) as session:
        entity = Entity(
            project_id=entity_repository.project_id,
            title="Test Entity",
            entity_type="test",
            permalink="docs/test",
            file_path="docs/test.md",
            content_type="text/markdown",
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
        )
        session.add(entity)
        await session.flush()
    # Query entity by directory prefix
    entities = await entity_repository.find_by_directory_prefix("docs")
    assert len(entities) == 1
    # Verify basic fields are present (all we need for directory trees)
    entity = entities[0]
    assert entity.title == "Test Entity"
    assert entity.file_path == "docs/test.md"
    assert entity.permalink == "docs/test"
    assert entity.entity_type == "test"
    assert entity.content_type == "text/markdown"
    assert entity.updated_at is not None
@pytest.mark.asyncio
async def test_get_all_file_paths(entity_repository: EntityRepository, session_maker):
    """Test getting all file paths for deletion detection during sync."""
    # Create test entities with various file paths
    async with db.scoped_session(session_maker) as session:
        entities = [
            Entity(
                project_id=entity_repository.project_id,
                title="File 1",
                entity_type="test",
                permalink="docs/file1",
                file_path="docs/file1.md",
                content_type="text/markdown",
                created_at=datetime.now(timezone.utc),
                updated_at=datetime.now(timezone.utc),
            ),
            Entity(
                project_id=entity_repository.project_id,
                title="File 2",
                entity_type="test",
                permalink="specs/file2",
                file_path="specs/file2.md",
                content_type="text/markdown",
                created_at=datetime.now(timezone.utc),
                updated_at=datetime.now(timezone.utc),
            ),
            Entity(
                project_id=entity_repository.project_id,
                title="File 3",
                entity_type="test",
                permalink="notes/file3",
                file_path="notes/file3.md",
                content_type="text/markdown",
                created_at=datetime.now(timezone.utc),
                updated_at=datetime.now(timezone.utc),
            ),
        ]
        session.add_all(entities)
        await session.flush()
    # Get all file paths
    file_paths = await entity_repository.get_all_file_paths()
    # Verify results
    assert isinstance(file_paths, list)
    assert len(file_paths) == 3
    assert set(file_paths) == {"docs/file1.md", "specs/file2.md", "notes/file3.md"}
@pytest.mark.asyncio
async def test_get_all_file_paths_empty_db(entity_repository: EntityRepository):
    """Test getting all file paths when database is empty."""
    file_paths = await entity_repository.get_all_file_paths()
    assert file_paths == []
@pytest.mark.asyncio
async def test_get_all_file_paths_performance(entity_repository: EntityRepository, session_maker):
    """Test that get_all_file_paths doesn't load entities or relationships.
    This method is optimized for deletion detection during streaming sync.
    It should only query file_path strings, not full entity objects.
    """
    # Create test entity with observations and relations
    async with db.scoped_session(session_maker) as session:
        # Create entities
        entity1 = Entity(
            project_id=entity_repository.project_id,
            title="Entity 1",
            entity_type="test",
            permalink="test/entity1",
            file_path="test/entity1.md",
            content_type="text/markdown",
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
        )
        entity2 = Entity(
            project_id=entity_repository.project_id,
            title="Entity 2",
            entity_type="test",
            permalink="test/entity2",
            file_path="test/entity2.md",
            content_type="text/markdown",
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
        )
        session.add_all([entity1, entity2])
        await session.flush()
        # Add observations to entity1
        observation = Observation(
            entity_id=entity1.id,
            content="Test observation",
            category="note",
        )
        session.add(observation)
        # Add relation between entities
        relation = Relation(
            from_id=entity1.id,
            to_id=entity2.id,
            to_name=entity2.title,
            relation_type="relates_to",
        )
        session.add(relation)
        await session.flush()
    # Get all file paths - should be fast and not load relationships
    file_paths = await entity_repository.get_all_file_paths()
    # Verify results - just file paths, no entities or relationships loaded
    assert len(file_paths) == 2
    assert set(file_paths) == {"test/entity1.md", "test/entity2.md"}
    # Result should be list of strings, not entity objects
    for path in file_paths:
        assert isinstance(path, str)
@pytest.mark.asyncio
async def test_get_all_file_paths_project_isolation(
    entity_repository: EntityRepository, session_maker
):
    """Test that get_all_file_paths only returns paths from the current project."""
    # Create entities in the repository's project
    async with db.scoped_session(session_maker) as session:
        entity1 = Entity(
            project_id=entity_repository.project_id,
            title="Project 1 File",
            entity_type="test",
            permalink="test/file1",
            file_path="test/file1.md",
            content_type="text/markdown",
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
        )
        session.add(entity1)
        await session.flush()
        # Create a second project
        project2 = Project(name="other-project", path="/tmp/other")
        session.add(project2)
        await session.flush()
        # Create entity in different project
        entity2 = Entity(
            project_id=project2.id,
            title="Project 2 File",
            entity_type="test",
            permalink="test/file2",
            file_path="test/file2.md",
            content_type="text/markdown",
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
        )
        session.add(entity2)
        await session.flush()
    # Get all file paths for project 1
    file_paths = await entity_repository.get_all_file_paths()
    # Should only include files from project 1
    assert len(file_paths) == 1
    assert file_paths == ["test/file1.md"]
```
--------------------------------------------------------------------------------
/specs/SPEC-16 MCP Cloud Service Consolidation.md:
--------------------------------------------------------------------------------
```markdown
---
title: 'SPEC-16: MCP Cloud Service Consolidation'
type: spec
permalink: specs/spec-16-mcp-cloud-service-consolidation
tags:
- architecture
- mcp
- cloud
- performance
- deployment
status: in-progress
---
## Status Update
**Phase 0 (Basic Memory Refactor): ✅ COMPLETE**
- basic-memory PR #344: async_client context manager pattern implemented
- All 17 MCP tools updated to use `async with get_client() as client:`
- CLI commands updated to use context manager
- Removed `inject_auth_header()` and `headers.py` (~100 lines deleted)
- Factory pattern enables clean dependency injection
- Tests passing, typecheck clean
**Phase 0 Integration: ✅ COMPLETE**
- basic-memory-cloud updated to use async-client-context-manager branch
- Implemented `tenant_direct_client_factory()` with proper context manager pattern
- Removed module-level client override hacks
- Removed unnecessary `/proxy` prefix stripping (tools pass relative URLs)
- Typecheck and lint passing with proper noqa hints
- MCP tools confirmed working via inspector (local testing)
**Phase 1 (Code Consolidation): ✅ COMPLETE**
- MCP server mounted on Cloud FastAPI app at /mcp endpoint
- AuthKitProvider configured with WorkOS settings
- Combined lifespans (Cloud + MCP) working correctly
- JWT context middleware integrated
- All routes and MCP tools functional
**Phase 2 (Direct Tenant Transport): ✅ COMPLETE**
- TenantDirectTransport implemented with custom httpx transport
- Per-request JWT extraction via FastMCP DI
- Tenant lookup and signed header generation working
- Direct routing to tenant APIs (eliminating HTTP hop)
- Transport tests passing (11/11)
**Phase 3 (Testing & Validation): ✅ COMPLETE**
- Typecheck and lint passing across all services
- MCP OAuth authentication working in preview environment
- Tenant isolation via signed headers verified
- Fixed BM_TENANT_HEADER_SECRET mismatch between environments
- MCP tools successfully calling tenant APIs in preview
**Phase 4 (Deployment Configuration): ✅ COMPLETE**
- Updated apps/cloud/fly.template.toml with MCP environment variables
- Added HTTP/2 backend support for better MCP performance
- Added OAuth protected resource health check
- Removed MCP from preview deployment workflow
- Successfully deployed to preview environment (PR #113)
- All services operational at pr-113-basic-memory-cloud.fly.dev
**Next Steps:**
- Phase 5: Cleanup (remove apps/mcp directory)
- Phase 6: Production rollout and performance measurement
# SPEC-16: MCP Cloud Service Consolidation
## Why
### Original Architecture Constraints (Now Removed)
The current architecture deploys MCP Gateway and Cloud Service as separate Fly.io apps:
**Current Flow:**
```
LLM Client → MCP Gateway (OAuth) → Cloud Proxy (JWT + header signing) → Tenant API (JWT + header validation)
            apps/mcp                apps/cloud /proxy                    apps/api
```
This separation was originally necessary because:
1. **Stateful SSE requirement** - MCP needed server-sent events with session state for active project tracking
2. **fastmcp.run limitation** - The FastMCP demo helper didn't support worker processes
### Why These Constraints No Longer Apply
1. **State externalized** - Project state moved from in-memory to LLM context (external state)
2. **HTTP transport enabled** - Switched from SSE to stateless HTTP for MCP tools
3. **Worker support added** - Converted from `fastmcp.run()` to `uvicorn.run()` with workers
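For illustration, the switch amounts to handing the MCP ASGI app to uvicorn with worker processes (a minimal sketch, not the actual entrypoint; the import string and worker count are assumptions):
```python
# Minimal sketch: serve the MCP ASGI app under uvicorn with workers instead of
# fastmcp.run(). "mcp_entrypoint:app" is a hypothetical module exposing the app
# returned by mcp.http_app(); the worker count is also illustrative.
import uvicorn

if __name__ == "__main__":
    uvicorn.run(
        "mcp_entrypoint:app",  # with workers > 1, uvicorn requires an import string
        host="0.0.0.0",
        port=8000,
        workers=4,
    )
```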
### Current Problems
- **Unnecessary HTTP hop** - MCP tools call the Cloud /proxy endpoint, which in turn calls the tenant API
- **Higher latency** - Extra network round trip for every MCP operation
- **Increased costs** - Two separate Fly.io apps instead of one
- **Complex deployment** - Two services to deploy, monitor, and maintain
- **Resource waste** - Separate database connections, HTTP clients, telemetry overhead
## What
### Services Affected
1. **apps/mcp** - MCP Gateway service (to be merged)
2. **apps/cloud** - Cloud service (will receive MCP functionality)
3. **basic-memory** - Update `async_client.py` to use direct calls
4. **Deployment** - Consolidate Fly.io deployment to single app
### Components Changed
**Merged:**
- MCP middleware and telemetry into Cloud app
- MCP tools mounted on Cloud FastAPI instance
- ProxyService used directly by MCP tools (not via HTTP)
**Kept:**
- `/proxy` endpoint (still needed by web UI)
- All existing Cloud routes (provisioning, webhooks, etc.)
- Dual validation in tenant API (JWT + signed headers)
**Removed:**
- apps/mcp directory
- Separate MCP Fly.io deployment
- HTTP calls from MCP tools to /proxy endpoint
## How (High Level)
### 1. Mount FastMCP on Cloud FastAPI App
```python
# apps/cloud/src/basic_memory_cloud/main.py
from basic_memory.mcp.server import mcp
from basic_memory_cloud_mcp.middleware import TelemetryMiddleware
# Configure MCP OAuth
auth_provider = AuthKitProvider(
    authkit_domain=settings.authkit_domain,
    base_url=settings.authkit_base_url,
    required_scopes=[],
)
mcp.auth = auth_provider
mcp.add_middleware(TelemetryMiddleware())
# Mount MCP at /mcp endpoint
mcp_app = mcp.http_app(path="/mcp", stateless_http=True)
app.mount("/mcp", mcp_app)
# Existing Cloud routes stay at root
app.include_router(proxy_router)
app.include_router(provisioning_router)
# ... etc
```
### 2. Direct Tenant Transport (No HTTP Hop)
Instead of calling `/proxy`, MCP tools call tenant APIs directly via a custom httpx transport.
**Important:** No URL prefix stripping is needed. The transport receives relative URLs like `/main/resource/notes/my-note`, which route correctly to tenant APIs. The `/proxy` prefix exists only for web UI requests to the proxy router, not for MCP tools using the custom transport.
```python
# apps/cloud/src/basic_memory_cloud/transports/tenant_direct.py
from httpx import AsyncBaseTransport, AsyncClient, Request, Response
from fastmcp.server.dependencies import get_http_headers
import jwt
class TenantDirectTransport(AsyncBaseTransport):
    """Direct transport to tenant APIs, bypassing /proxy endpoint.
    Sketch: tenant_service, settings, and create_signer are assumed to be
    available from the surrounding cloud app.
    """
    def __init__(self) -> None:
        # Reusable pooled client for outbound calls to tenant apps
        self.client = AsyncClient()
    async def handle_async_request(self, request: Request) -> Response:
        # 1. Get JWT from current MCP request (via FastMCP DI)
        http_headers = get_http_headers()
        auth_header = http_headers.get("authorization") or http_headers.get("Authorization")
        if not auth_header:
            # Fail fast instead of crashing on a missing header
            return Response(status_code=401, content=b"Missing Authorization header")
        token = auth_header.replace("Bearer ", "")
        claims = jwt.decode(token, options={"verify_signature": False})
        workos_user_id = claims["sub"]
        # 2. Look up tenant for user
        tenant = await tenant_service.get_tenant_by_user_id(workos_user_id)
        # 3. Build tenant app URL with signed headers
        fly_app_name = f"{settings.tenant_prefix}-{tenant.id}"
        target_url = f"https://{fly_app_name}.fly.dev{request.url.path}"
        headers = dict(request.headers)
        signer = create_signer(settings.bm_tenant_header_secret)
        headers.update(signer.sign_tenant_headers(tenant.id))
        # 4. Make direct call to tenant API
        response = await self.client.request(
            method=request.method, url=target_url,
            headers=headers, content=request.content
        )
        return response
```
Then configure basic-memory's client factory before mounting MCP:
```python
# apps/cloud/src/basic_memory_cloud/main.py
from contextlib import asynccontextmanager
from basic_memory.mcp import async_client
from basic_memory_cloud.transports.tenant_direct import TenantDirectTransport
# Configure factory for basic-memory's async_client
@asynccontextmanager
async def tenant_direct_client_factory():
    """Factory for creating clients with tenant direct transport."""
    client = httpx.AsyncClient(
        transport=TenantDirectTransport(),
        base_url="http://direct",
    )
    try:
        yield client
    finally:
        await client.aclose()
# Set factory BEFORE importing MCP tools
async_client.set_client_factory(tenant_direct_client_factory)
# NOW import - tools will use our factory
import basic_memory.mcp.tools
import basic_memory.mcp.prompts
from basic_memory.mcp.server import mcp
# Mount MCP - tools use direct transport via factory
app.mount("/mcp", mcp_app)
```
**Key benefits:**
- Clean dependency injection via factory pattern
- Per-request tenant resolution via FastMCP DI
- Proper resource cleanup (client.aclose() guaranteed)
- Eliminates HTTP hop entirely
- /proxy endpoint remains for web UI
### 3. Keep /proxy Endpoint for Web UI
The existing `/proxy` HTTP endpoint remains functional for:
- Web UI requests
- Future external API consumers
- Backward compatibility
### 4. Security: Maintain Dual Validation
**Do NOT remove JWT validation from tenant API.** Keep defense in depth:
```python
# apps/api - Keep both validations
1. JWT validation (from WorkOS token)
2. Signed header validation (from Cloud/MCP)
```
This ensures that even if the Cloud service is compromised, attackers still cannot access tenant APIs without valid JWTs.
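A hedged sketch of what dual validation on the tenant API could look like (illustrative only; the header names, helper, and environment variables such as `x-tenant-id`, `x-tenant-signature`, and `JWT_PUBLIC_KEY` are assumptions, not the actual apps/api code):
```python
# Hedged sketch of dual validation on the tenant API: reject the request unless
# BOTH the WorkOS JWT and the HMAC-signed tenant headers check out.
import hashlib
import hmac
import os
import jwt  # PyJWT
from fastapi import HTTPException, Request

def validate_dual_auth(request: Request) -> str:
    # 1. JWT validation (WorkOS token)
    auth = request.headers.get("authorization", "")
    if not auth.startswith("Bearer "):
        raise HTTPException(status_code=401, detail="Missing bearer token")
    try:
        jwt.decode(auth.removeprefix("Bearer "), os.environ["JWT_PUBLIC_KEY"], algorithms=["RS256"])
    except jwt.InvalidTokenError as exc:
        raise HTTPException(status_code=401, detail="Invalid JWT") from exc
    # 2. Signed header validation (headers set by Cloud/MCP)
    tenant_id = request.headers.get("x-tenant-id", "")
    signature = request.headers.get("x-tenant-signature", "")
    expected = hmac.new(
        os.environ["BM_TENANT_HEADER_SECRET"].encode(), tenant_id.encode(), hashlib.sha256
    ).hexdigest()
    if not tenant_id or not hmac.compare_digest(signature, expected):
        raise HTTPException(status_code=401, detail="Invalid tenant signature")
    return tenant_id
```
Either check failing yields a 401, so a compromised Cloud service (valid signature, no user JWT) and a stolen user token (valid JWT, no signature) are both rejected.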
### 5. Deployment Changes
**Before:**
- `apps/mcp/fly.template.toml` → MCP Gateway deployment
- `apps/cloud/fly.template.toml` → Cloud Service deployment
**After:**
- Remove `apps/mcp/fly.template.toml`
- Update `apps/cloud/fly.template.toml` to expose port 8000 for both /mcp and /proxy
- Update deployment scripts to deploy single consolidated app
## Basic Memory Dependency: Async Client Refactor
### Problem
The current `basic_memory.mcp.async_client` creates a module-level `client` at import time:
```python
client = create_client()  # Runs immediately when module is imported
```
This prevents dependency injection - by the time we can override it, tools have already imported it.
### Solution: Context Manager Pattern with Auth at Client Creation
Refactor basic-memory to use httpx's context manager pattern instead of a module-level client.
**Key principle:** Authentication happens at client creation time, not per-request.
```python
# basic_memory/src/basic_memory/mcp/async_client.py
from contextlib import asynccontextmanager
from httpx import AsyncClient, ASGITransport, Timeout
# Optional factory override for dependency injection
_client_factory = None
def set_client_factory(factory):
    """Override the default client factory (for cloud app, testing, etc)."""
    global _client_factory
    _client_factory = factory
@asynccontextmanager
async def get_client():
    """Get an AsyncClient as a context manager.
    Usage:
        async with get_client() as client:
            response = await client.get(...)
    """
    if _client_factory:
        # Cloud app: custom transport handles everything
        async with _client_factory() as client:
            yield client
    else:
        # Default: create based on config
        config = ConfigManager().config
        timeout = Timeout(connect=10.0, read=30.0, write=30.0, pool=30.0)
        if config.cloud_mode_enabled:
            # CLI cloud mode: inject auth when creating client
            from basic_memory.cli.auth import CLIAuth
            auth = CLIAuth(
                client_id=config.cloud_client_id,
                authkit_domain=config.cloud_domain
            )
            token = await auth.get_valid_token()
            if not token:
                raise RuntimeError(
                    "Cloud mode enabled but not authenticated. "
                    "Run 'basic-memory cloud login' first."
                )
            # Auth header set ONCE at client creation
            async with AsyncClient(
                base_url=f"{config.cloud_host}/proxy",
                headers={"Authorization": f"Bearer {token}"},
                timeout=timeout
            ) as client:
                yield client
        else:
            # Local mode: ASGI transport
            async with AsyncClient(
                transport=ASGITransport(app=fastapi_app),
                base_url="http://test",
                timeout=timeout
            ) as client:
                yield client
```
**Tool Updates:**
```python
# Before: from basic_memory.mcp.async_client import client
from basic_memory.mcp.async_client import get_client
async def read_note(...):
    # Before: response = await call_get(client, path, ...)
    async with get_client() as client:
        response = await call_get(client, path, ...)
        # ... use response
```
**Cloud Usage:**
```python
from contextlib import asynccontextmanager
from basic_memory.mcp import async_client
@asynccontextmanager
async def tenant_direct_client():
    """Factory for creating clients with tenant direct transport."""
    client = httpx.AsyncClient(
        transport=TenantDirectTransport(),
        base_url="http://direct",
    )
    try:
        yield client
    finally:
        await client.aclose()
# Before importing MCP tools:
async_client.set_client_factory(tenant_direct_client)
# Now import - tools will use our factory
import basic_memory.mcp.tools
```
### Benefits
- **No module-level state** - client created only when needed
- **Proper cleanup** - context manager ensures `aclose()` is called
- **Easy dependency injection** - factory pattern allows custom clients
- **httpx best practices** - follows official recommendations
- **Works for all modes** - stdio, cloud, testing
### Architecture Simplification: Auth at Client Creation
**Key design principle:** Authentication happens when creating the client, not on every request.
**Three modes, three approaches:**
1. **Local mode (ASGI)**
   - No auth needed
   - Direct in-process calls via ASGITransport
2. **CLI cloud mode (HTTP)**
   - Auth token from CLIAuth (stored in ~/.basic-memory/basic-memory-cloud.json)
   - Injected as default header when creating AsyncClient
   - Single auth check at client creation time
3. **Cloud app mode (Custom Transport)**
   - TenantDirectTransport handles everything
   - Extracts JWT from FastMCP context per-request
   - No interaction with inject_auth_header() logic
**What this removes:**
- `src/basic_memory/mcp/tools/headers.py` - entire file deleted
- `inject_auth_header()` calls in all request helpers (call_get, call_post, etc.)
- Per-request header manipulation complexity
- Circular dependency concerns between async_client and auth logic
**Benefits:**
- Cleaner separation of concerns
- Simpler request helper functions
- Auth happens at the right layer (client creation)
- Cloud app transport is completely independent
### Refactor Summary
This refactor achieves:
**Simplification:**
- Removes ~100 lines of per-request header injection logic
- Deletes entire `headers.py` module
- Auth happens once at client creation, not per-request
**Decoupling:**
- Cloud app's custom transport is completely independent
- No interaction with basic-memory's auth logic
- Each mode (local, CLI cloud, cloud app) has clean separation
**Better Design:**
- Follows httpx best practices (context managers)
- Proper resource cleanup (client.aclose() guaranteed)
- Easier testing via factory injection
- No circular import risks
**Three Distinct Modes:**
1. Local: ASGI transport, no auth
2. CLI cloud: HTTP transport with CLIAuth token injection
3. Cloud app: Custom transport with per-request tenant routing
### Implementation Plan Summary
1. Create branch `async-client-context-manager` in basic-memory
2. Update `async_client.py` with context manager pattern and CLIAuth integration
3. Remove `inject_auth_header()` from all request helpers
4. Delete `src/basic_memory/mcp/tools/headers.py`
5. Update all MCP tools to use `async with get_client() as client:`
6. Update CLI commands to use context manager and remove manual auth
7. Remove `api_url` config field
8. Update tests
9. Update basic-memory-cloud to use branch: `basic-memory @ git+https://github.com/basicmachines-co/basic-memory.git@async-client-context-manager`
Detailed breakdown in Phase 0 tasks below.
### Implementation Notes
**Potential Issues & Solutions:**
1. **Circular Import** (async_client imports CLIAuth)
   - **Risk:** CLIAuth might import something from async_client
   - **Solution:** Use lazy import inside `get_client()` function
   - **Already done:** Import is inside the function, not at module level
2. **Test Fixtures**
   - **Risk:** Tests using module-level client will break
   - **Solution:** Update fixtures to use factory pattern
   - **Example:**
     ```python
     @pytest.fixture
     def mock_client_factory():
         @asynccontextmanager
         async def factory():
             async with AsyncClient(...) as client:
                 yield client
         return factory
     ```
3. **Performance**
   - **Risk:** Creating client per tool call might be expensive
   - **Reality:** httpx is designed for this pattern, connection pooling at transport level
   - **Mitigation:** Monitor performance, can optimize later if needed
4. **CLI Cloud Commands Edge Cases**
   - **Risk:** Token expires mid-operation
   - **Solution:** CLIAuth.get_valid_token() already handles refresh
   - **Validation:** Test cloud login → use tools → token refresh flow
5. **Backward Compatibility**
   - **Risk:** External code importing `client` directly
   - **Solution:** Keep `create_client()` and `client` for one version, deprecate
   - **Timeline:** Remove in next major version
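One possible shape for that temporary compatibility layer (a hedged sketch; the import path for the FastAPI app is an assumption, mirroring the local-mode setup in `get_client()`):
```python
# Hedged sketch of a one-release compatibility shim: keep create_client()
# importable but point callers at the new context manager. Assumes the same
# local ASGI setup get_client() uses; the app import path is an assumption.
import warnings
from httpx import ASGITransport, AsyncClient
from basic_memory.api.app import app as fastapi_app  # assumed location of the FastAPI app

def create_client() -> AsyncClient:
    warnings.warn(
        "create_client() is deprecated; use 'async with get_client() as client:' instead",
        DeprecationWarning,
        stacklevel=2,
    )
    return AsyncClient(transport=ASGITransport(app=fastapi_app), base_url="http://test")
```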
## Implementation Tasks
### Phase 0: Basic Memory Refactor (Prerequisite)
#### 0.1 Core Refactor - async_client.py
- [x] Create branch `async-client-context-manager` in basic-memory repo
- [x] Implement `get_client()` context manager
- [x] Implement `set_client_factory()` for dependency injection
- [x] Add CLI cloud mode auth injection (CLIAuth integration)
- [x] Remove `api_url` config field (legacy, unused)
- [x] Keep `create_client()` temporarily for backward compatibility (deprecate later)
#### 0.2 Simplify Request Helpers - tools/utils.py
- [x] Remove `inject_auth_header()` calls from `call_get()`
- [x] Remove `inject_auth_header()` calls from `call_post()`
- [x] Remove `inject_auth_header()` calls from `call_put()`
- [x] Remove `inject_auth_header()` calls from `call_patch()`
- [x] Remove `inject_auth_header()` calls from `call_delete()`
- [x] Delete `src/basic_memory/mcp/tools/headers.py` entirely
- [x] Update imports in utils.py
#### 0.3 Update MCP Tools (~16 files)
Convert from `from async_client import client` to `async with get_client() as client:`
- [x] `tools/write_note.py` (34/34 tests passing)
- [x] `tools/read_note.py` (21/21 tests passing)
- [x] `tools/view_note.py` (12/12 tests passing - no changes needed, delegates to read_note)
- [x] `tools/delete_note.py` (2/2 tests passing)
- [x] `tools/read_content.py` (20/20 tests passing)
- [x] `tools/list_directory.py` (11/11 tests passing)
- [x] `tools/move_note.py` (34/34 tests passing, 90% coverage)
- [x] `tools/search.py` (16/16 tests passing, 96% coverage)
- [x] `tools/recent_activity.py` (4/4 tests passing, 82% coverage)
- [x] `tools/project_management.py` (3 functions: list_memory_projects, create_memory_project, delete_project - typecheck passed)
- [x] `tools/edit_note.py` (17/17 tests passing)
- [x] `tools/canvas.py` (5/5 tests passing)
- [x] `tools/build_context.py` (6/6 tests passing)
- [x] `tools/sync_status.py` (typecheck passed)
- [x] `prompts/continue_conversation.py` (typecheck passed)
- [x] `prompts/search.py` (typecheck passed)
- [x] `resources/project_info.py` (typecheck passed)
#### 0.4 Update CLI Commands (~3 files)
Remove manual auth header passing, use context manager:
- [x] `cli/commands/project.py` - removed get_authenticated_headers() calls, use context manager
- [x] `cli/commands/status.py` - use context manager
- [x] `cli/commands/command_utils.py` - use context manager
#### 0.5 Update Config
- [x] Remove `api_url` field from `BasicMemoryConfig` in config.py
- [x] Update any lingering references/docs (added deprecation notice to v15-docs/cloud-mode-usage.md)
#### 0.6 Testing
- [-] Update test fixtures to use factory pattern
- [x] Run full test suite in basic-memory
- [x] Verify cloud_mode_enabled works with CLIAuth injection
- [x] Run typecheck and linting
#### 0.7 Cloud Integration Prep
- [x] Update basic-memory-cloud pyproject.toml to use branch
- [x] Implement factory pattern in cloud app main.py
- [x] Remove `/proxy` prefix stripping logic (not needed - tools pass relative URLs)
#### 0.8 Phase 0 Validation
**Before merging async-client-context-manager branch:**
- [x] All tests pass locally
- [x] Typecheck passes (pyright/mypy)
- [x] Linting passes (ruff)
- [x] Manual test: local mode works (ASGI transport)
- [x] Manual test: cloud login → cloud mode works (HTTP transport with auth)
- [x] No import of `inject_auth_header` anywhere
- [x] `headers.py` file deleted
- [x] `api_url` config removed
- [x] Tool functions properly scoped (client inside async with)
- [ ] CLI commands properly scoped (client inside async with)
**Integration validation:**
- [x] basic-memory-cloud can import and use factory pattern
- [x] TenantDirectTransport works without touching header injection
- [x] No circular imports or lazy import issues
- [x] MCP tools work via inspector (local testing confirmed)
### Phase 1: Code Consolidation
- [x] Create feature branch `consolidate-mcp-cloud`
- [x] Update `apps/cloud/src/basic_memory_cloud/config.py`:
  - [x] Add `authkit_base_url` field (already has authkit_domain)
  - [x] Workers config already exists ✓
- [x] Update `apps/cloud/src/basic_memory_cloud/telemetry.py`:
  - [x] Add `logfire.instrument_mcp()` to existing setup
  - [x] Skip complex two-phase setup - use Cloud's simpler approach
- [x] Create `apps/cloud/src/basic_memory_cloud/middleware/jwt_context.py`:
  - [x] FastAPI middleware to extract JWT claims from Authorization header
  - [x] Add tenant context (workos_user_id) to logfire baggage
  - [x] Simpler than FastMCP middleware version
- [x] Update `apps/cloud/src/basic_memory_cloud/main.py`:
  - [x] Import FastMCP server from basic-memory
  - [x] Configure AuthKitProvider with WorkOS settings
  - [x] No FastMCP telemetry middleware needed (using FastAPI middleware instead)
  - [x] Create MCP ASGI app: `mcp_app = mcp.http_app(path='/mcp', stateless_http=True)`
  - [x] Combine lifespans (Cloud + MCP) using nested async context managers
  - [x] Mount MCP: `app.mount("/mcp", mcp_app)`
  - [x] Add JWT context middleware to FastAPI app
- [x] Run typecheck - passes ✓
### Phase 2: Direct Tenant Transport
- [x] Create `apps/cloud/src/basic_memory_cloud/transports/tenant_direct.py`:
  - [x] Implement `TenantDirectTransport(AsyncBaseTransport)`
  - [x] Use FastMCP DI (`get_http_headers()`) to extract JWT per-request
  - [x] Decode JWT to get `workos_user_id`
  - [x] Look up/create tenant via `TenantRepository.get_or_create_tenant_for_workos_user()`
  - [x] Build tenant app URL and add signed headers
  - [x] Make direct httpx call to tenant API
  - [x] No `/proxy` prefix stripping needed (tools pass relative URLs like `/main/resource/...`)
- [x] Update `apps/cloud/src/basic_memory_cloud/main.py`:
  - [x] Refactored to use factory pattern instead of module-level override
  - [x] Implement `tenant_direct_client_factory()` context manager
  - [x] Call `async_client.set_client_factory()` before importing MCP tools
  - [x] Clean imports, proper noqa hints for lint
- [x] Basic-memory refactor integrated (PR #344)
- [x] Run typecheck - passes ✓
- [x] Run lint - passes ✓
### Phase 3: Testing & Validation
- [x] Run `just typecheck` in apps/cloud
- [x] Run `just check` in project
- [x] Run `just fix` - all lint errors fixed ✓
- [x] Write comprehensive transport tests (11 tests passing) ✓
- [x] Test MCP tools locally with consolidated service (inspector confirmed working)
- [x] Verify OAuth authentication works (requires full deployment)
- [x] Verify tenant isolation via signed headers (requires full deployment)
- [x] Test /proxy endpoint still works for web UI
- [ ] Measure latency before/after consolidation
- [ ] Check telemetry traces span correctly
### Phase 4: Deployment Configuration
- [x] Update `apps/cloud/fly.template.toml`:
  - [x] Merged MCP-specific environment variables (AUTHKIT_BASE_URL, FASTMCP_LOG_LEVEL, BASIC_MEMORY_*)
  - [x] Added HTTP/2 backend support (`h2_backend = true`) for better MCP performance
  - [x] Added health check for MCP OAuth endpoint (`/.well-known/oauth-protected-resource`)
  - [x] Port 8000 already exposed - serves both Cloud routes and /mcp endpoint
  - [x] Workers configured (UVICORN_WORKERS = 4)
- [x] Update `.env.example`:
  - [x] Consolidated MCP Gateway section into Cloud app section
  - [x] Added AUTHKIT_BASE_URL, FASTMCP_LOG_LEVEL, BASIC_MEMORY_HOME
  - [x] Added LOG_LEVEL to Development Settings
  - [x] Documented that MCP now served at /mcp on Cloud service (port 8000)
- [x] Test deployment to preview environment (PR #113)
  - [x] OAuth authentication verified
  - [x] MCP tools successfully calling tenant APIs
  - [x] Fixed BM_TENANT_HEADER_SECRET synchronization issue
### Phase 5: Cleanup
- [x] Remove `apps/mcp/` directory entirely
- [x] Remove MCP-specific fly.toml and deployment configs
- [x] Update repository documentation
- [x] Update CLAUDE.md with new architecture
- [-] Archive old MCP deployment configs (if needed)
### Phase 6: Production Rollout
- [ ] Deploy to development and validate
- [ ] Monitor metrics and logs
- [ ] Deploy to production
- [ ] Verify production functionality
- [ ] Document performance improvements
## Migration Plan
### Phase 1: Preparation
1. Create feature branch `consolidate-mcp-cloud`
2. Update basic-memory async_client.py for direct ProxyService calls
3. Update apps/cloud/main.py to mount MCP
### Phase 2: Testing
1. Local testing with consolidated app
2. Deploy to development environment
3. Run full test suite
4. Performance benchmarking
### Phase 3: Deployment
1. Deploy to development
2. Validate all functionality
3. Deploy to production
4. Monitor for issues
### Phase 4: Cleanup
1. Remove apps/mcp directory
2. Update documentation
3. Update deployment scripts
4. Archive old MCP deployment configs
## Rollback Plan
If issues arise:
1. Revert feature branch
2. Redeploy separate apps/mcp and apps/cloud services
3. Restore previous fly.toml configurations
4. Document issues encountered
The well-organized code structure makes splitting back out feasible if future scaling needs diverge.
## How to Evaluate
### 1. Functional Testing
**MCP Tools:**
- [ ] All 17 MCP tools work via consolidated /mcp endpoint
- [x] OAuth authentication validates correctly
- [x] Tenant isolation maintained via signed headers
- [x] Project management tools function correctly
**Cloud Routes:**
- [x] /proxy endpoint still works for web UI
- [x] /provisioning routes functional
- [x] /webhooks routes functional
- [x] /tenants routes functional
**API Validation:**
- [x] Tenant API validates both JWT and signed headers
- [x] Unauthorized requests rejected appropriately
- [x] Multi-tenant isolation verified
### 2. Performance Testing
**Latency Reduction:**
- [x] Measure MCP tool latency before consolidation
- [x] Measure MCP tool latency after consolidation
- [x] Verify reduction from eliminated HTTP hop (expected: 20-50ms improvement)
**Resource Usage:**
- [x] Single app uses less total memory than two apps
- [x] Database connection pooling more efficient
- [x] HTTP client overhead reduced
### 3. Deployment Testing
**Fly.io Deployment:**
- [x] Single app deploys successfully
- [x] Health checks pass for consolidated service
- [x] No apps/mcp deployment required
- [x] Environment variables configured correctly
**Local Development:**
- [x] `just setup` works with consolidated architecture
- [x] Local testing shows MCP tools working
- [x] No regression in developer experience
### 4. Security Validation
**Defense in Depth:**
- [x] Tenant API still validates JWT tokens
- [x] Tenant API still validates signed headers
- [x] No access possible with only signed headers (JWT required)
- [x] No access possible with only JWT (signed headers required)
**Authorization:**
- [x] Users can only access their own tenant data
- [x] Cross-tenant requests rejected
- [x] Admin operations require proper authentication
### 5. Observability
**Telemetry:**
- [x] OpenTelemetry traces span across MCP → ProxyService → Tenant API
- [x] Logfire shows consolidated traces correctly
- [x] Error tracking and debugging still functional
- [x] Performance metrics accurate
**Logging:**
- [x] Structured logs show proper context (tenant_id, operation, etc.)
- [x] Error logs contain actionable information
- [x] Log volume reasonable for single app
## Success Criteria
1. **Functionality**: All MCP tools and Cloud routes work identically to before
2. **Performance**: Measurable latency reduction (>20ms average; see the latency probe sketch after this list)
3. **Cost**: Single Fly.io app instead of two (50% infrastructure reduction)
4. **Security**: Dual validation maintained, no security regression
5. **Deployment**: Simplified deployment process, single app to manage
6. **Observability**: Telemetry and logging work correctly
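For criterion 2, a rough way to compare round trips before and after consolidation (a hedged sketch; the URL is a placeholder for whichever cheap health or read-only endpoint is measured):
```python
# Hedged sketch: sample round-trip latency against a given endpoint and print
# median/max, to be run against the pre- and post-consolidation deployments.
# The URL below is a placeholder, not a real deployment.
import asyncio
import statistics
import time
import httpx

async def measure(url: str, samples: int = 20) -> None:
    timings_ms: list[float] = []
    async with httpx.AsyncClient(timeout=30.0) as client:
        for _ in range(samples):
            start = time.perf_counter()
            await client.get(url)
            timings_ms.append((time.perf_counter() - start) * 1000)
    print(f"median={statistics.median(timings_ms):.1f}ms max={max(timings_ms):.1f}ms")

asyncio.run(measure("https://example-app.fly.dev/health"))
```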
## Notes
### Future Considerations
- **Independent scaling**: If MCP and Cloud need different scaling profiles in future, code organization supports splitting back out
- **Regional deployment**: Consolidated app can still be deployed to multiple regions
- **Edge caching**: Could add edge caching layer in front of consolidated service
### Dependencies
- SPEC-9: Signed Header Tenant Information (already implemented)
- SPEC-12: OpenTelemetry Observability (telemetry must work across merged services)
### Related Work
- basic-memory v0.13.x: MCP server implementation
- FastMCP documentation: Mounting on existing FastAPI apps
- Fly.io multi-service patterns
```
--------------------------------------------------------------------------------
/tests/mcp/test_tool_move_note.py:
--------------------------------------------------------------------------------
```python
"""Tests for the move_note MCP tool."""
import pytest
from unittest.mock import patch
from basic_memory.mcp.tools.move_note import move_note, _format_move_error_response
from basic_memory.mcp.tools.write_note import write_note
from basic_memory.mcp.tools.read_note import read_note
@pytest.mark.asyncio
async def test_move_note_success(app, client, test_project):
    """Test successfully moving a note to a new location."""
    # Create initial note
    await write_note.fn(
        project=test_project.name,
        title="Test Note",
        folder="source",
        content="# Test Note\nOriginal content here.",
    )
    # Move note
    result = await move_note.fn(
        project=test_project.name,
        identifier="source/test-note",
        destination_path="target/MovedNote.md",
    )
    assert isinstance(result, str)
    assert "✅ Note moved successfully" in result
    # Verify original location no longer exists
    try:
        await read_note.fn("source/test-note", project=test_project.name)
        assert False, "Original note should not exist after move"
    except Exception:
        pass  # Expected - note should not exist at original location
    # Verify note exists at new location with same content
    content = await read_note.fn("target/moved-note", project=test_project.name)
    assert "# Test Note" in content
    assert "Original content here" in content
    assert "permalink: target/moved-note" in content
@pytest.mark.asyncio
async def test_move_note_with_folder_creation(client, test_project):
    """Test moving note creates necessary folders."""
    # Create initial note
    await write_note.fn(
        project=test_project.name,
        title="Deep Note",
        folder="",
        content="# Deep Note\nContent in root folder.",
    )
    # Move to deeply nested path
    result = await move_note.fn(
        project=test_project.name,
        identifier="deep-note",
        destination_path="deeply/nested/folder/DeepNote.md",
    )
    assert isinstance(result, str)
    assert "✅ Note moved successfully" in result
    # Verify note exists at new location
    content = await read_note.fn("deeply/nested/folder/deep-note", project=test_project.name)
    assert "# Deep Note" in content
    assert "Content in root folder" in content
@pytest.mark.asyncio
async def test_move_note_with_observations_and_relations(app, client, test_project):
    """Test moving note preserves observations and relations."""
    # Create note with complex semantic content
    await write_note.fn(
        project=test_project.name,
        title="Complex Entity",
        folder="source",
        content="""# Complex Entity
## Observations
- [note] Important observation #tag1
- [feature] Key feature #feature
## Relations
- relation to [[SomeOtherEntity]]
- depends on [[Dependency]]
Some additional content.
        """,
    )
    # Move note
    result = await move_note.fn(
        project=test_project.name,
        identifier="source/complex-entity",
        destination_path="target/MovedComplex.md",
    )
    assert isinstance(result, str)
    assert "✅ Note moved successfully" in result
    # Verify moved note preserves all content
    content = await read_note.fn("target/moved-complex", project=test_project.name)
    assert "Important observation #tag1" in content
    assert "Key feature #feature" in content
    assert "[[SomeOtherEntity]]" in content
    assert "[[Dependency]]" in content
    assert "Some additional content" in content
@pytest.mark.asyncio
async def test_move_note_by_title(client, test_project):
    """Test moving note using title as identifier."""
    # Create note with unique title
    await write_note.fn(
        project=test_project.name,
        title="UniqueTestTitle",
        folder="source",
        content="# UniqueTestTitle\nTest content.",
    )
    # Move using title as identifier
    result = await move_note.fn(
        project=test_project.name,
        identifier="UniqueTestTitle",
        destination_path="target/MovedByTitle.md",
    )
    assert isinstance(result, str)
    assert "✅ Note moved successfully" in result
    # Verify note exists at new location
    content = await read_note.fn("target/moved-by-title", project=test_project.name)
    assert "# UniqueTestTitle" in content
    assert "Test content" in content
@pytest.mark.asyncio
async def test_move_note_by_file_path(client, test_project):
    """Test moving note using file path as identifier."""
    # Create initial note
    await write_note.fn(
        project=test_project.name,
        title="PathTest",
        folder="source",
        content="# PathTest\nContent for path test.",
    )
    # Move using file path as identifier
    result = await move_note.fn(
        project=test_project.name,
        identifier="source/PathTest.md",
        destination_path="target/MovedByPath.md",
    )
    assert isinstance(result, str)
    assert "✅ Note moved successfully" in result
    # Verify note exists at new location
    content = await read_note.fn("target/moved-by-path", project=test_project.name)
    assert "# PathTest" in content
    assert "Content for path test" in content
@pytest.mark.asyncio
async def test_move_note_nonexistent_note(client, test_project):
    """Test moving a note that doesn't exist."""
    result = await move_note.fn(
        project=test_project.name,
        identifier="nonexistent/note",
        destination_path="target/SomeFile.md",
    )
    # Should return user-friendly error message string
    assert isinstance(result, str)
    assert "# Move Failed - Note Not Found" in result
    assert "could not be found for moving" in result
    assert "Search for the note first" in result
@pytest.mark.asyncio
async def test_move_note_invalid_destination_path(client, test_project):
    """Test moving note with invalid destination path."""
    # Create initial note
    await write_note.fn(
        project=test_project.name,
        title="TestNote",
        folder="source",
        content="# TestNote\nTest content.",
    )
    # Test absolute path (should be rejected by validation)
    result = await move_note.fn(
        project=test_project.name,
        identifier="source/test-note",
        destination_path="/absolute/path.md",
    )
    # Should return user-friendly error message string
    assert isinstance(result, str)
    assert "# Move Failed" in result
    assert "/absolute/path.md" in result or "Invalid" in result or "path" in result
@pytest.mark.asyncio
async def test_move_note_missing_file_extension(client, test_project):
    """Test moving note without file extension in destination path."""
    # Create initial note
    await write_note.fn(
        project=test_project.name,
        title="ExtensionTest",
        folder="source",
        content="# Extension Test\nTesting extension validation.",
    )
    # Test path without extension
    result = await move_note.fn(
        project=test_project.name,
        identifier="source/extension-test",
        destination_path="target/renamed-note",
    )
    # Should return error about missing extension
    assert isinstance(result, str)
    assert "# Move Failed - File Extension Required" in result
    assert "must include a file extension" in result
    assert ".md" in result
    assert "renamed-note.md" in result  # Should suggest adding .md
    # Test path with empty extension (edge case)
    result = await move_note.fn(
        project=test_project.name,
        identifier="source/extension-test",
        destination_path="target/renamed-note.",
    )
    assert isinstance(result, str)
    assert "# Move Failed - File Extension Required" in result
    assert "must include a file extension" in result
    # Test that note still exists at original location
    content = await read_note.fn("source/extension-test", project=test_project.name)
    assert "# Extension Test" in content
    assert "Testing extension validation" in content
@pytest.mark.asyncio
async def test_move_note_file_extension_mismatch(client, test_project):
    """Test that moving note with different extension is blocked."""
    # Create initial note with .md extension
    await write_note.fn(
        project=test_project.name,
        title="MarkdownNote",
        folder="source",
        content="# Markdown Note\nThis is a markdown file.",
    )
    # Try to move with .txt extension
    result = await move_note.fn(
        project=test_project.name,
        identifier="source/markdown-note",
        destination_path="target/renamed-note.txt",
    )
    # Should return error about extension mismatch
    assert isinstance(result, str)
    assert "# Move Failed - File Extension Mismatch" in result
    assert "does not match the source file extension" in result
    assert ".md" in result
    assert ".txt" in result
    assert "renamed-note.md" in result  # Should suggest correct extension
    # Test that note still exists at original location with original extension
    content = await read_note.fn("source/markdown-note", project=test_project.name)
    assert "# Markdown Note" in content
    assert "This is a markdown file" in content
@pytest.mark.asyncio
async def test_move_note_preserves_file_extension(client, test_project):
    """Test that moving note with matching extension succeeds."""
    # Create initial note with .md extension
    await write_note.fn(
        project=test_project.name,
        title="PreserveExtension",
        folder="source",
        content="# Preserve Extension\nTesting that extension is preserved.",
    )
    # Move with same .md extension
    result = await move_note.fn(
        project=test_project.name,
        identifier="source/preserve-extension",
        destination_path="target/preserved-note.md",
    )
    # Should succeed
    assert isinstance(result, str)
    assert "✅ Note moved successfully" in result
    # Verify note exists at new location with same extension
    content = await read_note.fn("target/preserved-note", project=test_project.name)
    assert "# Preserve Extension" in content
    assert "Testing that extension is preserved" in content
    # Verify old location no longer exists
    try:
        await read_note.fn("source/preserve-extension")
        assert False, "Original note should not exist after move"
    except Exception:
        pass  # Expected
@pytest.mark.asyncio
async def test_move_note_destination_exists(client, test_project):
    """Test moving note to existing destination."""
    # Create source note
    await write_note.fn(
        project=test_project.name,
        title="SourceNote",
        folder="source",
        content="# SourceNote\nSource content.",
    )
    # Create destination note
    await write_note.fn(
        project=test_project.name,
        title="DestinationNote",
        folder="target",
        content="# DestinationNote\nDestination content.",
    )
    # Try to move source to existing destination
    result = await move_note.fn(
        project=test_project.name,
        identifier="source/source-note",
        destination_path="target/DestinationNote.md",
    )
    # Should return user-friendly error message string
    assert isinstance(result, str)
    assert "# Move Failed" in result
    assert "already exists" in result or "Destination" in result
@pytest.mark.asyncio
async def test_move_note_same_location(client, test_project):
    """Test moving note to the same location."""
    # Create initial note
    await write_note.fn(
        project=test_project.name,
        title="SameLocationTest",
        folder="test",
        content="# SameLocationTest\nContent here.",
    )
    # Try to move to same location
    result = await move_note.fn(
        project=test_project.name,
        identifier="test/same-location-test",
        destination_path="test/SameLocationTest.md",
    )
    # Should return user-friendly error message string
    assert isinstance(result, str)
    assert "# Move Failed" in result
    assert "already exists" in result or "same" in result or "Destination" in result
@pytest.mark.asyncio
async def test_move_note_rename_only(client, test_project):
    """Test moving note within same folder (rename operation)."""
    # Create initial note
    await write_note.fn(
        project=test_project.name,
        title="OriginalName",
        folder="test",
        content="# OriginalName\nContent to rename.",
    )
    # Rename within same folder
    await move_note.fn(
        project=test_project.name,
        identifier="test/original-name",
        destination_path="test/NewName.md",
    )
    # Verify original is gone
    try:
        await read_note.fn("test/original-name", project=test_project.name)
        assert False, "Original note should not exist after rename"
    except Exception:
        pass  # Expected
    # Verify new name exists with same content
    content = await read_note.fn("test/new-name", project=test_project.name)
    assert "# OriginalName" in content  # Title in content remains same
    assert "Content to rename" in content
    assert "permalink: test/new-name" in content
@pytest.mark.asyncio
async def test_move_note_complex_filename(client, test_project):
    """Test moving note with spaces in filename."""
    # Create note with spaces in name
    await write_note.fn(
        project=test_project.name,
        title="Meeting Notes 2025",
        folder="meetings",
        content="# Meeting Notes 2025\nMeeting content with dates.",
    )
    # Move to new location
    result = await move_note.fn(
        project=test_project.name,
        identifier="meetings/meeting-notes-2025",
        destination_path="archive/2025/meetings/Meeting Notes 2025.md",
    )
    assert isinstance(result, str)
    assert "✅ Note moved successfully" in result
    # Verify note exists at new location with correct content
    content = await read_note.fn(
        "archive/2025/meetings/meeting-notes-2025", project=test_project.name
    )
    assert "# Meeting Notes 2025" in content
    assert "Meeting content with dates" in content
@pytest.mark.asyncio
async def test_move_note_with_tags(app, client, test_project):
    """Test moving note with tags preserves tags."""
    # Create note with tags
    await write_note.fn(
        project=test_project.name,
        title="Tagged Note",
        folder="source",
        content="# Tagged Note\nContent with tags.",
        tags=["important", "work", "project"],
    )
    # Move note
    result = await move_note.fn(
        project=test_project.name,
        identifier="source/tagged-note",
        destination_path="target/MovedTaggedNote.md",
    )
    assert isinstance(result, str)
    assert "✅ Note moved successfully" in result
    # Verify tags are preserved in correct YAML format
    content = await read_note.fn("target/moved-tagged-note", project=test_project.name)
    assert "- important" in content
    assert "- work" in content
    assert "- project" in content
@pytest.mark.asyncio
async def test_move_note_empty_string_destination(client, test_project):
    """Test moving note with empty destination path."""
    # Create initial note
    await write_note.fn(
        project=test_project.name,
        title="TestNote",
        folder="source",
        content="# TestNote\nTest content.",
    )
    # Test empty destination path
    result = await move_note.fn(
        project=test_project.name,
        identifier="source/test-note",
        destination_path="",
    )
    # Should return user-friendly error message string
    assert isinstance(result, str)
    assert "# Move Failed" in result
    assert "empty" in result or "Invalid" in result or "path" in result
@pytest.mark.asyncio
async def test_move_note_parent_directory_path(client, test_project):
    """Test moving note with parent directory in destination path."""
    # Create initial note
    await write_note.fn(
        project=test_project.name,
        title="TestNote",
        folder="source",
        content="# TestNote\nTest content.",
    )
    # Test parent directory path
    result = await move_note.fn(
        project=test_project.name,
        identifier="source/test-note",
        destination_path="../parent/file.md",
    )
    # Should return user-friendly error message string
    assert isinstance(result, str)
    assert "# Move Failed" in result
    assert "parent" in result or "Invalid" in result or "path" in result or ".." in result
@pytest.mark.asyncio
async def test_move_note_identifier_variations(client, test_project):
    """Test that various identifier formats work for moving."""
    # Create a note to test different identifier formats
    await write_note.fn(
        project=test_project.name,
        title="Test Document",
        folder="docs",
        content="# Test Document\nContent for testing identifiers.",
    )
    # Test with permalink identifier
    result = await move_note.fn(
        project=test_project.name,
        identifier="docs/test-document",
        destination_path="moved/TestDocument.md",
    )
    assert isinstance(result, str)
    assert "✅ Note moved successfully" in result
    # Verify it moved correctly
    content = await read_note.fn("moved/test-document", project=test_project.name)
    assert "# Test Document" in content
    assert "Content for testing identifiers" in content
@pytest.mark.asyncio
async def test_move_note_preserves_frontmatter(app, client, test_project):
    """Test that moving preserves custom frontmatter."""
    # Create note with custom frontmatter by first creating it normally
    await write_note.fn(
        project=test_project.name,
        title="Custom Frontmatter Note",
        folder="source",
        content="# Custom Frontmatter Note\nContent with custom metadata.",
    )
    # Move the note
    result = await move_note.fn(
        project=test_project.name,
        identifier="source/custom-frontmatter-note",
        destination_path="target/MovedCustomNote.md",
    )
    assert isinstance(result, str)
    assert "✅ Note moved successfully" in result
    # Verify the moved note has proper frontmatter structure
    content = await read_note.fn("target/moved-custom-note", project=test_project.name)
    assert "title: Custom Frontmatter Note" in content
    assert "type: note" in content
    assert "permalink: target/moved-custom-note" in content
    assert "# Custom Frontmatter Note" in content
    assert "Content with custom metadata" in content
class TestMoveNoteErrorFormatting:
    """Test move note error formatting for better user experience."""
    def test_format_move_error_invalid_path(self):
        """Test formatting for invalid path errors."""
        result = _format_move_error_response("invalid path format", "test-note", "/invalid/path.md")
        assert "# Move Failed - Invalid Destination Path" in result
        assert "The destination path '/invalid/path.md' is not valid" in result
        assert "Relative paths only" in result
        assert "Include file extension" in result
    def test_format_move_error_permission_denied(self):
        """Test formatting for permission errors."""
        result = _format_move_error_response("permission denied", "test-note", "target/file.md")
        assert "# Move Failed - Permission Error" in result
        assert "You don't have permission to move 'test-note'" in result
        assert "Check file permissions" in result
        assert "Check file locks" in result
    def test_format_move_error_source_missing(self):
        """Test formatting for source file missing errors."""
        result = _format_move_error_response("source file missing", "test-note", "target/file.md")
        assert "# Move Failed - Source File Missing" in result
        assert "The source file for 'test-note' was not found on disk" in result
        assert "database and filesystem are out of sync" in result
    def test_format_move_error_server_error(self):
        """Test formatting for server errors."""
        result = _format_move_error_response("server error occurred", "test-note", "target/file.md")
        assert "# Move Failed - System Error" in result
        assert "A system error occurred while moving 'test-note'" in result
        assert "Try again" in result
        assert "Check disk space" in result
class TestMoveNoteSecurityValidation:
    """Test move note security validation features."""
    @pytest.mark.asyncio
    async def test_move_note_blocks_path_traversal_unix(self, client, test_project):
        """Test that Unix-style path traversal attacks are blocked."""
        # Create initial note
        await write_note.fn(
            project=test_project.name,
            title="Test Note",
            folder="source",
            content="# Test Note\nTest content for security testing.",
        )
        # Test various Unix-style path traversal patterns
        attack_paths = [
            "../secrets.txt",
            "../../etc/passwd",
            "../../../root/.ssh/id_rsa",
            "notes/../../../etc/shadow",
            "folder/../../outside/file.md",
            "../../../../etc/hosts",
        ]
        for attack_path in attack_paths:
            result = await move_note.fn(
                project=test_project.name,
                identifier="source/test-note",
                destination_path=attack_path,
            )
            assert isinstance(result, str)
            assert "# Move Failed - Security Validation Error" in result
            assert "paths must stay within project boundaries" in result
            assert attack_path in result
            assert "Try again with a safe path" in result
    @pytest.mark.asyncio
    async def test_move_note_blocks_path_traversal_windows(self, client, test_project):
        """Test that Windows-style path traversal attacks are blocked."""
        # Create initial note
        await write_note.fn(
            project=test_project.name,
            title="Test Note",
            folder="source",
            content="# Test Note\nTest content for security testing.",
        )
        # Test various Windows-style path traversal patterns
        attack_paths = [
            "..\\secrets.txt",
            "..\\..\\Windows\\System32\\config\\SAM",
            "notes\\..\\..\\..\\Windows\\System32",
            "\\\\server\\share\\file.txt",
            "..\\..\\Users\\user\\.env",
            "\\\\..\\..\\Windows",
        ]
        for attack_path in attack_paths:
            result = await move_note.fn(
                project=test_project.name,
                identifier="source/test-note",
                destination_path=attack_path,
            )
            assert isinstance(result, str)
            assert "# Move Failed - Security Validation Error" in result
            assert "paths must stay within project boundaries" in result
            assert attack_path in result
    @pytest.mark.asyncio
    async def test_move_note_blocks_absolute_paths(self, client, test_project):
        """Test that absolute paths are blocked."""
        # Create initial note
        await write_note.fn(
            project=test_project.name,
            title="Test Note",
            folder="source",
            content="# Test Note\nTest content for security testing.",
        )
        # Test various absolute path patterns
        attack_paths = [
            "/etc/passwd",
            "/home/user/.env",
            "/var/log/auth.log",
            "/root/.ssh/id_rsa",
            "C:\\Windows\\System32\\config\\SAM",
            "C:\\Users\\user\\.env",
            "D:\\secrets\\config.json",
            "/tmp/malicious.txt",
        ]
        for attack_path in attack_paths:
            result = await move_note.fn(
                project=test_project.name,
                identifier="source/test-note",
                destination_path=attack_path,
            )
            assert isinstance(result, str)
            assert "# Move Failed - Security Validation Error" in result
            assert "paths must stay within project boundaries" in result
            assert attack_path in result
    @pytest.mark.asyncio
    async def test_move_note_blocks_home_directory_access(self, client, test_project):
        """Test that home directory access patterns are blocked."""
        # Create initial note
        await write_note.fn(
            project=test_project.name,
            title="Test Note",
            folder="source",
            content="# Test Note\nTest content for security testing.",
        )
        # Test various home directory access patterns
        attack_paths = [
            "~/secrets.txt",
            "~/.env",
            "~/.ssh/id_rsa",
            "~/Documents/passwords.txt",
            "~\\AppData\\secrets",
            "~\\Desktop\\config.ini",
        ]
        for attack_path in attack_paths:
            result = await move_note.fn(
                project=test_project.name,
                identifier="source/test-note",
                destination_path=attack_path,
            )
            assert isinstance(result, str)
            assert "# Move Failed - Security Validation Error" in result
            assert "paths must stay within project boundaries" in result
            assert attack_path in result
    @pytest.mark.asyncio
    async def test_move_note_blocks_mixed_attack_patterns(self, client, test_project):
        """Test that mixed legitimate/attack patterns are blocked."""
        # Create initial note
        await write_note.fn(
            project=test_project.name,
            title="Test Note",
            folder="source",
            content="# Test Note\nTest content for security testing.",
        )
        # Test mixed patterns that start legitimate but contain attacks
        attack_paths = [
            "notes/../../../etc/passwd",
            "docs/../../.env",
            "legitimate/path/../../.ssh/id_rsa",
            "project/folder/../../../Windows/System32",
            "valid/folder/../../home/user/.bashrc",
        ]
        for attack_path in attack_paths:
            result = await move_note.fn(
                project=test_project.name,
                identifier="source/test-note",
                destination_path=attack_path,
            )
            assert isinstance(result, str)
            assert "# Move Failed - Security Validation Error" in result
            assert "paths must stay within project boundaries" in result
    @pytest.mark.asyncio
    async def test_move_note_allows_safe_paths(self, client, test_project):
        """Test that legitimate paths are still allowed."""
        # Create initial note
        await write_note.fn(
            project=test_project.name,
            title="Test Note",
            folder="source",
            content="# Test Note\nTest content for security testing.",
        )
        # Test various safe path patterns
        safe_paths = [
            "notes/meeting.md",
            "docs/readme.txt",
            "projects/2025/planning.md",
            "archive/old-notes/backup.md",
            "deep/nested/directory/structure/file.txt",
            "folder/subfolder/document.md",
        ]
        for safe_path in safe_paths:
            result = await move_note.fn(
                project=test_project.name,
                identifier="source/test-note",
                destination_path=safe_path,
            )
            # Should succeed or fail for legitimate reasons (not security)
            assert isinstance(result, str)
            # Should NOT contain security error message
            assert "Security Validation Error" not in result
            # If it fails, it should be for other reasons like "already exists" or API errors
            if "Move Failed" in result:
                assert "paths must stay within project boundaries" not in result
    @pytest.mark.asyncio
    async def test_move_note_security_logging(self, client, test_project, caplog):
        """Test that security violations are properly logged."""
        # Create initial note
        await write_note.fn(
            project=test_project.name,
            title="Test Note",
            folder="source",
            content="# Test Note\nTest content for security testing.",
        )
        # Attempt path traversal attack
        result = await move_note.fn(
            project=test_project.name,
            identifier="source/test-note",
            destination_path="../../../etc/passwd",
        )
        assert "# Move Failed - Security Validation Error" in result
        # Check that security violation was logged
        # Note: This test may need adjustment based on the actual logging setup
        # The security validation should generate a warning log entry
    @pytest.mark.asyncio
    async def test_move_note_empty_path_security(self, client, test_project):
        """Test that empty destination path is handled securely."""
        # Create initial note
        await write_note.fn(
            project=test_project.name,
            title="Test Note",
            folder="source",
            content="# Test Note\nTest content for security testing.",
        )
        # Test empty destination path (should not trigger security validation; the move may still fail for other reasons)
        result = await move_note.fn(
            project=test_project.name,
            identifier="source/test-note",
            destination_path="",
        )
        assert isinstance(result, str)
        # Empty path should not trigger security error (it's handled by pathlib validation)
        # But may fail for other API-related reasons
    @pytest.mark.asyncio
    async def test_move_note_current_directory_references_security(self, client, test_project):
        """Test that current directory references are handled securely."""
        # Create initial note
        await write_note.fn(
            project=test_project.name,
            title="Test Note",
            folder="source",
            content="# Test Note\nTest content for security testing.",
        )
        # Test current directory references (should be safe)
        safe_paths = [
            "./notes/file.md",
            "folder/./file.md",
            "./folder/subfolder/file.md",
        ]
        for safe_path in safe_paths:
            result = await move_note.fn(
                project=test_project.name,
                identifier="source/test-note",
                destination_path=safe_path,
            )
            assert isinstance(result, str)
            # Should NOT contain security error message
            assert "Security Validation Error" not in result
class TestMoveNoteErrorHandling:
    """Test move note exception handling."""
    @pytest.mark.asyncio
    async def test_move_note_exception_handling(self):
        """Test exception handling in move_note."""
        with patch("basic_memory.mcp.tools.move_note.get_active_project") as mock_get_project:
            mock_get_project.return_value.project_url = "http://test"
            mock_get_project.return_value.name = "test-project"
            with patch(
                "basic_memory.mcp.tools.move_note.call_post",
                side_effect=Exception("entity not found"),
            ):
                result = await move_note.fn("test-note", "target/file.md", project="test-project")
                assert isinstance(result, str)
                assert "# Move Failed - Note Not Found" in result
    @pytest.mark.asyncio
    async def test_move_note_permission_error_handling(self):
        """Test permission error handling in move_note."""
        with patch("basic_memory.mcp.tools.move_note.get_active_project") as mock_get_project:
            mock_get_project.return_value.project_url = "http://test"
            mock_get_project.return_value.name = "test-project"
            with patch(
                "basic_memory.mcp.tools.move_note.call_post",
                side_effect=Exception("permission denied"),
            ):
                result = await move_note.fn("test-note", "target/file.md", project="test-project")
                assert isinstance(result, str)
                assert "# Move Failed - Permission Error" in result
```
--------------------------------------------------------------------------------
/specs/SPEC-8 TigrisFS Integration.md:
--------------------------------------------------------------------------------
```markdown
---
title: 'SPEC-8: TigrisFS Integration for Tenant API'
Date: September 22, 2025
Status: Phase 3.6 Complete - Tenant Mount API Endpoints Ready for CLI Implementation
Priority: High
Goal: Replace Fly volumes with Tigris bucket provisioning in production tenant API
permalink: spec-8-tigris-fs-integration
---
## Executive Summary
Based on SPEC-7 Phase 4 POC testing, this spec outlines productizing the TigrisFS/rclone implementation in the Basic Memory Cloud tenant API. 
We're moving from proof-of-concept to production integration, replacing Fly volume storage with Tigris bucket-per-tenant architecture.
## Current Architecture (Fly Volumes)
### Tenant Provisioning Flow
```python
# apps/cloud/src/basic_memory_cloud/workflows/tenant_provisioning.py
async def provision_tenant_infrastructure(tenant_id: str):
    # 1. Create Fly app
    # 2. Create Fly volume  ← REPLACE THIS
    # 3. Deploy API container with volume mount
    # 4. Configure health checks
```
### Storage Implementation
- Each tenant gets dedicated Fly volume (1GB-10GB)
- Volume mounted at `/app/data` in API container
- Local filesystem storage with Basic Memory indexing
- No global caching or edge distribution
## Proposed Architecture (Tigris Buckets)
### New Tenant Provisioning Flow
```python
async def provision_tenant_infrastructure(tenant_id: str):
    # 1. Create Fly app
    # 2. Create Tigris bucket with admin credentials  ← NEW
    # 3. Store bucket name in tenant record  ← NEW
    # 4. Deploy API container with TigrisFS mount using admin credentials
    # 5. Configure health checks
```
### Storage Implementation
- Each tenant gets dedicated Tigris bucket
- TigrisFS mounts bucket at `/app/data` in API container
- Global edge caching and distribution
- Configurable cache TTL for sync performance
## Implementation Plan
### Phase 1: Bucket Provisioning Service
**✅ IMPLEMENTED: StorageClient with Admin Credentials**
```python
# apps/cloud/src/basic_memory_cloud/clients/storage_client.py
class StorageClient:
    async def create_tenant_bucket(self, tenant_id: UUID) -> TigrisBucketCredentials
    async def delete_tenant_bucket(self, tenant_id: UUID, bucket_name: str) -> bool
    async def list_buckets(self) -> list[TigrisBucketResponse]
    async def test_tenant_credentials(self, credentials: TigrisBucketCredentials) -> bool
```
**Simplified Architecture Using Admin Credentials:**
- Single admin access key with full Tigris permissions (configured in console)
- No tenant-specific IAM user creation needed
- Bucket-per-tenant isolation for logical separation
- Admin credentials shared across all tenant operations
**Integrate with Provisioning workflow:**
```python
# Update tenant_provisioning.py
async def provision_tenant_infrastructure(tenant_id: str):
    storage_client = StorageClient(settings.aws_access_key_id, settings.aws_secret_access_key)
    bucket_creds = await storage_client.create_tenant_bucket(tenant_id)
    await store_bucket_name(tenant_id, bucket_creds.bucket_name)
    await deploy_api_with_tigris(tenant_id, bucket_creds)
```
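For orientation, a minimal sketch of how `create_tenant_bucket` could call the S3-compatible Tigris endpoint with the admin credentials (boto3-based; the class name, method body, return type, and bucket naming here are illustrative assumptions, not the shipped StorageClient):
```python
import asyncio
from uuid import UUID
import boto3
class StorageClientSketch:
    """Illustrative only; the real StorageClient also manages IAM policies."""
    def __init__(self, access_key_id: str, secret_access_key: str) -> None:
        self._s3 = boto3.client(
            "s3",
            aws_access_key_id=access_key_id,
            aws_secret_access_key=secret_access_key,
            endpoint_url="https://fly.storage.tigris.dev",
            region_name="auto",
        )
    async def create_tenant_bucket(self, tenant_id: UUID) -> str:
        bucket_name = f"basic-memory-{tenant_id}"
        # boto3 is synchronous, so run the call in a worker thread.
        await asyncio.to_thread(self._s3.create_bucket, Bucket=bucket_name)
        return bucket_name
```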
### Phase 2: Simplified Bucket Management
**✅ SIMPLIFIED: Admin Credentials + Bucket Names Only**
Since we use admin credentials for all operations, we only need to track bucket names per tenant:
1. **Primary Storage (Fly Secrets)**
   ```bash
   flyctl secrets set -a basic-memory-{tenant_id} \
     AWS_ACCESS_KEY_ID="{admin_access_key}" \
     AWS_SECRET_ACCESS_KEY="{admin_secret_key}" \
     AWS_ENDPOINT_URL_S3="https://fly.storage.tigris.dev" \
     AWS_REGION="auto" \
     BUCKET_NAME="basic-memory-{tenant_id}"
   ```
2. **Database Storage (Bucket Name Only)**
   ```python
   # apps/cloud/src/basic_memory_cloud/models/tenant.py
   class Tenant(BaseModel):
       # ... existing fields
       tigris_bucket_name: Optional[str] = None  # Just store bucket name
       tigris_region: str = "auto"
       created_at: datetime
   ```
**Benefits of Simplified Approach:**
- No credential encryption/decryption needed
- Admin credentials managed centrally in environment
- Only bucket names stored in database (not sensitive)
- Simplified backup/restore scenarios
- Reduced security attack surface
### Phase 3: API Container Updates
**Update API container configuration:**
```dockerfile
# apps/api/Dockerfile
# Add TigrisFS installation
RUN curl -L https://github.com/tigrisdata/tigrisfs/releases/latest/download/tigrisfs-linux-amd64 \
    -o /usr/local/bin/tigrisfs && chmod +x /usr/local/bin/tigrisfs
```
**Startup script integration:**
```bash
# apps/api/tigrisfs-startup.sh (already exists)
# Mount TigrisFS → Start Basic Memory API
exec python -m basic_memory_cloud_api.main
```
**Fly.toml environment (optimized for < 5s startup):**
```toml
# apps/api/fly.tigris-production.toml
[env]
  TIGRISFS_MEMORY_LIMIT = '1024'     # Reduced for faster init
  TIGRISFS_MAX_FLUSHERS = '16'       # Fewer threads for faster startup
  TIGRISFS_STAT_CACHE_TTL = '30s'    # Balance sync speed vs startup
  TIGRISFS_LAZY_INIT = 'true'        # Enable lazy loading
  BASIC_MEMORY_HOME = '/app/data'
# Suspend optimization for wake-on-network
[machine]
  auto_stop_machines = "suspend"     # Faster than full stop
  auto_start_machines = true
  min_machines_running = 0
```
### Phase 4: Local Access Features
**CLI automation for local mounting:**
```python
# New CLI command: basic-memory cloud mount
async def setup_local_mount(tenant_id: str):
    # 1. Fetch bucket credentials from cloud API
    # 2. Configure rclone with scoped IAM policy
    # 3. Mount via rclone nfsmount (macOS) or FUSE (Linux)
    # 4. Start Basic Memory sync watcher
```
**Local mount configuration:**
```bash
# rclone config for tenant
rclone mount basic-memory-{tenant_id}: ~/basic-memory-{tenant_id} \
  --nfs-mount \
  --vfs-cache-mode writes \
  --cache-dir ~/.cache/rclone/basic-memory-{tenant_id}
```
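A rough sketch of the CLI-side automation (the endpoint path and response fields mirror the mount credentials API described later in this spec; the helper name and `rclone config create` arguments are assumptions, not the shipped command):
```python
import subprocess
import httpx
async def setup_local_mount(api_url: str, jwt_token: str) -> str:
    """Fetch bucket-scoped credentials and configure a tenant rclone remote."""
    async with httpx.AsyncClient(base_url=api_url) as client:
        resp = await client.post(
            "/tenant/mount/credentials",
            headers={"Authorization": f"Bearer {jwt_token}"},
        )
        resp.raise_for_status()
        creds = resp.json()
    remote = f"basic-memory-{creds['tenant_id']}"
    # rclone stores the secret in its own config; nothing sensitive is kept elsewhere.
    subprocess.run(
        [
            "rclone", "config", "create", remote, "s3",
            f"access_key_id={creds['access_key']}",
            f"secret_access_key={creds['secret_key']}",
            f"endpoint={creds['endpoint_url']}",
        ],
        check=True,
    )
    return remote
```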
### Phase 5: TigrisFS Cache Sync Solutions
**Problem**: When files are uploaded via CLI/bisync, the tenant API container doesn't see them immediately due to TigrisFS cache (30s TTL) and lack of inotify events on mounted filesystems.
**Multi-Layer Solution:**
**Layer 1: API Sync Endpoint** (Immediate)
```python
# POST /sync - Force TigrisFS cache refresh
# Callable by CLI after uploads
subprocess.run(["sync", "fsync /app/data"], check=True)
```
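A minimal sketch of that endpoint (FastAPI assumed; whether a `sync` of the mount point is enough to refresh the TigrisFS cache is an assumption to validate during testing):
```python
import asyncio
from fastapi import APIRouter, HTTPException
router = APIRouter()
@router.post("/sync")
async def force_sync() -> dict[str, str]:
    """Flush the TigrisFS mount so recently uploaded objects become visible."""
    proc = await asyncio.create_subprocess_exec("sync", "/app/data")
    if await proc.wait() != 0:
        raise HTTPException(status_code=500, detail="sync of /app/data failed")
    return {"status": "ok"}
```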
**Layer 2: Tigris Webhook Integration** (Real-time)
https://www.tigrisdata.com/docs/buckets/object-notifications/#webhook
```python
# Webhook endpoint for bucket changes
@app.post("/webhooks/tigris/{tenant_id}")
async def handle_bucket_notification(tenant_id: str, event: TigrisEvent):
    if event.eventName in ["OBJECT_CREATED_PUT", "OBJECT_DELETED"]:
        await notify_container_sync(tenant_id, event.object.key)
```
**Layer 3: CLI Sync Notification** (User-triggered)
```bash
# CLI calls container sync endpoint after successful bisync
basic-memory cloud bisync  # Automatically notifies container
curl -X POST https://basic-memory-{tenant-id}.fly.dev/sync
```
**Layer 4: Periodic Sync Fallback** (Safety net)
```python
# Background task: fsync /app/data every 30s as fallback
# Ensures eventual consistency even if other layers fail
```
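Sketched as an asyncio background task started on app startup (interval and command choices mirror Layer 1 and are assumptions, not final values):
```python
import asyncio
async def periodic_mount_sync(interval_seconds: int = 30) -> None:
    """Safety net: flush /app/data on a fixed interval so the container
    eventually sees uploads even if webhook or CLI notifications fail."""
    while True:
        proc = await asyncio.create_subprocess_exec("sync", "/app/data")
        await proc.wait()
        await asyncio.sleep(interval_seconds)
```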
**Implementation Priority:**
1. Layer 1 (API endpoint) - Quick testing capability
2. Layer 3 (CLI integration) - Improved UX
3. Layer 4 (Periodic fallback) - Safety net
4. Layer 2 (Webhooks) - Production real-time sync
## Performance Targets
### Sync Latency
- **Target**: < 5 seconds local→cloud→container
- **Configuration**: `TIGRISFS_STAT_CACHE_TTL = '5s'`
- **Monitoring**: Track sync metrics in production
### Container Startup
- **Target**: < 5 seconds including TigrisFS mount
- **Fast retry**: 0.5s intervals for mount verification
- **Fallback**: Container fails fast if mount fails
### Memory Usage
- **TigrisFS cache**: 2GB memory limit per container
- **Concurrent uploads**: 32 flushers max
- **VM sizing**: shared-cpu-2x (2048mb) minimum
## Security Considerations
### Bucket Isolation
- Each tenant has dedicated bucket
- IAM policies prevent cross-tenant access
- No shared bucket with subdirectories
### Credential Security
- Fly secrets for runtime access
- Encrypted database backup for disaster recovery
- Credential rotation capability
### Data Residency
- Tigris global edge caching
- SOC2 Type II compliance
- Encryption at rest and in transit
## Operational Benefits
### Scalability
- Horizontal scaling with stateless API containers
- Global edge distribution
- Better resource utilization
### Reliability
- No cold starts between tenants
- Built-in redundancy and caching
- Simplified backup strategy
### Cost Efficiency
- Pay-per-use storage pricing
- Shared infrastructure benefits
- Reduced operational overhead
## Risk Mitigation
### Data Loss Prevention
- Dual credential storage (Fly + database)
- Automated backup workflows to R2/S3
- Tigris built-in redundancy
### Performance Degradation
- Configurable cache settings per tenant
- Monitoring and alerting on sync latency
- Fallback to volume storage if needed
### Security Vulnerabilities
- Bucket-per-tenant isolation
- Regular credential rotation
- Security scanning and monitoring
## Success Metrics
### Technical Metrics
- Sync latency P50 < 5 seconds
- Container startup time < 5 seconds
- Zero data loss incidents
- 99.9% uptime per tenant
### Business Metrics
- Reduced infrastructure costs vs volumes
- Improved user experience with faster sync
- Enhanced enterprise security posture
- Simplified operational overhead
## Open Questions
1. **Tigris rate limits**: What are the API limits for bucket creation?
2. **Cost analysis**: What's the break-even point vs Fly volumes?
3. **Regional preferences**: Should enterprise customers choose regions?
4. **Backup retention**: How long to keep automated backups?
## Implementation Checklist
### Phase 1: Bucket Provisioning Service ✅ COMPLETED
- [x] **Research Tigris bucket API** - Document bucket creation and S3 API compatibility
- [x] **Create StorageClient class** - Implemented with admin credentials and comprehensive integration tests
- [x] **Test bucket creation** - Full test suite validates API integration with real Tigris environment
- [x] **Add bucket provisioning to DBOS workflow** - Integrated StorageClient with tenant_provisioning.py
### Phase 2: Simplified Bucket Management ✅ COMPLETED
- [x] **Update Tenant model** with tigris_bucket_name field (replaced fly_volume_id)
- [x] **Implement bucket name storage** - Database migration and model updates completed
- [x] **Test bucket provisioning integration** - Full test suite validates workflow from tenant creation to bucket assignment
- [x] **Remove volume logic from all tests** - Complete migration from volume-based to bucket-based architecture
### Phase 3: API Container Integration ✅ COMPLETED
- [x] **Update Dockerfile** to install TigrisFS binary in API container with configurable version
- [x] **Optimize tigrisfs-startup.sh** with production-ready security and reliability improvements
- [x] **Create production-ready container** with proper signal handling and mount validation
- [x] **Implement security fixes** based on Claude code review (conditional debug, credential protection)
- [x] **Add proper process supervision** with cleanup traps and error handling
- [x] **Remove debug artifacts** - Cleaned up all debug Dockerfiles and test scripts
### Phase 3.5: IAM Access Key Management ✅ COMPLETED
- [x] **Research Tigris IAM API** - Documented create_policy, attach_user_policy, delete_access_key operations
- [x] **Implement bucket-scoped credential generation** - StorageClient.create_tenant_access_keys() with IAM policies
- [x] **Add comprehensive security test suite** - 5 security-focused integration tests covering all attack vectors
- [x] **Verify cross-bucket access prevention** - Scoped credentials can ONLY access their designated bucket
- [x] **Test credential lifecycle management** - Create, validate, delete, and revoke access keys
- [x] **Validate admin vs scoped credential isolation** - Different access patterns and security boundaries
- [x] **Test multi-tenant isolation** - Multiple tenants cannot access each other's buckets
### Phase 3.6: Tenant Mount API Endpoints ✅ COMPLETED
- [x] **Implement GET /tenant/mount/info** - Returns mount info without exposing credentials
- [x] **Implement POST /tenant/mount/credentials** - Creates new bucket-scoped credentials for CLI mounting
- [x] **Implement DELETE /tenant/mount/credentials/{cred_id}** - Revoke specific credentials with proper cleanup
- [x] **Implement GET /tenant/mount/credentials** - List active credentials without exposing secrets
- [x] **Add TenantMountCredentials database model** - Tracks credential metadata (no secret storage)
- [x] **Create comprehensive test suite** - 28 tests covering all scenarios including multi-session support
- [x] **Implement multi-session credential flow** - Multiple active credentials per tenant supported
- [x] **Secure credential handling** - Secret keys never stored, returned once only for immediate use
- [x] **Add dependency injection for StorageClient** - Clean integration with existing API architecture
- [x] **Fix Tigris configuration for cloud service** - Added AWS environment variables to fly.template.toml
- [x] **Update tenant machine configurations** - Include AWS credentials for TigrisFS mounting with clear credential strategy
**Security Test Results:**
```
✅ Cross-bucket access prevention - PASS
✅ Deleted credentials access revoked - PASS
✅ Invalid credentials rejected - PASS
✅ Admin vs scoped credential isolation - PASS
✅ Multiple scoped credentials isolation - PASS
```
**Implementation Details:**
- Uses Tigris IAM managed policies (create_policy + attach_user_policy)
- Bucket-scoped S3 policies with Actions: GetObject, PutObject, DeleteObject, ListBucket
- Resource ARNs limited to specific bucket: `arn:aws:s3:::bucket-name` and `arn:aws:s3:::bucket-name/*`
- Access keys follow Tigris format: `tid_` prefix with secure random suffix
- Complete cleanup on deletion removes both access keys and associated policies (a hedged sketch of the scoped-policy creation follows below)
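A hedged sketch of the scoped-policy creation described above, written against a boto3 IAM client pointed at the Tigris IAM-compatible endpoint (user naming, the `iam` parameter, and the return shape are assumptions; only the actions and resource ARNs come from the bullets above):
```python
import json
def create_tenant_access_keys(iam, tenant_id: str, bucket_name: str) -> dict:
    """Create a bucket-scoped policy, attach it to a per-tenant user, and issue keys."""
    policy_doc = {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject", "s3:ListBucket"],
            "Resource": [
                f"arn:aws:s3:::{bucket_name}",
                f"arn:aws:s3:::{bucket_name}/*",
            ],
        }],
    }
    user_name = f"tenant-{tenant_id}"
    iam.create_user(UserName=user_name)
    policy = iam.create_policy(
        PolicyName=f"{bucket_name}-scoped",
        PolicyDocument=json.dumps(policy_doc),
    )
    iam.attach_user_policy(UserName=user_name, PolicyArn=policy["Policy"]["Arn"])
    key = iam.create_access_key(UserName=user_name)["AccessKey"]
    return {
        "access_key": key["AccessKeyId"],
        "secret_key": key["SecretAccessKey"],  # returned once, never stored
        "policy_arn": policy["Policy"]["Arn"],
    }
```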
### Phase 4: Local Access CLI
- [x] **Design local mount CLI command** for automated rclone configuration
- [x] **Implement credential fetching** from cloud API for local setup
- [x] **Create rclone config automation** for tenant-specific bucket mounting
- [x] **Test local→cloud→container sync** with optimized cache settings
- [x] **Document local access setup** for beta users
### Phase 5: Webhook Integration (Future)
- [ ] **Research Tigris webhook API** for object notifications and payload format
- [ ] **Design webhook endpoint** for real-time sync notifications
- [ ] **Implement notification handling** to trigger Basic Memory sync events
- [ ] **Test webhook delivery** and sync latency improvements
## Success Metrics
- [ ] **Container startup < 5 seconds** including TigrisFS mount and Basic Memory init
- [ ] **Sync latency < 5 seconds** for local→cloud→container file changes
- [ ] **Zero data loss** during bucket provisioning and credential management
- [ ] **100% test coverage** for new TigrisBucketService and credential functions
- [ ] **Beta deployment** with internal users validating local-cloud workflow
## Implementation Notes
## Phase 4.1: Bidirectional Sync with rclone bisync (NEW)
### Problem Statement
During testing, we discovered that some applications (particularly Obsidian) don't detect file changes over NFS mounts. Rather than building a custom sync daemon, we can leverage `rclone bisync` - rclone's built-in bidirectional synchronization feature.
### Solution: rclone bisync
Use rclone's proven bidirectional sync instead of custom implementation:
**Core Architecture:**
```bash
# rclone bisync handles all the complexity
rclone bisync ~/basic-memory-{tenant_id} basic-memory-{tenant_id}:{bucket_name} \
  --create-empty-src-dirs \
  --conflict-resolve newer \
  --resilient \
  --check-access
```
**Key Benefits:**
- ✅ **Battle-tested**: Production-proven rclone functionality
- ✅ **MIT licensed**: Open source with permissive licensing
- ✅ **No custom code**: Zero maintenance burden for sync logic
- ✅ **Built-in safety**: max-delete protection, conflict resolution
- ✅ **Simple installation**: Works with Homebrew rclone (no FUSE needed)
- ✅ **File watcher compatible**: Works with Obsidian and all applications
- ✅ **Offline support**: Can work offline and sync when connected
### bisync Conflict Resolution Options
**Built-in conflict strategies:**
```bash
--conflict-resolve none     # Keep both files with .conflict suffixes (safest)
--conflict-resolve newer    # Always pick the most recently modified file
--conflict-resolve larger   # Choose based on file size
--conflict-resolve path1    # Always prefer local changes
--conflict-resolve path2    # Always prefer cloud changes
```
### Sync Profiles Using bisync
**Profile configurations:**
```python
BISYNC_PROFILES = {
    "safe": {
        "conflict_resolve": "none",      # Keep both versions
        "max_delete": 10,                # Prevent mass deletion
        "check_access": True,            # Verify sync integrity
        "description": "Safe mode with conflict preservation"
    },
    "balanced": {
        "conflict_resolve": "newer",     # Auto-resolve to newer file
        "max_delete": 25,
        "check_access": True,
        "description": "Balanced mode (recommended default)"
    },
    "fast": {
        "conflict_resolve": "newer",
        "max_delete": 50,
        "check_access": False,           # Skip verification for speed
        "description": "Fast mode for rapid iteration"
    }
}
```
### CLI Commands
**Manual sync commands:**
```bash
basic-memory cloud bisync                    # Manual bidirectional sync
basic-memory cloud bisync --dry-run          # Preview changes
basic-memory cloud bisync --profile safe     # Use specific profile
basic-memory cloud bisync --resync           # Force full baseline resync
```
**Watch mode (Step 1):**
```bash
basic-memory cloud bisync --watch            # Long-running process, sync every 60s
basic-memory cloud bisync --watch --interval 30s  # Custom interval
```
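Watch mode can be a thin loop over the bisync wrapper (this assumes a `run_bisync()` helper like the one sketched under Technical Implementation below; signal handling is simplified):
```python
import time
def bisync_watch(tenant_id: str, bucket_name: str,
                 profile: str = "balanced", interval: int = 60) -> None:
    """Run bisync on a fixed interval until interrupted with Ctrl-C."""
    try:
        while True:
            if not run_bisync(tenant_id, bucket_name, profile):
                print("bisync reported errors; retrying on the next interval")
            time.sleep(interval)
    except KeyboardInterrupt:
        print("watch mode stopped")
```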
**System integration (Step 2 - Future):**
```bash
basic-memory cloud bisync-service install    # Install as system service
basic-memory cloud bisync-service start      # Start background service
basic-memory cloud bisync-service status     # Check service status
```
### Implementation Strategy
**Phase 4.1.1: Core bisync Implementation**
- [ ] Implement `run_bisync()` function wrapping rclone bisync
- [ ] Add profile-based configuration (safe/balanced/fast)
- [ ] Create conflict resolution and safety options
- [ ] Test with sample files and conflict scenarios
**Phase 4.1.2: Watch Mode**
- [ ] Add `--watch` flag for continuous sync
- [ ] Implement configurable sync intervals
- [ ] Add graceful shutdown and signal handling
- [ ] Create status monitoring and progress indicators
**Phase 4.1.3: User Experience**
- [ ] Add conflict reporting and resolution guidance
- [ ] Implement dry-run preview functionality
- [ ] Create troubleshooting and diagnostic commands
- [ ] Add filtering configuration (.gitignore-style)
**Phase 4.1.4: System Integration (Future)**
- [ ] Generate platform-specific service files (launchd/systemd)
- [ ] Add service management commands
- [ ] Implement automatic startup and recovery
- [ ] Create monitoring and logging integration
### Technical Implementation
**Core bisync wrapper:**
```python
import subprocess
from pathlib import Path
def run_bisync(
    tenant_id: str,
    bucket_name: str,
    profile: str = "balanced",
    dry_run: bool = False
) -> bool:
    """Run rclone bisync with the specified profile."""
    local_path = Path.home() / f"basic-memory-{tenant_id}"
    remote_path = f"basic-memory-{tenant_id}:{bucket_name}"
    # Expand the filter file path explicitly rather than relying on "~" expansion.
    filters_file = Path.home() / ".basic-memory" / "bisync-filters.txt"
    profile_config = BISYNC_PROFILES[profile]
    cmd = [
        "rclone", "bisync",
        str(local_path), remote_path,
        "--create-empty-src-dirs",
        "--resilient",
        f"--conflict-resolve={profile_config['conflict_resolve']}",
        f"--max-delete={profile_config['max_delete']}",
        "--filters-file", str(filters_file),
    ]
    if profile_config.get("check_access"):
        cmd.append("--check-access")
    if dry_run:
        cmd.append("--dry-run")
    # Return True on success instead of raising, so callers can report status.
    return subprocess.run(cmd).returncode == 0
```
**Default filter file (~/.basic-memory/bisync-filters.txt):**
```
- .DS_Store
- .git/**
- __pycache__/**
- *.pyc
- .pytest_cache/**
- node_modules/**
- .conflict-*
- Thumbs.db
- desktop.ini
```
**Advantages Over Custom Daemon:**
- ✅ **Zero maintenance**: No custom sync logic to debug/maintain
- ✅ **Production proven**: Used by thousands in production
- ✅ **Safety features**: Built-in max-delete, conflict handling, recovery
- ✅ **Filtering**: Advanced exclude patterns and rules
- ✅ **Performance**: Optimized for various storage backends
- ✅ **Community support**: Extensive documentation and community
## Phase 4.2: NFS Mount Support (Direct Access)
### Solution: rclone nfsmount
Keep the existing NFS mount functionality for users who prefer direct file access:
**Core Architecture:**
```bash
# rclone nfsmount provides transparent file access
rclone nfsmount basic-memory-{tenant_id}:{bucket_name} ~/basic-memory-{tenant_id} \
  --vfs-cache-mode writes \
  --dir-cache-time 10s \
  --daemon
```
**Key Benefits:**
- ✅ **Real-time access**: Files appear immediately as they're created/modified
- ✅ **Transparent**: Works with any application that reads/writes files
- ✅ **Low latency**: Direct access without sync delays
- ✅ **Simple**: No periodic sync commands needed
- ✅ **Homebrew compatible**: Works with Homebrew rclone (no FUSE required)
**Limitations:**
- ❌ **File watcher compatibility**: Some apps (Obsidian) don't detect changes over NFS
- ❌ **Network dependency**: Requires active connection to cloud storage
- ❌ **Potential conflicts**: Simultaneous edits from multiple locations can cause issues
### Mount Profiles (Existing)
**Already implemented profiles from SPEC-7 testing:**
```python
MOUNT_PROFILES = {
    "fast": {
        "cache_time": "5s",
        "poll_interval": "3s",
        "description": "Ultra-fast development (5s sync)"
    },
    "balanced": {
        "cache_time": "10s",
        "poll_interval": "5s",
        "description": "Fast development (10-15s sync, recommended)"
    },
    "safe": {
        "cache_time": "15s",
        "poll_interval": "10s",
        "description": "Conflict-aware mount with backup",
        "extra_args": ["--conflict-suffix", ".conflict-{DateTimeExt}"]
    }
}
```
### CLI Commands (Existing)
**Mount commands already implemented:**
```bash
basic-memory cloud mount                     # Mount with balanced profile
basic-memory cloud mount --profile fast     # Ultra-fast caching
basic-memory cloud mount --profile safe     # Conflict detection
basic-memory cloud unmount                  # Clean unmount
basic-memory cloud mount-status             # Show mount status
```
## User Choice: Mount vs Bisync
### When to Use Each Approach
| Use Case | Recommended Solution | Why |
|----------|---------------------|-----|
| **Obsidian users** | `bisync` | File watcher support for live preview |
| **CLI/vim/emacs users** | `mount` | Direct file access, lower latency |
| **Offline work** | `bisync` | Can work offline, sync when connected |
| **Real-time collaboration** | `mount` | Immediate visibility of changes |
| **Multiple machines** | `bisync` | Better conflict handling |
| **Single machine** | `mount` | Simpler, more transparent |
| **Development work** | Either | Both work well, user preference |
| **Large files** | `mount` | Streaming access vs full download |
### Installation Simplicity
**Both approaches now use simple Homebrew installation:**
```bash
# Single installation command for both approaches
brew install rclone
# No macFUSE, no system modifications needed
# Works immediately with both mount and bisync
```
### Implementation Status
**Phase 4.1: bisync** (NEW)
- [ ] Implement bisync command wrapper
- [ ] Add watch mode with configurable intervals
- [ ] Create conflict resolution workflows
- [ ] Add filtering and safety options
**Phase 4.2: mount** (EXISTING - ✅ IMPLEMENTED)
- [x] NFS mount commands with profile support
- [x] Mount management and cleanup
- [x] Process monitoring and health checks
- [x] Credential integration with cloud API
**Both approaches share:**
- [x] Credential management via cloud API
- [x] Secure rclone configuration
- [x] Tenant isolation and bucket scoping
- [x] Simple Homebrew rclone installation
Key Features:
1. Cross-Platform rclone Installation (`rclone_installer.py`):
   - macOS: Homebrew → official script fallback
   - Linux: snap → apt → official script fallback
   - Windows: winget → chocolatey → scoop fallback
   - Automatic version detection and verification
2. Smart rclone Configuration (`rclone_config.py`):
   - Automatic tenant-specific config generation
   - Three optimized mount profiles from SPEC-7 testing:
     - fast: 5s sync (ultra-performance)
     - balanced: 10-15s sync (recommended default)
     - safe: 15s sync + conflict detection
   - Backup existing configs before modification
3. Robust Mount Management (`mount_commands.py`):
   - Automatic tenant credential generation
   - Mount path management (`~/basic-memory-{tenant-id}`)
   - Process lifecycle management (prevent duplicate mounts)
   - Orphaned process cleanup
   - Mount verification and health checking
4. Clean Architecture (`api_client.py`):
   - Separated API client to avoid circular imports
   - Reuses existing authentication infrastructure
   - Consistent error handling and logging
User Experience:
One-Command Setup:
```bash
basic-memory cloud setup
# 1. Installs rclone automatically
# 2. Authenticates with existing login
# 3. Generates secure credentials
# 4. Configures rclone
# 5. Performs initial mount
```
Profile-Based Mounting:
```bash
basic-memory cloud mount --profile fast      # 5s sync
basic-memory cloud mount --profile balanced  # 15s sync (default)
basic-memory cloud mount --profile safe      # conflict detection
```
Status Monitoring:
```bash
basic-memory cloud mount-status
# Shows: tenant info, mount path, sync profile, rclone processes
```
### Local Mount API
Initial sketch for Endpoint 1 (get tenant info for the authenticated user):
- Purpose: get tenant details for mounting
- CLI passes in the JWT
- Service returns mount info
**✅ IMPLEMENTED API Specification:**
**Endpoint 1: GET /tenant/mount/info**
- Purpose: Get tenant mount information without exposing credentials
- Authentication: JWT token (tenant_id extracted from claims)
Request:
```
GET /tenant/mount/info
Authorization: Bearer {jwt_token}
```
Response:
```json
{
    "tenant_id": "434252dd-d83b-4b20-bf70-8a950ff875c4",
    "bucket_name": "basic-memory-434252dd",
    "has_credentials": true,
    "credentials_created_at": "2025-09-22T16:48:50.414694"
}
```
**Endpoint 2: POST /tenant/mount/credentials**
- Purpose: Generate NEW bucket-scoped S3 credentials for rclone mounting
- Authentication: JWT token (tenant_id extracted from claims)
- Multi-session: Creates new credentials without revoking existing ones
Request:
```
POST /tenant/mount/credentials
Authorization: Bearer {jwt_token}
Content-Type: application/json
```
*Note: No request body needed - tenant_id extracted from JWT*
Response:
```json
{
    "tenant_id": "434252dd-d83b-4b20-bf70-8a950ff875c4",
    "bucket_name": "basic-memory-434252dd",
    "access_key": "test_access_key_12345",
    "secret_key": "test_secret_key_abcdef",
    "endpoint_url": "https://fly.storage.tigris.dev",
    "region": "auto"
}
```
**🔒 Security Notes:**
- Secret key returned ONCE only - never stored in database
- Credentials are bucket-scoped (cannot access other tenants' buckets)
- Multiple active credentials supported per tenant (work laptop + personal machine)
**Implementation Notes:**
Security:
- Both endpoints require JWT authentication
- Extract tenant_id from JWT claims (not the request body)
- Generate scoped credentials (not admin credentials)
- Credentials should have bucket-specific access only
Integration Points:
- Use the existing StorageClient from the SPEC-8 implementation
- Leverage the existing JWT middleware for tenant extraction
- Return the same credential format as the Tigris bucket provisioning
Error Handling:
- 401 if not authenticated
- 403 if the tenant doesn't exist
- 500 if credential generation fails
A sketch of how these notes could map onto routes is shown below.
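In this sketch the dependency names (`get_current_tenant_id`, `get_tenant_repository`, `get_storage_client`) are placeholders for the existing JWT middleware and dependency-injection wiring, and the response shapes are illustrative rather than the actual implementation:
```python
from fastapi import APIRouter, Depends, HTTPException
# Placeholder dependencies: provided by the cloud app's JWT middleware and DI wiring.
from .dependencies import get_current_tenant_id, get_storage_client, get_tenant_repository
router = APIRouter(prefix="/tenant/mount")
@router.get("/info")
async def get_mount_info(
    tenant_id: str = Depends(get_current_tenant_id),
    tenants=Depends(get_tenant_repository),
) -> dict:
    tenant = await tenants.get(tenant_id)
    if tenant is None:
        raise HTTPException(status_code=403, detail="Unknown tenant")
    return {
        "tenant_id": tenant_id,
        "bucket_name": tenant.tigris_bucket_name,
        "has_credentials": tenant.has_mount_credentials,
    }
@router.post("/credentials")
async def create_mount_credentials(
    tenant_id: str = Depends(get_current_tenant_id),
    storage=Depends(get_storage_client),
) -> dict:
    # Secret key is returned once in the response and never persisted server-side.
    return await storage.create_tenant_access_keys(tenant_id)
```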
**🔄 Design Decisions:**
1. **Secure Credential Flow (No Secret Storage)**
Based on CLI flow analysis, we follow security best practices:
- ✅ API generates both access_key + secret_key via Tigris IAM
- ✅ Returns both in API response for immediate use
- ✅ CLI uses credentials immediately to configure rclone
- ✅ Database stores only metadata (access_key + policy_arn for cleanup)
- ✅ rclone handles secure local credential storage
- ❌ **Never store secret_key in database (even encrypted)**
2. **CLI Credential Flow**
```bash
# CLI calls API
POST /tenant/mount/credentials → {access_key, secret_key, ...}
# CLI immediately configures rclone
rclone config create basic-memory-{tenant_id} s3 \
  access_key_id={access_key} \
  secret_access_key={secret_key} \
  endpoint=https://fly.storage.tigris.dev
# Database tracks metadata only
INSERT INTO tenant_mount_credentials (tenant_id, access_key, policy_arn, ...)
```
3. **Multiple Sessions Supported**
- Users can have multiple active credential sets (work laptop, personal machine, etc.)
- Each credential generation creates a new Tigris access key
- List active credentials via API (shows access_key but never secret)
4. **Failure Handling & Cleanup**
- **Happy Path**: Credentials created → Used immediately → rclone configured
- **Orphaned Credentials**: Background job revokes unused credentials
- **API Failure Recovery**: Retry Tigris deletion with stored policy_arn
- **Status Tracking**: Track tigris_deletion_status (pending/completed/failed)
5. **Event Sourcing & Audit**
- MountCredentialCreatedEvent
- MountCredentialRevokedEvent
- MountCredentialOrphanedEvent (for cleanup)
- Full audit trail for security compliance
6. **Tenant/Bucket Validation**
- Verify tenant exists and has valid bucket before credential generation
- Use existing StorageClient to validate bucket access
- Prevent credential generation for inactive/invalid tenants
📋 **Implemented API Endpoints:**
```
✅ IMPLEMENTED:
GET    /tenant/mount/info                    # Get tenant/bucket info (no credentials exposed)
POST   /tenant/mount/credentials             # Generate new credentials (returns secret once)
GET    /tenant/mount/credentials             # List active credentials (no secrets)
DELETE /tenant/mount/credentials/{cred_id}   # Revoke specific credentials
```
**API Implementation Status:**
- ✅ **GET /tenant/mount/info**: Returns tenant_id, bucket_name, has_credentials, credentials_created_at
- ✅ **POST /tenant/mount/credentials**: Creates new bucket-scoped access keys, returns access_key + secret_key once
- ✅ **GET /tenant/mount/credentials**: Lists active credentials without exposing secret keys
- ✅ **DELETE /tenant/mount/credentials/{cred_id}**: Revokes specific credentials with proper Tigris IAM cleanup
- ✅ **Multi-session support**: Multiple active credentials per tenant (work laptop + personal machine)
- ✅ **Security**: Secret keys never stored in database, returned once only for immediate use
- ✅ **Comprehensive test suite**: 28 tests covering all scenarios including error handling and multi-session flows
- ✅ **Dependency injection**: Clean integration with existing FastAPI architecture
- ✅ **Production-ready configuration**: Tigris credentials properly configured for tenant machines
🗄️ **Secure Database Schema:**
```sql
CREATE TABLE tenant_mount_credentials (
  id UUID PRIMARY KEY,
  tenant_id UUID REFERENCES tenant(id),
  access_key VARCHAR(255) NOT NULL,
  -- secret_key REMOVED - never store secrets (security best practice)
  policy_arn VARCHAR(255) NOT NULL,           -- For Tigris IAM cleanup
  tigris_deletion_status VARCHAR(20) DEFAULT 'pending',  -- Track cleanup
  created_at TIMESTAMP DEFAULT NOW(),
  updated_at TIMESTAMP DEFAULT NOW(),
  revoked_at TIMESTAMP NULL,
  last_used_at TIMESTAMP NULL,               -- Track usage for orphan cleanup
  description VARCHAR(255) DEFAULT 'CLI mount credentials'
);
```
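The same schema expressed as a SQLAlchemy model, for orientation (column names mirror the DDL above; the declarative base and table wiring in the cloud app may differ):
```python
import uuid
from datetime import datetime
from sqlalchemy import DateTime, ForeignKey, String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
class Base(DeclarativeBase):
    pass
class TenantMountCredentials(Base):
    __tablename__ = "tenant_mount_credentials"
    id: Mapped[uuid.UUID] = mapped_column(primary_key=True, default=uuid.uuid4)
    tenant_id: Mapped[uuid.UUID] = mapped_column(ForeignKey("tenant.id"))
    access_key: Mapped[str] = mapped_column(String(255))
    # secret_key intentionally absent - secrets are never stored
    policy_arn: Mapped[str] = mapped_column(String(255))
    tigris_deletion_status: Mapped[str] = mapped_column(String(20), default="pending")
    created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)
    updated_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)
    revoked_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)
    last_used_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)
    description: Mapped[str] = mapped_column(String(255), default="CLI mount credentials")
```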
**Security Benefits:**
- ✅ Database breach cannot expose secrets
- ✅ Follows "secrets don't persist" security principle
- ✅ Meets compliance requirements (SOC2, etc.)
- ✅ Reduced attack surface
- ✅ CLI gets credentials once and stores securely via rclone
```