#
tokens: 46352/50000 10/347 files (page 11/23)
lines: on (toggle) GitHub
raw markdown copy reset
This is page 11 of 23. Use http://codebase.md/basicmachines-co/basic-memory?lines=true&page={x} to view the full context.

# Directory Structure

```
├── .claude
│   ├── agents
│   │   ├── python-developer.md
│   │   └── system-architect.md
│   └── commands
│       ├── release
│       │   ├── beta.md
│       │   ├── changelog.md
│       │   ├── release-check.md
│       │   └── release.md
│       ├── spec.md
│       └── test-live.md
├── .dockerignore
├── .github
│   ├── dependabot.yml
│   ├── ISSUE_TEMPLATE
│   │   ├── bug_report.md
│   │   ├── config.yml
│   │   ├── documentation.md
│   │   └── feature_request.md
│   └── workflows
│       ├── claude-code-review.yml
│       ├── claude-issue-triage.yml
│       ├── claude.yml
│       ├── dev-release.yml
│       ├── docker.yml
│       ├── pr-title.yml
│       ├── release.yml
│       └── test.yml
├── .gitignore
├── .python-version
├── CHANGELOG.md
├── CITATION.cff
├── CLA.md
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── docker-compose.yml
├── Dockerfile
├── docs
│   ├── ai-assistant-guide-extended.md
│   ├── character-handling.md
│   ├── cloud-cli.md
│   └── Docker.md
├── justfile
├── LICENSE
├── llms-install.md
├── pyproject.toml
├── README.md
├── SECURITY.md
├── smithery.yaml
├── specs
│   ├── SPEC-1 Specification-Driven Development Process.md
│   ├── SPEC-10 Unified Deployment Workflow and Event Tracking.md
│   ├── SPEC-11 Basic Memory API Performance Optimization.md
│   ├── SPEC-12 OpenTelemetry Observability.md
│   ├── SPEC-13 CLI Authentication with Subscription Validation.md
│   ├── SPEC-14 Cloud Git Versioning & GitHub Backup.md
│   ├── SPEC-14- Cloud Git Versioning & GitHub Backup.md
│   ├── SPEC-15 Configuration Persistence via Tigris for Cloud Tenants.md
│   ├── SPEC-16 MCP Cloud Service Consolidation.md
│   ├── SPEC-17 Semantic Search with ChromaDB.md
│   ├── SPEC-18 AI Memory Management Tool.md
│   ├── SPEC-19 Sync Performance and Memory Optimization.md
│   ├── SPEC-2 Slash Commands Reference.md
│   ├── SPEC-20 Simplified Project-Scoped Rclone Sync.md
│   ├── SPEC-3 Agent Definitions.md
│   ├── SPEC-4 Notes Web UI Component Architecture.md
│   ├── SPEC-5 CLI Cloud Upload via WebDAV.md
│   ├── SPEC-6 Explicit Project Parameter Architecture.md
│   ├── SPEC-7 POC to spike Tigris Turso for local access to cloud data.md
│   ├── SPEC-8 TigrisFS Integration.md
│   ├── SPEC-9 Multi-Project Bidirectional Sync Architecture.md
│   ├── SPEC-9 Signed Header Tenant Information.md
│   └── SPEC-9-1 Follow-Ups- Conflict, Sync, and Observability.md
├── src
│   └── basic_memory
│       ├── __init__.py
│       ├── alembic
│       │   ├── alembic.ini
│       │   ├── env.py
│       │   ├── migrations.py
│       │   ├── script.py.mako
│       │   └── versions
│       │       ├── 3dae7c7b1564_initial_schema.py
│       │       ├── 502b60eaa905_remove_required_from_entity_permalink.py
│       │       ├── 5fe1ab1ccebe_add_projects_table.py
│       │       ├── 647e7a75e2cd_project_constraint_fix.py
│       │       ├── 9d9c1cb7d8f5_add_mtime_and_size_columns_to_entity_.py
│       │       ├── a1b2c3d4e5f6_fix_project_foreign_keys.py
│       │       ├── b3c3938bacdb_relation_to_name_unique_index.py
│       │       ├── cc7172b46608_update_search_index_schema.py
│       │       └── e7e1f4367280_add_scan_watermark_tracking_to_project.py
│       ├── api
│       │   ├── __init__.py
│       │   ├── app.py
│       │   ├── routers
│       │   │   ├── __init__.py
│       │   │   ├── directory_router.py
│       │   │   ├── importer_router.py
│       │   │   ├── knowledge_router.py
│       │   │   ├── management_router.py
│       │   │   ├── memory_router.py
│       │   │   ├── project_router.py
│       │   │   ├── prompt_router.py
│       │   │   ├── resource_router.py
│       │   │   ├── search_router.py
│       │   │   └── utils.py
│       │   └── template_loader.py
│       ├── cli
│       │   ├── __init__.py
│       │   ├── app.py
│       │   ├── auth.py
│       │   ├── commands
│       │   │   ├── __init__.py
│       │   │   ├── cloud
│       │   │   │   ├── __init__.py
│       │   │   │   ├── api_client.py
│       │   │   │   ├── bisync_commands.py
│       │   │   │   ├── cloud_utils.py
│       │   │   │   ├── core_commands.py
│       │   │   │   ├── rclone_commands.py
│       │   │   │   ├── rclone_config.py
│       │   │   │   ├── rclone_installer.py
│       │   │   │   ├── upload_command.py
│       │   │   │   └── upload.py
│       │   │   ├── command_utils.py
│       │   │   ├── db.py
│       │   │   ├── import_chatgpt.py
│       │   │   ├── import_claude_conversations.py
│       │   │   ├── import_claude_projects.py
│       │   │   ├── import_memory_json.py
│       │   │   ├── mcp.py
│       │   │   ├── project.py
│       │   │   ├── status.py
│       │   │   └── tool.py
│       │   └── main.py
│       ├── config.py
│       ├── db.py
│       ├── deps.py
│       ├── file_utils.py
│       ├── ignore_utils.py
│       ├── importers
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── chatgpt_importer.py
│       │   ├── claude_conversations_importer.py
│       │   ├── claude_projects_importer.py
│       │   ├── memory_json_importer.py
│       │   └── utils.py
│       ├── markdown
│       │   ├── __init__.py
│       │   ├── entity_parser.py
│       │   ├── markdown_processor.py
│       │   ├── plugins.py
│       │   ├── schemas.py
│       │   └── utils.py
│       ├── mcp
│       │   ├── __init__.py
│       │   ├── async_client.py
│       │   ├── project_context.py
│       │   ├── prompts
│       │   │   ├── __init__.py
│       │   │   ├── ai_assistant_guide.py
│       │   │   ├── continue_conversation.py
│       │   │   ├── recent_activity.py
│       │   │   ├── search.py
│       │   │   └── utils.py
│       │   ├── resources
│       │   │   ├── ai_assistant_guide.md
│       │   │   └── project_info.py
│       │   ├── server.py
│       │   └── tools
│       │       ├── __init__.py
│       │       ├── build_context.py
│       │       ├── canvas.py
│       │       ├── chatgpt_tools.py
│       │       ├── delete_note.py
│       │       ├── edit_note.py
│       │       ├── list_directory.py
│       │       ├── move_note.py
│       │       ├── project_management.py
│       │       ├── read_content.py
│       │       ├── read_note.py
│       │       ├── recent_activity.py
│       │       ├── search.py
│       │       ├── utils.py
│       │       ├── view_note.py
│       │       └── write_note.py
│       ├── models
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── knowledge.py
│       │   ├── project.py
│       │   └── search.py
│       ├── repository
│       │   ├── __init__.py
│       │   ├── entity_repository.py
│       │   ├── observation_repository.py
│       │   ├── project_info_repository.py
│       │   ├── project_repository.py
│       │   ├── relation_repository.py
│       │   ├── repository.py
│       │   └── search_repository.py
│       ├── schemas
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── cloud.py
│       │   ├── delete.py
│       │   ├── directory.py
│       │   ├── importer.py
│       │   ├── memory.py
│       │   ├── project_info.py
│       │   ├── prompt.py
│       │   ├── request.py
│       │   ├── response.py
│       │   ├── search.py
│       │   └── sync_report.py
│       ├── services
│       │   ├── __init__.py
│       │   ├── context_service.py
│       │   ├── directory_service.py
│       │   ├── entity_service.py
│       │   ├── exceptions.py
│       │   ├── file_service.py
│       │   ├── initialization.py
│       │   ├── link_resolver.py
│       │   ├── project_service.py
│       │   ├── search_service.py
│       │   └── service.py
│       ├── sync
│       │   ├── __init__.py
│       │   ├── background_sync.py
│       │   ├── sync_service.py
│       │   └── watch_service.py
│       ├── templates
│       │   └── prompts
│       │       ├── continue_conversation.hbs
│       │       └── search.hbs
│       └── utils.py
├── test-int
│   ├── BENCHMARKS.md
│   ├── cli
│   │   ├── test_project_commands_integration.py
│   │   └── test_version_integration.py
│   ├── conftest.py
│   ├── mcp
│   │   ├── test_build_context_underscore.py
│   │   ├── test_build_context_validation.py
│   │   ├── test_chatgpt_tools_integration.py
│   │   ├── test_default_project_mode_integration.py
│   │   ├── test_delete_note_integration.py
│   │   ├── test_edit_note_integration.py
│   │   ├── test_list_directory_integration.py
│   │   ├── test_move_note_integration.py
│   │   ├── test_project_management_integration.py
│   │   ├── test_project_state_sync_integration.py
│   │   ├── test_read_content_integration.py
│   │   ├── test_read_note_integration.py
│   │   ├── test_search_integration.py
│   │   ├── test_single_project_mcp_integration.py
│   │   └── test_write_note_integration.py
│   ├── test_db_wal_mode.py
│   ├── test_disable_permalinks_integration.py
│   └── test_sync_performance_benchmark.py
├── tests
│   ├── __init__.py
│   ├── api
│   │   ├── conftest.py
│   │   ├── test_async_client.py
│   │   ├── test_continue_conversation_template.py
│   │   ├── test_directory_router.py
│   │   ├── test_importer_router.py
│   │   ├── test_knowledge_router.py
│   │   ├── test_management_router.py
│   │   ├── test_memory_router.py
│   │   ├── test_project_router_operations.py
│   │   ├── test_project_router.py
│   │   ├── test_prompt_router.py
│   │   ├── test_relation_background_resolution.py
│   │   ├── test_resource_router.py
│   │   ├── test_search_router.py
│   │   ├── test_search_template.py
│   │   ├── test_template_loader_helpers.py
│   │   └── test_template_loader.py
│   ├── cli
│   │   ├── conftest.py
│   │   ├── test_cli_tools.py
│   │   ├── test_cloud_authentication.py
│   │   ├── test_ignore_utils.py
│   │   ├── test_import_chatgpt.py
│   │   ├── test_import_claude_conversations.py
│   │   ├── test_import_claude_projects.py
│   │   ├── test_import_memory_json.py
│   │   ├── test_project_add_with_local_path.py
│   │   └── test_upload.py
│   ├── conftest.py
│   ├── db
│   │   └── test_issue_254_foreign_key_constraints.py
│   ├── importers
│   │   ├── test_importer_base.py
│   │   └── test_importer_utils.py
│   ├── markdown
│   │   ├── __init__.py
│   │   ├── test_date_frontmatter_parsing.py
│   │   ├── test_entity_parser_error_handling.py
│   │   ├── test_entity_parser.py
│   │   ├── test_markdown_plugins.py
│   │   ├── test_markdown_processor.py
│   │   ├── test_observation_edge_cases.py
│   │   ├── test_parser_edge_cases.py
│   │   ├── test_relation_edge_cases.py
│   │   └── test_task_detection.py
│   ├── mcp
│   │   ├── conftest.py
│   │   ├── test_obsidian_yaml_formatting.py
│   │   ├── test_permalink_collision_file_overwrite.py
│   │   ├── test_prompts.py
│   │   ├── test_resources.py
│   │   ├── test_tool_build_context.py
│   │   ├── test_tool_canvas.py
│   │   ├── test_tool_delete_note.py
│   │   ├── test_tool_edit_note.py
│   │   ├── test_tool_list_directory.py
│   │   ├── test_tool_move_note.py
│   │   ├── test_tool_read_content.py
│   │   ├── test_tool_read_note.py
│   │   ├── test_tool_recent_activity.py
│   │   ├── test_tool_resource.py
│   │   ├── test_tool_search.py
│   │   ├── test_tool_utils.py
│   │   ├── test_tool_view_note.py
│   │   ├── test_tool_write_note.py
│   │   └── tools
│   │       └── test_chatgpt_tools.py
│   ├── Non-MarkdownFileSupport.pdf
│   ├── repository
│   │   ├── test_entity_repository_upsert.py
│   │   ├── test_entity_repository.py
│   │   ├── test_entity_upsert_issue_187.py
│   │   ├── test_observation_repository.py
│   │   ├── test_project_info_repository.py
│   │   ├── test_project_repository.py
│   │   ├── test_relation_repository.py
│   │   ├── test_repository.py
│   │   ├── test_search_repository_edit_bug_fix.py
│   │   └── test_search_repository.py
│   ├── schemas
│   │   ├── test_base_timeframe_minimum.py
│   │   ├── test_memory_serialization.py
│   │   ├── test_memory_url_validation.py
│   │   ├── test_memory_url.py
│   │   ├── test_schemas.py
│   │   └── test_search.py
│   ├── Screenshot.png
│   ├── services
│   │   ├── test_context_service.py
│   │   ├── test_directory_service.py
│   │   ├── test_entity_service_disable_permalinks.py
│   │   ├── test_entity_service.py
│   │   ├── test_file_service.py
│   │   ├── test_initialization.py
│   │   ├── test_link_resolver.py
│   │   ├── test_project_removal_bug.py
│   │   ├── test_project_service_operations.py
│   │   ├── test_project_service.py
│   │   └── test_search_service.py
│   ├── sync
│   │   ├── test_character_conflicts.py
│   │   ├── test_sync_service_incremental.py
│   │   ├── test_sync_service.py
│   │   ├── test_sync_wikilink_issue.py
│   │   ├── test_tmp_files.py
│   │   ├── test_watch_service_edge_cases.py
│   │   ├── test_watch_service_reload.py
│   │   └── test_watch_service.py
│   ├── test_config.py
│   ├── test_db_migration_deduplication.py
│   ├── test_deps.py
│   ├── test_production_cascade_delete.py
│   ├── test_rclone_commands.py
│   └── utils
│       ├── test_file_utils.py
│       ├── test_frontmatter_obsidian_compatible.py
│       ├── test_parse_tags.py
│       ├── test_permalink_formatting.py
│       ├── test_utf8_handling.py
│       └── test_validate_project_path.py
├── uv.lock
├── v0.15.0-RELEASE-DOCS.md
└── v15-docs
    ├── api-performance.md
    ├── background-relations.md
    ├── basic-memory-home.md
    ├── bug-fixes.md
    ├── chatgpt-integration.md
    ├── cloud-authentication.md
    ├── cloud-bisync.md
    ├── cloud-mode-usage.md
    ├── cloud-mount.md
    ├── default-project-mode.md
    ├── env-file-removal.md
    ├── env-var-overrides.md
    ├── explicit-project-parameter.md
    ├── gitignore-integration.md
    ├── project-root-env-var.md
    ├── README.md
    └── sqlite-performance.md
```

# Files

--------------------------------------------------------------------------------
/specs/SPEC-9-1 Follow-Ups- Conflict, Sync, and Observability.md:
--------------------------------------------------------------------------------

```markdown
  1 | ---
  2 | title: 'SPEC-9-1 Follow-Ups: Conflict, Sync, and Observability'
  3 | type: tasklist
  4 | permalink: specs/spec-9-follow-ups-conflict-sync-and-observability
  5 | related: specs/spec-9-multi-project-bisync
  6 | status: revised
  7 | revision_date: 2025-10-03
  8 | ---
  9 | 
 10 | # SPEC-9-1 Follow-Ups: Conflict, Sync, and Observability
 11 | 
 12 | **REVISED 2025-10-03:** Simplified to leverage rclone built-ins instead of custom conflict handling.
 13 | 
 14 | **Context:** SPEC-9 delivered multi-project bidirectional sync and a unified CLI. This follow-up focuses on **observability and safety** using rclone's built-in capabilities rather than reinventing conflict handling.
 15 | 
 16 | **Design Philosophy: "Be Dumb Like Git"**
 17 | - Let rclone bisync handle conflict detection (it already does this)
 18 | - Make conflicts visible and recoverable, don't prevent them
 19 | - Cloud is always the winner on conflict (cloud-primary model)
 20 | - Users who want version history can just use Git locally in their sync directory
 21 | 
 22 | **What Changed from Original Version:**
 23 | - **Replaced:** Custom `.bmmeta` sidecars → Use rclone's `.bisync/` state tracking
 24 | - **Replaced:** Custom conflict detection → Use rclone bisync 3-way merge
 25 | - **Replaced:** Tombstone files → rclone delete tracking handles this
 26 | - **Replaced:** Distributed lease → Local process lock only (document multi-device warning)
 27 | - **Replaced:** S3 versioning service → Users just use Git locally if they want history
 28 | - **Deferred:** SPEC-14 Git integration → Postponed to teams/multi-user features
 29 | 
 30 | ## ✅ Now 
 31 | - [ ] **Local process lock**: Prevent concurrent bisync runs on same device (`~/.basic-memory/sync.lock`)
 32 | - [ ] **Structured sync reports**: Parse rclone bisync output into JSON reports (creates/updates/deletes/conflicts, bytes, duration); `bm sync --report`
 33 | - [ ] **Multi-device warning**: Document that users should not run `--watch` on multiple devices simultaneously
 34 | - [ ] **Version control guidance**: Document pattern for users to use Git locally in their sync directory if they want version history
 35 | - [ ] **Docs polish**: cloud-mode toggle, mount↔bisync directory isolation, conflict semantics, quick start, migration guide, short demo clip/GIF
 36 | 
 37 | ## 🔜 Next
 38 | - [ ] **Observability commands**: `bm conflicts list`, `bm sync history` to view sync reports and conflicts
 39 | - [ ] **Conflict resolution UI**: `bm conflicts resolve <file>` to interactively pick winner from conflict files
 40 | - [ ] **Selective sync**: allow include/exclude by project; per-project profile (safe/balanced/fast)
 41 | 
 42 | ## 🧭 Later
 43 | - [ ] **Near real-time sync**: File watcher → targeted `rclone copy` for individual files (keep bisync as backstop)
 44 | - [ ] **Sharing / scoped tokens**: cross-tenant/project access
 45 | - [ ] **Bandwidth controls & backpressure**: policy for large repos
 46 | - [ ] **Client-side encryption (optional)**: with clear trade-offs
 47 | 
 48 | ## 📏 Acceptance criteria (for "Now" items)
 49 | - [ ] Local process lock prevents concurrent bisync runs on same device
 50 | - [ ] rclone bisync conflict files visible and documented (`file.conflict1.md`, `file.conflict2.md`)
 51 | - [ ] `bm sync --report` generates parsable JSON with sync statistics
 52 | - [ ] Documentation clearly warns about multi-device `--watch` mode
 53 | - [ ] Documentation shows users how to use Git locally for version history
 54 | 
 55 | ## What We're NOT Building (Deferred to rclone)
 56 | - ❌ Custom `.bmmeta` sidecars (rclone tracks state in `.bisync/` workdir)
 57 | - ❌ Custom conflict detection (rclone bisync already does 3-way merge detection)
 58 | - ❌ Tombstone files (S3 versioning + rclone delete tracking handles this)
 59 | - ❌ Distributed lease (low probability issue, rclone detects state divergence)
 60 | - ❌ Rename/move tracking (rclone has size+modtime heuristics built-in)
 61 | 
 62 | ## Implementation Summary
 63 | 
 64 | **Current State (SPEC-9):**
 65 | - ✅ rclone bisync with 3 profiles (safe/balanced/fast)
 66 | - ✅ `--max-delete` safety limits (10/25/50 files)
 67 | - ✅ `--conflict-resolve=newer` for auto-resolution
 68 | - ✅ Watch mode: `bm sync --watch` (60s intervals)
 69 | - ✅ Integrity checking: `bm cloud check`
 70 | - ✅ Mount vs bisync directory isolation
 71 | 
 72 | **What's Needed (This Spec):**
 73 | 1. **Process lock** - Simple file-based lock in `~/.basic-memory/sync.lock`
 74 | 2. **Sync reports** - Parse rclone output, save to `~/.basic-memory/sync-history/`
 75 | 3. **Documentation** - Multi-device warnings, conflict resolution workflow, Git usage pattern
 76 | 
 77 | **User Model:**
 78 | - Cloud is always the winner on conflict (cloud-primary)
 79 | - rclone creates `.conflict` files for divergent edits
 80 | - Users who want version history just use Git in their local sync directory
 81 | - Users warned: don't run `--watch` on multiple devices
 82 | 
 83 | ## Decision Rationale & Trade-offs
 84 | 
 85 | ### Why Trust rclone Instead of Custom Conflict Handling?
 86 | 
 87 | **rclone bisync already provides:**
 88 | - 3-way merge detection (compares local, remote, and last-known state)
 89 | - File state tracking in `.bisync/` workdir (hashes, modtimes)
 90 | - Automatic conflict file creation: `file.conflict1.md`, `file.conflict2.md`
 91 | - Rename detection via size+modtime heuristics
 92 | - Delete tracking (prevents resurrection of deleted files)
 93 | - Battle-tested with extensive edge case handling
 94 | 
 95 | **What we'd have to build with custom approach:**
 96 | - Per-file metadata tracking (`.bmmeta` sidecars)
 97 | - 3-way diff algorithm
 98 | - Conflict detection logic
 99 | - Tombstone files for deletes
100 | - Rename/move detection
101 | - Testing for all edge cases
102 | 
103 | **Decision:** Use what rclone already does well. Don't reinvent the wheel.
104 | 
105 | ### Why Let Users Use Git Locally Instead of Building Versioning?
106 | 
107 | **The simplest solution: Just use Git**
108 | 
109 | Users who want version history can literally just use Git in their sync directory:
110 | 
111 | ```bash
112 | cd ~/basic-memory-cloud-sync/
113 | git init
114 | git add .
115 | git commit -m "backup"
116 | 
117 | # Push to their own GitHub if they want
118 | git remote add origin git@github.com:user/my-knowledge.git
119 | git push
120 | ```
121 | 
122 | **Why this is perfect:**
123 | - ✅ We build nothing
124 | - ✅ Users who want Git... just use Git
125 | - ✅ Users who don't care... don't need to
126 | - ✅ rclone bisync already handles sync conflicts
127 | - ✅ Users own their data, they can version it however they want (Git, Time Machine, etc.)
128 | 
129 | **What we'd have to build for S3 versioning:**
130 | - API to enable versioning on Tigris buckets
131 |   - **Problem**: Tigris doesn't support S3 bucket versioning
132 | - Restore commands: `bm cloud restore --version-id`
133 | - Version listing: `bm cloud versions <path>`
134 | - Lifecycle policies for version retention
135 | - Documentation and user education
136 | 
137 | **What we'd have to build for SPEC-14 Git integration:**
138 | - Committer service (daemon watching `/app/data/`)
139 | - Puller service (webhook handler for GitHub pushes)
140 | - Git LFS for large files
141 | - Loop prevention between Git ↔ bisync ↔ local
142 | - Merge conflict handling at TWO layers (rclone + Git)
143 | - Webhook infrastructure and monitoring
144 | 
145 | **Decision:** Don't build version control. Document the pattern. "The easiest problem to solve is the one you avoid."
146 | 
147 | **When to revisit:** Teams/multi-user features where server-side version control becomes necessary for collaboration.
148 | 
149 | ### Why No Distributed Lease?
150 | 
151 | **Low probability issue:**
152 | - Requires user to manually run `bm sync` on multiple devices at exact same time
153 | - Most users run `--watch` on one primary device
154 | - rclone bisync detects state divergence and fails safely
155 | 
156 | **Safety nets in place:**
157 | - Local process lock prevents concurrent runs on same device
158 | - rclone bisync aborts if bucket state changed during sync
159 | - S3 versioning recovers from overwrites (where the provider supports it — note Tigris does not offer bucket versioning, see above)
160 | - Documentation warns against multi-device `--watch`
161 | 
162 | **Failure mode:**
163 | ```bash
164 | # Device A and B sync simultaneously
165 | Device A: bm sync → succeeds
166 | Device B: bm sync → "Error: path has changed, run --resync"
167 | 
168 | # User fixes with resync
169 | Device B: bm sync --resync → establishes new baseline
170 | ```
171 | 
172 | **Decision:** Document the issue, add local lock, defer distributed coordination until users report actual problems.
173 | 
174 | ### Cloud-Primary Conflict Model
175 | 
176 | **User mental model:**
177 | - Cloud is the source of truth (like Dropbox/iCloud)
178 | - Local is working copy
179 | - On conflict: cloud wins, local edits → `.conflict` file
180 | - User manually picks winner
181 | 
182 | **Why this works:**
183 | - Simpler than bidirectional merge (no automatic resolution risk)
184 | - Matches user expectations from Dropbox
185 | - S3 versioning provides safety net for overwrites
186 | - Clear recovery path: restore from S3 version if needed
187 | 
188 | **Example workflow:**
189 | ```bash
190 | # Edit file on Device A and Device B while offline
191 | # Both devices come online and sync
192 | 
193 | Device A: bm sync
194 | # → Pushes to cloud first, becomes canonical version
195 | 
196 | Device B: bm sync
197 | # → Detects conflict
198 | # → Cloud version: work/notes.md
199 | # → Local version: work/notes.md.conflict1
200 | # → User manually merges or picks winner
201 | 
202 | # Restore if needed
203 | bm cloud restore work/notes.md --version-id abc123
204 | ```
205 | 
206 | ## Implementation Details
207 | 
208 | ### 1. Local Process Lock
209 | 
210 | ```python
211 | # ~/.basic-memory/sync.lock
212 | import os
213 | import psutil
214 | from pathlib import Path
215 | 
216 | class SyncLock:
217 |     def __init__(self):
218 |         self.lock_file = Path.home() / '.basic-memory' / 'sync.lock'
219 | 
220 |     def acquire(self):
221 |         if self.lock_file.exists():
222 |             pid = int(self.lock_file.read_text())
223 |             if psutil.pid_exists(pid):
224 |                 raise BisyncError(
225 |                     f"Sync already running (PID {pid}). "
226 |                     f"Wait for completion or kill stale process."
227 |                 )
228 |             # Stale lock, remove it
229 |             self.lock_file.unlink()
230 | 
231 |         self.lock_file.write_text(str(os.getpid()))
232 | 
233 |     def release(self):
234 |         if self.lock_file.exists():
235 |             self.lock_file.unlink()
236 | 
237 |     def __enter__(self):
238 |         self.acquire()
239 |         return self
240 | 
241 |     def __exit__(self, *args):
242 |         self.release()
243 | 
244 | # Usage
245 | with SyncLock():
246 |     run_rclone_bisync()
247 | ```
248 | 
249 | ### 3. Sync Report Parsing
250 | 
251 | ```python
252 | # Parse rclone bisync output
253 | import json
254 | from datetime import datetime
255 | from pathlib import Path
256 | 
257 | def parse_sync_report(rclone_output: str, duration: float, exit_code: int) -> dict:
258 |     """Parse rclone bisync output into structured report."""
259 | 
260 |     # rclone bisync outputs lines like:
261 |     # "Synching Path1 /local/path with Path2 remote:bucket"
262 |     # "- Path1    File was copied to Path2"
263 |     # "Bisync successful"
264 | 
265 |     report = {
266 |         "timestamp": datetime.now().isoformat(),
267 |         "duration_seconds": duration,
268 |         "exit_code": exit_code,
269 |         "success": exit_code == 0,
270 |         "files_created": 0,
271 |         "files_updated": 0,
272 |         "files_deleted": 0,
273 |         "conflicts": [],
274 |         "errors": []
275 |     }
276 | 
277 |     for line in rclone_output.split('\n'):
278 |         if 'was copied to' in line:
279 |             report['files_created'] += 1
280 |         elif 'was updated in' in line:
281 |             report['files_updated'] += 1
282 |         elif 'was deleted from' in line:
283 |             report['files_deleted'] += 1
284 |         elif '.conflict' in line:
285 |             report['conflicts'].append(line.strip())
286 |         elif 'ERROR' in line:
287 |             report['errors'].append(line.strip())
288 | 
289 |     return report
290 | 
291 | def save_sync_report(report: dict):
292 |     """Save sync report to history."""
293 |     history_dir = Path.home() / '.basic-memory' / 'sync-history'
294 |     history_dir.mkdir(parents=True, exist_ok=True)
295 | 
296 |     timestamp = datetime.now().strftime('%Y%m%d-%H%M%S')
297 |     report_file = history_dir / f'{timestamp}.json'
298 | 
299 |     report_file.write_text(json.dumps(report, indent=2))
300 | 
301 | # Usage in run_bisync()
302 | start_time = time.time()
303 | result = subprocess.run(bisync_cmd, capture_output=True, text=True)
304 | duration = time.time() - start_time
305 | 
306 | report = parse_sync_report(result.stdout, duration, result.returncode)
307 | save_sync_report(report)
308 | 
309 | if report['conflicts']:
310 |     console.print(f"[yellow]⚠ {len(report['conflicts'])} conflict(s) detected[/yellow]")
311 |     console.print("[dim]Run 'bm conflicts list' to view[/dim]")
312 | ```
313 | 
314 | ### 4. User Commands
315 | 
316 | ```bash
317 | # View sync history
318 | bm sync history
319 | # → Lists recent syncs from ~/.basic-memory/sync-history/*.json
320 | # → Shows: timestamp, duration, files changed, conflicts, errors
321 | 
322 | # View current conflicts
323 | bm conflicts list
324 | # → Scans sync directory for *.conflict* files
325 | # → Shows: file path, conflict versions, timestamps
326 | 
327 | # Restore from S3 version
328 | bm cloud restore work/notes.md --version-id abc123
329 | # → Uses aws s3api get-object with version-id
330 | # → Downloads to original path
331 | 
332 | bm cloud restore work/notes.md --timestamp "2025-10-03 14:30"
333 | # → Lists versions, finds closest to timestamp
334 | # → Downloads that version
335 | 
336 | # List file versions
337 | bm cloud versions work/notes.md
338 | # → Uses aws s3api list-object-versions
339 | # → Shows: version-id, timestamp, size, author
340 | 
341 | # Interactive conflict resolution
342 | bm conflicts resolve work/notes.md
343 | # → Shows both versions side-by-side
344 | # → Prompts: Keep local, keep cloud, merge manually, restore from S3 version
345 | # → Cleans up .conflict files after resolution
346 | ```
347 | 
348 | ## Success Metrics & Monitoring
349 | 
350 | **Phase 1 (v1) - Basic Safety:**
351 | - [ ] Conflict detection rate < 5% of syncs (measure in telemetry)
352 | - [ ] User can resolve conflicts within 5 minutes (UX testing)
353 | - [ ] Documentation prevents 90% of multi-device issues
354 | 
355 | **Phase 2 (v2) - Observability:**
356 | - [ ] 80% of users check `bm sync history` when troubleshooting
357 | - [ ] Average time to restore from S3 version < 2 minutes
358 | 
359 | - [ ] Conflict resolution success rate > 95%
360 | 
361 | **What to measure:**
362 | ```python
363 | # Telemetry in sync reports
364 | {
365 |     "conflict_rate": conflicts / total_syncs,
366 |     "multi_device_collisions": count_state_divergence_errors,
367 |     "version_restores": count_restore_operations,
368 |     "avg_sync_duration": sum(durations) / count,
369 |     "max_delete_trips": count_max_delete_aborts
370 | }
371 | ```
372 | 
373 | **When to add distributed lease:**
374 | - Multi-device collision rate > 5% of syncs
375 | - User complaints about state divergence errors
376 | - Evidence that local lock isn't sufficient
377 | 
378 | **When to revisit Git (SPEC-14):**
379 | - Teams feature launches (multi-user collaboration)
380 | - Users request commit messages / audit trail
381 | - PR-based review workflow becomes valuable
382 | 
383 | ## Links
384 | - SPEC-9: `specs/spec-9-multi-project-bisync`
385 | - SPEC-14: `specs/spec-14-cloud-git-versioning` (deferred to teams/multi-user features)
386 | - rclone bisync docs: https://rclone.org/bisync/
387 | - Tigris S3 versioning: https://www.tigrisdata.com/docs/buckets/versioning/
388 | 
389 | ---
390 | **Owner:** <assign>  |  **Review cadence:** weekly in standup  |  **Last updated:** 2025-10-03
391 | 
```

--------------------------------------------------------------------------------
/tests/api/test_resource_router.py:
--------------------------------------------------------------------------------

```python
  1 | """Tests for resource router endpoints."""
  2 | 
  3 | import json
  4 | from datetime import datetime, timezone
  5 | from pathlib import Path
  6 | 
  7 | import pytest
  8 | 
  9 | from basic_memory.schemas import EntityResponse
 10 | from basic_memory.utils import normalize_newlines
 11 | 
 12 | 
 13 | @pytest.mark.asyncio
 14 | async def test_get_resource_content(client, project_config, entity_repository, project_url):
 15 |     """Test getting content by permalink."""
 16 |     # Create a test file
 17 |     content = "# Test Content\n\nThis is a test file."
 18 |     test_file = Path(project_config.home) / "test" / "test.md"
 19 |     test_file.parent.mkdir(parents=True, exist_ok=True)
 20 |     test_file.write_text(content)
 21 | 
 22 |     # Create entity referencing the file
 23 |     entity = await entity_repository.create(
 24 |         {
 25 |             "title": "Test Entity",
 26 |             "entity_type": "test",
 27 |             "permalink": "test/test",
 28 |             "file_path": "test/test.md",  # Relative to config.home
 29 |             "content_type": "text/markdown",
 30 |             "created_at": datetime.now(timezone.utc),
 31 |             "updated_at": datetime.now(timezone.utc),
 32 |         }
 33 |     )
 34 | 
 35 |     # Test getting the content
 36 |     response = await client.get(f"{project_url}/resource/{entity.permalink}")
 37 |     assert response.status_code == 200
 38 |     assert response.headers["content-type"] == "text/markdown; charset=utf-8"
 39 |     assert response.text == normalize_newlines(content)
 40 | 
 41 | 
 42 | @pytest.mark.asyncio
 43 | async def test_get_resource_pagination(client, project_config, entity_repository, project_url):
 44 |     """Test getting content by permalink with pagination."""
 45 |     # Create a test file
 46 |     content = "# Test Content\n\nThis is a test file."
 47 |     test_file = Path(project_config.home) / "test" / "test.md"
 48 |     test_file.parent.mkdir(parents=True, exist_ok=True)
 49 |     test_file.write_text(content)
 50 | 
 51 |     # Create entity referencing the file
 52 |     entity = await entity_repository.create(
 53 |         {
 54 |             "title": "Test Entity",
 55 |             "entity_type": "test",
 56 |             "permalink": "test/test",
 57 |             "file_path": "test/test.md",  # Relative to config.home
 58 |             "content_type": "text/markdown",
 59 |             "created_at": datetime.now(timezone.utc),
 60 |             "updated_at": datetime.now(timezone.utc),
 61 |         }
 62 |     )
 63 | 
 64 |     # Test getting the content
 65 |     response = await client.get(
 66 |         f"{project_url}/resource/{entity.permalink}", params={"page": 1, "page_size": 1}
 67 |     )
 68 |     assert response.status_code == 200
 69 |     assert response.headers["content-type"] == "text/markdown; charset=utf-8"
 70 |     assert response.text == normalize_newlines(content)
 71 | 
 72 | 
 73 | @pytest.mark.asyncio
 74 | async def test_get_resource_by_title(client, project_config, entity_repository, project_url):
 75 |     """Test getting content by permalink."""
 76 |     # Create a test file
 77 |     content = "# Test Content\n\nThis is a test file."
 78 |     test_file = Path(project_config.home) / "test" / "test.md"
 79 |     test_file.parent.mkdir(parents=True, exist_ok=True)
 80 |     test_file.write_text(content)
 81 | 
 82 |     # Create entity referencing the file
 83 |     entity = await entity_repository.create(
 84 |         {
 85 |             "title": "Test Entity",
 86 |             "entity_type": "test",
 87 |             "permalink": "test/test",
 88 |             "file_path": "test/test.md",  # Relative to config.home
 89 |             "content_type": "text/markdown",
 90 |             "created_at": datetime.now(timezone.utc),
 91 |             "updated_at": datetime.now(timezone.utc),
 92 |         }
 93 |     )
 94 | 
 95 |     # Test getting the content
 96 |     response = await client.get(f"{project_url}/resource/{entity.title}")
 97 |     assert response.status_code == 200
 98 | 
 99 | 
100 | @pytest.mark.asyncio
101 | async def test_get_resource_missing_entity(client, project_url):
102 |     """Test 404 when entity doesn't exist."""
103 |     response = await client.get(f"{project_url}/resource/does/not/exist")
104 |     assert response.status_code == 404
105 |     assert "Resource not found" in response.json()["detail"]
106 | 
107 | 
108 | @pytest.mark.asyncio
109 | async def test_get_resource_missing_file(client, project_config, entity_repository, project_url):
110 |     """Test 404 when file doesn't exist."""
111 |     # Create entity referencing non-existent file
112 |     entity = await entity_repository.create(
113 |         {
114 |             "title": "Missing File",
115 |             "entity_type": "test",
116 |             "permalink": "test/missing",
117 |             "file_path": "test/missing.md",
118 |             "content_type": "text/markdown",
119 |             "created_at": datetime.now(timezone.utc),
120 |             "updated_at": datetime.now(timezone.utc),
121 |         }
122 |     )
123 | 
124 |     response = await client.get(f"{project_url}/resource/{entity.permalink}")
125 |     assert response.status_code == 404
126 |     assert "File not found" in response.json()["detail"]
127 | 
128 | 
129 | @pytest.mark.asyncio
130 | async def test_get_resource_observation(client, project_config, entity_repository, project_url):
131 |     """Test getting content by observation permalink."""
132 |     # Create entity
133 |     content = "# Test Content\n\n- [note] an observation."
134 |     data = {
135 |         "title": "Test Entity",
136 |         "folder": "test",
137 |         "entity_type": "test",
138 |         "content": f"{content}",
139 |     }
140 |     response = await client.post(f"{project_url}/knowledge/entities", json=data)
141 |     entity_response = response.json()
142 |     entity = EntityResponse(**entity_response)
143 | 
144 |     assert len(entity.observations) == 1
145 |     observation = entity.observations[0]
146 | 
147 |     # Test getting the content via the observation
148 |     response = await client.get(f"{project_url}/resource/{observation.permalink}")
149 |     assert response.status_code == 200
150 |     assert response.headers["content-type"] == "text/markdown; charset=utf-8"
151 |     assert (
152 |         normalize_newlines(
153 |             """
154 | ---
155 | title: Test Entity
156 | type: test
157 | permalink: test/test-entity
158 | ---
159 | 
160 | # Test Content
161 | 
162 | - [note] an observation.
163 |     """.strip()
164 |         )
165 |         in response.text
166 |     )
167 | 
168 | 
169 | @pytest.mark.asyncio
170 | async def test_get_resource_entities(client, project_config, entity_repository, project_url):
171 |     """Test getting content by permalink match."""
172 |     # Create entity
173 |     content1 = "# Test Content\n"
174 |     data = {
175 |         "title": "Test Entity",
176 |         "folder": "test",
177 |         "entity_type": "test",
178 |         "content": f"{content1}",
179 |     }
180 |     response = await client.post(f"{project_url}/knowledge/entities", json=data)
181 |     entity_response = response.json()
182 |     entity1 = EntityResponse(**entity_response)
183 | 
184 |     content2 = "# Related Content\n- links to [[Test Entity]]"
185 |     data = {
186 |         "title": "Related Entity",
187 |         "folder": "test",
188 |         "entity_type": "test",
189 |         "content": f"{content2}",
190 |     }
191 |     response = await client.post(f"{project_url}/knowledge/entities", json=data)
192 |     entity_response = response.json()
193 |     entity2 = EntityResponse(**entity_response)
194 | 
195 |     assert len(entity2.relations) == 1
196 | 
197 |     # Test getting the content via the relation
198 |     response = await client.get(f"{project_url}/resource/test/*")
199 |     assert response.status_code == 200
200 |     assert response.headers["content-type"] == "text/markdown; charset=utf-8"
201 |     assert (
202 |         normalize_newlines(
203 |             f"""
204 | --- memory://test/test-entity {entity1.updated_at.isoformat()} {entity1.checksum[:8]}
205 | 
206 | # Test Content
207 | 
208 | --- memory://test/related-entity {entity2.updated_at.isoformat()} {entity2.checksum[:8]}
209 | 
210 | # Related Content
211 | - links to [[Test Entity]]
212 | 
213 |     """.strip()
214 |         )
215 |         in response.text
216 |     )
217 | 
218 | 
219 | @pytest.mark.asyncio
220 | async def test_get_resource_entities_pagination(
221 |     client, project_config, entity_repository, project_url
222 | ):
223 |     """Test getting content by permalink match."""
224 |     # Create entity
225 |     content1 = "# Test Content\n"
226 |     data = {
227 |         "title": "Test Entity",
228 |         "folder": "test",
229 |         "entity_type": "test",
230 |         "content": f"{content1}",
231 |     }
232 |     response = await client.post(f"{project_url}/knowledge/entities", json=data)
233 |     entity_response = response.json()
234 |     entity1 = EntityResponse(**entity_response)
235 |     assert entity1
236 | 
237 |     content2 = "# Related Content\n- links to [[Test Entity]]"
238 |     data = {
239 |         "title": "Related Entity",
240 |         "folder": "test",
241 |         "entity_type": "test",
242 |         "content": f"{content2}",
243 |     }
244 |     response = await client.post(f"{project_url}/knowledge/entities", json=data)
245 |     entity_response = response.json()
246 |     entity2 = EntityResponse(**entity_response)
247 | 
248 |     assert len(entity2.relations) == 1
249 | 
250 |     # Test getting second result
251 |     response = await client.get(
252 |         f"{project_url}/resource/test/*", params={"page": 2, "page_size": 1}
253 |     )
254 |     assert response.status_code == 200
255 |     assert response.headers["content-type"] == "text/markdown; charset=utf-8"
256 |     assert (
257 |         normalize_newlines(
258 |             """
259 | ---
260 | title: Related Entity
261 | type: test
262 | permalink: test/related-entity
263 | ---
264 | 
265 | # Related Content
266 | - links to [[Test Entity]]
267 | """.strip()
268 |         )
269 |         in response.text
270 |     )
271 | 
272 | 
273 | @pytest.mark.asyncio
274 | async def test_get_resource_relation(client, project_config, entity_repository, project_url):
275 |     """Test getting content by relation permalink."""
276 |     # Create entity
277 |     content1 = "# Test Content\n"
278 |     data = {
279 |         "title": "Test Entity",
280 |         "folder": "test",
281 |         "entity_type": "test",
282 |         "content": f"{content1}",
283 |     }
284 |     response = await client.post(f"{project_url}/knowledge/entities", json=data)
285 |     entity_response = response.json()
286 |     entity1 = EntityResponse(**entity_response)
287 | 
288 |     content2 = "# Related Content\n- links to [[Test Entity]]"
289 |     data = {
290 |         "title": "Related Entity",
291 |         "folder": "test",
292 |         "entity_type": "test",
293 |         "content": f"{content2}",
294 |     }
295 |     response = await client.post(f"{project_url}/knowledge/entities", json=data)
296 |     entity_response = response.json()
297 |     entity2 = EntityResponse(**entity_response)
298 | 
299 |     assert len(entity2.relations) == 1
300 |     relation = entity2.relations[0]
301 | 
302 |     # Test getting the content via the relation
303 |     response = await client.get(f"{project_url}/resource/{relation.permalink}")
304 |     assert response.status_code == 200
305 |     assert response.headers["content-type"] == "text/markdown; charset=utf-8"
306 |     assert (
307 |         normalize_newlines(
308 |             f"""
309 | --- memory://test/test-entity {entity1.updated_at.isoformat()} {entity1.checksum[:8]}
310 | 
311 | # Test Content
312 | 
313 | --- memory://test/related-entity {entity2.updated_at.isoformat()} {entity2.checksum[:8]}
314 | 
315 | # Related Content
316 | - links to [[Test Entity]]
317 |     
318 |     """.strip()
319 |         )
320 |         in response.text
321 |     )
322 | 
323 | 
324 | @pytest.mark.asyncio
325 | async def test_put_resource_new_file(
326 |     client, project_config, entity_repository, search_repository, project_url
327 | ):
328 |     """Test creating a new file via PUT."""
329 |     # Test data
330 |     file_path = "visualizations/test.canvas"
331 |     canvas_data = {
332 |         "nodes": [
333 |             {
334 |                 "id": "node1",
335 |                 "type": "text",
336 |                 "text": "Test node content",
337 |                 "x": 100,
338 |                 "y": 200,
339 |                 "width": 400,
340 |                 "height": 300,
341 |             }
342 |         ],
343 |         "edges": [],
344 |     }
345 | 
346 |     # Make sure the file doesn't exist yet
347 |     full_path = Path(project_config.home) / file_path
348 |     if full_path.exists():
349 |         full_path.unlink()
350 | 
351 |     # Execute PUT request
352 |     response = await client.put(
353 |         f"{project_url}/resource/{file_path}", json=json.dumps(canvas_data, indent=2)
354 |     )
355 | 
356 |     # Verify response
357 |     assert response.status_code == 201
358 |     response_data = response.json()
359 |     assert response_data["file_path"] == file_path
360 |     assert "checksum" in response_data
361 |     assert "size" in response_data
362 | 
363 |     # Verify file was created
364 |     full_path = Path(project_config.home) / file_path
365 |     assert full_path.exists()
366 | 
367 |     # Verify file content
368 |     file_content = full_path.read_text(encoding="utf-8")
369 |     assert json.loads(file_content) == canvas_data
370 | 
371 |     # Verify entity was created in DB
372 |     entity = await entity_repository.get_by_file_path(file_path)
373 |     assert entity is not None
374 |     assert entity.entity_type == "canvas"
375 |     assert entity.content_type == "application/json"
376 | 
377 |     # Verify entity was indexed for search
378 |     search_results = await search_repository.search(title="test.canvas")
379 |     assert len(search_results) > 0
380 | 
381 | 
382 | @pytest.mark.asyncio
383 | async def test_put_resource_update_existing(client, project_config, entity_repository, project_url):
384 |     """Test updating an existing file via PUT."""
385 |     # Create an initial file and entity
386 |     file_path = "visualizations/update-test.canvas"
387 |     full_path = Path(project_config.home) / file_path
388 |     full_path.parent.mkdir(parents=True, exist_ok=True)
389 | 
390 |     initial_data = {
391 |         "nodes": [
392 |             {
393 |                 "id": "initial",
394 |                 "type": "text",
395 |                 "text": "Initial content",
396 |                 "x": 0,
397 |                 "y": 0,
398 |                 "width": 200,
399 |                 "height": 100,
400 |             }
401 |         ],
402 |         "edges": [],
403 |     }
404 |     full_path.write_text(json.dumps(initial_data))
405 | 
406 |     # Create the initial entity
407 |     initial_entity = await entity_repository.create(
408 |         {
409 |             "title": "update-test.canvas",
410 |             "entity_type": "canvas",
411 |             "file_path": file_path,
412 |             "content_type": "application/json",
413 |             "checksum": "initial123",
414 |             "created_at": datetime.now(timezone.utc),
415 |             "updated_at": datetime.now(timezone.utc),
416 |         }
417 |     )
418 | 
419 |     # New data for update
420 |     updated_data = {
421 |         "nodes": [
422 |             {
423 |                 "id": "updated",
424 |                 "type": "text",
425 |                 "text": "Updated content",
426 |                 "x": 100,
427 |                 "y": 100,
428 |                 "width": 300,
429 |                 "height": 200,
430 |             }
431 |         ],
432 |         "edges": [],
433 |     }
434 | 
435 |     # Execute PUT request to update
436 |     response = await client.put(
437 |         f"{project_url}/resource/{file_path}", json=json.dumps(updated_data, indent=2)
438 |     )
439 | 
440 |     # Verify response
441 |     assert response.status_code == 200
442 | 
443 |     # Verify file was updated
444 |     updated_content = full_path.read_text(encoding="utf-8")
445 |     assert json.loads(updated_content) == updated_data
446 | 
447 |     # Verify entity was updated
448 |     updated_entity = await entity_repository.get_by_file_path(file_path)
449 |     assert updated_entity.id == initial_entity.id  # Same entity, updated
450 |     assert updated_entity.checksum != initial_entity.checksum  # Checksum changed
451 | 
```

--------------------------------------------------------------------------------
/tests/mcp/test_permalink_collision_file_overwrite.py:
--------------------------------------------------------------------------------

```python
  1 | """Tests for permalink collision file overwrite bug discovered in live testing.
  2 | 
  3 | This test reproduces a critical data loss bug where creating notes with
  4 | titles that normalize to different permalinks but resolve to the same
  5 | file location causes silent file overwrites without warning.
  6 | 
  7 | Related to GitHub Issue #139 but tests a different aspect - not database
  8 | UNIQUE constraints, but actual file overwrite behavior.
  9 | 
 10 | Example scenario from live testing:
 11 | 1. Create "Node A" → file: edge-cases/Node A.md, permalink: edge-cases/node-a
 12 | 2. Create "Node C" → file: edge-cases/Node C.md, permalink: edge-cases/node-c
 13 | 3. BUG: Node C creation overwrites edge-cases/Node A.md file content
 14 | 4. Result: File "Node A.md" exists but contains "Node C" content
 15 | """
 16 | 
 17 | import pytest
 18 | from pathlib import Path
 19 | from textwrap import dedent
 20 | 
 21 | from basic_memory.mcp.tools import write_note, read_note
 22 | from basic_memory.sync.sync_service import SyncService
 23 | from basic_memory.config import ProjectConfig
 24 | from basic_memory.services import EntityService
 25 | 
 26 | 
 27 | async def force_full_scan(sync_service: SyncService) -> None:
 28 |     """Force next sync to do a full scan by clearing watermark (for testing moves/deletions)."""
 29 |     if sync_service.entity_repository.project_id is not None:
 30 |         project = await sync_service.project_repository.find_by_id(
 31 |             sync_service.entity_repository.project_id
 32 |         )
 33 |         if project:
 34 |             await sync_service.project_repository.update(
 35 |                 project.id,
 36 |                 {
 37 |                     "last_scan_timestamp": None,
 38 |                     "last_file_count": None,
 39 |                 },
 40 |             )
 41 | 
 42 | 
 43 | @pytest.mark.asyncio
 44 | async def test_permalink_collision_should_not_overwrite_different_file(app, test_project):
 45 |     """Test that creating notes with different titles doesn't overwrite existing files.
 46 | 
 47 |     This test reproduces the critical bug discovered in Phase 4 of live testing where:
 48 |     - Creating "Node A" worked fine
 49 |     - Creating "Node C" silently overwrote Node A.md's content
 50 |     - No warning or error was shown to the user
 51 |     - Original Node A content was permanently lost
 52 | 
 53 |     Expected behavior:
 54 |     - Each note with a different title should create/update its own file
 55 |     - No silent overwrites should occur
 56 |     - Files should maintain their distinct content
 57 | 
 58 |     Current behavior (BUG):
 59 |     - Second note creation sometimes overwrites first note's file
 60 |     - File "Node A.md" contains "Node C" content after creating Node C
 61 |     - Data loss occurs without user warning
 62 |     """
 63 |     # Step 1: Create first note "Node A"
 64 |     result_a = await write_note.fn(
 65 |         project=test_project.name,
 66 |         title="Node A",
 67 |         folder="edge-cases",
 68 |         content="# Node A\n\nOriginal content for Node A\n\n## Relations\n- links_to [[Node B]]",
 69 |     )
 70 | 
 71 |     assert "# Created note" in result_a
 72 |     assert "file_path: edge-cases/Node A.md" in result_a
 73 |     assert "permalink: edge-cases/node-a" in result_a
 74 | 
 75 |     # Verify Node A content via read
 76 |     content_a = await read_note.fn("edge-cases/node-a", project=test_project.name)
 77 |     assert "Node A" in content_a
 78 |     assert "Original content for Node A" in content_a
 79 | 
 80 |     # Step 2: Create second note "Node B" (should be independent)
 81 |     result_b = await write_note.fn(
 82 |         project=test_project.name,
 83 |         title="Node B",
 84 |         folder="edge-cases",
 85 |         content="# Node B\n\nContent for Node B",
 86 |     )
 87 | 
 88 |     assert "# Created note" in result_b
 89 |     assert "file_path: edge-cases/Node B.md" in result_b
 90 |     assert "permalink: edge-cases/node-b" in result_b
 91 | 
 92 |     # Step 3: Create third note "Node C" (this is where the bug occurs)
 93 |     result_c = await write_note.fn(
 94 |         project=test_project.name,
 95 |         title="Node C",
 96 |         folder="edge-cases",
 97 |         content="# Node C\n\nContent for Node C\n\n## Relations\n- links_to [[Node A]]",
 98 |     )
 99 | 
100 |     assert "# Created note" in result_c
101 |     assert "file_path: edge-cases/Node C.md" in result_c
102 |     assert "permalink: edge-cases/node-c" in result_c
103 | 
104 |     # CRITICAL CHECK: Verify Node A still has its original content
105 |     # This is where the bug manifests - Node A.md gets overwritten with Node C content
106 |     content_a_after = await read_note.fn("edge-cases/node-a", project=test_project.name)
107 |     assert "Node A" in content_a_after, "Node A title should still be 'Node A'"
108 |     assert "Original content for Node A" in content_a_after, (
109 |         "Node A file should NOT be overwritten by Node C creation"
110 |     )
111 |     assert "Content for Node C" not in content_a_after, "Node A should NOT contain Node C's content"
112 | 
113 |     # Verify Node C has its own content
114 |     content_c = await read_note.fn("edge-cases/node-c", project=test_project.name)
115 |     assert "Node C" in content_c
116 |     assert "Content for Node C" in content_c
117 |     assert "Original content for Node A" not in content_c, (
118 |         "Node C should not contain Node A's content"
119 |     )
120 | 
121 |     # Verify files physically exist with correct content
122 |     project_path = Path(test_project.path)
123 |     node_a_file = project_path / "edge-cases" / "Node A.md"
124 |     node_c_file = project_path / "edge-cases" / "Node C.md"
125 | 
126 |     assert node_a_file.exists(), "Node A.md file should exist"
127 |     assert node_c_file.exists(), "Node C.md file should exist"
128 | 
129 |     # Read actual file contents to verify no overwrite occurred
130 |     node_a_file_content = node_a_file.read_text()
131 |     node_c_file_content = node_c_file.read_text()
132 | 
133 |     assert "Node A" in node_a_file_content, "Physical file Node A.md should contain Node A title"
134 |     assert "Original content for Node A" in node_a_file_content, (
135 |         "Physical file Node A.md should contain original Node A content"
136 |     )
137 |     assert "Content for Node C" not in node_a_file_content, (
138 |         "Physical file Node A.md should NOT contain Node C content"
139 |     )
140 | 
141 |     assert "Node C" in node_c_file_content, "Physical file Node C.md should contain Node C title"
142 |     assert "Content for Node C" in node_c_file_content, (
143 |         "Physical file Node C.md should contain Node C content"
144 |     )
145 | 
146 | 
147 | @pytest.mark.asyncio
148 | async def test_notes_with_similar_titles_maintain_separate_files(app, test_project):
149 |     """Test that notes with similar titles that normalize differently don't collide.
150 | 
151 |     Tests additional edge cases around permalink normalization to ensure
152 |     we don't have collision issues with various title patterns.
153 |     """
154 |     # Create notes with titles that could potentially cause issues
155 |     titles_and_folders = [
156 |         ("My Note", "test"),
157 |         ("My-Note", "test"),  # Different title, similar permalink
158 |         ("My_Note", "test"),  # Underscore vs hyphen
159 |         ("my note", "test"),  # Case variation
160 |     ]
161 | 
162 |     created_permalinks = []
163 | 
164 |     for title, folder in titles_and_folders:
165 |         result = await write_note.fn(
166 |             project=test_project.name,
167 |             title=title,
168 |             folder=folder,
169 |             content=f"# {title}\n\nUnique content for {title}",
170 |         )
171 | 
172 |         permalink = None
173 |         # Extract permalink from result
174 |         for line in result.split("\n"):
175 |             if line.startswith("permalink:"):
176 |                 permalink = line.split(":", 1)[1].strip()
177 |                 created_permalinks.append((title, permalink))
178 |                 break
179 | 
180 |         # Verify each note can be read back with its own content
181 |         content = await read_note.fn(permalink, project=test_project.name)
182 |         assert f"Unique content for {title}" in content, (
183 |             f"Note with title '{title}' should maintain its unique content"
184 |         )
185 | 
186 |     # Verify all created permalinks are tracked
187 |     assert len(created_permalinks) == len(titles_and_folders), (
188 |         "All notes should be created successfully"
189 |     )
190 | 
191 | 
192 | @pytest.mark.asyncio
193 | async def test_sequential_note_creation_preserves_all_files(app, test_project):
194 |     """Test that rapid sequential note creation doesn't cause file overwrites.
195 | 
196 |     This test creates multiple notes in sequence to ensure that file
197 |     creation/update logic doesn't have race conditions or state issues
198 |     that could cause overwrites.
199 |     """
200 |     notes_data = [
201 |         ("Alpha", "# Alpha\n\nAlpha content"),
202 |         ("Beta", "# Beta\n\nBeta content"),
203 |         ("Gamma", "# Gamma\n\nGamma content"),
204 |         ("Delta", "# Delta\n\nDelta content"),
205 |         ("Epsilon", "# Epsilon\n\nEpsilon content"),
206 |     ]
207 | 
208 |     # Create all notes
209 |     for title, content in notes_data:
210 |         result = await write_note.fn(
211 |             project=test_project.name,
212 |             title=title,
213 |             folder="sequence-test",
214 |             content=content,
215 |         )
216 |         assert "# Created note" in result or "# Updated note" in result
217 | 
218 |     # Verify all notes still exist with correct content
219 |     for title, expected_content in notes_data:
220 |         # Normalize title to permalink format
221 |         permalink = f"sequence-test/{title.lower()}"
222 |         content = await read_note.fn(permalink, project=test_project.name)
223 | 
224 |         assert title in content, f"Note '{title}' should still have its title"
225 |         assert expected_content.split("\n\n")[1] in content, (
226 |             f"Note '{title}' should still have its original content"
227 |         )
228 | 
229 |     # Verify physical files exist
230 |     project_path = Path(test_project.path)
231 |     sequence_dir = project_path / "sequence-test"
232 | 
233 |     for title, _ in notes_data:
234 |         file_path = sequence_dir / f"{title}.md"
235 |         assert file_path.exists(), f"File for '{title}' should exist"
236 | 
237 |         file_content = file_path.read_text()
238 |         assert title in file_content, f"Physical file for '{title}' should contain correct title"
239 | 
240 | 
241 | @pytest.mark.asyncio
242 | async def test_sync_permalink_collision_file_overwrite_bug(
243 |     sync_service: SyncService,
244 |     project_config: ProjectConfig,
245 |     entity_service: EntityService,
246 | ):
247 |     """Test that reproduces the permalink collision file overwrite bug via sync.
248 | 
249 |     This test directly creates files and runs sync to reproduce the exact bug
250 |     discovered in live testing where Node C overwrote Node A.md.
251 | 
252 |     The bug occurs when:
253 |     1. File "Node A.md" exists with permalink "edge-cases/node-a"
254 |     2. File "Node C.md" is created with permalink "edge-cases/node-c"
255 |     3. During sync, somehow Node C content overwrites Node A.md
256 |     4. Result: File "Node A.md" contains Node C content (data loss!)
257 |     """
258 |     project_dir = project_config.home
259 |     edge_cases_dir = project_dir / "edge-cases"
260 |     edge_cases_dir.mkdir(parents=True, exist_ok=True)
261 | 
262 |     # Step 1: Create Node A file
263 |     node_a_content = dedent("""
264 |         ---
265 |         title: Node A
266 |         type: note
267 |         tags:
268 |         - circular-test
269 |         ---
270 | 
271 |         # Node A
272 | 
273 |         Original content for Node A
274 | 
275 |         ## Relations
276 |         - links_to [[Node B]]
277 |         - references [[Node C]]
278 |     """).strip()
279 | 
280 |     node_a_file = edge_cases_dir / "Node A.md"
281 |     node_a_file.write_text(node_a_content)
282 | 
283 |     # Sync to create Node A in database
284 |     await sync_service.sync(project_dir)
285 | 
286 |     # Verify Node A is in database
287 |     node_a = await entity_service.get_by_permalink("edge-cases/node-a")
288 |     assert node_a is not None
289 |     assert node_a.title == "Node A"
290 | 
291 |     # Verify Node A file has correct content
292 |     assert node_a_file.exists()
293 |     node_a_file_content = node_a_file.read_text()
294 |     assert "title: Node A" in node_a_file_content
295 |     assert "Original content for Node A" in node_a_file_content
296 | 
297 |     # Step 2: Create Node B file
298 |     node_b_content = dedent("""
299 |         ---
300 |         title: Node B
301 |         type: note
302 |         tags:
303 |         - circular-test
304 |         ---
305 | 
306 |         # Node B
307 | 
308 |         Content for Node B
309 | 
310 |         ## Relations
311 |         - links_to [[Node C]]
312 |         - part_of [[Node A]]
313 |     """).strip()
314 | 
315 |     node_b_file = edge_cases_dir / "Node B.md"
316 |     node_b_file.write_text(node_b_content)
317 | 
318 |     # Force full scan to detect the new file
319 |     # (file just created may not be newer than watermark due to timing precision)
320 |     await force_full_scan(sync_service)
321 | 
322 |     # Sync to create Node B
323 |     await sync_service.sync(project_dir)
324 | 
325 |     # Step 3: Create Node C file (this is where the bug might occur)
326 |     node_c_content = dedent("""
327 |         ---
328 |         title: Node C
329 |         type: note
330 |         tags:
331 |         - circular-test
332 |         ---
333 | 
334 |         # Node C
335 | 
336 |         Content for Node C
337 | 
338 |         ## Relations
339 |         - links_to [[Node A]]
340 |         - references [[Node B]]
341 |     """).strip()
342 | 
343 |     node_c_file = edge_cases_dir / "Node C.md"
344 |     node_c_file.write_text(node_c_content)
345 | 
346 |     # Force full scan to detect the new file
347 |     # (file just created may not be newer than watermark due to timing precision)
348 |     await force_full_scan(sync_service)
349 | 
350 |     # Sync to create Node C - THIS IS WHERE THE BUG OCCURS
351 |     await sync_service.sync(project_dir)
352 | 
353 |     # CRITICAL VERIFICATION: Check if Node A file was overwritten
354 |     assert node_a_file.exists(), "Node A.md file should still exist"
355 | 
356 |     # Read Node A file content to check for overwrite bug
357 |     node_a_after_sync = node_a_file.read_text()
358 | 
359 |     # The bug: Node A.md contains Node C content instead of Node A content
360 |     assert "title: Node A" in node_a_after_sync, (
361 |         "Node A.md file should still have title: Node A in frontmatter"
362 |     )
363 |     assert "Node A" in node_a_after_sync, "Node A.md file should still contain 'Node A' title"
364 |     assert "Original content for Node A" in node_a_after_sync, (
365 |         f"Node A.md file should NOT be overwritten! Content: {node_a_after_sync[:200]}"
366 |     )
367 |     assert "Content for Node C" not in node_a_after_sync, (
368 |         f"Node A.md should NOT contain Node C content! Content: {node_a_after_sync[:200]}"
369 |     )
370 | 
371 |     # Verify Node C file exists with correct content
372 |     assert node_c_file.exists(), "Node C.md file should exist"
373 |     node_c_after_sync = node_c_file.read_text()
374 |     assert "Node C" in node_c_after_sync
375 |     assert "Content for Node C" in node_c_after_sync
376 | 
377 |     # Verify database has both entities correctly
378 |     node_a_db = await entity_service.get_by_permalink("edge-cases/node-a")
379 |     node_c_db = await entity_service.get_by_permalink("edge-cases/node-c")
380 | 
381 |     assert node_a_db is not None, "Node A should exist in database"
382 |     assert node_a_db.title == "Node A", "Node A database entry should have correct title"
383 | 
384 |     assert node_c_db is not None, "Node C should exist in database"
385 |     assert node_c_db.title == "Node C", "Node C database entry should have correct title"
386 | 
```

--------------------------------------------------------------------------------
/src/basic_memory/api/routers/project_router.py:
--------------------------------------------------------------------------------

```python
  1 | """Router for project management."""
  2 | 
  3 | import os
  4 | from fastapi import APIRouter, HTTPException, Path, Body, BackgroundTasks, Response, Query
  5 | from typing import Optional
  6 | from loguru import logger
  7 | 
  8 | from basic_memory.deps import (
  9 |     ProjectConfigDep,
 10 |     ProjectServiceDep,
 11 |     ProjectPathDep,
 12 |     SyncServiceDep,
 13 | )
 14 | from basic_memory.schemas import ProjectInfoResponse, SyncReportResponse
 15 | from basic_memory.schemas.project_info import (
 16 |     ProjectList,
 17 |     ProjectItem,
 18 |     ProjectInfoRequest,
 19 |     ProjectStatusResponse,
 20 | )
 21 | from basic_memory.utils import normalize_project_path
 22 | 
# Router for resources in a specific project.
# ProjectPathDep is used in the path as a prefix, so the request path is like /{project}/project/info
project_router = APIRouter(prefix="/project", tags=["project"])

# Router for managing the collection of project resources (list/add/remove/default).
project_resource_router = APIRouter(prefix="/projects", tags=["project_management"])

 30 | 
 31 | @project_router.get("/info", response_model=ProjectInfoResponse)
 32 | async def get_project_info(
 33 |     project_service: ProjectServiceDep,
 34 |     project: ProjectPathDep,
 35 | ) -> ProjectInfoResponse:
 36 |     """Get comprehensive information about the specified Basic Memory project."""
 37 |     return await project_service.get_project_info(project)
 38 | 
 39 | 
 40 | @project_router.get("/item", response_model=ProjectItem)
 41 | async def get_project(
 42 |     project_service: ProjectServiceDep,
 43 |     project: ProjectPathDep,
 44 | ) -> ProjectItem:
 45 |     """Get bassic info about the specified Basic Memory project."""
 46 |     found_project = await project_service.get_project(project)
 47 |     if not found_project:
 48 |         raise HTTPException(
 49 |             status_code=404, detail=f"Project: '{project}' does not exist"
 50 |         )  # pragma: no cover
 51 | 
 52 |     return ProjectItem(
 53 |         name=found_project.name,
 54 |         path=normalize_project_path(found_project.path),
 55 |         is_default=found_project.is_default or False,
 56 |     )
 57 | 
 58 | 
 59 | # Update a project
 60 | @project_router.patch("/{name}", response_model=ProjectStatusResponse)
 61 | async def update_project(
 62 |     project_service: ProjectServiceDep,
 63 |     name: str = Path(..., description="Name of the project to update"),
 64 |     path: Optional[str] = Body(None, description="New absolute path for the project"),
 65 |     is_active: Optional[bool] = Body(None, description="Status of the project (active/inactive)"),
 66 | ) -> ProjectStatusResponse:
 67 |     """Update a project's information in configuration and database.
 68 | 
 69 |     Args:
 70 |         name: The name of the project to update
 71 |         path: Optional new absolute path for the project
 72 |         is_active: Optional status update for the project
 73 | 
 74 |     Returns:
 75 |         Response confirming the project was updated
 76 |     """
 77 |     try:
 78 |         # Validate that path is absolute if provided
 79 |         if path and not os.path.isabs(path):
 80 |             raise HTTPException(status_code=400, detail="Path must be absolute")
 81 | 
 82 |         # Get original project info for the response
 83 |         old_project_info = ProjectItem(
 84 |             name=name,
 85 |             path=project_service.projects.get(name, ""),
 86 |         )
 87 | 
 88 |         if path:
 89 |             await project_service.move_project(name, path)
 90 |         elif is_active is not None:
 91 |             await project_service.update_project(name, is_active=is_active)
 92 | 
 93 |         # Get updated project info
 94 |         updated_path = path if path else project_service.projects.get(name, "")
 95 | 
 96 |         return ProjectStatusResponse(
 97 |             message=f"Project '{name}' updated successfully",
 98 |             status="success",
 99 |             default=(name == project_service.default_project),
100 |             old_project=old_project_info,
101 |             new_project=ProjectItem(name=name, path=updated_path),
102 |         )
103 |     except ValueError as e:
104 |         raise HTTPException(status_code=400, detail=str(e))
105 | 
106 | 
# Sync project filesystem
@project_router.post("/sync")
async def sync_project(
    background_tasks: BackgroundTasks,
    sync_service: SyncServiceDep,
    project_config: ProjectConfigDep,
    force_full: bool = Query(
        False, description="Force full scan, bypassing watermark optimization"
    ),
):
    """Force project filesystem sync to database.

    Schedules a background scan of the project directory that updates the
    database with new or modified files, then returns immediately.

    Args:
        background_tasks: FastAPI background tasks
        sync_service: Sync service for this project
        project_config: Project configuration
        force_full: If True, force a full scan even if watermark exists

    Returns:
        Response confirming sync was initiated
    """
    project_name = project_config.name

    # Run the sync out-of-band so the HTTP request is not blocked by the scan.
    background_tasks.add_task(
        sync_service.sync, project_config.home, project_name, force_full=force_full
    )
    logger.info(
        f"Filesystem sync initiated for project: {project_name} (force_full={force_full})"
    )

    return {
        "status": "sync_started",
        "message": f"Filesystem sync initiated for project '{project_name}'",
    }
141 | 
142 | 
@project_router.post("/status", response_model=SyncReportResponse)
async def project_sync_status(
    sync_service: SyncServiceDep,
    project_config: ProjectConfigDep,
) -> SyncReportResponse:
    """Report which files differ between the filesystem and the database.

    Args:
        sync_service: Sync service for this project
        project_config: Project configuration

    Returns:
        Scan report with details on files that need syncing
    """
    logger.info(f"Scanning filesystem for project: {project_config.name}")
    report = await sync_service.scan(project_config.home)
    return SyncReportResponse.from_sync_report(report)
161 | 
162 | 
# List all available projects
@project_resource_router.get("/projects", response_model=ProjectList)
async def list_projects(
    project_service: ProjectServiceDep,
) -> ProjectList:
    """List all configured projects.

    Returns:
        A list of all projects with metadata, plus the default project name.
    """
    all_projects = await project_service.list_projects()
    default_name = project_service.default_project

    items = []
    for proj in all_projects:
        items.append(
            ProjectItem(
                name=proj.name,
                # Normalize so every client sees a consistent path form.
                path=normalize_project_path(proj.path),
                is_default=proj.is_default or False,
            )
        )

    return ProjectList(
        projects=items,
        default_project=default_name,
    )
189 | 
190 | 
# Add a new project
@project_resource_router.post("/projects", response_model=ProjectStatusResponse, status_code=201)
async def add_project(
    response: Response,
    project_data: ProjectInfoRequest,
    project_service: ProjectServiceDep,
) -> ProjectStatusResponse:
    """Add a new project to configuration and database.

    Re-adding an existing project with the same path is idempotent (200);
    the same name with a different path is rejected (400).

    Args:
        response: Used to downgrade the status code to 200 for idempotent adds.
        project_data: The project name and path, with option to set as default.

    Returns:
        Response confirming the project was added.
    """
    existing_project = await project_service.get_project(project_data.name)
    if existing_project:
        from pathlib import Path

        # Compare canonical paths (symlinks resolved) to decide idempotency.
        requested_path = Path(project_data.path).resolve()
        existing_path = Path(existing_project.path).resolve()

        if requested_path != existing_path:
            # Same name pointing somewhere else is a conflict.
            raise HTTPException(
                status_code=400,
                detail=f"Project '{project_data.name}' already exists with different path. Existing: {existing_project.path}, Requested: {project_data.path}",
            )

        # Same name, same path - report success without re-adding (idempotent).
        response.status_code = 200
        return ProjectStatusResponse(  # pyright: ignore [reportCallIssue]
            message=f"Project '{project_data.name}' already exists",
            status="success",
            default=existing_project.is_default or False,
            new_project=ProjectItem(
                name=existing_project.name,
                path=existing_project.path,
                is_default=existing_project.is_default or False,
            ),
        )

    try:  # pragma: no cover
        # The service layer handles cloud mode validation and path sanitization.
        await project_service.add_project(
            project_data.name, project_data.path, set_default=project_data.set_default
        )

        return ProjectStatusResponse(  # pyright: ignore [reportCallIssue]
            message=f"Project '{project_data.name}' added successfully",
            status="success",
            default=project_data.set_default,
            new_project=ProjectItem(
                name=project_data.name, path=project_data.path, is_default=project_data.set_default
            ),
        )
    except ValueError as e:  # pragma: no cover
        raise HTTPException(status_code=400, detail=str(e))
252 | 
253 | 
# Remove a project
@project_resource_router.delete("/{name}", response_model=ProjectStatusResponse)
async def remove_project(
    project_service: ProjectServiceDep,
    name: str = Path(..., description="Name of the project to remove"),
    delete_notes: bool = Query(
        False, description="If True, delete project directory from filesystem"
    ),
) -> ProjectStatusResponse:
    """Remove a project from configuration and database.

    Args:
        name: The name of the project to remove
        delete_notes: If True, delete the project directory from the filesystem

    Returns:
        Response confirming the project was removed

    Raises:
        HTTPException: 404 if the project is unknown; 400 if it is the default
            project or the service rejects the removal.
    """
    try:
        target = await project_service.get_project(name)
        if not target:  # pragma: no cover
            raise HTTPException(
                status_code=404, detail=f"Project: '{name}' does not exist"
            )  # pragma: no cover

        # The default project can never be removed; another project must be
        # promoted first (or there is nothing else to fall back to).
        if name == project_service.default_project:
            all_projects = await project_service.list_projects()
            alternatives = [p.name for p in all_projects if p.name != name]
            if alternatives:
                suffix = (
                    f"Set another project as default first. Available: {', '.join(alternatives)}"
                )
            else:
                suffix = "This is the only project in your configuration."
            raise HTTPException(
                status_code=400,
                detail=f"Cannot delete default project '{name}'. " + suffix,
            )

        await project_service.remove_project(name, delete_notes=delete_notes)

        return ProjectStatusResponse(
            message=f"Project '{name}' removed successfully",
            status="success",
            default=False,
            old_project=ProjectItem(name=target.name, path=target.path),
            new_project=None,
        )
    except ValueError as e:  # pragma: no cover
        raise HTTPException(status_code=400, detail=str(e))
303 | 
304 | 
# Set a project as default
@project_resource_router.put("/{name}/default", response_model=ProjectStatusResponse)
async def set_default_project(
    project_service: ProjectServiceDep,
    name: str = Path(..., description="Name of the project to set as default"),
) -> ProjectStatusResponse:
    """Set a project as the default project.

    Args:
        name: The name of the project to set as default

    Returns:
        Response confirming the project was set as default

    Raises:
        HTTPException: 404 if either the current default or the target project
            cannot be found; 400 if the service rejects the change.
    """
    try:
        # Look up the outgoing default so the response can describe the change.
        previous_name = project_service.default_project
        previous_default = await project_service.get_project(previous_name)
        if not previous_default:  # pragma: no cover
            raise HTTPException(  # pragma: no cover
                status_code=404, detail=f"Default Project: '{previous_name}' does not exist"
            )

        # The target project must exist before it can become the default.
        target = await project_service.get_project(name)
        if not target:  # pragma: no cover
            raise HTTPException(
                status_code=404, detail=f"Project: '{name}' does not exist"
            )  # pragma: no cover

        await project_service.set_default_project(name)

        return ProjectStatusResponse(
            message=f"Project '{name}' set as default successfully",
            status="success",
            default=True,
            old_project=ProjectItem(name=previous_name, path=previous_default.path),
            new_project=ProjectItem(
                name=name,
                path=target.path,
                is_default=True,
            ),
        )
    except ValueError as e:  # pragma: no cover
        raise HTTPException(status_code=400, detail=str(e))
350 | 
351 | 
# Get the default project
@project_resource_router.get("/default", response_model=ProjectItem)
async def get_default_project(
    project_service: ProjectServiceDep,
) -> ProjectItem:
    """Get the default project.

    Returns:
        Response with project default information

    Raises:
        HTTPException: 404 if the configured default name has no matching project.
    """
    default_name = project_service.default_project
    project = await project_service.get_project(default_name)
    if not project:  # pragma: no cover
        raise HTTPException(  # pragma: no cover
            status_code=404, detail=f"Default Project: '{default_name}' does not exist"
        )

    return ProjectItem(name=project.name, path=project.path, is_default=True)
371 | 
372 | 
# Synchronize projects between config and database
@project_resource_router.post("/config/sync", response_model=ProjectStatusResponse)
async def synchronize_projects(
    project_service: ProjectServiceDep,
) -> ProjectStatusResponse:
    """Synchronize projects between configuration file and database.

    Ensures that every project in the configuration file exists in the
    database and vice versa.

    Returns:
        Response confirming synchronization was completed
    """
    try:  # pragma: no cover
        await project_service.synchronize_projects()

        return ProjectStatusResponse(  # pyright: ignore [reportCallIssue]
            message="Projects synchronized successfully between configuration and database",
            status="success",
            default=False,
        )
    except ValueError as e:  # pragma: no cover
        raise HTTPException(status_code=400, detail=str(e))
396 | 
```

--------------------------------------------------------------------------------
/test-int/mcp/test_delete_note_integration.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Integration tests for delete_note MCP tool.
  3 | 
  4 | Tests the complete delete note workflow: MCP client -> MCP server -> FastAPI -> database
  5 | """
  6 | 
  7 | import pytest
  8 | from fastmcp import Client
  9 | 
 10 | 
 11 | @pytest.mark.asyncio
 12 | async def test_delete_note_by_title(mcp_server, app, test_project):
 13 |     """Test deleting a note by its title."""
 14 | 
 15 |     async with Client(mcp_server) as client:
 16 |         # First create a note
 17 |         await client.call_tool(
 18 |             "write_note",
 19 |             {
 20 |                 "project": test_project.name,
 21 |                 "title": "Note to Delete",
 22 |                 "folder": "test",
 23 |                 "content": "# Note to Delete\n\nThis note will be deleted.",
 24 |                 "tags": "test,delete",
 25 |             },
 26 |         )
 27 | 
 28 |         # Verify the note exists by reading it
 29 |         read_result = await client.call_tool(
 30 |             "read_note",
 31 |             {
 32 |                 "project": test_project.name,
 33 |                 "identifier": "Note to Delete",
 34 |             },
 35 |         )
 36 |         assert len(read_result.content) == 1
 37 |         assert "Note to Delete" in read_result.content[0].text
 38 | 
 39 |         # Delete the note by title
 40 |         delete_result = await client.call_tool(
 41 |             "delete_note",
 42 |             {
 43 |                 "project": test_project.name,
 44 |                 "identifier": "Note to Delete",
 45 |             },
 46 |         )
 47 | 
 48 |         # Should return True for successful deletion
 49 |         assert len(delete_result.content) == 1
 50 |         assert delete_result.content[0].type == "text"
 51 |         assert "true" in delete_result.content[0].text.lower()
 52 | 
 53 |         # Verify the note no longer exists
 54 |         read_after_delete = await client.call_tool(
 55 |             "read_note",
 56 |             {
 57 |                 "project": test_project.name,
 58 |                 "identifier": "Note to Delete",
 59 |             },
 60 |         )
 61 | 
 62 |         # Should return helpful "Note Not Found" message instead of the actual note
 63 |         assert len(read_after_delete.content) == 1
 64 |         result_text = read_after_delete.content[0].text
 65 |         assert "Note Not Found" in result_text
 66 |         assert "Note to Delete" in result_text
 67 | 
 68 | 
 69 | @pytest.mark.asyncio
 70 | async def test_delete_note_by_permalink(mcp_server, app, test_project):
 71 |     """Test deleting a note by its permalink."""
 72 | 
 73 |     async with Client(mcp_server) as client:
 74 |         # Create a note
 75 |         await client.call_tool(
 76 |             "write_note",
 77 |             {
 78 |                 "project": test_project.name,
 79 |                 "title": "Permalink Delete Test",
 80 |                 "folder": "tests",
 81 |                 "content": "# Permalink Delete Test\n\nTesting deletion by permalink.",
 82 |                 "tags": "test,permalink",
 83 |             },
 84 |         )
 85 | 
 86 |         # Delete the note by permalink
 87 |         delete_result = await client.call_tool(
 88 |             "delete_note",
 89 |             {
 90 |                 "project": test_project.name,
 91 |                 "identifier": "tests/permalink-delete-test",
 92 |             },
 93 |         )
 94 | 
 95 |         # Should return True for successful deletion
 96 |         assert len(delete_result.content) == 1
 97 |         assert "true" in delete_result.content[0].text.lower()
 98 | 
 99 |         # Verify the note no longer exists by searching
100 |         search_result = await client.call_tool(
101 |             "search_notes",
102 |             {
103 |                 "project": test_project.name,
104 |                 "query": "Permalink Delete Test",
105 |             },
106 |         )
107 | 
108 |         # Should have no results
109 |         assert (
110 |             '"results": []' in search_result.content[0].text
111 |             or '"results":[]' in search_result.content[0].text
112 |         )
113 | 
114 | 
@pytest.mark.asyncio
async def test_delete_note_with_observations_and_relations(mcp_server, app, test_project):
    """Test deleting a note that has observations and relations."""

    # A note that carries observations and relations, so deletion must
    # cascade to all derived entities.
    complex_content = """# Project Management System

This is a comprehensive project management system.

## Observations
- [feature] Task tracking functionality
- [feature] User authentication system
- [tech] Built with Python and Flask
- [status] Currently in development

## Relations
- depends_on [[Database Schema]]
- implements [[User Stories]]
- part_of [[Main Application]]

The system handles multiple projects and users."""

    async with Client(mcp_server) as client:
        project = test_project.name

        await client.call_tool(
            "write_note",
            {
                "project": project,
                "title": "Project Management System",
                "folder": "projects",
                "content": complex_content,
                "tags": "project,management,system",
            },
        )

        # Confirm the note (with observations and relations) round-trips.
        before = await client.call_tool(
            "read_note",
            {"project": project, "identifier": "Project Management System"},
        )
        assert len(before.content) == 1
        before_text = before.content[0].text
        assert "Task tracking functionality" in before_text
        assert "depends_on" in before_text

        # Delete by permalink.
        deletion = await client.call_tool(
            "delete_note",
            {"project": project, "identifier": "projects/project-management-system"},
        )
        assert "true" in deletion.content[0].text.lower()

        # The note should now resolve to the "Note Not Found" message.
        after = await client.call_tool(
            "read_note",
            {"project": project, "identifier": "Project Management System"},
        )
        assert len(after.content) == 1
        after_text = after.content[0].text
        assert "Note Not Found" in after_text
        assert "Project Management System" in after_text
188 | 
189 | 
@pytest.mark.asyncio
async def test_delete_note_special_characters_in_title(mcp_server, app, test_project):
    """Test deleting notes with special characters in the title."""

    special_titles = [
        "Note with spaces",
        "Note-with-dashes",
        "Note_with_underscores",
        "Note (with parentheses)",
        "Note & Symbols!",
    ]

    async with Client(mcp_server) as client:
        project = test_project.name

        # Create one note per tricky title.
        for title in special_titles:
            await client.call_tool(
                "write_note",
                {
                    "project": project,
                    "title": title,
                    "folder": "special",
                    "content": f"# {title}\n\nContent for {title}",
                    "tags": "special,characters",
                },
            )

        # Each note should be deletable by its exact title.
        for title in special_titles:
            deletion = await client.call_tool(
                "delete_note",
                {"project": project, "identifier": title},
            )
            assert "true" in deletion.content[0].text.lower(), (
                f"Failed to delete note: {title}"
            )

            # And reading it back should report it as missing.
            after = await client.call_tool(
                "read_note",
                {"project": project, "identifier": title},
            )
            assert len(after.content) == 1
            after_text = after.content[0].text
            assert "Note Not Found" in after_text
            assert title in after_text
246 | 
247 | 
@pytest.mark.asyncio
async def test_delete_nonexistent_note(mcp_server, app, test_project):
    """Test attempting to delete a note that doesn't exist."""

    async with Client(mcp_server) as client:
        # Deleting an unknown identifier should fail gracefully, not raise.
        deletion = await client.call_tool(
            "delete_note",
            {"project": test_project.name, "identifier": "Nonexistent Note"},
        )

        # The tool reports False for an unsuccessful deletion.
        assert len(deletion.content) == 1
        assert "false" in deletion.content[0].text.lower()
265 | 
266 | 
@pytest.mark.asyncio
async def test_delete_note_by_file_path(mcp_server, app, test_project):
    """Test deleting a note using its file path."""

    async with Client(mcp_server) as client:
        project = test_project.name

        await client.call_tool(
            "write_note",
            {
                "project": project,
                "title": "File Path Delete",
                "folder": "docs",
                "content": "# File Path Delete\n\nTesting deletion by file path.",
                "tags": "test,filepath",
            },
        )

        # The relative file path is accepted as an identifier too.
        deletion = await client.call_tool(
            "delete_note",
            {"project": project, "identifier": "docs/File Path Delete.md"},
        )
        assert "true" in deletion.content[0].text.lower()

        # Reading by title should now report the note as missing.
        after = await client.call_tool(
            "read_note",
            {"project": project, "identifier": "File Path Delete"},
        )
        assert len(after.content) == 1
        after_text = after.content[0].text
        assert "Note Not Found" in after_text
        assert "File Path Delete" in after_text
310 | 
311 | 
@pytest.mark.asyncio
async def test_delete_note_case_insensitive(mcp_server, app, test_project):
    """Test that note deletion is case insensitive for titles."""

    async with Client(mcp_server) as client:
        project = test_project.name

        # Create a note with a mixed-case title.
        await client.call_tool(
            "write_note",
            {
                "project": project,
                "title": "CamelCase Note Title",
                "folder": "test",
                "content": "# CamelCase Note Title\n\nTesting case sensitivity.",
                "tags": "test,case",
            },
        )

        # An all-lowercase identifier should still match the note.
        deletion = await client.call_tool(
            "delete_note",
            {"project": project, "identifier": "camelcase note title"},
        )
        assert "true" in deletion.content[0].text.lower()
340 | 
341 | 
@pytest.mark.asyncio
async def test_delete_multiple_notes_sequentially(mcp_server, app, test_project):
    """Test deleting multiple notes in sequence."""

    note_titles = [
        "First Note",
        "Second Note",
        "Third Note",
        "Fourth Note",
        "Fifth Note",
    ]

    async with Client(mcp_server) as client:
        project = test_project.name

        # Seed the batch of notes.
        for title in note_titles:
            await client.call_tool(
                "write_note",
                {
                    "project": project,
                    "title": title,
                    "folder": "batch",
                    "content": f"# {title}\n\nContent for {title}",
                    "tags": "batch,test",
                },
            )

        # Delete them one by one; every deletion must succeed.
        for title in note_titles:
            deletion = await client.call_tool(
                "delete_note",
                {"project": project, "identifier": title},
            )
            assert "true" in deletion.content[0].text.lower(), f"Failed to delete {title}"

        # Searching for the shared tag should now find nothing.
        search = await client.call_tool(
            "search_notes",
            {"project": project, "query": "batch"},
        )
        search_text = search.content[0].text
        assert '"results": []' in search_text or '"results":[]' in search_text
395 | 
396 | 
@pytest.mark.asyncio
async def test_delete_note_with_unicode_content(mcp_server, app, test_project):
    """Test deleting notes with Unicode content.

    Verifies that a note mixing emoji, CJK text, symbols, observations,
    and wiki-link relations can be created, deleted by its exact title,
    and is no longer readable afterwards.
    """

    async with Client(mcp_server) as client:
        # Create a note with Unicode content
        unicode_content = """# Unicode Test Note 🚀

This note contains various Unicode characters:
- Emojis: 🎉 🔥 ⚡ 💡
- Languages: 测试中文 Tëst Übër
- Symbols: ♠♣♥♦ ←→↑↓ ∞≠≤≥
- Math: ∑∏∂∇∆Ω

## Observations
- [test] Unicode characters preserved ✓
- [note] Emoji support working 🎯

## Relations  
- supports [[Unicode Standards]]
- tested_with [[Various Languages]]"""

        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Unicode Test Note",
                "folder": "unicode",
                "content": unicode_content,
                "tags": "unicode,test,emoji",
            },
        )

        # Delete the Unicode note
        delete_result = await client.call_tool(
            "delete_note",
            {
                "project": test_project.name,
                "identifier": "Unicode Test Note",
            },
        )

        # Should return True for successful deletion
        assert "true" in delete_result.content[0].text.lower()

        # Verify deletion by attempting to read the note back
        read_after_delete = await client.call_tool(
            "read_note",
            {
                "project": test_project.name,
                "identifier": "Unicode Test Note",
            },
        )

        # Should return "Note Not Found" message
        assert len(read_after_delete.content) == 1
        result_text = read_after_delete.content[0].text
        assert "Note Not Found" in result_text
        assert "Unicode Test Note" in result_text
456 | 
```

--------------------------------------------------------------------------------
/src/basic_memory/utils.py:
--------------------------------------------------------------------------------

```python
  1 | """Utility functions for basic-memory."""
  2 | 
  3 | import os
  4 | 
  5 | import logging
  6 | import re
  7 | import sys
  8 | from datetime import datetime
  9 | from pathlib import Path
 10 | from typing import Optional, Protocol, Union, runtime_checkable, List
 11 | 
 12 | from loguru import logger
 13 | from unidecode import unidecode
 14 | 
 15 | 
 16 | def normalize_project_path(path: str) -> str:
 17 |     """Normalize project path by stripping mount point prefix.
 18 | 
 19 |     In cloud deployments, the S3 bucket is mounted at /app/data. We strip this
 20 |     prefix from project paths to avoid leaking implementation details and to
 21 |     ensure paths match the actual S3 bucket structure.
 22 | 
 23 |     For local paths (including Windows paths), returns the path unchanged.
 24 | 
 25 |     Args:
 26 |         path: Project path (e.g., "/app/data/basic-memory-llc" or "C:\\Users\\...")
 27 | 
 28 |     Returns:
 29 |         Normalized path (e.g., "/basic-memory-llc" or "C:\\Users\\...")
 30 | 
 31 |     Examples:
 32 |         >>> normalize_project_path("/app/data/my-project")
 33 |         '/my-project'
 34 |         >>> normalize_project_path("/my-project")
 35 |         '/my-project'
 36 |         >>> normalize_project_path("app/data/my-project")
 37 |         '/my-project'
 38 |         >>> normalize_project_path("C:\\\\Users\\\\project")
 39 |         'C:\\\\Users\\\\project'
 40 |     """
 41 |     # Check if this is a Windows absolute path (e.g., C:\Users\...)
 42 |     # Windows paths have a drive letter followed by a colon
 43 |     if len(path) >= 2 and path[1] == ":":
 44 |         # Windows absolute path - return unchanged
 45 |         return path
 46 | 
 47 |     # Handle both absolute and relative Unix paths
 48 |     normalized = path.lstrip("/")
 49 |     if normalized.startswith("app/data/"):
 50 |         normalized = normalized.removeprefix("app/data/")
 51 | 
 52 |     # Ensure leading slash for Unix absolute paths
 53 |     if not normalized.startswith("/"):
 54 |         normalized = "/" + normalized
 55 | 
 56 |     return normalized
 57 | 
 58 | 
@runtime_checkable
class PathLike(Protocol):
    """Protocol for objects that can be used as paths.

    Anything with a usable ``__str__`` (e.g. ``pathlib.Path``) satisfies
    this protocol; ``runtime_checkable`` permits ``isinstance`` checks.
    """

    def __str__(self) -> str: ...
 64 | 
 65 | 
# In type annotations, use Union[Path, str] instead of FilePath for now
# This preserves compatibility with existing code while we migrate
FilePath = Union[Path, str]

# Disable the "Queue is full" warning emitted by the OpenTelemetry metrics
# instrument logger; it is noisy and not actionable here.
logging.getLogger("opentelemetry.sdk.metrics._internal.instrument").setLevel(logging.ERROR)
 72 | 
 73 | 
def generate_permalink(file_path: Union[Path, str, PathLike], split_extension: bool = True) -> str:
    """Generate a stable permalink from a file path.

    Args:
        file_path: Original file path (str, Path, or PathLike)
        split_extension: When True (the default) the file extension is
            dropped from the permalink; when False the original extension
            is appended back unchanged.

    Returns:
        Normalized permalink that matches validation rules. Converts spaces and underscores
        to hyphens for consistency. Preserves non-ASCII characters like Chinese.

    Examples:
        >>> generate_permalink("docs/My Feature.md")
        'docs/my-feature'
        >>> generate_permalink("specs/API (v2).md")
        'specs/api-v2'
        >>> generate_permalink("design/unified_model_refactor.md")
        'design/unified-model-refactor'
        >>> generate_permalink("中文/测试文档.md")
        '中文/测试文档'
    """
    # Convert Path to string if needed (as_posix yields forward slashes on all OSes)
    path_str = Path(str(file_path)).as_posix()

    # Remove extension (re-appended at the end when split_extension is False)
    (base, extension) = os.path.splitext(path_str)

    # Check if we have CJK characters that should be preserved
    # CJK ranges: \u4e00-\u9fff (CJK Unified Ideographs), \u3000-\u303f (CJK symbols),
    # \u3400-\u4dbf (CJK Extension A), \uff00-\uffef (Fullwidth forms)
    has_cjk_chars = any(
        "\u4e00" <= char <= "\u9fff"
        or "\u3000" <= char <= "\u303f"
        or "\u3400" <= char <= "\u4dbf"
        or "\uff00" <= char <= "\uffef"
        for char in base
    )

    if has_cjk_chars:
        # For text with CJK characters, selectively transliterate only Latin accented chars
        result = ""
        for char in base:
            if (
                "\u4e00" <= char <= "\u9fff"
                or "\u3000" <= char <= "\u303f"
                or "\u3400" <= char <= "\u4dbf"
            ):
                # Preserve CJK ideographs and symbols
                result += char
            elif "\uff00" <= char <= "\uffef":
                # Remove Chinese fullwidth punctuation entirely (like ,!?)
                continue
            else:
                # Transliterate Latin accented characters to ASCII
                result += unidecode(char)

        # Insert hyphens between CJK and Latin character transitions
        # Match: CJK followed by Latin letter/digit, or Latin letter/digit followed by CJK
        result = re.sub(
            r"([\u4e00-\u9fff\u3000-\u303f\u3400-\u4dbf])([a-zA-Z0-9])", r"\1-\2", result
        )
        result = re.sub(
            r"([a-zA-Z0-9])([\u4e00-\u9fff\u3000-\u303f\u3400-\u4dbf])", r"\1-\2", result
        )

        # Insert dash between camelCase
        result = re.sub(r"([a-z0-9])([A-Z])", r"\1-\2", result)

        # Convert ASCII letters to lowercase, preserve CJK
        lower_text = "".join(c.lower() if c.isascii() and c.isalpha() else c for c in result)

        # Replace underscores with hyphens
        text_with_hyphens = lower_text.replace("_", "-")

        # Remove apostrophes entirely (don't replace with hyphens)
        text_no_apostrophes = text_with_hyphens.replace("'", "")

        # Replace unsafe chars with hyphens, but preserve CJK characters
        clean_text = re.sub(
            r"[^a-z0-9\u4e00-\u9fff\u3000-\u303f\u3400-\u4dbf/\-]", "-", text_no_apostrophes
        )
    else:
        # Original ASCII-only processing for backward compatibility
        # Transliterate unicode to ascii
        ascii_text = unidecode(base)

        # Insert dash between camelCase
        ascii_text = re.sub(r"([a-z0-9])([A-Z])", r"\1-\2", ascii_text)

        # Convert to lowercase
        lower_text = ascii_text.lower()

        # replace underscores with hyphens
        text_with_hyphens = lower_text.replace("_", "-")

        # Remove apostrophes entirely (don't replace with hyphens)
        text_no_apostrophes = text_with_hyphens.replace("'", "")

        # Replace remaining invalid chars with hyphens
        clean_text = re.sub(r"[^a-z0-9/\-]", "-", text_no_apostrophes)

    # Collapse multiple hyphens produced by the substitutions above
    clean_text = re.sub(r"-+", "-", clean_text)

    # Clean each path segment (strip hyphens left at segment edges)
    segments = clean_text.split("/")
    clean_segments = [s.strip("-") for s in segments]

    return_val = "/".join(clean_segments)

    # Append file extension back, if necessary
    if not split_extension and extension:
        return_val += extension

    return return_val
188 | 
189 | 
def setup_logging(
    env: str,
    home_dir: Path,
    log_file: Optional[str] = None,
    log_level: str = "INFO",
    console: bool = True,
) -> None:  # pragma: no cover
    """
    Configure logging for the application.

    Args:
        env: The environment name (dev, test, prod)
        home_dir: The root directory for the application
        log_file: The name of the log file to write to
        log_level: The logging level to use
        console: Whether to log to the console
    """
    # Start from a clean slate: drop loguru's default handler plus any
    # handlers installed by earlier calls.
    logger.remove()

    # File output is skipped under test so test runs never write to disk.
    if log_file and env != "test":
        logger.add(
            str(home_dir / log_file),
            level=log_level,
            rotation="10 MB",
            retention="10 days",
            backtrace=True,
            diagnose=True,
            enqueue=True,
            colorize=False,
        )

    # Console output: always on under test, otherwise opt-in via `console`.
    if env == "test" or console:
        logger.add(sys.stderr, level=log_level, backtrace=True, diagnose=True, colorize=True)

    logger.info(f"ENV: '{env}' Log level: '{log_level}' Logging to {log_file}")

    # Bind structured-logging context: real values on Fly.io deployments,
    # "local" placeholders everywhere else.
    logger.configure(
        extra={
            "tenant_id": os.getenv("BASIC_MEMORY_TENANT_ID", "local"),
            "fly_app_name": os.getenv("FLY_APP_NAME", "local"),
            "fly_machine_id": os.getenv("FLY_MACHINE_ID", "local"),
            "fly_region": os.getenv("FLY_REGION", "local"),
        }
    )

    # Quiet down chatty third-party loggers.
    third_party_levels = {
        "httpx": logging.WARNING,  # HTTP client request logs
        "watchfiles.main": logging.WARNING,  # file-watcher change logs
    }
    for name, level in third_party_levels.items():
        logging.getLogger(name).setLevel(level)
258 | 
def parse_tags(tags: Union[List[str], str, None]) -> List[str]:
    """Parse tags from various input formats into a consistent list.

    Accepts a list of strings, a comma-separated string, a JSON-array
    string, or None. Leading '#' characters are stripped so they do not
    accumulate when tags pass through this function more than once.

    Args:
        tags: Tag input in any of the supported formats.

    Returns:
        A list of cleaned tag strings, or an empty list if no tags.
    """
    if tags is None:
        return []

    def _clean(items) -> List[str]:
        # Strip whitespace first, then leading '#', dropping empty entries.
        return [item.strip().lstrip("#") for item in items if item and item.strip()]

    if isinstance(tags, list):
        return _clean(tags)

    if isinstance(tags, str):
        import json

        candidate = tags.strip()
        # AI assistants sometimes send a JSON array serialized as a string.
        if candidate.startswith("[") and candidate.endswith("]"):
            try:
                decoded = json.loads(tags)
            except json.JSONDecodeError:
                # Not valid JSON; fall through to comma-separated parsing.
                pass
            else:
                if isinstance(decoded, list):
                    return parse_tags(decoded)

        return _clean(tags.split(","))

    # Last resort: stringify unknown input and re-parse it.
    try:  # pragma: no cover
        return parse_tags(str(tags))
    except (ValueError, TypeError):  # pragma: no cover
        logger.warning(f"Couldn't parse tags from input of type {type(tags)}: {tags}")
        return []
305 | 
306 | 
def normalize_newlines(multiline: str) -> str:
    """Normalize every line break to the platform's native newline.

    Replaces each CRLF, lone CR, or lone LF sequence with ``os.linesep``.

    Args:
        multiline: String containing any mixture of newline styles.

    Returns:
        The string with all line breaks converted to the native newline.
    """
    newline_pattern = re.compile(r"\r\n?|\n")
    return newline_pattern.sub(os.linesep, multiline)
317 | 
318 | 
def normalize_file_path_for_comparison(file_path: str) -> str:
    """Normalize a file path for conflict detection.

    Produces a canonical form for case-insensitive, Unicode-aware
    comparison: lowercased, NFD-normalized, forward slashes only, with
    repeated slashes collapsed. The result is for comparison only, not
    for filesystem access.

    Args:
        file_path: The file path to normalize.

    Returns:
        Normalized file path for comparison purposes.
    """
    import unicodedata

    # Lowercase plus NFD so visually identical names compare equal.
    canonical = unicodedata.normalize("NFD", file_path.lower())

    # Unify separators, then collapse any run of slashes into one.
    canonical = canonical.replace("\\", "/")
    return re.sub(r"/+", "/", canonical)
348 | 
349 | 
def detect_potential_file_conflicts(file_path: str, existing_paths: List[str]) -> List[str]:
    """Find existing paths that could collide with a new file path.

    A path is considered a potential conflict when it differs from
    *file_path* only by case, Unicode normalization, or path-separator
    style, or when both generate the same permalink.

    Args:
        file_path: The file path to check.
        existing_paths: List of existing file paths to check against.

    Returns:
        The subset of *existing_paths* that may conflict with *file_path*.
    """
    target_normalized = normalize_file_path_for_comparison(file_path)
    target_permalink = generate_permalink(file_path)

    def _is_conflict(candidate: str) -> bool:
        # An identical path is the same file, not a conflict.
        if candidate == file_path:
            return False
        # Case/Unicode/separator-insensitive match.
        if normalize_file_path_for_comparison(candidate) == target_normalized:
            return True
        # Distinct paths that collapse to the same permalink also collide.
        return generate_permalink(candidate) == target_permalink

    return [candidate for candidate in existing_paths if _is_conflict(candidate)]
390 | 
391 | 
def valid_project_path_value(path: str):
    """Ensure project path is valid.

    Rejects traversal sequences, home-directory references, absolute
    paths (Unix or Windows), and embedded control characters. An empty
    string is accepted because it resolves to the project root.
    """
    # Empty strings resolve to the project root.
    if not path:
        return True

    # Path traversal and home-directory expansion.
    if ".." in path or "~" in path:
        return False

    # Windows-style traversal or leading backslash, even on Unix hosts.
    if "\\.." in path or path.startswith("\\"):
        return False

    # Absolute paths: Unix root or a Windows drive letter.
    looks_windows_absolute = len(path) >= 2 and path[1] == ":"
    if path.startswith("/") or looks_windows_absolute:
        return False

    # Control characters are rejected; space and tab are tolerated since
    # surrounding whitespace gets stripped elsewhere.
    if path.strip():
        for ch in path:
            if ord(ch) < 32 and ch not in (" ", "\t"):
                return False

    return True
415 | 
416 | 
def validate_project_path(path: str, project_path: Path) -> bool:
    """Ensure path is valid and stays within project boundaries."""

    # Cheap string-level screening before touching the filesystem.
    if not valid_project_path_value(path):
        return False

    # Resolve both sides and confirm the target stays inside the project.
    try:
        resolved_target = (project_path / path).resolve()
        resolved_root = project_path.resolve()
        return resolved_target.is_relative_to(resolved_root)
    except (ValueError, OSError):
        return False
428 | 
429 | 
def ensure_timezone_aware(dt: datetime) -> datetime:
    """Return the given datetime as timezone-aware.

    Naive datetimes are interpreted as system local time and converted
    with ``datetime.astimezone``; aware datetimes pass through unchanged.

    Args:
        dt: The datetime to ensure is timezone-aware

    Returns:
        A timezone-aware datetime
    """
    # astimezone() on a naive datetime attaches the system local zone.
    return dt if dt.tzinfo is not None else dt.astimezone()
448 | 
```

--------------------------------------------------------------------------------
/test-int/mcp/test_chatgpt_tools_integration.py:
--------------------------------------------------------------------------------

```python
  1 | """
  2 | Integration tests for ChatGPT-compatible MCP tools.
  3 | 
  4 | Tests the complete flow of search and fetch tools designed for ChatGPT integration,
  5 | ensuring they properly wrap Basic Memory's MCP tools and return OpenAI-compatible
  6 | MCP content array format.
  7 | """
  8 | 
  9 | import json
 10 | import pytest
 11 | from fastmcp import Client
 12 | 
 13 | 
 14 | def extract_mcp_json_content(mcp_result):
 15 |     """
 16 |     Helper to extract JSON content from MCP CallToolResult.
 17 | 
 18 |     FastMCP auto-serializes our List[Dict[str, Any]] return values, so we need to:
 19 |     1. Get the content list from the CallToolResult
 20 |     2. Parse the JSON string in the text field (which is our serialized list)
 21 |     3. Extract the actual JSON from the MCP content array structure
 22 |     """
 23 |     content_list = mcp_result.content
 24 |     mcp_content_list = json.loads(content_list[0].text)
 25 |     return json.loads(mcp_content_list[0]["text"])
 26 | 
 27 | 
@pytest.mark.asyncio
async def test_chatgpt_search_basic(mcp_server, app, test_project):
    """Test basic ChatGPT search functionality with MCP content array format.

    Seeds three notes (two AI-related, one unrelated), runs the ChatGPT
    "search" wrapper tool, and checks that the OpenAI-compatible result
    envelope contains only the matching documents.
    """

    async with Client(mcp_server) as client:
        # Create test notes for searching
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Machine Learning Fundamentals",
                "folder": "ai",
                "content": (
                    "# Machine Learning Fundamentals\n\nIntroduction to ML concepts and algorithms."
                ),
                "tags": "ml,ai,fundamentals",
            },
        )

        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Deep Learning with PyTorch",
                "folder": "ai",
                "content": (
                    "# Deep Learning with PyTorch\n\n"
                    "Building neural networks using PyTorch framework."
                ),
                "tags": "pytorch,deep-learning,ai",
            },
        )

        # Control note that must NOT match the "Machine Learning" query.
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Data Visualization Guide",
                "folder": "data",
                "content": (
                    "# Data Visualization Guide\n\nCreating charts and graphs for data analysis."
                ),
                "tags": "visualization,data,charts",
            },
        )

        # Test ChatGPT search tool
        search_result = await client.call_tool(
            "search",
            {
                "query": "Machine Learning",
            },
        )

        # Extract JSON content from MCP result
        results_json = extract_mcp_json_content(search_result)
        assert "results" in results_json
        assert len(results_json["results"]) > 0

        # Check result structure (OpenAI expects id/title/url per result)
        first_result = results_json["results"][0]
        assert "id" in first_result
        assert "title" in first_result
        assert "url" in first_result

        # Verify correct content found
        titles = [r["title"] for r in results_json["results"]]
        assert "Machine Learning Fundamentals" in titles
        assert "Data Visualization Guide" not in titles
 97 | 
 98 | 
 99 | @pytest.mark.asyncio
100 | async def test_chatgpt_search_empty_results(mcp_server, app, test_project):
101 |     """Test ChatGPT search with no matching results."""
102 | 
103 |     async with Client(mcp_server) as client:
104 |         # Search for non-existent content
105 |         search_result = await client.call_tool(
106 |             "search",
107 |             {
108 |                 "query": "NonExistentTopic12345",
109 |             },
110 |         )
111 | 
112 |         # Extract JSON content from MCP result
113 |         results_json = extract_mcp_json_content(search_result)
114 |         assert "results" in results_json
115 |         assert len(results_json["results"]) == 0
116 |         assert results_json["query"] == "NonExistentTopic12345"
117 | 
118 | 
@pytest.mark.asyncio
async def test_chatgpt_search_with_boolean_operators(mcp_server, app, test_project):
    """Test ChatGPT search with boolean operators.

    Verifies that an "AND" query narrows results to notes matching both
    terms rather than either one.
    """

    async with Client(mcp_server) as client:
        # Create test notes
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Python Web Frameworks",
                "folder": "dev",
                "content": (
                    "# Python Web Frameworks\n\nComparing Django and Flask for web development."
                ),
                "tags": "python,web,frameworks",
            },
        )

        # Second note matches "frameworks" but not "Python".
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "JavaScript Frameworks",
                "folder": "dev",
                "content": "# JavaScript Frameworks\n\nReact, Vue, and Angular comparison.",
                "tags": "javascript,web,frameworks",
            },
        )

        # Test with AND operator
        search_result = await client.call_tool(
            "search",
            {
                "query": "Python AND frameworks",
            },
        )

        results_json = extract_mcp_json_content(search_result)
        titles = [r["title"] for r in results_json["results"]]
        assert "Python Web Frameworks" in titles
        assert "JavaScript Frameworks" not in titles
161 | 
162 | 
@pytest.mark.asyncio
async def test_chatgpt_fetch_document(mcp_server, app, test_project):
    """Test ChatGPT fetch tool for retrieving full document content.

    Writes a note with headings and an embedded code block, fetches it by
    title, and checks the OpenAI-compatible document envelope plus the
    round-tripped body text.
    """

    async with Client(mcp_server) as client:
        # Create a test note
        note_content = """# Advanced Python Techniques

## Overview
This document covers advanced Python programming techniques.

## Topics Covered
- Decorators
- Context Managers
- Metaclasses
- Async/Await patterns

## Code Examples
```python
def my_decorator(func):
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper
```
"""

        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Advanced Python Techniques",
                "folder": "programming",
                "content": note_content,
                "tags": "python,advanced,programming",
            },
        )

        # Fetch the document using its title
        fetch_result = await client.call_tool(
            "fetch",
            {
                "id": "Advanced Python Techniques",
            },
        )

        # Extract JSON content from MCP result; the fetch envelope must
        # expose id/title/text/url/metadata fields.
        document_json = extract_mcp_json_content(fetch_result)
        assert "id" in document_json
        assert "title" in document_json
        assert "text" in document_json
        assert "url" in document_json
        assert "metadata" in document_json

        # Verify content survived the round trip, including the code block
        assert document_json["title"] == "Advanced Python Techniques"
        assert "Decorators" in document_json["text"]
        assert "Context Managers" in document_json["text"]
        assert "def my_decorator" in document_json["text"]
221 | 
222 | 
@pytest.mark.asyncio
async def test_chatgpt_fetch_by_permalink(mcp_server, app, test_project):
    """Test ChatGPT fetch using permalink identifier."""

    async with Client(mcp_server) as client:
        # Create a note with known content
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Test Document",
                "folder": "test",
                "content": "# Test Document\n\nThis is test content for permalink fetching.",
                "tags": "test",
            },
        )

        # Discover the note's permalink through the search tool.
        search_response = await client.call_tool(
            "search",
            {
                "query": "Test Document",
            },
        )

        search_payload = extract_mcp_json_content(search_response)
        assert len(search_payload["results"]) > 0
        permalink = search_payload["results"][0]["id"]

        # Fetching by that permalink should round-trip the same document.
        fetch_response = await client.call_tool(
            "fetch",
            {
                "id": permalink,
            },
        )

        document = extract_mcp_json_content(fetch_response)
        assert document["id"] == permalink
        assert "Test Document" in document["title"]
        assert "test content for permalink fetching" in document["text"]
265 | 
266 | 
@pytest.mark.asyncio
async def test_chatgpt_fetch_nonexistent_document(mcp_server, app, test_project):
    """Test ChatGPT fetch with non-existent document ID."""

    async with Client(mcp_server) as client:
        # Fetch an identifier that was never written.
        response = await client.call_tool(
            "fetch",
            {
                "id": "NonExistentDocument12345",
            },
        )

        document = extract_mcp_json_content(response)

        # Even error responses keep the standard document envelope.
        for field in ("id", "title", "text"):
            assert field in document

        # The body should echo the id and signal that nothing was found.
        assert document["id"] == "NonExistentDocument12345"
        assert "Not Found" in document["text"] or "not found" in document["text"]
291 | 
292 | 
@pytest.mark.asyncio
async def test_chatgpt_fetch_with_empty_title(mcp_server, app, test_project):
    """Test ChatGPT fetch handles documents with empty or missing titles.

    The note body has no markdown H1 header, so the fetch wrapper must
    still supply a usable, non-empty title in the response envelope.
    """

    async with Client(mcp_server) as client:
        # Create a note without a title in the content
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "untitled-note",
                "folder": "misc",
                "content": "This is content without a markdown header.\n\nJust plain text.",
                "tags": "misc",
            },
        )

        # Fetch the document
        fetch_result = await client.call_tool(
            "fetch",
            {
                "id": "untitled-note",
            },
        )

        # Parse JSON response
        document_json = extract_mcp_json_content(fetch_result)

        # Should have a title even if content doesn't have one
        assert "title" in document_json
        assert document_json["title"] != ""
        assert document_json["title"] is not None
        assert "content without a markdown header" in document_json["text"]
326 | 
327 | 
@pytest.mark.asyncio
async def test_chatgpt_search_pagination_default(mcp_server, app, test_project):
    """Search should cap results at the default page size of 10."""

    async with Client(mcp_server) as client:
        # Seed more notes than a single default page can hold.
        for idx in range(15):
            await client.call_tool(
                "write_note",
                {
                    "project": test_project.name,
                    "title": f"Test Note {idx}",
                    "folder": "bulk",
                    "content": f"# Test Note {idx}\n\nThis is test content number {idx}.",
                    "tags": "test,bulk",
                },
            )

        response = await client.call_tool("search", {"query": "Test Note"})
        payload = extract_mcp_json_content(response)

        # Default pagination returns at most 10 entries.
        assert len(payload["results"]) <= 10
        assert payload["total_count"] <= 10
359 | 
360 | 
@pytest.mark.asyncio
async def test_chatgpt_tools_error_handling(mcp_server, app, test_project):
    """Even a degenerate query must come back as well-formed MCP content."""

    async with Client(mcp_server) as client:
        # An empty query is the simplest way to poke any validation/error path.
        response = await client.call_tool("search", {"query": ""})

        # The MCP envelope is a one-element content array of type "text".
        assert hasattr(response, "content")
        items = response.content
        assert isinstance(items, list)
        assert len(items) == 1
        assert items[0].type == "text"

        # The text element must parse as JSON and carry a results key.
        payload = extract_mcp_json_content(response)
        assert "results" in payload
385 | 
386 | 
@pytest.mark.asyncio
async def test_chatgpt_integration_workflow(mcp_server, app, test_project):
    """Test complete workflow: search then fetch, as ChatGPT would use it.

    Exercises the loop the ChatGPT connector performs end to end: seed
    several notes, run a search, fetch the top hit, and cross-check the
    fetch response against the search result.
    """

    async with Client(mcp_server) as client:
        # Step 1: Create multiple documents
        docs = [
            {
                "title": "API Design Best Practices",
                "content": (
                    "# API Design Best Practices\n\nRESTful API design principles and patterns."
                ),
                "tags": "api,rest,design",
            },
            {
                "title": "GraphQL vs REST",
                "content": "# GraphQL vs REST\n\nComparing GraphQL and REST API architectures.",
                "tags": "api,graphql,rest",
            },
            {
                "title": "Database Design Patterns",
                "content": (
                    "# Database Design Patterns\n\n"
                    "Common database design patterns and anti-patterns."
                ),
                "tags": "database,design,patterns",
            },
        ]

        for doc in docs:
            await client.call_tool(
                "write_note",
                {
                    "project": test_project.name,
                    "title": doc["title"],
                    "folder": "architecture",
                    "content": doc["content"],
                    "tags": doc["tags"],
                },
            )

        # Step 2: Search for API-related content (as ChatGPT would)
        search_result = await client.call_tool(
            "search",
            {
                "query": "API",
            },
        )

        results_json = extract_mcp_json_content(search_result)
        # Two of the three seeded docs are API-related, so expect >= 2 hits.
        assert len(results_json["results"]) >= 2

        # Step 3: Fetch one of the search results (as ChatGPT would)
        first_result_id = results_json["results"][0]["id"]
        fetch_result = await client.call_tool(
            "fetch",
            {
                "id": first_result_id,
            },
        )

        document_json = extract_mcp_json_content(fetch_result)

        # Verify the fetched document matches search result
        assert document_json["id"] == first_result_id
        assert "API" in document_json["text"] or "api" in document_json["text"].lower()

        # Verify document has expected structure
        assert document_json["metadata"]["format"] == "markdown"
456 | 
```

--------------------------------------------------------------------------------
/tests/cli/test_cli_tools.py:
--------------------------------------------------------------------------------

```python
  1 | """Tests for the Basic Memory CLI tools.
  2 | 
  3 | These tests use real MCP tools with the test environment instead of mocks.
  4 | """
  5 | 
  6 | # Import for testing
  7 | 
  8 | import io
  9 | from datetime import datetime, timedelta
 10 | import json
 11 | from textwrap import dedent
 12 | from typing import AsyncGenerator
 13 | from unittest.mock import patch
 14 | 
 15 | import pytest_asyncio
 16 | from typer.testing import CliRunner
 17 | 
 18 | from basic_memory.cli.commands.tool import tool_app
 19 | from basic_memory.schemas.base import Entity as EntitySchema
 20 | 
 21 | runner = CliRunner()
 22 | 
 23 | 
@pytest_asyncio.fixture
async def setup_test_note(entity_service, search_service) -> AsyncGenerator[dict, None]:
    """Create and index a test note, yielding its title, permalink and content.

    The note carries both observations and a relation so CLI commands that
    surface structured sections have something to display.
    """
    note_content = dedent("""
        # Test Note
        
        This is a test note for CLI commands.
        
        ## Observations
        - [tech] Test observation #test
        - [note] Another observation
        
        ## Relations
        - connects_to [[Another Note]]
    """)

    entity, created = await entity_service.create_or_update_entity(
        EntitySchema(
            title="Test Note",
            folder="test",
            entity_type="note",
            content=note_content,
        )
    )

    # Index the entity so search-based CLI commands can find it
    await search_service.index_entity(entity)

    yield {
        "title": entity.title,
        "permalink": entity.permalink,
        "content": note_content,
    }
 57 | 
 58 | 
 59 | def test_write_note(cli_env, project_config, test_project):
 60 |     """Test write_note command with basic arguments."""
 61 |     result = runner.invoke(
 62 |         tool_app,
 63 |         [
 64 |             "write-note",
 65 |             "--title",
 66 |             "CLI Test Note",
 67 |             "--content",
 68 |             "This is a CLI test note",
 69 |             "--folder",
 70 |             "test",
 71 |             "--project",
 72 |             test_project.name,
 73 |         ],
 74 |     )
 75 |     assert result.exit_code == 0
 76 | 
 77 |     # Check for expected success message
 78 |     assert "CLI Test Note" in result.stdout
 79 |     assert "Created" in result.stdout or "Updated" in result.stdout
 80 |     assert "permalink" in result.stdout
 81 | 
 82 | 
 83 | def test_write_note_with_project_arg(cli_env, project_config, test_project):
 84 |     """Test write_note command with basic arguments."""
 85 |     result = runner.invoke(
 86 |         tool_app,
 87 |         [
 88 |             "write-note",
 89 |             "--project",
 90 |             test_project.name,
 91 |             "--title",
 92 |             "CLI Test Note",
 93 |             "--content",
 94 |             "This is a CLI test note",
 95 |             "--folder",
 96 |             "test",
 97 |         ],
 98 |     )
 99 |     assert result.exit_code == 0
100 | 
101 |     # Check for expected success message
102 |     assert "CLI Test Note" in result.stdout
103 |     assert "Created" in result.stdout or "Updated" in result.stdout
104 |     assert "permalink" in result.stdout
105 | 
106 | 
def test_write_note_with_tags(cli_env, project_config):
    """write-note should record repeated --tags options."""
    args = [
        "write-note",
        "--title",
        "Tagged CLI Test Note",
        "--content",
        "This is a test note with tags",
        "--folder",
        "test",
        "--tags",
        "tag1",
        "--tags",
        "tag2",
    ]
    result = runner.invoke(tool_app, args)
    assert result.exit_code == 0

    assert "Tagged CLI Test Note" in result.stdout
    # Tags may be rendered as a joined list or individually; accept either.
    assert "tag1, tag2" in result.stdout or ("tag1" in result.stdout and "tag2" in result.stdout)
130 | 
131 | 
def test_write_note_from_stdin(cli_env, project_config, monkeypatch):
    """write-note should fall back to stdin when --content is omitted.

    Stdin is simulated both via monkeypatch and via the runner's ``input``
    argument, since typer's CliRunner manages its own stdin plumbing.
    """
    piped_text = "This is content from stdin for testing"

    # Present stdin as a pipe rather than an interactive terminal.
    monkeypatch.setattr("sys.stdin", io.StringIO(piped_text))
    monkeypatch.setattr("sys.stdin.isatty", lambda: False)

    result = runner.invoke(
        tool_app,
        [
            "write-note",
            "--title",
            "Stdin Test Note",
            "--folder",
            "test",
        ],
        input=piped_text,  # fallback path through the runner itself
    )

    assert result.exit_code == 0

    # Success output names the note, the action taken, and its permalink.
    assert "Stdin Test Note" in result.stdout
    assert "Created" in result.stdout or "Updated" in result.stdout
    assert "permalink" in result.stdout
162 | 
163 | 
def test_write_note_content_param_priority(cli_env, project_config):
    """An explicit --content must win over piped stdin."""
    piped_text = "This content from stdin should NOT be used"
    explicit_text = "This explicit content parameter should be used"

    # Simulate piped input while still passing --content explicitly.
    with (
        patch("sys.stdin", io.StringIO(piped_text)),
        patch("sys.stdin.isatty", return_value=False),
    ):
        result = runner.invoke(
            tool_app,
            [
                "write-note",
                "--title",
                "Priority Test Note",
                "--content",
                explicit_text,
                "--folder",
                "test",
            ],
        )

        assert result.exit_code == 0

        # File contents aren't inspectable from here; a successful
        # create/update is the observable signal that the parameter
        # path was taken instead of stdin.
        assert "Priority Test Note" in result.stdout
        assert "Created" in result.stdout or "Updated" in result.stdout
194 | 
195 | 
def test_write_note_no_content(cli_env, project_config):
    """write-note with neither --content nor piped stdin should fail."""
    # Present stdin as an interactive terminal so no fallback content exists.
    with patch("sys.stdin.isatty", return_value=True):
        result = runner.invoke(
            tool_app,
            [
                "write-note",
                "--title",
                "No Content Note",
                "--folder",
                "test",
            ],
        )

        # The command must exit with an error status.
        assert result.exit_code == 1
214 | 
215 | 
def test_read_note(cli_env, setup_test_note):
    """read-note should print the note's content and section structure."""
    result = runner.invoke(tool_app, ["read-note", setup_test_note["permalink"]])
    assert result.exit_code == 0

    expected_fragments = [
        "Test Note",
        "This is a test note for CLI commands",
        "## Observations",
        "Test observation",
        "## Relations",
        "connects_to [[Another Note]]",
    ]
    for fragment in expected_fragments:
        assert fragment in result.stdout

    # Square-bracket markers like [tech] are stripped from CLI output,
    # so their presence is deliberately not asserted.
236 | 
237 | 
def test_search_basic(cli_env, setup_test_note, test_project):
    """search-notes should surface the seeded observation as JSON."""
    result = runner.invoke(
        tool_app,
        ["search-notes", "test observation", "--project", test_project.name],
    )
    assert result.exit_code == 0

    # Output is a JSON document with a non-empty results list.
    payload = json.loads(result.stdout)
    assert len(payload["results"]) > 0

    # At least one hit's permalink should reference the test observation.
    found = any(
        "test" in hit["permalink"].lower() and "observation" in hit["permalink"].lower()
        for hit in payload["results"]
    )
    assert found, "Search did not find the test observation"
258 | 
259 | 
def test_search_permalink(cli_env, setup_test_note):
    """search-notes --permalink should match the note's exact permalink."""
    permalink = setup_test_note["permalink"]

    result = runner.invoke(tool_app, ["search-notes", permalink, "--permalink"])
    assert result.exit_code == 0

    # Output is a JSON document with a non-empty results list.
    payload = json.loads(result.stdout)
    assert len(payload["results"]) > 0

    # One of the hits must carry the exact permalink we searched for.
    found = any(hit["permalink"] == permalink for hit in payload["results"])
    assert found, "Search did not find the note by permalink"
282 | 
283 | 
def test_build_context(cli_env, setup_test_note):
    """build-context should include the note among its primary results."""
    permalink = setup_test_note["permalink"]

    result = runner.invoke(tool_app, ["build-context", f"memory://{permalink}"])
    assert result.exit_code == 0

    # Output is a JSON document with a non-empty results list.
    payload = json.loads(result.stdout)
    assert "results" in payload
    assert len(payload["results"]) > 0

    # The note must appear as the primary result of some entry.
    found = any(
        entry["primary_result"]["permalink"] == permalink for entry in payload["results"]
    )
    assert found, "Context did not include the test note"
307 | 
308 | 
def test_build_context_with_options(cli_env, setup_test_note):
    """Test build_context command with all options.

    Passes every supported flag and checks that the response metadata
    echoes them back and that the note still appears in the results.
    """
    permalink = setup_test_note["permalink"]

    result = runner.invoke(
        tool_app,
        [
            "build-context",
            f"memory://{permalink}",
            "--depth",
            "2",
            "--timeframe",
            "1d",
            "--page",
            "1",
            "--page-size",
            "5",
            "--max-related",
            "20",
        ],
    )
    assert result.exit_code == 0

    # Result should be JSON containing our test note
    context_result = json.loads(result.stdout)

    # Check that metadata reflects our options
    assert context_result["metadata"]["depth"] == 2
    # The "1d" timeframe is resolved server-side to a concrete timestamp;
    # allow up to 2 days of slack so the check is not clock-sensitive.
    timeframe = datetime.fromisoformat(context_result["metadata"]["timeframe"])
    assert datetime.now().astimezone() - timeframe <= timedelta(
        days=2
    )  # Compare timezone-aware datetimes

    # Results should include our test note
    found = False
    for item in context_result["results"]:
        if item["primary_result"]["permalink"] == permalink:
            found = True
            break

    assert found, "Context did not include the test note"
350 | 
351 | 
def test_build_context_string_depth_parameter(cli_env, setup_test_note):
    """Test build_context command handles string depth parameter correctly.

    CLI arguments always arrive as strings; Typer must coerce a numeric
    --depth to int and reject a non-numeric one with a usage error.
    """
    permalink = setup_test_note["permalink"]

    # Test valid string depth parameter - Typer should convert it to int
    result = runner.invoke(
        tool_app,
        [
            "build-context",
            f"memory://{permalink}",
            "--depth",
            "2",  # This is always a string from CLI
        ],
    )
    assert result.exit_code == 0

    # Result should be JSON containing our test note with correct depth
    context_result = json.loads(result.stdout)
    assert context_result["metadata"]["depth"] == 2

    # Test invalid string depth parameter - should fail with Typer validation error
    result = runner.invoke(
        tool_app,
        [
            "build-context",
            f"memory://{permalink}",
            "--depth",
            "invalid",
        ],
    )
    assert result.exit_code == 2  # Typer exits with code 2 for parameter validation errors
    # Typer should show a usage error for invalid integer
    assert (
        "invalid" in result.stderr
        and "is not a valid" in result.stderr
        and "integer" in result.stderr
    )
389 | 
390 | 
391 | # The get-entity CLI command was removed when tools were refactored
392 | # into separate files with improved error handling
393 | 
394 | 
def test_recent_activity(cli_env, setup_test_note, test_project):
    """recent-activity with no flags should produce a readable summary."""
    result = runner.invoke(tool_app, ["recent-activity"])
    assert result.exit_code == 0

    summary = result.stdout
    # Human-readable summary with at least one project section.
    assert "Recent Activity Summary" in summary
    assert "Most Active Project:" in summary or "Other Active Projects:" in summary

    # The seeded note should appear by permalink or by title.
    assert setup_test_note["permalink"] in summary or setup_test_note["title"] in summary
410 | 
411 | 
def test_recent_activity_with_options(cli_env, setup_test_note, test_project):
    """recent-activity should honor --type, --depth and --timeframe flags."""
    args = [
        "recent-activity",
        "--type",
        "entity",
        "--depth",
        "2",
        "--timeframe",
        "7d",
    ]
    result = runner.invoke(tool_app, args)
    assert result.exit_code == 0

    summary = result.stdout
    # Human-readable summary with at least one project section.
    assert "Recent Activity Summary" in summary
    assert "Most Active Project:" in summary or "Other Active Projects:" in summary

    # Entity results were requested, so the seeded note should be mentioned.
    assert setup_test_note["permalink"] in summary or setup_test_note["title"] in summary
435 | 
436 | 
def test_continue_conversation(cli_env, setup_test_note):
    """continue-conversation should build a retrieval prompt for the topic."""
    permalink = setup_test_note["permalink"]

    result = runner.invoke(tool_app, ["continue-conversation", "--topic", "Test Note"])
    assert result.exit_code == 0

    # The prompt names the topic, frames the session, and points at the note.
    for fragment in (
        "Continuing conversation on: Test Note",
        "This is a memory retrieval session",
        "read_note",
        permalink,
    ):
        assert fragment in result.stdout
453 | 
454 | 
def test_continue_conversation_no_results(cli_env):
    """continue-conversation should degrade gracefully for unknown topics."""
    result = runner.invoke(
        tool_app,
        ["continue-conversation", "--topic", "NonexistentTopic"],
    )
    assert result.exit_code == 0

    # The prompt still names the topic but reports that nothing was found.
    assert "Continuing conversation on: NonexistentTopic" in result.stdout
    assert "The supplied query did not return any information" in result.stdout
467 | 
468 | 
@patch("basic_memory.services.initialization.initialize_database")
def test_ensure_migrations_functionality(mock_initialize_database, app_config, monkeypatch):
    """ensure_initialization should drive initialize_database exactly once."""
    from basic_memory.services.initialization import ensure_initialization

    ensure_initialization(app_config)

    # The synchronous wrapper runs the (mocked) async initializer once.
    mock_initialize_database.assert_called_once()
479 | 
480 | 
@patch("basic_memory.services.initialization.initialize_database")
def test_ensure_migrations_handles_errors(mock_initialize_database, app_config, monkeypatch):
    """ensure_initialization must swallow initializer failures."""
    from basic_memory.services.initialization import ensure_initialization

    # Make the mocked initializer blow up.
    mock_initialize_database.side_effect = Exception("Test error")

    # No exception should escape; surviving the call is the whole assertion.
    ensure_initialization(app_config)
493 | 
```

--------------------------------------------------------------------------------
/tests/conftest.py:
--------------------------------------------------------------------------------

```python
  1 | """Common test fixtures."""
  2 | 
  3 | from dataclasses import dataclass
  4 | from datetime import datetime, timezone
  5 | from pathlib import Path
  6 | from textwrap import dedent
  7 | from typing import AsyncGenerator
  8 | 
  9 | import os
 10 | import pytest
 11 | import pytest_asyncio
 12 | from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker
 13 | 
 14 | from basic_memory import db
 15 | from basic_memory.config import ProjectConfig, BasicMemoryConfig, ConfigManager
 16 | from basic_memory.db import DatabaseType
 17 | from basic_memory.markdown import EntityParser
 18 | from basic_memory.markdown.markdown_processor import MarkdownProcessor
 19 | from basic_memory.models import Base
 20 | from basic_memory.models.knowledge import Entity
 21 | from basic_memory.models.project import Project
 22 | from basic_memory.repository.entity_repository import EntityRepository
 23 | from basic_memory.repository.observation_repository import ObservationRepository
 24 | from basic_memory.repository.project_repository import ProjectRepository
 25 | from basic_memory.repository.relation_repository import RelationRepository
 26 | from basic_memory.repository.search_repository import SearchRepository
 27 | from basic_memory.schemas.base import Entity as EntitySchema
 28 | from basic_memory.services import (
 29 |     EntityService,
 30 |     ProjectService,
 31 | )
 32 | from basic_memory.services.directory_service import DirectoryService
 33 | from basic_memory.services.file_service import FileService
 34 | from basic_memory.services.link_resolver import LinkResolver
 35 | from basic_memory.services.search_service import SearchService
 36 | from basic_memory.sync.sync_service import SyncService
 37 | from basic_memory.sync.watch_service import WatchService
 38 | 
 39 | 
@pytest.fixture
def anyio_backend():
    """Select asyncio as the anyio backend for async tests."""
    return "asyncio"
 43 | 
 44 | 
@pytest.fixture
def project_root() -> Path:
    """Return the repository root (the parent of the tests directory)."""
    return Path(__file__).parent.parent
 48 | 
 49 | 
 50 | @pytest.fixture
 51 | def config_home(tmp_path, monkeypatch) -> Path:
 52 |     # Patch HOME environment variable for the duration of the test
 53 |     monkeypatch.setenv("HOME", str(tmp_path))
 54 |     # On Windows, also set USERPROFILE
 55 |     if os.name == "nt":
 56 |         monkeypatch.setenv("USERPROFILE", str(tmp_path))
 57 |     # Set BASIC_MEMORY_HOME to the test directory
 58 |     monkeypatch.setenv("BASIC_MEMORY_HOME", str(tmp_path / "basic-memory"))
 59 |     return tmp_path
 60 | 
 61 | 
 62 | @pytest.fixture(scope="function", autouse=True)
 63 | def app_config(config_home, tmp_path, monkeypatch) -> BasicMemoryConfig:
 64 |     """Create test app configuration."""
 65 |     # Create a basic config without depending on test_project to avoid circular dependency
 66 |     projects = {"test-project": str(config_home)}
 67 |     app_config = BasicMemoryConfig(
 68 |         env="test",
 69 |         projects=projects,
 70 |         default_project="test-project",
 71 |         update_permalinks_on_move=True,
 72 |     )
 73 | 
 74 |     return app_config
 75 | 
 76 | 
@pytest.fixture(autouse=True)
def config_manager(
    app_config: BasicMemoryConfig, project_config: ProjectConfig, config_home: Path, monkeypatch
) -> ConfigManager:
    """Provide a ConfigManager rooted in the test home with the config saved to disk."""
    # Invalidate config cache to ensure clean state for each test
    from basic_memory import config as config_module

    config_module._CONFIG_CACHE = None

    # Create a new ConfigManager that uses the test home directory
    config_manager = ConfigManager()
    # Update its paths to use the test directory
    config_manager.config_dir = config_home / ".basic-memory"
    config_manager.config_file = config_manager.config_dir / "config.json"
    config_manager.config_dir.mkdir(parents=True, exist_ok=True)

    # Ensure the config file is written to disk
    config_manager.save_config(app_config)
    return config_manager
 96 | 
 97 | 
 98 | @pytest.fixture(scope="function", autouse=True)
 99 | def project_config(test_project):
100 |     """Create test project configuration."""
101 | 
102 |     project_config = ProjectConfig(
103 |         name=test_project.name,
104 |         home=Path(test_project.path),
105 |     )
106 | 
107 |     return project_config
108 | 
109 | 
@dataclass
class TestConfig:
    """Aggregate of every configuration fixture, for tests that need them all."""

    config_home: Path
    project_config: ProjectConfig
    app_config: BasicMemoryConfig
    config_manager: ConfigManager
116 | 
117 | 
@pytest.fixture
def test_config(config_home, project_config, app_config, config_manager) -> TestConfig:
    """Bundle every configuration fixture into one TestConfig object."""
    return TestConfig(
        config_home=config_home,
        project_config=project_config,
        app_config=app_config,
        config_manager=config_manager,
    )
122 | 
123 | 
@pytest_asyncio.fixture(scope="function")
async def engine_factory(
    app_config,
) -> AsyncGenerator[tuple[AsyncEngine, async_sessionmaker[AsyncSession]], None]:
    """Create an engine and session factory using an in-memory SQLite database.

    Yields:
        (engine, session_maker) for the in-memory database; the schema is
        created up front so repositories can use it immediately.
    """
    async with db.engine_session_factory(
        db_path=app_config.database_path, db_type=DatabaseType.MEMORY
    ) as (engine, session_maker):
        # Create all tables for the DB the engine is connected to
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)

        yield engine, session_maker
137 | 
138 | 
@pytest_asyncio.fixture
async def session_maker(engine_factory) -> async_sessionmaker[AsyncSession]:
    """Expose just the session-factory half of engine_factory."""
    return engine_factory[1]
144 | 
145 | 
146 | ## Repositories
147 | 
148 | 
@pytest_asyncio.fixture(scope="function")
async def entity_repository(
    session_maker: async_sessionmaker[AsyncSession], test_project: Project
) -> EntityRepository:
    """Create an EntityRepository scoped to the test project's id."""
    return EntityRepository(session_maker, project_id=test_project.id)
155 | 
156 | 
@pytest_asyncio.fixture(scope="function")
async def observation_repository(
    session_maker: async_sessionmaker[AsyncSession], test_project: Project
) -> ObservationRepository:
    """Create an ObservationRepository scoped to the test project's id."""
    return ObservationRepository(session_maker, project_id=test_project.id)
163 | 
164 | 
@pytest_asyncio.fixture(scope="function")
async def relation_repository(
    session_maker: async_sessionmaker[AsyncSession], test_project: Project
) -> RelationRepository:
    """Create a RelationRepository scoped to the test project's id."""
    return RelationRepository(session_maker, project_id=test_project.id)
171 | 
172 | 
@pytest_asyncio.fixture(scope="function")
async def project_repository(
    session_maker: async_sessionmaker[AsyncSession],
) -> ProjectRepository:
    """Project repository over the test session factory (no project scoping)."""
    repo = ProjectRepository(session_maker)
    return repo
179 | 
180 | 
@pytest_asyncio.fixture(scope="function")
async def test_project(config_home, engine_factory) -> Project:
    """Create the default project used as context by the repository fixtures.

    Depends on ``engine_factory`` directly (rather than ``session_maker``) so
    the database schema is guaranteed to exist before the project row is
    inserted.
    """
    # Only the session factory is needed; discard the engine explicitly
    # instead of binding it to an unused local.
    _, session_maker = engine_factory
    project_repository = ProjectRepository(session_maker)
    project_data = {
        "name": "test-project",
        "description": "Project used as context for tests",
        "path": str(config_home),
        "is_active": True,
        "is_default": True,  # Explicitly set as the default project (for cli operations)
    }
    return await project_repository.create(project_data)
195 | 
196 | 
197 | ## Services
198 | 
199 | 
@pytest_asyncio.fixture
async def entity_service(
    entity_repository: EntityRepository,
    observation_repository: ObservationRepository,
    relation_repository: RelationRepository,
    entity_parser: EntityParser,
    file_service: FileService,
    link_resolver: LinkResolver,
    app_config: BasicMemoryConfig,
) -> EntityService:
    """Assemble an EntityService from the repository, parser and file fixtures."""
    service = EntityService(
        app_config=app_config,
        entity_parser=entity_parser,
        entity_repository=entity_repository,
        file_service=file_service,
        link_resolver=link_resolver,
        observation_repository=observation_repository,
        relation_repository=relation_repository,
    )
    return service
220 | 
221 | 
@pytest.fixture
def file_service(
    project_config: ProjectConfig, markdown_processor: MarkdownProcessor
) -> FileService:
    """FileService rooted at the temporary project home directory."""
    base_path = project_config.home
    return FileService(base_path, markdown_processor)
228 | 
229 | 
@pytest.fixture
def markdown_processor(entity_parser: EntityParser) -> MarkdownProcessor:
    """Markdown processor backed by the test entity parser."""
    processor = MarkdownProcessor(entity_parser)
    return processor
234 | 
235 | 
@pytest.fixture
def link_resolver(entity_repository: EntityRepository, search_service: SearchService):
    """LinkResolver wired to the test entity repository and search service."""
    resolver = LinkResolver(entity_repository, search_service)
    return resolver
240 | 
241 | 
@pytest.fixture
def entity_parser(project_config):
    """EntityParser rooted at the temporary project home."""
    home = project_config.home
    return EntityParser(home)
246 | 
247 | 
@pytest_asyncio.fixture
async def sync_service(
    app_config: BasicMemoryConfig,
    entity_service: EntityService,
    entity_parser: EntityParser,
    project_repository: ProjectRepository,
    entity_repository: EntityRepository,
    relation_repository: RelationRepository,
    search_service: SearchService,
    file_service: FileService,
) -> SyncService:
    """Assemble a SyncService from the service and repository fixtures."""
    service = SyncService(
        app_config=app_config,
        entity_parser=entity_parser,
        entity_repository=entity_repository,
        entity_service=entity_service,
        file_service=file_service,
        project_repository=project_repository,
        relation_repository=relation_repository,
        search_service=search_service,
    )
    return service
270 | 
271 | 
@pytest_asyncio.fixture
async def directory_service(entity_repository, project_config) -> DirectoryService:
    """DirectoryService over the test entity repository.

    NOTE(review): ``project_config`` is not used in the body — presumably it
    is requested only so the project scaffolding fixture runs first; confirm
    before removing it.
    """
    service = DirectoryService(entity_repository=entity_repository)
    return service
278 | 
279 | 
@pytest_asyncio.fixture
async def search_repository(session_maker, test_project: Project):
    """Search repository scoped to the shared test project."""
    repo = SearchRepository(session_maker, project_id=test_project.id)
    return repo
284 | 
285 | 
@pytest_asyncio.fixture(autouse=True)
async def init_search_index(search_service):
    """Ensure the search index exists before every test (autouse)."""
    await search_service.init_search_index()
289 | 
290 | 
@pytest_asyncio.fixture
async def search_service(
    search_repository: SearchRepository,
    entity_repository: EntityRepository,
    file_service: FileService,
) -> SearchService:
    """Build a SearchService and initialize its index before handing it out."""
    svc = SearchService(search_repository, entity_repository, file_service)
    await svc.init_search_index()
    return svc
301 | 
302 | 
@pytest_asyncio.fixture(scope="function")
async def sample_entity(entity_repository: EntityRepository) -> Entity:
    """Persist a minimal markdown entity for tests that need one record."""
    created = datetime.now(timezone.utc)
    updated = datetime.now(timezone.utc)
    return await entity_repository.create(
        {
            "project_id": entity_repository.project_id,
            "title": "Test Entity",
            "entity_type": "test",
            "permalink": "test/test-entity",
            "file_path": "test/test_entity.md",
            "content_type": "text/markdown",
            "created_at": created,
            "updated_at": updated,
        }
    )
317 | 
318 | 
@pytest_asyncio.fixture
async def project_service(
    project_repository: ProjectRepository,
) -> ProjectService:
    """ProjectService backed by the test project repository."""
    service = ProjectService(repository=project_repository)
    return service
325 | 
326 | 
@pytest_asyncio.fixture
async def full_entity(sample_entity, entity_repository, file_service, entity_service) -> Entity:
    """Create an entity carrying observations and relations for search tests."""
    schema = EntitySchema(
        title="Search_Entity",
        folder="test",
        entity_type="test",
        content=dedent("""
            ## Observations
            - [tech] Tech note
            - [design] Design note

            ## Relations
            - out1 [[Test Entity]]
            - out2 [[Test Entity]]
            """),
    )
    # The created/updated flag is irrelevant here; only the entity is needed.
    entity, _ = await entity_service.create_or_update_entity(schema)
    return entity
349 | 
350 | 
@pytest_asyncio.fixture
async def test_graph(
    entity_repository,
    relation_repository,
    observation_repository,
    search_service,
    file_service,
    entity_service,
):
    """Create a test knowledge graph with entities, relations and observations.

    Builds the chain Root -> Connected Entity 1 -> Connected Entity 2 ->
    Deep Entity -> Deeper Entity, indexes every entity for search, and
    returns the pieces tests commonly need.
    """

    # Create some test entities in reverse order so they will be linked
    # (each [[wikilink]] target must already exist when its source is parsed).
    deeper, _ = await entity_service.create_or_update_entity(
        EntitySchema(
            title="Deeper Entity",
            entity_type="deeper",
            folder="test",
            content=dedent("""
                # Deeper Entity
                """),
        )
    )

    deep, _ = await entity_service.create_or_update_entity(
        EntitySchema(
            title="Deep Entity",
            entity_type="deep",
            folder="test",
            content=dedent("""
                # Deep Entity
                - deeper_connection [[Deeper Entity]]
                """),
        )
    )

    connected_2, _ = await entity_service.create_or_update_entity(
        EntitySchema(
            title="Connected Entity 2",
            entity_type="test",
            folder="test",
            content=dedent("""
                # Connected Entity 2
                - deep_connection [[Deep Entity]]
                """),
        )
    )

    connected_1, _ = await entity_service.create_or_update_entity(
        EntitySchema(
            title="Connected Entity 1",
            entity_type="test",
            folder="test",
            content=dedent("""
                # Connected Entity 1
                - [note] Connected 1 note
                - connected_to [[Connected Entity 2]]
                """),
        )
    )

    root, _ = await entity_service.create_or_update_entity(
        EntitySchema(
            title="Root",
            entity_type="test",
            folder="test",
            content=dedent("""
                # Root Entity
                - [note] Root note 1
                - [tech] Root tech note
                - connects_to [[Connected Entity 1]]
                """),
        )
    )

    # get latest — re-read so relations resolved during later creates are included
    entities = await entity_repository.find_all()
    relations = await relation_repository.find_all()

    # Index everything for search
    for entity in entities:
        await search_service.index_entity(entity)

    # NOTE: "deeper" is reachable through "deep" but is intentionally not a key here.
    return {
        "root": root,
        "connected1": connected_1,
        "connected2": connected_2,
        "deep": deep,
        "observations": [e.observations for e in entities],
        "relations": relations,
    }
441 | 
442 | 
@pytest.fixture
def watch_service(app_config: BasicMemoryConfig, project_repository) -> WatchService:
    """WatchService wired to the test config and project repository."""
    service = WatchService(app_config=app_config, project_repository=project_repository)
    return service
446 | 
447 | 
@pytest.fixture
def test_files(project_config, project_root) -> dict[str, Path]:
    """Copy the binary fixture files (PDF, PNG) into the project directory.

    Returns a dict mapping short names ("pdf", "image") to the copied
    files' paths inside the project home.
    """
    # Source fixtures live under the repository's tests directory.
    sources = {
        "pdf": Path(project_root / "tests/Non-MarkdownFileSupport.pdf"),
        "image": Path(project_root / "tests/Screenshot.png"),
    }

    copies: dict[str, Path] = {}
    for name, source in sources.items():
        destination = project_config.home / source.name
        destination.parent.mkdir(parents=True, exist_ok=True)
        # Byte-for-byte copy into the temporary project tree.
        destination.write_bytes(source.read_bytes())
        copies[name] = destination

    return copies
475 | 
476 | 
@pytest_asyncio.fixture
async def synced_files(sync_service, project_config, test_files):
    """Run an initial sync over the copied fixture files, then expose them."""
    home = project_config.home
    await sync_service.sync(home)
    return test_files
482 | 
```

--------------------------------------------------------------------------------
/tests/utils/test_validate_project_path.py:
--------------------------------------------------------------------------------

```python
  1 | """Tests for the validate_project_path security function."""
  2 | 
  3 | import pytest
  4 | from pathlib import Path
  5 | 
  6 | from basic_memory.utils import validate_project_path
  7 | 
  8 | 
  9 | class TestValidateProjectPathSafety:
 10 |     """Test that validate_project_path correctly identifies safe paths."""
 11 | 
 12 |     def test_valid_relative_paths(self, tmp_path):
 13 |         """Test that legitimate relative paths are allowed."""
 14 |         project_path = tmp_path / "project"
 15 |         project_path.mkdir()
 16 | 
 17 |         safe_paths = [
 18 |             "notes/meeting.md",
 19 |             "docs/readme.txt",
 20 |             "folder/subfolder/file.txt",
 21 |             "simple-file.md",
 22 |             "research/findings-2025.md",
 23 |             "projects/basic-memory/docs.md",
 24 |             "deep/nested/directory/structure/file.txt",
 25 |             "file-with-hyphens.md",
 26 |             "file_with_underscores.txt",
 27 |             "file123.md",
 28 |             "UPPERCASE.MD",
 29 |             "MixedCase.txt",
 30 |         ]
 31 | 
 32 |         for path in safe_paths:
 33 |             assert validate_project_path(path, project_path), (
 34 |                 f"Safe path '{path}' should be allowed"
 35 |             )
 36 | 
 37 |     def test_empty_and_current_directory(self, tmp_path):
 38 |         """Test handling of empty paths and current directory references."""
 39 |         project_path = tmp_path / "project"
 40 |         project_path.mkdir()
 41 | 
 42 |         # Current directory should be safe
 43 |         assert validate_project_path(".", project_path)
 44 | 
 45 |         # Files in current directory should be safe
 46 |         assert validate_project_path("./file.txt", project_path)
 47 | 
 48 |     def test_nested_safe_paths(self, tmp_path):
 49 |         """Test deeply nested but safe paths."""
 50 |         project_path = tmp_path / "project"
 51 |         project_path.mkdir()
 52 | 
 53 |         nested_paths = [
 54 |             "level1/level2/level3/level4/file.txt",
 55 |             "very/deeply/nested/directory/structure/with/many/levels/file.md",
 56 |             "a/b/c/d/e/f/g/h/i/j/file.txt",
 57 |         ]
 58 | 
 59 |         for path in nested_paths:
 60 |             assert validate_project_path(path, project_path), (
 61 |                 f"Nested path '{path}' should be allowed"
 62 |             )
 63 | 
 64 | 
class TestValidateProjectPathAttacks:
    """Test that validate_project_path blocks path traversal attacks.

    Each test is a data table of known attack strings; every entry must be
    rejected regardless of platform conventions.
    """

    def test_unix_path_traversal(self, tmp_path):
        """Test that Unix-style path traversal is blocked."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        # Classic "../" escapes targeting common sensitive Unix files.
        attack_paths = [
            "../",
            "../../",
            "../../../",
            "../etc/passwd",
            "../../etc/passwd",
            "../../../etc/passwd",
            "../../../../etc/passwd",
            "../../.env",
            "../../../home/user/.ssh/id_rsa",
            "../../../../var/log/auth.log",
            "../../.bashrc",
            "../../../etc/shadow",
        ]

        for path in attack_paths:
            assert not validate_project_path(path, project_path), (
                f"Attack path '{path}' should be blocked"
            )

    def test_windows_path_traversal(self, tmp_path):
        """Test that Windows-style path traversal is blocked."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        # Backslash-separated traversal must be rejected even on POSIX hosts.
        attack_paths = [
            "..\\",
            "..\\..\\",
            "..\\..\\..\\",
            "..\\..\\..\\Windows\\System32\\config\\SAM",
            "..\\..\\..\\Users\\user\\.env",
            "..\\..\\..\\Windows\\System32\\drivers\\etc\\hosts",
            "..\\..\\Boot.ini",
            "\\Windows\\System32",
            "\\..\\..\\Windows",
        ]

        for path in attack_paths:
            assert not validate_project_path(path, project_path), (
                f"Windows attack path '{path}' should be blocked"
            )

    def test_mixed_traversal_patterns(self, tmp_path):
        """Test paths that mix legitimate content with traversal."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        # A benign-looking prefix must not launder a ".." escape.
        mixed_attacks = [
            "notes/../../../etc/passwd",
            "docs/../../.env",
            "folder/subfolder/../../../etc/passwd",
            "legitimate/path/../../.ssh/id_rsa",
            "notes/../../../home/user/.bashrc",
            "documents/../../Windows/System32/config/SAM",
        ]

        for path in mixed_attacks:
            assert not validate_project_path(path, project_path), (
                f"Mixed attack path '{path}' should be blocked"
            )

    def test_home_directory_access(self, tmp_path):
        """Test that home directory access patterns are blocked."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        # "~" expansion could reach outside the project; both slash styles covered.
        home_attacks = [
            "~/",
            "~/.env",
            "~/.ssh/id_rsa",
            "~/secrets.txt",
            "~/Documents/passwords.txt",
            "~\\AppData\\secrets",
            "~\\Desktop\\config.ini",
        ]

        for path in home_attacks:
            assert not validate_project_path(path, project_path), (
                f"Home directory attack '{path}' should be blocked"
            )

    def test_unc_and_network_paths(self, tmp_path):
        """Test that UNC and network paths are blocked."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        # Windows UNC shares (\\\\server\\share) point at remote filesystems.
        network_attacks = [
            "\\\\server\\share",
            "\\\\192.168.1.100\\c$",
            "\\\\evil-server\\malicious-share\\file.exe",
            "\\\\localhost\\c$\\Windows\\System32",
        ]

        for path in network_attacks:
            assert not validate_project_path(path, project_path), (
                f"Network path attack '{path}' should be blocked"
            )

    def test_absolute_paths(self, tmp_path):
        """Test that absolute paths are blocked (if they contain traversal)."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        # Note: Some absolute paths might be allowed by pathlib resolution,
        # but our function should catch traversal patterns first
        absolute_attacks = [
            "/etc/passwd",
            "/home/user/.env",
            "/var/log/auth.log",
            "/root/.ssh/id_rsa",
            "C:\\Windows\\System32\\config\\SAM",
            "C:\\Users\\user\\.env",
            "D:\\secrets\\config.json",
        ]

        for path in absolute_attacks:
            # These should be blocked either by traversal detection or pathlib resolution
            result = validate_project_path(path, project_path)
            assert not result, f"Absolute path '{path}' should be blocked"
192 | 
193 | 
class TestValidateProjectPathEdgeCases:
    """Test edge cases and error conditions.

    These tests tolerate either rejection or a raised ValueError/OSError for
    inputs whose handling is platform-dependent.
    """

    def test_malformed_paths(self, tmp_path):
        """Test handling of malformed or unusual paths."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        malformed_paths = [
            "",  # Empty string
            "   ",  # Whitespace only
            "\n",  # Newline
            "\t",  # Tab
            "\r\n",  # Windows line ending
            "file\x00name",  # Null byte (if it gets this far)
            "file\x01name",  # Other control characters
        ]

        for path in malformed_paths:
            # These should either be blocked or cause an exception that's handled
            try:
                result = validate_project_path(path, project_path)
                if path.strip():  # Non-empty paths with control chars should be blocked
                    assert not result, f"Malformed path '{repr(path)}' should be blocked"
            except (ValueError, OSError):
                # It's acceptable for these to raise exceptions
                pass

    def test_very_long_paths(self, tmp_path):
        """Test handling of very long paths."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        # Create a very long but legitimate path
        long_path = "/".join(["verylongdirectoryname" * 10 for _ in range(10)])

        # Should handle long paths gracefully (either allow or reject based on filesystem limits)
        try:
            result = validate_project_path(long_path, project_path)
            # Result can be True or False, just shouldn't crash
            assert isinstance(result, bool)
        except (ValueError, OSError):
            # It's acceptable for very long paths to raise exceptions
            pass

    def test_nonexistent_project_path(self):
        """Test behavior when project path doesn't exist."""
        # Validation is purely lexical/resolution-based, so the project root
        # does not have to exist on disk.
        nonexistent_project = Path("/this/path/does/not/exist")

        # Should still be able to validate relative paths
        assert validate_project_path("notes/file.txt", nonexistent_project)
        assert not validate_project_path("../../../etc/passwd", nonexistent_project)

    def test_unicode_and_special_characters(self, tmp_path):
        """Test paths with Unicode and special characters."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        unicode_paths = [
            "notes/文档.md",  # Chinese characters
            "docs/résumé.txt",  # Accented characters
            "files/naïve.md",  # Diaeresis
            "notes/café.txt",  # Acute accent
            "docs/日本語.md",  # Japanese
            "files/αβγ.txt",  # Greek
            "notes/файл.md",  # Cyrillic
        ]

        for path in unicode_paths:
            try:
                result = validate_project_path(path, project_path)
                assert isinstance(result, bool), f"Unicode path '{path}' should return boolean"
                # Unicode paths should generally be allowed if they don't contain traversal
                assert result, f"Unicode path '{path}' should be allowed"
            except (UnicodeError, OSError):
                # Some unicode handling issues might be acceptable
                pass

    def test_case_sensitivity(self, tmp_path):
        """Test case sensitivity of traversal detection."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        # These should all be blocked regardless of case
        case_variations = [
            "../file.txt",
            "../FILE.TXT",
            "~/file.txt",
            "~/FILE.TXT",
        ]

        for path in case_variations:
            assert not validate_project_path(path, project_path), (
                f"Case variation '{path}' should be blocked"
            )

    def test_symbolic_link_behavior(self, tmp_path):
        """Test behavior with symbolic links (if supported by filesystem)."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        # Create a directory outside the project
        outside_dir = tmp_path / "outside"
        outside_dir.mkdir()

        try:
            # Try to create a symlink inside the project pointing outside
            symlink_path = project_path / "symlink"
            symlink_path.symlink_to(outside_dir)

            # Paths through symlinks should be handled safely
            result = validate_project_path("symlink/file.txt", project_path)
            # The result can vary based on how pathlib handles symlinks,
            # but it shouldn't crash and should be a boolean
            assert isinstance(result, bool)

        except (OSError, NotImplementedError):
            # Symlinks might not be supported on this filesystem
            pytest.skip("Symbolic links not supported on this filesystem")

    def test_relative_path_edge_cases(self, tmp_path):
        """Test edge cases in relative path handling."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        edge_cases = [
            ".",  # Current directory
            "./",  # Current directory with slash
            "./file.txt",  # File in current directory
            "./folder/file.txt",  # Nested file through current directory
            "folder/./file.txt",  # Current directory in middle of path
            "folder/subfolder/.",  # Current directory at end
        ]

        for path in edge_cases:
            result = validate_project_path(path, project_path)
            # These should generally be safe as they don't escape the project
            assert result, f"Relative path edge case '{path}' should be allowed"
332 | 
333 | 
class TestValidateProjectPathPerformance:
    """Test performance characteristics of path validation."""

    def test_performance_with_many_paths(self, tmp_path):
        """Test that validation performs reasonably with many paths.

        Uses ``time.perf_counter`` instead of ``time.time``: perf_counter is
        monotonic and high-resolution, so the measurement cannot go negative
        or jump if the system wall clock is adjusted mid-test.
        """
        project_path = tmp_path / "project"
        project_path.mkdir()

        # Test a mix of safe and dangerous paths
        test_paths = []

        # Add safe paths
        for i in range(100):
            test_paths.append(f"folder{i}/file{i}.txt")

        # Add dangerous paths
        for i in range(100):
            test_paths.append(f"../../../etc/passwd{i}")

        import time

        start_time = time.perf_counter()

        for path in test_paths:
            result = validate_project_path(path, project_path)
            assert isinstance(result, bool)

        elapsed = time.perf_counter() - start_time

        # Should complete reasonably quickly (adjust threshold as needed)
        assert elapsed < 1.0, "Path validation should be fast"
365 | 
366 | 
class TestValidateProjectPathIntegration:
    """Integration tests with real filesystem scenarios."""

    def test_with_actual_filesystem_structure(self, tmp_path):
        """Test validation with actual files and directories."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        # Lay down a small real directory tree to validate against.
        (project_path / "notes").mkdir()
        (project_path / "docs").mkdir()
        (project_path / "notes" / "meeting.md").write_text("# Meeting Notes")
        (project_path / "docs" / "readme.txt").write_text("README")

        # Existing files inside the project are valid targets.
        assert validate_project_path("notes/meeting.md", project_path)
        assert validate_project_path("docs/readme.txt", project_path)

        # Not-yet-existing paths are fine as long as they stay inside.
        assert validate_project_path("notes/new-file.md", project_path)
        assert validate_project_path("new-folder/file.txt", project_path)

        # Traversal is rejected even when the surrounding tree really exists.
        assert not validate_project_path("../../../etc/passwd", project_path)
        assert not validate_project_path("notes/../../../etc/passwd", project_path)

    def test_project_path_resolution_accuracy(self, tmp_path):
        """Test that path resolution works correctly with real paths."""
        # Build a workspace containing the project and a sibling project.
        workspace = tmp_path / "workspace"
        project_path = workspace / "my-project"
        sibling_path = workspace / "other-project"

        for directory in (workspace, project_path, sibling_path):
            directory.mkdir()

        # A sensitive file in the sibling project must stay unreachable.
        (sibling_path / "secrets.txt").write_text("secret data")
        assert not validate_project_path("../other-project/secrets.txt", project_path)

        # Normal in-project paths remain valid.
        assert validate_project_path("my-file.txt", project_path)
        assert validate_project_path("subdir/my-file.txt", project_path)
414 | 
```
Page 11/23FirstPrevNextLast