tokens: 48734/50000 27/348 files (page 4/17)
This is page 4 of 17. Use http://codebase.md/basicmachines-co/basic-memory?lines=false&page={x} to view the full context.

# Directory Structure

```
├── .claude
│   ├── agents
│   │   ├── python-developer.md
│   │   └── system-architect.md
│   └── commands
│       ├── release
│       │   ├── beta.md
│       │   ├── changelog.md
│       │   ├── release-check.md
│       │   └── release.md
│       ├── spec.md
│       └── test-live.md
├── .dockerignore
├── .github
│   ├── dependabot.yml
│   ├── ISSUE_TEMPLATE
│   │   ├── bug_report.md
│   │   ├── config.yml
│   │   ├── documentation.md
│   │   └── feature_request.md
│   └── workflows
│       ├── claude-code-review.yml
│       ├── claude-issue-triage.yml
│       ├── claude.yml
│       ├── dev-release.yml
│       ├── docker.yml
│       ├── pr-title.yml
│       ├── release.yml
│       └── test.yml
├── .gitignore
├── .python-version
├── CHANGELOG.md
├── CITATION.cff
├── CLA.md
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── docker-compose.yml
├── Dockerfile
├── docs
│   ├── ai-assistant-guide-extended.md
│   ├── character-handling.md
│   ├── cloud-cli.md
│   └── Docker.md
├── justfile
├── LICENSE
├── llms-install.md
├── pyproject.toml
├── README.md
├── SECURITY.md
├── smithery.yaml
├── specs
│   ├── SPEC-1 Specification-Driven Development Process.md
│   ├── SPEC-10 Unified Deployment Workflow and Event Tracking.md
│   ├── SPEC-11 Basic Memory API Performance Optimization.md
│   ├── SPEC-12 OpenTelemetry Observability.md
│   ├── SPEC-13 CLI Authentication with Subscription Validation.md
│   ├── SPEC-14 Cloud Git Versioning & GitHub Backup.md
│   ├── SPEC-14- Cloud Git Versioning & GitHub Backup.md
│   ├── SPEC-15 Configuration Persistence via Tigris for Cloud Tenants.md
│   ├── SPEC-16 MCP Cloud Service Consolidation.md
│   ├── SPEC-17 Semantic Search with ChromaDB.md
│   ├── SPEC-18 AI Memory Management Tool.md
│   ├── SPEC-19 Sync Performance and Memory Optimization.md
│   ├── SPEC-2 Slash Commands Reference.md
│   ├── SPEC-3 Agent Definitions.md
│   ├── SPEC-4 Notes Web UI Component Architecture.md
│   ├── SPEC-5 CLI Cloud Upload via WebDAV.md
│   ├── SPEC-6 Explicit Project Parameter Architecture.md
│   ├── SPEC-7 POC to spike Tigris Turso for local access to cloud data.md
│   ├── SPEC-8 TigrisFS Integration.md
│   ├── SPEC-9 Multi-Project Bidirectional Sync Architecture.md
│   ├── SPEC-9 Signed Header Tenant Information.md
│   └── SPEC-9-1 Follow-Ups- Conflict, Sync, and Observability.md
├── src
│   └── basic_memory
│       ├── __init__.py
│       ├── alembic
│       │   ├── alembic.ini
│       │   ├── env.py
│       │   ├── migrations.py
│       │   ├── script.py.mako
│       │   └── versions
│       │       ├── 3dae7c7b1564_initial_schema.py
│       │       ├── 502b60eaa905_remove_required_from_entity_permalink.py
│       │       ├── 5fe1ab1ccebe_add_projects_table.py
│       │       ├── 647e7a75e2cd_project_constraint_fix.py
│       │       ├── 9d9c1cb7d8f5_add_mtime_and_size_columns_to_entity_.py
│       │       ├── a1b2c3d4e5f6_fix_project_foreign_keys.py
│       │       ├── b3c3938bacdb_relation_to_name_unique_index.py
│       │       ├── cc7172b46608_update_search_index_schema.py
│       │       └── e7e1f4367280_add_scan_watermark_tracking_to_project.py
│       ├── api
│       │   ├── __init__.py
│       │   ├── app.py
│       │   ├── routers
│       │   │   ├── __init__.py
│       │   │   ├── directory_router.py
│       │   │   ├── importer_router.py
│       │   │   ├── knowledge_router.py
│       │   │   ├── management_router.py
│       │   │   ├── memory_router.py
│       │   │   ├── project_router.py
│       │   │   ├── prompt_router.py
│       │   │   ├── resource_router.py
│       │   │   ├── search_router.py
│       │   │   └── utils.py
│       │   └── template_loader.py
│       ├── cli
│       │   ├── __init__.py
│       │   ├── app.py
│       │   ├── auth.py
│       │   ├── commands
│       │   │   ├── __init__.py
│       │   │   ├── cloud
│       │   │   │   ├── __init__.py
│       │   │   │   ├── api_client.py
│       │   │   │   ├── bisync_commands.py
│       │   │   │   ├── cloud_utils.py
│       │   │   │   ├── core_commands.py
│       │   │   │   ├── mount_commands.py
│       │   │   │   ├── rclone_config.py
│       │   │   │   ├── rclone_installer.py
│       │   │   │   ├── upload_command.py
│       │   │   │   └── upload.py
│       │   │   ├── command_utils.py
│       │   │   ├── db.py
│       │   │   ├── import_chatgpt.py
│       │   │   ├── import_claude_conversations.py
│       │   │   ├── import_claude_projects.py
│       │   │   ├── import_memory_json.py
│       │   │   ├── mcp.py
│       │   │   ├── project.py
│       │   │   ├── status.py
│       │   │   ├── sync.py
│       │   │   └── tool.py
│       │   └── main.py
│       ├── config.py
│       ├── db.py
│       ├── deps.py
│       ├── file_utils.py
│       ├── ignore_utils.py
│       ├── importers
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── chatgpt_importer.py
│       │   ├── claude_conversations_importer.py
│       │   ├── claude_projects_importer.py
│       │   ├── memory_json_importer.py
│       │   └── utils.py
│       ├── markdown
│       │   ├── __init__.py
│       │   ├── entity_parser.py
│       │   ├── markdown_processor.py
│       │   ├── plugins.py
│       │   ├── schemas.py
│       │   └── utils.py
│       ├── mcp
│       │   ├── __init__.py
│       │   ├── async_client.py
│       │   ├── project_context.py
│       │   ├── prompts
│       │   │   ├── __init__.py
│       │   │   ├── ai_assistant_guide.py
│       │   │   ├── continue_conversation.py
│       │   │   ├── recent_activity.py
│       │   │   ├── search.py
│       │   │   └── utils.py
│       │   ├── resources
│       │   │   ├── ai_assistant_guide.md
│       │   │   └── project_info.py
│       │   ├── server.py
│       │   └── tools
│       │       ├── __init__.py
│       │       ├── build_context.py
│       │       ├── canvas.py
│       │       ├── chatgpt_tools.py
│       │       ├── delete_note.py
│       │       ├── edit_note.py
│       │       ├── list_directory.py
│       │       ├── move_note.py
│       │       ├── project_management.py
│       │       ├── read_content.py
│       │       ├── read_note.py
│       │       ├── recent_activity.py
│       │       ├── search.py
│       │       ├── utils.py
│       │       ├── view_note.py
│       │       └── write_note.py
│       ├── models
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── knowledge.py
│       │   ├── project.py
│       │   └── search.py
│       ├── repository
│       │   ├── __init__.py
│       │   ├── entity_repository.py
│       │   ├── observation_repository.py
│       │   ├── project_info_repository.py
│       │   ├── project_repository.py
│       │   ├── relation_repository.py
│       │   ├── repository.py
│       │   └── search_repository.py
│       ├── schemas
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── cloud.py
│       │   ├── delete.py
│       │   ├── directory.py
│       │   ├── importer.py
│       │   ├── memory.py
│       │   ├── project_info.py
│       │   ├── prompt.py
│       │   ├── request.py
│       │   ├── response.py
│       │   ├── search.py
│       │   └── sync_report.py
│       ├── services
│       │   ├── __init__.py
│       │   ├── context_service.py
│       │   ├── directory_service.py
│       │   ├── entity_service.py
│       │   ├── exceptions.py
│       │   ├── file_service.py
│       │   ├── initialization.py
│       │   ├── link_resolver.py
│       │   ├── project_service.py
│       │   ├── search_service.py
│       │   └── service.py
│       ├── sync
│       │   ├── __init__.py
│       │   ├── background_sync.py
│       │   ├── sync_service.py
│       │   └── watch_service.py
│       ├── templates
│       │   └── prompts
│       │       ├── continue_conversation.hbs
│       │       └── search.hbs
│       └── utils.py
├── test-int
│   ├── BENCHMARKS.md
│   ├── cli
│   │   ├── test_project_commands_integration.py
│   │   ├── test_sync_commands_integration.py
│   │   └── test_version_integration.py
│   ├── conftest.py
│   ├── mcp
│   │   ├── test_build_context_underscore.py
│   │   ├── test_build_context_validation.py
│   │   ├── test_chatgpt_tools_integration.py
│   │   ├── test_default_project_mode_integration.py
│   │   ├── test_delete_note_integration.py
│   │   ├── test_edit_note_integration.py
│   │   ├── test_list_directory_integration.py
│   │   ├── test_move_note_integration.py
│   │   ├── test_project_management_integration.py
│   │   ├── test_project_state_sync_integration.py
│   │   ├── test_read_content_integration.py
│   │   ├── test_read_note_integration.py
│   │   ├── test_search_integration.py
│   │   ├── test_single_project_mcp_integration.py
│   │   └── test_write_note_integration.py
│   ├── test_db_wal_mode.py
│   ├── test_disable_permalinks_integration.py
│   └── test_sync_performance_benchmark.py
├── tests
│   ├── __init__.py
│   ├── api
│   │   ├── conftest.py
│   │   ├── test_async_client.py
│   │   ├── test_continue_conversation_template.py
│   │   ├── test_directory_router.py
│   │   ├── test_importer_router.py
│   │   ├── test_knowledge_router.py
│   │   ├── test_management_router.py
│   │   ├── test_memory_router.py
│   │   ├── test_project_router_operations.py
│   │   ├── test_project_router.py
│   │   ├── test_prompt_router.py
│   │   ├── test_relation_background_resolution.py
│   │   ├── test_resource_router.py
│   │   ├── test_search_router.py
│   │   ├── test_search_template.py
│   │   ├── test_template_loader_helpers.py
│   │   └── test_template_loader.py
│   ├── cli
│   │   ├── conftest.py
│   │   ├── test_bisync_commands.py
│   │   ├── test_cli_tools.py
│   │   ├── test_cloud_authentication.py
│   │   ├── test_cloud_utils.py
│   │   ├── test_ignore_utils.py
│   │   ├── test_import_chatgpt.py
│   │   ├── test_import_claude_conversations.py
│   │   ├── test_import_claude_projects.py
│   │   ├── test_import_memory_json.py
│   │   └── test_upload.py
│   ├── conftest.py
│   ├── db
│   │   └── test_issue_254_foreign_key_constraints.py
│   ├── importers
│   │   ├── test_importer_base.py
│   │   └── test_importer_utils.py
│   ├── markdown
│   │   ├── __init__.py
│   │   ├── test_date_frontmatter_parsing.py
│   │   ├── test_entity_parser_error_handling.py
│   │   ├── test_entity_parser.py
│   │   ├── test_markdown_plugins.py
│   │   ├── test_markdown_processor.py
│   │   ├── test_observation_edge_cases.py
│   │   ├── test_parser_edge_cases.py
│   │   ├── test_relation_edge_cases.py
│   │   └── test_task_detection.py
│   ├── mcp
│   │   ├── conftest.py
│   │   ├── test_obsidian_yaml_formatting.py
│   │   ├── test_permalink_collision_file_overwrite.py
│   │   ├── test_prompts.py
│   │   ├── test_resources.py
│   │   ├── test_tool_build_context.py
│   │   ├── test_tool_canvas.py
│   │   ├── test_tool_delete_note.py
│   │   ├── test_tool_edit_note.py
│   │   ├── test_tool_list_directory.py
│   │   ├── test_tool_move_note.py
│   │   ├── test_tool_read_content.py
│   │   ├── test_tool_read_note.py
│   │   ├── test_tool_recent_activity.py
│   │   ├── test_tool_resource.py
│   │   ├── test_tool_search.py
│   │   ├── test_tool_utils.py
│   │   ├── test_tool_view_note.py
│   │   ├── test_tool_write_note.py
│   │   └── tools
│   │       └── test_chatgpt_tools.py
│   ├── Non-MarkdownFileSupport.pdf
│   ├── repository
│   │   ├── test_entity_repository_upsert.py
│   │   ├── test_entity_repository.py
│   │   ├── test_entity_upsert_issue_187.py
│   │   ├── test_observation_repository.py
│   │   ├── test_project_info_repository.py
│   │   ├── test_project_repository.py
│   │   ├── test_relation_repository.py
│   │   ├── test_repository.py
│   │   ├── test_search_repository_edit_bug_fix.py
│   │   └── test_search_repository.py
│   ├── schemas
│   │   ├── test_base_timeframe_minimum.py
│   │   ├── test_memory_serialization.py
│   │   ├── test_memory_url_validation.py
│   │   ├── test_memory_url.py
│   │   ├── test_schemas.py
│   │   └── test_search.py
│   ├── Screenshot.png
│   ├── services
│   │   ├── test_context_service.py
│   │   ├── test_directory_service.py
│   │   ├── test_entity_service_disable_permalinks.py
│   │   ├── test_entity_service.py
│   │   ├── test_file_service.py
│   │   ├── test_initialization.py
│   │   ├── test_link_resolver.py
│   │   ├── test_project_removal_bug.py
│   │   ├── test_project_service_operations.py
│   │   ├── test_project_service.py
│   │   └── test_search_service.py
│   ├── sync
│   │   ├── test_character_conflicts.py
│   │   ├── test_sync_service_incremental.py
│   │   ├── test_sync_service.py
│   │   ├── test_sync_wikilink_issue.py
│   │   ├── test_tmp_files.py
│   │   ├── test_watch_service_edge_cases.py
│   │   ├── test_watch_service_reload.py
│   │   └── test_watch_service.py
│   ├── test_config.py
│   ├── test_db_migration_deduplication.py
│   ├── test_deps.py
│   ├── test_production_cascade_delete.py
│   └── utils
│       ├── test_file_utils.py
│       ├── test_frontmatter_obsidian_compatible.py
│       ├── test_parse_tags.py
│       ├── test_permalink_formatting.py
│       ├── test_utf8_handling.py
│       └── test_validate_project_path.py
├── uv.lock
├── v0.15.0-RELEASE-DOCS.md
└── v15-docs
    ├── api-performance.md
    ├── background-relations.md
    ├── basic-memory-home.md
    ├── bug-fixes.md
    ├── chatgpt-integration.md
    ├── cloud-authentication.md
    ├── cloud-bisync.md
    ├── cloud-mode-usage.md
    ├── cloud-mount.md
    ├── default-project-mode.md
    ├── env-file-removal.md
    ├── env-var-overrides.md
    ├── explicit-project-parameter.md
    ├── gitignore-integration.md
    ├── project-root-env-var.md
    ├── README.md
    └── sqlite-performance.md
```

# Files

--------------------------------------------------------------------------------
/src/basic_memory/services/initialization.py:
--------------------------------------------------------------------------------

```python
"""Shared initialization service for Basic Memory.

This module provides shared initialization functions used by both CLI and API
to ensure consistent application startup across all entry points.
"""

import asyncio
from pathlib import Path

from loguru import logger

from basic_memory import db
from basic_memory.config import BasicMemoryConfig
from basic_memory.models import Project
from basic_memory.repository import (
    ProjectRepository,
)


async def initialize_database(app_config: BasicMemoryConfig) -> None:
    """Initialize database with migrations handled automatically by get_or_create_db.

    Args:
        app_config: The Basic Memory application configuration

    Note:
        Database migrations are now handled automatically when the database
        connection is first established via get_or_create_db().
    """
    # Trigger database initialization and migrations by getting the database connection
    try:
        await db.get_or_create_db(app_config.database_path)
        logger.info("Database initialization completed")
    except Exception as e:
        logger.error(f"Error initializing database: {e}")
        # Allow application to continue - it might still work
        # depending on what the error was, and will fail with a
        # more specific error if the database is actually unusable


async def reconcile_projects_with_config(app_config: BasicMemoryConfig):
    """Ensure all projects in config.json exist in the projects table and vice versa.

    This uses the ProjectService's synchronize_projects method to ensure bidirectional
    synchronization between the configuration file and the database.

    Args:
        app_config: The Basic Memory application configuration
    """
    logger.info("Reconciling projects from config with database...")

    # Get database session - migrations handled centrally
    _, session_maker = await db.get_or_create_db(
        db_path=app_config.database_path,
        db_type=db.DatabaseType.FILESYSTEM,
        ensure_migrations=False,
    )
    project_repository = ProjectRepository(session_maker)

    # Import ProjectService here to avoid circular imports
    from basic_memory.services.project_service import ProjectService

    try:
        # Create project service and synchronize projects
        project_service = ProjectService(repository=project_repository)
        await project_service.synchronize_projects()
        logger.info("Projects successfully reconciled between config and database")
    except Exception as e:
        # Log the error but continue with initialization
        logger.error(f"Error during project synchronization: {e}")
        logger.info("Continuing with initialization despite synchronization error")


async def initialize_file_sync(
    app_config: BasicMemoryConfig,
):
    """Initialize file synchronization services. This function starts the watch service and does not return

    Args:
        app_config: The Basic Memory project configuration

    Returns:
        The watch service task that's monitoring file changes
    """

    # delay import
    from basic_memory.sync import WatchService

    # Load app configuration - migrations handled centrally
    _, session_maker = await db.get_or_create_db(
        db_path=app_config.database_path,
        db_type=db.DatabaseType.FILESYSTEM,
        ensure_migrations=False,
    )
    project_repository = ProjectRepository(session_maker)

    # Initialize watch service
    watch_service = WatchService(
        app_config=app_config,
        project_repository=project_repository,
        quiet=True,
    )

    # Get active projects
    active_projects = await project_repository.get_active_projects()

    # Start sync for all projects as background tasks (non-blocking)
    async def sync_project_background(project: Project):
        """Sync a single project in the background."""
        # avoid circular imports
        from basic_memory.sync.sync_service import get_sync_service

        logger.info(f"Starting background sync for project: {project.name}")
        try:
            # Create sync service
            sync_service = await get_sync_service(project)

            sync_dir = Path(project.path)
            await sync_service.sync(sync_dir, project_name=project.name)
            logger.info(f"Background sync completed successfully for project: {project.name}")
        except Exception as e:  # pragma: no cover
            logger.error(f"Error in background sync for project {project.name}: {e}")

    # Create background tasks for all project syncs (non-blocking)
    sync_tasks = [
        asyncio.create_task(sync_project_background(project)) for project in active_projects
    ]
    logger.info(f"Created {len(sync_tasks)} background sync tasks")

    # Don't await the tasks - let them run in background while we continue

    # Then start the watch service in the background
    logger.info("Starting watch service for all projects")
    # run the watch service
    try:
        await watch_service.run()
        logger.info("Watch service stopped")
    except Exception as e:  # pragma: no cover
        logger.error(f"Error starting watch service: {e}")

    return None


async def initialize_app(
    app_config: BasicMemoryConfig,
):
    """Initialize the Basic Memory application.

    This function handles the initialization steps shared by all entry points:
    - Running database migrations
    - Reconciling projects from config.json with the projects table

    File synchronization is started separately via initialize_file_sync().

    Args:
        app_config: The Basic Memory application configuration
    """
    logger.info("Initializing app...")
    # Initialize database first
    await initialize_database(app_config)

    # Reconcile projects from config.json with projects table
    await reconcile_projects_with_config(app_config)

    logger.info("App initialization completed (migration running in background if needed)")


def ensure_initialization(app_config: BasicMemoryConfig) -> None:
    """Ensure initialization runs in a synchronous context.

    This is a wrapper for the async initialize_app function that can be
    called from synchronous code like CLI entry points.

    No-op if app_config.cloud_mode_enabled is True, since cloud Basic Memory
    manages its own projects.

    Args:
        app_config: The Basic Memory application configuration
    """
    # Skip initialization in cloud mode - cloud manages its own projects
    if app_config.cloud_mode_enabled:
        logger.debug("Skipping initialization in cloud mode - projects managed by cloud")
        return

    try:
        asyncio.run(initialize_app(app_config))
        logger.info("Initialization completed successfully")
    except Exception as e:  # pragma: no cover
        logger.exception(f"Error during initialization: {e}")
        # Continue execution even if initialization fails
        # The command might still work, or will fail with a
        # more specific error message
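
# Usage sketch (hypothetical): a synchronous CLI entry point could run
#
#     app_config = BasicMemoryConfig()  # or however the config is loaded
#     ensure_initialization(app_config)
#
# before executing its command body, so migrations and project
# reconciliation happen up front.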

```

--------------------------------------------------------------------------------
/specs/SPEC-9 Signed Header Tenant Information.md:
--------------------------------------------------------------------------------

```markdown
---
title: 'SPEC-9: Signed Header Tenant Information'
type: spec
permalink: specs/spec-9-signed-header-tenant-information
tags:
- authentication
- tenant-isolation
- proxy
- security
- mcp
---

# SPEC-9: Signed Header Tenant Information

## Why

WorkOS JWT templates don't work with MCP's dynamic client registration requirement, preventing us from getting tenant information directly in JWT tokens. We need an alternative secure method to pass tenant context from the Cloud Proxy Service to tenant instances.

**Problem Context:**
- MCP spec requires dynamic client registration
- WorkOS JWT templates only apply to statically configured clients
- Without tenant information, we can't properly route requests or isolate tenant data
- Current JWT tokens only contain standard OIDC claims (sub, email, etc.)

**Affected Areas:**
- Cloud Proxy Service (`apps/cloud`) - request forwarding
- Tenant API instances (`apps/api`) - tenant context validation
- MCP Gateway (`apps/mcp`) - authentication flow
- Overall tenant isolation security model

## What

Implement HMAC-signed headers that the Cloud Proxy Service adds when forwarding requests to tenant instances. This provides secure, tamper-proof tenant information without relying on JWT custom claims.

**Components:**
- Header signing utility in Cloud Proxy Service
- Header validation middleware in Tenant API instances
- Shared secret configuration across services
- Fallback mechanisms for development and error cases

## How (High Level)

### 1. Header Format
Add these signed headers to all proxied requests:
```
X-BM-Tenant-ID: {tenant_id}
X-BM-Timestamp: {unix_timestamp}
X-BM-Signature: {hmac_sha256_signature}
```

### 2. Signature Algorithm
```python
import hashlib
import hmac

# Canonical message format
message = f"{tenant_id}:{timestamp}"

# HMAC-SHA256 signature
signature = hmac.new(
    key=shared_secret.encode('utf-8'),
    msg=message.encode('utf-8'),
    digestmod=hashlib.sha256
).hexdigest()
```

### 3. Implementation Flow

#### Cloud Proxy Service (`apps/cloud`)
1. Extract `tenant_id` from authenticated user profile
2. Generate timestamp and canonical message
3. Sign message with shared secret
4. Add headers to request before forwarding to tenant instance

#### Tenant API Instances (`apps/api`)
1. Middleware validates headers on all incoming requests
2. Extract tenant_id, timestamp from headers
3. Verify timestamp is within acceptable window (5 minutes)
4. Recompute signature and compare in constant time
5. If valid, make tenant context available to Basic Memory tools (see the validation sketch below)
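
A minimal validation sketch under the rules above (the helper name `verify_tenant_headers` is hypothetical, not from the codebase):

```python
import hashlib
import hmac
import time

MAX_SKEW_SECONDS = 300  # the 5-minute window from step 3


def verify_tenant_headers(headers: dict[str, str], shared_secret: str) -> str | None:
    """Return the tenant_id if the signed headers verify, else None."""
    tenant_id = headers.get("X-BM-Tenant-ID")
    timestamp = headers.get("X-BM-Timestamp")
    signature = headers.get("X-BM-Signature")
    if not (tenant_id and timestamp and signature):
        return None

    # Replay protection: reject timestamps outside the acceptable window
    try:
        age = abs(time.time() - int(timestamp))
    except ValueError:
        return None
    if age > MAX_SKEW_SECONDS:
        return None

    # Recompute the signature over the canonical message and compare in
    # constant time to prevent timing attacks
    expected = hmac.new(
        key=shared_secret.encode("utf-8"),
        msg=f"{tenant_id}:{timestamp}".encode("utf-8"),
        digestmod=hashlib.sha256,
    ).hexdigest()
    return tenant_id if hmac.compare_digest(expected, signature) else None
```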

### 4. Security Properties
- **Authenticity**: Only services with shared secret can create valid signatures
- **Integrity**: Header tampering invalidates signature
- **Replay Protection**: Timestamp prevents reuse of old signatures
- **Non-repudiation**: Each request is cryptographically tied to a specific tenant

### 5. Configuration
```bash
# Shared across Cloud Proxy and Tenant instances
BM_TENANT_HEADER_SECRET=randomly-generated-256-bit-secret

# Tenant API configuration
BM_TENANT_HEADER_VALIDATION=true  # true (production) | false (dev only)
```

## How to Evaluate

### Unit Tests
- [ ] Header signing utility generates correct signatures
- [ ] Header validation correctly accepts/rejects signatures
- [ ] Timestamp validation within acceptable windows
- [ ] Constant-time signature comparison prevents timing attacks

### Integration Tests
- [ ] End-to-end request flow from MCP client → proxy → tenant
- [ ] Tenant isolation verified with signed headers
- [ ] Error handling for missing/invalid headers
- [ ] Disabled validation in development environment

### Security Validation
- [ ] Shared secret rotation procedure
- [ ] Header tampering detection
- [ ] Clock skew tolerance testing
- [ ] Performance impact measurement

### Production Readiness
- [ ] Logging and monitoring of header validation
- [ ] Graceful degradation for header validation failures
- [ ] Documentation for secret management
- [ ] Deployment configuration templates

## Implementation Notes

### Shared Secret Management
- Generate a cryptographically secure 256-bit secret (see the sketch below)
- Same secret deployed to Cloud Proxy and all Tenant instances
- Consider secret rotation strategy for production
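
One way to generate such a secret (a sketch using Python's standard library; any CSPRNG works):

```python
import secrets

# 32 random bytes == 256 bits, hex-encoded for storage in an env var
# such as BM_TENANT_HEADER_SECRET
print(secrets.token_hex(32))
```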

### Error Handling
```python
# Strict mode (production)
if not validate_headers(request):
    raise HTTPException(status_code=401, detail="Invalid tenant headers")

# Fallback mode (development)
if not validate_headers(request):
    logger.warning("Invalid headers, falling back to default tenant")
    tenant_id = "default"
```

### Performance Considerations
- HMAC-SHA256 computation is fast (~microseconds)
- Headers add ~200 bytes to each request
- Validation happens once per request in middleware

## Benefits

✅ **Works with MCP dynamic client registration** - No dependency on JWT custom claims
✅ **Simple and reliable** - Standard HMAC signature approach
✅ **Secure by design** - Cryptographic authenticity and integrity
✅ **Infrastructure controlled** - No external service dependencies
✅ **Easy to implement** - Clear signature algorithm and validation

## Trade-offs

⚠️ **Shared secret management** - Need secure distribution and rotation
⚠️ **Clock synchronization** - Timestamp validation requires reasonably synced clocks
⚠️ **Header visibility** - Headers visible in logs (tenant_id not sensitive)
⚠️ **Additional complexity** - More moving parts in proxy forwarding

## Implementation Tasks

### Cloud Service (Header Signing)
- [ ] Create `utils/header_signing.py` with HMAC-SHA256 signing function
- [ ] Add `bm_tenant_header_secret` to Cloud service configuration
- [ ] Update `ProxyService.forward_request()` to call signing utility
- [ ] Add signed headers (X-BM-Tenant-ID, X-BM-Timestamp, X-BM-Signature)

### Tenant API (Header Validation)
- [ ] Create `utils/header_validation.py` with signature verification
- [ ] Add `bm_tenant_header_secret` to API service configuration
- [ ] Create `TenantHeaderValidationMiddleware` class
- [ ] Add middleware to FastAPI app (before other middleware)
- [ ] Skip validation for `/health` endpoint
- [ ] Store validated tenant_id in request.state

### Testing
- [ ] Unit test for header signing utility
- [ ] Unit test for header validation utility
- [ ] Integration test for proxy → tenant flow
- [ ] Test invalid/missing header handling
- [ ] Test timestamp window validation
- [ ] Test signature tampering detection

### Configuration & Deployment
- [ ] Update `.env.example` with BM_TENANT_HEADER_SECRET
- [ ] Generate secure 256-bit secret for production
- [ ] Update Fly.io secrets for both services
- [ ] Document secret rotation procedure

## Status

- [x] **Specification Complete** - Design finalized and documented
- [ ] **Implementation Started** - Header signing utility development
- [ ] **Cloud Proxy Updated** - ProxyService adds signed headers
- [ ] **Tenant Validation Added** - Middleware validates headers
- [ ] **Testing Complete** - All validation criteria met
- [ ] **Production Deployed** - Live with tenant isolation via headers
```

--------------------------------------------------------------------------------
/src/basic_memory/cli/commands/cloud/upload.py:
--------------------------------------------------------------------------------

```python
"""WebDAV upload functionality for basic-memory projects."""

import os
from pathlib import Path

import aiofiles
import httpx

from basic_memory.ignore_utils import load_gitignore_patterns, should_ignore_path
from basic_memory.mcp.async_client import get_client
from basic_memory.mcp.tools.utils import call_put


async def upload_path(
    local_path: Path,
    project_name: str,
    verbose: bool = False,
    use_gitignore: bool = True,
    dry_run: bool = False,
) -> bool:
    """
    Upload a file or directory to cloud project via WebDAV.

    Args:
        local_path: Path to local file or directory
        project_name: Name of cloud project (destination)
        verbose: Show detailed information about filtering and upload
        use_gitignore: If False, skip .gitignore patterns (still use .bmignore)
        dry_run: If True, show what would be uploaded without uploading

    Returns:
        True if upload succeeded, False otherwise
    """
    try:
        # Resolve path
        local_path = local_path.resolve()

        # Check if path exists
        if not local_path.exists():
            print(f"Error: Path does not exist: {local_path}")
            return False

        # Get files to upload
        if local_path.is_file():
            files_to_upload = [(local_path, local_path.name)]
            if verbose:
                print(f"Uploading single file: {local_path.name}")
        else:
            files_to_upload = _get_files_to_upload(local_path, verbose, use_gitignore)

        if not files_to_upload:
            print("No files found to upload")
            if verbose:
                print(
                    "\nTip: Use --no-gitignore to skip .gitignore patterns "
                    "if expected files were filtered out"
                )
            return True

        print(f"Found {len(files_to_upload)} file(s) to upload")

        # Calculate total size
        total_bytes = sum(file_path.stat().st_size for file_path, _ in files_to_upload)

        # If dry run, just show what would be uploaded
        if dry_run:
            print("\nFiles that would be uploaded:")
            for file_path, relative_path in files_to_upload:
                size = file_path.stat().st_size
                if size < 1024:
                    size_str = f"{size} bytes"
                elif size < 1024 * 1024:
                    size_str = f"{size / 1024:.1f} KB"
                else:
                    size_str = f"{size / (1024 * 1024):.1f} MB"
                print(f"  {relative_path} ({size_str})")
        else:
            # Upload files using httpx
            async with get_client() as client:
                for i, (file_path, relative_path) in enumerate(files_to_upload, 1):
                    # Build remote path: /webdav/{project_name}/{relative_path}
                    remote_path = f"/webdav/{project_name}/{relative_path}"
                    print(f"Uploading {relative_path} ({i}/{len(files_to_upload)})")

                    # Get file modification time
                    file_stat = file_path.stat()
                    mtime = int(file_stat.st_mtime)

                    # Read file content asynchronously
                    async with aiofiles.open(file_path, "rb") as f:
                        content = await f.read()

                    # Upload via HTTP PUT to WebDAV endpoint with mtime header
                    # Using X-OC-Mtime (ownCloud/Nextcloud standard)
                    response = await call_put(
                        client, remote_path, content=content, headers={"X-OC-Mtime": str(mtime)}
                    )
                    response.raise_for_status()

        # Format total size based on magnitude
        if total_bytes < 1024:
            size_str = f"{total_bytes} bytes"
        elif total_bytes < 1024 * 1024:
            size_str = f"{total_bytes / 1024:.1f} KB"
        else:
            size_str = f"{total_bytes / (1024 * 1024):.1f} MB"

        if dry_run:
            print(f"\nTotal: {len(files_to_upload)} file(s) ({size_str})")
        else:
            print(f"✓ Upload complete: {len(files_to_upload)} file(s) ({size_str})")

        return True

    except httpx.HTTPStatusError as e:
        print(f"Upload failed: HTTP {e.response.status_code} - {e.response.text}")
        return False
    except Exception as e:
        print(f"Upload failed: {e}")
        return False


def _get_files_to_upload(
    directory: Path, verbose: bool = False, use_gitignore: bool = True
) -> list[tuple[Path, str]]:
    """
    Get list of files to upload from directory.

    Uses .bmignore and optionally .gitignore patterns for filtering.

    Args:
        directory: Directory to scan
        verbose: Show detailed filtering information
        use_gitignore: If False, skip .gitignore patterns (still use .bmignore)

    Returns:
        List of (absolute_path, relative_path) tuples
    """
    files = []
    ignored_files = []

    # Load ignore patterns from .bmignore and optionally .gitignore
    ignore_patterns = load_gitignore_patterns(directory, use_gitignore=use_gitignore)

    if verbose:
        gitignore_path = directory / ".gitignore"
        gitignore_exists = gitignore_path.exists() and use_gitignore
        print(f"\nScanning directory: {directory}")
        print("Using .bmignore: Yes")
        print(f"Using .gitignore: {'Yes' if gitignore_exists else 'No'}")
        print(f"Ignore patterns loaded: {len(ignore_patterns)}")
        if ignore_patterns and len(ignore_patterns) <= 20:
            print(f"Patterns: {', '.join(sorted(ignore_patterns))}")
        print()

    # Walk through directory
    for root, dirs, filenames in os.walk(directory):
        root_path = Path(root)

        # Filter directories based on ignore patterns
        filtered_dirs = []
        for d in dirs:
            dir_path = root_path / d
            if should_ignore_path(dir_path, directory, ignore_patterns):
                if verbose:
                    rel_path = dir_path.relative_to(directory)
                    print(f"  [IGNORED DIR] {rel_path}/")
            else:
                filtered_dirs.append(d)
        dirs[:] = filtered_dirs

        # Process files
        for filename in filenames:
            file_path = root_path / filename

            # Calculate relative path for display/remote
            rel_path = file_path.relative_to(directory)
            remote_path = str(rel_path).replace("\\", "/")

            # Check if file should be ignored
            if should_ignore_path(file_path, directory, ignore_patterns):
                ignored_files.append(remote_path)
                if verbose:
                    print(f"  [IGNORED] {remote_path}")
                continue

            if verbose:
                print(f"  [INCLUDE] {remote_path}")

            files.append((file_path, remote_path))

    if verbose:
        print("\nSummary:")
        print(f"  Files to upload: {len(files)}")
        print(f"  Files ignored: {len(ignored_files)}")

    return files
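
# Usage sketch (hypothetical): preview, then upload, a local directory to a
# cloud project named "my-project":
#
#     await upload_path(Path("./notes"), "my-project", dry_run=True)
#     await upload_path(Path("./notes"), "my-project")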

```

--------------------------------------------------------------------------------
/test-int/mcp/test_build_context_validation.py:
--------------------------------------------------------------------------------

```python
"""Integration tests for build_context memory URL validation."""

import pytest
from fastmcp import Client


@pytest.mark.asyncio
async def test_build_context_valid_urls(mcp_server, app, test_project):
    """Test that build_context works with valid memory URLs."""

    async with Client(mcp_server) as client:
        # Create a test note to ensure we have something to find
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "URL Validation Test",
                "folder": "testing",
                "content": "# URL Validation Test\n\nThis note tests URL validation.",
                "tags": "test,validation",
            },
        )

        # Test various valid URL formats
        valid_urls = [
            "memory://testing/url-validation-test",  # Full memory URL
            "testing/url-validation-test",  # Relative path
            "testing/*",  # Pattern matching
        ]

        for url in valid_urls:
            result = await client.call_tool(
                "build_context", {"project": test_project.name, "url": url}
            )

            # Should return a valid GraphContext response
            assert len(result.content) == 1
            response = result.content[0].text  # pyright: ignore [reportAttributeAccessIssue]
            assert '"results"' in response  # Should contain results structure
            assert '"metadata"' in response  # Should contain metadata


@pytest.mark.asyncio
async def test_build_context_invalid_urls_fail_validation(mcp_server, app, test_project):
    """Test that build_context properly validates and rejects invalid memory URLs."""

    async with Client(mcp_server) as client:
        # Test cases: (invalid_url, expected_error_fragment)
        invalid_test_cases = [
            ("memory//test", "double slashes"),
            ("invalid://test", "protocol scheme"),
            ("notes<brackets>", "invalid characters"),
            ('notes"quotes"', "invalid characters"),
        ]

        for invalid_url, expected_error in invalid_test_cases:
            with pytest.raises(Exception) as exc_info:
                await client.call_tool(
                    "build_context", {"project": test_project.name, "url": invalid_url}
                )

            error_message = str(exc_info.value).lower()
            assert expected_error in error_message, (
                f"URL '{invalid_url}' should fail with '{expected_error}' error"
            )


@pytest.mark.asyncio
async def test_build_context_empty_urls_fail_validation(mcp_server, app, test_project):
    """Test that empty or whitespace-only URLs fail validation."""

    async with Client(mcp_server) as client:
        # These should fail validation
        empty_urls = [
            "",  # Empty string
            "   ",  # Whitespace only
        ]

        for empty_url in empty_urls:
            with pytest.raises(Exception) as exc_info:
                await client.call_tool(
                    "build_context", {"project": test_project.name, "url": empty_url}
                )

            error_message = str(exc_info.value)
            # Should fail with validation error
            assert (
                "cannot be empty" in error_message
                or "empty or whitespace" in error_message
                or "value_error" in error_message
                or "should be non-empty" in error_message
            )


@pytest.mark.asyncio
async def test_build_context_nonexistent_urls_return_empty_results(mcp_server, app, test_project):
    """Test that valid but nonexistent URLs return empty results (not errors)."""

    async with Client(mcp_server) as client:
        # These are valid URL formats but don't exist in the system
        nonexistent_valid_urls = [
            "memory://nonexistent/note",
            "nonexistent/note",
            "missing/*",
        ]

        for url in nonexistent_valid_urls:
            result = await client.call_tool(
                "build_context", {"project": test_project.name, "url": url}
            )

            # Should return valid response with empty results
            assert len(result.content) == 1
            response = result.content[0].text  # pyright: ignore [reportAttributeAccessIssue]
            assert '"results":[]' in response  # Empty results
            assert '"total_results":0' in response  # Zero count
            assert '"metadata"' in response  # But should have metadata


@pytest.mark.asyncio
async def test_build_context_error_messages_are_helpful(mcp_server, app, test_project):
    """Test that validation error messages provide helpful guidance."""

    async with Client(mcp_server) as client:
        # Test double slash error message
        with pytest.raises(Exception) as exc_info:
            await client.call_tool(
                "build_context", {"project": test_project.name, "url": "memory//bad"}
            )

        error_msg = str(exc_info.value).lower()
        # Should contain validation error info
        assert (
            "double slashes" in error_msg
            or "value_error" in error_msg
            or "validation error" in error_msg
        )

        # Test protocol scheme error message
        with pytest.raises(Exception) as exc_info:
            await client.call_tool(
                "build_context", {"project": test_project.name, "url": "http://example.com"}
            )

        error_msg = str(exc_info.value).lower()
        assert (
            "protocol scheme" in error_msg
            or "protocol" in error_msg
            or "value_error" in error_msg
            or "validation error" in error_msg
        )


@pytest.mark.asyncio
async def test_build_context_pattern_matching_works(mcp_server, app, test_project):
    """Test that valid pattern matching URLs work correctly."""

    async with Client(mcp_server) as client:
        # Create multiple test notes
        test_notes = [
            ("Pattern Test One", "patterns", "# Pattern Test One\n\nFirst pattern test."),
            ("Pattern Test Two", "patterns", "# Pattern Test Two\n\nSecond pattern test."),
            ("Other Note", "other", "# Other Note\n\nNot a pattern match."),
        ]

        for title, folder, content in test_notes:
            await client.call_tool(
                "write_note",
                {
                    "project": test_project.name,
                    "title": title,
                    "folder": folder,
                    "content": content,
                },
            )

        # Test pattern matching
        result = await client.call_tool(
            "build_context", {"project": test_project.name, "url": "patterns/*"}
        )

        assert len(result.content) == 1
        response = result.content[0].text  # pyright: ignore [reportAttributeAccessIssue]

        # Should find the pattern matches but not the other note
        assert '"total_results":2' in response or '"primary_count":2' in response
        assert "Pattern Test" in response
        assert "Other Note" not in response

```

--------------------------------------------------------------------------------
/tests/mcp/test_tool_resource.py:
--------------------------------------------------------------------------------

```python
"""Tests for resource tools that exercise the full stack with SQLite."""

import io
import base64
from PIL import Image as PILImage

import pytest
from mcp.server.fastmcp.exceptions import ToolError

from basic_memory.mcp.tools import read_content, write_note
from basic_memory.mcp.tools.read_content import (
    calculate_target_params,
    resize_image,
    optimize_image,
)


@pytest.mark.asyncio
async def test_read_file_text_file(app, synced_files, test_project):
    """Test reading a text file.

    Should:
    - Correctly identify text content
    - Return the content as text
    - Include correct metadata
    """
    # First create a text file via notes
    result = await write_note.fn(
        project=test_project.name,
        title="Text Resource",
        folder="test",
        content="This is a test text resource",
        tags=["test", "resource"],
    )
    assert result is not None

    # Now read it as a resource
    response = await read_content.fn("test/text-resource", project=test_project.name)

    assert response["type"] == "text"
    assert "This is a test text resource" in response["text"]
    assert response["content_type"].startswith("text/")
    assert response["encoding"] == "utf-8"


@pytest.mark.asyncio
async def test_read_content_file_path(app, synced_files, test_project):
    """Test reading a text file.

    Should:
    - Correctly identify text content
    - Return the content as text
    - Include correct metadata
    """
    # First create a text file via notes
    result = await write_note.fn(
        project=test_project.name,
        title="Text Resource",
        folder="test",
        content="This is a test text resource",
        tags=["test", "resource"],
    )
    assert result is not None

    # Now read it as a resource
    response = await read_content.fn("test/Text Resource.md", project=test_project.name)

    assert response["type"] == "text"
    assert "This is a test text resource" in response["text"]
    assert response["content_type"].startswith("text/")
    assert response["encoding"] == "utf-8"


@pytest.mark.asyncio
async def test_read_file_image_file(app, synced_files, test_project):
    """Test reading an image file.

    Should:
    - Correctly identify image content
    - Optimize the image
    - Return base64 encoded image data
    """
    # Get the path to the synced image file
    image_path = synced_files["image"].name

    # Read it as a resource
    response = await read_content.fn(image_path, project=test_project.name)

    assert response["type"] == "image"
    assert response["source"]["type"] == "base64"
    assert response["source"]["media_type"] == "image/jpeg"

    # Verify the image data is valid base64 that can be decoded
    img_data = base64.b64decode(response["source"]["data"])
    assert len(img_data) > 0

    # Should be able to open as an image
    img = PILImage.open(io.BytesIO(img_data))
    assert img.width > 0
    assert img.height > 0


@pytest.mark.asyncio
async def test_read_file_pdf_file(app, synced_files, test_project):
    """Test reading a PDF file.

    Should:
    - Correctly identify PDF content
    - Return base64 encoded PDF data
    """
    # Get the path to the synced PDF file
    pdf_path = synced_files["pdf"].name

    # Read it as a resource
    response = await read_content.fn(pdf_path, project=test_project.name)

    assert response["type"] == "document"
    assert response["source"]["type"] == "base64"
    assert response["source"]["media_type"] == "application/pdf"

    # Verify the PDF data is valid base64 that can be decoded
    pdf_data = base64.b64decode(response["source"]["data"])
    assert len(pdf_data) > 0
    assert pdf_data.startswith(b"%PDF")  # PDF signature


@pytest.mark.asyncio
async def test_read_file_not_found(app, test_project):
    """Test trying to read a non-existent resource."""
    with pytest.raises(ToolError, match="Resource not found"):
        await read_content.fn("does-not-exist", project=test_project.name)


@pytest.mark.asyncio
async def test_read_file_memory_url(app, synced_files, test_project):
    """Test reading a resource using a memory:// URL."""
    # Create a text file via notes
    await write_note.fn(
        project=test_project.name,
        title="Memory URL Test",
        folder="test",
        content="Testing memory:// URL handling for resources",
    )

    # Read it with a memory:// URL
    memory_url = "memory://test/memory-url-test"
    response = await read_content.fn(memory_url, project=test_project.name)

    assert response["type"] == "text"
    assert "Testing memory:// URL handling for resources" in response["text"]


@pytest.mark.asyncio
async def test_image_optimization_functions(app):
    """Test the image optimization helper functions."""
    # Create a test image
    img = PILImage.new("RGB", (1000, 800), color="white")

    # Test calculate_target_params function
    # Small image
    quality, size = calculate_target_params(100000)
    assert quality == 70
    assert size == 1000

    # Medium image
    quality, size = calculate_target_params(800000)
    assert quality == 60
    assert size == 800

    # Large image
    quality, size = calculate_target_params(2000000)
    assert quality == 50
    assert size == 600

    # Test resize_image function
    # Image that needs resizing
    resized = resize_image(img, 500)
    assert resized.width <= 500
    assert resized.height <= 500

    # Image that doesn't need resizing
    small_img = PILImage.new("RGB", (300, 200), color="white")
    resized = resize_image(small_img, 500)
    assert resized.width == 300
    assert resized.height == 200

    # Test optimize_image function
    img_bytes = io.BytesIO()
    img.save(img_bytes, format="PNG")
    img_bytes.seek(0)
    content_length = len(img_bytes.getvalue())

    # For a small test image, optimization might make the output larger
    # because of JPEG overhead, so just verify that it returns something
    optimized = optimize_image(img, content_length)
    assert len(optimized) > 0


@pytest.mark.asyncio
async def test_image_conversion(app, synced_files, test_project):
    """Test reading an image and verify conversion works.

    Should:
    - Handle image content correctly
    - Return optimized image data
    """
    # Use the synced image file that's already part of our test fixtures
    image_path = synced_files["image"].name

    # Test reading the resource
    response = await read_content.fn(image_path, project=test_project.name)

    assert response["type"] == "image"
    assert response["source"]["media_type"] == "image/jpeg"

    # Verify the image data is valid
    img_data = base64.b64decode(response["source"]["data"])
    img = PILImage.open(io.BytesIO(img_data))
    assert img.width > 0
    assert img.height > 0
    assert img.mode == "RGB"  # Should be in RGB mode


# Skip testing the large document size handling since it would require
# complex mocking of internal logic. We've already tested the happy path
# with the PDF file, and the error handling with our updated tool_utils tests.
# We have 100% coverage of this code in read_content.py according to the coverage report.

```

--------------------------------------------------------------------------------
/tests/services/test_entity_service_disable_permalinks.py:
--------------------------------------------------------------------------------

```python
"""Tests for EntityService with disable_permalinks flag."""

from textwrap import dedent
import pytest
import yaml

from basic_memory.config import BasicMemoryConfig
from basic_memory.schemas import Entity as EntitySchema
from basic_memory.services import FileService
from basic_memory.services.entity_service import EntityService


@pytest.mark.asyncio
async def test_create_entity_with_permalinks_disabled(
    entity_repository,
    observation_repository,
    relation_repository,
    entity_parser,
    file_service: FileService,
    link_resolver,
):
    """Test that entities created with disable_permalinks=True don't have permalinks."""
    # Create entity service with permalinks disabled
    app_config = BasicMemoryConfig(disable_permalinks=True)
    entity_service = EntityService(
        entity_parser=entity_parser,
        entity_repository=entity_repository,
        observation_repository=observation_repository,
        relation_repository=relation_repository,
        file_service=file_service,
        link_resolver=link_resolver,
        app_config=app_config,
    )

    entity_data = EntitySchema(
        title="Test Entity",
        folder="test",
        entity_type="note",
        content="Test content",
    )

    # Create entity
    entity = await entity_service.create_entity(entity_data)

    # Assert entity has no permalink
    assert entity.permalink is None

    # Verify file frontmatter doesn't contain permalink
    file_path = file_service.get_entity_path(entity)
    file_content, _ = await file_service.read_file(file_path)
    _, frontmatter, doc_content = file_content.split("---", 2)
    metadata = yaml.safe_load(frontmatter)

    assert "permalink" not in metadata
    assert metadata["title"] == "Test Entity"
    assert metadata["type"] == "note"


@pytest.mark.asyncio
async def test_update_entity_with_permalinks_disabled(
    entity_repository,
    observation_repository,
    relation_repository,
    entity_parser,
    file_service: FileService,
    link_resolver,
):
    """Test that entities updated with disable_permalinks=True don't get permalinks added."""
    # First create with permalinks enabled
    app_config_enabled = BasicMemoryConfig(disable_permalinks=False)
    entity_service_enabled = EntityService(
        entity_parser=entity_parser,
        entity_repository=entity_repository,
        observation_repository=observation_repository,
        relation_repository=relation_repository,
        file_service=file_service,
        link_resolver=link_resolver,
        app_config=app_config_enabled,
    )

    entity_data = EntitySchema(
        title="Test Entity",
        folder="test",
        entity_type="note",
        content="Original content",
    )

    # Create entity with permalinks enabled
    entity = await entity_service_enabled.create_entity(entity_data)
    assert entity.permalink is not None
    original_permalink = entity.permalink

    # Now create service with permalinks disabled
    app_config_disabled = BasicMemoryConfig(disable_permalinks=True)
    entity_service_disabled = EntityService(
        entity_parser=entity_parser,
        entity_repository=entity_repository,
        observation_repository=observation_repository,
        relation_repository=relation_repository,
        file_service=file_service,
        link_resolver=link_resolver,
        app_config=app_config_disabled,
    )

    # Update entity with permalinks disabled
    entity_data.content = "Updated content"
    updated = await entity_service_disabled.update_entity(entity, entity_data)

    # Permalink should remain unchanged (not removed, just not updated)
    assert updated.permalink == original_permalink

    # Verify file still has the original permalink
    file_path = file_service.get_entity_path(updated)
    file_content, _ = await file_service.read_file(file_path)
    assert "Updated content" in file_content
    assert f"permalink: {original_permalink}" in file_content


@pytest.mark.asyncio
async def test_create_entity_with_content_frontmatter_permalinks_disabled(
    entity_repository,
    observation_repository,
    relation_repository,
    entity_parser,
    file_service: FileService,
    link_resolver,
):
    """Test that content frontmatter permalinks are ignored when disabled."""
    # Create entity service with permalinks disabled
    app_config = BasicMemoryConfig(disable_permalinks=True)
    entity_service = EntityService(
        entity_parser=entity_parser,
        entity_repository=entity_repository,
        observation_repository=observation_repository,
        relation_repository=relation_repository,
        file_service=file_service,
        link_resolver=link_resolver,
        app_config=app_config,
    )

    # Content with frontmatter containing permalink
    content = dedent(
        """
        ---
        permalink: custom-permalink
        ---
        # Test Content
        """
    ).strip()

    entity_data = EntitySchema(
        title="Test Entity",
        folder="test",
        entity_type="note",
        content=content,
    )

    # Create entity
    entity = await entity_service.create_entity(entity_data)

    # Entity should not have a permalink set
    assert entity.permalink is None

    # Verify file doesn't have permalink in frontmatter
    file_path = file_service.get_entity_path(entity)
    file_content, _ = await file_service.read_file(file_path)
    _, frontmatter, doc_content = file_content.split("---", 2)
    metadata = yaml.safe_load(frontmatter)

    # The permalink from content frontmatter should not be present
    assert "permalink" not in metadata


@pytest.mark.asyncio
async def test_move_entity_with_permalinks_disabled(
    entity_repository,
    observation_repository,
    relation_repository,
    entity_parser,
    file_service: FileService,
    link_resolver,
    project_config,
):
    """Test that moving an entity with disable_permalinks=True doesn't update permalinks."""
    # First create with permalinks enabled
    app_config = BasicMemoryConfig(disable_permalinks=False, update_permalinks_on_move=True)
    entity_service = EntityService(
        entity_parser=entity_parser,
        entity_repository=entity_repository,
        observation_repository=observation_repository,
        relation_repository=relation_repository,
        file_service=file_service,
        link_resolver=link_resolver,
        app_config=app_config,
    )

    entity_data = EntitySchema(
        title="Test Entity",
        folder="test",
        entity_type="note",
        content="Test content",
    )

    # Create entity
    entity = await entity_service.create_entity(entity_data)
    original_permalink = entity.permalink

    # Now disable permalinks
    app_config_disabled = BasicMemoryConfig(disable_permalinks=True, update_permalinks_on_move=True)

    # Move entity
    moved = await entity_service.move_entity(
        identifier=entity.permalink,
        destination_path="new_folder/test_entity.md",
        project_config=project_config,
        app_config=app_config_disabled,
    )

    # Permalink should remain unchanged even though update_permalinks_on_move is True
    assert moved.permalink == original_permalink

```

--------------------------------------------------------------------------------
/tests/db/test_issue_254_foreign_key_constraints.py:
--------------------------------------------------------------------------------

```python
"""Test to verify that issue #254 is fixed.

Issue #254: Foreign key constraint failures when deleting projects with related entities.

The issue was that when migration 647e7a75e2cd recreated the project table,
it did not re-establish the foreign key constraint from entity.project_id to project.id
with CASCADE DELETE, causing foreign key constraint failures when trying to delete
projects that have related entities.

Migration a1b2c3d4e5f6 was created to fix this by adding the missing foreign key
constraint with CASCADE DELETE behavior.

This test file verifies that the fix works correctly in production databases
that have had the migration applied.
"""

import tempfile
from datetime import datetime, timezone
from pathlib import Path

import pytest

from basic_memory.services.project_service import ProjectService


@pytest.mark.asyncio
async def test_issue_254_foreign_key_constraint_fix(project_service: ProjectService):
    """Test to verify issue #254 is fixed: project removal with foreign key constraints.

    This test reproduces the exact scenario from issue #254:
    1. Create a project
    2. Create entities, observations, and relations linked to that project
    3. Attempt to remove the project
    4. Verify it succeeds without "FOREIGN KEY constraint failed" errors
    5. Verify all related data is properly cleaned up via CASCADE DELETE

    """
    test_project_name = "issue-254-verification"
    with tempfile.TemporaryDirectory() as temp_dir:
        test_root = Path(temp_dir)
        test_project_path = str(test_root / "issue-254-verification")

        # Step 1: Create test project
        await project_service.add_project(test_project_name, test_project_path)
        project = await project_service.get_project(test_project_name)
        assert project is not None, "Project should be created successfully"

        # Step 2: Create related entities that would cause foreign key constraint issues
        from basic_memory.repository.entity_repository import EntityRepository
        from basic_memory.repository.observation_repository import ObservationRepository
        from basic_memory.repository.relation_repository import RelationRepository

        entity_repo = EntityRepository(
            project_service.repository.session_maker, project_id=project.id
        )
        obs_repo = ObservationRepository(
            project_service.repository.session_maker, project_id=project.id
        )
        rel_repo = RelationRepository(
            project_service.repository.session_maker, project_id=project.id
        )

        # Create entity
        entity_data = {
            "title": "Issue 254 Test Entity",
            "entity_type": "note",
            "content_type": "text/markdown",
            "project_id": project.id,
            "permalink": "issue-254-entity",
            "file_path": "issue-254-entity.md",
            "checksum": "issue254test",
            "created_at": datetime.now(timezone.utc),
            "updated_at": datetime.now(timezone.utc),
        }
        entity = await entity_repo.create(entity_data)

        # Create observation linked to entity
        observation_data = {
            "entity_id": entity.id,
            "content": "This observation should be cascade deleted",
            "category": "test",
        }
        observation = await obs_repo.create(observation_data)

        # Create relation involving the entity
        relation_data = {
            "from_id": entity.id,
            "to_name": "some-other-entity",
            "relation_type": "relates-to",
        }
        relation = await rel_repo.create(relation_data)

        # Step 3: Attempt to remove the project
        # This is where issue #254 manifested - should NOT raise "FOREIGN KEY constraint failed"
        try:
            await project_service.remove_project(test_project_name)
        except Exception as e:
            if "FOREIGN KEY constraint failed" in str(e):
                pytest.fail(
                    f"Issue #254 not fixed - foreign key constraint error still occurs: {e}. "
                    f"The migration a1b2c3d4e5f6 may not have been applied correctly or "
                    f"the CASCADE DELETE constraint is not working as expected."
                )
            else:
                # Re-raise unexpected errors
                raise

        # Step 4: Verify project was successfully removed
        removed_project = await project_service.get_project(test_project_name)
        assert removed_project is None, "Project should have been removed"

        # Step 5: Verify related data was cascade deleted
        remaining_entity = await entity_repo.find_by_id(entity.id)
        assert remaining_entity is None, "Entity should have been cascade deleted"

        remaining_observation = await obs_repo.find_by_id(observation.id)
        assert remaining_observation is None, "Observation should have been cascade deleted"

        remaining_relation = await rel_repo.find_by_id(relation.id)
        assert remaining_relation is None, "Relation should have been cascade deleted"


@pytest.mark.asyncio
async def test_issue_254_reproduction(project_service: ProjectService):
    """Test that reproduces issue #254 to document the current state.

    This test demonstrates the current behavior and will fail until the issue is fixed.
    It serves as documentation of what the problem was.
    """
    test_project_name = "issue-254-reproduction"
    with tempfile.TemporaryDirectory() as temp_dir:
        test_root = Path(temp_dir)
        test_project_path = str(test_root / "issue-254-reproduction")

        # Create project and entity
        await project_service.add_project(test_project_name, test_project_path)
        project = await project_service.get_project(test_project_name)

        from basic_memory.repository.entity_repository import EntityRepository

        entity_repo = EntityRepository(
            project_service.repository.session_maker, project_id=project.id
        )

        entity_data = {
            "title": "Reproduction Entity",
            "entity_type": "note",
            "content_type": "text/markdown",
            "project_id": project.id,
            "permalink": "reproduction-entity",
            "file_path": "reproduction-entity.md",
            "checksum": "repro123",
            "created_at": datetime.now(timezone.utc),
            "updated_at": datetime.now(timezone.utc),
        }
        await entity_repo.create(entity_data)

        # With migration a1b2c3d4e5f6 applied, this removal should succeed
        # without raising "FOREIGN KEY constraint failed"
        await project_service.remove_project(test_project_name)

```

--------------------------------------------------------------------------------
/src/basic_memory/mcp/resources/ai_assistant_guide.md:
--------------------------------------------------------------------------------

```markdown
# AI Assistant Guide for Basic Memory

Quick reference for using Basic Memory tools effectively through MCP.

**For comprehensive coverage**: See the [Extended AI Assistant Guide](https://github.com/basicmachines-co/basic-memory/blob/main/docs/ai-assistant-guide-extended.md) with detailed examples, advanced patterns, and self-contained sections.

## Overview

Basic Memory creates a semantic knowledge graph from markdown files. Focus on building rich connections between notes.

- **Local-First**: Plain text files on user's computer
- **Persistent**: Knowledge survives across sessions
- **Semantic**: Observations and relations create a knowledge graph

**Your role**: You're helping humans build enduring knowledge they'll own forever. The semantic graph (observations, relations, context) helps you provide better assistance by understanding connections and maintaining continuity. Think: lasting insights worth keeping, not disposable chat logs.

## Project Management 

All tools require explicit project specification.

**Three-tier resolution:**
1. CLI constraint: `--project name` (highest priority)
2. Explicit parameter: `project="name"` in tool calls
3. Default mode: `default_project_mode=true` in config (fallback)

### Quick Setup Check

```python
# Discover projects
projects = await list_memory_projects()

# Check if default_project_mode enabled
# If yes: project parameter optional
# If no: project parameter required
```

### Default Project Mode

When `default_project_mode=true`:
```python
# These are equivalent:
await write_note("Note", "Content", "folder")
await write_note("Note", "Content", "folder", project="main")
```

When `default_project_mode=false` (default):
```python
# Project required:
await write_note("Note", "Content", "folder", project="main")  # ✓
await write_note("Note", "Content", "folder")  # ✗ Error
```

## Core Tools

### Writing Knowledge

```python
await write_note(
    title="Topic",
    content="# Topic\n## Observations\n- [category] fact\n## Relations\n- relates_to [[Other]]",
    folder="notes",
    project="main"  # Required unless default_project_mode=true
)
```

### Reading Knowledge

```python
# By identifier
content = await read_note("Topic", project="main")

# By memory:// URL
content = await read_note("memory://folder/topic", project="main")
```

### Searching

```python
results = await search_notes(
    query="authentication",
    project="main",
    page_size=10
)
```

### Building Context

```python
context = await build_context(
    url="memory://specs/auth",
    project="main",
    depth=2,
    timeframe="1 week"
)
```

## Knowledge Graph Essentials

### Observations

Categorized facts with optional tags:
```markdown
- [decision] Use JWT for authentication #security
- [technique] Hash passwords with bcrypt #best-practice
- [requirement] Support OAuth 2.0 providers
```

### Relations

Directional links between entities:
```markdown
- implements [[Authentication Spec]]
- requires [[User Database]]
- extends [[Base Security Model]]
```

**Common relation types:** `relates_to`, `implements`, `requires`, `extends`, `part_of`, `contrasts_with`

### Forward References

Reference entities that don't exist yet:
```python
# Create note with forward reference
await write_note(
    title="Login Flow",
    content="## Relations\n- requires [[OAuth Provider]]",  # Doesn't exist yet
    folder="auth",
    project="main"
)

# Later, create referenced entity
await write_note(
    title="OAuth Provider",
    content="# OAuth Provider\n...",
    folder="auth",
    project="main"
)
# → Relation automatically resolved
```

## Best Practices

### 1. Project Management

**Single-project users:**
- Enable `default_project_mode=true`
- Simpler tool calls

**Multi-project users:**
- Keep `default_project_mode=false`
- Always specify project explicitly

**Discovery:**
```python
# Start with discovery
projects = await list_memory_projects()

# Cross-project activity (no project param = all projects)
activity = await recent_activity()

# Or specific project
activity = await recent_activity(project="main")
```

### 2. Building Rich Graphs

**Always include:**
- 3-5 observations per note
- 2-3 relations per note
- Meaningful categories and relation types

**Search before creating:**
```python
# Find existing entities to reference
results = await search_notes(query="authentication", project="main")
# Use exact titles in [[WikiLinks]]
```

### 3. Writing Effective Notes

**Structure:**
```markdown
# Title

## Context
Background information

## Observations
- [category] Fact with #tags
- [category] Another fact

## Relations
- relation_type [[Exact Entity Title]]
```

**Categories:** `[idea]`, `[decision]`, `[fact]`, `[technique]`, `[requirement]`

### 4. Error Handling

**Missing project:**
```python
try:
    await search_notes(query="test")  # Missing project parameter - will error
except Exception:
    # Show available projects
    projects = await list_memory_projects()
    # Then retry with project
    results = await search_notes(query="test", project=projects[0].name)
```

**Forward references:**
```python
# Check response for unresolved relations
response = await write_note(
    title="New Topic",
    content="## Relations\n- relates_to [[Future Topic]]",
    folder="notes",
    project="main"
)
# Forward refs will resolve when target created
```

### 5. Recording Context

**Ask permission:**
> "Would you like me to save our discussion about [topic] to Basic Memory?"

**Confirm when done:**
> "I've saved our discussion to Basic Memory."

**What to record:**
- Decisions and rationales
- Important discoveries
- Action items and plans
- Connected topics

## Common Patterns

### Capture Decision

```python
await write_note(
    title="DB Choice",
    content="""# DB Choice\n## Decision\nUse PostgreSQL\n## Observations\n- [requirement] ACID compliance #reliability\n- [decision] PostgreSQL over MySQL\n## Relations\n- implements [[Data Architecture]]""",
    folder="decisions",
    project="main"
)
```

### Link Topics & Build Context

```python
# Link bidirectionally
await write_note(title="API Auth", content="## Relations\n- part_of [[API Design]]", folder="api", project="main")
await edit_note(identifier="API Design", operation="append", content="\n- includes [[API Auth]]", project="main")

# Search and build context
results = await search_notes(query="authentication", project="main")
context = await build_context(url=f"memory://{results[0].permalink}", project="main", depth=2)
```

## Tool Quick Reference

| Tool | Purpose | Key Params |
|------|---------|------------|
| `write_note` | Create/update | title, content, folder, project |
| `read_note` | Read content | identifier, project |
| `edit_note` | Modify existing | identifier, operation, content, project |
| `search_notes` | Find notes | query, project |
| `build_context` | Graph traversal | url, depth, project |
| `recent_activity` | Recent changes | timeframe, project |
| `list_memory_projects` | Show projects | (none) |

## memory:// URL Format

- `memory://title` - By title
- `memory://folder/title` - By folder + title
- `memory://permalink` - By permalink
- `memory://folder/*` - All in folder
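
For example (using the illustrative project "main"):

```python
await build_context(url="memory://specs/auth", project="main")  # one entity by permalink
await build_context(url="memory://specs/*", project="main")     # everything in a folder
```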

For full documentation: https://docs.basicmemory.com

Built with ♥️ by Basic Machines

```

--------------------------------------------------------------------------------
/docs/character-handling.md:
--------------------------------------------------------------------------------

```markdown
# Character Handling and Conflict Resolution

Basic Memory handles various character encoding scenarios and file naming conventions to provide consistent permalink generation and conflict resolution. This document explains how the system works and how to resolve common character-related issues.

## Overview

Basic Memory uses a sophisticated system to generate permalinks from file paths while maintaining consistency across different operating systems and character encodings. The system normalizes file paths and generates unique permalinks to prevent conflicts.

## Character Normalization Rules

### 1. Permalink Generation

When Basic Memory processes a file path, it applies these normalization rules:

```
Original: "Finance/My Investment Strategy.md"
Permalink: "finance/my-investment-strategy"
```

**Transformation process** (a code sketch follows the list):
1. Remove file extension (`.md`)
2. Convert to lowercase (case-insensitive)
3. Replace spaces with hyphens
4. Replace underscores with hyphens
5. Handle international characters (transliteration for Latin, preservation for non-Latin)
6. Convert camelCase to kebab-case
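
A minimal sketch of these rules using Python's standard `unicodedata` module; this is illustrative, not Basic Memory's actual implementation:

```python
import re
import unicodedata

# Letters like "ø" don't decompose under NFKD, so map them explicitly
_NO_DECOMPOSE = {"ø": "o", "Ø": "O"}

def permalink_sketch(path: str) -> str:
    stem = re.sub(r"\.md$", "", path)                    # 1. drop extension
    stem = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "-", stem)  # 6. camelCase -> kebab-case
    stem = "".join(_NO_DECOMPOSE.get(c, c) for c in stem)
    stem = unicodedata.normalize("NFKD", stem)           # 5. split off diacritics...
    stem = "".join(c for c in stem if not unicodedata.combining(c))  # ...then drop them
    stem = stem.lower().replace(" ", "-").replace("_", "-")  # 2-4. case, spaces, underscores
    return re.sub(r"-{2,}", "-", stem)

assert permalink_sketch("Finance/My Investment Strategy.md") == "finance/my-investment-strategy"
assert permalink_sketch("Søren.md") == "soren"
```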

### 2. International Character Support

**Latin characters with diacritics** are transliterated:
- `ø` → `o` (Søren → soren)
- `ü` → `u` (Müller → muller)
- `é` → `e` (Café → cafe)
- `ñ` → `n` (Niño → nino)

**Non-Latin characters** are preserved:
- Chinese: `中文/测试文档.md` → `中文/测试文档`
- Japanese: `日本語/文書.md` → `日本語/文書`

## Common Conflict Scenarios

### 1. Hyphen vs Space Conflicts

**Problem:** Files with existing hyphens conflict with generated permalinks from spaces.

**Example:**
```
File 1: "basic memory bug.md"     → permalink: "basic-memory-bug"
File 2: "basic-memory-bug.md"    → permalink: "basic-memory-bug" (CONFLICT!)
```

**Resolution:** The system automatically resolves this by adding suffixes:
```
File 1: "basic memory bug.md"     → permalink: "basic-memory-bug"
File 2: "basic-memory-bug.md"    → permalink: "basic-memory-bug-1"
```
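
A minimal sketch of the suffixing strategy (illustrative only, not the actual implementation):

```python
def unique_permalink(base: str, taken: set[str]) -> str:
    """Append -1, -2, ... until the permalink is unique."""
    if base not in taken:
        return base
    n = 1
    while f"{base}-{n}" in taken:
        n += 1
    return f"{base}-{n}"

assert unique_permalink("basic-memory-bug", {"basic-memory-bug"}) == "basic-memory-bug-1"
```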

**Best Practice:** Choose consistent naming conventions within your project.

### 2. Case Sensitivity Conflicts

**Problem:** Different case variations that normalize to the same permalink.

**Example:**
```
Directory: Finance/investment.md
Directory: finance/investment.md  (distinct paths on case-sensitive filesystems, same permalink)
```

**Resolution:** Basic Memory detects case conflicts and prevents them during sync operations with helpful error messages.
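
The collision is easy to see in Python (illustrative):

```python
a, b = "Finance/investment.md", "finance/investment.md"
assert a != b                  # distinct strings
assert a.lower() == b.lower()  # but the same permalink after normalization
```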

**Best Practice:** Use consistent casing for directory and file names.

### 3. Character Encoding Conflicts

**Problem:** Different Unicode normalizations of the same logical character.

**Example:**
```
File 1: "café.md" (é as single character)
File 2: "café.md" (e + combining accent)
```

**Resolution:** Basic Memory normalizes Unicode characters using NFD normalization to detect these conflicts.
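
For example, the two encodings only compare equal after normalization (Python sketch):

```python
import unicodedata

precomposed = "café.md"       # "é" as a single code point (U+00E9)
decomposed = "cafe\u0301.md"  # "e" plus a combining acute accent (U+0301)

assert precomposed != decomposed
assert unicodedata.normalize("NFD", precomposed) == unicodedata.normalize("NFD", decomposed)
```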

### 4. Forward Slash Conflicts

**Problem:** Forward slashes in frontmatter or file names interpreted as path separators.

**Example:**
```yaml
---
permalink: finance/investment/strategy
---
```

**Resolution:** Basic Memory validates frontmatter permalinks and warns about path separator conflicts.

## Error Messages and Troubleshooting

### "UNIQUE constraint failed: entity.file_path, entity.project_id"

**Cause:** Two entities trying to use the same file path within a project.

**Common scenarios:**
1. File move operation where destination is already occupied
2. Case sensitivity differences on macOS
3. Character encoding conflicts
4. Concurrent file operations

**Resolution steps:**
1. Check for duplicate file names with different cases
2. Look for files with similar names but different character encodings
3. Rename conflicting files to have unique names
4. Run sync again after resolving conflicts

### "File path conflict detected during move"

**Cause:** Enhanced conflict detection preventing potential database integrity violations.

**What this means:** The system detected that moving a file would create a conflict before attempting the database operation.

**Resolution:** Follow the specific guidance in the error message, which will indicate the type of conflict detected.

## Best Practices

### 1. File Naming Conventions

**Recommended patterns:**
- Use consistent casing (prefer lowercase)
- Use hyphens instead of spaces for multi-word files
- Avoid special characters that could conflict with path separators
- Be consistent with directory structure casing

**Examples:**
```
✅ Good:
- finance/investment-strategy.md
- projects/basic-memory-features.md
- docs/api-reference.md

❌ Problematic:
- Finance/Investment Strategy.md  (mixed case, spaces)
- finance/Investment Strategy.md  (inconsistent case)
- docs/API/Reference.md          (mixed case directories)
```

### 2. Permalink Management

**Custom permalinks in frontmatter:**
```yaml
---
type: knowledge
permalink: custom-permalink-name
---
```

**Guidelines:**
- Use lowercase permalinks
- Use hyphens for word separation
- Avoid path separators unless creating sub-paths
- Ensure uniqueness within your project

### 3. Directory Structure

**Consistent casing:**
```
✅ Good:
finance/
  investment-strategies.md
  portfolio-management.md

❌ Problematic:  
Finance/           (capital F)
  investment-strategies.md
finance/           (lowercase f) 
  portfolio-management.md
```

## Migration and Cleanup

### Identifying Conflicts

Use Basic Memory's built-in conflict detection:

```bash
# Sync will report conflicts
basic-memory sync

# Check sync status for warnings
basic-memory status
```

### Resolving Existing Conflicts

1. **Identify conflicting files** from sync error messages
2. **Choose consistent naming convention** for your project
3. **Rename files** to follow the convention
4. **Re-run sync** to verify resolution

### Bulk Renaming Strategy

For projects with many conflicts (a preview sketch follows the list):

1. **Backup your project** before making changes
2. **Standardize on lowercase** file and directory names
3. **Replace spaces with hyphens** in file names
4. **Use consistent character encoding** (UTF-8)
5. **Test sync after each batch** of changes
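
A hedged Python sketch that previews (but does not apply) a lowercase/hyphen rename pass over file names; the project path is illustrative:

```python
from pathlib import Path

root = Path("~/my-basic-memory-project").expanduser()  # adjust to your project
for path in sorted(root.rglob("*.md")):
    target = path.with_name(path.name.lower().replace(" ", "-"))
    if target != path:
        print(f"{path} -> {target}")  # review each rename before applying it
```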

## System Enhancements

### Recent Improvements (v0.13+)

1. **Enhanced conflict detection** before database operations
2. **Improved error messages** with specific resolution guidance
3. **Character normalization utilities** for consistent handling
4. **File swap detection** for complex move scenarios
5. **Proactive conflict warnings** during permalink resolution

### Monitoring and Logging

The system now provides detailed logging for conflict resolution:

```
DEBUG: Detected potential file path conflicts for 'Finance/Investment.md': ['finance/investment.md']
WARNING: File path conflict detected during move: entity_id=123 trying to move from 'old.md' to 'new.md'
```

These logs help identify and resolve conflicts before they cause sync failures.

## Support and Resources

If you encounter character-related conflicts not covered in this guide:

1. **Check the logs** for specific conflict details
2. **Review error messages** for resolution guidance  
3. **Report issues** with examples of the conflicting files
4. **Consider the file naming best practices** outlined above

The Basic Memory system is designed to handle most character conflicts automatically while providing clear guidance for manual resolution when needed.
```

--------------------------------------------------------------------------------
/src/basic_memory/cli/commands/cloud/rclone_installer.py:
--------------------------------------------------------------------------------

```python
"""Cross-platform rclone installation utilities."""

import platform
import shutil
import subprocess
from typing import Optional

from rich.console import Console

console = Console()


class RcloneInstallError(Exception):
    """Exception raised for rclone installation errors."""

    pass


def is_rclone_installed() -> bool:
    """Check if rclone is already installed and available in PATH."""
    return shutil.which("rclone") is not None


def get_platform() -> str:
    """Get the current platform identifier."""
    system = platform.system().lower()
    if system == "darwin":
        return "macos"
    elif system == "linux":
        return "linux"
    elif system == "windows":
        return "windows"
    else:
        raise RcloneInstallError(f"Unsupported platform: {system}")


def run_command(command: list[str], check: bool = True) -> subprocess.CompletedProcess:
    """Run a command with proper error handling."""
    try:
        console.print(f"[dim]Running: {' '.join(command)}[/dim]")
        result = subprocess.run(command, capture_output=True, text=True, check=check)
        if result.stdout:
            console.print(f"[dim]Output: {result.stdout.strip()}[/dim]")
        return result
    except subprocess.CalledProcessError as e:
        console.print(f"[red]Command failed: {e}[/red]")
        if e.stderr:
            console.print(f"[red]Error output: {e.stderr}[/red]")
        raise RcloneInstallError(f"Command failed: {e}") from e
    except FileNotFoundError as e:
        raise RcloneInstallError(f"Command not found: {' '.join(command)}") from e


def install_rclone_macos() -> None:
    """Install rclone on macOS using Homebrew or official script."""
    # Try Homebrew first
    if shutil.which("brew"):
        try:
            console.print("[blue]Installing rclone via Homebrew...[/blue]")
            run_command(["brew", "install", "rclone"])
            console.print("[green]✓ rclone installed via Homebrew[/green]")
            return
        except RcloneInstallError:
            console.print(
                "[yellow]Homebrew installation failed, trying official script...[/yellow]"
            )

    # Fallback to official script
    console.print("[blue]Installing rclone via official script...[/blue]")
    try:
        run_command(["sh", "-c", "curl https://rclone.org/install.sh | sudo bash"])
        console.print("[green]✓ rclone installed via official script[/green]")
    except RcloneInstallError:
        raise RcloneInstallError(
            "Failed to install rclone. Please install manually: brew install rclone"
        )


def install_rclone_linux() -> None:
    """Install rclone on Linux using package managers or official script."""
    # Try snap first (most universal)
    if shutil.which("snap"):
        try:
            console.print("[blue]Installing rclone via snap...[/blue]")
            run_command(["sudo", "snap", "install", "rclone"])
            console.print("[green]✓ rclone installed via snap[/green]")
            return
        except RcloneInstallError:
            console.print("[yellow]Snap installation failed, trying apt...[/yellow]")

    # Try apt (Debian/Ubuntu)
    if shutil.which("apt"):
        try:
            console.print("[blue]Installing rclone via apt...[/blue]")
            run_command(["sudo", "apt", "update"])
            run_command(["sudo", "apt", "install", "-y", "rclone"])
            console.print("[green]✓ rclone installed via apt[/green]")
            return
        except RcloneInstallError:
            console.print("[yellow]apt installation failed, trying official script...[/yellow]")

    # Fallback to official script
    console.print("[blue]Installing rclone via official script...[/blue]")
    try:
        run_command(["sh", "-c", "curl https://rclone.org/install.sh | sudo bash"])
        console.print("[green]✓ rclone installed via official script[/green]")
    except RcloneInstallError:
        raise RcloneInstallError(
            "Failed to install rclone. Please install manually: sudo snap install rclone"
        )


def install_rclone_windows() -> None:
    """Install rclone on Windows using package managers."""
    # Try winget first (built into Windows 10+)
    if shutil.which("winget"):
        try:
            console.print("[blue]Installing rclone via winget...[/blue]")
            run_command(["winget", "install", "Rclone.Rclone"])
            console.print("[green]✓ rclone installed via winget[/green]")
            return
        except RcloneInstallError:
            console.print("[yellow]winget installation failed, trying chocolatey...[/yellow]")

    # Try chocolatey
    if shutil.which("choco"):
        try:
            console.print("[blue]Installing rclone via chocolatey...[/blue]")
            run_command(["choco", "install", "rclone", "-y"])
            console.print("[green]✓ rclone installed via chocolatey[/green]")
            return
        except RcloneInstallError:
            console.print("[yellow]chocolatey installation failed, trying scoop...[/yellow]")

    # Try scoop
    if shutil.which("scoop"):
        try:
            console.print("[blue]Installing rclone via scoop...[/blue]")
            run_command(["scoop", "install", "rclone"])
            console.print("[green]✓ rclone installed via scoop[/green]")
            return
        except RcloneInstallError:
            console.print("[yellow]scoop installation failed[/yellow]")

    # No package manager available
    raise RcloneInstallError(
        "Could not install rclone automatically. Please install a package manager "
        "(winget, chocolatey, or scoop) or install rclone manually from https://rclone.org/downloads/"
    )


def install_rclone(platform_override: Optional[str] = None) -> None:
    """Install rclone for the current platform."""
    if is_rclone_installed():
        console.print("[green]rclone is already installed[/green]")
        return

    platform_name = platform_override or get_platform()
    console.print(f"[blue]Installing rclone for {platform_name}...[/blue]")

    try:
        if platform_name == "macos":
            install_rclone_macos()
        elif platform_name == "linux":
            install_rclone_linux()
        elif platform_name == "windows":
            install_rclone_windows()
        else:
            raise RcloneInstallError(f"Unsupported platform: {platform_name}")

        # Verify installation
        if not is_rclone_installed():
            raise RcloneInstallError("rclone installation completed but command not found in PATH")

        console.print("[green]✓ rclone installation completed successfully[/green]")

    except RcloneInstallError:
        raise
    except Exception as e:
        raise RcloneInstallError(f"Unexpected error during installation: {e}") from e


def get_rclone_version() -> Optional[str]:
    """Get the installed rclone version."""
    if not is_rclone_installed():
        return None

    try:
        result = run_command(["rclone", "version"], check=False)
        if result.returncode == 0:
            # Parse version from output (format: "rclone v1.64.0")
            lines = result.stdout.strip().split("\n")
            for line in lines:
                if line.startswith("rclone v"):
                    return line.split()[1]
        return "unknown"
    except Exception:
        return "unknown"

```

--------------------------------------------------------------------------------
/tests/markdown/test_markdown_plugins.py:
--------------------------------------------------------------------------------

```python
"""Tests for markdown plugins."""

from textwrap import dedent
from markdown_it import MarkdownIt
from markdown_it.token import Token

from basic_memory.markdown.plugins import (
    observation_plugin,
    relation_plugin,
    is_observation,
    is_explicit_relation,
    parse_relation,
    parse_inline_relations,
)


def test_observation_plugin():
    """Test observation plugin."""
    # Set up markdown-it instance
    md = MarkdownIt().use(observation_plugin)

    # Test basic observation with all features
    content = dedent("""
        - [design] Basic observation #tag1 #tag2 (with context)
        """)

    tokens = md.parse(content)
    token = [t for t in tokens if t.type == "inline"][0]
    assert "observation" in token.meta
    obs = token.meta["observation"]
    assert obs["category"] == "design"
    assert obs["content"] == "Basic observation #tag1 #tag2"
    assert set(obs["tags"]) == {"tag1", "tag2"}
    assert obs["context"] == "with context"

    # Test without category
    content = "- Basic observation #tag1 (context)"
    token = [t for t in md.parse(content) if t.type == "inline"][0]
    obs = token.meta["observation"]
    assert obs["category"] is None
    assert obs["content"] == "Basic observation #tag1"
    assert obs["tags"] == ["tag1"]
    assert obs["context"] == "context"

    # Test without tags
    content = "- [note] Basic observation (context)"
    token = [t for t in md.parse(content) if t.type == "inline"][0]
    obs = token.meta["observation"]
    assert obs["category"] == "note"
    assert obs["content"] == "Basic observation"
    assert obs["tags"] is None
    assert obs["context"] == "context"


def test_observation_edge_cases():
    """Test observation parsing edge cases."""
    # Test non-inline token
    token = Token("paragraph", "", 0)
    assert not is_observation(token)

    # Test empty content
    token = Token("inline", "", 0)
    assert not is_observation(token)

    # Test markdown task
    token = Token("inline", "[ ] Task item", 0)
    assert not is_observation(token)

    # Test completed task
    token = Token("inline", "[x] Done task", 0)
    assert not is_observation(token)

    # Test in-progress task
    token = Token("inline", "[-] Ongoing task", 0)
    assert not is_observation(token)


def test_observation_excludes_markdown_and_wiki_links():
    """Test that markdown links and wiki links are NOT parsed as observations.

    This test validates the fix for issue #247 where:
    - [text](url) markdown links were incorrectly parsed as observations
    - [[text]] wiki links were incorrectly parsed as observations
    """
    # Test markdown links are NOT observations
    token = Token("inline", "[Click here](https://example.com)", 0)
    assert not is_observation(token), "Markdown links should not be parsed as observations"

    token = Token("inline", "[Documentation](./docs/readme.md)", 0)
    assert not is_observation(token), "Relative markdown links should not be parsed as observations"

    token = Token("inline", "[Empty link]()", 0)
    assert not is_observation(token), "Empty markdown links should not be parsed as observations"

    # Test wiki links are NOT observations
    token = Token("inline", "[[SomeWikiPage]]", 0)
    assert not is_observation(token), "Wiki links should not be parsed as observations"

    token = Token("inline", "[[Multi Word Page]]", 0)
    assert not is_observation(token), "Multi-word wiki links should not be parsed as observations"

    # Test nested brackets are NOT observations
    token = Token("inline", "[[Nested [[Inner]] Link]]", 0)
    assert not is_observation(token), "Nested wiki links should not be parsed as observations"

    # Test valid observations still work (should return True)
    token = Token("inline", "[category] This is a valid observation", 0)
    assert is_observation(token), "Valid observations should still be parsed correctly"

    token = Token("inline", "[design] Valid observation #tag", 0)
    assert is_observation(token), "Valid observations with tags should still work"

    token = Token("inline", "Just some text #tag", 0)
    assert is_observation(token), "Tag-only observations should still work"

    # Test edge cases that should NOT be observations
    token = Token("inline", "[]Empty brackets", 0)
    assert not is_observation(token), "Empty category brackets should not be observations"

    token = Token("inline", "[category]No space after category", 0)
    assert not is_observation(token), "No space after category should not be valid observation"


def test_relation_plugin():
    """Test relation plugin."""
    md = MarkdownIt().use(relation_plugin)

    # Test explicit relation with all features
    content = dedent("""
        - implements [[Component]] (with context)
        """)

    tokens = md.parse(content)
    token = [t for t in tokens if t.type == "inline"][0]
    assert "relations" in token.meta
    rel = token.meta["relations"][0]
    assert rel["type"] == "implements"
    assert rel["target"] == "Component"
    assert rel["context"] == "with context"

    # Test implicit relations in text
    content = "Some text with a [[Link]] and [[Another Link]]"
    token = [t for t in md.parse(content) if t.type == "inline"][0]
    rels = token.meta["relations"]
    assert len(rels) == 2
    assert rels[0]["type"] == "links to"
    assert rels[0]["target"] == "Link"
    assert rels[1]["target"] == "Another Link"


def test_relation_edge_cases():
    """Test relation parsing edge cases."""
    # Test non-inline token
    token = Token("paragraph", "", 0)
    assert not is_explicit_relation(token)

    # Test empty content
    token = Token("inline", "", 0)
    assert not is_explicit_relation(token)

    # Test incomplete relation (missing target)
    token = Token("inline", "relates_to [[]]", 0)
    result = parse_relation(token)
    assert result is None

    # Test non-relation content
    token = Token("inline", "Just some text", 0)
    result = parse_relation(token)
    assert result is None

    # Test invalid inline link (empty target)
    assert not parse_inline_relations("Text with [[]] empty link")

    # Test nested links (avoid duplicates)
    result = parse_inline_relations("Text with [[Outer [[Inner]] Link]]")
    assert len(result) == 1
    assert result[0]["target"] == "Outer [[Inner]] Link"


def test_combined_plugins():
    """Test both plugins working together."""
    md = MarkdownIt().use(observation_plugin).use(relation_plugin)

    content = dedent("""
        # Section
        - [design] Observation with [[Link]] #tag (context)
        - implements [[Component]] (details)
        - Just a [[Reference]] in text
        
        Some text with a [[Link]] reference.
        """)

    tokens = md.parse(content)
    inline_tokens = [t for t in tokens if t.type == "inline"]

    # inline_tokens[0] is the heading; the first list item has both observation and relation
    obs_token = inline_tokens[1]
    assert "observation" in obs_token.meta
    assert "relations" in obs_token.meta

    # The second list item has an explicit relation
    rel_token = inline_tokens[2]
    assert "relations" in rel_token.meta
    rel = rel_token.meta["relations"][0]
    assert rel["type"] == "implements"

    # The trailing paragraph text (inline_tokens[4]) has an implicit relation
    text_token = inline_tokens[4]
    assert "relations" in text_token.meta
    link = text_token.meta["relations"][0]
    assert link["type"] == "links to"

```

--------------------------------------------------------------------------------
/.claude/agents/python-developer.md:
--------------------------------------------------------------------------------

```markdown
---
name: python-developer
description: Python backend developer specializing in FastAPI, DBOS workflows, and API implementation. Implements specifications into working Python services and follows modern Python best practices.
model: sonnet
color: red
---

You are an expert Python developer specializing in implementing specifications into working Python services and APIs. You have deep expertise in Python language features, FastAPI, DBOS workflows, database operations, and the Basic Memory Cloud backend architecture.

**Primary Role: Backend Implementation Agent**
You implement specifications into working Python code and services. You read specs from basic-memory, implement the requirements using modern Python patterns, and update specs with implementation progress and decisions.

**Core Responsibilities:**

**Specification Implementation:**
- Read specs using basic-memory MCP tools to understand backend requirements
- Implement Python services, APIs, and workflows that fulfill spec requirements
- Update specs with implementation progress, decisions, and completion status
- Document any architectural decisions or modifications needed during implementation

**Python/FastAPI Development:**
- Create FastAPI applications with proper middleware and dependency injection
- Implement DBOS workflows for durable, long-running operations
- Design database schemas and implement repository patterns
- Handle authentication, authorization, and security requirements
- Implement async/await patterns for optimal performance

**Backend Implementation Process:**
1. **Read Spec**: Use `mcp__basic-memory__read_note` to get spec requirements
2. **Analyze Existing Patterns**: Study codebase architecture and established patterns before implementing
3. **Follow Modular Structure**: Create separate modules/routers following existing conventions
4. **Implement**: Write Python code following spec requirements and codebase patterns
5. **Test**: Create tests that validate spec success criteria
6. **Update Spec**: Document completion and any implementation decisions
7. **Validate**: Run tests and ensure integration works correctly

**Technical Standards:**
- Follow PEP 8 and modern Python conventions
- Use type hints throughout the codebase
- Implement proper error handling and logging
- Use async/await for all database and external service calls
- Write comprehensive tests using pytest
- Follow security best practices for web APIs
- Document functions and classes with clear docstrings

**Codebase Architecture Patterns:**

**CLI Structure Patterns** (sketch below):
- Follow existing modular CLI pattern: create separate CLI modules (e.g., `upload_cli.py`) instead of adding commands directly to `main.py`
- Existing examples: `polar_cli.py`, `tenant_cli.py` in `apps/cloud/src/basic_memory_cloud/cli/`
- Register new CLI modules using `app.add_typer(new_cli, name="command", help="description")`
- Maintain consistent command structure and help text patterns
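
A minimal sketch of this registration pattern (module and command names are illustrative):

```python
import typer

upload_cli = typer.Typer()  # e.g. defined in cli/upload_cli.py

@upload_cli.command()
def push(path: str) -> None:
    """Upload a file (illustrative command)."""
    typer.echo(f"uploading {path}")

app = typer.Typer()  # the main CLI app
app.add_typer(upload_cli, name="upload", help="Upload commands")
```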

**FastAPI Router Patterns** (sketch below):
- Create dedicated routers for logical endpoint groups instead of adding routes directly to main app
- Place routers in dedicated files (e.g., `apps/api/src/basic_memory_cloud_api/routers/webdav_router.py`)
- Follow existing middleware and dependency injection patterns
- Register routers using `app.include_router(router, prefix="/api-path")`
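
A minimal sketch of the router pattern (paths and names are illustrative):

```python
from fastapi import APIRouter, FastAPI

router = APIRouter()  # e.g. defined in routers/webdav_router.py

@router.get("/status")
async def status() -> dict[str, str]:
    return {"status": "ok"}

app = FastAPI()
app.include_router(router, prefix="/webdav")  # registered from the main app
```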

**Modular Organization:**
- Always analyze existing codebase structure before implementing new features
- Follow established file organization and naming conventions
- Create separate modules for distinct functionality areas
- Maintain consistency with existing architectural decisions
- Preserve separation of concerns across service boundaries

**Pattern Analysis Process:**
1. Examine similar existing functionality in the codebase
2. Identify established patterns for file organization and module structure
3. Follow the same architectural approach for consistency
4. Create new modules/routers following existing conventions
5. Integrate new code using established registration patterns

**Basic Memory Cloud Expertise:**

**FastAPI Service Patterns:**
- Multi-app architecture (Cloud, MCP, API services)
- Shared middleware for JWT validation, CORS, logging
- Dependency injection for services and repositories
- Proper async request handling and error responses

**DBOS Workflow Implementation:**
- Durable workflows for tenant provisioning and infrastructure operations
- Service layer pattern with repository data access
- Event sourcing for audit trails and business processes
- Idempotent operations with proper error handling

**Database & Repository Patterns:**
- SQLAlchemy with async patterns
- Repository pattern for data access abstraction
- Database migration strategies
- Multi-tenant data isolation patterns

**Authentication & Security:**
- JWT token validation and middleware
- OAuth 2.1 flow implementation
- Tenant-specific authorization patterns
- Secure API design and input validation

**Code Quality Standards:**
- Clear, descriptive variable and function names
- Proper docstrings for functions and classes
- Handle edge cases and error conditions gracefully
- Use context managers for resource management
- Apply composition over inheritance
- Consider security implications for all API endpoints
- Optimize for performance while maintaining readability

**Testing & Validation:**
- Write pytest tests that validate spec requirements
- Include unit tests for business logic
- Integration tests for API endpoints
- Test error conditions and edge cases
- Use fixtures for consistent test setup
- Mock external dependencies appropriately

**Debugging & Problem Solving:**
- Analyze error messages and stack traces methodically
- Identify root causes rather than applying quick fixes
- Use logging effectively for troubleshooting
- Apply systematic debugging approaches
- Document solutions for future reference

**Basic Memory Integration:**
- Use `mcp__basic-memory__read_note` to read specifications
- Use `mcp__basic-memory__edit_note` to update specs with progress
- Document implementation patterns and decisions
- Link related services and database schemas
- Maintain implementation history and troubleshooting guides

**Communication Style:**
- Focus on concrete implementation results and working code
- Document technical decisions and trade-offs clearly
- Ask specific questions about requirements and constraints
- Provide clear status updates on implementation progress
- Explain code choices and architectural patterns

**Deliverables:**
- Working Python services that meet spec requirements
- Updated specifications with implementation status
- Comprehensive tests validating functionality
- Clean, maintainable, type-safe Python code
- Proper error handling and logging
- Database migrations and schema updates

**Key Principles:**
- Implement specifications faithfully and completely
- Write clean, efficient, and maintainable Python code
- Follow established patterns and conventions
- Apply proper error handling and security practices
- Test thoroughly and document implementation decisions
- Balance performance with code clarity and maintainability

When handed a specification via `/spec implement`, you will read the spec, understand the requirements, implement the Python solution using appropriate patterns and frameworks, create tests to validate functionality, and update the spec with completion status and any implementation notes.
```

--------------------------------------------------------------------------------
/tests/api/test_template_loader_helpers.py:
--------------------------------------------------------------------------------

```python
"""Tests for additional template loader helpers."""

import pytest
from datetime import datetime

from basic_memory.api.template_loader import TemplateLoader


@pytest.fixture
def temp_template_dir(tmpdir):
    """Create a temporary directory for test templates."""
    template_dir = tmpdir.mkdir("templates").mkdir("prompts")
    return template_dir


@pytest.fixture
def custom_template_loader(temp_template_dir):
    """Return a TemplateLoader instance with a custom template directory."""
    return TemplateLoader(str(temp_template_dir))


@pytest.mark.asyncio
async def test_round_helper(custom_template_loader, temp_template_dir):
    """Test the round helper for number formatting."""
    # Create template file
    round_path = temp_template_dir / "round.hbs"
    round_path.write_text(
        "{{round number}} {{round number 0}} {{round number 3}}",
        encoding="utf-8",
    )

    # Test with various values
    result = await custom_template_loader.render("round.hbs", {"number": 3.14159})
    assert result == "3.14 3.0 3.142" or result == "3.14 3 3.142"

    # Test with non-numeric value
    result = await custom_template_loader.render("round.hbs", {"number": "not-a-number"})
    assert "not-a-number" in result

    # Test with insufficient args
    empty_path = temp_template_dir / "round_empty.hbs"
    empty_path.write_text("{{round}}", encoding="utf-8")
    result = await custom_template_loader.render("round_empty.hbs", {})
    assert result == ""


@pytest.mark.asyncio
async def test_date_helper_edge_cases(custom_template_loader, temp_template_dir):
    """Test edge cases for the date helper."""
    # Create template file
    date_path = temp_template_dir / "date_edge.hbs"
    date_path.write_text(
        "{{date timestamp}} {{date timestamp '%Y'}} {{date string_date}} {{date invalid_date}} {{date}}",
        encoding="utf-8",
    )

    # Test with various values
    result = await custom_template_loader.render(
        "date_edge.hbs",
        {
            "timestamp": datetime(2023, 1, 1, 12, 30),
            "string_date": "2023-01-01T12:30:00",
            "invalid_date": "not-a-date",
        },
    )

    assert "2023-01-01" in result
    assert "2023" in result  # Custom format
    assert "not-a-date" in result  # Invalid date passed through
    assert result.strip() != ""  # Empty date case


@pytest.mark.asyncio
async def test_size_helper_edge_cases(custom_template_loader, temp_template_dir):
    """Test edge cases for the size helper."""
    # Create template file
    size_path = temp_template_dir / "size_edge.hbs"
    size_path.write_text(
        "{{size list}} {{size string}} {{size dict}} {{size null}} {{size}}",
        encoding="utf-8",
    )

    # Test with various values
    result = await custom_template_loader.render(
        "size_edge.hbs",
        {
            "list": [1, 2, 3, 4, 5],
            "string": "hello",
            "dict": {"a": 1, "b": 2, "c": 3},
            "null": None,
        },
    )

    assert "5" in result  # List size
    assert "hello".find("5") == -1  # String size should be 5
    assert "3" in result  # Dict size
    assert "0" in result  # Null size
    assert result.count("0") >= 2  # At least two zeros (null and empty args)


@pytest.mark.asyncio
async def test_math_helper(custom_template_loader, temp_template_dir):
    """Test the math helper for basic arithmetic."""
    # Create template file
    math_path = temp_template_dir / "math.hbs"
    math_path.write_text(
        "{{math 5 '+' 3}} {{math 10 '-' 4}} {{math 6 '*' 7}} {{math 20 '/' 5}}",
        encoding="utf-8",
    )

    # Test basic operations
    result = await custom_template_loader.render("math.hbs", {})
    assert "8" in result  # Addition
    assert "6" in result  # Subtraction
    assert "42" in result  # Multiplication
    assert "4" in result  # Division

    # Test with invalid operator
    invalid_op_path = temp_template_dir / "math_invalid_op.hbs"
    invalid_op_path.write_text("{{math 5 'invalid' 3}}", encoding="utf-8")
    result = await custom_template_loader.render("math_invalid_op.hbs", {})
    assert "Unsupported operator" in result

    # Test with invalid numeric values
    invalid_num_path = temp_template_dir / "math_invalid_num.hbs"
    invalid_num_path.write_text("{{math 'not-a-number' '+' 3}}", encoding="utf-8")
    result = await custom_template_loader.render("math_invalid_num.hbs", {})
    assert "Math error" in result

    # Test with insufficient arguments
    insufficient_path = temp_template_dir / "math_insufficient.hbs"
    insufficient_path.write_text("{{math 5 '+'}}", encoding="utf-8")
    result = await custom_template_loader.render("math_insufficient.hbs", {})
    assert "Insufficient arguments" in result


@pytest.mark.asyncio
async def test_if_cond_helper(custom_template_loader, temp_template_dir):
    """Test the if_cond helper for conditionals."""
    # Create template file with true condition
    if_true_path = temp_template_dir / "if_true.hbs"
    if_true_path.write_text(
        "{{#if_cond (lt 5 10)}}True condition{{else}}False condition{{/if_cond}}",
        encoding="utf-8",
    )

    # Create template file with false condition
    if_false_path = temp_template_dir / "if_false.hbs"
    if_false_path.write_text(
        "{{#if_cond (lt 15 10)}}True condition{{else}}False condition{{/if_cond}}",
        encoding="utf-8",
    )

    # Test true condition
    result = await custom_template_loader.render("if_true.hbs", {})
    assert result == "True condition"

    # Test false condition
    result = await custom_template_loader.render("if_false.hbs", {})
    assert result == "False condition"


@pytest.mark.asyncio
async def test_lt_helper_edge_cases(custom_template_loader, temp_template_dir):
    """Test edge cases for the lt (less than) helper."""
    # Create template file
    lt_path = temp_template_dir / "lt_edge.hbs"
    lt_path.write_text(
        "{{#if_cond (lt 'a' 'b')}}String LT True{{else}}String LT False{{/if_cond}} "
        "{{#if_cond (lt 'z' 'a')}}String LT2 True{{else}}String LT2 False{{/if_cond}} "
        "{{#if_cond (lt)}}Missing args True{{else}}Missing args False{{/if_cond}}",
        encoding="utf-8",
    )

    # Test with string values and missing args
    result = await custom_template_loader.render("lt_edge.hbs", {})
    assert "String LT True" in result  # 'a' < 'b' is true
    assert "String LT2 False" in result  # 'z' < 'a' is false
    assert "Missing args False" in result  # Missing args should return false


@pytest.mark.asyncio
async def test_dedent_helper_edge_case(custom_template_loader, temp_template_dir):
    """Test an edge case for the dedent helper."""
    # Create template with empty dedent block
    empty_dedent_path = temp_template_dir / "empty_dedent.hbs"
    empty_dedent_path.write_text("{{#dedent}}{{/dedent}}", encoding="utf-8")

    # Test empty block
    result = await custom_template_loader.render("empty_dedent.hbs", {})
    assert result == ""

    # Test with complex content including lists
    complex_dedent_path = temp_template_dir / "complex_dedent.hbs"
    complex_dedent_path.write_text(
        "{{#dedent}}\n    {{#each items}}\n        - {{this}}\n    {{/each}}\n{{/dedent}}",
        encoding="utf-8",
    )

    result = await custom_template_loader.render("complex_dedent.hbs", {"items": [1, 2, 3]})
    assert "- 1" in result
    assert "- 2" in result
    assert "- 3" in result

```

--------------------------------------------------------------------------------
/src/basic_memory/markdown/plugins.py:
--------------------------------------------------------------------------------

```python
"""Markdown-it plugins for Basic Memory markdown parsing."""

import re
from typing import List, Any, Dict
from markdown_it import MarkdownIt
from markdown_it.token import Token


# Observation handling functions
def is_observation(token: Token) -> bool:
    """Check if token looks like our observation format."""
    if token.type != "inline":  # pragma: no cover
        return False
    # token.tag carries the text for tokens constructed directly in tests; parsed tokens use token.content
    content = (token.tag or token.content).strip()
    if not content:  # pragma: no cover
        return False
    # if it's a markdown_task, return false
    if content.startswith("[ ]") or content.startswith("[x]") or content.startswith("[-]"):
        return False

    # Exclude markdown links: [text](url)
    if re.match(r"^\[.*?\]\(.*?\)$", content):
        return False

    # Exclude wiki links: [[text]]
    if re.match(r"^\[\[.*?\]\]$", content):
        return False

    # Check for proper observation format: [category] content
    match = re.match(r"^\[([^\[\]()]+)\]\s+(.+)", content)
    has_tags = "#" in content
    return bool(match) or has_tags


def parse_observation(token: Token) -> Dict[str, Any]:
    """Extract observation parts from token."""
    # token.tag carries the text for tokens constructed directly in tests; parsed tokens use token.content
    content = (token.tag or token.content).strip()

    # Parse [category] with regex
    match = re.match(r"^\[([^\[\]()]+)\]\s+(.+)", content)
    category = None
    if match:
        category = match.group(1).strip()
        content = match.group(2).strip()
    else:
        # Handle empty brackets [] followed by content
        empty_match = re.match(r"^\[\]\s+(.+)", content)
        if empty_match:
            content = empty_match.group(1).strip()

    # Parse (context)
    context = None
    if content.endswith(")"):
        start = content.rfind("(")
        if start != -1:
            context = content[start + 1 : -1].strip()
            content = content[:start].strip()

    # Extract tags and keep original content
    tags = []
    parts = content.split()
    for part in parts:
        if part.startswith("#"):
            if "#" in part[1:]:
                subtags = [t for t in part.split("#") if t]
                tags.extend(subtags)
            else:
                tags.append(part[1:])

    return {
        "category": category,
        "content": content,
        "tags": tags if tags else None,
        "context": context,
    }


# Relation handling functions
def is_explicit_relation(token: Token) -> bool:
    """Check if token looks like our relation format."""
    if token.type != "inline":  # pragma: no cover
        return False

    # token.tag carries the text for tokens constructed directly in tests; parsed tokens use token.content
    content = (token.tag or token.content).strip()
    return "[[" in content and "]]" in content


def parse_relation(token: Token) -> Dict[str, Any] | None:
    """Extract relation parts from token."""
    # Use token.tag which contains the actual content for test tokens, fallback to content
    content = (token.tag or token.content).strip()

    # Extract [[target]]
    target = None
    rel_type = "relates_to"  # default
    context = None

    start = content.find("[[")
    end = content.find("]]")

    if start != -1 and end != -1:
        # Get text before link as relation type
        before = content[:start].strip()
        if before:
            rel_type = before

        # Get target
        target = content[start + 2 : end].strip()

        # Look for context after
        after = content[end + 2 :].strip()
        if after.startswith("(") and after.endswith(")"):
            context = after[1:-1].strip() or None

    if not target:  # pragma: no cover
        return None

    return {"type": rel_type, "target": target, "context": context}


def parse_inline_relations(content: str) -> List[Dict[str, Any]]:
    """Find wiki-style links in regular content."""
    relations = []
    start = 0

    while True:
        # Find next outer-most [[
        start = content.find("[[", start)
        if start == -1:  # pragma: no cover
            break

        # Find matching ]]
        depth = 1
        pos = start + 2
        end = -1

        while pos < len(content):
            if content[pos : pos + 2] == "[[":
                depth += 1
                pos += 2
            elif content[pos : pos + 2] == "]]":
                depth -= 1
                if depth == 0:
                    end = pos
                    break
                pos += 2
            else:
                pos += 1

        if end == -1:
            # No matching ]] found
            break

        target = content[start + 2 : end].strip()
        if target:
            relations.append({"type": "links to", "target": target, "context": None})

        start = end + 2

    return relations


def observation_plugin(md: MarkdownIt) -> None:
    """Plugin for parsing observation format:
    - [category] Content text #tag1 #tag2 (context)
    - Content text #tag1 (context)  # No category is also valid
    """

    def observation_rule(state: Any) -> None:
        """Process observations in token stream."""
        tokens = state.tokens

        for idx in range(len(tokens)):
            token = tokens[idx]

            # Initialize meta for all tokens
            token.meta = token.meta or {}

            # Parse observations in list items
            if token.type == "inline" and is_observation(token):
                obs = parse_observation(token)
                if obs["content"]:  # Only store if we have content
                    token.meta["observation"] = obs

    # Add the rule after inline processing
    md.core.ruler.after("inline", "observations", observation_rule)


def relation_plugin(md: MarkdownIt) -> None:
    """Plugin for parsing relation formats:

    Explicit relations:
    - relation_type [[target]] (context)

    Implicit relations (links in content):
    Some text with [[target]] reference
    """

    def relation_rule(state: Any) -> None:
        """Process relations in token stream."""
        tokens = state.tokens
        in_list_item = False

        for idx in range(len(tokens)):
            token = tokens[idx]

            # Track list nesting
            if token.type == "list_item_open":
                in_list_item = True
            elif token.type == "list_item_close":
                in_list_item = False

            # Initialize meta for all tokens
            token.meta = token.meta or {}

            # Only process inline tokens
            if token.type == "inline":
                # Check for explicit relations in list items
                if in_list_item and is_explicit_relation(token):
                    rel = parse_relation(token)
                    if rel:
                        token.meta["relations"] = [rel]

                # Always check for inline links in any text
                else:
                    content = token.tag or token.content
                    if "[[" in content:
                        rels = parse_inline_relations(content)
                        if rels:
                            token.meta["relations"] = token.meta.get("relations", []) + rels

    # Add the rule after inline processing
    md.core.ruler.after("inline", "relations", relation_rule)

```
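
A quick way to exercise these plugins end-to-end (all names come from the module above; the sample line is illustrative):

```python
from markdown_it import MarkdownIt

from basic_memory.markdown.plugins import observation_plugin, relation_plugin

md = MarkdownIt().use(observation_plugin).use(relation_plugin)

tokens = md.parse("- [idea] Collect usage metrics #telemetry (from sprint review)")
inline = next(t for t in tokens if t.type == "inline")
print(inline.meta["observation"])
# {'category': 'idea', 'content': 'Collect usage metrics #telemetry',
#  'tags': ['telemetry'], 'context': 'from sprint review'}
```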

--------------------------------------------------------------------------------
/specs/SPEC-5 CLI Cloud Upload via WebDAV.md:
--------------------------------------------------------------------------------

```markdown
---
title: 'SPEC-5: CLI Cloud Upload via WebDAV'
type: spec
permalink: specs/spec-5-cli-cloud-upload-via-webdav
tags:
- cli
- webdav
- upload
- migration
- poc
---

# SPEC-5: CLI Cloud Upload via WebDAV

## Why

Existing basic-memory users need a simple migration path to basic-memory-cloud. The web UI drag-and-drop approach outlined in GitHub issue #59, while user-friendly, introduces significant complexity for a proof-of-concept:

- Complex web UI components for file upload and progress tracking
- Browser file handling limitations and CORS complexity
- Proxy routing overhead for large file transfers
- Authentication integration across multiple services

A CLI-first approach solves these issues by:

- **Leveraging existing infrastructure**: Both cloud CLI and tenant API already exist with WorkOS JWT authentication
- **Familiar user experience**: Basic-memory users are CLI-comfortable and expect command-line tools
- **Direct connection efficiency**: Bypassing the MCP gateway/proxy for bulk file transfers
- **Rapid implementation**: Building on existing `CLIAuth` and FastAPI foundations

The fundamental problem is migration friction - users have local basic-memory projects but no path to cloud tenants. A simple CLI upload command removes this barrier immediately.

## What

This spec defines a CLI-based project upload system using WebDAV for direct tenant connections.

**Affected Areas:**
- `apps/cloud/src/basic_memory_cloud/cli/main.py` - Add upload command to existing CLI
- `apps/api/src/basic_memory_cloud_api/main.py` - Add WebDAV endpoints to tenant FastAPI
- Authentication flow - Reuse existing WorkOS JWT validation
- File transfer protocol - WebDAV for cross-platform compatibility

**Core Components:**

### CLI Upload Command
```bash
basic-memory-cloud upload <project-path> --tenant-url https://basic-memory-{tenant}.fly.dev
```

### WebDAV Server Endpoints
- `GET/PUT/DELETE /webdav/*` - Standard WebDAV operations on tenant file system
- Authentication via existing JWT validation
- File operations preserve timestamps and directory structure

### Authentication Flow
```
1. User runs `basic-memory-cloud login` (existing)
2. CLI stores WorkOS JWT token (existing)
3. Upload command reads JWT from storage
4. WebDAV requests include JWT in Authorization header
5. Tenant API validates JWT using existing middleware
```
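
For illustration, steps 3-5 reduce to one authenticated request per file. A minimal sketch, with `httpx` standing in for whichever WebDAV client library is ultimately chosen:

```python
# Sketch: one authenticated WebDAV PUT against the endpoints listed above.
import httpx

def put_file(tenant_url: str, rel_path: str, data: bytes, jwt: str) -> None:
    response = httpx.put(
        f"{tenant_url}/webdav/{rel_path}",
        content=data,
        headers={"Authorization": f"Bearer {jwt}"},  # step 4 of the flow
    )
    response.raise_for_status()
```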

## How (High Level)

### Implementation Strategy

**Phase 1: CLI Command**
- Add `upload` command to existing Typer app
- Reuse `CLIAuth` class for token management
- Implement WebDAV client using `webdavclient3` or similar
- Rich progress bars for transfer feedback

**Phase 2: WebDAV Server**
- Add WebDAV endpoints to existing tenant FastAPI app
- Leverage existing `get_current_user` dependency for authentication
- Map WebDAV operations to tenant file system
- Preserve file modification times using `os.utime()`
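
A minimal sketch of the timestamp-preservation step (how the client transmits the source mtime is left open by this spec, so the `source_mtime` parameter is an assumption):

```python
# Sketch: write an uploaded file, then restore its original modification time.
import os
from pathlib import Path

def write_uploaded_file(dest: Path, data: bytes, source_mtime: float) -> None:
    dest.parent.mkdir(parents=True, exist_ok=True)
    dest.write_bytes(data)
    # os.utime takes (atime, mtime); reuse the client-supplied mtime for both
    os.utime(dest, (source_mtime, source_mtime))
```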

**Phase 3: Integration**
- Direct connection bypasses MCP gateway and proxy
- Simple conflict resolution: overwrite existing files
- Error handling: fail fast with clear error messages

### Technical Architecture

```
basic-memory-cloud CLI → WorkOS JWT → Direct WebDAV → Tenant FastAPI
                                                    ↓
                                               Tenant File System
```

**Key Libraries:**
- CLI: `webdavclient3` for WebDAV client operations
- API: `wsgidav` or FastAPI-compatible WebDAV server
- Progress: `rich` library (already imported in CLI)
- Auth: Existing WorkOS JWT infrastructure

### WebDAV Protocol Choice

WebDAV provides:
- **Cross-platform clients**: Native support in most operating systems
- **Standardized protocol**: Well-defined for file operations
- **HTTP-based**: Works with existing FastAPI and JWT auth
- **Library support**: Good Python libraries for both client and server

### POC Constraints

**Simplifications for rapid implementation:**
- **Known tenant URLs**: Assume `https://basic-memory-{tenant}.fly.dev` format
- **Upload only**: No download or bidirectional sync
- **Overwrite conflicts**: No merge or conflict resolution prompting
- **No fallbacks**: Fail fast if WebDAV connection issues occur
- **Direct connection only**: No proxy fallback mechanism

## How to Evaluate

### Success Criteria

**Functional Requirements:**
- [ ] Transfer complete basic-memory project (100+ files) in < 30 seconds
- [ ] Preserve directory structure exactly as in source project
- [ ] Preserve file modification timestamps for proper sync behavior
- [ ] Rich progress bars show real-time transfer status (files/MB transferred)
- [ ] WorkOS JWT authentication validates correctly on WebDAV endpoints
- [ ] Direct tenant connection bypasses MCP gateway successfully

**Quality Requirements:**
- [ ] Clear error messages for authentication failures
- [ ] Graceful handling of network interruptions
- [ ] CLI follows existing command patterns and help text standards
- [ ] WebDAV endpoints integrate cleanly with existing FastAPI app

**Performance Requirements:**
- [ ] File transfer speed > 1MB/s on typical connections
- [ ] Memory usage remains reasonable for large projects
- [ ] No timeout issues with 500+ file projects

### Testing Procedure

**Unit Testing:**
1. CLI command parsing and argument validation
2. WebDAV client connection and authentication
3. File timestamp preservation during transfer
4. JWT token validation on WebDAV endpoints

**Integration Testing:**
1. End-to-end upload of test project
2. Direct tenant connection without proxy
3. File integrity verification after upload
4. Progress tracking accuracy during transfer

**User Experience Testing:**
1. Upload existing basic-memory project from local installation
2. Verify uploaded files appear correctly in cloud tenant
3. Confirm basic-memory database rebuilds properly with uploaded files
4. Test CLI help text and error message clarity

### Validation Commands

**Setup:**
```bash
# Login to WorkOS
basic-memory-cloud login

# Upload project
basic-memory-cloud upload ~/my-notes --tenant-url https://basic-memory-test.fly.dev
```

**Verification:**
```bash
# Check tenant health and file count via API
curl -H "Authorization: Bearer $JWT" https://basic-memory-test.fly.dev/health
curl -H "Authorization: Bearer $JWT" https://basic-memory-test.fly.dev/notes/search
```

### Performance Benchmarks

**Target metrics for 100MB basic-memory project:**
- Transfer time: < 30 seconds
- Memory usage: < 100MB during transfer
- Progress updates: Every 1MB or 10 files
- Authentication time: < 2 seconds

## Observations

- [implementation-speed] CLI approach significantly faster than web UI for POC development #rapid-prototyping
- [user-experience] Basic-memory users already comfortable with CLI tools #user-familiarity
- [architecture-benefit] Direct connection eliminates proxy complexity and latency #performance
- [auth-reuse] Existing WorkOS JWT infrastructure handles authentication cleanly #code-reuse
- [webdav-choice] WebDAV protocol provides cross-platform compatibility and standard libraries #protocol-selection
- [poc-scope] Simple conflict handling and error recovery sufficient for proof-of-concept #scope-management
- [migration-value] Removes primary barrier for local users migrating to cloud platform #business-value

## Relations

- depends_on [[SPEC-1: Specification-Driven Development Process]]
- enables [[GitHub Issue #59: Web UI Upload Feature]]
- uses [[WorkOS Authentication Integration]]
- builds_on [[Existing Cloud CLI Infrastructure]]
- builds_on [[Existing Tenant API Architecture]]
```

--------------------------------------------------------------------------------
/tests/mcp/tools/test_chatgpt_tools.py:
--------------------------------------------------------------------------------

```python
"""Tests for ChatGPT-compatible MCP tools."""

import json
import pytest
from unittest.mock import AsyncMock, patch

from basic_memory.schemas.search import SearchResponse, SearchResult, SearchItemType


@pytest.mark.asyncio
async def test_search_successful_results():
    """Test search with successful results returns proper MCP content array format."""
    # Mock successful search results
    mock_results = SearchResponse(
        results=[
            SearchResult(
                title="Test Document 1",
                permalink="docs/test-doc-1",
                content="This is test content for document 1",
                type=SearchItemType.ENTITY,
                score=1.0,
                file_path="/test/docs/test-doc-1.md",
            ),
            SearchResult(
                title="Test Document 2",
                permalink="docs/test-doc-2",
                content="This is test content for document 2",
                type=SearchItemType.ENTITY,
                score=0.9,
                file_path="/test/docs/test-doc-2.md",
            ),
        ],
        current_page=1,
        page_size=10,
    )

    with patch(
        "basic_memory.mcp.tools.chatgpt_tools.search_notes.fn", new_callable=AsyncMock
    ) as mock_search:
        mock_search.return_value = mock_results

        # Import and call the actual function
        from basic_memory.mcp.tools.chatgpt_tools import search

        result = await search.fn("test query")

        # Verify MCP content array format
        assert isinstance(result, list)
        assert len(result) == 1
        assert result[0]["type"] == "text"

        # Parse the JSON content
        content = json.loads(result[0]["text"])
        assert "results" in content
        assert "query" in content

        # Verify result structure
        assert len(content["results"]) == 2
        assert content["query"] == "test query"

        # Verify individual result format
        result_item = content["results"][0]
        assert result_item["id"] == "docs/test-doc-1"
        assert result_item["title"] == "Test Document 1"
        assert result_item["url"] == "docs/test-doc-1"


@pytest.mark.asyncio
async def test_search_with_error_response():
    """Test search when underlying search_notes returns error string."""
    error_message = "# Search Failed - Invalid Syntax\n\nThe search query contains errors..."

    with patch(
        "basic_memory.mcp.tools.chatgpt_tools.search_notes.fn", new_callable=AsyncMock
    ) as mock_search:
        mock_search.return_value = error_message

        from basic_memory.mcp.tools.chatgpt_tools import search

        result = await search.fn("invalid query")

        # Verify MCP content array format
        assert isinstance(result, list)
        assert len(result) == 1
        assert result[0]["type"] == "text"

        # Parse the JSON content
        content = json.loads(result[0]["text"])
        assert content["results"] == []
        assert content["error"] == "Search failed"
        assert "error_details" in content


@pytest.mark.asyncio
async def test_fetch_successful_document():
    """Test fetch with successful document retrieval."""
    document_content = """# Test Document

This is the content of a test document.

## Section 1
Some content here.

## Observations
- [observation] This is a test observation

## Relations
- relates_to [[Another Document]]
"""

    with patch(
        "basic_memory.mcp.tools.chatgpt_tools.read_note.fn", new_callable=AsyncMock
    ) as mock_read:
        mock_read.return_value = document_content

        from basic_memory.mcp.tools.chatgpt_tools import fetch

        result = await fetch.fn("docs/test-document")

        # Verify MCP content array format
        assert isinstance(result, list)
        assert len(result) == 1
        assert result[0]["type"] == "text"

        # Parse the JSON content
        content = json.loads(result[0]["text"])
        assert content["id"] == "docs/test-document"
        assert content["title"] == "Test Document"  # Extracted from markdown
        assert content["text"] == document_content
        assert content["url"] == "docs/test-document"
        assert content["metadata"]["format"] == "markdown"


@pytest.mark.asyncio
async def test_fetch_document_not_found():
    """Test fetch when document is not found."""
    error_content = """# Note Not Found: "nonexistent-doc"

I couldn't find any notes matching "nonexistent-doc". Here are some suggestions:

## Check Identifier Type
- If you provided a title, try using the exact permalink instead
"""

    with patch(
        "basic_memory.mcp.tools.chatgpt_tools.read_note.fn", new_callable=AsyncMock
    ) as mock_read:
        mock_read.return_value = error_content

        from basic_memory.mcp.tools.chatgpt_tools import fetch

        result = await fetch.fn("nonexistent-doc")

        # Verify MCP content array format
        assert isinstance(result, list)
        assert len(result) == 1
        assert result[0]["type"] == "text"

        # Parse the JSON content
        content = json.loads(result[0]["text"])
        assert content["id"] == "nonexistent-doc"
        assert content["text"] == error_content
        assert content["metadata"]["error"] == "Document not found"


def test_format_search_results_for_chatgpt():
    """Test search results formatting."""
    from basic_memory.mcp.tools.chatgpt_tools import _format_search_results_for_chatgpt

    mock_results = SearchResponse(
        results=[
            SearchResult(
                title="Document One",
                permalink="docs/doc-one",
                content="Content for document one",
                type=SearchItemType.ENTITY,
                score=1.0,
                file_path="/test/docs/doc-one.md",
            ),
            SearchResult(
                title="",  # Test empty title handling
                permalink="docs/untitled",
                content="Content without title",
                type=SearchItemType.ENTITY,
                score=0.8,
                file_path="/test/docs/untitled.md",
            ),
        ],
        current_page=1,
        page_size=10,
    )

    formatted = _format_search_results_for_chatgpt(mock_results)

    assert len(formatted) == 2
    assert formatted[0]["id"] == "docs/doc-one"
    assert formatted[0]["title"] == "Document One"
    assert formatted[0]["url"] == "docs/doc-one"

    # Test empty title handling
    assert formatted[1]["title"] == "Untitled"


def test_format_document_for_chatgpt():
    """Test document formatting."""
    from basic_memory.mcp.tools.chatgpt_tools import _format_document_for_chatgpt

    content = "# Test Document\n\nThis is test content."
    result = _format_document_for_chatgpt(content, "docs/test")

    assert result["id"] == "docs/test"
    assert result["title"] == "Test Document"
    assert result["text"] == content
    assert result["url"] == "docs/test"
    assert result["metadata"]["format"] == "markdown"


def test_format_document_error_handling():
    """Test document formatting with error content."""
    from basic_memory.mcp.tools.chatgpt_tools import _format_document_for_chatgpt

    error_content = '# Note Not Found: "missing-doc"\n\nDocument not found.'
    result = _format_document_for_chatgpt(error_content, "missing-doc", "Missing Doc")

    assert result["id"] == "missing-doc"
    assert result["title"] == "Missing Doc"
    assert result["text"] == error_content
    assert result["metadata"]["error"] == "Document not found"

```

--------------------------------------------------------------------------------
/src/basic_memory/importers/chatgpt_importer.py:
--------------------------------------------------------------------------------

```python
"""ChatGPT import service for Basic Memory."""

import logging
from datetime import datetime
from typing import Any, Dict, List, Optional, Set

from basic_memory.markdown.schemas import EntityFrontmatter, EntityMarkdown
from basic_memory.importers.base import Importer
from basic_memory.schemas.importer import ChatImportResult
from basic_memory.importers.utils import clean_filename, format_timestamp

logger = logging.getLogger(__name__)


class ChatGPTImporter(Importer[ChatImportResult]):
    """Service for importing ChatGPT conversations."""

    async def import_data(
        self, source_data, destination_folder: str, **kwargs: Any
    ) -> ChatImportResult:
        """Import conversations from ChatGPT JSON export.

        Args:
            source_data: Parsed conversation data from a ChatGPT conversations.json export.
            destination_folder: Destination folder within the project.
            **kwargs: Additional keyword arguments.

        Returns:
            ChatImportResult containing statistics and status of the import.
        """
        try:  # pragma: no cover
            # Ensure the destination folder exists
            self.ensure_folder_exists(destination_folder)
            conversations = source_data

            # Process each conversation
            messages_imported = 0
            chats_imported = 0

            for chat in conversations:
                # Convert to entity
                entity = self._format_chat_content(destination_folder, chat)

                # Write file
                file_path = self.base_path / f"{entity.frontmatter.metadata['permalink']}.md"
                await self.write_entity(entity, file_path)

                # Count messages
                msg_count = sum(
                    1
                    for node in chat["mapping"].values()
                    if node.get("message")
                    and not node.get("message", {})
                    .get("metadata", {})
                    .get("is_visually_hidden_from_conversation")
                )

                chats_imported += 1
                messages_imported += msg_count

            return ChatImportResult(
                import_count={"conversations": chats_imported, "messages": messages_imported},
                success=True,
                conversations=chats_imported,
                messages=messages_imported,
            )

        except Exception as e:  # pragma: no cover
            logger.exception("Failed to import ChatGPT conversations")
            return self.handle_error("Failed to import ChatGPT conversations", e)  # pyright: ignore [reportReturnType]

    def _format_chat_content(
        self, folder: str, conversation: Dict[str, Any]
    ) -> EntityMarkdown:  # pragma: no cover
        """Convert chat conversation to Basic Memory entity.

        Args:
            folder: Destination folder name.
            conversation: ChatGPT conversation data.

        Returns:
            EntityMarkdown instance representing the conversation.
        """
        # Extract timestamps
        created_at = conversation["create_time"]
        modified_at = conversation["update_time"]

        root_id = None
        # Find root message
        for node_id, node in conversation["mapping"].items():
            if node.get("parent") is None:
                root_id = node_id
                break

        # Generate permalink
        date_prefix = datetime.fromtimestamp(created_at).astimezone().strftime("%Y%m%d")
        clean_title = clean_filename(conversation["title"])

        # Format content
        content = self._format_chat_markdown(
            title=conversation["title"],
            mapping=conversation["mapping"],
            root_id=root_id,
            created_at=created_at,
            modified_at=modified_at,
        )

        # Create entity
        entity = EntityMarkdown(
            frontmatter=EntityFrontmatter(
                metadata={
                    "type": "conversation",
                    "title": conversation["title"],
                    "created": format_timestamp(created_at),
                    "modified": format_timestamp(modified_at),
                    "permalink": f"{folder}/{date_prefix}-{clean_title}",
                }
            ),
            content=content,
        )

        return entity

    def _format_chat_markdown(
        self,
        title: str,
        mapping: Dict[str, Any],
        root_id: Optional[str],
        created_at: float,
        modified_at: float,
    ) -> str:  # pragma: no cover
        """Format chat as clean markdown.

        Args:
            title: Chat title.
            mapping: Message mapping.
            root_id: Root message ID.
            created_at: Creation timestamp.
            modified_at: Modification timestamp.

        Returns:
            Formatted markdown content.
        """
        # Start with title
        lines = [f"# {title}\n"]

        # Traverse message tree
        seen_msgs: Set[str] = set()
        messages = self._traverse_messages(mapping, root_id, seen_msgs)

        # Format each message
        for msg in messages:
            # Skip hidden messages
            if msg.get("metadata", {}).get("is_visually_hidden_from_conversation"):
                continue

            # Get author and timestamp
            author = msg["author"]["role"].title()
            ts = format_timestamp(msg["create_time"]) if msg.get("create_time") else ""

            # Add message header
            lines.append(f"### {author} ({ts})")

            # Add message content
            content = self._get_message_content(msg)
            if content:
                lines.append(content)

            # Add spacing
            lines.append("")

        return "\n".join(lines)

    def _get_message_content(self, message: Dict[str, Any]) -> str:  # pragma: no cover
        """Extract clean message content.

        Args:
            message: Message data.

        Returns:
            Cleaned message content.
        """
        if not message or "content" not in message:
            return ""

        content = message["content"]
        if content.get("content_type") == "text":
            return "\n".join(content.get("parts", []))
        elif content.get("content_type") == "code":
            return f"```{content.get('language', '')}\n{content.get('text', '')}\n```"
        return ""

    def _traverse_messages(
        self, mapping: Dict[str, Any], root_id: Optional[str], seen: Set[str]
    ) -> List[Dict[str, Any]]:  # pragma: no cover
        """Traverse message tree iteratively to handle deep conversations.

        Args:
            mapping: Message mapping.
            root_id: Root message ID.
            seen: Set of seen message IDs.

        Returns:
            List of message data.
        """
        messages = []
        if not root_id:
            return messages

        # Use iterative approach with stack to avoid recursion depth issues
        stack = [root_id]

        while stack:
            node_id = stack.pop()
            if not node_id:
                continue

            node = mapping.get(node_id)
            if not node:
                continue

            # Process current node if it has a message and hasn't been seen
            if node["id"] not in seen and node.get("message"):
                seen.add(node["id"])
                messages.append(node["message"])

            # Add children to stack in reverse order to maintain conversation flow
            children = node.get("children", [])
            for child_id in reversed(children):
                stack.append(child_id)

        return messages

```
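
For reference, a minimal `conversations.json` entry in the shape the traversal above expects (field names taken from the code; values are illustrative):

```python
conversation = {
    "title": "Example chat",
    "create_time": 1700000000.0,
    "update_time": 1700000100.0,
    "mapping": {
        # The root node has no parent; children link to message nodes
        "root": {"id": "root", "parent": None, "message": None, "children": ["m1"]},
        "m1": {
            "id": "m1",
            "parent": "root",
            "children": [],
            "message": {
                "author": {"role": "user"},
                "create_time": 1700000000.0,
                "content": {"content_type": "text", "parts": ["Hello"]},
                "metadata": {},
            },
        },
    },
}
```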

--------------------------------------------------------------------------------
/v15-docs/basic-memory-home.md:
--------------------------------------------------------------------------------

```markdown
# BASIC_MEMORY_HOME Environment Variable

**Status**: Existing (clarified in v0.15.0)
**Related**: project-root-env-var.md

## What It Is

`BASIC_MEMORY_HOME` specifies the location of your **default "main" project**. This is the primary directory where Basic Memory stores knowledge files when no other project is specified.

## Quick Reference

```bash
# Default (if not set)
~/basic-memory

# Custom location
export BASIC_MEMORY_HOME=/Users/you/Documents/knowledge-base
```

## How It Works

### Default Project Location

When Basic Memory initializes, it creates a "main" project:

```python
# Without BASIC_MEMORY_HOME
projects = {
    "main": "~/basic-memory"  # Default
}

# With BASIC_MEMORY_HOME=/Users/you/custom-location set in the environment
projects = {
    "main": "/Users/you/custom-location"  # Uses env var
}
```

### Only Affects "main" Project

**Important:** `BASIC_MEMORY_HOME` ONLY sets the path for the "main" project. Other projects are unaffected.

```bash
export BASIC_MEMORY_HOME=/Users/you/my-knowledge

# config.json will have:
{
  "projects": {
    "main": "/Users/you/my-knowledge",    # ← From BASIC_MEMORY_HOME
    "work": "/Users/you/work-notes",       # ← Independently configured
    "personal": "/Users/you/personal-kb"   # ← Independently configured
  }
}
```

## Relationship with BASIC_MEMORY_PROJECT_ROOT

These are **separate** environment variables with **different purposes**:

| Variable | Purpose | Scope | Default |
|----------|---------|-------|---------|
| `BASIC_MEMORY_HOME` | Where "main" project lives | Single project | `~/basic-memory` |
| `BASIC_MEMORY_PROJECT_ROOT` | Security boundary for ALL projects | All projects | None (unrestricted) |

### Using Together

```bash
# Common containerized setup
export BASIC_MEMORY_HOME=/app/data/basic-memory          # Main project location
export BASIC_MEMORY_PROJECT_ROOT=/app/data               # All projects must be under here
```

**Result:**
- Main project created at `/app/data/basic-memory`
- All other projects must be under `/app/data/`
- Provides both convenience and security
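
A hypothetical sanity check for this pairing (not the actual implementation; it mirrors the "not under" error shown in Troubleshooting below):

```python
import os
from pathlib import Path

home = Path(os.environ.get("BASIC_MEMORY_HOME", "~/basic-memory")).expanduser()
root = os.environ.get("BASIC_MEMORY_PROJECT_ROOT")

if root is not None and not home.is_relative_to(Path(root)):
    raise SystemExit(f"Error: {home} not under {root}")
```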

### Comparison Table

| Scenario | BASIC_MEMORY_HOME | BASIC_MEMORY_PROJECT_ROOT | Result |
|----------|-------------------|---------------------------|---------|
| **Default** | Not set | Not set | Main at `~/basic-memory`, projects anywhere |
| **Custom main** | `/Users/you/kb` | Not set | Main at `/Users/you/kb`, projects anywhere |
| **Containerized** | `/app/data/main` | `/app/data` | Main at `/app/data/main`, all projects under `/app/data/` |
| **Secure SaaS** | `/app/tenant-123/main` | `/app/tenant-123` | Main at `/app/tenant-123/main`, tenant isolated |

## Use Cases

### Personal Setup (Default)

```bash
# Use default location
# BASIC_MEMORY_HOME not set

# Main project created at:
~/basic-memory/
```

### Custom Location

```bash
# Store in Documents folder
export BASIC_MEMORY_HOME=~/Documents/BasicMemory

# Main project created at:
~/Documents/BasicMemory/
```

### Synchronized Cloud Folder

```bash
# Store in Dropbox/iCloud
export BASIC_MEMORY_HOME=~/Dropbox/BasicMemory

# Main project syncs via Dropbox:
~/Dropbox/BasicMemory/
```

### Docker Deployment

```bash
# Mount volume for persistence
docker run \
  -e BASIC_MEMORY_HOME=/app/data/basic-memory \
  -v $(pwd)/data:/app/data \
  basic-memory:latest

# Main project persists at:
./data/basic-memory/  # (host)
/app/data/basic-memory/  # (container)
```

### Multi-User System

```bash
# Per-user isolation
export BASIC_MEMORY_HOME=/home/$USER/basic-memory

# Alice's main project:
/home/alice/basic-memory/

# Bob's main project:
/home/bob/basic-memory/
```

## Configuration Examples

### Basic Setup

```bash
# .bashrc or .zshrc
export BASIC_MEMORY_HOME=~/Documents/knowledge
```

### Docker Compose

```yaml
services:
  basic-memory:
    environment:
      BASIC_MEMORY_HOME: /app/data/basic-memory
    volumes:
      - ./data:/app/data
```

### Kubernetes

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: basic-memory-config
data:
  BASIC_MEMORY_HOME: "/app/data/basic-memory"
---
apiVersion: v1
kind: Pod
spec:
  containers:
  - name: basic-memory
    envFrom:
    - configMapRef:
        name: basic-memory-config
```

### systemd Service

```ini
[Service]
Environment="BASIC_MEMORY_HOME=/var/lib/basic-memory"
ExecStart=/usr/local/bin/basic-memory serve
```

## Migration

### Changing BASIC_MEMORY_HOME

If you need to change the location:

**Option 1: Move files**
```bash
# Stop services
bm sync --stop

# Move data
mv ~/basic-memory ~/Documents/knowledge

# Update environment
export BASIC_MEMORY_HOME=~/Documents/knowledge

# Restart
bm sync
```

**Option 2: Copy and sync**
```bash
# Copy to new location
cp -r ~/basic-memory ~/Documents/knowledge

# Update environment
export BASIC_MEMORY_HOME=~/Documents/knowledge

# Verify
bm status

# Remove old location once verified
rm -rf ~/basic-memory
```

### From v0.14.x

No changes needed - `BASIC_MEMORY_HOME` works the same way:

```bash
# v0.14.x and v0.15.0+ both use:
export BASIC_MEMORY_HOME=~/my-knowledge
```

## Common Patterns

### Development vs Production

```bash
# Development (.bashrc)
export BASIC_MEMORY_HOME=~/dev/basic-memory-dev

# Production (systemd/docker)
export BASIC_MEMORY_HOME=/var/lib/basic-memory
```

### Shared Team Setup

```bash
# Shared network drive
export BASIC_MEMORY_HOME=/mnt/shared/team-knowledge

# Note: Use with caution, consider file locking
```

### Backup Strategy

```bash
# Primary location
export BASIC_MEMORY_HOME=~/basic-memory

# Automated backup script
rsync -av ~/basic-memory/ ~/Backups/basic-memory-$(date +%Y%m%d)/
```

## Verification

### Check Current Value

```bash
# View environment variable
echo $BASIC_MEMORY_HOME

# View resolved config
bm project list
# Shows actual path for "main" project
```

### Verify Main Project Location

```python
from basic_memory.config import ConfigManager

config = ConfigManager().config
print(config.projects["main"])
# Shows where "main" project is located
```

## Troubleshooting

### Main Project Not at Expected Location

**Problem:** Files not where you expect

**Check:**
```bash
# What's the environment variable?
echo $BASIC_MEMORY_HOME

# Where is main project actually?
bm project list | grep main
```

**Solution:** Set environment variable and restart

### Permission Errors

**Problem:** Can't write to BASIC_MEMORY_HOME location

```bash
$ bm sync
Error: Permission denied: /var/lib/basic-memory
```

**Solution:**
```bash
# Fix permissions
sudo chown -R $USER:$USER /var/lib/basic-memory

# Or use accessible location
export BASIC_MEMORY_HOME=~/basic-memory
```

### Conflicts with PROJECT_ROOT

**Problem:** BASIC_MEMORY_HOME outside PROJECT_ROOT

```bash
export BASIC_MEMORY_HOME=/Users/you/kb
export BASIC_MEMORY_PROJECT_ROOT=/app/data

# Error: /Users/you/kb not under /app/data
```

**Solution:** Align both variables
```bash
export BASIC_MEMORY_HOME=/app/data/basic-memory
export BASIC_MEMORY_PROJECT_ROOT=/app/data
```

## Best Practices

1. **Use absolute paths:**
   ```bash
   export BASIC_MEMORY_HOME=/Users/you/knowledge  # ✓
   # not: export BASIC_MEMORY_HOME=~/knowledge    # ✗ (tilde may not expand outside an interactive shell)
   ```

2. **Document the location:**
   - Add comment in shell config
   - Document for team if shared

3. **Backup regularly:**
   - Main project contains your primary knowledge
   - Automate backups of this directory

4. **Consider PROJECT_ROOT for security:**
   - Use both together in production/containers

5. **Test changes:**
   - Verify with `bm project list` after changing

## See Also

- `project-root-env-var.md` - Security constraints for all projects
- `env-var-overrides.md` - Environment variable precedence
- Project management documentation

```

--------------------------------------------------------------------------------
/src/basic_memory/ignore_utils.py:
--------------------------------------------------------------------------------

```python
"""Utilities for handling .gitignore patterns and file filtering."""

import fnmatch
from pathlib import Path
from typing import Set


# Common directories and patterns to ignore by default
# These are used as fallback if .bmignore doesn't exist
DEFAULT_IGNORE_PATTERNS = {
    # Hidden files (files starting with dot)
    ".*",
    # Basic Memory internal files
    "*.db",
    "*.db-shm",
    "*.db-wal",
    "config.json",
    # Version control
    ".git",
    ".svn",
    # Python
    "__pycache__",
    "*.pyc",
    "*.pyo",
    "*.pyd",
    ".pytest_cache",
    ".coverage",
    "*.egg-info",
    ".tox",
    ".mypy_cache",
    ".ruff_cache",
    # Virtual environments
    ".venv",
    "venv",
    "env",
    ".env",
    # Node.js
    "node_modules",
    # Build artifacts
    "build",
    "dist",
    ".cache",
    # IDE
    ".idea",
    ".vscode",
    # OS files
    ".DS_Store",
    "Thumbs.db",
    "desktop.ini",
    # Obsidian
    ".obsidian",
    # Temporary files
    "*.tmp",
    "*.swp",
    "*.swo",
    "*~",
}


def get_bmignore_path() -> Path:
    """Get path to .bmignore file.

    Returns:
        Path to ~/.basic-memory/.bmignore
    """
    return Path.home() / ".basic-memory" / ".bmignore"


def create_default_bmignore() -> None:
    """Create default .bmignore file if it doesn't exist.

    This ensures users have a file they can customize for all Basic Memory operations.
    """
    bmignore_path = get_bmignore_path()

    if bmignore_path.exists():
        return

    bmignore_path.parent.mkdir(parents=True, exist_ok=True)
    bmignore_path.write_text("""# Basic Memory Ignore Patterns
# This file is used by 'bm cloud upload', 'bm cloud bisync', and file sync
# Patterns use standard gitignore-style syntax

# Hidden files (files starting with dot)
.*

# Basic Memory internal files (includes test databases)
*.db
*.db-shm
*.db-wal
config.json

# Version control
.git
.svn

# Python
__pycache__
*.pyc
*.pyo
*.pyd
.pytest_cache
.coverage
*.egg-info
.tox
.mypy_cache
.ruff_cache

# Virtual environments
.venv
venv
env
.env

# Node.js
node_modules

# Build artifacts
build
dist
.cache

# IDE
.idea
.vscode

# OS files
.DS_Store
Thumbs.db
desktop.ini

# Obsidian
.obsidian

# Temporary files
*.tmp
*.swp
*.swo
*~
""")


def load_bmignore_patterns() -> Set[str]:
    """Load patterns from .bmignore file.

    Returns:
        Set of patterns from .bmignore, or DEFAULT_IGNORE_PATTERNS if file doesn't exist
    """
    bmignore_path = get_bmignore_path()

    # Create default file if it doesn't exist
    if not bmignore_path.exists():
        create_default_bmignore()

    patterns = set()

    try:
        with bmignore_path.open("r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                # Skip empty lines and comments
                if line and not line.startswith("#"):
                    patterns.add(line)
    except Exception:
        # If we can't read .bmignore, fall back to defaults
        return set(DEFAULT_IGNORE_PATTERNS)

    # If no patterns were loaded, use defaults
    if not patterns:
        return set(DEFAULT_IGNORE_PATTERNS)

    return patterns


def load_gitignore_patterns(base_path: Path, use_gitignore: bool = True) -> Set[str]:
    """Load gitignore patterns from .gitignore file and .bmignore.

    Combines patterns from:
    1. ~/.basic-memory/.bmignore (user's global ignore patterns)
    2. {base_path}/.gitignore (project-specific patterns, if use_gitignore=True)

    Args:
        base_path: The base directory to search for .gitignore file
        use_gitignore: If False, only load patterns from .bmignore (default: True)

    Returns:
        Set of patterns to ignore
    """
    # Start with patterns from .bmignore
    patterns = load_bmignore_patterns()

    if use_gitignore:
        gitignore_file = base_path / ".gitignore"
        if gitignore_file.exists():
            try:
                with gitignore_file.open("r", encoding="utf-8") as f:
                    for line in f:
                        line = line.strip()
                        # Skip empty lines and comments
                        if line and not line.startswith("#"):
                            patterns.add(line)
            except Exception:
                # If we can't read .gitignore, keep the patterns loaded so far
                pass

    return patterns


def should_ignore_path(file_path: Path, base_path: Path, ignore_patterns: Set[str]) -> bool:
    """Check if a file path should be ignored based on gitignore patterns.

    Args:
        file_path: The file path to check
        base_path: The base directory for relative path calculation
        ignore_patterns: Set of patterns to match against

    Returns:
        True if the path should be ignored, False otherwise
    """
    # Get the relative path from base
    try:
        relative_path = file_path.relative_to(base_path)
        relative_str = str(relative_path)
        relative_posix = relative_path.as_posix()  # Use forward slashes for matching

        # Check each pattern
        for pattern in ignore_patterns:
            # Handle patterns starting with / (root relative)
            if pattern.startswith("/"):
                root_pattern = pattern[1:]  # Remove leading /

                # For directory patterns ending with /
                if root_pattern.endswith("/"):
                    dir_name = root_pattern[:-1]  # Remove trailing /
                    # Check if the first part of the path matches the directory name
                    if len(relative_path.parts) > 0 and relative_path.parts[0] == dir_name:
                        return True
                else:
                    # Regular root-relative pattern
                    if fnmatch.fnmatch(relative_posix, root_pattern):
                        return True
                continue

            # Handle directory patterns (ending with /)
            if pattern.endswith("/"):
                dir_name = pattern[:-1]  # Remove trailing /
                # Check if any path part matches the directory name
                if dir_name in relative_path.parts:
                    return True
                continue

            # Direct name match (e.g., ".git", "node_modules")
            if pattern in relative_path.parts:
                return True

            # Check if any individual path part matches the glob pattern
            # This handles cases like ".*" matching ".hidden.md" in "concept/.hidden.md"
            for part in relative_path.parts:
                if fnmatch.fnmatch(part, pattern):
                    return True

            # Glob pattern match on full path
            if fnmatch.fnmatch(relative_posix, pattern) or fnmatch.fnmatch(relative_str, pattern):
                return True

        return False
    except ValueError:
        # If we can't get relative path, don't ignore
        return False


def filter_files(
    files: list[Path], base_path: Path, ignore_patterns: Set[str] | None = None
) -> tuple[list[Path], int]:
    """Filter a list of files based on gitignore patterns.

    Args:
        files: List of file paths to filter
        base_path: The base directory for relative path calculation
        ignore_patterns: Set of patterns to ignore. If None, loads from .gitignore

    Returns:
        Tuple of (filtered_files, ignored_count)
    """
    if ignore_patterns is None:
        ignore_patterns = load_gitignore_patterns(base_path)

    filtered_files = []
    ignored_count = 0

    for file_path in files:
        if should_ignore_path(file_path, base_path, ignore_patterns):
            ignored_count += 1
        else:
            filtered_files.append(file_path)

    return filtered_files, ignored_count

```
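
A minimal usage sketch for this module (the project path is illustrative):

```python
from pathlib import Path

from basic_memory.ignore_utils import filter_files, load_gitignore_patterns

base = Path.home() / "basic-memory"
candidates = [p for p in base.rglob("*") if p.is_file()]

patterns = load_gitignore_patterns(base)  # .bmignore plus the project .gitignore
kept, ignored = filter_files(candidates, base, patterns)
print(f"keeping {len(kept)} files, ignored {ignored}")
```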

--------------------------------------------------------------------------------
/v15-docs/cloud-authentication.md:
--------------------------------------------------------------------------------

```markdown
# Cloud Authentication (SPEC-13)

**Status**: New Feature
**PR**: #327
**Requires**: Active Basic Memory subscription

## What's New

v0.15.0 introduces **JWT-based cloud authentication** with automatic subscription validation. This enables secure access to Basic Memory Cloud features including bidirectional sync, cloud storage, and multi-device access.

## Quick Start

### Login to Cloud

```bash
# Authenticate with Basic Memory Cloud
bm cloud login

# Opens browser for OAuth flow
# Validates subscription status
# Stores JWT token locally
```

### Check Authentication Status

```bash
# View current authentication status
bm cloud status
```

### Logout

```bash
# Clear authentication session
bm cloud logout
```

## How It Works

### Authentication Flow

1. **Initiate Login**: `bm cloud login`
2. **Browser Opens**: OAuth 2.1 flow with PKCE
3. **Authorize**: Login with your Basic Memory account
4. **Subscription Check**: Validates active subscription
5. **Token Storage**: JWT stored in `~/.basic-memory/cloud-auth.json`
6. **Auto-Refresh**: Token automatically refreshed when needed

### Subscription Validation

All cloud commands validate your subscription status:

**Active Subscription:**
```bash
$ bm cloud sync
✓ Syncing with cloud...
```

**No Active Subscription:**
```bash
$ bm cloud sync
✗ Active subscription required
Subscribe at: https://basicmemory.com/subscribe
```

## Authentication Commands

### bm cloud login

Authenticate with Basic Memory Cloud.

```bash
# Basic login
bm cloud login

# Login opens browser automatically
# Redirects to: https://eloquent-lotus-05.authkit.app/...
```

**What happens:**
- Opens OAuth authorization in browser
- Handles PKCE challenge/response
- Validates subscription
- Stores JWT token
- Displays success message

**Error cases:**
- No subscription: Shows subscribe URL
- Network error: Retries with exponential backoff
- Invalid credentials: Prompts to try again

### bm cloud logout

Clear authentication session.

```bash
bm cloud logout
```

**What happens:**
- Removes `~/.basic-memory/cloud-auth.json`
- Clears cached credentials
- Requires re-authentication for cloud commands

### bm cloud status

View authentication and sync status.

```bash
bm cloud status
```

**Shows:**
- Authentication status (logged in/out)
- Subscription status (active/expired)
- Last sync time
- Cloud project count
- Tenant information

## Token Management

### Automatic Token Refresh

The CLI automatically handles token refresh:

```python
# Internal - happens automatically
async def get_authenticated_headers():
    # Check token expiration and refresh if needed
    token = await auth.get_valid_token()  # auth: CLIAuth (see "In Custom Scripts" below)
    # Return a valid Bearer token
    return {"Authorization": f"Bearer {token}"}
```

### Token Storage

Location: `~/.basic-memory/cloud-auth.json`

```json
{
  "access_token": "eyJ0eXAiOiJKV1QiLCJhbGc...",
  "refresh_token": "eyJ0eXAiOiJKV1QiLCJhbGc...",
  "expires_at": 1234567890,
  "tenant_id": "org_abc123"
}
```

**Security:**
- File permissions: 600 (user read/write only)
- Tokens expire after 1 hour
- Refresh tokens valid for 30 days
- Never commit this file to git

### Manual Token Revocation

To revoke access:
1. `bm cloud logout` (clears local token)
2. Visit account settings to revoke all sessions

## Subscription Management

### Check Subscription Status

```bash
# View current subscription
bm cloud status

# Shows:
# - Subscription tier
# - Expiration date
# - Features enabled
```

### Subscribe

If you don't have a subscription:

```bash
# Displays subscribe URL
bm cloud login
# > Active subscription required
# > Subscribe at: https://basicmemory.com/subscribe
```

### Subscription Tiers

| Feature | Free | Pro | Team |
|---------|------|-----|------|
| Cloud Authentication | ✓ | ✓ | ✓ |
| Cloud Sync | - | ✓ | ✓ |
| Cloud Storage | - | 10GB | 100GB |
| Multi-device | - | ✓ | ✓ |
| API Access | - | ✓ | ✓ |

## Using Authenticated APIs

### In CLI Commands

Authentication is automatic for all cloud commands:

```bash
# These all use stored JWT automatically
bm cloud sync
bm cloud mount
bm cloud check
bm cloud bisync
```

### In Custom Scripts

```python
from basic_memory.cli.auth import CLIAuth

# Get authenticated headers
# (get_cloud_config is assumed importable from the cloud CLI's config helpers)
client_id, domain, _ = get_cloud_config()
auth = CLIAuth(client_id=client_id, authkit_domain=domain)
token = await auth.get_valid_token()

headers = {"Authorization": f"Bearer {token}"}

# Use with httpx or requests
import httpx
async with httpx.AsyncClient() as client:
    response = await client.get(
        "https://api.basicmemory.cloud/tenant/projects",
        headers=headers
    )
```

### Error Handling

```python
from basic_memory.cli.commands.cloud.api_client import (
    CloudAPIError,
    SubscriptionRequiredError
)

try:
    response = await make_api_request("GET", url)
except SubscriptionRequiredError as e:
    print(f"Subscription required: {e.message}")
    print(f"Subscribe at: {e.subscribe_url}")
except CloudAPIError as e:
    print(f"API error: {e.status_code} - {e.detail}")
```

## OAuth Configuration

### Default Settings

```python
# From config.py
cloud_client_id = "client_01K6KWQPW6J1M8VV7R3TZP5A6M"
cloud_domain = "https://eloquent-lotus-05.authkit.app"
cloud_host = "https://api.basicmemory.cloud"
```

### Custom Configuration

Override via environment variables:

```bash
export BASIC_MEMORY_CLOUD_CLIENT_ID="your_client_id"
export BASIC_MEMORY_CLOUD_DOMAIN="https://your-authkit.app"
export BASIC_MEMORY_CLOUD_HOST="https://your-api.example.com"

bm cloud login
```

Or in `~/.basic-memory/config.json`:

```json
{
  "cloud_client_id": "your_client_id",
  "cloud_domain": "https://your-authkit.app",
  "cloud_host": "https://your-api.example.com"
}
```

## Troubleshooting

### "Not authenticated" Error

```bash
$ bm cloud sync
Not authenticated. Please run 'bm cloud login' first.
```

**Solution**: Run `bm cloud login`

### Token Expired

```bash
$ bm cloud status
Token expired, refreshing...
✓ Authenticated
```

**Automatic**: Token refresh happens automatically

### Subscription Expired

```bash
$ bm cloud sync
Active subscription required
Subscribe at: https://basicmemory.com/subscribe
```

**Solution**: Renew subscription at provided URL

### Browser Not Opening

```bash
$ bm cloud login
# If browser doesn't open automatically:
# Visit this URL: https://eloquent-lotus-05.authkit.app/...
```

**Manual**: Copy/paste URL into browser

### Network Issues

```bash
$ bm cloud login
Connection error, retrying in 2s...
Connection error, retrying in 4s...
```

**Automatic**: Exponential backoff with retries

## Security Best Practices

1. **Never share tokens**: Keep `cloud-auth.json` private
2. **Use logout**: Always logout on shared machines
3. **Monitor sessions**: Check `bm cloud status` regularly
4. **Revoke access**: Use account settings to revoke compromised tokens
5. **Use HTTPS only**: Cloud commands enforce HTTPS

## Related Commands

- `bm cloud sync` - Bidirectional cloud sync (see `cloud-bisync.md`)
- `bm cloud mount` - Mount cloud storage (see `cloud-mount.md`)
- `bm cloud check` - Verify cloud integrity
- `bm cloud status` - View authentication and sync status

## Technical Details

### JWT Claims

```json
{
  "sub": "user_abc123",
  "org_id": "org_xyz789",
  "tenant_id": "org_xyz789",
  "subscription_status": "active",
  "subscription_tier": "pro",
  "exp": 1234567890,
  "iat": 1234564290
}
```

### API Integration

The cloud API validates JWT on every request:

```python
# Middleware validates JWT and extracts tenant context
@app.middleware("http")
async def tenant_middleware(request: Request, call_next):
    token = request.headers.get("Authorization")
    claims = verify_jwt(token)
    request.state.tenant_id = claims["tenant_id"]
    request.state.subscription = claims["subscription_status"]
    # ...
```

## See Also

- SPEC-13: CLI Authentication with Subscription Validation
- `cloud-bisync.md` - Using authenticated sync
- `cloud-mode-usage.md` - Working with cloud APIs

```

--------------------------------------------------------------------------------
/test-int/conftest.py:
--------------------------------------------------------------------------------

```python
"""
Shared fixtures for integration tests.

Integration tests verify the complete flow: MCP Client → MCP Server → FastAPI → Database.
Unlike unit tests which use in-memory databases and mocks, integration tests use real SQLite
files and test the full application stack to ensure all components work together correctly.

## Architecture

The integration test setup creates this flow:

```
Test → MCP Client → MCP Server → HTTP Request (ASGITransport) → FastAPI App → Database
                                                                      ↑
                                                               Dependency overrides
                                                               point to test database
```

## Key Components

1. **Real SQLite Database**: Uses `DatabaseType.FILESYSTEM` with actual SQLite files
   in temporary directories instead of in-memory databases.

2. **Shared Database Connection**: Both MCP server and FastAPI app use the same
   database via dependency injection overrides.

3. **Project Session Management**: Initializes the MCP project session with test
   project configuration so tools know which project to operate on.

4. **Search Index Initialization**: Creates the FTS5 search index tables that
   the application requires for search functionality.

5. **Global Configuration Override**: Modifies the global `basic_memory_app_config`
   so MCP tools use test project settings instead of user configuration.

## Usage

Integration tests should include both `mcp_server` and `app` fixtures to ensure
the complete stack is wired correctly:

```python
@pytest.mark.asyncio
async def test_my_mcp_tool(mcp_server, app):
    async with Client(mcp_server) as client:
        result = await client.call_tool("tool_name", {"param": "value"})
        # Assert on results...
```

The `app` fixture ensures FastAPI dependency overrides are active, and
`mcp_server` provides the MCP server with proper project session initialization.
"""

from typing import AsyncGenerator

import pytest
import pytest_asyncio
from pathlib import Path

from httpx import AsyncClient, ASGITransport

from basic_memory.config import BasicMemoryConfig, ProjectConfig, ConfigManager
from basic_memory.db import engine_session_factory, DatabaseType
from basic_memory.models import Project
from basic_memory.repository.project_repository import ProjectRepository
from fastapi import FastAPI

from basic_memory.deps import get_project_config, get_engine_factory, get_app_config


# Import MCP tools so they're available for testing
from basic_memory.mcp import tools  # noqa: F401


@pytest_asyncio.fixture(scope="function")
async def engine_factory(tmp_path):
    """Create a SQLite file engine factory for integration testing."""
    db_path = tmp_path / "test.db"
    async with engine_session_factory(db_path, DatabaseType.FILESYSTEM) as (
        engine,
        session_maker,
    ):
        # Initialize database schema
        from basic_memory.models.base import Base

        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)

        yield engine, session_maker


@pytest_asyncio.fixture(scope="function")
async def test_project(config_home, engine_factory) -> Project:
    """Create a test project."""
    project_data = {
        "name": "test-project",
        "description": "Project used for integration tests",
        "path": str(config_home),
        "is_active": True,
        "is_default": True,
    }

    engine, session_maker = engine_factory
    project_repository = ProjectRepository(session_maker)
    project = await project_repository.create(project_data)
    return project


@pytest.fixture
def config_home(tmp_path, monkeypatch) -> Path:
    monkeypatch.setenv("HOME", str(tmp_path))
    # Set BASIC_MEMORY_HOME to the test directory
    monkeypatch.setenv("BASIC_MEMORY_HOME", str(tmp_path / "basic-memory"))
    return tmp_path


@pytest.fixture(scope="function", autouse=True)
def app_config(config_home, tmp_path, monkeypatch) -> BasicMemoryConfig:
    """Create test app configuration."""
    # Disable cloud mode for integration tests
    monkeypatch.setenv("BASIC_MEMORY_CLOUD_MODE", "false")

    # Create a basic config with test-project like unit tests do
    projects = {"test-project": str(config_home)}
    app_config = BasicMemoryConfig(
        env="test",
        projects=projects,
        default_project="test-project",
        default_project_mode=False,  # Match real-world usage - tools must pass explicit project
        update_permalinks_on_move=True,
        cloud_mode=False,  # Explicitly disable cloud mode
    )
    return app_config


@pytest.fixture(scope="function", autouse=True)
def config_manager(app_config: BasicMemoryConfig, config_home) -> ConfigManager:
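    """Create a ConfigManager that reads and writes config in the test directory."""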
    config_manager = ConfigManager()
    # Update its paths to use the test directory
    config_manager.config_dir = config_home / ".basic-memory"
    config_manager.config_file = config_manager.config_dir / "config.json"
    config_manager.config_dir.mkdir(parents=True, exist_ok=True)

    # Ensure the config file is written to disk
    config_manager.save_config(app_config)
    return config_manager


@pytest.fixture(scope="function", autouse=True)
def project_config(test_project):
    """Create test project configuration."""

    project_config = ProjectConfig(
        name=test_project.name,
        home=Path(test_project.path),
    )

    return project_config


@pytest.fixture(scope="function")
def app(app_config, project_config, engine_factory, test_project, config_manager) -> FastAPI:
    """Create test FastAPI application with single project."""

    # Import the FastAPI app AFTER the config_manager has written the test config to disk
    # This ensures that when the app's lifespan manager runs, it reads the correct test config
    from basic_memory.api.app import app as fastapi_app

    app = fastapi_app
    app.dependency_overrides[get_project_config] = lambda: project_config
    app.dependency_overrides[get_engine_factory] = lambda: engine_factory
    app.dependency_overrides[get_app_config] = lambda: app_config
    return app


@pytest_asyncio.fixture(scope="function")
async def search_service(engine_factory, test_project):
    """Create and initialize search service for integration tests."""
    from basic_memory.repository.search_repository import SearchRepository
    from basic_memory.repository.entity_repository import EntityRepository
    from basic_memory.services.file_service import FileService
    from basic_memory.services.search_service import SearchService
    from basic_memory.markdown.markdown_processor import MarkdownProcessor
    from basic_memory.markdown import EntityParser

    engine, session_maker = engine_factory

    # Create repositories
    search_repository = SearchRepository(session_maker, project_id=test_project.id)
    entity_repository = EntityRepository(session_maker, project_id=test_project.id)

    # Create file service
    entity_parser = EntityParser(Path(test_project.path))
    markdown_processor = MarkdownProcessor(entity_parser)
    file_service = FileService(Path(test_project.path), markdown_processor)

    # Create and initialize search service
    service = SearchService(search_repository, entity_repository, file_service)
    await service.init_search_index()
    return service


@pytest.fixture(scope="function")
def mcp_server(config_manager, search_service):
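    """Return the MCP server with tools and prompts registered for tests."""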
    # Import mcp instance
    from basic_memory.mcp.server import mcp as server

    # Import mcp tools to register them
    import basic_memory.mcp.tools  # noqa: F401

    # Import prompts to register them
    import basic_memory.mcp.prompts  # noqa: F401

    return server


@pytest_asyncio.fixture(scope="function")
async def client(app: FastAPI) -> AsyncGenerator[AsyncClient, None]:
    """Create test client that both MCP and tests will use."""
    async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
        yield client

```
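
A minimal end-to-end sketch built on these fixtures, assuming the `fastmcp` `Client` shown in the module docstring and the `list_memory_projects` tool registered by the imports above; the exact shape of `call_tool`'s result depends on the fastmcp version, so the assertion below is illustrative:

```python
import pytest
from fastmcp import Client


@pytest.mark.asyncio
async def test_list_projects_tool(mcp_server, app):
    """Call an MCP tool through the in-memory server and FastAPI stack."""
    async with Client(mcp_server) as client:
        result = await client.call_tool("list_memory_projects", {})
        # The test project configured in app_config should appear in the output
        assert "test-project" in str(result)
```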

--------------------------------------------------------------------------------
/tests/test_deps.py:
--------------------------------------------------------------------------------

```python
"""Tests for dependency injection functions in deps.py."""

from datetime import datetime, timezone
from pathlib import Path

import pytest
import pytest_asyncio
from fastapi import HTTPException

from basic_memory.deps import get_project_config, get_project_id
from basic_memory.models.project import Project
from basic_memory.repository.project_repository import ProjectRepository


@pytest_asyncio.fixture
async def project_with_spaces(project_repository: ProjectRepository) -> Project:
    """Create a project with spaces in the name for testing permalink normalization."""
    project_data = {
        "name": "My Test Project",
        "description": "A project with spaces in the name",
        "path": "/my/test/project",
        "is_active": True,
        "is_default": False,
        "created_at": datetime.now(timezone.utc),
        "updated_at": datetime.now(timezone.utc),
    }
    return await project_repository.create(project_data)


@pytest_asyncio.fixture
async def project_with_special_chars(project_repository: ProjectRepository) -> Project:
    """Create a project with special characters for testing permalink normalization."""
    project_data = {
        "name": "Project: Test & Development!",
        "description": "A project with special characters",
        "path": "/project/test/dev",
        "is_active": True,
        "is_default": False,
        "created_at": datetime.now(timezone.utc),
        "updated_at": datetime.now(timezone.utc),
    }
    return await project_repository.create(project_data)


@pytest.mark.asyncio
async def test_get_project_config_with_spaces(
    project_repository: ProjectRepository, project_with_spaces: Project
):
    """Test that get_project_config normalizes project names with spaces."""
    # The project name has spaces: "My Test Project"
    # The permalink should be: "my-test-project"
    assert project_with_spaces.name == "My Test Project"
    assert project_with_spaces.permalink == "my-test-project"

    # Call get_project_config with the project name (not permalink)
    # This simulates what happens when the project name comes from URL path
    config = await get_project_config(
        project="My Test Project", project_repository=project_repository
    )

    # Verify we got the correct project config
    assert config.name == "My Test Project"
    assert config.home == Path("/my/test/project")


@pytest.mark.asyncio
async def test_get_project_config_with_permalink(
    project_repository: ProjectRepository, project_with_spaces: Project
):
    """Test that get_project_config works when already given a permalink."""
    # Call with the permalink directly
    config = await get_project_config(
        project="my-test-project", project_repository=project_repository
    )

    # Verify we got the correct project config
    assert config.name == "My Test Project"
    assert config.home == Path("/my/test/project")


@pytest.mark.asyncio
async def test_get_project_config_with_special_chars(
    project_repository: ProjectRepository, project_with_special_chars: Project
):
    """Test that get_project_config normalizes project names with special characters."""
    # The project name has special chars: "Project: Test & Development!"
    # The permalink should be: "project-test-development"
    assert project_with_special_chars.name == "Project: Test & Development!"
    assert project_with_special_chars.permalink == "project-test-development"

    # Call get_project_config with the project name
    config = await get_project_config(
        project="Project: Test & Development!", project_repository=project_repository
    )

    # Verify we got the correct project config
    assert config.name == "Project: Test & Development!"
    assert config.home == Path("/project/test/dev")


@pytest.mark.asyncio
async def test_get_project_config_not_found(project_repository: ProjectRepository):
    """Test that get_project_config raises HTTPException when project not found."""
    with pytest.raises(HTTPException) as exc_info:
        await get_project_config(
            project="Nonexistent Project", project_repository=project_repository
        )

    assert exc_info.value.status_code == 404
    assert "Project 'Nonexistent Project' not found" in exc_info.value.detail


@pytest.mark.asyncio
async def test_get_project_id_with_spaces(
    project_repository: ProjectRepository, project_with_spaces: Project
):
    """Test that get_project_id normalizes project names with spaces."""
    # Call get_project_id with the project name (not permalink)
    project_id = await get_project_id(
        project_repository=project_repository, project="My Test Project"
    )

    # Verify we got the correct project ID
    assert project_id == project_with_spaces.id


@pytest.mark.asyncio
async def test_get_project_id_with_permalink(
    project_repository: ProjectRepository, project_with_spaces: Project
):
    """Test that get_project_id works when already given a permalink."""
    # Call with the permalink directly
    project_id = await get_project_id(
        project_repository=project_repository, project="my-test-project"
    )

    # Verify we got the correct project ID
    assert project_id == project_with_spaces.id


@pytest.mark.asyncio
async def test_get_project_id_with_special_chars(
    project_repository: ProjectRepository, project_with_special_chars: Project
):
    """Test that get_project_id normalizes project names with special characters."""
    # Call get_project_id with the project name
    project_id = await get_project_id(
        project_repository=project_repository, project="Project: Test & Development!"
    )

    # Verify we got the correct project ID
    assert project_id == project_with_special_chars.id


@pytest.mark.asyncio
async def test_get_project_id_not_found(project_repository: ProjectRepository):
    """Test that get_project_id raises HTTPException when project not found."""
    with pytest.raises(HTTPException) as exc_info:
        await get_project_id(project_repository=project_repository, project="Nonexistent Project")

    assert exc_info.value.status_code == 404
    assert "Project 'Nonexistent Project' not found" in exc_info.value.detail


@pytest.mark.asyncio
async def test_get_project_id_fallback_to_name(
    project_repository: ProjectRepository, test_project: Project
):
    """Test that get_project_id falls back to name lookup if permalink lookup fails.

    This test verifies the fallback behavior in get_project_id where it tries
    get_by_name if get_by_permalink returns None.
    """
    # The test_project fixture has name "test-project" and permalink "test-project"
    # Since both are the same, we can't easily test the fallback with existing fixtures
    # So this test just verifies the normal path works with test_project
    project_id = await get_project_id(project_repository=project_repository, project="test-project")

    assert project_id == test_project.id


@pytest.mark.asyncio
async def test_get_project_config_case_sensitivity(
    project_repository: ProjectRepository, project_with_spaces: Project
):
    """Test that get_project_config handles case variations correctly.

    Permalink normalization should convert to lowercase, so different case
    variations of the same name should resolve to the same project.
    """
    # The fixture created a project with mixed case: "My Test Project" -> permalink "my-test-project"

    # Try with different case variations
    config1 = await get_project_config(
        project="My Test Project", project_repository=project_repository
    )
    config2 = await get_project_config(
        project="my test project", project_repository=project_repository
    )
    config3 = await get_project_config(
        project="MY TEST PROJECT", project_repository=project_repository
    )

    # All should resolve to the same project
    assert config1.name == config2.name == config3.name == "My Test Project"
    assert config1.home == config2.home == config3.home == Path("/my/test/project")

```
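
These tests lean on `generate_permalink` normalization; a quick standalone restatement of the rules they assert (lowercasing, spaces to hyphens, punctuation dropped), using only values taken from the assertions above:

```python
from basic_memory.utils import generate_permalink

assert generate_permalink("My Test Project") == "my-test-project"
assert generate_permalink("Project: Test & Development!") == "project-test-development"
```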

--------------------------------------------------------------------------------
/tests/api/test_template_loader.py:
--------------------------------------------------------------------------------

```python
"""Tests for the template loader functionality."""

import datetime
import pytest
from pathlib import Path

from basic_memory.api.template_loader import TemplateLoader


@pytest.fixture
def temp_template_dir(tmpdir):
    """Create a temporary directory for test templates."""
    template_dir = tmpdir.mkdir("templates").mkdir("prompts")
    return template_dir


@pytest.fixture
def custom_template_loader(temp_template_dir):
    """Return a TemplateLoader instance with a custom template directory."""
    return TemplateLoader(str(temp_template_dir))


@pytest.fixture
def simple_template(temp_template_dir):
    """Create a simple test template."""
    template_path = temp_template_dir / "simple.hbs"
    template_path.write_text("Hello, {{name}}!", encoding="utf-8")
    return "simple.hbs"


@pytest.mark.asyncio
async def test_render_simple_template(custom_template_loader, simple_template):
    """Test rendering a simple template."""
    context = {"name": "World"}
    result = await custom_template_loader.render(simple_template, context)
    assert result == "Hello, World!"


@pytest.mark.asyncio
async def test_template_cache(custom_template_loader, simple_template):
    """Test that templates are cached."""
    context = {"name": "World"}

    # First render, should load template
    await custom_template_loader.render(simple_template, context)

    # Check that template is in cache
    assert simple_template in custom_template_loader.template_cache

    # Modify the template file - shouldn't affect the cached version
    template_path = Path(custom_template_loader.template_dir) / simple_template
    template_path.write_text("Goodbye, {{name}}!", encoding="utf-8")

    # Second render, should use cached template
    result = await custom_template_loader.render(simple_template, context)
    assert result == "Hello, World!"

    # Clear cache and render again - should use updated template
    custom_template_loader.clear_cache()
    assert simple_template not in custom_template_loader.template_cache

    result = await custom_template_loader.render(simple_template, context)
    assert result == "Goodbye, World!"


@pytest.mark.asyncio
async def test_date_helper(custom_template_loader, temp_template_dir):
    # Test date helper
    date_path = temp_template_dir / "date.hbs"
    date_path.write_text("{{date timestamp}}", encoding="utf-8")
    date_result = await custom_template_loader.render(
        "date.hbs", {"timestamp": datetime.datetime(2023, 1, 1, 12, 30)}
    )
    assert "2023-01-01" in date_result


@pytest.mark.asyncio
async def test_default_helper(custom_template_loader, temp_template_dir):
    # Test default helper
    default_path = temp_template_dir / "default.hbs"
    default_path.write_text("{{default null 'default-value'}}", encoding="utf-8")
    default_result = await custom_template_loader.render("default.hbs", {"null": None})
    assert default_result == "default-value"


@pytest.mark.asyncio
async def test_capitalize_helper(custom_template_loader, temp_template_dir):
    # Test capitalize helper
    capitalize_path = temp_template_dir / "capitalize.hbs"
    capitalize_path.write_text("{{capitalize 'test'}}", encoding="utf-8")
    capitalize_result = await custom_template_loader.render("capitalize.hbs", {})
    assert capitalize_result == "Test"


@pytest.mark.asyncio
async def test_size_helper(custom_template_loader, temp_template_dir):
    # Test size helper
    size_path = temp_template_dir / "size.hbs"
    size_path.write_text("{{size collection}}", encoding="utf-8")
    size_result = await custom_template_loader.render("size.hbs", {"collection": [1, 2, 3]})
    assert size_result == "3"


@pytest.mark.asyncio
async def test_json_helper(custom_template_loader, temp_template_dir):
    # Test json helper
    json_path = temp_template_dir / "json.hbs"
    json_path.write_text("{{json data}}", encoding="utf-8")
    json_result = await custom_template_loader.render("json.hbs", {"data": {"key": "value"}})
    assert json_result == '{"key": "value"}'


@pytest.mark.asyncio
async def test_less_than_helper(custom_template_loader, temp_template_dir):
    # Test lt (less than) helper
    lt_path = temp_template_dir / "lt.hbs"
    lt_path.write_text("{{#if_cond (lt 2 3)}}true{{else}}false{{/if_cond}}", encoding="utf-8")
    lt_result = await custom_template_loader.render("lt.hbs", {})
    assert lt_result == "true"


@pytest.mark.asyncio
async def test_file_not_found(custom_template_loader):
    """Test that FileNotFoundError is raised when a template doesn't exist."""
    with pytest.raises(FileNotFoundError):
        await custom_template_loader.render("non_existent_template.hbs", {})


@pytest.mark.asyncio
async def test_extension_handling(custom_template_loader, temp_template_dir):
    """Test that template extensions are handled correctly."""
    # Create template with .hbs extension
    template_path = temp_template_dir / "test_extension.hbs"
    template_path.write_text("Template with extension: {{value}}", encoding="utf-8")

    # Test accessing with full extension
    result = await custom_template_loader.render("test_extension.hbs", {"value": "works"})
    assert result == "Template with extension: works"

    # Test accessing without extension
    result = await custom_template_loader.render("test_extension", {"value": "also works"})
    assert result == "Template with extension: also works"

    # Test that requesting with a different extension is converted to .hbs
    template_path = temp_template_dir / "liquid_template.hbs"
    template_path.write_text("Liquid template: {{value}}", encoding="utf-8")

    result = await custom_template_loader.render("liquid_template.liquid", {"value": "converted"})
    assert result == "Liquid template: converted"


@pytest.mark.asyncio
async def test_dedent_helper(custom_template_loader, temp_template_dir):
    """Test the dedent helper for text blocks."""
    dedent_path = temp_template_dir / "dedent.hbs"

    # Create a template with indented text blocks
    template_content = """Before
    {{#dedent}}
        This is indented text
            with nested indentation
        that should be dedented
        while preserving relative indentation
    {{/dedent}}
After"""

    dedent_path.write_text(template_content, encoding="utf-8")

    # Render the template
    result = await custom_template_loader.render("dedent.hbs", {})

    # Print the actual output for debugging
    print(f"Dedent helper result: {repr(result)}")

    # Check that the indentation is properly removed
    assert "This is indented text" in result
    assert "with nested indentation" in result
    assert "that should be dedented" in result
    assert "while preserving relative indentation" in result
    assert "Before" in result
    assert "After" in result

    # Check that relative indentation is preserved
    assert result.find("with nested indentation") > result.find("This is indented text")


@pytest.mark.asyncio
async def test_nested_dedent_helper(custom_template_loader, temp_template_dir):
    """Test the dedent helper with nested content."""
    dedent_path = temp_template_dir / "nested_dedent.hbs"

    # Create a template with nested indented blocks
    template_content = """
{{#each items}}
    {{#dedent}}
        --- Item {{this}}
        
        Details for item {{this}}
          - Indented detail 1
          - Indented detail 2
    {{/dedent}}
{{/each}}"""

    dedent_path.write_text(template_content, encoding="utf-8")

    # Render the template
    result = await custom_template_loader.render("nested_dedent.hbs", {"items": [1, 2]})

    # Print the actual output for debugging
    print(f"Actual result: {repr(result)}")

    # Use a more flexible assertion that checks individual components
    # instead of exact string matching
    assert "--- Item 1" in result
    assert "Details for item 1" in result
    assert "- Indented detail 1" in result
    assert "--- Item 2" in result
    assert "Details for item 2" in result
    assert "- Indented detail 2" in result

```
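
Outside pytest, the loader can be exercised directly; a minimal sketch assuming, as the fixtures above do, that `TemplateLoader` takes a template directory path and `render` is awaitable:

```python
import asyncio
import tempfile
from pathlib import Path

from basic_memory.api.template_loader import TemplateLoader


async def main() -> None:
    with tempfile.TemporaryDirectory() as tmp:
        template_dir = Path(tmp) / "templates" / "prompts"
        template_dir.mkdir(parents=True)
        (template_dir / "greeting.hbs").write_text("Hello, {{name}}!", encoding="utf-8")

        loader = TemplateLoader(str(template_dir))
        # Extension is optional, per test_extension_handling above
        print(await loader.render("greeting", {"name": "World"}))  # Hello, World!


asyncio.run(main())
```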

--------------------------------------------------------------------------------
/src/basic_memory/models/knowledge.py:
--------------------------------------------------------------------------------

```python
"""Knowledge graph models."""

from datetime import datetime
from basic_memory.utils import ensure_timezone_aware
from typing import Optional

from sqlalchemy import (
    Integer,
    String,
    Text,
    ForeignKey,
    UniqueConstraint,
    DateTime,
    Index,
    JSON,
    Float,
    text,
)
from sqlalchemy.orm import Mapped, mapped_column, relationship

from basic_memory.models.base import Base
from basic_memory.utils import generate_permalink


class Entity(Base):
    """Core entity in the knowledge graph.

    Entities represent semantic nodes maintained by the AI layer. Each entity:
    - Has a unique numeric ID (database-generated)
    - Maps to a file on disk
    - Maintains a checksum for change detection
    - Tracks both source file and semantic properties
    - Belongs to a specific project
    """

    __tablename__ = "entity"
    __table_args__ = (
        # Regular indexes
        Index("ix_entity_type", "entity_type"),
        Index("ix_entity_title", "title"),
        Index("ix_entity_created_at", "created_at"),  # For timeline queries
        Index("ix_entity_updated_at", "updated_at"),  # For timeline queries
        Index("ix_entity_project_id", "project_id"),  # For project filtering
        # Project-specific uniqueness constraints
        Index(
            "uix_entity_permalink_project",
            "permalink",
            "project_id",
            unique=True,
            sqlite_where=text("content_type = 'text/markdown' AND permalink IS NOT NULL"),
        ),
        Index(
            "uix_entity_file_path_project",
            "file_path",
            "project_id",
            unique=True,
        ),
    )

    # Core identity
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    title: Mapped[str] = mapped_column(String)
    entity_type: Mapped[str] = mapped_column(String)
    entity_metadata: Mapped[Optional[dict]] = mapped_column(JSON, nullable=True)
    content_type: Mapped[str] = mapped_column(String)

    # Project reference
    project_id: Mapped[int] = mapped_column(Integer, ForeignKey("project.id"), nullable=False)

    # Normalized path for URIs - required for markdown files only
    permalink: Mapped[Optional[str]] = mapped_column(String, nullable=True, index=True)
    # Actual filesystem relative path
    file_path: Mapped[str] = mapped_column(String, index=True)
    # checksum of file
    checksum: Mapped[Optional[str]] = mapped_column(String, nullable=True)

    # File metadata for sync
    # mtime: file modification timestamp (Unix epoch float) for change detection
    mtime: Mapped[Optional[float]] = mapped_column(Float, nullable=True)
    # size: file size in bytes for quick change detection
    size: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)

    # Metadata and tracking
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), default=lambda: datetime.now().astimezone()
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        default=lambda: datetime.now().astimezone(),
        onupdate=lambda: datetime.now().astimezone(),
    )

    # Relationships
    project = relationship("Project", back_populates="entities")
    observations = relationship(
        "Observation", back_populates="entity", cascade="all, delete-orphan"
    )
    outgoing_relations = relationship(
        "Relation",
        back_populates="from_entity",
        foreign_keys="[Relation.from_id]",
        cascade="all, delete-orphan",
    )
    incoming_relations = relationship(
        "Relation",
        back_populates="to_entity",
        foreign_keys="[Relation.to_id]",
        cascade="all, delete-orphan",
    )

    @property
    def relations(self):
        """Get all relations (incoming and outgoing) for this entity."""
        return self.incoming_relations + self.outgoing_relations

    @property
    def is_markdown(self):
        """Check if the entity is a markdown file."""
        return self.content_type == "text/markdown"

    def __getattribute__(self, name):
        """Override attribute access to ensure datetime fields are timezone-aware."""
        value = super().__getattribute__(name)

        # Ensure datetime fields are timezone-aware
        if name in ("created_at", "updated_at") and isinstance(value, datetime):
            return ensure_timezone_aware(value)

        return value

    def __repr__(self) -> str:
        return f"Entity(id={self.id}, name='{self.title}', type='{self.entity_type}'"


class Observation(Base):
    """An observation about an entity.

    Observations are atomic facts or notes about an entity.
    """

    __tablename__ = "observation"
    __table_args__ = (
        Index("ix_observation_entity_id", "entity_id"),  # Add FK index
        Index("ix_observation_category", "category"),  # Add category index
    )

    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    entity_id: Mapped[int] = mapped_column(Integer, ForeignKey("entity.id", ondelete="CASCADE"))
    content: Mapped[str] = mapped_column(Text)
    category: Mapped[str] = mapped_column(String, nullable=False, default="note")
    context: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
    tags: Mapped[Optional[list[str]]] = mapped_column(
        JSON, nullable=True, default=list, server_default="[]"
    )

    # Relationships
    entity = relationship("Entity", back_populates="observations")

    @property
    def permalink(self) -> str:
        """Create synthetic permalink for the observation.

        We can construct these because observations are always defined in
        and owned by a single entity.
        """
        return generate_permalink(
            f"{self.entity.permalink}/observations/{self.category}/{self.content}"
        )

    def __repr__(self) -> str:  # pragma: no cover
        return f"Observation(id={self.id}, entity_id={self.entity_id}, content='{self.content}')"


class Relation(Base):
    """A directed relation between two entities."""

    __tablename__ = "relation"
    __table_args__ = (
        UniqueConstraint("from_id", "to_id", "relation_type", name="uix_relation_from_id_to_id"),
        UniqueConstraint(
            "from_id", "to_name", "relation_type", name="uix_relation_from_id_to_name"
        ),
        Index("ix_relation_type", "relation_type"),
        Index("ix_relation_from_id", "from_id"),  # Add FK indexes
        Index("ix_relation_to_id", "to_id"),
    )

    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    from_id: Mapped[int] = mapped_column(Integer, ForeignKey("entity.id", ondelete="CASCADE"))
    to_id: Mapped[Optional[int]] = mapped_column(
        Integer, ForeignKey("entity.id", ondelete="CASCADE"), nullable=True
    )
    to_name: Mapped[str] = mapped_column(String)
    relation_type: Mapped[str] = mapped_column(String)
    context: Mapped[Optional[str]] = mapped_column(Text, nullable=True)

    # Relationships
    from_entity = relationship(
        "Entity", foreign_keys=[from_id], back_populates="outgoing_relations"
    )
    to_entity = relationship("Entity", foreign_keys=[to_id], back_populates="incoming_relations")

    @property
    def permalink(self) -> str:
        """Create relation permalink showing the semantic connection.

        Format: source/relation_type/target
        Example: "specs/search/implements/features/search-ui"
        """
        # Prefer permalinks; fall back to file paths when one is missing
        from_permalink = self.from_entity.permalink or self.from_entity.file_path

        if self.to_entity:
            to_permalink = self.to_entity.permalink or self.to_entity.file_path
            return generate_permalink(f"{from_permalink}/{self.relation_type}/{to_permalink}")
        return generate_permalink(f"{from_permalink}/{self.relation_type}/{self.to_name}")

    def __repr__(self) -> str:
        return f"Relation(id={self.id}, from_id={self.from_id}, to_id={self.to_id}, to_name={self.to_name}, type='{self.relation_type}')"  # pragma: no cover

```
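
The synthetic permalink properties compose without touching the database; a small in-memory sketch (the exact normalized output depends on `generate_permalink`, so the printed value is indicative):

```python
from basic_memory.models.knowledge import Entity, Observation

entity = Entity(
    title="Search",
    entity_type="spec",
    content_type="text/markdown",
    file_path="specs/search.md",
    permalink="specs/search",
    project_id=1,
)
obs = Observation(entity=entity, category="note", content="Uses FTS5 for indexing")

# Composes entity permalink + "/observations/<category>/<content>"
print(obs.permalink)  # e.g. specs/search/observations/note/uses-fts5-for-indexing
```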

--------------------------------------------------------------------------------
/src/basic_memory/mcp/tools/project_management.py:
--------------------------------------------------------------------------------

```python
"""Project management tools for Basic Memory MCP server.

These tools allow users to switch between projects, list available projects,
and manage project context during conversations.
"""

import os
from fastmcp import Context

from basic_memory.mcp.async_client import get_client
from basic_memory.mcp.server import mcp
from basic_memory.mcp.tools.utils import call_get, call_post, call_delete
from basic_memory.schemas.project_info import (
    ProjectList,
    ProjectStatusResponse,
    ProjectInfoRequest,
)
from basic_memory.utils import generate_permalink


@mcp.tool("list_memory_projects")
async def list_memory_projects(context: Context | None = None) -> str:
    """List all available projects with their status.

    Shows all Basic Memory projects that are available for MCP operations.
    Use this tool to discover projects when you need to know which project to use.

    Use this tool:
    - At conversation start when project is unknown
    - When user asks about available projects
    - Before any operation requiring a project

    After calling:
    - Ask user which project to use
    - Remember their choice for the session

    Returns:
        Formatted list of projects with session management guidance

    Example:
        list_memory_projects()
    """
    async with get_client() as client:
        if context:  # pragma: no cover
            await context.info("Listing all available projects")

        # Check if server is constrained to a specific project
        constrained_project = os.environ.get("BASIC_MEMORY_MCP_PROJECT")

        # Get projects from API
        response = await call_get(client, "/projects/projects")
        project_list = ProjectList.model_validate(response.json())

        if constrained_project:
            result = f"Project: {constrained_project}\n\n"
            result += "Note: This MCP server is constrained to a single project.\n"
            result += "All operations will automatically use this project."
        else:
            # Show all projects with session guidance
            result = "Available projects:\n"

            for project in project_list.projects:
                result += f"• {project.name}\n"

            result += "\n" + "─" * 40 + "\n"
            result += "Next: Ask which project to use for this session.\n"
            result += "Example: 'Which project should I use for this task?'\n\n"
            result += "Session reminder: Track the selected project for all subsequent operations in this conversation.\n"
            result += "The user can say 'switch to [project]' to change projects."

        return result


@mcp.tool("create_memory_project")
async def create_memory_project(
    project_name: str, project_path: str, set_default: bool = False, context: Context | None = None
) -> str:
    """Create a new Basic Memory project.

    Creates a new project with the specified name and path. The project directory
    will be created if it doesn't exist. Optionally sets the new project as default.

    Args:
        project_name: Name for the new project (must be unique)
        project_path: File system path where the project will be stored
        set_default: Whether to set this project as the default (optional, defaults to False)

    Returns:
        Confirmation message with project details

    Example:
        create_memory_project("my-research", "~/Documents/research")
        create_memory_project("work-notes", "/home/user/work", set_default=True)
    """
    async with get_client() as client:
        # Check if server is constrained to a specific project
        constrained_project = os.environ.get("BASIC_MEMORY_MCP_PROJECT")
        if constrained_project:
            return f'# Error\n\nProject creation disabled - MCP server is constrained to project \'{constrained_project}\'.\nUse the CLI to create projects: `basic-memory project add "{project_name}" "{project_path}"`'

        if context:  # pragma: no cover
            await context.info(f"Creating project: {project_name} at {project_path}")

        # Create the project request
        project_request = ProjectInfoRequest(
            name=project_name, path=project_path, set_default=set_default
        )

        # Call API to create project
        response = await call_post(client, "/projects/projects", json=project_request.model_dump())
        status_response = ProjectStatusResponse.model_validate(response.json())

        result = f"✓ {status_response.message}\n\n"

        if status_response.new_project:
            result += "Project Details:\n"
            result += f"• Name: {status_response.new_project.name}\n"
            result += f"• Path: {status_response.new_project.path}\n"

            if set_default:
                result += "• Set as default project\n"

        result += "\nProject is now available for use in tool calls.\n"
        result += f"Use '{project_name}' as the project parameter in MCP tool calls.\n"

        return result


@mcp.tool()
async def delete_project(project_name: str, context: Context | None = None) -> str:
    """Delete a Basic Memory project.

    Removes a project from the configuration and database. This does NOT delete
    the actual files on disk - only removes the project from Basic Memory's
    configuration and database records.

    Args:
        project_name: Name of the project to delete

    Returns:
        Confirmation message about project deletion

    Example:
        delete_project("old-project")

    Warning:
        This action cannot be undone. The project will need to be re-added
        to access its content through Basic Memory again.
    """
    async with get_client() as client:
        # Check if server is constrained to a specific project
        constrained_project = os.environ.get("BASIC_MEMORY_MCP_PROJECT")
        if constrained_project:
            return f"# Error\n\nProject deletion disabled - MCP server is constrained to project '{constrained_project}'.\nUse the CLI to delete projects: `basic-memory project remove \"{project_name}\"`"

        if context:  # pragma: no cover
            await context.info(f"Deleting project: {project_name}")

        # Get project info before deletion to validate it exists
        response = await call_get(client, "/projects/projects")
        project_list = ProjectList.model_validate(response.json())

        # Find the project by name (case-insensitive) or permalink - same logic as switch_project
        project_permalink = generate_permalink(project_name)
        target_project = None
        for p in project_list.projects:
            # Match by permalink (handles case-insensitive input)
            if p.permalink == project_permalink:
                target_project = p
                break
            # Also match by name comparison (case-insensitive)
            if p.name.lower() == project_name.lower():
                target_project = p
                break

        if not target_project:
            available_projects = [p.name for p in project_list.projects]
            raise ValueError(
                f"Project '{project_name}' not found. Available projects: {', '.join(available_projects)}"
            )

        # Call API to delete project using URL encoding for special characters
        from urllib.parse import quote

        encoded_name = quote(target_project.name, safe="")
        response = await call_delete(client, f"/projects/{encoded_name}")
        status_response = ProjectStatusResponse.model_validate(response.json())

        result = f"✓ {status_response.message}\n\n"

        if status_response.old_project:
            result += "Removed project details:\n"
            result += f"• Name: {status_response.old_project.name}\n"
            if hasattr(status_response.old_project, "path"):
                result += f"• Path: {status_response.old_project.path}\n"

        result += "Files remain on disk but project is no longer tracked by Basic Memory.\n"
        result += "Re-add the project to access its content again.\n"

        return result

```
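
Called through the MCP client from the integration conftest, these tools take keyword arguments matching their signatures; a hedged sketch reusing the `mcp_server` fixture (the project name and path here are illustrative):

```python
from fastmcp import Client


async def demo(mcp_server) -> None:
    async with Client(mcp_server) as client:
        # No arguments needed; returns formatted text with session guidance
        listing = await client.call_tool("list_memory_projects", {})
        print(listing)

        # Fails with an error message if BASIC_MEMORY_MCP_PROJECT constrains the server
        created = await client.call_tool(
            "create_memory_project",
            {"project_name": "my-research", "project_path": "~/Documents/research"},
        )
        print(created)
```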

--------------------------------------------------------------------------------
/src/basic_memory/api/routers/resource_router.py:
--------------------------------------------------------------------------------

```python
"""Routes for getting entity content."""

import tempfile
from pathlib import Path
from typing import Annotated

from fastapi import APIRouter, HTTPException, BackgroundTasks, Body
from fastapi.responses import FileResponse, JSONResponse
from loguru import logger

from basic_memory.deps import (
    ProjectConfigDep,
    LinkResolverDep,
    SearchServiceDep,
    EntityServiceDep,
    FileServiceDep,
    EntityRepositoryDep,
)
from basic_memory.repository.search_repository import SearchIndexRow
from basic_memory.schemas.memory import normalize_memory_url
from basic_memory.schemas.search import SearchQuery, SearchItemType
from basic_memory.models.knowledge import Entity as EntityModel
from datetime import datetime

router = APIRouter(prefix="/resource", tags=["resources"])


def get_entity_ids(item: SearchIndexRow) -> set[int]:
    match item.type:
        case SearchItemType.ENTITY:
            return {item.id}
        case SearchItemType.OBSERVATION:
            return {item.entity_id}  # pyright: ignore [reportReturnType]
        case SearchItemType.RELATION:
            from_entity = item.from_id
            to_entity = item.to_id  # pyright: ignore [reportReturnType]
            return {from_entity, to_entity} if to_entity else {from_entity}  # pyright: ignore [reportReturnType]
        case _:  # pragma: no cover
            raise ValueError(f"Unexpected type: {item.type}")


@router.get("/{identifier:path}")
async def get_resource_content(
    config: ProjectConfigDep,
    link_resolver: LinkResolverDep,
    search_service: SearchServiceDep,
    entity_service: EntityServiceDep,
    file_service: FileServiceDep,
    background_tasks: BackgroundTasks,
    identifier: str,
    page: int = 1,
    page_size: int = 10,
) -> FileResponse:
    """Get resource content by identifier: name or permalink."""
    logger.debug(f"Getting content for: {identifier}")

    # Find single entity by permalink
    entity = await link_resolver.resolve_link(identifier)
    results = [entity] if entity else []

    # pagination for multiple results
    limit = page_size
    offset = (page - 1) * page_size

    # search using the identifier as a permalink
    if not results:
        # if the identifier contains a wildcard, use GLOB search
        query = (
            SearchQuery(permalink_match=identifier)
            if "*" in identifier
            else SearchQuery(permalink=identifier)
        )
        search_results = await search_service.search(query, limit, offset)
        if not search_results:
            raise HTTPException(status_code=404, detail=f"Resource not found: {identifier}")

        # get the deduplicated entities related to the search results
        entity_ids = {id for result in search_results for id in get_entity_ids(result)}
        results = await entity_service.get_entities_by_id(list(entity_ids))

    # return single response
    if len(results) == 1:
        entity = results[0]
        file_path = Path(f"{config.home}/{entity.file_path}")
        if not file_path.exists():
            raise HTTPException(
                status_code=404,
                detail=f"File not found: {file_path}",
            )
        return FileResponse(path=file_path)

    # for multiple files, initialize a temporary file for writing the results
    with tempfile.NamedTemporaryFile(delete=False, mode="w", suffix=".md") as tmp_file:
        temp_file_path = tmp_file.name

        for result in results:
            # Read content for each entity
            content = await file_service.read_entity_content(result)
            memory_url = normalize_memory_url(result.permalink)
            modified_date = result.updated_at.isoformat()
            checksum = result.checksum[:8] if result.checksum else ""

            # Prepare the delimited content
            response_content = f"--- {memory_url} {modified_date} {checksum}\n"
            response_content += f"\n{content}\n"
            response_content += "\n"

            # Write content directly to the temporary file in append mode
            tmp_file.write(response_content)

        # Ensure all content is written to disk
        tmp_file.flush()

    # Schedule the temporary file to be deleted after the response
    background_tasks.add_task(cleanup_temp_file, temp_file_path)

    # Return the file response
    return FileResponse(path=temp_file_path)


def cleanup_temp_file(file_path: str):
    """Delete the temporary file."""
    try:
        Path(file_path).unlink()  # Deletes the file
        logger.debug(f"Temporary file deleted: {file_path}")
    except Exception as e:  # pragma: no cover
        logger.error(f"Error deleting temporary file {file_path}: {e}")


@router.put("/{file_path:path}")
async def write_resource(
    config: ProjectConfigDep,
    file_service: FileServiceDep,
    entity_repository: EntityRepositoryDep,
    search_service: SearchServiceDep,
    file_path: str,
    content: Annotated[str, Body()],
) -> JSONResponse:
    """Write content to a file in the project.

    This endpoint allows writing content directly to a file in the project.
    Also creates an entity record and indexes the file for search.

    Args:
        file_path: Path to write to, relative to project root
        content: Content to write, taken from the request body

    Returns:
        JSON response with file information
    """
    try:
        # Ensure the request body content is a UTF-8 string
        if isinstance(content, bytes):  # pragma: no cover
            content_str = content.decode("utf-8")
        else:
            content_str = str(content)

        # Get full file path
        full_path = Path(f"{config.home}/{file_path}")

        # Ensure parent directory exists
        full_path.parent.mkdir(parents=True, exist_ok=True)

        # Write content to file
        checksum = await file_service.write_file(full_path, content_str)

        # Get file info
        file_stats = file_service.file_stats(full_path)

        # Determine file details
        file_name = Path(file_path).name
        content_type = file_service.content_type(full_path)

        entity_type = "canvas" if file_path.endswith(".canvas") else "file"

        # Check if entity already exists
        existing_entity = await entity_repository.get_by_file_path(file_path)

        if existing_entity:
            # Update existing entity
            entity = await entity_repository.update(
                existing_entity.id,
                {
                    "title": file_name,
                    "entity_type": entity_type,
                    "content_type": content_type,
                    "file_path": file_path,
                    "checksum": checksum,
                    "updated_at": datetime.fromtimestamp(file_stats.st_mtime).astimezone(),
                },
            )
            status_code = 200
        else:
            # Create a new entity model
            entity = EntityModel(
                title=file_name,
                entity_type=entity_type,
                content_type=content_type,
                file_path=file_path,
                checksum=checksum,
                created_at=datetime.fromtimestamp(file_stats.st_ctime).astimezone(),
                updated_at=datetime.fromtimestamp(file_stats.st_mtime).astimezone(),
            )
            entity = await entity_repository.add(entity)
            status_code = 201

        # Index the file for search
        await search_service.index_entity(entity)  # pyright: ignore

        # Return success response
        return JSONResponse(
            status_code=status_code,
            content={
                "file_path": file_path,
                "checksum": checksum,
                "size": file_stats.st_size,
                "created_at": file_stats.st_ctime,
                "modified_at": file_stats.st_mtime,
            },
        )
    except Exception as e:  # pragma: no cover
        logger.error(f"Error writing resource {file_path}: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to write resource: {str(e)}")

```
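
A sketch of exercising both routes with httpx; the base URL and any project prefix in front of `/resource` are assumptions about deployment, and since the PUT body is a single FastAPI `Body()` string, it is sent JSON-encoded here:

```python
import asyncio

import httpx


async def main() -> None:
    # Hypothetical host; adjust for your deployment and project routing
    async with httpx.AsyncClient(base_url="http://localhost:8000") as client:
        # Write a file; FastAPI parses the lone str Body() from a JSON string
        put = await client.put("/resource/notes/example.md", json="# Example\n")
        print(put.status_code, put.json())

        # Read it back by identifier (path, name, or permalink)
        get = await client.get("/resource/notes/example.md")
        print(get.status_code, get.text[:80])


asyncio.run(main())
```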

--------------------------------------------------------------------------------
/src/basic_memory/cli/commands/cloud/rclone_config.py:
--------------------------------------------------------------------------------

```python
"""rclone configuration management for Basic Memory Cloud."""

import configparser
import os
import shutil
import subprocess
from pathlib import Path
from typing import Dict, List, Optional

from rich.console import Console

console = Console()


class RcloneConfigError(Exception):
    """Exception raised for rclone configuration errors."""

    pass


class RcloneMountProfile:
    """Mount profile with optimized settings."""

    def __init__(
        self,
        name: str,
        cache_time: str,
        poll_interval: str,
        attr_timeout: str,
        write_back: str,
        description: str,
        extra_args: Optional[List[str]] = None,
    ):
        self.name = name
        self.cache_time = cache_time
        self.poll_interval = poll_interval
        self.attr_timeout = attr_timeout
        self.write_back = write_back
        self.description = description
        self.extra_args = extra_args or []


# Mount profiles based on SPEC-7 Phase 4 testing
MOUNT_PROFILES = {
    "fast": RcloneMountProfile(
        name="fast",
        cache_time="5s",
        poll_interval="3s",
        attr_timeout="3s",
        write_back="1s",
        description="Ultra-fast development (5s sync, higher bandwidth)",
    ),
    "balanced": RcloneMountProfile(
        name="balanced",
        cache_time="10s",
        poll_interval="5s",
        attr_timeout="5s",
        write_back="2s",
        description="Fast development (10-15s sync, recommended)",
    ),
    "safe": RcloneMountProfile(
        name="safe",
        cache_time="15s",
        poll_interval="10s",
        attr_timeout="10s",
        write_back="5s",
        description="Conflict-aware mount with backup",
        extra_args=[
            "--conflict-suffix",
            ".conflict-{DateTimeExt}",
            "--backup-dir",
            "~/.basic-memory/conflicts",
            "--track-renames",
        ],
    ),
}


def get_rclone_config_path() -> Path:
    """Get the path to rclone configuration file."""
    config_dir = Path.home() / ".config" / "rclone"
    config_dir.mkdir(parents=True, exist_ok=True)
    return config_dir / "rclone.conf"


def backup_rclone_config() -> Optional[Path]:
    """Create a backup of existing rclone config."""
    config_path = get_rclone_config_path()
    if not config_path.exists():
        return None

    backup_path = config_path.with_suffix(f".conf.backup-{os.getpid()}")
    shutil.copy2(config_path, backup_path)
    console.print(f"[dim]Created backup: {backup_path}[/dim]")
    return backup_path


def load_rclone_config() -> configparser.ConfigParser:
    """Load existing rclone configuration."""
    config = configparser.ConfigParser()
    config_path = get_rclone_config_path()

    if config_path.exists():
        config.read(config_path)

    return config


def save_rclone_config(config: configparser.ConfigParser) -> None:
    """Save rclone configuration to file."""
    config_path = get_rclone_config_path()

    with open(config_path, "w") as f:
        config.write(f)

    console.print(f"[dim]Updated rclone config: {config_path}[/dim]")


def add_tenant_to_rclone_config(
    tenant_id: str,
    bucket_name: str,
    access_key: str,
    secret_key: str,
    endpoint: str = "https://fly.storage.tigris.dev",
    region: str = "auto",
) -> str:
    """Add tenant configuration to rclone config file."""

    # Backup existing config
    backup_rclone_config()

    # Load existing config
    config = load_rclone_config()

    # Create section name
    section_name = f"basic-memory-{tenant_id}"

    # Add/update the tenant section
    if not config.has_section(section_name):
        config.add_section(section_name)

    config.set(section_name, "type", "s3")
    config.set(section_name, "provider", "Other")
    config.set(section_name, "access_key_id", access_key)
    config.set(section_name, "secret_access_key", secret_key)
    config.set(section_name, "endpoint", endpoint)
    config.set(section_name, "region", region)

    # Save updated config
    save_rclone_config(config)

    console.print(f"[green]✓ Added tenant {tenant_id} to rclone config[/green]")
    return section_name


def remove_tenant_from_rclone_config(tenant_id: str) -> bool:
    """Remove tenant configuration from rclone config."""
    config = load_rclone_config()
    section_name = f"basic-memory-{tenant_id}"

    if config.has_section(section_name):
        backup_rclone_config()
        config.remove_section(section_name)
        save_rclone_config(config)
        console.print(f"[green]✓ Removed tenant {tenant_id} from rclone config[/green]")
        return True

    return False


def get_default_mount_path() -> Path:
    """Get default mount path (fixed location per SPEC-9).

    Returns:
        Fixed mount path: ~/basic-memory-cloud/
    """
    return Path.home() / "basic-memory-cloud"


def build_mount_command(
    tenant_id: str, bucket_name: str, mount_path: Path, profile: RcloneMountProfile
) -> List[str]:
    """Build rclone mount command with optimized settings."""

    rclone_remote = f"basic-memory-{tenant_id}:{bucket_name}"

    cmd = [
        "rclone",
        "nfsmount",
        rclone_remote,
        str(mount_path),
        "--vfs-cache-mode",
        "writes",
        "--dir-cache-time",
        profile.cache_time,
        "--vfs-cache-poll-interval",
        profile.poll_interval,
        "--attr-timeout",
        profile.attr_timeout,
        "--vfs-write-back",
        profile.write_back,
        "--daemon",
    ]

    # Add profile-specific extra arguments
    cmd.extend(profile.extra_args)

    return cmd


def is_path_mounted(mount_path: Path) -> bool:
    """Check if a path is currently mounted."""
    if not mount_path.exists():
        return False

    try:
        # Check if mount point is actually mounted by looking for mount table entry
        result = subprocess.run(["mount"], capture_output=True, text=True, check=False)

        if result.returncode == 0:
            # Look for our mount path in mount output
            mount_str = str(mount_path.resolve())
            return mount_str in result.stdout

        return False
    except Exception:
        return False


def get_rclone_processes() -> List[Dict[str, str]]:
    """Get list of running rclone processes."""
    try:
        # Use ps to find rclone processes
        result = subprocess.run(
            ["ps", "-eo", "pid,args"], capture_output=True, text=True, check=False
        )

        processes = []
        if result.returncode == 0:
            for line in result.stdout.split("\n"):
                if "rclone" in line and "basic-memory" in line:
                    parts = line.strip().split(None, 1)
                    if len(parts) >= 2:
                        processes.append({"pid": parts[0], "command": parts[1]})

        return processes
    except Exception:
        return []


def kill_rclone_process(pid: str) -> bool:
    """Kill a specific rclone process."""
    try:
        subprocess.run(["kill", pid], check=True)
        console.print(f"[green]✓ Killed rclone process {pid}[/green]")
        return True
    except subprocess.CalledProcessError:
        console.print(f"[red]✗ Failed to kill rclone process {pid}[/red]")
        return False


def unmount_path(mount_path: Path) -> bool:
    """Unmount a mounted path."""
    if not is_path_mounted(mount_path):
        return True

    try:
        subprocess.run(["umount", str(mount_path)], check=True)
        console.print(f"[green]✓ Unmounted {mount_path}[/green]")
        return True
    except subprocess.CalledProcessError as e:
        console.print(f"[red]✗ Failed to unmount {mount_path}: {e}[/red]")
        return False


def cleanup_orphaned_rclone_processes() -> int:
    """Clean up orphaned rclone processes for basic-memory."""
    processes = get_rclone_processes()
    killed_count = 0

    for proc in processes:
        console.print(
            f"[yellow]Found rclone process: {proc['pid']} - {proc['command'][:80]}...[/yellow]"
        )
        if kill_rclone_process(proc["pid"]):
            killed_count += 1

    return killed_count

```
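
Building (without running) a mount command from the profiles above; `tenant-123` and `my-bucket` are placeholder values:

```python
from basic_memory.cli.commands.cloud.rclone_config import (
    MOUNT_PROFILES,
    build_mount_command,
    get_default_mount_path,
)

profile = MOUNT_PROFILES["balanced"]  # 10-15s sync, recommended
cmd = build_mount_command(
    tenant_id="tenant-123",  # placeholder tenant id
    bucket_name="my-bucket",  # placeholder bucket
    mount_path=get_default_mount_path(),
    profile=profile,
)
print(" ".join(cmd))
# rclone nfsmount basic-memory-tenant-123:my-bucket ~/basic-memory-cloud \
#   --vfs-cache-mode writes --dir-cache-time 10s --vfs-cache-poll-interval 5s ...
```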