This is page 11 of 23. Use http://codebase.md/basicmachines-co/basic-memory?lines=true&page={x} to view the full context.
# Directory Structure
```
├── .claude
│ ├── agents
│ │ ├── python-developer.md
│ │ └── system-architect.md
│ └── commands
│ ├── release
│ │ ├── beta.md
│ │ ├── changelog.md
│ │ ├── release-check.md
│ │ └── release.md
│ ├── spec.md
│ └── test-live.md
├── .dockerignore
├── .github
│ ├── dependabot.yml
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.md
│ │ ├── config.yml
│ │ ├── documentation.md
│ │ └── feature_request.md
│ └── workflows
│ ├── claude-code-review.yml
│ ├── claude-issue-triage.yml
│ ├── claude.yml
│ ├── dev-release.yml
│ ├── docker.yml
│ ├── pr-title.yml
│ ├── release.yml
│ └── test.yml
├── .gitignore
├── .python-version
├── CHANGELOG.md
├── CITATION.cff
├── CLA.md
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── docker-compose.yml
├── Dockerfile
├── docs
│ ├── ai-assistant-guide-extended.md
│ ├── character-handling.md
│ ├── cloud-cli.md
│ └── Docker.md
├── justfile
├── LICENSE
├── llms-install.md
├── pyproject.toml
├── README.md
├── SECURITY.md
├── smithery.yaml
├── specs
│ ├── SPEC-1 Specification-Driven Development Process.md
│ ├── SPEC-10 Unified Deployment Workflow and Event Tracking.md
│ ├── SPEC-11 Basic Memory API Performance Optimization.md
│ ├── SPEC-12 OpenTelemetry Observability.md
│ ├── SPEC-13 CLI Authentication with Subscription Validation.md
│ ├── SPEC-14 Cloud Git Versioning & GitHub Backup.md
│ ├── SPEC-14- Cloud Git Versioning & GitHub Backup.md
│ ├── SPEC-15 Configuration Persistence via Tigris for Cloud Tenants.md
│ ├── SPEC-16 MCP Cloud Service Consolidation.md
│ ├── SPEC-17 Semantic Search with ChromaDB.md
│ ├── SPEC-18 AI Memory Management Tool.md
│ ├── SPEC-19 Sync Performance and Memory Optimization.md
│ ├── SPEC-2 Slash Commands Reference.md
│ ├── SPEC-20 Simplified Project-Scoped Rclone Sync.md
│ ├── SPEC-3 Agent Definitions.md
│ ├── SPEC-4 Notes Web UI Component Architecture.md
│ ├── SPEC-5 CLI Cloud Upload via WebDAV.md
│ ├── SPEC-6 Explicit Project Parameter Architecture.md
│ ├── SPEC-7 POC to spike Tigris Turso for local access to cloud data.md
│ ├── SPEC-8 TigrisFS Integration.md
│ ├── SPEC-9 Multi-Project Bidirectional Sync Architecture.md
│ ├── SPEC-9 Signed Header Tenant Information.md
│ └── SPEC-9-1 Follow-Ups- Conflict, Sync, and Observability.md
├── src
│ └── basic_memory
│ ├── __init__.py
│ ├── alembic
│ │ ├── alembic.ini
│ │ ├── env.py
│ │ ├── migrations.py
│ │ ├── script.py.mako
│ │ └── versions
│ │ ├── 3dae7c7b1564_initial_schema.py
│ │ ├── 502b60eaa905_remove_required_from_entity_permalink.py
│ │ ├── 5fe1ab1ccebe_add_projects_table.py
│ │ ├── 647e7a75e2cd_project_constraint_fix.py
│ │ ├── 9d9c1cb7d8f5_add_mtime_and_size_columns_to_entity_.py
│ │ ├── a1b2c3d4e5f6_fix_project_foreign_keys.py
│ │ ├── b3c3938bacdb_relation_to_name_unique_index.py
│ │ ├── cc7172b46608_update_search_index_schema.py
│ │ └── e7e1f4367280_add_scan_watermark_tracking_to_project.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── app.py
│ │ ├── routers
│ │ │ ├── __init__.py
│ │ │ ├── directory_router.py
│ │ │ ├── importer_router.py
│ │ │ ├── knowledge_router.py
│ │ │ ├── management_router.py
│ │ │ ├── memory_router.py
│ │ │ ├── project_router.py
│ │ │ ├── prompt_router.py
│ │ │ ├── resource_router.py
│ │ │ ├── search_router.py
│ │ │ └── utils.py
│ │ └── template_loader.py
│ ├── cli
│ │ ├── __init__.py
│ │ ├── app.py
│ │ ├── auth.py
│ │ ├── commands
│ │ │ ├── __init__.py
│ │ │ ├── cloud
│ │ │ │ ├── __init__.py
│ │ │ │ ├── api_client.py
│ │ │ │ ├── bisync_commands.py
│ │ │ │ ├── cloud_utils.py
│ │ │ │ ├── core_commands.py
│ │ │ │ ├── rclone_commands.py
│ │ │ │ ├── rclone_config.py
│ │ │ │ ├── rclone_installer.py
│ │ │ │ ├── upload_command.py
│ │ │ │ └── upload.py
│ │ │ ├── command_utils.py
│ │ │ ├── db.py
│ │ │ ├── import_chatgpt.py
│ │ │ ├── import_claude_conversations.py
│ │ │ ├── import_claude_projects.py
│ │ │ ├── import_memory_json.py
│ │ │ ├── mcp.py
│ │ │ ├── project.py
│ │ │ ├── status.py
│ │ │ └── tool.py
│ │ └── main.py
│ ├── config.py
│ ├── db.py
│ ├── deps.py
│ ├── file_utils.py
│ ├── ignore_utils.py
│ ├── importers
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── chatgpt_importer.py
│ │ ├── claude_conversations_importer.py
│ │ ├── claude_projects_importer.py
│ │ ├── memory_json_importer.py
│ │ └── utils.py
│ ├── markdown
│ │ ├── __init__.py
│ │ ├── entity_parser.py
│ │ ├── markdown_processor.py
│ │ ├── plugins.py
│ │ ├── schemas.py
│ │ └── utils.py
│ ├── mcp
│ │ ├── __init__.py
│ │ ├── async_client.py
│ │ ├── project_context.py
│ │ ├── prompts
│ │ │ ├── __init__.py
│ │ │ ├── ai_assistant_guide.py
│ │ │ ├── continue_conversation.py
│ │ │ ├── recent_activity.py
│ │ │ ├── search.py
│ │ │ └── utils.py
│ │ ├── resources
│ │ │ ├── ai_assistant_guide.md
│ │ │ └── project_info.py
│ │ ├── server.py
│ │ └── tools
│ │ ├── __init__.py
│ │ ├── build_context.py
│ │ ├── canvas.py
│ │ ├── chatgpt_tools.py
│ │ ├── delete_note.py
│ │ ├── edit_note.py
│ │ ├── list_directory.py
│ │ ├── move_note.py
│ │ ├── project_management.py
│ │ ├── read_content.py
│ │ ├── read_note.py
│ │ ├── recent_activity.py
│ │ ├── search.py
│ │ ├── utils.py
│ │ ├── view_note.py
│ │ └── write_note.py
│ ├── models
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── knowledge.py
│ │ ├── project.py
│ │ └── search.py
│ ├── repository
│ │ ├── __init__.py
│ │ ├── entity_repository.py
│ │ ├── observation_repository.py
│ │ ├── project_info_repository.py
│ │ ├── project_repository.py
│ │ ├── relation_repository.py
│ │ ├── repository.py
│ │ └── search_repository.py
│ ├── schemas
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── cloud.py
│ │ ├── delete.py
│ │ ├── directory.py
│ │ ├── importer.py
│ │ ├── memory.py
│ │ ├── project_info.py
│ │ ├── prompt.py
│ │ ├── request.py
│ │ ├── response.py
│ │ ├── search.py
│ │ └── sync_report.py
│ ├── services
│ │ ├── __init__.py
│ │ ├── context_service.py
│ │ ├── directory_service.py
│ │ ├── entity_service.py
│ │ ├── exceptions.py
│ │ ├── file_service.py
│ │ ├── initialization.py
│ │ ├── link_resolver.py
│ │ ├── project_service.py
│ │ ├── search_service.py
│ │ └── service.py
│ ├── sync
│ │ ├── __init__.py
│ │ ├── background_sync.py
│ │ ├── sync_service.py
│ │ └── watch_service.py
│ ├── templates
│ │ └── prompts
│ │ ├── continue_conversation.hbs
│ │ └── search.hbs
│ └── utils.py
├── test-int
│ ├── BENCHMARKS.md
│ ├── cli
│ │ ├── test_project_commands_integration.py
│ │ └── test_version_integration.py
│ ├── conftest.py
│ ├── mcp
│ │ ├── test_build_context_underscore.py
│ │ ├── test_build_context_validation.py
│ │ ├── test_chatgpt_tools_integration.py
│ │ ├── test_default_project_mode_integration.py
│ │ ├── test_delete_note_integration.py
│ │ ├── test_edit_note_integration.py
│ │ ├── test_list_directory_integration.py
│ │ ├── test_move_note_integration.py
│ │ ├── test_project_management_integration.py
│ │ ├── test_project_state_sync_integration.py
│ │ ├── test_read_content_integration.py
│ │ ├── test_read_note_integration.py
│ │ ├── test_search_integration.py
│ │ ├── test_single_project_mcp_integration.py
│ │ └── test_write_note_integration.py
│ ├── test_db_wal_mode.py
│ ├── test_disable_permalinks_integration.py
│ └── test_sync_performance_benchmark.py
├── tests
│ ├── __init__.py
│ ├── api
│ │ ├── conftest.py
│ │ ├── test_async_client.py
│ │ ├── test_continue_conversation_template.py
│ │ ├── test_directory_router.py
│ │ ├── test_importer_router.py
│ │ ├── test_knowledge_router.py
│ │ ├── test_management_router.py
│ │ ├── test_memory_router.py
│ │ ├── test_project_router_operations.py
│ │ ├── test_project_router.py
│ │ ├── test_prompt_router.py
│ │ ├── test_relation_background_resolution.py
│ │ ├── test_resource_router.py
│ │ ├── test_search_router.py
│ │ ├── test_search_template.py
│ │ ├── test_template_loader_helpers.py
│ │ └── test_template_loader.py
│ ├── cli
│ │ ├── conftest.py
│ │ ├── test_cli_tools.py
│ │ ├── test_cloud_authentication.py
│ │ ├── test_ignore_utils.py
│ │ ├── test_import_chatgpt.py
│ │ ├── test_import_claude_conversations.py
│ │ ├── test_import_claude_projects.py
│ │ ├── test_import_memory_json.py
│ │ ├── test_project_add_with_local_path.py
│ │ └── test_upload.py
│ ├── conftest.py
│ ├── db
│ │ └── test_issue_254_foreign_key_constraints.py
│ ├── importers
│ │ ├── test_importer_base.py
│ │ └── test_importer_utils.py
│ ├── markdown
│ │ ├── __init__.py
│ │ ├── test_date_frontmatter_parsing.py
│ │ ├── test_entity_parser_error_handling.py
│ │ ├── test_entity_parser.py
│ │ ├── test_markdown_plugins.py
│ │ ├── test_markdown_processor.py
│ │ ├── test_observation_edge_cases.py
│ │ ├── test_parser_edge_cases.py
│ │ ├── test_relation_edge_cases.py
│ │ └── test_task_detection.py
│ ├── mcp
│ │ ├── conftest.py
│ │ ├── test_obsidian_yaml_formatting.py
│ │ ├── test_permalink_collision_file_overwrite.py
│ │ ├── test_prompts.py
│ │ ├── test_resources.py
│ │ ├── test_tool_build_context.py
│ │ ├── test_tool_canvas.py
│ │ ├── test_tool_delete_note.py
│ │ ├── test_tool_edit_note.py
│ │ ├── test_tool_list_directory.py
│ │ ├── test_tool_move_note.py
│ │ ├── test_tool_read_content.py
│ │ ├── test_tool_read_note.py
│ │ ├── test_tool_recent_activity.py
│ │ ├── test_tool_resource.py
│ │ ├── test_tool_search.py
│ │ ├── test_tool_utils.py
│ │ ├── test_tool_view_note.py
│ │ ├── test_tool_write_note.py
│ │ └── tools
│ │ └── test_chatgpt_tools.py
│ ├── Non-MarkdownFileSupport.pdf
│ ├── repository
│ │ ├── test_entity_repository_upsert.py
│ │ ├── test_entity_repository.py
│ │ ├── test_entity_upsert_issue_187.py
│ │ ├── test_observation_repository.py
│ │ ├── test_project_info_repository.py
│ │ ├── test_project_repository.py
│ │ ├── test_relation_repository.py
│ │ ├── test_repository.py
│ │ ├── test_search_repository_edit_bug_fix.py
│ │ └── test_search_repository.py
│ ├── schemas
│ │ ├── test_base_timeframe_minimum.py
│ │ ├── test_memory_serialization.py
│ │ ├── test_memory_url_validation.py
│ │ ├── test_memory_url.py
│ │ ├── test_schemas.py
│ │ └── test_search.py
│ ├── Screenshot.png
│ ├── services
│ │ ├── test_context_service.py
│ │ ├── test_directory_service.py
│ │ ├── test_entity_service_disable_permalinks.py
│ │ ├── test_entity_service.py
│ │ ├── test_file_service.py
│ │ ├── test_initialization.py
│ │ ├── test_link_resolver.py
│ │ ├── test_project_removal_bug.py
│ │ ├── test_project_service_operations.py
│ │ ├── test_project_service.py
│ │ └── test_search_service.py
│ ├── sync
│ │ ├── test_character_conflicts.py
│ │ ├── test_sync_service_incremental.py
│ │ ├── test_sync_service.py
│ │ ├── test_sync_wikilink_issue.py
│ │ ├── test_tmp_files.py
│ │ ├── test_watch_service_edge_cases.py
│ │ ├── test_watch_service_reload.py
│ │ └── test_watch_service.py
│ ├── test_config.py
│ ├── test_db_migration_deduplication.py
│ ├── test_deps.py
│ ├── test_production_cascade_delete.py
│ ├── test_rclone_commands.py
│ └── utils
│ ├── test_file_utils.py
│ ├── test_frontmatter_obsidian_compatible.py
│ ├── test_parse_tags.py
│ ├── test_permalink_formatting.py
│ ├── test_utf8_handling.py
│ └── test_validate_project_path.py
├── uv.lock
├── v0.15.0-RELEASE-DOCS.md
└── v15-docs
├── api-performance.md
├── background-relations.md
├── basic-memory-home.md
├── bug-fixes.md
├── chatgpt-integration.md
├── cloud-authentication.md
├── cloud-bisync.md
├── cloud-mode-usage.md
├── cloud-mount.md
├── default-project-mode.md
├── env-file-removal.md
├── env-var-overrides.md
├── explicit-project-parameter.md
├── gitignore-integration.md
├── project-root-env-var.md
├── README.md
└── sqlite-performance.md
```
# Files
--------------------------------------------------------------------------------
/specs/SPEC-9-1 Follow-Ups- Conflict, Sync, and Observability.md:
--------------------------------------------------------------------------------
```markdown
1 | ---
2 | title: 'SPEC-9-1 Follow-Ups: Conflict, Sync, and Observability'
3 | type: tasklist
4 | permalink: specs/spec-9-follow-ups-conflict-sync-and-observability
5 | related: specs/spec-9-multi-project-bisync
6 | status: revised
7 | revision_date: 2025-10-03
8 | ---
9 |
10 | # SPEC-9-1 Follow-Ups: Conflict, Sync, and Observability
11 |
12 | **REVISED 2025-10-03:** Simplified to leverage rclone built-ins instead of custom conflict handling.
13 |
14 | **Context:** SPEC-9 delivered multi-project bidirectional sync and a unified CLI. This follow-up focuses on **observability and safety** using rclone's built-in capabilities rather than reinventing conflict handling.
15 |
16 | **Design Philosophy: "Be Dumb Like Git"**
17 | - Let rclone bisync handle conflict detection (it already does this)
18 | - Make conflicts visible and recoverable, don't prevent them
19 | - Cloud is always the winner on conflict (cloud-primary model)
20 | - Users who want version history can just use Git locally in their sync directory
21 |
22 | **What Changed from Original Version:**
23 | - **Replaced:** Custom `.bmmeta` sidecars → Use rclone's `.bisync/` state tracking
24 | - **Replaced:** Custom conflict detection → Use rclone bisync 3-way merge
25 | - **Replaced:** Tombstone files → rclone delete tracking handles this
26 | - **Replaced:** Distributed lease → Local process lock only (document multi-device warning)
27 | - **Replaced:** S3 versioning service → Users just use Git locally if they want history
28 | - **Deferred:** SPEC-14 Git integration → Postponed to teams/multi-user features
29 |
30 | ## ✅ Now
31 | - [ ] **Local process lock**: Prevent concurrent bisync runs on same device (`~/.basic-memory/sync.lock`)
32 | - [ ] **Structured sync reports**: Parse rclone bisync output into JSON reports (creates/updates/deletes/conflicts, bytes, duration); `bm sync --report`
33 | - [ ] **Multi-device warning**: Document that users should not run `--watch` on multiple devices simultaneously
34 | - [ ] **Version control guidance**: Document pattern for users to use Git locally in their sync directory if they want version history
35 | - [ ] **Docs polish**: cloud-mode toggle, mount↔bisync directory isolation, conflict semantics, quick start, migration guide, short demo clip/GIF
36 |
37 | ## 🔜 Next
38 | - [ ] **Observability commands**: `bm conflicts list`, `bm sync history` to view sync reports and conflicts
39 | - [ ] **Conflict resolution UI**: `bm conflicts resolve <file>` to interactively pick winner from conflict files
40 | - [ ] **Selective sync**: allow include/exclude by project; per-project profile (safe/balanced/fast)
41 |
42 | ## 🧭 Later
43 | - [ ] **Near real-time sync**: File watcher → targeted `rclone copy` for individual files (keep bisync as backstop)
44 | - [ ] **Sharing / scoped tokens**: cross-tenant/project access
45 | - [ ] **Bandwidth controls & backpressure**: policy for large repos
46 | - [ ] **Client-side encryption (optional)**: with clear trade-offs
47 |
48 | ## 📏 Acceptance criteria (for "Now" items)
49 | - [ ] Local process lock prevents concurrent bisync runs on same device
50 | - [ ] rclone bisync conflict files visible and documented (`file.conflict1.md`, `file.conflict2.md`)
51 | - [ ] `bm sync --report` generates parsable JSON with sync statistics
52 | - [ ] Documentation clearly warns about multi-device `--watch` mode
53 | - [ ] Documentation shows users how to use Git locally for version history
54 |
55 | ## What We're NOT Building (Deferred to rclone)
56 | - ❌ Custom `.bmmeta` sidecars (rclone tracks state in `.bisync/` workdir)
57 | - ❌ Custom conflict detection (rclone bisync already does 3-way merge detection)
58 | - ❌ Tombstone files (S3 versioning + rclone delete tracking handles this)
59 | - ❌ Distributed lease (low probability issue, rclone detects state divergence)
60 | - ❌ Rename/move tracking (rclone has size+modtime heuristics built-in)
61 |
62 | ## Implementation Summary
63 |
64 | **Current State (SPEC-9):**
65 | - ✅ rclone bisync with 3 profiles (safe/balanced/fast)
66 | - ✅ `--max-delete` safety limits (10/25/50 files)
67 | - ✅ `--conflict-resolve=newer` for auto-resolution
68 | - ✅ Watch mode: `bm sync --watch` (60s intervals)
69 | - ✅ Integrity checking: `bm cloud check`
70 | - ✅ Mount vs bisync directory isolation
71 |
72 | **What's Needed (This Spec):**
73 | 1. **Process lock** - Simple file-based lock in `~/.basic-memory/sync.lock`
74 | 2. **Sync reports** - Parse rclone output, save to `~/.basic-memory/sync-history/`
75 | 3. **Documentation** - Multi-device warnings, conflict resolution workflow, Git usage pattern
76 |
77 | **User Model:**
78 | - Cloud is always the winner on conflict (cloud-primary)
79 | - rclone creates `.conflict` files for divergent edits
80 | - Users who want version history just use Git in their local sync directory
81 | - Users warned: don't run `--watch` on multiple devices
82 |
83 | ## Decision Rationale & Trade-offs
84 |
85 | ### Why Trust rclone Instead of Custom Conflict Handling?
86 |
87 | **rclone bisync already provides:**
88 | - 3-way merge detection (compares local, remote, and last-known state)
89 | - File state tracking in `.bisync/` workdir (hashes, modtimes)
90 | - Automatic conflict file creation: `file.conflict1.md`, `file.conflict2.md`
91 | - Rename detection via size+modtime heuristics
92 | - Delete tracking (prevents resurrection of deleted files)
93 | - Battle-tested with extensive edge case handling
94 |
95 | **What we'd have to build with custom approach:**
96 | - Per-file metadata tracking (`.bmmeta` sidecars)
97 | - 3-way diff algorithm
98 | - Conflict detection logic
99 | - Tombstone files for deletes
100 | - Rename/move detection
101 | - Testing for all edge cases
102 |
103 | **Decision:** Use what rclone already does well. Don't reinvent the wheel.
104 |
105 | ### Why Let Users Use Git Locally Instead of Building Versioning?
106 |
107 | **The simplest solution: Just use Git**
108 |
109 | Users who want version history can literally just use Git in their sync directory:
110 |
111 | ```bash
112 | cd ~/basic-memory-cloud-sync/
113 | git init
114 | git add .
115 | git commit -m "backup"
116 |
117 | # Push to their own GitHub if they want
118 | git remote add origin git@github.com:user/my-knowledge.git
119 | git push
120 | ```
121 |
122 | **Why this is perfect:**
123 | - ✅ We build nothing
124 | - ✅ Users who want Git... just use Git
125 | - ✅ Users who don't care... don't need to
126 | - ✅ rclone bisync already handles sync conflicts
127 | - ✅ Users own their data, they can version it however they want (Git, Time Machine, etc.)
128 |
129 | **What we'd have to build for S3 versioning:**
130 | - API to enable versioning on Tigris buckets
131 | - **Problem**: Tigris doesn't support S3 bucket versioning
132 | - Restore commands: `bm cloud restore --version-id`
133 | - Version listing: `bm cloud versions <path>`
134 | - Lifecycle policies for version retention
135 | - Documentation and user education
136 |
137 | **What we'd have to build for SPEC-14 Git integration:**
138 | - Committer service (daemon watching `/app/data/`)
139 | - Puller service (webhook handler for GitHub pushes)
140 | - Git LFS for large files
141 | - Loop prevention between Git ↔ bisync ↔ local
142 | - Merge conflict handling at TWO layers (rclone + Git)
143 | - Webhook infrastructure and monitoring
144 |
145 | **Decision:** Don't build version control. Document the pattern. "The easiest problem to solve is the one you avoid."
146 |
147 | **When to revisit:** Teams/multi-user features where server-side version control becomes necessary for collaboration.
148 |
149 | ### Why No Distributed Lease?
150 |
151 | **Low probability issue:**
152 | - Requires user to manually run `bm sync` on multiple devices at exact same time
153 | - Most users run `--watch` on one primary device
154 | - rclone bisync detects state divergence and fails safely
155 |
156 | **Safety nets in place:**
157 | - Local process lock prevents concurrent runs on same device
158 | - rclone bisync aborts if bucket state changed during sync
159 | - S3 versioning recovers from any overwrites
160 | - Documentation warns against multi-device `--watch`
161 |
162 | **Failure mode:**
163 | ```bash
164 | # Device A and B sync simultaneously
165 | Device A: bm sync → succeeds
166 | Device B: bm sync → "Error: path has changed, run --resync"
167 |
168 | # User fixes with resync
169 | Device B: bm sync --resync → establishes new baseline
170 | ```
171 |
172 | **Decision:** Document the issue, add local lock, defer distributed coordination until users report actual problems.
173 |
174 | ### Cloud-Primary Conflict Model
175 |
176 | **User mental model:**
177 | - Cloud is the source of truth (like Dropbox/iCloud)
178 | - Local is working copy
179 | - On conflict: cloud wins, local edits → `.conflict` file
180 | - User manually picks winner
181 |
182 | **Why this works:**
183 | - Simpler than bidirectional merge (no automatic resolution risk)
184 | - Matches user expectations from Dropbox
185 | - S3 versioning provides safety net for overwrites
186 | - Clear recovery path: restore from S3 version if needed
187 |
188 | **Example workflow:**
189 | ```bash
190 | # Edit file on Device A and Device B while offline
191 | # Both devices come online and sync
192 |
193 | Device A: bm sync
194 | # → Pushes to cloud first, becomes canonical version
195 |
196 | Device B: bm sync
197 | # → Detects conflict
198 | # → Cloud version: work/notes.md
199 | # → Local version: work/notes.md.conflict1
200 | # → User manually merges or picks winner
201 |
202 | # Restore if needed
203 | bm cloud restore work/notes.md --version-id abc123
204 | ```
205 |
206 | ## Implementation Details
207 |
208 | ### 1. Local Process Lock
209 |
210 | ```python
211 | # ~/.basic-memory/sync.lock
212 | import os
213 | import psutil
214 | from pathlib import Path
215 |
216 | class SyncLock:
217 | def __init__(self):
218 | self.lock_file = Path.home() / '.basic-memory' / 'sync.lock'
219 |
220 | def acquire(self):
221 | if self.lock_file.exists():
222 | pid = int(self.lock_file.read_text())
223 | if psutil.pid_exists(pid):
224 | raise BisyncError(
225 | f"Sync already running (PID {pid}). "
226 | f"Wait for completion or kill stale process."
227 | )
228 | # Stale lock, remove it
229 | self.lock_file.unlink()
230 |
231 | self.lock_file.write_text(str(os.getpid()))
232 |
233 | def release(self):
234 | if self.lock_file.exists():
235 | self.lock_file.unlink()
236 |
237 | def __enter__(self):
238 | self.acquire()
239 | return self
240 |
241 | def __exit__(self, *args):
242 | self.release()
243 |
244 | # Usage
245 | with SyncLock():
246 | run_rclone_bisync()
247 | ```
248 |
249 | ### 3. Sync Report Parsing
250 |
251 | ```python
252 | # Parse rclone bisync output
253 | import json
254 | from datetime import datetime
255 | from pathlib import Path
256 |
257 | def parse_sync_report(rclone_output: str, duration: float, exit_code: int) -> dict:
258 | """Parse rclone bisync output into structured report."""
259 |
260 | # rclone bisync outputs lines like:
261 | # "Synching Path1 /local/path with Path2 remote:bucket"
262 | # "- Path1 File was copied to Path2"
263 | # "Bisync successful"
264 |
265 | report = {
266 | "timestamp": datetime.now().isoformat(),
267 | "duration_seconds": duration,
268 | "exit_code": exit_code,
269 | "success": exit_code == 0,
270 | "files_created": 0,
271 | "files_updated": 0,
272 | "files_deleted": 0,
273 | "conflicts": [],
274 | "errors": []
275 | }
276 |
277 | for line in rclone_output.split('\n'):
278 | if 'was copied to' in line:
279 | report['files_created'] += 1
280 | elif 'was updated in' in line:
281 | report['files_updated'] += 1
282 | elif 'was deleted from' in line:
283 | report['files_deleted'] += 1
284 | elif '.conflict' in line:
285 | report['conflicts'].append(line.strip())
286 | elif 'ERROR' in line:
287 | report['errors'].append(line.strip())
288 |
289 | return report
290 |
291 | def save_sync_report(report: dict):
292 | """Save sync report to history."""
293 | history_dir = Path.home() / '.basic-memory' / 'sync-history'
294 | history_dir.mkdir(parents=True, exist_ok=True)
295 |
296 | timestamp = datetime.now().strftime('%Y%m%d-%H%M%S')
297 | report_file = history_dir / f'{timestamp}.json'
298 |
299 | report_file.write_text(json.dumps(report, indent=2))
300 |
301 | # Usage in run_bisync()
302 | start_time = time.time()
303 | result = subprocess.run(bisync_cmd, capture_output=True, text=True)
304 | duration = time.time() - start_time
305 |
306 | report = parse_sync_report(result.stdout, duration, result.returncode)
307 | save_sync_report(report)
308 |
309 | if report['conflicts']:
310 | console.print(f"[yellow]⚠ {len(report['conflicts'])} conflict(s) detected[/yellow]")
311 | console.print("[dim]Run 'bm conflicts list' to view[/dim]")
312 | ```
313 |
314 | ### 4. User Commands
315 |
316 | ```bash
317 | # View sync history
318 | bm sync history
319 | # → Lists recent syncs from ~/.basic-memory/sync-history/*.json
320 | # → Shows: timestamp, duration, files changed, conflicts, errors
321 |
322 | # View current conflicts
323 | bm conflicts list
324 | # → Scans sync directory for *.conflict* files
325 | # → Shows: file path, conflict versions, timestamps
326 |
327 | # Restore from S3 version
328 | bm cloud restore work/notes.md --version-id abc123
329 | # → Uses aws s3api get-object with version-id
330 | # → Downloads to original path
331 |
332 | bm cloud restore work/notes.md --timestamp "2025-10-03 14:30"
333 | # → Lists versions, finds closest to timestamp
334 | # → Downloads that version
335 |
336 | # List file versions
337 | bm cloud versions work/notes.md
338 | # → Uses aws s3api list-object-versions
339 | # → Shows: version-id, timestamp, size, author
340 |
341 | # Interactive conflict resolution
342 | bm conflicts resolve work/notes.md
343 | # → Shows both versions side-by-side
344 | # → Prompts: Keep local, keep cloud, merge manually, restore from S3 version
345 | # → Cleans up .conflict files after resolution
346 | ```
347 |
348 | ## Success Metrics & Monitoring
349 |
350 | **Phase 1 (v1) - Basic Safety:**
351 | - [ ] Conflict detection rate < 5% of syncs (measure in telemetry)
352 | - [ ] User can resolve conflicts within 5 minutes (UX testing)
353 | - [ ] Documentation prevents 90% of multi-device issues
354 |
355 | **Phase 2 (v2) - Observability:**
356 | - [ ] 80% of users check `bm sync history` when troubleshooting
357 | - [ ] Average time to restore from S3 version < 2 minutes
358 |
359 | - [ ] Conflict resolution success rate > 95%
360 |
361 | **What to measure:**
362 | ```python
363 | # Telemetry in sync reports
364 | {
365 | "conflict_rate": conflicts / total_syncs,
366 | "multi_device_collisions": count_state_divergence_errors,
367 | "version_restores": count_restore_operations,
368 | "avg_sync_duration": sum(durations) / count,
369 | "max_delete_trips": count_max_delete_aborts
370 | }
371 | ```
372 |
373 | **When to add distributed lease:**
374 | - Multi-device collision rate > 5% of syncs
375 | - User complaints about state divergence errors
376 | - Evidence that local lock isn't sufficient
377 |
378 | **When to revisit Git (SPEC-14):**
379 | - Teams feature launches (multi-user collaboration)
380 | - Users request commit messages / audit trail
381 | - PR-based review workflow becomes valuable
382 |
383 | ## Links
384 | - SPEC-9: `specs/spec-9-multi-project-bisync`
385 | - SPEC-14: `specs/spec-14-cloud-git-versioning` (deferred; users who want version history use Git locally instead)
386 | - rclone bisync docs: https://rclone.org/bisync/
387 | - Tigris S3 versioning: https://www.tigrisdata.com/docs/buckets/versioning/
388 |
389 | ---
390 | **Owner:** <assign> | **Review cadence:** weekly in standup | **Last updated:** 2025-10-03
391 |
```
--------------------------------------------------------------------------------
/tests/api/test_resource_router.py:
--------------------------------------------------------------------------------
```python
1 | """Tests for resource router endpoints."""
2 |
3 | import json
4 | from datetime import datetime, timezone
5 | from pathlib import Path
6 |
7 | import pytest
8 |
9 | from basic_memory.schemas import EntityResponse
10 | from basic_memory.utils import normalize_newlines
11 |
12 |
13 | @pytest.mark.asyncio
14 | async def test_get_resource_content(client, project_config, entity_repository, project_url):
15 | """Test getting content by permalink."""
16 | # Create a test file
17 | content = "# Test Content\n\nThis is a test file."
18 | test_file = Path(project_config.home) / "test" / "test.md"
19 | test_file.parent.mkdir(parents=True, exist_ok=True)
20 | test_file.write_text(content)
21 |
22 | # Create entity referencing the file
23 | entity = await entity_repository.create(
24 | {
25 | "title": "Test Entity",
26 | "entity_type": "test",
27 | "permalink": "test/test",
28 | "file_path": "test/test.md", # Relative to config.home
29 | "content_type": "text/markdown",
30 | "created_at": datetime.now(timezone.utc),
31 | "updated_at": datetime.now(timezone.utc),
32 | }
33 | )
34 |
35 | # Test getting the content
36 | response = await client.get(f"{project_url}/resource/{entity.permalink}")
37 | assert response.status_code == 200
38 | assert response.headers["content-type"] == "text/markdown; charset=utf-8"
39 | assert response.text == normalize_newlines(content)
40 |
41 |
42 | @pytest.mark.asyncio
43 | async def test_get_resource_pagination(client, project_config, entity_repository, project_url):
44 | """Test getting content by permalink with pagination."""
45 | # Create a test file
46 | content = "# Test Content\n\nThis is a test file."
47 | test_file = Path(project_config.home) / "test" / "test.md"
48 | test_file.parent.mkdir(parents=True, exist_ok=True)
49 | test_file.write_text(content)
50 |
51 | # Create entity referencing the file
52 | entity = await entity_repository.create(
53 | {
54 | "title": "Test Entity",
55 | "entity_type": "test",
56 | "permalink": "test/test",
57 | "file_path": "test/test.md", # Relative to config.home
58 | "content_type": "text/markdown",
59 | "created_at": datetime.now(timezone.utc),
60 | "updated_at": datetime.now(timezone.utc),
61 | }
62 | )
63 |
64 | # Test getting the content
65 | response = await client.get(
66 | f"{project_url}/resource/{entity.permalink}", params={"page": 1, "page_size": 1}
67 | )
68 | assert response.status_code == 200
69 | assert response.headers["content-type"] == "text/markdown; charset=utf-8"
70 | assert response.text == normalize_newlines(content)
71 |
72 |
73 | @pytest.mark.asyncio
74 | async def test_get_resource_by_title(client, project_config, entity_repository, project_url):
75 | """Test getting content by permalink."""
76 | # Create a test file
77 | content = "# Test Content\n\nThis is a test file."
78 | test_file = Path(project_config.home) / "test" / "test.md"
79 | test_file.parent.mkdir(parents=True, exist_ok=True)
80 | test_file.write_text(content)
81 |
82 | # Create entity referencing the file
83 | entity = await entity_repository.create(
84 | {
85 | "title": "Test Entity",
86 | "entity_type": "test",
87 | "permalink": "test/test",
88 | "file_path": "test/test.md", # Relative to config.home
89 | "content_type": "text/markdown",
90 | "created_at": datetime.now(timezone.utc),
91 | "updated_at": datetime.now(timezone.utc),
92 | }
93 | )
94 |
95 | # Test getting the content
96 | response = await client.get(f"{project_url}/resource/{entity.title}")
97 | assert response.status_code == 200
98 |
99 |
100 | @pytest.mark.asyncio
101 | async def test_get_resource_missing_entity(client, project_url):
102 | """Test 404 when entity doesn't exist."""
103 | response = await client.get(f"{project_url}/resource/does/not/exist")
104 | assert response.status_code == 404
105 | assert "Resource not found" in response.json()["detail"]
106 |
107 |
@pytest.mark.asyncio
async def test_get_resource_missing_file(client, project_config, entity_repository, project_url):
    """A 404 is returned when the entity is indexed but its file is absent on disk."""
    # Index an entity whose file_path was never written to the filesystem
    now = datetime.now(timezone.utc)
    entity_data = {
        "title": "Missing File",
        "entity_type": "test",
        "permalink": "test/missing",
        "file_path": "test/missing.md",
        "content_type": "text/markdown",
        "created_at": now,
        "updated_at": now,
    }
    entity = await entity_repository.create(entity_data)

    resp = await client.get(f"{project_url}/resource/{entity.permalink}")
    assert resp.status_code == 404
    assert "File not found" in resp.json()["detail"]
127 |
128 |
@pytest.mark.asyncio
async def test_get_resource_observation(client, project_config, entity_repository, project_url):
    """Test getting content by observation permalink."""
    # Create an entity containing exactly one observation ("[note]" line)
    content = "# Test Content\n\n- [note] an observation."
    data = {
        "title": "Test Entity",
        "folder": "test",
        "entity_type": "test",
        "content": f"{content}",
    }
    response = await client.post(f"{project_url}/knowledge/entities", json=data)
    entity_response = response.json()
    entity = EntityResponse(**entity_response)

    assert len(entity.observations) == 1
    observation = entity.observations[0]

    # Fetching via the observation's permalink returns the parent entity's
    # full markdown (frontmatter included), per the expected text below.
    response = await client.get(f"{project_url}/resource/{observation.permalink}")
    assert response.status_code == 200
    assert response.headers["content-type"] == "text/markdown; charset=utf-8"
    assert (
        normalize_newlines(
            """
---
title: Test Entity
type: test
permalink: test/test-entity
---

# Test Content

- [note] an observation.
""".strip()
        )
        in response.text
    )
167 |
168 |
@pytest.mark.asyncio
async def test_get_resource_entities(client, project_config, entity_repository, project_url):
    """Test getting content by permalink wildcard match."""
    # Create first entity
    content1 = "# Test Content\n"
    data = {
        "title": "Test Entity",
        "folder": "test",
        "entity_type": "test",
        "content": f"{content1}",
    }
    response = await client.post(f"{project_url}/knowledge/entities", json=data)
    entity_response = response.json()
    entity1 = EntityResponse(**entity_response)

    # Create second entity linking back to the first via a wikilink
    content2 = "# Related Content\n- links to [[Test Entity]]"
    data = {
        "title": "Related Entity",
        "folder": "test",
        "entity_type": "test",
        "content": f"{content2}",
    }
    response = await client.post(f"{project_url}/knowledge/entities", json=data)
    entity_response = response.json()
    entity2 = EntityResponse(**entity_response)

    assert len(entity2.relations) == 1

    # A wildcard permalink ("test/*") returns all matching entities concatenated,
    # each prefixed by a "--- memory://..." header carrying the update timestamp
    # and the first 8 chars of the checksum.
    response = await client.get(f"{project_url}/resource/test/*")
    assert response.status_code == 200
    assert response.headers["content-type"] == "text/markdown; charset=utf-8"
    assert (
        normalize_newlines(
            f"""
--- memory://test/test-entity {entity1.updated_at.isoformat()} {entity1.checksum[:8]}

# Test Content

--- memory://test/related-entity {entity2.updated_at.isoformat()} {entity2.checksum[:8]}

# Related Content
- links to [[Test Entity]]

""".strip()
        )
        in response.text
    )
217 |
218 |
@pytest.mark.asyncio
async def test_get_resource_entities_pagination(
    client, project_config, entity_repository, project_url
):
    """Test paginating results of a permalink wildcard match."""
    # Create first entity
    content1 = "# Test Content\n"
    data = {
        "title": "Test Entity",
        "folder": "test",
        "entity_type": "test",
        "content": f"{content1}",
    }
    response = await client.post(f"{project_url}/knowledge/entities", json=data)
    entity_response = response.json()
    entity1 = EntityResponse(**entity_response)
    assert entity1

    # Create second entity linking back to the first
    content2 = "# Related Content\n- links to [[Test Entity]]"
    data = {
        "title": "Related Entity",
        "folder": "test",
        "entity_type": "test",
        "content": f"{content2}",
    }
    response = await client.post(f"{project_url}/knowledge/entities", json=data)
    entity_response = response.json()
    entity2 = EntityResponse(**entity_response)

    assert len(entity2.relations) == 1

    # With page_size=1, page 2 should contain only the second entity
    response = await client.get(
        f"{project_url}/resource/test/*", params={"page": 2, "page_size": 1}
    )
    assert response.status_code == 200
    assert response.headers["content-type"] == "text/markdown; charset=utf-8"
    # Expected output is the entity's own markdown with frontmatter (single-document
    # form), not the multi-document "--- memory://" concatenation used elsewhere
    assert (
        normalize_newlines(
            """
---
title: Related Entity
type: test
permalink: test/related-entity
---

# Related Content
- links to [[Test Entity]]
""".strip()
        )
        in response.text
    )
271 |
272 |
@pytest.mark.asyncio
async def test_get_resource_relation(client, project_config, entity_repository, project_url):
    """Test getting content by relation permalink."""
    # Create first entity
    content1 = "# Test Content\n"
    data = {
        "title": "Test Entity",
        "folder": "test",
        "entity_type": "test",
        "content": f"{content1}",
    }
    response = await client.post(f"{project_url}/knowledge/entities", json=data)
    entity_response = response.json()
    entity1 = EntityResponse(**entity_response)

    # Create second entity with a wikilink relation to the first
    content2 = "# Related Content\n- links to [[Test Entity]]"
    data = {
        "title": "Related Entity",
        "folder": "test",
        "entity_type": "test",
        "content": f"{content2}",
    }
    response = await client.post(f"{project_url}/knowledge/entities", json=data)
    entity_response = response.json()
    entity2 = EntityResponse(**entity_response)

    assert len(entity2.relations) == 1
    relation = entity2.relations[0]

    # Fetching via the relation's permalink returns both endpoint entities,
    # concatenated with "--- memory://..." headers (timestamp + checksum prefix)
    response = await client.get(f"{project_url}/resource/{relation.permalink}")
    assert response.status_code == 200
    assert response.headers["content-type"] == "text/markdown; charset=utf-8"
    assert (
        normalize_newlines(
            f"""
--- memory://test/test-entity {entity1.updated_at.isoformat()} {entity1.checksum[:8]}

# Test Content

--- memory://test/related-entity {entity2.updated_at.isoformat()} {entity2.checksum[:8]}

# Related Content
- links to [[Test Entity]]

""".strip()
        )
        in response.text
    )
322 |
323 |
@pytest.mark.asyncio
async def test_put_resource_new_file(
    client, project_config, entity_repository, search_repository, project_url
):
    """Test creating a new file via PUT."""
    # Test data: a minimal Obsidian-style canvas document
    file_path = "visualizations/test.canvas"
    canvas_data = {
        "nodes": [
            {
                "id": "node1",
                "type": "text",
                "text": "Test node content",
                "x": 100,
                "y": 200,
                "width": 400,
                "height": 300,
            }
        ],
        "edges": [],
    }

    # Make sure the file doesn't exist yet
    full_path = Path(project_config.home) / file_path
    if full_path.exists():
        full_path.unlink()

    # Execute PUT request
    # NOTE(review): json=json.dumps(...) sends a JSON-encoded *string* as the
    # request body (double-encoded); presumably the endpoint expects the raw
    # file content as a JSON string — confirm against the router implementation.
    response = await client.put(
        f"{project_url}/resource/{file_path}", json=json.dumps(canvas_data, indent=2)
    )

    # Verify response: 201 Created for a brand-new file
    assert response.status_code == 201
    response_data = response.json()
    assert response_data["file_path"] == file_path
    assert "checksum" in response_data
    assert "size" in response_data

    # Verify file was created
    full_path = Path(project_config.home) / file_path
    assert full_path.exists()

    # Verify file content round-trips to the original canvas payload
    file_content = full_path.read_text(encoding="utf-8")
    assert json.loads(file_content) == canvas_data

    # Verify entity was created in DB with canvas type detection
    entity = await entity_repository.get_by_file_path(file_path)
    assert entity is not None
    assert entity.entity_type == "canvas"
    assert entity.content_type == "application/json"

    # Verify entity was indexed for search
    search_results = await search_repository.search(title="test.canvas")
    assert len(search_results) > 0
380 |
381 |
@pytest.mark.asyncio
async def test_put_resource_update_existing(client, project_config, entity_repository, project_url):
    """Test updating an existing file via PUT."""
    # Create an initial file and entity
    file_path = "visualizations/update-test.canvas"
    full_path = Path(project_config.home) / file_path
    full_path.parent.mkdir(parents=True, exist_ok=True)

    initial_data = {
        "nodes": [
            {
                "id": "initial",
                "type": "text",
                "text": "Initial content",
                "x": 0,
                "y": 0,
                "width": 200,
                "height": 100,
            }
        ],
        "edges": [],
    }
    full_path.write_text(json.dumps(initial_data))

    # Create the initial entity (placeholder checksum; PUT should replace it)
    initial_entity = await entity_repository.create(
        {
            "title": "update-test.canvas",
            "entity_type": "canvas",
            "file_path": file_path,
            "content_type": "application/json",
            "checksum": "initial123",
            "created_at": datetime.now(timezone.utc),
            "updated_at": datetime.now(timezone.utc),
        }
    )

    # New data for update
    updated_data = {
        "nodes": [
            {
                "id": "updated",
                "type": "text",
                "text": "Updated content",
                "x": 100,
                "y": 100,
                "width": 300,
                "height": 200,
            }
        ],
        "edges": [],
    }

    # Execute PUT request to update (body is the JSON-encoded file content)
    response = await client.put(
        f"{project_url}/resource/{file_path}", json=json.dumps(updated_data, indent=2)
    )

    # Verify response: 200 OK for an update (vs 201 for a create)
    assert response.status_code == 200

    # Verify file was updated on disk
    updated_content = full_path.read_text(encoding="utf-8")
    assert json.loads(updated_content) == updated_data

    # Verify entity was updated in place, not replaced
    updated_entity = await entity_repository.get_by_file_path(file_path)
    assert updated_entity.id == initial_entity.id  # Same entity, updated
    assert updated_entity.checksum != initial_entity.checksum  # Checksum changed
451 |
```
--------------------------------------------------------------------------------
/tests/mcp/test_permalink_collision_file_overwrite.py:
--------------------------------------------------------------------------------
```python
1 | """Tests for permalink collision file overwrite bug discovered in live testing.
2 |
3 | This test reproduces a critical data loss bug where creating notes with
4 | titles that normalize to different permalinks but resolve to the same
5 | file location causes silent file overwrites without warning.
6 |
7 | Related to GitHub Issue #139 but tests a different aspect - not database
8 | UNIQUE constraints, but actual file overwrite behavior.
9 |
10 | Example scenario from live testing:
11 | 1. Create "Node A" → file: edge-cases/Node A.md, permalink: edge-cases/node-a
12 | 2. Create "Node C" → file: edge-cases/Node C.md, permalink: edge-cases/node-c
13 | 3. BUG: Node C creation overwrites edge-cases/Node A.md file content
14 | 4. Result: File "Node A.md" exists but contains "Node C" content
15 | """
16 |
17 | import pytest
18 | from pathlib import Path
19 | from textwrap import dedent
20 |
21 | from basic_memory.mcp.tools import write_note, read_note
22 | from basic_memory.sync.sync_service import SyncService
23 | from basic_memory.config import ProjectConfig
24 | from basic_memory.services import EntityService
25 |
26 |
async def force_full_scan(sync_service: SyncService) -> None:
    """Clear the sync watermark so the next sync performs a full scan.

    Used in tests that exercise moves/deletions, where freshly written files
    may not be newer than the stored scan watermark.
    """
    project_id = sync_service.entity_repository.project_id
    if project_id is None:
        return
    project = await sync_service.project_repository.find_by_id(project_id)
    if project is None:
        return
    # Resetting both watermark fields forces the incremental-scan optimization off
    await sync_service.project_repository.update(
        project.id,
        {"last_scan_timestamp": None, "last_file_count": None},
    )
41 |
42 |
@pytest.mark.asyncio
async def test_permalink_collision_should_not_overwrite_different_file(app, test_project):
    """Test that creating notes with different titles doesn't overwrite existing files.

    This test reproduces the critical bug discovered in Phase 4 of live testing where:
    - Creating "Node A" worked fine
    - Creating "Node C" silently overwrote Node A.md's content
    - No warning or error was shown to the user
    - Original Node A content was permanently lost

    Expected behavior:
    - Each note with a different title should create/update its own file
    - No silent overwrites should occur
    - Files should maintain their distinct content

    Current behavior (BUG):
    - Second note creation sometimes overwrites first note's file
    - File "Node A.md" contains "Node C" content after creating Node C
    - Data loss occurs without user warning
    """
    # Step 1: Create first note "Node A"
    result_a = await write_note.fn(
        project=test_project.name,
        title="Node A",
        folder="edge-cases",
        content="# Node A\n\nOriginal content for Node A\n\n## Relations\n- links_to [[Node B]]",
    )

    assert "# Created note" in result_a
    assert "file_path: edge-cases/Node A.md" in result_a
    assert "permalink: edge-cases/node-a" in result_a

    # Verify Node A content via read
    content_a = await read_note.fn("edge-cases/node-a", project=test_project.name)
    assert "Node A" in content_a
    assert "Original content for Node A" in content_a

    # Step 2: Create second note "Node B" (should be independent)
    result_b = await write_note.fn(
        project=test_project.name,
        title="Node B",
        folder="edge-cases",
        content="# Node B\n\nContent for Node B",
    )

    assert "# Created note" in result_b
    assert "file_path: edge-cases/Node B.md" in result_b
    assert "permalink: edge-cases/node-b" in result_b

    # Step 3: Create third note "Node C" (this is where the bug occurs)
    result_c = await write_note.fn(
        project=test_project.name,
        title="Node C",
        folder="edge-cases",
        content="# Node C\n\nContent for Node C\n\n## Relations\n- links_to [[Node A]]",
    )

    assert "# Created note" in result_c
    assert "file_path: edge-cases/Node C.md" in result_c
    assert "permalink: edge-cases/node-c" in result_c

    # CRITICAL CHECK: Verify Node A still has its original content
    # This is where the bug manifests - Node A.md gets overwritten with Node C content
    content_a_after = await read_note.fn("edge-cases/node-a", project=test_project.name)
    assert "Node A" in content_a_after, "Node A title should still be 'Node A'"
    assert "Original content for Node A" in content_a_after, (
        "Node A file should NOT be overwritten by Node C creation"
    )
    assert "Content for Node C" not in content_a_after, "Node A should NOT contain Node C's content"

    # Verify Node C has its own content
    content_c = await read_note.fn("edge-cases/node-c", project=test_project.name)
    assert "Node C" in content_c
    assert "Content for Node C" in content_c
    assert "Original content for Node A" not in content_c, (
        "Node C should not contain Node A's content"
    )

    # Verify files physically exist with correct content
    # (both the read_note path above and the raw filesystem are checked, since
    # the bug corrupts the file while the read API may route through the DB)
    project_path = Path(test_project.path)
    node_a_file = project_path / "edge-cases" / "Node A.md"
    node_c_file = project_path / "edge-cases" / "Node C.md"

    assert node_a_file.exists(), "Node A.md file should exist"
    assert node_c_file.exists(), "Node C.md file should exist"

    # Read actual file contents to verify no overwrite occurred
    node_a_file_content = node_a_file.read_text()
    node_c_file_content = node_c_file.read_text()

    assert "Node A" in node_a_file_content, "Physical file Node A.md should contain Node A title"
    assert "Original content for Node A" in node_a_file_content, (
        "Physical file Node A.md should contain original Node A content"
    )
    assert "Content for Node C" not in node_a_file_content, (
        "Physical file Node A.md should NOT contain Node C content"
    )

    assert "Node C" in node_c_file_content, "Physical file Node C.md should contain Node C title"
    assert "Content for Node C" in node_c_file_content, (
        "Physical file Node C.md should contain Node C content"
    )
145 |
146 |
@pytest.mark.asyncio
async def test_notes_with_similar_titles_maintain_separate_files(app, test_project):
    """Test that notes with similar titles that normalize differently don't collide.

    Tests additional edge cases around permalink normalization to ensure
    we don't have collision issues with various title patterns.
    """
    # Create notes with titles that could potentially cause issues
    titles_and_folders = [
        ("My Note", "test"),
        ("My-Note", "test"),  # Different title, similar permalink
        ("My_Note", "test"),  # Underscore vs hyphen
        ("my note", "test"),  # Case variation
    ]

    created_permalinks = []

    for title, folder in titles_and_folders:
        result = await write_note.fn(
            project=test_project.name,
            title=title,
            folder=folder,
            content=f"# {title}\n\nUnique content for {title}",
        )

        permalink = None
        # Extract permalink from result
        for line in result.split("\n"):
            if line.startswith("permalink:"):
                permalink = line.split(":", 1)[1].strip()
                created_permalinks.append((title, permalink))
                break

        # Fail loudly if write_note's output had no "permalink:" line — otherwise
        # read_note below would be called with None and fail with a confusing error.
        assert permalink is not None, (
            f"write_note result for '{title}' did not include a permalink line"
        )

        # Verify each note can be read back with its own content
        content = await read_note.fn(permalink, project=test_project.name)
        assert f"Unique content for {title}" in content, (
            f"Note with title '{title}' should maintain its unique content"
        )

    # Verify all created permalinks are tracked
    assert len(created_permalinks) == len(titles_and_folders), (
        "All notes should be created successfully"
    )
190 |
191 |
@pytest.mark.asyncio
async def test_sequential_note_creation_preserves_all_files(app, test_project):
    """Test that rapid sequential note creation doesn't cause file overwrites.

    This test creates multiple notes in sequence to ensure that file
    creation/update logic doesn't have race conditions or state issues
    that could cause overwrites.
    """
    notes_data = [
        ("Alpha", "# Alpha\n\nAlpha content"),
        ("Beta", "# Beta\n\nBeta content"),
        ("Gamma", "# Gamma\n\nGamma content"),
        ("Delta", "# Delta\n\nDelta content"),
        ("Epsilon", "# Epsilon\n\nEpsilon content"),
    ]

    # Create all notes
    for title, content in notes_data:
        result = await write_note.fn(
            project=test_project.name,
            title=title,
            folder="sequence-test",
            content=content,
        )
        assert "# Created note" in result or "# Updated note" in result

    # Verify all notes still exist with correct content
    for title, expected_content in notes_data:
        # Normalize title to permalink format
        permalink = f"sequence-test/{title.lower()}"
        content = await read_note.fn(permalink, project=test_project.name)

        assert title in content, f"Note '{title}' should still have its title"
        # split("\n\n")[1] picks the body paragraph after the "# Title" heading
        assert expected_content.split("\n\n")[1] in content, (
            f"Note '{title}' should still have its original content"
        )

    # Verify physical files exist on disk (not just DB entries)
    project_path = Path(test_project.path)
    sequence_dir = project_path / "sequence-test"

    for title, _ in notes_data:
        file_path = sequence_dir / f"{title}.md"
        assert file_path.exists(), f"File for '{title}' should exist"

        file_content = file_path.read_text()
        assert title in file_content, f"Physical file for '{title}' should contain correct title"
239 |
240 |
@pytest.mark.asyncio
async def test_sync_permalink_collision_file_overwrite_bug(
    sync_service: SyncService,
    project_config: ProjectConfig,
    entity_service: EntityService,
):
    """Test that reproduces the permalink collision file overwrite bug via sync.

    This test directly creates files and runs sync to reproduce the exact bug
    discovered in live testing where Node C overwrote Node A.md.

    The bug occurs when:
    1. File "Node A.md" exists with permalink "edge-cases/node-a"
    2. File "Node C.md" is created with permalink "edge-cases/node-c"
    3. During sync, somehow Node C content overwrites Node A.md
    4. Result: File "Node A.md" contains Node C content (data loss!)
    """
    project_dir = project_config.home
    edge_cases_dir = project_dir / "edge-cases"
    edge_cases_dir.mkdir(parents=True, exist_ok=True)

    # Step 1: Create Node A file (written directly to disk, not via the MCP tools)
    node_a_content = dedent("""
        ---
        title: Node A
        type: note
        tags:
        - circular-test
        ---

        # Node A

        Original content for Node A

        ## Relations
        - links_to [[Node B]]
        - references [[Node C]]
        """).strip()

    node_a_file = edge_cases_dir / "Node A.md"
    node_a_file.write_text(node_a_content)

    # Sync to create Node A in database
    await sync_service.sync(project_dir)

    # Verify Node A is in database
    node_a = await entity_service.get_by_permalink("edge-cases/node-a")
    assert node_a is not None
    assert node_a.title == "Node A"

    # Verify Node A file has correct content
    assert node_a_file.exists()
    node_a_file_content = node_a_file.read_text()
    assert "title: Node A" in node_a_file_content
    assert "Original content for Node A" in node_a_file_content

    # Step 2: Create Node B file
    node_b_content = dedent("""
        ---
        title: Node B
        type: note
        tags:
        - circular-test
        ---

        # Node B

        Content for Node B

        ## Relations
        - links_to [[Node C]]
        - part_of [[Node A]]
        """).strip()

    node_b_file = edge_cases_dir / "Node B.md"
    node_b_file.write_text(node_b_content)

    # Force full scan to detect the new file
    # (file just created may not be newer than watermark due to timing precision)
    await force_full_scan(sync_service)

    # Sync to create Node B
    await sync_service.sync(project_dir)

    # Step 3: Create Node C file (this is where the bug might occur)
    node_c_content = dedent("""
        ---
        title: Node C
        type: note
        tags:
        - circular-test
        ---

        # Node C

        Content for Node C

        ## Relations
        - links_to [[Node A]]
        - references [[Node B]]
        """).strip()

    node_c_file = edge_cases_dir / "Node C.md"
    node_c_file.write_text(node_c_content)

    # Force full scan to detect the new file
    # (file just created may not be newer than watermark due to timing precision)
    await force_full_scan(sync_service)

    # Sync to create Node C - THIS IS WHERE THE BUG OCCURS
    await sync_service.sync(project_dir)

    # CRITICAL VERIFICATION: Check if Node A file was overwritten
    assert node_a_file.exists(), "Node A.md file should still exist"

    # Read Node A file content to check for overwrite bug
    node_a_after_sync = node_a_file.read_text()

    # The bug: Node A.md contains Node C content instead of Node A content
    assert "title: Node A" in node_a_after_sync, (
        "Node A.md file should still have title: Node A in frontmatter"
    )
    assert "Node A" in node_a_after_sync, "Node A.md file should still contain 'Node A' title"
    assert "Original content for Node A" in node_a_after_sync, (
        f"Node A.md file should NOT be overwritten! Content: {node_a_after_sync[:200]}"
    )
    assert "Content for Node C" not in node_a_after_sync, (
        f"Node A.md should NOT contain Node C content! Content: {node_a_after_sync[:200]}"
    )

    # Verify Node C file exists with correct content
    assert node_c_file.exists(), "Node C.md file should exist"
    node_c_after_sync = node_c_file.read_text()
    assert "Node C" in node_c_after_sync
    assert "Content for Node C" in node_c_after_sync

    # Verify database has both entities correctly
    node_a_db = await entity_service.get_by_permalink("edge-cases/node-a")
    node_c_db = await entity_service.get_by_permalink("edge-cases/node-c")

    assert node_a_db is not None, "Node A should exist in database"
    assert node_a_db.title == "Node A", "Node A database entry should have correct title"

    assert node_c_db is not None, "Node C should exist in database"
    assert node_c_db.title == "Node C", "Node C database entry should have correct title"
386 |
```
--------------------------------------------------------------------------------
/src/basic_memory/api/routers/project_router.py:
--------------------------------------------------------------------------------
```python
1 | """Router for project management."""
2 |
3 | import os
4 | from fastapi import APIRouter, HTTPException, Path, Body, BackgroundTasks, Response, Query
5 | from typing import Optional
6 | from loguru import logger
7 |
8 | from basic_memory.deps import (
9 | ProjectConfigDep,
10 | ProjectServiceDep,
11 | ProjectPathDep,
12 | SyncServiceDep,
13 | )
14 | from basic_memory.schemas import ProjectInfoResponse, SyncReportResponse
15 | from basic_memory.schemas.project_info import (
16 | ProjectList,
17 | ProjectItem,
18 | ProjectInfoRequest,
19 | ProjectStatusResponse,
20 | )
21 | from basic_memory.utils import normalize_project_path
22 |
23 | # Router for resources in a specific project
24 | # The ProjectPathDep is used in the path as a prefix, so the request path is like /{project}/project/info
25 | project_router = APIRouter(prefix="/project", tags=["project"])
26 |
27 | # Router for managing project resources
28 | project_resource_router = APIRouter(prefix="/projects", tags=["project_management"])
29 |
30 |
@project_router.get("/info", response_model=ProjectInfoResponse)
async def get_project_info(
    project_service: ProjectServiceDep,
    project: ProjectPathDep,
) -> ProjectInfoResponse:
    """Return comprehensive information about the specified Basic Memory project."""
    info = await project_service.get_project_info(project)
    return info
38 |
39 |
@project_router.get("/item", response_model=ProjectItem)
async def get_project(
    project_service: ProjectServiceDep,
    project: ProjectPathDep,
) -> ProjectItem:
    """Get basic info about the specified Basic Memory project."""
    found_project = await project_service.get_project(project)
    if not found_project:
        raise HTTPException(
            status_code=404, detail=f"Project: '{project}' does not exist"
        )  # pragma: no cover

    # Normalize the stored path for a consistent client-facing representation
    return ProjectItem(
        name=found_project.name,
        path=normalize_project_path(found_project.path),
        is_default=found_project.is_default or False,
    )
57 |
58 |
# Update a project
@project_router.patch("/{name}", response_model=ProjectStatusResponse)
async def update_project(
    project_service: ProjectServiceDep,
    name: str = Path(..., description="Name of the project to update"),
    path: Optional[str] = Body(None, description="New absolute path for the project"),
    is_active: Optional[bool] = Body(None, description="Status of the project (active/inactive)"),
) -> ProjectStatusResponse:
    """Update a project's information in configuration and database.

    If both `path` and `is_active` are supplied, `path` takes precedence and
    only the move is performed (see the if/elif below).

    Args:
        name: The name of the project to update
        path: Optional new absolute path for the project
        is_active: Optional status update for the project

    Returns:
        Response confirming the project was updated

    Raises:
        HTTPException: 400 if the path is relative or the service raises ValueError
    """
    try:
        # Validate that path is absolute if provided.
        # NOTE: this HTTPException is raised inside the try block but is not a
        # ValueError, so it propagates to the client unchanged.
        if path and not os.path.isabs(path):
            raise HTTPException(status_code=400, detail="Path must be absolute")

        # Capture the pre-update path for the response BEFORE mutating anything
        old_project_info = ProjectItem(
            name=name,
            path=project_service.projects.get(name, ""),
        )

        if path:
            await project_service.move_project(name, path)
        elif is_active is not None:
            await project_service.update_project(name, is_active=is_active)

        # Get updated project info (re-read from the service when only status changed)
        updated_path = path if path else project_service.projects.get(name, "")

        return ProjectStatusResponse(
            message=f"Project '{name}' updated successfully",
            status="success",
            default=(name == project_service.default_project),
            old_project=old_project_info,
            new_project=ProjectItem(name=name, path=updated_path),
        )
    except ValueError as e:
        # Service-level validation failures surface as 400s
        raise HTTPException(status_code=400, detail=str(e))
105 |
106 |
# Sync project filesystem
@project_router.post("/sync")
async def sync_project(
    background_tasks: BackgroundTasks,
    sync_service: SyncServiceDep,
    project_config: ProjectConfigDep,
    force_full: bool = Query(
        False, description="Force full scan, bypassing watermark optimization"
    ),
):
    """Force project filesystem sync to database.

    Scans the project directory and updates the database with any new or modified files.
    The sync runs as a background task; this endpoint returns immediately.

    Args:
        background_tasks: FastAPI background tasks
        sync_service: Sync service for this project
        project_config: Project configuration
        force_full: If True, force a full scan even if watermark exists

    Returns:
        Response confirming sync was initiated
    """
    project_name = project_config.name

    # Schedule the (potentially slow) sync so the request returns immediately
    background_tasks.add_task(
        sync_service.sync, project_config.home, project_name, force_full=force_full
    )
    logger.info(
        f"Filesystem sync initiated for project: {project_name} (force_full={force_full})"
    )

    return {
        "status": "sync_started",
        "message": f"Filesystem sync initiated for project '{project_name}'",
    }
141 |
142 |
@project_router.post("/status", response_model=SyncReportResponse)
async def project_sync_status(
    sync_service: SyncServiceDep,
    project_config: ProjectConfigDep,
) -> SyncReportResponse:
    """Scan directory for changes compared to database state.

    Args:
        sync_service: Sync service for this project
        project_config: Project configuration

    Returns:
        Scan report with details on files that need syncing
    """
    logger.info(f"Scanning filesystem for project: {project_config.name}")
    report = await sync_service.scan(project_config.home)
    return SyncReportResponse.from_sync_report(report)
161 |
162 |
# List all available projects
@project_resource_router.get("/projects", response_model=ProjectList)
async def list_projects(
    project_service: ProjectServiceDep,
) -> ProjectList:
    """List all configured projects.

    Returns:
        A list of all projects with metadata
    """
    projects = await project_service.list_projects()
    default_project = project_service.default_project

    # Normalize each stored path for a consistent client-facing representation
    items = []
    for project in projects:
        items.append(
            ProjectItem(
                name=project.name,
                path=normalize_project_path(project.path),
                is_default=project.is_default or False,
            )
        )

    return ProjectList(
        projects=items,
        default_project=default_project,
    )
189 |
190 |
# Add a new project
@project_resource_router.post("/projects", response_model=ProjectStatusResponse, status_code=201)
async def add_project(
    response: Response,
    project_data: ProjectInfoRequest,
    project_service: ProjectServiceDep,
) -> ProjectStatusResponse:
    """Add a new project to configuration and database.

    Idempotency contract:
        - new project: created, returns 201 (route default status code)
        - same name + same resolved path: returns 200 with the existing project
        - same name + different path: returns 400

    Args:
        project_data: The project name and path, with option to set as default

    Returns:
        Response confirming the project was added
    """
    # Check if project already exists before attempting to add
    existing_project = await project_service.get_project(project_data.name)
    if existing_project:
        # Project exists - check if paths match for true idempotency
        # Normalize paths for comparison (resolve symlinks, etc.)
        # NOTE: function-scoped import is deliberate — the module-level `Path`
        # name is FastAPI's Path (used in route params like remove_project).
        from pathlib import Path

        requested_path = Path(project_data.path).resolve()
        existing_path = Path(existing_project.path).resolve()

        if requested_path == existing_path:
            # Same name, same path - return 200 OK (idempotent)
            # Override the decorator's 201 since nothing was created.
            response.status_code = 200
            return ProjectStatusResponse(  # pyright: ignore [reportCallIssue]
                message=f"Project '{project_data.name}' already exists",
                status="success",
                default=existing_project.is_default or False,
                new_project=ProjectItem(
                    name=existing_project.name,
                    path=existing_project.path,
                    is_default=existing_project.is_default or False,
                ),
            )
        else:
            # Same name, different path - this is an error
            raise HTTPException(
                status_code=400,
                detail=f"Project '{project_data.name}' already exists with different path. Existing: {existing_project.path}, Requested: {project_data.path}",
            )

    try:  # pragma: no cover
        # The service layer now handles cloud mode validation and path sanitization
        await project_service.add_project(
            project_data.name, project_data.path, set_default=project_data.set_default
        )

        return ProjectStatusResponse(  # pyright: ignore [reportCallIssue]
            message=f"Project '{project_data.name}' added successfully",
            status="success",
            default=project_data.set_default,
            new_project=ProjectItem(
                name=project_data.name, path=project_data.path, is_default=project_data.set_default
            ),
        )
    except ValueError as e:  # pragma: no cover
        # Service-level validation failures surface as 400s
        raise HTTPException(status_code=400, detail=str(e))
252 |
253 |
# Remove a project
@project_resource_router.delete("/{name}", response_model=ProjectStatusResponse)
async def remove_project(
    project_service: ProjectServiceDep,
    name: str = Path(..., description="Name of the project to remove"),
    delete_notes: bool = Query(
        False, description="If True, delete project directory from filesystem"
    ),
) -> ProjectStatusResponse:
    """Remove a project from configuration and database.

    The default project cannot be removed; callers must switch the default
    first (400 is returned with guidance on available alternatives).

    Args:
        name: The name of the project to remove
        delete_notes: If True, delete the project directory from the filesystem

    Returns:
        Response confirming the project was removed
    """
    try:
        old_project = await project_service.get_project(name)
        if not old_project:  # pragma: no cover
            # HTTPException is not a ValueError, so it propagates past the handler below
            raise HTTPException(
                status_code=404, detail=f"Project: '{name}' does not exist"
            )  # pragma: no cover

        # Check if trying to delete the default project
        if name == project_service.default_project:
            available_projects = await project_service.list_projects()
            other_projects = [p.name for p in available_projects if p.name != name]
            detail = f"Cannot delete default project '{name}'. "
            if other_projects:
                detail += (
                    f"Set another project as default first. Available: {', '.join(other_projects)}"
                )
            else:
                detail += "This is the only project in your configuration."
            raise HTTPException(status_code=400, detail=detail)

        await project_service.remove_project(name, delete_notes=delete_notes)

        return ProjectStatusResponse(
            message=f"Project '{name}' removed successfully",
            status="success",
            default=False,
            old_project=ProjectItem(name=old_project.name, path=old_project.path),
            new_project=None,
        )
    except ValueError as e:  # pragma: no cover
        raise HTTPException(status_code=400, detail=str(e))
303 |
304 |
# Set a project as default
@project_resource_router.put("/{name}/default", response_model=ProjectStatusResponse)
async def set_default_project(
    project_service: ProjectServiceDep,
    name: str = Path(..., description="Name of the project to set as default"),
) -> ProjectStatusResponse:
    """Set a project as the default project.

    Both the current default and the target project are looked up first so
    the response can report the old -> new transition.

    Args:
        name: The name of the project to set as default

    Returns:
        Response confirming the project was set as default
    """
    try:
        # Get the old default project
        default_name = project_service.default_project
        default_project = await project_service.get_project(default_name)
        if not default_project:  # pragma: no cover
            raise HTTPException(  # pragma: no cover
                status_code=404, detail=f"Default Project: '{default_name}' does not exist"
            )

        # get the new project
        new_default_project = await project_service.get_project(name)
        if not new_default_project:  # pragma: no cover
            raise HTTPException(
                status_code=404, detail=f"Project: '{name}' does not exist"
            )  # pragma: no cover

        await project_service.set_default_project(name)

        return ProjectStatusResponse(
            message=f"Project '{name}' set as default successfully",
            status="success",
            default=True,
            old_project=ProjectItem(name=default_name, path=default_project.path),
            new_project=ProjectItem(
                name=name,
                path=new_default_project.path,
                is_default=True,
            ),
        )
    except ValueError as e:  # pragma: no cover
        raise HTTPException(status_code=400, detail=str(e))
350 |
351 |
# Get the default project
@project_resource_router.get("/default", response_model=ProjectItem)
async def get_default_project(
    project_service: ProjectServiceDep,
) -> ProjectItem:
    """Return the project currently configured as the default.

    Returns:
        Response with project default information
    """
    default_name = project_service.default_project
    project = await project_service.get_project(default_name)
    if project is None:  # pragma: no cover
        raise HTTPException(  # pragma: no cover
            status_code=404, detail=f"Default Project: '{default_name}' does not exist"
        )

    return ProjectItem(name=project.name, path=project.path, is_default=True)
371 |
372 |
# Synchronize projects between config and database
@project_resource_router.post("/config/sync", response_model=ProjectStatusResponse)
async def synchronize_projects(
    project_service: ProjectServiceDep,
) -> ProjectStatusResponse:
    """Synchronize projects between configuration file and database.

    Ensures that all projects in the configuration file exist in the database
    and vice versa.

    Returns:
        Response confirming synchronization was completed
    """
    try:  # pragma: no cover
        await project_service.synchronize_projects()

        # default=False: this endpoint never changes which project is default
        return ProjectStatusResponse(  # pyright: ignore [reportCallIssue]
            message="Projects synchronized successfully between configuration and database",
            status="success",
            default=False,
        )
    except ValueError as e:  # pragma: no cover
        raise HTTPException(status_code=400, detail=str(e))
396 |
```
--------------------------------------------------------------------------------
/test-int/mcp/test_delete_note_integration.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Integration tests for delete_note MCP tool.
3 |
4 | Tests the complete delete note workflow: MCP client -> MCP server -> FastAPI -> database
5 | """
6 |
7 | import pytest
8 | from fastmcp import Client
9 |
10 |
@pytest.mark.asyncio
async def test_delete_note_by_title(mcp_server, app, test_project):
    """Test deleting a note by its title.

    Workflow: write_note -> read_note (verify it exists) -> delete_note by
    title -> read_note again (should report "Note Not Found").
    """

    async with Client(mcp_server) as client:
        # First create a note
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Note to Delete",
                "folder": "test",
                "content": "# Note to Delete\n\nThis note will be deleted.",
                "tags": "test,delete",
            },
        )

        # Verify the note exists by reading it
        read_result = await client.call_tool(
            "read_note",
            {
                "project": test_project.name,
                "identifier": "Note to Delete",
            },
        )
        assert len(read_result.content) == 1
        assert "Note to Delete" in read_result.content[0].text

        # Delete the note by title
        delete_result = await client.call_tool(
            "delete_note",
            {
                "project": test_project.name,
                "identifier": "Note to Delete",
            },
        )

        # Should return True for successful deletion
        assert len(delete_result.content) == 1
        assert delete_result.content[0].type == "text"
        assert "true" in delete_result.content[0].text.lower()

        # Verify the note no longer exists
        read_after_delete = await client.call_tool(
            "read_note",
            {
                "project": test_project.name,
                "identifier": "Note to Delete",
            },
        )

        # Should return helpful "Note Not Found" message instead of the actual note
        assert len(read_after_delete.content) == 1
        result_text = read_after_delete.content[0].text
        assert "Note Not Found" in result_text
        assert "Note to Delete" in result_text
67 |
68 |
@pytest.mark.asyncio
async def test_delete_note_by_permalink(mcp_server, app, test_project):
    """Test deleting a note by its permalink.

    Uses the folder/slug permalink form as the delete identifier, then
    confirms via search_notes that the note is gone from the index.
    """

    async with Client(mcp_server) as client:
        # Create a note
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Permalink Delete Test",
                "folder": "tests",
                "content": "# Permalink Delete Test\n\nTesting deletion by permalink.",
                "tags": "test,permalink",
            },
        )

        # Delete the note by permalink
        delete_result = await client.call_tool(
            "delete_note",
            {
                "project": test_project.name,
                "identifier": "tests/permalink-delete-test",
            },
        )

        # Should return True for successful deletion
        assert len(delete_result.content) == 1
        assert "true" in delete_result.content[0].text.lower()

        # Verify the note no longer exists by searching
        search_result = await client.call_tool(
            "search_notes",
            {
                "project": test_project.name,
                "query": "Permalink Delete Test",
            },
        )

        # Should have no results (accept either JSON spacing variant)
        assert (
            '"results": []' in search_result.content[0].text
            or '"results":[]' in search_result.content[0].text
        )
113 |
114 |
@pytest.mark.asyncio
async def test_delete_note_with_observations_and_relations(mcp_server, app, test_project):
    """Test deleting a note that has observations and relations.

    Ensures deletion succeeds for a note with structured markup
    (observations and [[wiki-link]] relations) and that the note can no
    longer be read afterwards.
    """

    async with Client(mcp_server) as client:
        # Create a complex note with observations and relations
        complex_content = """# Project Management System

This is a comprehensive project management system.

## Observations
- [feature] Task tracking functionality
- [feature] User authentication system
- [tech] Built with Python and Flask
- [status] Currently in development

## Relations
- depends_on [[Database Schema]]
- implements [[User Stories]]
- part_of [[Main Application]]

The system handles multiple projects and users."""

        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Project Management System",
                "folder": "projects",
                "content": complex_content,
                "tags": "project,management,system",
            },
        )

        # Verify the note exists and has content
        read_result = await client.call_tool(
            "read_note",
            {
                "project": test_project.name,
                "identifier": "Project Management System",
            },
        )
        assert len(read_result.content) == 1
        result_text = read_result.content[0].text
        assert "Task tracking functionality" in result_text
        assert "depends_on" in result_text

        # Delete the complex note
        delete_result = await client.call_tool(
            "delete_note",
            {
                "project": test_project.name,
                "identifier": "projects/project-management-system",
            },
        )

        # Should return True for successful deletion
        assert "true" in delete_result.content[0].text.lower()

        # Verify the note and all its components are deleted
        read_after_delete_2 = await client.call_tool(
            "read_note",
            {
                "project": test_project.name,
                "identifier": "Project Management System",
            },
        )

        # Should return "Note Not Found" message
        assert len(read_after_delete_2.content) == 1
        result_text = read_after_delete_2.content[0].text
        assert "Note Not Found" in result_text
        assert "Project Management System" in result_text
188 |
189 |
@pytest.mark.asyncio
async def test_delete_note_special_characters_in_title(mcp_server, app, test_project):
    """Test deleting notes with special characters in the title.

    Each title is created, deleted by its exact title string, and then
    verified gone via read_note.
    """

    async with Client(mcp_server) as client:
        # Create notes with special characters
        special_titles = [
            "Note with spaces",
            "Note-with-dashes",
            "Note_with_underscores",
            "Note (with parentheses)",
            "Note & Symbols!",
        ]

        # Create all the notes
        for title in special_titles:
            await client.call_tool(
                "write_note",
                {
                    "project": test_project.name,
                    "title": title,
                    "folder": "special",
                    "content": f"# {title}\n\nContent for {title}",
                    "tags": "special,characters",
                },
            )

        # Delete each note by title
        for title in special_titles:
            delete_result = await client.call_tool(
                "delete_note",
                {
                    "project": test_project.name,
                    "identifier": title,
                },
            )

            # Should return True for successful deletion
            assert "true" in delete_result.content[0].text.lower(), (
                f"Failed to delete note: {title}"
            )

            # Verify the note is deleted
            read_after_delete = await client.call_tool(
                "read_note",
                {
                    "project": test_project.name,
                    "identifier": title,
                },
            )

            # Should return "Note Not Found" message
            assert len(read_after_delete.content) == 1
            result_text = read_after_delete.content[0].text
            assert "Note Not Found" in result_text
            assert title in result_text
246 |
247 |
@pytest.mark.asyncio
async def test_delete_nonexistent_note(mcp_server, app, test_project):
    """Test attempting to delete a note that doesn't exist.

    The tool should report False rather than raising an error.
    """

    async with Client(mcp_server) as client:
        # Try to delete a note that doesn't exist
        delete_result = await client.call_tool(
            "delete_note",
            {
                "project": test_project.name,
                "identifier": "Nonexistent Note",
            },
        )

        # Should return False for unsuccessful deletion
        assert len(delete_result.content) == 1
        assert "false" in delete_result.content[0].text.lower()
265 |
266 |
@pytest.mark.asyncio
async def test_delete_note_by_file_path(mcp_server, app, test_project):
    """Test deleting a note using its file path.

    The relative file path (folder/Title.md) should be accepted as a
    delete identifier, same as a title or permalink.
    """

    async with Client(mcp_server) as client:
        # Create a note
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "File Path Delete",
                "folder": "docs",
                "content": "# File Path Delete\n\nTesting deletion by file path.",
                "tags": "test,filepath",
            },
        )

        # Try to delete using the file path (should work as an identifier)
        delete_result = await client.call_tool(
            "delete_note",
            {
                "project": test_project.name,
                "identifier": "docs/File Path Delete.md",
            },
        )

        # Should return True for successful deletion
        assert "true" in delete_result.content[0].text.lower()

        # Verify deletion
        read_after_delete = await client.call_tool(
            "read_note",
            {
                "project": test_project.name,
                "identifier": "File Path Delete",
            },
        )

        # Should return "Note Not Found" message
        assert len(read_after_delete.content) == 1
        result_text = read_after_delete.content[0].text
        assert "Note Not Found" in result_text
        assert "File Path Delete" in result_text
310 |
311 |
@pytest.mark.asyncio
async def test_delete_note_case_insensitive(mcp_server, app, test_project):
    """Test that note deletion is case insensitive for titles.

    Deleting with a lowercased identifier should still match the
    mixed-case note title.
    """

    async with Client(mcp_server) as client:
        # Create a note with mixed case
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "CamelCase Note Title",
                "folder": "test",
                "content": "# CamelCase Note Title\n\nTesting case sensitivity.",
                "tags": "test,case",
            },
        )

        # Try to delete with different case
        delete_result = await client.call_tool(
            "delete_note",
            {
                "project": test_project.name,
                "identifier": "camelcase note title",
            },
        )

        # Should return True for successful deletion
        assert "true" in delete_result.content[0].text.lower()
340 |
341 |
@pytest.mark.asyncio
async def test_delete_multiple_notes_sequentially(mcp_server, app, test_project):
    """Test deleting multiple notes in sequence.

    Creates five notes, deletes them one by one, and confirms a search for
    their shared tag returns no results.
    """

    async with Client(mcp_server) as client:
        # Create multiple notes
        note_titles = [
            "First Note",
            "Second Note",
            "Third Note",
            "Fourth Note",
            "Fifth Note",
        ]

        for title in note_titles:
            await client.call_tool(
                "write_note",
                {
                    "project": test_project.name,
                    "title": title,
                    "folder": "batch",
                    "content": f"# {title}\n\nContent for {title}",
                    "tags": "batch,test",
                },
            )

        # Delete all notes sequentially
        for title in note_titles:
            delete_result = await client.call_tool(
                "delete_note",
                {
                    "project": test_project.name,
                    "identifier": title,
                },
            )

            # Each deletion should be successful
            assert "true" in delete_result.content[0].text.lower(), f"Failed to delete {title}"

        # Verify all notes are deleted by searching
        search_result = await client.call_tool(
            "search_notes",
            {
                "project": test_project.name,
                "query": "batch",
            },
        )

        # Should have no results (accept either JSON spacing variant)
        assert (
            '"results": []' in search_result.content[0].text
            or '"results":[]' in search_result.content[0].text
        )
395 |
396 |
@pytest.mark.asyncio
async def test_delete_note_with_unicode_content(mcp_server, app, test_project):
    """Test deleting notes with Unicode content.

    Verifies deletion works when the note body includes emoji, CJK text,
    and mathematical symbols.
    """

    async with Client(mcp_server) as client:
        # Create a note with Unicode content
        unicode_content = """# Unicode Test Note 🚀

This note contains various Unicode characters:
- Emojis: 🎉 🔥 ⚡ 💡
- Languages: 测试中文 Tëst Übër
- Symbols: ♠♣♥♦ ←→↑↓ ∞≠≤≥
- Math: ∑∏∂∇∆Ω

## Observations
- [test] Unicode characters preserved ✓
- [note] Emoji support working 🎯

## Relations
- supports [[Unicode Standards]]
- tested_with [[Various Languages]]"""

        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Unicode Test Note",
                "folder": "unicode",
                "content": unicode_content,
                "tags": "unicode,test,emoji",
            },
        )

        # Delete the Unicode note
        delete_result = await client.call_tool(
            "delete_note",
            {
                "project": test_project.name,
                "identifier": "Unicode Test Note",
            },
        )

        # Should return True for successful deletion
        assert "true" in delete_result.content[0].text.lower()

        # Verify deletion
        read_after_delete = await client.call_tool(
            "read_note",
            {
                "project": test_project.name,
                "identifier": "Unicode Test Note",
            },
        )

        # Should return "Note Not Found" message
        assert len(read_after_delete.content) == 1
        result_text = read_after_delete.content[0].text
        assert "Note Not Found" in result_text
        assert "Unicode Test Note" in result_text
456 |
```
--------------------------------------------------------------------------------
/src/basic_memory/utils.py:
--------------------------------------------------------------------------------
```python
1 | """Utility functions for basic-memory."""
2 |
3 | import os
4 |
5 | import logging
6 | import re
7 | import sys
8 | from datetime import datetime
9 | from pathlib import Path
10 | from typing import Optional, Protocol, Union, runtime_checkable, List
11 |
12 | from loguru import logger
13 | from unidecode import unidecode
14 |
15 |
def normalize_project_path(path: str) -> str:
    """Normalize project path by stripping mount point prefix.

    In cloud deployments, the S3 bucket is mounted at /app/data. We strip this
    prefix from project paths to avoid leaking implementation details and to
    ensure paths match the actual S3 bucket structure.

    For local paths (including Windows paths), returns the path unchanged.

    Args:
        path: Project path (e.g., "/app/data/basic-memory-llc" or "C:\\Users\\...")

    Returns:
        Normalized path (e.g., "/basic-memory-llc" or "C:\\Users\\...")

    Examples:
        >>> normalize_project_path("/app/data/my-project")
        '/my-project'
        >>> normalize_project_path("/my-project")
        '/my-project'
        >>> normalize_project_path("app/data/my-project")
        '/my-project'
        >>> normalize_project_path("C:\\\\Users\\\\project")
        'C:\\\\Users\\\\project'
    """
    # Windows paths are returned unchanged:
    # - drive-letter paths (e.g. "C:\Users\...") have a colon at index 1
    # - UNC paths (e.g. "\\server\share") start with two backslashes; without
    #   this check they would fall through and gain a spurious leading "/"
    if (len(path) >= 2 and path[1] == ":") or path.startswith("\\\\"):
        return path

    # Handle both absolute and relative Unix paths
    normalized = path.lstrip("/")
    if normalized.startswith("app/data/"):
        normalized = normalized.removeprefix("app/data/")

    # After lstrip("/") the path never starts with "/", so always
    # re-add a single leading slash to produce a Unix absolute path.
    return "/" + normalized
57 |
58 |
@runtime_checkable
class PathLike(Protocol):
    """Protocol for objects that can be used as paths.

    Anything with a ``__str__`` (str, pathlib.Path, custom wrappers)
    satisfies this; ``runtime_checkable`` allows isinstance() checks.
    """

    def __str__(self) -> str: ...
64 |
65 |
# In type annotations, use Union[Path, str] instead of FilePath for now
# This preserves compatibility with existing code while we migrate
FilePath = Union[Path, str]

# Disable the "Queue is full" warning
# (emitted by the OpenTelemetry metrics SDK's internal instrument logger)
logging.getLogger("opentelemetry.sdk.metrics._internal.instrument").setLevel(logging.ERROR)
72 |
73 |
def generate_permalink(file_path: Union[Path, str, PathLike], split_extension: bool = True) -> str:
    """Generate a stable permalink from a file path.

    Args:
        file_path: Original file path (str, Path, or PathLike)
        split_extension: If True (default), drop the file extension from the
            permalink; if False, re-append the original extension at the end.

    Returns:
        Normalized permalink that matches validation rules. Converts spaces and underscores
        to hyphens for consistency. Preserves non-ASCII characters like Chinese.

    Examples:
        >>> generate_permalink("docs/My Feature.md")
        'docs/my-feature'
        >>> generate_permalink("specs/API (v2).md")
        'specs/api-v2'
        >>> generate_permalink("design/unified_model_refactor.md")
        'design/unified-model-refactor'
        >>> generate_permalink("中文/测试文档.md")
        '中文/测试文档'
    """
    # Convert Path to string if needed
    path_str = Path(str(file_path)).as_posix()

    # Remove extension (for now, possibly)
    (base, extension) = os.path.splitext(path_str)

    # Check if we have CJK characters that should be preserved
    # CJK ranges: \u4e00-\u9fff (CJK Unified Ideographs), \u3000-\u303f (CJK symbols),
    # \u3400-\u4dbf (CJK Extension A), \uff00-\uffef (Fullwidth forms)
    has_cjk_chars = any(
        "\u4e00" <= char <= "\u9fff"
        or "\u3000" <= char <= "\u303f"
        or "\u3400" <= char <= "\u4dbf"
        or "\uff00" <= char <= "\uffef"
        for char in base
    )

    if has_cjk_chars:
        # For text with CJK characters, selectively transliterate only Latin accented chars
        result = ""
        for char in base:
            if (
                "\u4e00" <= char <= "\u9fff"
                or "\u3000" <= char <= "\u303f"
                or "\u3400" <= char <= "\u4dbf"
            ):
                # Preserve CJK ideographs and symbols
                result += char
            elif "\uff00" <= char <= "\uffef":
                # Remove Chinese fullwidth punctuation entirely (like ,!?)
                continue
            else:
                # Transliterate Latin accented characters to ASCII
                result += unidecode(char)

        # Insert hyphens between CJK and Latin character transitions
        # Match: CJK followed by Latin letter/digit, or Latin letter/digit followed by CJK
        result = re.sub(
            r"([\u4e00-\u9fff\u3000-\u303f\u3400-\u4dbf])([a-zA-Z0-9])", r"\1-\2", result
        )
        result = re.sub(
            r"([a-zA-Z0-9])([\u4e00-\u9fff\u3000-\u303f\u3400-\u4dbf])", r"\1-\2", result
        )

        # Insert dash between camelCase
        result = re.sub(r"([a-z0-9])([A-Z])", r"\1-\2", result)

        # Convert ASCII letters to lowercase, preserve CJK
        lower_text = "".join(c.lower() if c.isascii() and c.isalpha() else c for c in result)

        # Replace underscores with hyphens
        text_with_hyphens = lower_text.replace("_", "-")

        # Remove apostrophes entirely (don't replace with hyphens)
        text_no_apostrophes = text_with_hyphens.replace("'", "")

        # Replace unsafe chars with hyphens, but preserve CJK characters
        clean_text = re.sub(
            r"[^a-z0-9\u4e00-\u9fff\u3000-\u303f\u3400-\u4dbf/\-]", "-", text_no_apostrophes
        )
    else:
        # Original ASCII-only processing for backward compatibility
        # Transliterate unicode to ascii
        ascii_text = unidecode(base)

        # Insert dash between camelCase
        ascii_text = re.sub(r"([a-z0-9])([A-Z])", r"\1-\2", ascii_text)

        # Convert to lowercase
        lower_text = ascii_text.lower()

        # replace underscores with hyphens
        text_with_hyphens = lower_text.replace("_", "-")

        # Remove apostrophes entirely (don't replace with hyphens)
        text_no_apostrophes = text_with_hyphens.replace("'", "")

        # Replace remaining invalid chars with hyphens
        clean_text = re.sub(r"[^a-z0-9/\-]", "-", text_no_apostrophes)

    # Collapse multiple hyphens
    clean_text = re.sub(r"-+", "-", clean_text)

    # Clean each path segment (strip leading/trailing hyphens per segment)
    segments = clean_text.split("/")
    clean_segments = [s.strip("-") for s in segments]

    return_val = "/".join(clean_segments)

    # Append file extension back, if necessary
    if not split_extension and extension:
        return_val += extension

    return return_val
188 |
189 |
def setup_logging(
    env: str,
    home_dir: Path,
    log_file: Optional[str] = None,
    log_level: str = "INFO",
    console: bool = True,
) -> None:  # pragma: no cover
    """
    Configure logging for the application.

    Replaces all existing loguru handlers, optionally adds a rotating file
    sink, binds deployment context fields for structured logging, and
    quiets noisy third-party stdlib loggers.

    Args:
        env: The environment name (dev, test, prod)
        home_dir: The root directory for the application
        log_file: The name of the log file to write to
        log_level: The logging level to use
        console: Whether to log to the console
    """
    # Remove default handler and any existing handlers
    logger.remove()

    # Add file handler if we are not running tests and a log file is specified
    if log_file and env != "test":
        # Setup file logger
        log_path = home_dir / log_file
        # enqueue=True routes records through a queue (loguru-documented
        # behavior for non-blocking, multiprocess-safe file logging)
        logger.add(
            str(log_path),
            level=log_level,
            rotation="10 MB",
            retention="10 days",
            backtrace=True,
            diagnose=True,
            enqueue=True,
            colorize=False,
        )

    # Add console logger if requested or in test mode
    # (test env always logs to stderr, even when console=False)
    if env == "test" or console:
        logger.add(sys.stderr, level=log_level, backtrace=True, diagnose=True, colorize=True)

    logger.info(f"ENV: '{env}' Log level: '{log_level}' Logging to {log_file}")

    # Bind environment context for structured logging (works in both local and cloud)
    tenant_id = os.getenv("BASIC_MEMORY_TENANT_ID", "local")
    fly_app_name = os.getenv("FLY_APP_NAME", "local")
    fly_machine_id = os.getenv("FLY_MACHINE_ID", "local")
    fly_region = os.getenv("FLY_REGION", "local")

    logger.configure(
        extra={
            "tenant_id": tenant_id,
            "fly_app_name": fly_app_name,
            "fly_machine_id": fly_machine_id,
            "fly_region": fly_region,
        }
    )

    # Reduce noise from third-party libraries
    noisy_loggers = {
        # HTTP client logs
        "httpx": logging.WARNING,
        # File watching logs
        "watchfiles.main": logging.WARNING,
    }

    # Set log levels for noisy loggers
    for logger_name, level in noisy_loggers.items():
        logging.getLogger(logger_name).setLevel(level)
257 |
258 |
def parse_tags(tags: Union[List[str], str, None]) -> List[str]:
    """Parse tags from various input formats into a consistent list.

    Args:
        tags: Can be a list (items are coerced to str), a comma-separated
            string, a JSON-array string, or None

    Returns:
        A list of tag strings, or an empty list if no tags

    Note:
        This function strips leading '#' characters from tags to prevent
        their accumulation when tags are processed multiple times.
    """
    if tags is None:
        return []

    # Process list of tags
    if isinstance(tags, list):
        # Coerce items to str so non-string entries (e.g. ints from a JSON
        # payload) don't crash on .strip(); then strip whitespace and leading
        # '#' characters to prevent accumulation. Falsy items are skipped,
        # matching the previous behavior for None/"" entries.
        return [str(tag).strip().lstrip("#") for tag in tags if tag and str(tag).strip()]

    # Process string input
    if isinstance(tags, str):
        # Check if it's a JSON array string (common issue from AI assistants)
        import json

        if tags.strip().startswith("[") and tags.strip().endswith("]"):
            try:
                # Try to parse as JSON array
                parsed_json = json.loads(tags)
                if isinstance(parsed_json, list):
                    # Recursively parse the JSON array as a list
                    return parse_tags(parsed_json)
            except json.JSONDecodeError:
                # Not valid JSON, fall through to comma-separated parsing
                pass

        # Split by comma, strip whitespace, then strip leading '#' characters
        return [tag.strip().lstrip("#") for tag in tags.split(",") if tag and tag.strip()]

    # For any other type, try to convert to string and parse
    try:  # pragma: no cover
        return parse_tags(str(tags))
    except (ValueError, TypeError):  # pragma: no cover
        logger.warning(f"Couldn't parse tags from input of type {type(tags)}: {tags}")
        return []
305 |
306 |
def normalize_newlines(multiline: str) -> str:
    """Normalize every line ending in *multiline* to the platform default.

    Args:
        multiline: String containing any mixture of CRLF, CR, and LF endings.

    Returns:
        The same text with each line ending replaced by ``os.linesep``.
    """
    return os.linesep.join(re.split(r"\r\n?|\n", multiline))
317 |
318 |
def normalize_file_path_for_comparison(file_path: str) -> str:
    """Produce a canonical form of a file path for conflict detection.

    Normalization applied, in order:
    - lowercase, for case-insensitive comparison
    - Unicode NFD normalization
    - backslashes become forward slashes
    - runs of slashes collapse to a single slash

    Args:
        file_path: The file path to normalize.

    Returns:
        The normalized path, intended only for comparison purposes.
    """
    import unicodedata

    # Lowercase first, then decompose accented characters (NFD).
    decomposed = unicodedata.normalize("NFD", file_path.lower())
    # Unify separators, then squash duplicate slashes.
    unified = decomposed.replace("\\", "/")
    return re.sub(r"/+", "/", unified)
348 |
349 |
def detect_potential_file_conflicts(file_path: str, existing_paths: List[str]) -> List[str]:
    """Find existing paths that could collide with a candidate file path.

    Conflict sources considered:
    - case sensitivity differences
    - Unicode normalization differences
    - path separator differences
    - identical generated permalinks

    Args:
        file_path: The candidate file path to check.
        existing_paths: Existing file paths to check against.

    Returns:
        The subset of *existing_paths* that may conflict with *file_path*.
    """
    target_norm = normalize_file_path_for_comparison(file_path)
    target_permalink = generate_permalink(file_path)

    conflicts: List[str] = []
    for candidate in existing_paths:
        # An identical path is the same file, not a conflict.
        if candidate == file_path:
            continue

        # Case-insensitive / normalized path collision.
        if normalize_file_path_for_comparison(candidate) == target_norm:
            conflicts.append(candidate)
        # Otherwise, check whether both paths generate the same permalink.
        elif generate_permalink(candidate) == target_permalink:
            conflicts.append(candidate)

    return conflicts
390 |
391 |
def valid_project_path_value(path: str):
    """Check whether a value is acceptable as a project-relative path.

    Rejects traversal sequences, home-directory expansion, absolute paths
    (both Unix and Windows forms), and control characters.

    Args:
        path: Candidate path value.

    Returns:
        True when the path is safe to use, False otherwise.
    """
    # The empty string resolves to the project root and is allowed.
    if not path:
        return True

    # Obvious traversal or home-directory expansion.
    if ".." in path or "~" in path:
        return False

    # Windows-style traversal or a leading backslash, even on Unix.
    if "\\.." in path or path.startswith("\\"):
        return False

    # Absolute paths: Unix "/..." or a Windows drive letter like "C:".
    if path.startswith("/") or (len(path) >= 2 and path[1] == ":"):
        return False

    # Control characters are rejected; plain spaces/tabs are tolerated
    # because callers strip them later.
    if path.strip() and any(ord(ch) < 32 and ch not in (" ", "\t") for ch in path):
        return False

    return True
415 |
416 |
def validate_project_path(path: str, project_path: Path) -> bool:
    """Return True when *path* is safe and resolves inside *project_path*.

    Combines the string-level checks of ``valid_project_path_value`` with a
    filesystem resolution check to stop symlink/normalization escapes.
    """

    # Cheap string-level screening first.
    if not valid_project_path_value(path):
        return False

    try:
        # Resolve both sides so ".." and symlinks cannot escape the root.
        resolved = (project_path / path).resolve()
        return resolved.is_relative_to(project_path.resolve())
    except (ValueError, OSError):
        # Unresolvable paths are treated as invalid rather than raising.
        return False
428 |
429 |
def ensure_timezone_aware(dt: datetime) -> datetime:
    """Return a timezone-aware version of *dt*.

    Naive datetimes are interpreted as local time and converted with
    ``datetime.astimezone``; aware datetimes are returned unchanged.

    Args:
        dt: The datetime to check.

    Returns:
        A timezone-aware datetime.
    """
    # Aware datetimes pass through untouched.
    if dt.tzinfo is not None:
        return dt
    # Naive: attach the system's local timezone.
    return dt.astimezone()
448 |
```
--------------------------------------------------------------------------------
/test-int/mcp/test_chatgpt_tools_integration.py:
--------------------------------------------------------------------------------
```python
1 | """
2 | Integration tests for ChatGPT-compatible MCP tools.
3 |
4 | Tests the complete flow of search and fetch tools designed for ChatGPT integration,
5 | ensuring they properly wrap Basic Memory's MCP tools and return OpenAI-compatible
6 | MCP content array format.
7 | """
8 |
9 | import json
10 | import pytest
11 | from fastmcp import Client
12 |
13 |
def extract_mcp_json_content(mcp_result):
    """
    Pull the JSON payload out of an MCP CallToolResult.

    FastMCP auto-serializes our List[Dict[str, Any]] return values, so this:
    1. Reads the content list off the CallToolResult
    2. Decodes the JSON string in its first item's text field (which is
       our serialized MCP content array)
    3. Decodes the JSON carried in that array's first "text" entry
    """
    outer_text = mcp_result.content[0].text
    content_array = json.loads(outer_text)
    return json.loads(content_array[0]["text"])
26 |
27 |
@pytest.mark.asyncio
async def test_chatgpt_search_basic(mcp_server, app, test_project):
    """Test basic ChatGPT search functionality with MCP content array format."""

    async with Client(mcp_server) as client:
        # Create test notes for searching
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Machine Learning Fundamentals",
                "folder": "ai",
                "content": (
                    "# Machine Learning Fundamentals\n\nIntroduction to ML concepts and algorithms."
                ),
                "tags": "ml,ai,fundamentals",
            },
        )

        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Deep Learning with PyTorch",
                "folder": "ai",
                "content": (
                    "# Deep Learning with PyTorch\n\n"
                    "Building neural networks using PyTorch framework."
                ),
                "tags": "pytorch,deep-learning,ai",
            },
        )

        # This note is off-topic on purpose: it must NOT show up in the results.
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Data Visualization Guide",
                "folder": "data",
                "content": (
                    "# Data Visualization Guide\n\nCreating charts and graphs for data analysis."
                ),
                "tags": "visualization,data,charts",
            },
        )

        # Test ChatGPT search tool
        search_result = await client.call_tool(
            "search",
            {
                "query": "Machine Learning",
            },
        )

        # Extract JSON content from MCP result
        results_json = extract_mcp_json_content(search_result)
        assert "results" in results_json
        assert len(results_json["results"]) > 0

        # Check result structure (OpenAI-compatible: id/title/url per result)
        first_result = results_json["results"][0]
        assert "id" in first_result
        assert "title" in first_result
        assert "url" in first_result

        # Verify correct content found
        titles = [r["title"] for r in results_json["results"]]
        assert "Machine Learning Fundamentals" in titles
        assert "Data Visualization Guide" not in titles
97 |
98 |
@pytest.mark.asyncio
async def test_chatgpt_search_empty_results(mcp_server, app, test_project):
    """Test ChatGPT search with no matching results."""

    async with Client(mcp_server) as client:
        # Search for non-existent content
        search_result = await client.call_tool(
            "search",
            {
                "query": "NonExistentTopic12345",
            },
        )

        # Extract JSON content from MCP result
        results_json = extract_mcp_json_content(search_result)
        # Even an empty result set keeps the envelope: a "results" key
        # plus the echoed query string.
        assert "results" in results_json
        assert len(results_json["results"]) == 0
        assert results_json["query"] == "NonExistentTopic12345"
117 |
118 |
@pytest.mark.asyncio
async def test_chatgpt_search_with_boolean_operators(mcp_server, app, test_project):
    """Test ChatGPT search with boolean operators."""

    async with Client(mcp_server) as client:
        # Create test notes
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Python Web Frameworks",
                "folder": "dev",
                "content": (
                    "# Python Web Frameworks\n\nComparing Django and Flask for web development."
                ),
                "tags": "python,web,frameworks",
            },
        )

        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "JavaScript Frameworks",
                "folder": "dev",
                "content": "# JavaScript Frameworks\n\nReact, Vue, and Angular comparison.",
                "tags": "javascript,web,frameworks",
            },
        )

        # Test with AND operator
        # Both notes mention "frameworks"; AND must narrow to the Python one.
        search_result = await client.call_tool(
            "search",
            {
                "query": "Python AND frameworks",
            },
        )

        results_json = extract_mcp_json_content(search_result)
        titles = [r["title"] for r in results_json["results"]]
        assert "Python Web Frameworks" in titles
        assert "JavaScript Frameworks" not in titles
161 |
162 |
@pytest.mark.asyncio
async def test_chatgpt_fetch_document(mcp_server, app, test_project):
    """Test ChatGPT fetch tool for retrieving full document content."""

    async with Client(mcp_server) as client:
        # Create a test note with headers, lists, and a fenced code block so
        # the fetch result can be checked for full-content fidelity.
        note_content = """# Advanced Python Techniques

## Overview
This document covers advanced Python programming techniques.

## Topics Covered
- Decorators
- Context Managers
- Metaclasses
- Async/Await patterns

## Code Examples
```python
def my_decorator(func):
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper
```
"""

        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Advanced Python Techniques",
                "folder": "programming",
                "content": note_content,
                "tags": "python,advanced,programming",
            },
        )

        # Fetch the document using its title
        fetch_result = await client.call_tool(
            "fetch",
            {
                "id": "Advanced Python Techniques",
            },
        )

        # Extract JSON content from MCP result
        document_json = extract_mcp_json_content(fetch_result)
        assert "id" in document_json
        assert "title" in document_json
        assert "text" in document_json
        assert "url" in document_json
        assert "metadata" in document_json

        # Verify content
        assert document_json["title"] == "Advanced Python Techniques"
        assert "Decorators" in document_json["text"]
        assert "Context Managers" in document_json["text"]
        assert "def my_decorator" in document_json["text"]
221 |
222 |
@pytest.mark.asyncio
async def test_chatgpt_fetch_by_permalink(mcp_server, app, test_project):
    """Test ChatGPT fetch using permalink identifier."""

    async with Client(mcp_server) as client:
        # Create a note with known content
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "Test Document",
                "folder": "test",
                "content": "# Test Document\n\nThis is test content for permalink fetching.",
                "tags": "test",
            },
        )

        # First search to get the permalink (search result "id" is the permalink)
        search_result = await client.call_tool(
            "search",
            {
                "query": "Test Document",
            },
        )

        results_json = extract_mcp_json_content(search_result)
        assert len(results_json["results"]) > 0
        permalink = results_json["results"][0]["id"]

        # Fetch using the permalink
        fetch_result = await client.call_tool(
            "fetch",
            {
                "id": permalink,
            },
        )

        # Verify the fetched document round-trips the same identifier
        document_json = extract_mcp_json_content(fetch_result)
        assert document_json["id"] == permalink
        assert "Test Document" in document_json["title"]
        assert "test content for permalink fetching" in document_json["text"]
265 |
266 |
@pytest.mark.asyncio
async def test_chatgpt_fetch_nonexistent_document(mcp_server, app, test_project):
    """Test ChatGPT fetch with non-existent document ID."""

    async with Client(mcp_server) as client:
        # Try to fetch a non-existent document
        fetch_result = await client.call_tool(
            "fetch",
            {
                "id": "NonExistentDocument12345",
            },
        )

        # Extract JSON content from MCP result
        document_json = extract_mcp_json_content(fetch_result)

        # Should have document structure even for errors — the tool reports
        # the miss in-band rather than raising.
        assert "id" in document_json
        assert "title" in document_json
        assert "text" in document_json

        # Check for error indication
        assert document_json["id"] == "NonExistentDocument12345"
        assert "Not Found" in document_json["text"] or "not found" in document_json["text"]
291 |
292 |
@pytest.mark.asyncio
async def test_chatgpt_fetch_with_empty_title(mcp_server, app, test_project):
    """Test ChatGPT fetch handles documents with empty or missing titles."""

    async with Client(mcp_server) as client:
        # Create a note without a markdown "# ..." title in the content body
        await client.call_tool(
            "write_note",
            {
                "project": test_project.name,
                "title": "untitled-note",
                "folder": "misc",
                "content": "This is content without a markdown header.\n\nJust plain text.",
                "tags": "misc",
            },
        )

        # Fetch the document
        fetch_result = await client.call_tool(
            "fetch",
            {
                "id": "untitled-note",
            },
        )

        # Parse JSON response
        document_json = extract_mcp_json_content(fetch_result)

        # Should have a title even if content doesn't have one
        assert "title" in document_json
        assert document_json["title"] != ""
        assert document_json["title"] is not None
        assert "content without a markdown header" in document_json["text"]
326 |
327 |
@pytest.mark.asyncio
async def test_chatgpt_search_pagination_default(mcp_server, app, test_project):
    """Test that ChatGPT search uses reasonable pagination defaults."""

    async with Client(mcp_server) as client:
        # Create more than 10 notes to test pagination (15 > default page size)
        for i in range(15):
            await client.call_tool(
                "write_note",
                {
                    "project": test_project.name,
                    "title": f"Test Note {i}",
                    "folder": "bulk",
                    "content": f"# Test Note {i}\n\nThis is test content number {i}.",
                    "tags": "test,bulk",
                },
            )

        # Search should return max 10 results by default
        search_result = await client.call_tool(
            "search",
            {
                "query": "Test Note",
            },
        )

        results_json = extract_mcp_json_content(search_result)

        # Should have at most 10 results (the default page_size)
        assert len(results_json["results"]) <= 10
        assert results_json["total_count"] <= 10
359 |
360 |
@pytest.mark.asyncio
async def test_chatgpt_tools_error_handling(mcp_server, app, test_project):
    """Test error handling in ChatGPT tools returns proper MCP format."""

    async with Client(mcp_server) as client:
        # Test search with invalid query (if validation exists)
        # Using empty query to potentially trigger an error
        search_result = await client.call_tool(
            "search",
            {
                "query": "",  # Empty query might cause an error
            },
        )

        # Should still return MCP content array format: a single text item
        assert hasattr(search_result, "content")
        content_list = search_result.content
        assert isinstance(content_list, list)
        assert len(content_list) == 1
        assert content_list[0].type == "text"

        # Should be valid JSON even on error
        results_json = extract_mcp_json_content(search_result)
        assert "results" in results_json  # Should have results key even if empty
385 |
386 |
@pytest.mark.asyncio
async def test_chatgpt_integration_workflow(mcp_server, app, test_project):
    """Test complete workflow: search then fetch, as ChatGPT would use it."""

    async with Client(mcp_server) as client:
        # Step 1: Create multiple documents (two API-related, one not)
        docs = [
            {
                "title": "API Design Best Practices",
                "content": (
                    "# API Design Best Practices\n\nRESTful API design principles and patterns."
                ),
                "tags": "api,rest,design",
            },
            {
                "title": "GraphQL vs REST",
                "content": "# GraphQL vs REST\n\nComparing GraphQL and REST API architectures.",
                "tags": "api,graphql,rest",
            },
            {
                "title": "Database Design Patterns",
                "content": (
                    "# Database Design Patterns\n\n"
                    "Common database design patterns and anti-patterns."
                ),
                "tags": "database,design,patterns",
            },
        ]

        for doc in docs:
            await client.call_tool(
                "write_note",
                {
                    "project": test_project.name,
                    "title": doc["title"],
                    "folder": "architecture",
                    "content": doc["content"],
                    "tags": doc["tags"],
                },
            )

        # Step 2: Search for API-related content (as ChatGPT would)
        search_result = await client.call_tool(
            "search",
            {
                "query": "API",
            },
        )

        results_json = extract_mcp_json_content(search_result)
        assert len(results_json["results"]) >= 2

        # Step 3: Fetch one of the search results (as ChatGPT would)
        first_result_id = results_json["results"][0]["id"]
        fetch_result = await client.call_tool(
            "fetch",
            {
                "id": first_result_id,
            },
        )

        document_json = extract_mcp_json_content(fetch_result)

        # Verify the fetched document matches search result
        assert document_json["id"] == first_result_id
        assert "API" in document_json["text"] or "api" in document_json["text"].lower()

        # Verify document has expected structure
        assert document_json["metadata"]["format"] == "markdown"
456 |
```
--------------------------------------------------------------------------------
/tests/cli/test_cli_tools.py:
--------------------------------------------------------------------------------
```python
1 | """Tests for the Basic Memory CLI tools.
2 |
3 | These tests use real MCP tools with the test environment instead of mocks.
4 | """
5 |
6 | # Import for testing
7 |
8 | import io
9 | from datetime import datetime, timedelta
10 | import json
11 | from textwrap import dedent
12 | from typing import AsyncGenerator
13 | from unittest.mock import patch
14 |
15 | import pytest_asyncio
16 | from typer.testing import CliRunner
17 |
18 | from basic_memory.cli.commands.tool import tool_app
19 | from basic_memory.schemas.base import Entity as EntitySchema
20 |
21 | runner = CliRunner()
22 |
23 |
@pytest_asyncio.fixture
async def setup_test_note(entity_service, search_service) -> AsyncGenerator[dict, None]:
    """Create a test note for CLI tests.

    Yields a dict with the created note's title, permalink, and raw content
    so individual tests can assert against them.
    """
    note_content = dedent("""
        # Test Note

        This is a test note for CLI commands.

        ## Observations
        - [tech] Test observation #test
        - [note] Another observation

        ## Relations
        - connects_to [[Another Note]]
        """)

    entity, created = await entity_service.create_or_update_entity(
        EntitySchema(
            title="Test Note",
            folder="test",
            entity_type="note",
            content=note_content,
        )
    )

    # Index the entity for search
    await search_service.index_entity(entity)

    yield {
        "title": entity.title,
        "permalink": entity.permalink,
        "content": note_content,
    }
57 |
58 |
def test_write_note(cli_env, project_config, test_project):
    """Test write_note command with basic arguments."""
    result = runner.invoke(
        tool_app,
        [
            "write-note",
            "--title",
            "CLI Test Note",
            "--content",
            "This is a CLI test note",
            "--folder",
            "test",
            "--project",
            test_project.name,
        ],
    )
    assert result.exit_code == 0

    # Check for expected success message
    assert "CLI Test Note" in result.stdout
    assert "Created" in result.stdout or "Updated" in result.stdout
    assert "permalink" in result.stdout
81 |
82 |
def test_write_note_with_project_arg(cli_env, project_config, test_project):
    """Test write_note command with basic arguments.

    Same as test_write_note but with --project placed first, checking that
    option ordering does not matter.
    """
    result = runner.invoke(
        tool_app,
        [
            "write-note",
            "--project",
            test_project.name,
            "--title",
            "CLI Test Note",
            "--content",
            "This is a CLI test note",
            "--folder",
            "test",
        ],
    )
    assert result.exit_code == 0

    # Check for expected success message
    assert "CLI Test Note" in result.stdout
    assert "Created" in result.stdout or "Updated" in result.stdout
    assert "permalink" in result.stdout
105 |
106 |
def test_write_note_with_tags(cli_env, project_config):
    """Test write_note command with tags (repeated --tags options)."""
    result = runner.invoke(
        tool_app,
        [
            "write-note",
            "--title",
            "Tagged CLI Test Note",
            "--content",
            "This is a test note with tags",
            "--folder",
            "test",
            "--tags",
            "tag1",
            "--tags",
            "tag2",
        ],
    )
    assert result.exit_code == 0

    # Check for expected success message
    assert "Tagged CLI Test Note" in result.stdout
    assert "tag1, tag2" in result.stdout or "tag1" in result.stdout and "tag2" in result.stdout
130 |
131 |
def test_write_note_from_stdin(cli_env, project_config, monkeypatch):
    """Test write_note command reading from stdin.

    This test requires minimal mocking of stdin to simulate piped input.
    """
    test_content = "This is content from stdin for testing"

    # Mock stdin using monkeypatch, which works better with typer's CliRunner
    monkeypatch.setattr("sys.stdin", io.StringIO(test_content))
    monkeypatch.setattr("sys.stdin.isatty", lambda: False)  # Simulate piped input

    # Use runner.invoke with input parameter as a fallback
    # (note: no --content option, so the command must read stdin)
    result = runner.invoke(
        tool_app,
        [
            "write-note",
            "--title",
            "Stdin Test Note",
            "--folder",
            "test",
        ],
        input=test_content,  # Provide input as a fallback
    )

    assert result.exit_code == 0

    # Check for expected success message
    assert "Stdin Test Note" in result.stdout
    assert "Created" in result.stdout or "Updated" in result.stdout
    assert "permalink" in result.stdout
162 |
163 |
def test_write_note_content_param_priority(cli_env, project_config):
    """Test that content parameter has priority over stdin."""
    stdin_content = "This content from stdin should NOT be used"
    param_content = "This explicit content parameter should be used"

    # Mock stdin but provide explicit content parameter
    with (
        patch("sys.stdin", io.StringIO(stdin_content)),
        patch("sys.stdin.isatty", return_value=False),
    ):  # Simulate piped input
        result = runner.invoke(
            tool_app,
            [
                "write-note",
                "--title",
                "Priority Test Note",
                "--content",
                param_content,
                "--folder",
                "test",
            ],
        )

    assert result.exit_code == 0

    # Check the note was created with the content from parameter, not stdin
    # We can't directly check file contents in this test approach
    # but we can verify the command succeeded
    assert "Priority Test Note" in result.stdout
    assert "Created" in result.stdout or "Updated" in result.stdout
194 |
195 |
def test_write_note_no_content(cli_env, project_config):
    """Test error handling when no content is provided."""
    # Mock stdin to appear as a terminal, not a pipe, so the command has
    # neither --content nor piped input to fall back on.
    with patch("sys.stdin.isatty", return_value=True):
        result = runner.invoke(
            tool_app,
            [
                "write-note",
                "--title",
                "No Content Note",
                "--folder",
                "test",
            ],
        )

    # Should exit with an error
    assert result.exit_code == 1
    # assert "No content provided" in result.stderr
214 |
215 |
def test_read_note(cli_env, setup_test_note):
    """Test read_note command."""
    permalink = setup_test_note["permalink"]

    result = runner.invoke(
        tool_app,
        ["read-note", permalink],
    )
    assert result.exit_code == 0

    # Should contain the note content and structure
    assert "Test Note" in result.stdout
    assert "This is a test note for CLI commands" in result.stdout
    assert "## Observations" in result.stdout
    assert "Test observation" in result.stdout
    assert "## Relations" in result.stdout
    assert "connects_to [[Another Note]]" in result.stdout

    # Note: We found that square brackets like [tech] are being stripped in CLI output,
    # so we're not asserting their presence
236 |
237 |
def test_search_basic(cli_env, setup_test_note, test_project):
    """Test basic search command."""
    result = runner.invoke(
        tool_app,
        ["search-notes", "test observation", "--project", test_project.name],
    )
    assert result.exit_code == 0

    # Result should be JSON containing our test note
    search_result = json.loads(result.stdout)
    assert len(search_result["results"]) > 0

    # At least one result should match our test note or observation
    # (matched on the permalink rather than the title)
    found = False
    for item in search_result["results"]:
        if "test" in item["permalink"].lower() and "observation" in item["permalink"].lower():
            found = True
            break

    assert found, "Search did not find the test observation"
258 |
259 |
def test_search_permalink(cli_env, setup_test_note):
    """Test search with permalink flag."""
    permalink = setup_test_note["permalink"]

    result = runner.invoke(
        tool_app,
        ["search-notes", permalink, "--permalink"],
    )
    assert result.exit_code == 0

    # Result should be JSON containing our test note
    search_result = json.loads(result.stdout)
    assert len(search_result["results"]) > 0

    # Should find a result with an exactly matching permalink
    found = False
    for item in search_result["results"]:
        if item["permalink"] == permalink:
            found = True
            break

    assert found, "Search did not find the note by permalink"
282 |
283 |
def test_build_context(cli_env, setup_test_note):
    """Test build_context command."""
    permalink = setup_test_note["permalink"]

    # build-context takes a memory:// URL rather than a bare permalink
    result = runner.invoke(
        tool_app,
        ["build-context", f"memory://{permalink}"],
    )
    assert result.exit_code == 0

    # Result should be JSON containing our test note
    context_result = json.loads(result.stdout)
    assert "results" in context_result
    assert len(context_result["results"]) > 0

    # Primary results should include our test note
    found = False
    for item in context_result["results"]:
        if item["primary_result"]["permalink"] == permalink:
            found = True
            break

    assert found, "Context did not include the test note"
307 |
308 |
def test_build_context_with_options(cli_env, setup_test_note):
    """Test build_context command with all options."""
    permalink = setup_test_note["permalink"]

    result = runner.invoke(
        tool_app,
        [
            "build-context",
            f"memory://{permalink}",
            "--depth",
            "2",
            "--timeframe",
            "1d",
            "--page",
            "1",
            "--page-size",
            "5",
            "--max-related",
            "20",
        ],
    )
    assert result.exit_code == 0

    # Result should be JSON containing our test note
    context_result = json.loads(result.stdout)

    # Check that metadata reflects our options
    assert context_result["metadata"]["depth"] == 2
    # "1d" timeframe should resolve to a timestamp within the last ~2 days
    timeframe = datetime.fromisoformat(context_result["metadata"]["timeframe"])
    assert datetime.now().astimezone() - timeframe <= timedelta(
        days=2
    )  # Compare timezone-aware datetimes

    # Results should include our test note
    found = False
    for item in context_result["results"]:
        if item["primary_result"]["permalink"] == permalink:
            found = True
            break

    assert found, "Context did not include the test note"
350 |
351 |
def test_build_context_string_depth_parameter(cli_env, setup_test_note):
    """Test build_context command handles string depth parameter correctly."""
    permalink = setup_test_note["permalink"]

    # Test valid string depth parameter - Typer should convert it to int
    result = runner.invoke(
        tool_app,
        [
            "build-context",
            f"memory://{permalink}",
            "--depth",
            "2",  # This is always a string from CLI
        ],
    )
    assert result.exit_code == 0

    # Result should be JSON containing our test note with correct depth
    context_result = json.loads(result.stdout)
    assert context_result["metadata"]["depth"] == 2

    # Test invalid string depth parameter - should fail with Typer validation error
    result = runner.invoke(
        tool_app,
        [
            "build-context",
            f"memory://{permalink}",
            "--depth",
            "invalid",
        ],
    )
    assert result.exit_code == 2  # Typer exits with code 2 for parameter validation errors
    # Typer should show a usage error for invalid integer
    assert (
        "invalid" in result.stderr
        and "is not a valid" in result.stderr
        and "integer" in result.stderr
    )
389 |
390 |
391 | # The get-entity CLI command was removed when tools were refactored
392 | # into separate files with improved error handling
393 |
394 |
def test_recent_activity(cli_env, setup_test_note, test_project):
    """Test recent_activity command with defaults."""
    result = runner.invoke(
        tool_app,
        ["recent-activity"],
    )
    assert result.exit_code == 0

    # Result should be human-readable string containing recent activity
    # (not JSON, unlike the search/build-context commands)
    output = result.stdout
    assert "Recent Activity Summary" in output
    assert "Most Active Project:" in output or "Other Active Projects:" in output

    # Our test note should be referenced in the output
    assert setup_test_note["permalink"] in output or setup_test_note["title"] in output
410 |
411 |
def test_recent_activity_with_options(cli_env, setup_test_note, test_project):
    """Test recent_activity command with options."""
    result = runner.invoke(
        tool_app,
        [
            "recent-activity",
            "--type",
            "entity",
            "--depth",
            "2",
            "--timeframe",
            "7d",
        ],
    )
    assert result.exit_code == 0

    # Result should be human-readable string containing recent activity
    output = result.stdout
    assert "Recent Activity Summary" in output
    assert "Most Active Project:" in output or "Other Active Projects:" in output

    # Should include information about entities since we requested entity type
    assert setup_test_note["permalink"] in output or setup_test_note["title"] in output
435 |
436 |
def test_continue_conversation(cli_env, setup_test_note):
    """Test continue_conversation command."""
    permalink = setup_test_note["permalink"]

    result = runner.invoke(tool_app, ["continue-conversation", "--topic", "Test Note"])
    assert result.exit_code == 0

    # All expected fragments must appear in the rendered session prompt.
    out = result.stdout
    for expected in (
        "Continuing conversation on: Test Note",
        "This is a memory retrieval session",
        "read_note",
        permalink,
    ):
        assert expected in out
453 |
454 |
def test_continue_conversation_no_results(cli_env):
    """Test continue_conversation command with no results."""
    result = runner.invoke(
        tool_app, ["continue-conversation", "--topic", "NonexistentTopic"]
    )
    assert result.exit_code == 0

    # With no matches the command still names the topic and reports emptiness.
    out = result.stdout
    assert "Continuing conversation on: NonexistentTopic" in out
    assert "The supplied query did not return any information" in out
467 |
468 |
469 | @patch("basic_memory.services.initialization.initialize_database")
470 | def test_ensure_migrations_functionality(mock_initialize_database, app_config, monkeypatch):
471 | """Test the database initialization functionality."""
472 | from basic_memory.services.initialization import ensure_initialization
473 |
474 | # Call the function
475 | ensure_initialization(app_config)
476 |
477 | # The underlying asyncio.run should call our mocked function
478 | mock_initialize_database.assert_called_once()
479 |
480 |
481 | @patch("basic_memory.services.initialization.initialize_database")
482 | def test_ensure_migrations_handles_errors(mock_initialize_database, app_config, monkeypatch):
483 | """Test that initialization handles errors gracefully."""
484 | from basic_memory.services.initialization import ensure_initialization
485 |
486 | # Configure mock to raise an exception
487 | mock_initialize_database.side_effect = Exception("Test error")
488 |
489 | # Call the function - should not raise exception
490 | ensure_initialization(app_config)
491 |
492 | # We're just making sure it doesn't crash by calling it
493 |
```
--------------------------------------------------------------------------------
/tests/conftest.py:
--------------------------------------------------------------------------------
```python
1 | """Common test fixtures."""
2 |
3 | from dataclasses import dataclass
4 | from datetime import datetime, timezone
5 | from pathlib import Path
6 | from textwrap import dedent
7 | from typing import AsyncGenerator
8 |
9 | import os
10 | import pytest
11 | import pytest_asyncio
12 | from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker
13 |
14 | from basic_memory import db
15 | from basic_memory.config import ProjectConfig, BasicMemoryConfig, ConfigManager
16 | from basic_memory.db import DatabaseType
17 | from basic_memory.markdown import EntityParser
18 | from basic_memory.markdown.markdown_processor import MarkdownProcessor
19 | from basic_memory.models import Base
20 | from basic_memory.models.knowledge import Entity
21 | from basic_memory.models.project import Project
22 | from basic_memory.repository.entity_repository import EntityRepository
23 | from basic_memory.repository.observation_repository import ObservationRepository
24 | from basic_memory.repository.project_repository import ProjectRepository
25 | from basic_memory.repository.relation_repository import RelationRepository
26 | from basic_memory.repository.search_repository import SearchRepository
27 | from basic_memory.schemas.base import Entity as EntitySchema
28 | from basic_memory.services import (
29 | EntityService,
30 | ProjectService,
31 | )
32 | from basic_memory.services.directory_service import DirectoryService
33 | from basic_memory.services.file_service import FileService
34 | from basic_memory.services.link_resolver import LinkResolver
35 | from basic_memory.services.search_service import SearchService
36 | from basic_memory.sync.sync_service import SyncService
37 | from basic_memory.sync.watch_service import WatchService
38 |
39 |
@pytest.fixture
def anyio_backend():
    """Select the asyncio backend for anyio-driven async tests."""
    return "asyncio"


@pytest.fixture
def project_root() -> Path:
    """Return the repository root (the directory above tests/)."""
    return Path(__file__).parent.parent


@pytest.fixture
def config_home(tmp_path, monkeypatch) -> Path:
    """Redirect home-directory lookups to a per-test temporary directory.

    Returns the temp directory now acting as HOME.
    """
    # Patch HOME environment variable for the duration of the test
    monkeypatch.setenv("HOME", str(tmp_path))
    # On Windows, also set USERPROFILE
    if os.name == "nt":
        monkeypatch.setenv("USERPROFILE", str(tmp_path))
    # Set BASIC_MEMORY_HOME to the test directory
    monkeypatch.setenv("BASIC_MEMORY_HOME", str(tmp_path / "basic-memory"))
    return tmp_path
60 |
61 |
@pytest.fixture(scope="function", autouse=True)
def app_config(config_home, tmp_path, monkeypatch) -> BasicMemoryConfig:
    """Create test app configuration.

    Built without referencing the test_project fixture to avoid a circular
    fixture dependency; the single project simply points at config_home.
    """
    return BasicMemoryConfig(
        env="test",
        projects={"test-project": str(config_home)},
        default_project="test-project",
        update_permalinks_on_move=True,
    )
75 |
76 |
@pytest.fixture(autouse=True)
def config_manager(
    app_config: BasicMemoryConfig, project_config: ProjectConfig, config_home: Path, monkeypatch
) -> ConfigManager:
    """Provide a ConfigManager re-pointed at the per-test home directory.

    Autouse so every test sees a fresh config rooted under `config_home`.
    """
    # Invalidate config cache to ensure clean state for each test
    from basic_memory import config as config_module

    config_module._CONFIG_CACHE = None

    # Create a new ConfigManager that uses the test home directory
    config_manager = ConfigManager()
    # Update its paths to use the test directory
    config_manager.config_dir = config_home / ".basic-memory"
    config_manager.config_file = config_manager.config_dir / "config.json"
    config_manager.config_dir.mkdir(parents=True, exist_ok=True)

    # Ensure the config file is written to disk
    config_manager.save_config(app_config)
    return config_manager
96 |
97 |
@pytest.fixture(scope="function", autouse=True)
def project_config(test_project):
    """Build a ProjectConfig mirroring the test project's name and path."""
    return ProjectConfig(
        name=test_project.name,
        home=Path(test_project.path),
    )


@dataclass
class TestConfig:
    """Bundle of all configuration fixtures for tests that need every piece."""

    config_home: Path
    project_config: ProjectConfig
    app_config: BasicMemoryConfig
    config_manager: ConfigManager


@pytest.fixture
def test_config(config_home, project_config, app_config, config_manager) -> TestConfig:
    """Aggregate the individual configuration fixtures into one object."""
    return TestConfig(
        config_home=config_home,
        project_config=project_config,
        app_config=app_config,
        config_manager=config_manager,
    )
122 |
123 |
@pytest_asyncio.fixture(scope="function")
async def engine_factory(
    app_config,
) -> AsyncGenerator[tuple[AsyncEngine, async_sessionmaker[AsyncSession]], None]:
    """Create an engine and session factory using an in-memory SQLite database."""
    async with db.engine_session_factory(
        db_path=app_config.database_path, db_type=DatabaseType.MEMORY
    ) as (engine, session_maker):
        # Create all tables for the DB the engine is connected to
        async with engine.begin() as conn:
            await conn.run_sync(Base.metadata.create_all)

        # Yield inside the context manager so the engine stays open for the
        # duration of the test and is disposed on teardown.
        yield engine, session_maker


@pytest_asyncio.fixture
async def session_maker(engine_factory) -> async_sessionmaker[AsyncSession]:
    """Get session maker for tests (discards the engine half of the pair)."""
    _, session_maker = engine_factory
    return session_maker
144 |
145 |
146 | ## Repositories
147 |
148 |
@pytest_asyncio.fixture(scope="function")
async def entity_repository(
    session_maker: async_sessionmaker[AsyncSession], test_project: Project
) -> EntityRepository:
    """Create an EntityRepository instance scoped to the test project."""
    return EntityRepository(session_maker, project_id=test_project.id)


@pytest_asyncio.fixture(scope="function")
async def observation_repository(
    session_maker: async_sessionmaker[AsyncSession], test_project: Project
) -> ObservationRepository:
    """Create an ObservationRepository instance scoped to the test project."""
    return ObservationRepository(session_maker, project_id=test_project.id)


@pytest_asyncio.fixture(scope="function")
async def relation_repository(
    session_maker: async_sessionmaker[AsyncSession], test_project: Project
) -> RelationRepository:
    """Create a RelationRepository instance scoped to the test project."""
    return RelationRepository(session_maker, project_id=test_project.id)


@pytest_asyncio.fixture(scope="function")
async def project_repository(
    session_maker: async_sessionmaker[AsyncSession],
) -> ProjectRepository:
    """Create a ProjectRepository instance (not project-scoped)."""
    return ProjectRepository(session_maker)
179 |
180 |
@pytest_asyncio.fixture(scope="function")
async def test_project(config_home, engine_factory) -> Project:
    """Create and persist the default test project used as context by repositories.

    Depends on `engine_factory` so the schema exists before the insert.
    Returns the persisted Project row.
    """
    project_data = {
        "name": "test-project",
        "description": "Project used as context for tests",
        "path": str(config_home),
        "is_active": True,
        "is_default": True,  # Explicitly set as the default project (for cli operations)
    }
    # Only the session maker is needed here; the engine itself was unused.
    _, session_maker = engine_factory
    project_repository = ProjectRepository(session_maker)
    return await project_repository.create(project_data)
195 |
196 |
197 | ## Services
198 |
199 |
@pytest_asyncio.fixture
async def entity_service(
    entity_repository: EntityRepository,
    observation_repository: ObservationRepository,
    relation_repository: RelationRepository,
    entity_parser: EntityParser,
    file_service: FileService,
    link_resolver: LinkResolver,
    app_config: BasicMemoryConfig,
) -> EntityService:
    """Create EntityService wired to all of its repository/service collaborators."""
    return EntityService(
        entity_parser=entity_parser,
        entity_repository=entity_repository,
        observation_repository=observation_repository,
        relation_repository=relation_repository,
        file_service=file_service,
        link_resolver=link_resolver,
        app_config=app_config,
    )
220 |
221 |
@pytest.fixture
def file_service(
    project_config: ProjectConfig, markdown_processor: MarkdownProcessor
) -> FileService:
    """Create FileService rooted at the project home directory."""
    return FileService(project_config.home, markdown_processor)


@pytest.fixture
def markdown_processor(entity_parser: EntityParser) -> MarkdownProcessor:
    """Create MarkdownProcessor backed by the entity parser."""
    return MarkdownProcessor(entity_parser)


@pytest.fixture
def link_resolver(entity_repository: EntityRepository, search_service: SearchService):
    """Create LinkResolver wired to the entity repository and search service."""
    return LinkResolver(entity_repository, search_service)


@pytest.fixture
def entity_parser(project_config):
    """Create EntityParser rooted at the project home directory."""
    return EntityParser(project_config.home)
246 |
247 |
@pytest_asyncio.fixture
async def sync_service(
    app_config: BasicMemoryConfig,
    entity_service: EntityService,
    entity_parser: EntityParser,
    project_repository: ProjectRepository,
    entity_repository: EntityRepository,
    relation_repository: RelationRepository,
    search_service: SearchService,
    file_service: FileService,
) -> SyncService:
    """Create sync service for testing, wired to all collaborators."""
    return SyncService(
        app_config=app_config,
        entity_service=entity_service,
        project_repository=project_repository,
        entity_repository=entity_repository,
        relation_repository=relation_repository,
        entity_parser=entity_parser,
        search_service=search_service,
        file_service=file_service,
    )


@pytest_asyncio.fixture
async def directory_service(entity_repository, project_config) -> DirectoryService:
    """Create directory service for testing.

    NOTE(review): `project_config` is requested but never used below —
    presumably it forces project setup ordering; confirm before removing.
    """
    return DirectoryService(
        entity_repository=entity_repository,
    )
278 |
279 |
@pytest_asyncio.fixture
async def search_repository(session_maker, test_project: Project):
    """Create SearchRepository instance with project context"""
    return SearchRepository(session_maker, project_id=test_project.id)


@pytest_asyncio.fixture(autouse=True)
async def init_search_index(search_service):
    # Autouse: guarantees the search index exists for every test.
    # NOTE(review): search_service below already calls init_search_index()
    # during construction, so this is a second call — presumably idempotent;
    # confirm.
    await search_service.init_search_index()


@pytest_asyncio.fixture
async def search_service(
    search_repository: SearchRepository,
    entity_repository: EntityRepository,
    file_service: FileService,
) -> SearchService:
    """Create and initialize search service"""
    service = SearchService(search_repository, entity_repository, file_service)
    await service.init_search_index()
    return service
301 |
302 |
@pytest_asyncio.fixture(scope="function")
async def sample_entity(entity_repository: EntityRepository) -> Entity:
    """Create a sample entity for testing and persist it via the repository."""
    entity_data = {
        "project_id": entity_repository.project_id,
        "title": "Test Entity",
        "entity_type": "test",
        "permalink": "test/test-entity",
        "file_path": "test/test_entity.md",
        "content_type": "text/markdown",
        # Timezone-aware timestamps.
        "created_at": datetime.now(timezone.utc),
        "updated_at": datetime.now(timezone.utc),
    }
    return await entity_repository.create(entity_data)


@pytest_asyncio.fixture
async def project_service(
    project_repository: ProjectRepository,
) -> ProjectService:
    """Create ProjectService with repository."""
    return ProjectService(repository=project_repository)
325 |
326 |
@pytest_asyncio.fixture
async def full_entity(sample_entity, entity_repository, file_service, entity_service) -> Entity:
    """Create a search test entity with observations and outgoing relations.

    Depends on `sample_entity` so the [[Test Entity]] wikilinks can resolve.
    """
    # Create test entity; the created/updated flag is irrelevant here.
    entity, _ = await entity_service.create_or_update_entity(
        EntitySchema(
            title="Search_Entity",
            folder="test",
            entity_type="test",
            content=dedent("""
                ## Observations
                - [tech] Tech note
                - [design] Design note

                ## Relations
                - out1 [[Test Entity]]
                - out2 [[Test Entity]]
            """),
        )
    )
    return entity
349 |
350 |
@pytest_asyncio.fixture
async def test_graph(
    entity_repository,
    relation_repository,
    observation_repository,
    search_service,
    file_service,
    entity_service,
):
    """Create a test knowledge graph with entities, relations and observations.

    Builds the chain Root -> Connected Entity 1 -> Connected Entity 2 ->
    Deep Entity -> Deeper Entity, then indexes everything for search.
    """

    # Create some test entities in reverse order so they will be linked
    deeper, _ = await entity_service.create_or_update_entity(
        EntitySchema(
            title="Deeper Entity",
            entity_type="deeper",
            folder="test",
            content=dedent("""
                # Deeper Entity
                """),
        )
    )

    deep, _ = await entity_service.create_or_update_entity(
        EntitySchema(
            title="Deep Entity",
            entity_type="deep",
            folder="test",
            content=dedent("""
                # Deep Entity
                - deeper_connection [[Deeper Entity]]
                """),
        )
    )

    connected_2, _ = await entity_service.create_or_update_entity(
        EntitySchema(
            title="Connected Entity 2",
            entity_type="test",
            folder="test",
            content=dedent("""
                # Connected Entity 2
                - deep_connection [[Deep Entity]]
                """),
        )
    )

    connected_1, _ = await entity_service.create_or_update_entity(
        EntitySchema(
            title="Connected Entity 1",
            entity_type="test",
            folder="test",
            content=dedent("""
                # Connected Entity 1
                - [note] Connected 1 note
                - connected_to [[Connected Entity 2]]
                """),
        )
    )

    root, _ = await entity_service.create_or_update_entity(
        EntitySchema(
            title="Root",
            entity_type="test",
            folder="test",
            content=dedent("""
                # Root Entity
                - [note] Root note 1
                - [tech] Root tech note
                - connects_to [[Connected Entity 1]]
                """),
        )
    )

    # get latest
    entities = await entity_repository.find_all()
    relations = await relation_repository.find_all()

    # Index everything for search
    for entity in entities:
        await search_service.index_entity(entity)

    # NOTE(review): "deeper" is intentionally not returned here — only the
    # entities the tests reach through the returned keys; confirm if needed.
    return {
        "root": root,
        "connected1": connected_1,
        "connected2": connected_2,
        "deep": deep,
        "observations": [e.observations for e in entities],
        "relations": relations,
    }
441 |
442 |
@pytest.fixture
def watch_service(app_config: BasicMemoryConfig, project_repository) -> WatchService:
    """Construct a WatchService over the test config and project repository."""
    return WatchService(app_config=app_config, project_repository=project_repository)


@pytest.fixture
def test_files(project_config, project_root) -> dict[str, Path]:
    """Copy binary test files into the project directory.

    Returns a dict mapping short names ("pdf", "image") to the copied paths
    inside the project dir.
    """
    sources = {
        "pdf": project_root / "tests/Non-MarkdownFileSupport.pdf",
        "image": project_root / "tests/Screenshot.png",
    }

    copies: dict[str, Path] = {}
    for name, src in sources.items():
        dest = project_config.home / src.name
        # Ensure the project directory exists before writing.
        dest.parent.mkdir(parents=True, exist_ok=True)
        dest.write_bytes(src.read_bytes())
        copies[name] = dest
    return copies


@pytest_asyncio.fixture
async def synced_files(sync_service, project_config, test_files):
    """Run an initial sync over the copied test files, then return them."""
    # Initial sync - should create forward reference
    await sync_service.sync(project_config.home)
    return test_files
482 |
```
--------------------------------------------------------------------------------
/tests/utils/test_validate_project_path.py:
--------------------------------------------------------------------------------
```python
1 | """Tests for the validate_project_path security function."""
2 |
3 | import pytest
4 | from pathlib import Path
5 |
6 | from basic_memory.utils import validate_project_path
7 |
8 |
class TestValidateProjectPathSafety:
    """Test that validate_project_path correctly identifies safe paths."""

    def test_valid_relative_paths(self, tmp_path):
        """Test that legitimate relative paths are allowed."""
        project = tmp_path / "project"
        project.mkdir()

        safe_paths = (
            "notes/meeting.md",
            "docs/readme.txt",
            "folder/subfolder/file.txt",
            "simple-file.md",
            "research/findings-2025.md",
            "projects/basic-memory/docs.md",
            "deep/nested/directory/structure/file.txt",
            "file-with-hyphens.md",
            "file_with_underscores.txt",
            "file123.md",
            "UPPERCASE.MD",
            "MixedCase.txt",
        )

        for candidate in safe_paths:
            assert validate_project_path(candidate, project), (
                f"Safe path '{candidate}' should be allowed"
            )

    def test_empty_and_current_directory(self, tmp_path):
        """Test handling of empty paths and current directory references."""
        project = tmp_path / "project"
        project.mkdir()

        # "." and "./..." stay inside the project, so both must pass.
        assert validate_project_path(".", project)
        assert validate_project_path("./file.txt", project)

    def test_nested_safe_paths(self, tmp_path):
        """Test deeply nested but safe paths."""
        project = tmp_path / "project"
        project.mkdir()

        nested = (
            "level1/level2/level3/level4/file.txt",
            "very/deeply/nested/directory/structure/with/many/levels/file.md",
            "a/b/c/d/e/f/g/h/i/j/file.txt",
        )

        for candidate in nested:
            assert validate_project_path(candidate, project), (
                f"Nested path '{candidate}' should be allowed"
            )
63 |
64 |
class TestValidateProjectPathAttacks:
    """Test that validate_project_path blocks path traversal attacks."""

    def test_unix_path_traversal(self, tmp_path):
        """Test that Unix-style path traversal is blocked."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        # Bare "..", repeated "..", and traversal to common sensitive targets.
        attack_paths = [
            "../",
            "../../",
            "../../../",
            "../etc/passwd",
            "../../etc/passwd",
            "../../../etc/passwd",
            "../../../../etc/passwd",
            "../../.env",
            "../../../home/user/.ssh/id_rsa",
            "../../../../var/log/auth.log",
            "../../.bashrc",
            "../../../etc/shadow",
        ]

        for path in attack_paths:
            assert not validate_project_path(path, project_path), (
                f"Attack path '{path}' should be blocked"
            )

    def test_windows_path_traversal(self, tmp_path):
        """Test that Windows-style path traversal is blocked."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        # Backslash-separated traversal and rooted backslash paths.
        attack_paths = [
            "..\\",
            "..\\..\\",
            "..\\..\\..\\",
            "..\\..\\..\\Windows\\System32\\config\\SAM",
            "..\\..\\..\\Users\\user\\.env",
            "..\\..\\..\\Windows\\System32\\drivers\\etc\\hosts",
            "..\\..\\Boot.ini",
            "\\Windows\\System32",
            "\\..\\..\\Windows",
        ]

        for path in attack_paths:
            assert not validate_project_path(path, project_path), (
                f"Windows attack path '{path}' should be blocked"
            )

    def test_mixed_traversal_patterns(self, tmp_path):
        """Test paths that mix legitimate content with traversal."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        # A safe-looking prefix must not launder a later ".." escape.
        mixed_attacks = [
            "notes/../../../etc/passwd",
            "docs/../../.env",
            "folder/subfolder/../../../etc/passwd",
            "legitimate/path/../../.ssh/id_rsa",
            "notes/../../../home/user/.bashrc",
            "documents/../../Windows/System32/config/SAM",
        ]

        for path in mixed_attacks:
            assert not validate_project_path(path, project_path), (
                f"Mixed attack path '{path}' should be blocked"
            )

    def test_home_directory_access(self, tmp_path):
        """Test that home directory access patterns are blocked."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        # Tilde expansion targets, both slash and backslash forms.
        home_attacks = [
            "~/",
            "~/.env",
            "~/.ssh/id_rsa",
            "~/secrets.txt",
            "~/Documents/passwords.txt",
            "~\\AppData\\secrets",
            "~\\Desktop\\config.ini",
        ]

        for path in home_attacks:
            assert not validate_project_path(path, project_path), (
                f"Home directory attack '{path}' should be blocked"
            )

    def test_unc_and_network_paths(self, tmp_path):
        """Test that UNC and network paths are blocked."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        # Windows UNC shares (\\server\share) must never be treated as local.
        network_attacks = [
            "\\\\server\\share",
            "\\\\192.168.1.100\\c$",
            "\\\\evil-server\\malicious-share\\file.exe",
            "\\\\localhost\\c$\\Windows\\System32",
        ]

        for path in network_attacks:
            assert not validate_project_path(path, project_path), (
                f"Network path attack '{path}' should be blocked"
            )

    def test_absolute_paths(self, tmp_path):
        """Test that absolute paths are blocked (if they contain traversal)."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        # Note: Some absolute paths might be allowed by pathlib resolution,
        # but our function should catch traversal patterns first
        absolute_attacks = [
            "/etc/passwd",
            "/home/user/.env",
            "/var/log/auth.log",
            "/root/.ssh/id_rsa",
            "C:\\Windows\\System32\\config\\SAM",
            "C:\\Users\\user\\.env",
            "D:\\secrets\\config.json",
        ]

        for path in absolute_attacks:
            # These should be blocked either by traversal detection or pathlib resolution
            result = validate_project_path(path, project_path)
            assert not result, f"Absolute path '{path}' should be blocked"
192 |
193 |
class TestValidateProjectPathEdgeCases:
    """Test edge cases and error conditions."""

    def test_malformed_paths(self, tmp_path):
        """Test handling of malformed or unusual paths."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        malformed_paths = [
            "",  # Empty string
            " ",  # Whitespace only
            "\n",  # Newline
            "\t",  # Tab
            "\r\n",  # Windows line ending
            "file\x00name",  # Null byte (if it gets this far)
            "file\x01name",  # Other control characters
        ]

        # NOTE(review): empty/whitespace-only paths fall through this loop
        # with no assertion at all (the `if path.strip()` guard skips them).
        for path in malformed_paths:
            # These should either be blocked or cause an exception that's handled
            try:
                result = validate_project_path(path, project_path)
                if path.strip():  # Non-empty paths with control chars should be blocked
                    assert not result, f"Malformed path '{repr(path)}' should be blocked"
            except (ValueError, OSError):
                # It's acceptable for these to raise exceptions
                pass

    def test_very_long_paths(self, tmp_path):
        """Test handling of very long paths."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        # Create a very long but legitimate path (10 segments of ~210 chars each)
        long_path = "/".join(["verylongdirectoryname" * 10 for _ in range(10)])

        # Should handle long paths gracefully (either allow or reject based on filesystem limits)
        try:
            result = validate_project_path(long_path, project_path)
            # Result can be True or False, just shouldn't crash
            assert isinstance(result, bool)
        except (ValueError, OSError):
            # It's acceptable for very long paths to raise exceptions
            pass

    def test_nonexistent_project_path(self):
        """Test behavior when project path doesn't exist."""
        nonexistent_project = Path("/this/path/does/not/exist")

        # Should still be able to validate relative paths
        assert validate_project_path("notes/file.txt", nonexistent_project)
        assert not validate_project_path("../../../etc/passwd", nonexistent_project)

    def test_unicode_and_special_characters(self, tmp_path):
        """Test paths with Unicode and special characters."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        unicode_paths = [
            "notes/文档.md",  # Chinese characters
            "docs/résumé.txt",  # Accented characters
            "files/naïve.md",  # Diaeresis
            "notes/café.txt",  # Acute accent
            "docs/日本語.md",  # Japanese
            "files/αβγ.txt",  # Greek
            "notes/файл.md",  # Cyrillic
        ]

        for path in unicode_paths:
            try:
                result = validate_project_path(path, project_path)
                assert isinstance(result, bool), f"Unicode path '{path}' should return boolean"
                # Unicode paths should generally be allowed if they don't contain traversal
                assert result, f"Unicode path '{path}' should be allowed"
            except (UnicodeError, OSError):
                # Some unicode handling issues might be acceptable
                pass

    def test_case_sensitivity(self, tmp_path):
        """Test case sensitivity of traversal detection."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        # These should all be blocked regardless of case
        case_variations = [
            "../file.txt",
            "../FILE.TXT",
            "~/file.txt",
            "~/FILE.TXT",
        ]

        for path in case_variations:
            assert not validate_project_path(path, project_path), (
                f"Case variation '{path}' should be blocked"
            )

    def test_symbolic_link_behavior(self, tmp_path):
        """Test behavior with symbolic links (if supported by filesystem)."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        # Create a directory outside the project
        outside_dir = tmp_path / "outside"
        outside_dir.mkdir()

        try:
            # Try to create a symlink inside the project pointing outside
            symlink_path = project_path / "symlink"
            symlink_path.symlink_to(outside_dir)

            # Paths through symlinks should be handled safely
            result = validate_project_path("symlink/file.txt", project_path)
            # The result can vary based on how pathlib handles symlinks,
            # but it shouldn't crash and should be a boolean
            assert isinstance(result, bool)

        except (OSError, NotImplementedError):
            # Symlinks might not be supported on this filesystem
            pytest.skip("Symbolic links not supported on this filesystem")

    def test_relative_path_edge_cases(self, tmp_path):
        """Test edge cases in relative path handling."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        edge_cases = [
            ".",  # Current directory
            "./",  # Current directory with slash
            "./file.txt",  # File in current directory
            "./folder/file.txt",  # Nested file through current directory
            "folder/./file.txt",  # Current directory in middle of path
            "folder/subfolder/.",  # Current directory at end
        ]

        for path in edge_cases:
            result = validate_project_path(path, project_path)
            # These should generally be safe as they don't escape the project
            assert result, f"Relative path edge case '{path}' should be allowed"
332 |
333 |
class TestValidateProjectPathPerformance:
    """Test performance characteristics of path validation."""

    def test_performance_with_many_paths(self, tmp_path):
        """Test that validation performs reasonably with many paths."""
        project_path = tmp_path / "project"
        project_path.mkdir()

        # Test a mix of safe and dangerous paths (100 of each).
        test_paths = [f"folder{i}/file{i}.txt" for i in range(100)]
        test_paths += [f"../../../etc/passwd{i}" for i in range(100)]

        import time

        # perf_counter is monotonic and high-resolution; time.time() is wall
        # clock and can jump if the system clock is adjusted mid-test, making
        # the timing assertion flaky.
        start_time = time.perf_counter()

        for path in test_paths:
            result = validate_project_path(path, project_path)
            assert isinstance(result, bool)

        elapsed = time.perf_counter() - start_time

        # Should complete reasonably quickly (adjust threshold as needed)
        assert elapsed < 1.0, "Path validation should be fast"
365 |
366 |
class TestValidateProjectPathIntegration:
    """Integration tests with real filesystem scenarios."""

    def test_with_actual_filesystem_structure(self, tmp_path):
        """Test validation with actual files and directories."""
        project = tmp_path / "project"
        project.mkdir()

        # Lay down real directories and files inside the project.
        (project / "notes").mkdir()
        (project / "docs").mkdir()
        (project / "notes" / "meeting.md").write_text("# Meeting Notes")
        (project / "docs" / "readme.txt").write_text("README")

        # Existing files and not-yet-existing-but-safe paths are both allowed.
        for ok in (
            "notes/meeting.md",
            "docs/readme.txt",
            "notes/new-file.md",
            "new-folder/file.txt",
        ):
            assert validate_project_path(ok, project)

        # Attacks stay blocked even with a real filesystem behind the project.
        for bad in ("../../../etc/passwd", "notes/../../../etc/passwd"):
            assert not validate_project_path(bad, project)

    def test_project_path_resolution_accuracy(self, tmp_path):
        """Test that path resolution works correctly with real paths."""
        # Build a workspace holding the project and a sibling project.
        workspace = tmp_path / "workspace"
        project = workspace / "my-project"
        sibling = workspace / "other-project"
        for directory in (workspace, project, sibling):
            directory.mkdir()

        # A sensitive file in the sibling must not be reachable via "..".
        (sibling / "secrets.txt").write_text("secret data")
        assert not validate_project_path("../other-project/secrets.txt", project)

        # Legitimate access within the project still works.
        assert validate_project_path("my-file.txt", project)
        assert validate_project_path("subdir/my-file.txt", project)
414 |
```