#
tokens: 48164/50000 24/348 files (page 4/23)
lines: on (toggle) GitHub
raw markdown copy reset
This is page 4 of 23. Use http://codebase.md/basicmachines-co/basic-memory?lines=true&page={x} to view the full context.

# Directory Structure

```
├── .claude
│   ├── agents
│   │   ├── python-developer.md
│   │   └── system-architect.md
│   └── commands
│       ├── release
│       │   ├── beta.md
│       │   ├── changelog.md
│       │   ├── release-check.md
│       │   └── release.md
│       ├── spec.md
│       └── test-live.md
├── .dockerignore
├── .github
│   ├── dependabot.yml
│   ├── ISSUE_TEMPLATE
│   │   ├── bug_report.md
│   │   ├── config.yml
│   │   ├── documentation.md
│   │   └── feature_request.md
│   └── workflows
│       ├── claude-code-review.yml
│       ├── claude-issue-triage.yml
│       ├── claude.yml
│       ├── dev-release.yml
│       ├── docker.yml
│       ├── pr-title.yml
│       ├── release.yml
│       └── test.yml
├── .gitignore
├── .python-version
├── CHANGELOG.md
├── CITATION.cff
├── CLA.md
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── docker-compose.yml
├── Dockerfile
├── docs
│   ├── ai-assistant-guide-extended.md
│   ├── character-handling.md
│   ├── cloud-cli.md
│   └── Docker.md
├── justfile
├── LICENSE
├── llms-install.md
├── pyproject.toml
├── README.md
├── SECURITY.md
├── smithery.yaml
├── specs
│   ├── SPEC-1 Specification-Driven Development Process.md
│   ├── SPEC-10 Unified Deployment Workflow and Event Tracking.md
│   ├── SPEC-11 Basic Memory API Performance Optimization.md
│   ├── SPEC-12 OpenTelemetry Observability.md
│   ├── SPEC-13 CLI Authentication with Subscription Validation.md
│   ├── SPEC-14 Cloud Git Versioning & GitHub Backup.md
│   ├── SPEC-14- Cloud Git Versioning & GitHub Backup.md
│   ├── SPEC-15 Configuration Persistence via Tigris for Cloud Tenants.md
│   ├── SPEC-16 MCP Cloud Service Consolidation.md
│   ├── SPEC-17 Semantic Search with ChromaDB.md
│   ├── SPEC-18 AI Memory Management Tool.md
│   ├── SPEC-19 Sync Performance and Memory Optimization.md
│   ├── SPEC-2 Slash Commands Reference.md
│   ├── SPEC-3 Agent Definitions.md
│   ├── SPEC-4 Notes Web UI Component Architecture.md
│   ├── SPEC-5 CLI Cloud Upload via WebDAV.md
│   ├── SPEC-6 Explicit Project Parameter Architecture.md
│   ├── SPEC-7 POC to spike Tigris Turso for local access to cloud data.md
│   ├── SPEC-8 TigrisFS Integration.md
│   ├── SPEC-9 Multi-Project Bidirectional Sync Architecture.md
│   ├── SPEC-9 Signed Header Tenant Information.md
│   └── SPEC-9-1 Follow-Ups- Conflict, Sync, and Observability.md
├── src
│   └── basic_memory
│       ├── __init__.py
│       ├── alembic
│       │   ├── alembic.ini
│       │   ├── env.py
│       │   ├── migrations.py
│       │   ├── script.py.mako
│       │   └── versions
│       │       ├── 3dae7c7b1564_initial_schema.py
│       │       ├── 502b60eaa905_remove_required_from_entity_permalink.py
│       │       ├── 5fe1ab1ccebe_add_projects_table.py
│       │       ├── 647e7a75e2cd_project_constraint_fix.py
│       │       ├── 9d9c1cb7d8f5_add_mtime_and_size_columns_to_entity_.py
│       │       ├── a1b2c3d4e5f6_fix_project_foreign_keys.py
│       │       ├── b3c3938bacdb_relation_to_name_unique_index.py
│       │       ├── cc7172b46608_update_search_index_schema.py
│       │       └── e7e1f4367280_add_scan_watermark_tracking_to_project.py
│       ├── api
│       │   ├── __init__.py
│       │   ├── app.py
│       │   ├── routers
│       │   │   ├── __init__.py
│       │   │   ├── directory_router.py
│       │   │   ├── importer_router.py
│       │   │   ├── knowledge_router.py
│       │   │   ├── management_router.py
│       │   │   ├── memory_router.py
│       │   │   ├── project_router.py
│       │   │   ├── prompt_router.py
│       │   │   ├── resource_router.py
│       │   │   ├── search_router.py
│       │   │   └── utils.py
│       │   └── template_loader.py
│       ├── cli
│       │   ├── __init__.py
│       │   ├── app.py
│       │   ├── auth.py
│       │   ├── commands
│       │   │   ├── __init__.py
│       │   │   ├── cloud
│       │   │   │   ├── __init__.py
│       │   │   │   ├── api_client.py
│       │   │   │   ├── bisync_commands.py
│       │   │   │   ├── cloud_utils.py
│       │   │   │   ├── core_commands.py
│       │   │   │   ├── mount_commands.py
│       │   │   │   ├── rclone_config.py
│       │   │   │   ├── rclone_installer.py
│       │   │   │   ├── upload_command.py
│       │   │   │   └── upload.py
│       │   │   ├── command_utils.py
│       │   │   ├── db.py
│       │   │   ├── import_chatgpt.py
│       │   │   ├── import_claude_conversations.py
│       │   │   ├── import_claude_projects.py
│       │   │   ├── import_memory_json.py
│       │   │   ├── mcp.py
│       │   │   ├── project.py
│       │   │   ├── status.py
│       │   │   ├── sync.py
│       │   │   └── tool.py
│       │   └── main.py
│       ├── config.py
│       ├── db.py
│       ├── deps.py
│       ├── file_utils.py
│       ├── ignore_utils.py
│       ├── importers
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── chatgpt_importer.py
│       │   ├── claude_conversations_importer.py
│       │   ├── claude_projects_importer.py
│       │   ├── memory_json_importer.py
│       │   └── utils.py
│       ├── markdown
│       │   ├── __init__.py
│       │   ├── entity_parser.py
│       │   ├── markdown_processor.py
│       │   ├── plugins.py
│       │   ├── schemas.py
│       │   └── utils.py
│       ├── mcp
│       │   ├── __init__.py
│       │   ├── async_client.py
│       │   ├── project_context.py
│       │   ├── prompts
│       │   │   ├── __init__.py
│       │   │   ├── ai_assistant_guide.py
│       │   │   ├── continue_conversation.py
│       │   │   ├── recent_activity.py
│       │   │   ├── search.py
│       │   │   └── utils.py
│       │   ├── resources
│       │   │   ├── ai_assistant_guide.md
│       │   │   └── project_info.py
│       │   ├── server.py
│       │   └── tools
│       │       ├── __init__.py
│       │       ├── build_context.py
│       │       ├── canvas.py
│       │       ├── chatgpt_tools.py
│       │       ├── delete_note.py
│       │       ├── edit_note.py
│       │       ├── list_directory.py
│       │       ├── move_note.py
│       │       ├── project_management.py
│       │       ├── read_content.py
│       │       ├── read_note.py
│       │       ├── recent_activity.py
│       │       ├── search.py
│       │       ├── utils.py
│       │       ├── view_note.py
│       │       └── write_note.py
│       ├── models
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── knowledge.py
│       │   ├── project.py
│       │   └── search.py
│       ├── repository
│       │   ├── __init__.py
│       │   ├── entity_repository.py
│       │   ├── observation_repository.py
│       │   ├── project_info_repository.py
│       │   ├── project_repository.py
│       │   ├── relation_repository.py
│       │   ├── repository.py
│       │   └── search_repository.py
│       ├── schemas
│       │   ├── __init__.py
│       │   ├── base.py
│       │   ├── cloud.py
│       │   ├── delete.py
│       │   ├── directory.py
│       │   ├── importer.py
│       │   ├── memory.py
│       │   ├── project_info.py
│       │   ├── prompt.py
│       │   ├── request.py
│       │   ├── response.py
│       │   ├── search.py
│       │   └── sync_report.py
│       ├── services
│       │   ├── __init__.py
│       │   ├── context_service.py
│       │   ├── directory_service.py
│       │   ├── entity_service.py
│       │   ├── exceptions.py
│       │   ├── file_service.py
│       │   ├── initialization.py
│       │   ├── link_resolver.py
│       │   ├── project_service.py
│       │   ├── search_service.py
│       │   └── service.py
│       ├── sync
│       │   ├── __init__.py
│       │   ├── background_sync.py
│       │   ├── sync_service.py
│       │   └── watch_service.py
│       ├── templates
│       │   └── prompts
│       │       ├── continue_conversation.hbs
│       │       └── search.hbs
│       └── utils.py
├── test-int
│   ├── BENCHMARKS.md
│   ├── cli
│   │   ├── test_project_commands_integration.py
│   │   ├── test_sync_commands_integration.py
│   │   └── test_version_integration.py
│   ├── conftest.py
│   ├── mcp
│   │   ├── test_build_context_underscore.py
│   │   ├── test_build_context_validation.py
│   │   ├── test_chatgpt_tools_integration.py
│   │   ├── test_default_project_mode_integration.py
│   │   ├── test_delete_note_integration.py
│   │   ├── test_edit_note_integration.py
│   │   ├── test_list_directory_integration.py
│   │   ├── test_move_note_integration.py
│   │   ├── test_project_management_integration.py
│   │   ├── test_project_state_sync_integration.py
│   │   ├── test_read_content_integration.py
│   │   ├── test_read_note_integration.py
│   │   ├── test_search_integration.py
│   │   ├── test_single_project_mcp_integration.py
│   │   └── test_write_note_integration.py
│   ├── test_db_wal_mode.py
│   ├── test_disable_permalinks_integration.py
│   └── test_sync_performance_benchmark.py
├── tests
│   ├── __init__.py
│   ├── api
│   │   ├── conftest.py
│   │   ├── test_async_client.py
│   │   ├── test_continue_conversation_template.py
│   │   ├── test_directory_router.py
│   │   ├── test_importer_router.py
│   │   ├── test_knowledge_router.py
│   │   ├── test_management_router.py
│   │   ├── test_memory_router.py
│   │   ├── test_project_router_operations.py
│   │   ├── test_project_router.py
│   │   ├── test_prompt_router.py
│   │   ├── test_relation_background_resolution.py
│   │   ├── test_resource_router.py
│   │   ├── test_search_router.py
│   │   ├── test_search_template.py
│   │   ├── test_template_loader_helpers.py
│   │   └── test_template_loader.py
│   ├── cli
│   │   ├── conftest.py
│   │   ├── test_bisync_commands.py
│   │   ├── test_cli_tools.py
│   │   ├── test_cloud_authentication.py
│   │   ├── test_cloud_utils.py
│   │   ├── test_ignore_utils.py
│   │   ├── test_import_chatgpt.py
│   │   ├── test_import_claude_conversations.py
│   │   ├── test_import_claude_projects.py
│   │   ├── test_import_memory_json.py
│   │   └── test_upload.py
│   ├── conftest.py
│   ├── db
│   │   └── test_issue_254_foreign_key_constraints.py
│   ├── importers
│   │   ├── test_importer_base.py
│   │   └── test_importer_utils.py
│   ├── markdown
│   │   ├── __init__.py
│   │   ├── test_date_frontmatter_parsing.py
│   │   ├── test_entity_parser_error_handling.py
│   │   ├── test_entity_parser.py
│   │   ├── test_markdown_plugins.py
│   │   ├── test_markdown_processor.py
│   │   ├── test_observation_edge_cases.py
│   │   ├── test_parser_edge_cases.py
│   │   ├── test_relation_edge_cases.py
│   │   └── test_task_detection.py
│   ├── mcp
│   │   ├── conftest.py
│   │   ├── test_obsidian_yaml_formatting.py
│   │   ├── test_permalink_collision_file_overwrite.py
│   │   ├── test_prompts.py
│   │   ├── test_resources.py
│   │   ├── test_tool_build_context.py
│   │   ├── test_tool_canvas.py
│   │   ├── test_tool_delete_note.py
│   │   ├── test_tool_edit_note.py
│   │   ├── test_tool_list_directory.py
│   │   ├── test_tool_move_note.py
│   │   ├── test_tool_read_content.py
│   │   ├── test_tool_read_note.py
│   │   ├── test_tool_recent_activity.py
│   │   ├── test_tool_resource.py
│   │   ├── test_tool_search.py
│   │   ├── test_tool_utils.py
│   │   ├── test_tool_view_note.py
│   │   ├── test_tool_write_note.py
│   │   └── tools
│   │       └── test_chatgpt_tools.py
│   ├── Non-MarkdownFileSupport.pdf
│   ├── repository
│   │   ├── test_entity_repository_upsert.py
│   │   ├── test_entity_repository.py
│   │   ├── test_entity_upsert_issue_187.py
│   │   ├── test_observation_repository.py
│   │   ├── test_project_info_repository.py
│   │   ├── test_project_repository.py
│   │   ├── test_relation_repository.py
│   │   ├── test_repository.py
│   │   ├── test_search_repository_edit_bug_fix.py
│   │   └── test_search_repository.py
│   ├── schemas
│   │   ├── test_base_timeframe_minimum.py
│   │   ├── test_memory_serialization.py
│   │   ├── test_memory_url_validation.py
│   │   ├── test_memory_url.py
│   │   ├── test_schemas.py
│   │   └── test_search.py
│   ├── Screenshot.png
│   ├── services
│   │   ├── test_context_service.py
│   │   ├── test_directory_service.py
│   │   ├── test_entity_service_disable_permalinks.py
│   │   ├── test_entity_service.py
│   │   ├── test_file_service.py
│   │   ├── test_initialization.py
│   │   ├── test_link_resolver.py
│   │   ├── test_project_removal_bug.py
│   │   ├── test_project_service_operations.py
│   │   ├── test_project_service.py
│   │   └── test_search_service.py
│   ├── sync
│   │   ├── test_character_conflicts.py
│   │   ├── test_sync_service_incremental.py
│   │   ├── test_sync_service.py
│   │   ├── test_sync_wikilink_issue.py
│   │   ├── test_tmp_files.py
│   │   ├── test_watch_service_edge_cases.py
│   │   ├── test_watch_service_reload.py
│   │   └── test_watch_service.py
│   ├── test_config.py
│   ├── test_db_migration_deduplication.py
│   ├── test_deps.py
│   ├── test_production_cascade_delete.py
│   └── utils
│       ├── test_file_utils.py
│       ├── test_frontmatter_obsidian_compatible.py
│       ├── test_parse_tags.py
│       ├── test_permalink_formatting.py
│       ├── test_utf8_handling.py
│       └── test_validate_project_path.py
├── uv.lock
├── v0.15.0-RELEASE-DOCS.md
└── v15-docs
    ├── api-performance.md
    ├── background-relations.md
    ├── basic-memory-home.md
    ├── bug-fixes.md
    ├── chatgpt-integration.md
    ├── cloud-authentication.md
    ├── cloud-bisync.md
    ├── cloud-mode-usage.md
    ├── cloud-mount.md
    ├── default-project-mode.md
    ├── env-file-removal.md
    ├── env-var-overrides.md
    ├── explicit-project-parameter.md
    ├── gitignore-integration.md
    ├── project-root-env-var.md
    ├── README.md
    └── sqlite-performance.md
```

# Files

--------------------------------------------------------------------------------
/tests/utils/test_frontmatter_obsidian_compatible.py:
--------------------------------------------------------------------------------

```python
  1 | """Tests for Obsidian-compatible YAML frontmatter formatting."""
  2 | 
  3 | import frontmatter
  4 | 
  5 | from basic_memory.file_utils import dump_frontmatter
  6 | 
  7 | 
  8 | def test_tags_formatted_as_yaml_list():
  9 |     """Test that tags are formatted as YAML list instead of JSON array."""
 10 |     post = frontmatter.Post("Test content")
 11 |     post.metadata["title"] = "Test Note"
 12 |     post.metadata["type"] = "note"
 13 |     post.metadata["tags"] = ["system", "overview", "reference"]
 14 | 
 15 |     result = dump_frontmatter(post)
 16 | 
 17 |     # Should use YAML list format
 18 |     assert "tags:" in result
 19 |     assert "- system" in result
 20 |     assert "- overview" in result
 21 |     assert "- reference" in result
 22 | 
 23 |     # Should NOT use JSON array format
 24 |     assert '["system"' not in result
 25 |     assert '"overview"' not in result
 26 |     assert '"reference"]' not in result
 27 | 
 28 | 
 29 | def test_empty_tags_list():
 30 |     """Test that empty tags list is handled correctly."""
 31 |     post = frontmatter.Post("Test content")
 32 |     post.metadata["title"] = "Test Note"
 33 |     post.metadata["tags"] = []
 34 | 
 35 |     result = dump_frontmatter(post)
 36 | 
 37 |     # Should have empty list representation
 38 |     assert "tags: []" in result
 39 | 
 40 | 
 41 | def test_single_tag():
 42 |     """Test that single tag is still formatted as list."""
 43 |     post = frontmatter.Post("Test content")
 44 |     post.metadata["title"] = "Test Note"
 45 |     post.metadata["tags"] = ["single-tag"]
 46 | 
 47 |     result = dump_frontmatter(post)
 48 | 
 49 |     assert "tags:" in result
 50 |     assert "- single-tag" in result
 51 | 
 52 | 
 53 | def test_no_tags_metadata():
 54 |     """Test that posts without tags work normally."""
 55 |     post = frontmatter.Post("Test content")
 56 |     post.metadata["title"] = "Test Note"
 57 |     post.metadata["type"] = "note"
 58 | 
 59 |     result = dump_frontmatter(post)
 60 | 
 61 |     assert "title: Test Note" in result
 62 |     assert "type: note" in result
 63 |     assert "tags:" not in result
 64 | 
 65 | 
 66 | def test_no_frontmatter():
 67 |     """Test that posts with no frontmatter just return content."""
 68 |     post = frontmatter.Post("Test content only")
 69 | 
 70 |     result = dump_frontmatter(post)
 71 | 
 72 |     assert result == "Test content only"
 73 | 
 74 | 
 75 | def test_complex_tags_with_special_characters():
 76 |     """Test tags with hyphens, underscores, and other valid characters."""
 77 |     post = frontmatter.Post("Test content")
 78 |     post.metadata["title"] = "Test Note"
 79 |     post.metadata["tags"] = ["python-test", "api_integration", "v2.0", "nested/tag"]
 80 | 
 81 |     result = dump_frontmatter(post)
 82 | 
 83 |     assert "- python-test" in result
 84 |     assert "- api_integration" in result
 85 |     assert "- v2.0" in result
 86 |     assert "- nested/tag" in result
 87 | 
 88 | 
 89 | def test_tags_order_preserved():
 90 |     """Test that tag order is preserved in output."""
 91 |     post = frontmatter.Post("Test content")
 92 |     post.metadata["title"] = "Test Note"
 93 |     post.metadata["tags"] = ["zebra", "apple", "banana"]
 94 | 
 95 |     result = dump_frontmatter(post)
 96 | 
 97 |     # Find the positions of each tag in the output
 98 |     zebra_pos = result.find("- zebra")
 99 |     apple_pos = result.find("- apple")
100 |     banana_pos = result.find("- banana")
101 | 
102 |     # They should appear in the same order as input
103 |     assert zebra_pos < apple_pos < banana_pos
104 | 
105 | 
def test_non_tags_lists_also_formatted():
    """List values under keys other than 'tags' also use YAML block-list formatting."""
    note = frontmatter.Post("Test content")
    note.metadata["title"] = "Test Note"
    note.metadata["authors"] = ["John Doe", "Jane Smith"]
    note.metadata["keywords"] = ["AI", "machine learning"]

    output = dump_frontmatter(note)

    # Both keys are present, and every element renders as a "- item" line.
    assert "authors:" in output
    assert "keywords:" in output
    for item in ("John Doe", "Jane Smith", "AI", "machine learning"):
        assert f"- {item}" in output
124 | 
125 | 
def test_mixed_metadata_types():
    """Scalar values (str/int/bool) coexist correctly with list values."""
    note = frontmatter.Post("Test content")
    note.metadata.update(
        {
            "title": "Test Note",
            "tags": ["tag1", "tag2"],
            "created": "2024-01-01",
            "priority": 5,
            "draft": True,
        }
    )

    output = dump_frontmatter(note)

    # The list still uses block style.
    assert "tags:" in output
    assert "- tag1" in output
    assert "- tag2" in output

    # Scalars serialize as plain YAML values (quoting/casing may vary by dumper).
    assert "title: Test Note" in output
    assert "created: '2024-01-01'" in output or "created: 2024-01-01" in output
    assert "priority: 5" in output
    assert "draft: true" in output or "draft: True" in output
147 | 
148 | 
def test_empty_content():
    """Frontmatter-only posts (empty body) still produce a well-formed document."""
    note = frontmatter.Post("")
    note.metadata["title"] = "Empty Note"
    note.metadata["tags"] = ["empty", "test"]

    output = dump_frontmatter(note)

    # The document is delimited by --- markers even with no body text.
    assert output.startswith("---")
    assert output.endswith("---\n")

    # Tags still use block-list formatting.
    assert "- empty" in output
    assert "- test" in output
164 | 
165 | 
def test_roundtrip_compatibility():
    """Output of dump_frontmatter must parse back losslessly via frontmatter.loads."""
    source = frontmatter.Post("Test content")
    source.metadata.update(
        {
            "title": "Test Note",
            "tags": ["system", "test", "obsidian"],
            "type": "note",
        }
    )

    # Serialize with our formatter, then parse with the standard loader.
    reparsed = frontmatter.loads(dump_frontmatter(source))

    # Content and every metadata field survive the round trip unchanged.
    assert reparsed.content == source.content
    for key in ("title", "tags", "type"):
        assert reparsed.metadata[key] == source.metadata[key]
184 | 
```

--------------------------------------------------------------------------------
/tests/markdown/test_markdown_processor.py:
--------------------------------------------------------------------------------

```python
  1 | """Tests for MarkdownProcessor.
  2 | 
  3 | Tests focus on the Read -> Modify -> Write pattern and content preservation.
  4 | """
  5 | 
  6 | from datetime import datetime
  7 | from pathlib import Path
  8 | 
  9 | import pytest
 10 | 
 11 | from basic_memory.markdown.markdown_processor import DirtyFileError, MarkdownProcessor
 12 | from basic_memory.markdown.schemas import (
 13 |     EntityFrontmatter,
 14 |     EntityMarkdown,
 15 |     Observation,
 16 |     Relation,
 17 | )
 18 | 
 19 | 
 20 | @pytest.mark.asyncio
 21 | async def test_write_new_minimal_file(markdown_processor: MarkdownProcessor, tmp_path: Path):
 22 |     """Test creating new file with just title."""
 23 |     path = tmp_path / "test.md"
 24 | 
 25 |     # Create minimal markdown schema
 26 |     metadata = {}
 27 |     metadata["title"] = "Test Note"
 28 |     metadata["type"] = "note"
 29 |     metadata["permalink"] = "test"
 30 |     metadata["created"] = datetime(2024, 1, 1)
 31 |     metadata["modified"] = datetime(2024, 1, 1)
 32 |     metadata["tags"] = ["test"]
 33 |     markdown = EntityMarkdown(
 34 |         frontmatter=EntityFrontmatter(
 35 |             metadata=metadata,
 36 |         ),
 37 |         content="",
 38 |     )
 39 | 
 40 |     # Write file
 41 |     await markdown_processor.write_file(path, markdown)
 42 | 
 43 |     # Read back and verify
 44 |     content = path.read_text(encoding="utf-8")
 45 |     assert "---" in content  # Has frontmatter
 46 |     assert "type: note" in content
 47 |     assert "permalink: test" in content
 48 |     assert "# Test Note" in content  # Added title
 49 |     assert "tags:" in content
 50 |     assert "- test" in content
 51 | 
 52 |     # Should not have empty sections
 53 |     assert "## Observations" not in content
 54 |     assert "## Relations" not in content
 55 | 
 56 | 
 57 | @pytest.mark.asyncio
 58 | async def test_write_new_file_with_content(markdown_processor: MarkdownProcessor, tmp_path: Path):
 59 |     """Test creating new file with content and sections."""
 60 |     path = tmp_path / "test.md"
 61 | 
 62 |     # Create markdown with content and sections
 63 |     markdown = EntityMarkdown(
 64 |         frontmatter=EntityFrontmatter(
 65 |             type="note",
 66 |             permalink="test",
 67 |             title="Test Note",
 68 |             created=datetime(2024, 1, 1),
 69 |             modified=datetime(2024, 1, 1),
 70 |         ),
 71 |         content="# Custom Title\n\nMy content here.\nMultiple lines.",
 72 |         observations=[
 73 |             Observation(
 74 |                 content="Test observation #test",
 75 |                 category="tech",
 76 |                 tags=["test"],
 77 |                 context="test context",
 78 |             ),
 79 |         ],
 80 |         relations=[
 81 |             Relation(
 82 |                 type="relates_to",
 83 |                 target="other-note",
 84 |                 context="test relation",
 85 |             ),
 86 |         ],
 87 |     )
 88 | 
 89 |     # Write file
 90 |     await markdown_processor.write_file(path, markdown)
 91 | 
 92 |     # Read back and verify
 93 |     content = path.read_text(encoding="utf-8")
 94 | 
 95 |     # Check content preserved exactly
 96 |     assert "# Custom Title" in content
 97 |     assert "My content here." in content
 98 |     assert "Multiple lines." in content
 99 | 
100 |     # Check sections formatted correctly
101 |     assert "- [tech] Test observation #test (test context)" in content
102 |     assert "- relates_to [[other-note]] (test relation)" in content
103 | 
104 | 
@pytest.mark.asyncio
async def test_update_preserves_content(markdown_processor: MarkdownProcessor, tmp_path: Path):
    """Test that updating file preserves existing content."""
    target = tmp_path / "test.md"

    first_observation = Observation(content="First observation", category="note")
    original = EntityMarkdown(
        frontmatter=EntityFrontmatter(
            type="note",
            permalink="test",
            title="Test Note",
            created=datetime(2024, 1, 1),
            modified=datetime(2024, 1, 1),
        ),
        content="# My Note\n\nOriginal content here.",
        observations=[first_observation],
    )

    checksum = await markdown_processor.write_file(target, original)

    # Re-write with identical frontmatter/content plus one extra observation.
    revised = EntityMarkdown(
        frontmatter=original.frontmatter,
        content=original.content,
        observations=[
            first_observation,
            Observation(content="Second observation", category="tech"),
        ],
    )
    await markdown_processor.write_file(target, revised, expected_checksum=checksum)

    reloaded = await markdown_processor.read_file(target)

    # The original body text survives the update.
    assert "Original content here." in reloaded.content

    # Exactly the old and the new observation are present.
    assert len(reloaded.observations) == 2
    observed = {o.content for o in reloaded.observations}
    assert observed == {"First observation", "Second observation"}
150 | 
151 | 
@pytest.mark.asyncio
async def test_dirty_file_detection(markdown_processor: MarkdownProcessor, tmp_path: Path):
    """Test detection of out-of-band file modifications via checksum mismatch.

    Writes a file, modifies it directly on disk, then verifies that a write
    carrying the now-stale checksum raises DirtyFileError, while a write
    without a checksum succeeds unconditionally and yields a new checksum.
    """
    path = tmp_path / "test.md"

    # Create initial file
    initial = EntityMarkdown(
        frontmatter=EntityFrontmatter(
            type="note",
            permalink="test",
            title="Test Note",
            created=datetime(2024, 1, 1),
            modified=datetime(2024, 1, 1),
        ),
        content="Initial content",
    )

    checksum = await markdown_processor.write_file(path, initial)

    # Modify file directly, bypassing the processor.
    # Fix: pass encoding explicitly on write as well as read — the platform
    # default (e.g. cp1252 on Windows) differs from UTF-8 and could otherwise
    # corrupt the round-trip.
    path.write_text(path.read_text(encoding="utf-8") + "\nModified!", encoding="utf-8")

    # Try to update with the stale checksum captured before the modification.
    update = EntityMarkdown(
        frontmatter=initial.frontmatter,
        content="New content",
    )

    # Should raise DirtyFileError because on-disk content no longer matches.
    with pytest.raises(DirtyFileError):
        await markdown_processor.write_file(path, update, expected_checksum=checksum)

    # Without a checksum the write is unconditional and produces a new checksum.
    new_checksum = await markdown_processor.write_file(path, update)
    assert new_checksum != checksum
187 | 
```

--------------------------------------------------------------------------------
/src/basic_memory/mcp/prompts/utils.py:
--------------------------------------------------------------------------------

```python
  1 | """Utility functions for formatting prompt responses.
  2 | 
  3 | These utilities help format data from various tools into consistent,
  4 | user-friendly markdown summaries.
  5 | """
  6 | 
  7 | from dataclasses import dataclass
  8 | from textwrap import dedent
  9 | from typing import List
 10 | 
 11 | from basic_memory.schemas.base import TimeFrame
 12 | from basic_memory.schemas.memory import (
 13 |     normalize_memory_url,
 14 |     EntitySummary,
 15 |     RelationSummary,
 16 |     ObservationSummary,
 17 | )
 18 | 
 19 | 
@dataclass
class PromptContextItem:
    """One group of retrieved results to render in a prompt summary.

    Split so the formatter can present primary hits prominently and related
    items as supplementary context (see format_prompt_context in this module).
    """

    # Entities that directly matched the query/topic.
    primary_results: List[EntitySummary]
    # Items linked to the primary results: entities, relations, or observations.
    related_results: List[EntitySummary | RelationSummary | ObservationSummary]
 24 | 
 25 | 
@dataclass
class PromptContext:
    """Input bundle consumed by format_prompt_context to build a summary."""

    # Time window the results cover (TimeFrame from schemas.base; e.g. "1w" —
    # presumably the same format accepted by recent_activity; confirm there).
    timeframe: TimeFrame
    # Topic/query the conversation is continuing; interpolated into headings.
    topic: str
    # Result groups to render, in display order.
    results: List[PromptContextItem]
 31 | 
 32 | 
 33 | def format_prompt_context(context: PromptContext) -> str:
 34 |     """Format continuation context into a helpful summary.
 35 |     Returns:
 36 |         Formatted continuation summary
 37 |     """
 38 |     if not context.results:  # pragma: no cover
 39 |         return dedent(f"""
 40 |             # Continuing conversation on: {context.topic}
 41 | 
 42 |             This is a memory retrieval session. 
 43 |             The supplied query did not return any information specifically on this topic.
 44 |             
 45 |             ## Opportunity to Capture New Knowledge!
 46 |             
 47 |             This is an excellent chance to start documenting this topic:
 48 |             
 49 |             ```python
 50 |             await write_note(
 51 |                 title="{context.topic}",
 52 |                 content=f'''
 53 |                 # {context.topic}
 54 |                 
 55 |                 ## Overview
 56 |                 [Summary of what we know about {context.topic}]
 57 |                 
 58 |                 ## Key Points
 59 |                 [Main aspects or components of {context.topic}]
 60 |                 
 61 |                 ## Observations
 62 |                 - [category] [First important observation about {context.topic}]
 63 |                 - [category] [Second observation about {context.topic}]
 64 |                 
 65 |                 ## Relations
 66 |                 - relates_to [[Related Topic]]
 67 |                 - part_of [[Broader Context]]
 68 |                 '''
 69 |             )
 70 |             ```
 71 |             
 72 |             ## Other Options
 73 |             
 74 |             Please use the available basic-memory tools to gather relevant context before responding.
 75 |             You can also:
 76 |             - Try a different search term
 77 |             - Check recent activity with `recent_activity(timeframe="1w")`
 78 |             """)
 79 | 
 80 |     # Start building our summary with header - add knowledge capture emphasis
 81 |     summary = dedent(f"""
 82 |         # Continuing conversation on: {context.topic}
 83 | 
 84 |         This is a memory retrieval session. 
 85 |         
 86 |         Please use the available basic-memory tools to gather relevant context before responding. 
 87 |         Start by executing one of the suggested commands below to retrieve content.
 88 | 
 89 |         Here's what I found from previous conversations:
 90 |         
 91 |         > **Knowledge Capture Recommendation:** As you continue this conversation, actively look for opportunities to record new information, decisions, or insights that emerge. Use `write_note()` to document important context.
 92 |         """)
 93 | 
 94 |     # Track what we've added to avoid duplicates
 95 |     added_permalinks = set()
 96 |     sections = []
 97 | 
 98 |     # Process each context
 99 |     for context in context.results:  # pyright: ignore
100 |         for primary in context.primary_results:  # pyright: ignore
101 |             if primary.permalink not in added_permalinks:
102 |                 primary_permalink = primary.permalink
103 | 
104 |                 added_permalinks.add(primary_permalink)
105 | 
106 |                 # Use permalink if available, otherwise use file_path
107 |                 if primary_permalink:
108 |                     memory_url = normalize_memory_url(primary_permalink)
109 |                     read_command = f'read_note("{primary_permalink}")'
110 |                 else:
111 |                     memory_url = f"file://{primary.file_path}"
112 |                     read_command = f'read_file("{primary.file_path}")'
113 | 
114 |                 section = dedent(f"""
115 |                     --- {memory_url}
116 | 
117 |                     ## {primary.title}
118 |                     - **Type**: {primary.type}
119 |                     """)
120 | 
121 |                 # Add creation date
122 |                 section += f"- **Created**: {primary.created_at.strftime('%Y-%m-%d %H:%M')}\n"
123 | 
124 |                 # Add content snippet
125 |                 if hasattr(primary, "content") and primary.content:  # pyright: ignore
126 |                     content = primary.content or ""  # pyright: ignore
127 |                     if content:
128 |                         section += f"\n**Excerpt**:\n{content}\n"
129 | 
130 |                 section += dedent(f"""
131 | 
132 |                     You can read this document with: `{read_command}`
133 |                     """)
134 |                 sections.append(section)
135 | 
136 |         if context.related_results:  # pyright: ignore
137 |             section += dedent(  # pyright: ignore
138 |                 """   
139 |                 ## Related Context
140 |                 """
141 |             )
142 | 
143 |             for related in context.related_results:  # pyright: ignore
144 |                 section_content = dedent(f"""
145 |                     - type: **{related.type}**
146 |                     - title: {related.title}
147 |                     """)
148 |                 if related.permalink:  # pragma: no cover
149 |                     section_content += (
150 |                         f'You can view this document with: `read_note("{related.permalink}")`'
151 |                     )
152 |                 else:  # pragma: no cover
153 |                     section_content += (
154 |                         f'You can view this file with: `read_file("{related.file_path}")`'
155 |                     )
156 | 
157 |                 section += section_content
158 |                 sections.append(section)
159 | 
160 |     # Add all sections
161 |     summary += "\n".join(sections)
162 |     return summary
163 | 
```

--------------------------------------------------------------------------------
/tests/services/test_project_removal_bug.py:
--------------------------------------------------------------------------------

```python
  1 | """Test for project removal bug #254."""
  2 | 
  3 | import os
  4 | import tempfile
  5 | from datetime import timezone, datetime
  6 | from pathlib import Path
  7 | 
  8 | import pytest
  9 | 
 10 | from basic_memory.services.project_service import ProjectService
 11 | 
 12 | 
 13 | @pytest.mark.asyncio
 14 | async def test_remove_project_with_related_entities(project_service: ProjectService):
 15 |     """Test removing a project that has related entities (reproduces issue #254).
 16 | 
 17 |     This test verifies that projects with related entities (entities, observations, relations)
 18 |     can be properly deleted without foreign key constraint violations.
 19 | 
 20 |     The bug was caused by missing foreign key constraints with CASCADE DELETE after
 21 |     the project table was recreated in migration 647e7a75e2cd.
 22 |     """
 23 |     test_project_name = f"test-remove-with-entities-{os.urandom(4).hex()}"
 24 |     with tempfile.TemporaryDirectory() as temp_dir:
 25 |         test_root = Path(temp_dir)
 26 |         test_project_path = str(test_root / "test-remove-with-entities")
 27 | 
 28 |         # Make sure the test directory exists
 29 |         os.makedirs(test_project_path, exist_ok=True)
 30 | 
 31 |         try:
 32 |             # Step 1: Add the test project
 33 |             await project_service.add_project(test_project_name, test_project_path)
 34 | 
 35 |             # Verify project exists
 36 |             project = await project_service.get_project(test_project_name)
 37 |             assert project is not None
 38 | 
 39 |             # Step 2: Create related entities for this project
 40 |             from basic_memory.repository.entity_repository import EntityRepository
 41 | 
 42 |             entity_repo = EntityRepository(
 43 |                 project_service.repository.session_maker, project_id=project.id
 44 |             )
 45 | 
 46 |             entity_data = {
 47 |                 "title": "Test Entity for Deletion",
 48 |                 "entity_type": "note",
 49 |                 "content_type": "text/markdown",
 50 |                 "project_id": project.id,
 51 |                 "permalink": "test-deletion-entity",
 52 |                 "file_path": "test-deletion-entity.md",
 53 |                 "checksum": "test123",
 54 |                 "created_at": datetime.now(timezone.utc),
 55 |                 "updated_at": datetime.now(timezone.utc),
 56 |             }
 57 |             entity = await entity_repo.create(entity_data)
 58 |             assert entity is not None
 59 | 
 60 |             # Step 3: Create observations for the entity
 61 |             from basic_memory.repository.observation_repository import ObservationRepository
 62 | 
 63 |             obs_repo = ObservationRepository(
 64 |                 project_service.repository.session_maker, project_id=project.id
 65 |             )
 66 | 
 67 |             observation_data = {
 68 |                 "entity_id": entity.id,
 69 |                 "content": "This is a test observation",
 70 |                 "category": "note",
 71 |             }
 72 |             observation = await obs_repo.create(observation_data)
 73 |             assert observation is not None
 74 | 
 75 |             # Step 4: Create relations involving the entity
 76 |             from basic_memory.repository.relation_repository import RelationRepository
 77 | 
 78 |             rel_repo = RelationRepository(
 79 |                 project_service.repository.session_maker, project_id=project.id
 80 |             )
 81 | 
 82 |             relation_data = {
 83 |                 "from_id": entity.id,
 84 |                 "to_name": "some-target-entity",
 85 |                 "relation_type": "relates-to",
 86 |             }
 87 |             relation = await rel_repo.create(relation_data)
 88 |             assert relation is not None
 89 | 
 90 |             # Step 5: Attempt to remove the project
 91 |             # This should work with proper cascade delete, or fail with foreign key constraint
 92 |             await project_service.remove_project(test_project_name)
 93 | 
 94 |             # Step 6: Verify everything was properly deleted
 95 | 
 96 |             # Project should be gone
 97 |             removed_project = await project_service.get_project(test_project_name)
 98 |             assert removed_project is None, "Project should have been removed"
 99 | 
100 |             # Related entities should be cascade deleted
101 |             remaining_entity = await entity_repo.find_by_id(entity.id)
102 |             assert remaining_entity is None, "Entity should have been cascade deleted"
103 | 
104 |             # Observations should be cascade deleted
105 |             remaining_obs = await obs_repo.find_by_id(observation.id)
106 |             assert remaining_obs is None, "Observation should have been cascade deleted"
107 | 
108 |             # Relations should be cascade deleted
109 |             remaining_rel = await rel_repo.find_by_id(relation.id)
110 |             assert remaining_rel is None, "Relation should have been cascade deleted"
111 | 
112 |         except Exception as e:
113 |             # Check if this is the specific foreign key constraint error from the bug report
114 |             if "FOREIGN KEY constraint failed" in str(e):
115 |                 pytest.fail(
116 |                     f"Bug #254 reproduced: {e}. "
117 |                     "This indicates missing foreign key constraints with CASCADE DELETE. "
118 |                     "Run migration a1b2c3d4e5f6_fix_project_foreign_keys.py to fix this."
119 |                 )
120 |             else:
121 |                 # Re-raise other unexpected errors
122 |                 raise e
123 | 
124 |         finally:
125 |             # Clean up - remove project if it still exists
126 |             if test_project_name in project_service.projects:
127 |                 try:
128 |                     await project_service.remove_project(test_project_name)
129 |                 except Exception:
130 |                     # Manual cleanup if remove_project fails
131 |                     try:
132 |                         project_service.config_manager.remove_project(test_project_name)
133 |                     except Exception:
134 |                         pass
135 | 
136 |                     project = await project_service.get_project(test_project_name)
137 |                     if project:
138 |                         await project_service.repository.delete(project.id)
139 | 
```

--------------------------------------------------------------------------------
/src/basic_memory/importers/claude_conversations_importer.py:
--------------------------------------------------------------------------------

```python
  1 | """Claude conversations import service for Basic Memory."""
  2 | 
  3 | import logging
  4 | from datetime import datetime
  5 | from pathlib import Path
  6 | from typing import Any, Dict, List
  7 | 
  8 | from basic_memory.markdown.schemas import EntityFrontmatter, EntityMarkdown
  9 | from basic_memory.importers.base import Importer
 10 | from basic_memory.schemas.importer import ChatImportResult
 11 | from basic_memory.importers.utils import clean_filename, format_timestamp
 12 | 
 13 | logger = logging.getLogger(__name__)
 14 | 
 15 | 
class ClaudeConversationsImporter(Importer[ChatImportResult]):
    """Service for importing Claude conversations."""

    async def import_data(
        self, source_data, destination_folder: str, **kwargs: Any
    ) -> ChatImportResult:
        """Import conversations from a parsed Claude JSON export.

        Args:
            source_data: Parsed contents of the Claude conversations.json export —
                an iterable of conversation dicts, each with "name", "chat_messages",
                "created_at" and "updated_at" keys (it is iterated and subscripted
                below, so a file path is not accepted here).
            destination_folder: Destination folder within the project.
            **kwargs: Additional keyword arguments (currently unused).

        Returns:
            ChatImportResult containing statistics and status of the import.
        """
        try:
            # Ensure the destination folder exists
            folder_path = self.ensure_folder_exists(destination_folder)

            conversations = source_data

            # Process each conversation, counting chats and messages for the result
            messages_imported = 0
            chats_imported = 0

            for chat in conversations:
                # Convert to entity
                entity = self._format_chat_content(
                    base_path=folder_path,
                    name=chat["name"],
                    messages=chat["chat_messages"],
                    created_at=chat["created_at"],
                    modified_at=chat["updated_at"],
                )

                # Write file named after the generated permalink
                file_path = self.base_path / Path(f"{entity.frontmatter.metadata['permalink']}.md")
                await self.write_entity(entity, file_path)

                chats_imported += 1
                messages_imported += len(chat["chat_messages"])

            return ChatImportResult(
                import_count={"conversations": chats_imported, "messages": messages_imported},
                success=True,
                conversations=chats_imported,
                messages=messages_imported,
            )

        except Exception as e:  # pragma: no cover
            logger.exception("Failed to import Claude conversations")
            return self.handle_error("Failed to import Claude conversations", e)  # pyright: ignore [reportReturnType]

    def _format_chat_content(
        self,
        base_path: Path,
        name: str,
        messages: List[Dict[str, Any]],
        created_at: str,
        modified_at: str,
    ) -> EntityMarkdown:
        """Convert chat messages to Basic Memory entity format.

        Args:
            base_path: Base path for the entity; only its final component is used
                as the permalink prefix.
            name: Chat name.
            messages: List of chat messages.
            created_at: Creation timestamp (ISO-8601 string; a trailing "Z" is accepted).
            modified_at: Modification timestamp.

        Returns:
            EntityMarkdown instance representing the conversation.
        """
        # Generate permalink of the form "<folder>/<YYYYMMDD>-<clean-title>"
        date_prefix = datetime.fromisoformat(created_at.replace("Z", "+00:00")).strftime("%Y%m%d")
        clean_title = clean_filename(name)
        permalink = f"{base_path.name}/{date_prefix}-{clean_title}"

        # Format content
        content = self._format_chat_markdown(
            name=name,
            messages=messages,
            created_at=created_at,
            modified_at=modified_at,
            permalink=permalink,
        )

        # Create entity; timestamps and permalink are carried in frontmatter
        # metadata rather than in the markdown body
        entity = EntityMarkdown(
            frontmatter=EntityFrontmatter(
                metadata={
                    "type": "conversation",
                    "title": name,
                    "created": created_at,
                    "modified": modified_at,
                    "permalink": permalink,
                }
            ),
            content=content,
        )

        return entity

    def _format_chat_markdown(
        self,
        name: str,
        messages: List[Dict[str, Any]],
        created_at: str,
        modified_at: str,
        permalink: str,
    ) -> str:
        """Format chat as clean markdown.

        Args:
            name: Chat name (rendered as the top-level heading).
            messages: List of chat messages.
            created_at: Creation timestamp (accepted for signature symmetry;
                not rendered in the body — it lives in frontmatter metadata).
            modified_at: Modification timestamp (likewise not rendered here).
            permalink: Permalink for the entity (likewise not rendered here).

        Returns:
            Formatted markdown content.
        """
        # Title heading only; frontmatter is emitted from the entity metadata,
        # not assembled in this body
        lines = [
            f"# {name}\n",
        ]

        # Add messages
        for msg in messages:
            # Format timestamp
            ts = format_timestamp(msg["created_at"])

            # Add message header, e.g. "### Human (2024-01-01 12:00)"
            lines.append(f"### {msg['sender'].title()} ({ts})")

            # Handle message content: a structured "content" list (newer export
            # format) takes precedence over the flat "text" field
            content = msg.get("text", "")
            if msg.get("content"):
                # Filter out None values before joining
                content = " ".join(
                    str(c.get("text", ""))
                    for c in msg["content"]
                    if c and c.get("text") is not None
                )
            lines.append(content)

            # Handle attachments: render extracted text inside a fenced block
            attachments = msg.get("attachments", [])
            for attachment in attachments:
                if "file_name" in attachment:
                    lines.append(f"\n**Attachment: {attachment['file_name']}**")
                    if "extracted_content" in attachment:
                        lines.append("```")
                        lines.append(attachment["extracted_content"])
                        lines.append("```")

            # Add spacing between messages
            lines.append("")

        return "\n".join(lines)
178 | 
```

--------------------------------------------------------------------------------
/tests/sync/test_tmp_files.py:
--------------------------------------------------------------------------------

```python
  1 | """Test proper handling of .tmp files during sync."""
  2 | 
  3 | import asyncio
  4 | from pathlib import Path
  5 | 
  6 | import pytest
  7 | from watchfiles import Change
  8 | 
  9 | 
 10 | async def create_test_file(path: Path, content: str = "test content") -> None:
 11 |     """Create a test file with given content."""
 12 |     path.parent.mkdir(parents=True, exist_ok=True)
 13 |     path.write_text(content)
 14 | 
 15 | 
 16 | @pytest.mark.asyncio
 17 | async def test_temp_file_filter(watch_service, app_config, project_config, test_project):
 18 |     """Test that .tmp files are correctly filtered out."""
 19 |     # Test filter_changes method directly
 20 |     tmp_path = Path(test_project.path) / "test.tmp"
 21 |     assert not watch_service.filter_changes(Change.added, str(tmp_path))
 22 | 
 23 |     # Test with valid file
 24 |     valid_path = Path(test_project.path) / "test.md"
 25 |     assert watch_service.filter_changes(Change.added, str(valid_path))
 26 | 
 27 | 
 28 | @pytest.mark.asyncio
 29 | async def test_handle_tmp_files(watch_service, project_config, test_project, sync_service):
 30 |     """Test handling of .tmp files during sync process."""
 31 |     project_dir = Path(test_project.path)
 32 | 
 33 |     # Create a .tmp file - this simulates a file being written with write_file_atomic
 34 |     tmp_file = project_dir / "test.tmp"
 35 |     await create_test_file(tmp_file, "This is a temporary file")
 36 | 
 37 |     # Create the target final file
 38 |     final_file = project_dir / "test.md"
 39 |     await create_test_file(final_file, "This is the final file")
 40 | 
 41 |     # Setup changes that include both the .tmp and final file
 42 |     changes = {
 43 |         (Change.added, str(tmp_file)),
 44 |         (Change.added, str(final_file)),
 45 |     }
 46 | 
 47 |     # Handle changes
 48 |     await watch_service.handle_changes(test_project, changes)
 49 | 
 50 |     # Verify only the final file got an entity
 51 |     tmp_entity = await sync_service.entity_repository.get_by_file_path("test.tmp")
 52 |     final_entity = await sync_service.entity_repository.get_by_file_path("test.md")
 53 | 
 54 |     assert tmp_entity is None, "Temp file should not have an entity"
 55 |     assert final_entity is not None, "Final file should have an entity"
 56 | 
 57 | 
 58 | @pytest.mark.asyncio
 59 | async def test_atomic_write_tmp_file_handling(
 60 |     watch_service, project_config, test_project, sync_service
 61 | ):
 62 |     """Test handling of file changes during atomic write operations."""
 63 |     project_dir = project_config.home
 64 | 
 65 |     # This test simulates the full atomic write process:
 66 |     # 1. First a .tmp file is created
 67 |     # 2. Then the .tmp file is renamed to the final file
 68 |     # 3. Both events are processed by the watch service
 69 | 
 70 |     # Setup file paths
 71 |     tmp_path = project_dir / "document.tmp"
 72 |     final_path = project_dir / "document.md"
 73 | 
 74 |     # Create mockup of the atomic write process
 75 |     await create_test_file(tmp_path, "Content for document")
 76 | 
 77 |     # First batch of changes - .tmp file created
 78 |     changes1 = {(Change.added, str(tmp_path))}
 79 | 
 80 |     # Process first batch
 81 |     await watch_service.handle_changes(test_project, changes1)
 82 | 
 83 |     # Now "replace" the temp file with the final file
 84 |     tmp_path.rename(final_path)
 85 | 
 86 |     # Second batch of changes - .tmp file deleted, final file added
 87 |     changes2 = {(Change.deleted, str(tmp_path)), (Change.added, str(final_path))}
 88 | 
 89 |     # Process second batch
 90 |     await watch_service.handle_changes(test_project, changes2)
 91 | 
 92 |     # Verify only the final file is in the database
 93 |     tmp_entity = await sync_service.entity_repository.get_by_file_path("document.tmp")
 94 |     final_entity = await sync_service.entity_repository.get_by_file_path("document.md")
 95 | 
 96 |     assert tmp_entity is None, "Temp file should not have an entity"
 97 |     assert final_entity is not None, "Final file should have an entity"
 98 | 
 99 |     # Check events
100 |     new_events = [e for e in watch_service.state.recent_events if e.action == "new"]
101 |     assert len(new_events) == 1
102 |     assert new_events[0].path == "document.md"
103 | 
104 | 
@pytest.mark.asyncio
async def test_rapid_atomic_writes(watch_service, project_config, test_project, sync_service):
    """Test handling of rapid atomic writes to the same destination."""
    project_dir = Path(test_project.path)

    # This test simulates multiple rapid atomic writes to the same file:
    # 1. Several .tmp files are created one after another
    # 2. Each is then renamed to the same final file
    # 3. Events are batched and processed together

    # Setup file paths
    tmp1_path = project_dir / "document.1.tmp"
    tmp2_path = project_dir / "document.2.tmp"
    final_path = project_dir / "document.md"

    # Create multiple temp files that will be used in sequence
    await create_test_file(tmp1_path, "First version")
    await create_test_file(tmp2_path, "Second version")

    # Simulate the first atomic write (Path.replace overwrites the destination
    # via a single rename)
    tmp1_path.replace(final_path)

    # Brief pause to ensure file system registers the change
    await asyncio.sleep(0.1)

    # Read content to verify
    content1 = final_path.read_text(encoding="utf-8")
    assert content1 == "First version"

    # Simulate the second atomic write
    tmp2_path.replace(final_path)

    # Verify content was updated
    content2 = final_path.read_text(encoding="utf-8")
    assert content2 == "Second version"

    # Create a batch of changes that might arrive in mixed order
    # (changes are a set, so the handler cannot rely on any ordering)
    changes = {
        (Change.added, str(tmp1_path)),
        (Change.deleted, str(tmp1_path)),
        (Change.added, str(tmp2_path)),
        (Change.deleted, str(tmp2_path)),
        (Change.added, str(final_path)),
        (Change.modified, str(final_path)),
    }

    # Process all changes
    await watch_service.handle_changes(test_project, changes)

    # Verify only the final file is in the database
    final_entity = await sync_service.entity_repository.get_by_file_path("document.md")
    assert final_entity is not None

    # Also verify no tmp entities were created
    tmp1_entity = await sync_service.entity_repository.get_by_file_path("document.1.tmp")
    tmp2_entity = await sync_service.entity_repository.get_by_file_path("document.2.tmp")
    assert tmp1_entity is None
    assert tmp2_entity is None
163 | 
```

--------------------------------------------------------------------------------
/src/basic_memory/mcp/tools/list_directory.py:
--------------------------------------------------------------------------------

```python
  1 | """List directory tool for Basic Memory MCP server."""
  2 | 
  3 | from typing import Optional
  4 | 
  5 | from loguru import logger
  6 | from fastmcp import Context
  7 | 
  8 | from basic_memory.mcp.async_client import get_client
  9 | from basic_memory.mcp.project_context import get_active_project
 10 | from basic_memory.mcp.server import mcp
 11 | from basic_memory.mcp.tools.utils import call_get
 12 | 
 13 | 
 14 | @mcp.tool(
 15 |     description="List directory contents with filtering and depth control.",
 16 | )
 17 | async def list_directory(
 18 |     dir_name: str = "/",
 19 |     depth: int = 1,
 20 |     file_name_glob: Optional[str] = None,
 21 |     project: Optional[str] = None,
 22 |     context: Context | None = None,
 23 | ) -> str:
 24 |     """List directory contents from the knowledge base with optional filtering.
 25 | 
 26 |     This tool provides 'ls' functionality for browsing the knowledge base directory structure.
 27 |     It can list immediate children or recursively explore subdirectories with depth control,
 28 |     and supports glob pattern filtering for finding specific files.
 29 | 
 30 |     Args:
 31 |         dir_name: Directory path to list (default: root "/")
 32 |                  Examples: "/", "/projects", "/research/ml"
 33 |         depth: Recursion depth (1-10, default: 1 for immediate children only)
 34 |                Higher values show subdirectory contents recursively
 35 |         file_name_glob: Optional glob pattern for filtering file names
 36 |                        Examples: "*.md", "*meeting*", "project_*"
 37 |         project: Project name to list directory from. Optional - server will resolve using hierarchy.
 38 |                 If unknown, use list_memory_projects() to discover available projects.
 39 |         context: Optional FastMCP context for performance caching.
 40 | 
 41 |     Returns:
 42 |         Formatted listing of directory contents with file metadata
 43 | 
 44 |     Examples:
 45 |         # List root directory contents
 46 |         list_directory()
 47 | 
 48 |         # List specific folder
 49 |         list_directory(dir_name="/projects")
 50 | 
 51 |         # Find all markdown files
 52 |         list_directory(file_name_glob="*.md")
 53 | 
 54 |         # Deep exploration of research folder
 55 |         list_directory(dir_name="/research", depth=3)
 56 | 
 57 |         # Find meeting notes in projects folder
 58 |         list_directory(dir_name="/projects", file_name_glob="*meeting*")
 59 | 
 60 |         # Explicit project specification
 61 |         list_directory(project="work-docs", dir_name="/projects")
 62 | 
 63 |     Raises:
 64 |         ToolError: If project doesn't exist or directory path is invalid
 65 |     """
 66 |     async with get_client() as client:
 67 |         active_project = await get_active_project(client, project, context)
 68 |         project_url = active_project.project_url
 69 | 
 70 |         # Prepare query parameters
 71 |         params = {
 72 |             "dir_name": dir_name,
 73 |             "depth": str(depth),
 74 |         }
 75 |         if file_name_glob:
 76 |             params["file_name_glob"] = file_name_glob
 77 | 
 78 |         logger.debug(
 79 |             f"Listing directory '{dir_name}' in project {project} with depth={depth}, glob='{file_name_glob}'"
 80 |         )
 81 | 
 82 |         # Call the API endpoint
 83 |         response = await call_get(
 84 |             client,
 85 |             f"{project_url}/directory/list",
 86 |             params=params,
 87 |         )
 88 | 
 89 |         nodes = response.json()
 90 | 
 91 |         if not nodes:
 92 |             filter_desc = ""
 93 |             if file_name_glob:
 94 |                 filter_desc = f" matching '{file_name_glob}'"
 95 |             return f"No files found in directory '{dir_name}'{filter_desc}"
 96 | 
 97 |         # Format the results
 98 |         output_lines = []
 99 |         if file_name_glob:
100 |             output_lines.append(
101 |                 f"Files in '{dir_name}' matching '{file_name_glob}' (depth {depth}):"
102 |             )
103 |         else:
104 |             output_lines.append(f"Contents of '{dir_name}' (depth {depth}):")
105 |         output_lines.append("")
106 | 
107 |         # Group by type and sort
108 |         directories = [n for n in nodes if n["type"] == "directory"]
109 |         files = [n for n in nodes if n["type"] == "file"]
110 | 
111 |         # Sort by name
112 |         directories.sort(key=lambda x: x["name"])
113 |         files.sort(key=lambda x: x["name"])
114 | 
115 |         # Display directories first
116 |         for node in directories:
117 |             path_display = node["directory_path"]
118 |             output_lines.append(f"📁 {node['name']:<30} {path_display}")
119 | 
120 |         # Add separator if we have both directories and files
121 |         if directories and files:
122 |             output_lines.append("")
123 | 
124 |         # Display files with metadata
125 |         for node in files:
126 |             path_display = node["directory_path"]
127 |             title = node.get("title", "")
128 |             updated = node.get("updated_at", "")
129 | 
130 |             # Remove leading slash if present, requesting the file via read_note does not use the beginning slash'
131 |             if path_display.startswith("/"):
132 |                 path_display = path_display[1:]
133 | 
134 |             # Format date if available
135 |             date_str = ""
136 |             if updated:
137 |                 try:
138 |                     from datetime import datetime
139 | 
140 |                     dt = datetime.fromisoformat(updated.replace("Z", "+00:00"))
141 |                     date_str = dt.strftime("%Y-%m-%d")
142 |                 except Exception:  # pragma: no cover
143 |                     date_str = updated[:10] if len(updated) >= 10 else ""
144 | 
145 |             # Create formatted line
146 |             file_line = f"📄 {node['name']:<30} {path_display}"
147 |             if title and title != node["name"]:
148 |                 file_line += f" | {title}"
149 |             if date_str:
150 |                 file_line += f" | {date_str}"
151 | 
152 |             output_lines.append(file_line)
153 | 
154 |         # Add summary
155 |         output_lines.append("")
156 |         total_count = len(directories) + len(files)
157 |         summary_parts = []
158 |         if directories:
159 |             summary_parts.append(
160 |                 f"{len(directories)} director{'y' if len(directories) == 1 else 'ies'}"
161 |             )
162 |         if files:
163 |             summary_parts.append(f"{len(files)} file{'s' if len(files) != 1 else ''}")
164 | 
165 |         output_lines.append(f"Total: {total_count} items ({', '.join(summary_parts)})")
166 | 
167 |         return "\n".join(output_lines)
168 | 
```

--------------------------------------------------------------------------------
/.claude/agents/system-architect.md:
--------------------------------------------------------------------------------

```markdown
  1 | ---
  2 | name: system-architect
  3 | description: System architect who designs and implements architectural solutions, creates ADRs, and applies software engineering principles to solve complex system design problems.
  4 | model: sonnet
  5 | color: blue
  6 | ---
  7 | 
  8 | You are a Senior System Architect who designs and implements architectural solutions for complex software systems. You have deep expertise in software engineering principles, system design, multi-tenant SaaS architecture, and the Basic Memory Cloud platform.
  9 | 
 10 | **Primary Role: Architectural Implementation Agent**
 11 | You design system architecture and implement architectural decisions through code, configuration, and documentation. You read specs from basic-memory, create architectural solutions, and update specs with implementation progress.
 12 | 
 13 | **Core Responsibilities:**
 14 | 
 15 | **Specification Implementation:**
 16 | - Read architectural specs using basic-memory MCP tools
 17 | - Design and implement system architecture solutions
 18 | - Create code scaffolding, service structure, and system interfaces
 19 | - Update specs with architectural decisions and implementation status
 20 | - Document ADRs (Architectural Decision Records) for significant choices
 21 | 
 22 | **Architectural Design & Implementation:**
 23 | - Design multi-service system architectures
 24 | - Implement service boundaries and communication patterns
 25 | - Create database schemas and migration strategies
 26 | - Design authentication and authorization systems
 27 | - Implement infrastructure-as-code patterns
 28 | 
 29 | **System Implementation Process:**
 30 | 1. **Read Spec**: Use `mcp__basic-memory__read_note` to understand architectural requirements
 31 | 2. **Design Solution**: Apply architectural principles and patterns
 32 | 3. **Implement Structure**: Create service scaffolding, interfaces, configurations
 33 | 4. **Document Decisions**: Create ADRs documenting architectural choices
 34 | 5. **Update Spec**: Record implementation progress and decisions
 35 | 6. **Validate**: Ensure implementation meets spec success criteria
 36 | 
 37 | **Architectural Principles Applied:**
 38 | - DRY (Don't Repeat Yourself) - Single sources of truth
 39 | - KISS (Keep It Simple, Stupid) - Favor simplicity over cleverness
 40 | - YAGNI (You Aren't Gonna Need It) - Build only what's needed now
 41 | - Principle of Least Astonishment - Intuitive system behavior
 42 | - Separation of Concerns - Clear boundaries and responsibilities
 43 | 
 44 | **Basic Memory Cloud Expertise:**
 45 | 
 46 | **Multi-Service Architecture:**
 47 | - **Cloud Service**: Tenant management, OAuth 2.1, DBOS workflows
 48 | - **MCP Gateway**: JWT validation, tenant routing, MCP proxy
 49 | - **Web App**: Vue.js frontend, OAuth flows, user interface
 50 | - **API Service**: Per-tenant Basic Memory instances with MCP
 51 | 
 52 | **Multi-Tenant SaaS Patterns:**
 53 | - **Tenant Isolation**: Infrastructure-level isolation with dedicated instances
 54 | - **Database-per-tenant**: Isolated PostgreSQL databases
 55 | - **Authentication**: JWT tokens with tenant-specific claims
 56 | - **Provisioning**: DBOS workflows for durable operations
 57 | - **Resource Management**: Fly.io machine lifecycle management
 58 | 
 59 | **Implementation Capabilities:**
 60 | - FastAPI service structure and middleware
 61 | - DBOS workflow implementation
 62 | - Database schema design and migrations
 63 | - JWT authentication and authorization
 64 | - Fly.io deployment configuration
 65 | - Service communication patterns
 66 | 
 67 | **Technical Implementation:**
 68 | - Create service scaffolding and project structure
 69 | - Implement authentication and authorization middleware
 70 | - Design database schemas and relationships
 71 | - Configure deployment and infrastructure
 72 | - Implement monitoring and health checks
 73 | - Create API interfaces and contracts
 74 | 
 75 | **Code Quality Standards:**
 76 | - Follow established patterns and conventions
 77 | - Implement proper error handling and logging
 78 | - Design for scalability and maintainability
 79 | - Apply security best practices
 80 | - Create comprehensive tests for architectural components
 81 | - Document system behavior and interfaces
 82 | 
 83 | **Decision Documentation:**
 84 | - Create ADRs for significant architectural choices
 85 | - Document trade-offs and alternative approaches considered
 86 | - Maintain decision history and rationale
 87 | - Link architectural decisions to implementation code
 88 | - Update decisions when new information becomes available
 89 | 
 90 | **Basic Memory Integration:**
 91 | - Use `mcp__basic-memory__read_note` to read architectural specs
 92 | - Use `mcp__basic-memory__write_note` to create ADRs and architectural documentation
 93 | - Use `mcp__basic-memory__edit_note` to update specs with implementation progress
 94 | - Document architectural patterns and anti-patterns for reuse
 95 | - Maintain searchable knowledge base of system design decisions
 96 | 
 97 | **Communication Style:**
 98 | - Focus on implemented solutions and concrete architectural artifacts
 99 | - Document decisions with clear rationale and trade-offs
100 | - Provide specific implementation guidance and code examples
101 | - Ask targeted questions about requirements and constraints
102 | - Explain architectural choices in terms of business and technical impact
103 | 
104 | **Deliverables:**
105 | - Working system architecture implementations
106 | - ADRs documenting architectural decisions
107 | - Service scaffolding and interface definitions
108 | - Database schemas and migration scripts
109 | - Configuration and deployment artifacts
110 | - Updated specifications with implementation status
111 | 
112 | **Anti-Patterns to Avoid:**
113 | - Premature optimization over correctness
114 | - Over-engineering for current needs
115 | - Building without clear requirements
116 | - Creating multiple sources of truth
117 | - Implementing solutions without understanding root causes
118 | 
119 | **Key Principles:**
120 | - Implement architectural decisions through working code
121 | - Document all significant decisions and trade-offs
122 | - Build systems that teams can understand and maintain
123 | - Apply proven patterns and avoid reinventing solutions
124 | - Balance current needs with long-term maintainability
125 | 
126 | When handed an architectural specification via `/spec implement`, you will read the spec, design the solution applying architectural principles, implement the necessary code and configuration, document decisions through ADRs, and update the spec with completion status and architectural notes.
```

--------------------------------------------------------------------------------
/tests/utils/test_file_utils.py:
--------------------------------------------------------------------------------

```python
  1 | """Tests for file utilities."""
  2 | 
  3 | from pathlib import Path
  4 | 
  5 | import pytest
  6 | import random
  7 | import string
  8 | 
  9 | from basic_memory.file_utils import (
 10 |     FileError,
 11 |     FileWriteError,
 12 |     ParseError,
 13 |     compute_checksum,
 14 |     has_frontmatter,
 15 |     parse_frontmatter,
 16 |     remove_frontmatter,
 17 |     sanitize_for_filename,
 18 |     sanitize_for_folder,
 19 |     write_file_atomic,
 20 | )
 21 | 
 22 | 
 23 | def get_random_word(length: int = 12, necessary_char: str | None = None) -> str:  # test helper: random lowercase word, optionally forced to contain necessary_char
 24 |     letters = string.ascii_lowercase
 25 |     word_chars = [random.choice(letters) for i in range(length)]  # start from purely random letters
 26 | 
 27 |     if necessary_char and length > 0:
 28 |         # Replace a character at a random position with the necessary character
 29 |         random_pos = random.randint(0, length - 1)
 30 |         word_chars[random_pos] = necessary_char
 31 | 
 32 |     return "".join(word_chars)
 33 | 
 34 | 
 35 | @pytest.mark.asyncio
 36 | async def test_compute_checksum():
 37 |     """Checksum of a string is a 64-character SHA-256 hex digest."""
 38 |     content = "test content"
 39 |     checksum = await compute_checksum(content)
 40 |     assert isinstance(checksum, str)
 41 |     assert len(checksum) == 64  # SHA-256 produces 64 char hex string
 42 | 
 43 | 
 44 | @pytest.mark.asyncio
 45 | async def test_compute_checksum_error():
 46 |     """Unencodable input surfaces as FileError, not a raw TypeError."""
 47 |     with pytest.raises(FileError):
 48 |         # Try to hash an object that can't be encoded
 49 |         await compute_checksum(object())  # pyright: ignore [reportArgumentType]
 50 | 
 51 | 
 52 | @pytest.mark.asyncio
 53 | async def test_write_file_atomic(tmp_path: Path):
 54 |     """write_file_atomic writes the content and leaves no .tmp file behind."""
 55 |     test_file = tmp_path / "test.txt"
 56 |     content = "test content"
 57 | 
 58 |     await write_file_atomic(test_file, content)
 59 |     assert test_file.exists()
 60 |     assert test_file.read_text(encoding="utf-8") == content
 61 | 
 62 |     # Temp file should be cleaned up
 63 |     assert not test_file.with_suffix(".tmp").exists()
 64 | 
 65 | 
 66 | @pytest.mark.asyncio
 67 | async def test_write_file_atomic_error(tmp_path: Path):
 68 |     """Writing into a nonexistent directory raises FileWriteError."""
 69 |     # Try to write to a directory that doesn't exist
 70 |     test_file = tmp_path / "nonexistent" / "test.txt"
 71 | 
 72 |     with pytest.raises(FileWriteError):
 73 |         await write_file_atomic(test_file, "test content")
 74 | 
 75 | 
 76 | def test_has_frontmatter():
 77 |     """has_frontmatter accepts only a '---' YAML block starting on the first line."""
 78 |     # Valid frontmatter
 79 |     assert has_frontmatter("""---
 80 | title: Test
 81 | ---
 82 | content""")
 83 | 
 84 |     # Just content
 85 |     assert not has_frontmatter("Just content")
 86 | 
 87 |     # Empty content
 88 |     assert not has_frontmatter("")
 89 | 
 90 |     # Just delimiter
 91 |     assert not has_frontmatter("---")
 92 | 
 93 |     # Delimiter not at start
 94 |     assert not has_frontmatter("""
 95 | Some text
 96 | ---
 97 | title: Test
 98 | ---""")
 99 | 
100 |     # Invalid format
101 |     assert not has_frontmatter("--title: test--")
102 | 
103 | 
104 | def test_parse_frontmatter():
105 |     """Test parsing frontmatter."""
106 |     # Valid frontmatter
107 |     content = """---
108 | title: Test
109 | tags:
110 |   - a
111 |   - b
112 | ---
113 | content"""
114 | 
115 |     result = parse_frontmatter(content)
116 |     assert result == {"title": "Test", "tags": ["a", "b"]}
117 | 
118 |     # Empty frontmatter
119 |     content = """---
120 | ---
121 | content"""
122 |     result = parse_frontmatter(content)
123 |     assert result == {} or result == {}  # Handle both None and empty dict cases
124 | 
125 |     # Invalid YAML syntax
126 |     with pytest.raises(ParseError) as exc:
127 |         parse_frontmatter("""---
128 | [: invalid yaml syntax :]
129 | ---
130 | content""")
131 |     assert "Invalid YAML in frontmatter" in str(exc.value)
132 | 
133 |     # Non-dict YAML content
134 |     with pytest.raises(ParseError) as exc:
135 |         parse_frontmatter("""---
136 | - just
137 | - a
138 | - list
139 | ---
140 | content""")
141 |     assert "Frontmatter must be a YAML dictionary" in str(exc.value)
142 | 
143 |     # No frontmatter
144 |     with pytest.raises(ParseError):
145 |         parse_frontmatter("Just content")
146 | 
147 |     # Incomplete frontmatter
148 |     with pytest.raises(ParseError):
149 |         parse_frontmatter("""---
150 | title: Test""")
151 | 
152 | 
153 | def test_remove_frontmatter():
154 |     """remove_frontmatter strips any leading YAML block and returns the body only."""
155 |     # With frontmatter
156 |     content = """---
157 | title: Test
158 | ---
159 | test content"""
160 |     assert remove_frontmatter(content) == "test content"
161 | 
162 |     # No frontmatter
163 |     content = "test content"
164 |     assert remove_frontmatter(content) == "test content"
165 | 
166 |     # Only frontmatter
167 |     content = """---
168 | title: Test
169 | ---
170 | """
171 |     assert remove_frontmatter(content) == ""
172 | 
173 |     # Invalid frontmatter - missing closing delimiter
174 |     with pytest.raises(ParseError) as exc:
175 |         remove_frontmatter("""---
176 | title: Test""")
177 |     assert "Invalid frontmatter format" in str(exc.value)
178 | 
179 | 
180 | @pytest.mark.asyncio
181 | def test_sanitize_for_filename_removes_invalid_characters():
182 |     # Test all invalid characters listed in the regex
183 |     invalid_chars = '<>:"|?*'
184 | 
185 |     # All invalid characters should be replaced
186 |     for char in invalid_chars:
187 |         text = get_random_word(length=12, necessary_char=char)
188 |         sanitized_text = sanitize_for_filename(text)
189 | 
190 |         assert char not in sanitized_text
191 | 
192 | 
193 | @pytest.mark.parametrize(
194 |     "input_folder,expected",
195 |     [
196 |         ("", ""),  # Empty string
197 |         ("   ", ""),  # Whitespace only
198 |         ("my-folder", "my-folder"),  # Simple folder
199 |         ("my/folder", "my/folder"),  # Nested folder
200 |         ("my//folder", "my/folder"),  # Double slash compressed
201 |         ("my\\\\folder", "my/folder"),  # Windows-style double backslash compressed
202 |         ("my/folder/", "my/folder"),  # Trailing slash removed
203 |         ("/my/folder", "my/folder"),  # Leading slash removed
204 |         ("./my/folder", "my/folder"),  # Leading ./ removed
205 |         ("my<>folder", "myfolder"),  # Special chars removed
206 |         ("my:folder|test", "myfoldertest"),  # More special chars removed
207 |         ("my_folder-1", "my_folder-1"),  # Allowed chars preserved
208 |         ("my folder", "my folder"),  # Space preserved
209 |         ("my/folder//sub//", "my/folder/sub"),  # Multiple compressions and trims
210 |         ("my\\folder\\sub", "my/folder/sub"),  # Windows-style separators normalized
211 |         ("my/folder<>:|?*sub", "my/foldersub"),  # All invalid chars removed
212 |         ("////my////folder////", "my/folder"),  # Excessive leading/trailing/multiple slashes
213 |     ],
214 | )
215 | def test_sanitize_for_folder_edge_cases(input_folder, expected):
216 |     assert sanitize_for_folder(input_folder) == expected  # the rule each case exercises is noted in the table above
217 | 
```

--------------------------------------------------------------------------------
/tests/repository/test_repository.py:
--------------------------------------------------------------------------------

```python
  1 | """Test repository implementation."""
  2 | 
  3 | from datetime import datetime
  4 | import pytest
  5 | from sqlalchemy import String, DateTime
  6 | from sqlalchemy.orm import Mapped, mapped_column
  7 | 
  8 | from basic_memory.models import Base
  9 | from basic_memory.repository.repository import Repository
 10 | 
 11 | 
 12 | class ModelTest(Base):
 13 |     """Minimal model for exercising Repository (presumably named ModelTest, not TestModel, so pytest won't collect it)."""
 14 | 
 15 |     __tablename__ = "test_model"
 16 | 
 17 |     id: Mapped[str] = mapped_column(String(255), primary_key=True)
 18 |     name: Mapped[str] = mapped_column(String(255))
 19 |     description: Mapped[str | None] = mapped_column(String(255), nullable=True)
 20 |     created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)  # NOTE(review): datetime.utcnow is deprecated in 3.12+; consider datetime.now(timezone.utc)
 21 |     updated_at: Mapped[datetime] = mapped_column(
 22 |         DateTime, default=datetime.utcnow, onupdate=datetime.utcnow
 23 |     )
 24 | 
 25 | 
 26 | @pytest.fixture
 27 | def repository(session_maker):
 28 |     """Repository bound to the ModelTest table (session_maker comes from conftest)."""
 29 |     return Repository(session_maker, ModelTest)
 30 | 
 31 | 
 32 | @pytest.mark.asyncio
 33 | async def test_add(repository):
 34 |     """Test bulk creation of entities."""
 35 |     # Create test instances
 36 |     instance = ModelTest(id="test_add", name="Test Add")
 37 |     await repository.add(instance)
 38 | 
 39 |     # Verify we can find in db
 40 |     found = await repository.find_by_id("test_add")
 41 |     assert found is not None
 42 |     assert found.name == "Test Add"
 43 | 
 44 | 
 45 | @pytest.mark.asyncio
 46 | async def test_add_all(repository):
 47 |     """Test add_all persists several entities in one call."""
 48 |     # Create test instances
 49 |     instances = [ModelTest(id=f"test_{i}", name=f"Test {i}") for i in range(3)]
 50 |     await repository.add_all(instances)
 51 | 
 52 |     # Verify we can find them in db
 53 |     found = await repository.find_by_id("test_0")
 54 |     assert found is not None
 55 |     assert found.name == "Test 0"
 56 | 
 57 | 
 58 | @pytest.mark.asyncio
 59 | async def test_bulk_create(repository):
 60 |     """Test bulk creation of entities from plain dicts of column values."""
 61 |     # Create test instances
 62 |     instances = [ModelTest(id=f"test_{i}", name=f"Test {i}") for i in range(3)]
 63 | 
 64 |     # Bulk create
 65 |     await repository.create_all([instance.__dict__ for instance in instances])  # NOTE(review): __dict__ also carries SQLAlchemy's _sa_instance_state — assumes create_all ignores non-column keys
 66 | 
 67 |     # Verify we can find them in db
 68 |     found = await repository.find_by_id("test_0")
 69 |     assert found is not None
 70 |     assert found.name == "Test 0"
 71 | 
 72 | 
 73 | @pytest.mark.asyncio
 74 | async def test_find_all(repository):
 75 |     """Test finding multiple entities by IDs."""
 76 |     # Create test data
 77 |     instances = [ModelTest(id=f"test_{i}", name=f"Test {i}") for i in range(5)]
 78 |     await repository.create_all([instance.__dict__ for instance in instances])
 79 | 
 80 |     found = await repository.find_all(limit=3)
 81 |     assert len(found) == 3
 82 | 
 83 | 
 84 | @pytest.mark.asyncio
 85 | async def test_find_by_ids(repository):
 86 |     """find_by_ids returns only entities that exist; missing ids are silently skipped."""
 87 |     # Create test data
 88 |     instances = [ModelTest(id=f"test_{i}", name=f"Test {i}") for i in range(5)]
 89 |     await repository.create_all([instance.__dict__ for instance in instances])
 90 | 
 91 |     # Test finding subset of entities
 92 |     ids_to_find = ["test_0", "test_2", "test_4"]
 93 |     found = await repository.find_by_ids(ids_to_find)
 94 |     assert len(found) == 3
 95 |     assert sorted([e.id for e in found]) == sorted(ids_to_find)
 96 | 
 97 |     # Test finding with some non-existent IDs
 98 |     mixed_ids = ["test_0", "nonexistent", "test_4"]
 99 |     partial_found = await repository.find_by_ids(mixed_ids)
100 |     assert len(partial_found) == 2
101 |     assert sorted([e.id for e in partial_found]) == ["test_0", "test_4"]
102 | 
103 |     # Test with empty list
104 |     empty_found = await repository.find_by_ids([])
105 |     assert len(empty_found) == 0
106 | 
107 |     # Test with all non-existent IDs
108 |     not_found = await repository.find_by_ids(["fake1", "fake2"])
109 |     assert len(not_found) == 0
110 | 
111 | 
112 | @pytest.mark.asyncio
113 | async def test_delete_by_ids(repository):
114 |     """Test finding multiple entities by IDs."""
115 |     # Create test data
116 |     instances = [ModelTest(id=f"test_{i}", name=f"Test {i}") for i in range(5)]
117 |     await repository.create_all([instance.__dict__ for instance in instances])
118 | 
119 |     # Test delete subset of entities
120 |     ids_to_delete = ["test_0", "test_2", "test_4"]
121 |     deleted_count = await repository.delete_by_ids(ids_to_delete)
122 |     assert deleted_count == 3
123 | 
124 |     # Test finding subset of entities
125 |     ids_to_find = ["test_1", "test_3"]
126 |     found = await repository.find_by_ids(ids_to_find)
127 |     assert len(found) == 2
128 |     assert sorted([e.id for e in found]) == sorted(ids_to_find)
129 | 
130 |     assert await repository.find_by_id(ids_to_delete[0]) is None
131 |     assert await repository.find_by_id(ids_to_delete[1]) is None
132 |     assert await repository.find_by_id(ids_to_delete[2]) is None
133 | 
134 | 
135 | @pytest.mark.asyncio
136 | async def test_update(repository):
137 |     """Test finding entities modified since a timestamp."""
138 |     # Create initial test data
139 |     instance = ModelTest(id="test_add", name="Test Add")
140 |     await repository.add(instance)
141 | 
142 |     instance = ModelTest(id="test_add", name="Updated")
143 | 
144 |     # Find recently modified
145 |     modified = await repository.update(instance.id, {"name": "Updated"})
146 |     assert modified is not None
147 |     assert modified.name == "Updated"
148 | 
149 | 
150 | @pytest.mark.asyncio
151 | async def test_update_model(repository):
152 |     """Test finding entities modified since a timestamp."""
153 |     # Create initial test data
154 |     instance = ModelTest(id="test_add", name="Test Add")
155 |     await repository.add(instance)
156 | 
157 |     instance.name = "Updated"
158 | 
159 |     # Find recently modified
160 |     modified = await repository.update(instance.id, instance)
161 |     assert modified is not None
162 |     assert modified.name == "Updated"
163 | 
164 | 
165 | @pytest.mark.asyncio
166 | async def test_update_model_not_found(repository):
167 |     """Test finding entities modified since a timestamp."""
168 |     # Create initial test data
169 |     instance = ModelTest(id="test_add", name="Test Add")
170 |     await repository.add(instance)
171 | 
172 |     modified = await repository.update(0, {})
173 |     assert modified is None
174 | 
175 | 
176 | @pytest.mark.asyncio
177 | async def test_count(repository):
178 |     """Test bulk creation of entities."""
179 |     # Create test instances
180 |     instance = ModelTest(id="test_add", name="Test Add")
181 |     await repository.add(instance)
182 | 
183 |     # Verify we can count in db
184 |     count = await repository.count()
185 |     assert count == 1
186 | 
```

--------------------------------------------------------------------------------
/test-int/mcp/test_build_context_underscore.py:
--------------------------------------------------------------------------------

```python
  1 | """Integration test for build_context with underscore in memory:// URLs."""
  2 | 
  3 | import pytest
  4 | from fastmcp import Client
  5 | 
  6 | 
  7 | @pytest.mark.asyncio
  8 | async def test_build_context_underscore_normalization(mcp_server, app, test_project):
  9 |     """build_context treats '_' and '-' in relation types as equivalent in memory:// URLs."""
 10 | 
 11 |     async with Client(mcp_server) as client:
 12 |         # Create parent note
 13 |         await client.call_tool(
 14 |             "write_note",
 15 |             {
 16 |                 "project": test_project.name,
 17 |                 "title": "Parent Entity",
 18 |                 "folder": "testing",
 19 |                 "content": "# Parent Entity\n\nMain entity for testing underscore relations.",
 20 |                 "tags": "test,parent",
 21 |             },
 22 |         )
 23 | 
 24 |         # Create child notes with different relation formats
 25 |         await client.call_tool(
 26 |             "write_note",
 27 |             {
 28 |                 "project": test_project.name,
 29 |                 "title": "Child with Underscore",
 30 |                 "folder": "testing",
 31 |                 "content": """# Child with Underscore
 32 | 
 33 | - part_of [[Parent Entity]]
 34 | - related_to [[Parent Entity]]
 35 |                 """,
 36 |                 "tags": "test,child",
 37 |             },
 38 |         )
 39 | 
 40 |         await client.call_tool(
 41 |             "write_note",
 42 |             {
 43 |                 "project": test_project.name,
 44 |                 "title": "Child with Hyphen",
 45 |                 "folder": "testing",
 46 |                 "content": """# Child with Hyphen
 47 | 
 48 | - part-of [[Parent Entity]]
 49 | - related-to [[Parent Entity]]
 50 |                 """,
 51 |                 "tags": "test,child",
 52 |             },
 53 |         )
 54 | 
 55 |         # Test 1: Search with underscore format should return results
 56 |         # Relation permalinks are: source/relation_type/target
 57 |         # So child-with-underscore/part-of/parent-entity
 58 |         result_underscore = await client.call_tool(
 59 |             "build_context",
 60 |             {
 61 |                 "project": test_project.name,
 62 |                 "url": "memory://testing/*/part_of/*parent*",  # Using underscore
 63 |             },
 64 |         )
 65 | 
 66 |         # Parse response
 67 |         assert len(result_underscore.content) == 1
 68 |         response_text = result_underscore.content[0].text  # pyright: ignore
 69 |         assert '"results"' in response_text
 70 | 
 71 |         # Both relations should be found since they both connect to parent-entity
 72 |         # The system should normalize the underscore to hyphen internally
 73 |         assert "part-of" in response_text.lower()
 74 | 
 75 |         # Test 2: Search with hyphen format should also return results
 76 |         result_hyphen = await client.call_tool(
 77 |             "build_context",
 78 |             {
 79 |                 "project": test_project.name,
 80 |                 "url": "memory://testing/*/part-of/*parent*",  # Using hyphen
 81 |             },
 82 |         )
 83 | 
 84 |         response_text_hyphen = result_hyphen.content[0].text  # pyright: ignore
 85 |         assert '"results"' in response_text_hyphen
 86 |         assert "part-of" in response_text_hyphen.lower()
 87 | 
 88 |         # Test 3: Test with related_to/related-to as well
 89 |         result_related = await client.call_tool(
 90 |             "build_context",
 91 |             {
 92 |                 "project": test_project.name,
 93 |                 "url": "memory://testing/*/related_to/*parent*",  # Using underscore
 94 |             },
 95 |         )
 96 | 
 97 |         response_text_related = result_related.content[0].text  # pyright: ignore
 98 |         assert '"results"' in response_text_related
 99 |         assert "related-to" in response_text_related.lower()
100 | 
101 |         # Test 4: Test exact path (non-wildcard) with underscore
102 |         # Exact relation permalink would be child/relation/target
103 |         result_exact = await client.call_tool(
104 |             "build_context",
105 |             {
106 |                 "project": test_project.name,
107 |                 "url": "memory://testing/child-with-underscore/part_of/testing/parent-entity",
108 |             },
109 |         )
110 | 
111 |         response_text_exact = result_exact.content[0].text  # pyright: ignore
112 |         assert '"results"' in response_text_exact
113 |         assert "part-of" in response_text_exact.lower()
114 | 
115 | 
116 | @pytest.mark.asyncio
117 | async def test_build_context_complex_underscore_paths(mcp_server, app, test_project):
118 |     """Underscored titles and relation types all resolve to the hyphenated permalinks."""
119 | 
120 |     async with Client(mcp_server) as client:
121 |         # Create notes with underscores in titles and relations
122 |         await client.call_tool(
123 |             "write_note",
124 |             {
125 |                 "project": test_project.name,
126 |                 "title": "workflow_manager_agent",
127 |                 "folder": "specs",
128 |                 "content": """# Workflow Manager Agent
129 | 
130 | Specification for the workflow manager agent.
131 |                 """,
132 |                 "tags": "spec,workflow",
133 |             },
134 |         )
135 | 
136 |         await client.call_tool(
137 |             "write_note",
138 |             {
139 |                 "project": test_project.name,
140 |                 "title": "task_parser",
141 |                 "folder": "components",
142 |                 "content": """# Task Parser
143 | 
144 | - part_of [[workflow_manager_agent]]
145 | - implements_for [[workflow_manager_agent]]
146 |                 """,
147 |                 "tags": "component,parser",
148 |             },
149 |         )
150 | 
151 |         # Test with underscores in all parts of the path
152 |         # Relations are created as: task-parser/part-of/workflow-manager-agent
153 |         # So search for */part_of/* or */part-of/* to find them
154 |         test_cases = [
155 |             "memory://components/*/part_of/*workflow*",
156 |             "memory://components/*/part-of/*workflow*",
157 |             "memory://*/task*/part_of/*",
158 |             "memory://*/task*/part-of/*",
159 |         ]
160 | 
161 |         for url in test_cases:
162 |             result = await client.call_tool(
163 |                 "build_context", {"project": test_project.name, "url": url}
164 |             )
165 | 
166 |             # All variations should work and find the related content
167 |             assert len(result.content) == 1
168 |             response = result.content[0].text  # pyright: ignore
169 |             assert '"results"' in response
170 |             # The relation should be found showing part-of connection
171 |             assert "part-of" in response.lower(), f"Failed for URL: {url}"
```

--------------------------------------------------------------------------------
/tests/test_db_migration_deduplication.py:
--------------------------------------------------------------------------------

```python
  1 | """Tests for database migration deduplication functionality."""
  2 | 
  3 | import pytest
  4 | from unittest.mock import patch, AsyncMock, MagicMock
  5 | 
  6 | from basic_memory import db
  7 | 
  8 | 
  9 | @pytest.fixture
 10 | def mock_alembic_config():
 11 |     """Mock Alembic Config so no real migration configuration is loaded."""
 12 |     with patch("basic_memory.db.Config") as mock_config_class:
 13 |         mock_config = MagicMock()
 14 |         mock_config_class.return_value = mock_config
 15 |         yield mock_config
 16 | 
 17 | 
 18 | @pytest.fixture
 19 | def mock_alembic_command():
 20 |     """Mock the Alembic command module so no real migrations run."""
 21 |     with patch("basic_memory.db.command") as mock_command:
 22 |         yield mock_command
 23 | 
 24 | 
 25 | @pytest.fixture
 26 | def mock_search_repository():
 27 |     """Mock SearchRepository so search-index initialization needs no database."""
 28 |     with patch("basic_memory.db.SearchRepository") as mock_repo_class:
 29 |         mock_repo = AsyncMock()
 30 |         mock_repo_class.return_value = mock_repo
 31 |         yield mock_repo
 32 | 
 33 | 
 34 | # Use the app_config fixture from conftest.py
 35 | 
 36 | 
 37 | @pytest.mark.asyncio
 38 | async def test_migration_deduplication_single_call(
 39 |     app_config, mock_alembic_config, mock_alembic_command, mock_search_repository
 40 | ):
 41 |     """Migrations run once; a second run_migrations call is a no-op."""
 42 |     # Reset module state
 43 |     db._migrations_completed = False
 44 |     db._engine = None
 45 |     db._session_maker = None
 46 | 
 47 |     # First call should run migrations
 48 |     await db.run_migrations(app_config)
 49 | 
 50 |     # Verify migrations were called
 51 |     mock_alembic_command.upgrade.assert_called_once_with(mock_alembic_config, "head")
 52 |     mock_search_repository.init_search_index.assert_called_once()
 53 | 
 54 |     # Reset mocks for second call
 55 |     mock_alembic_command.reset_mock()
 56 |     mock_search_repository.reset_mock()
 57 | 
 58 |     # Second call should skip migrations
 59 |     await db.run_migrations(app_config)
 60 | 
 61 |     # Verify migrations were NOT called again
 62 |     mock_alembic_command.upgrade.assert_not_called()
 63 |     mock_search_repository.init_search_index.assert_not_called()
 64 | 
 65 | 
 66 | @pytest.mark.asyncio
 67 | async def test_migration_force_parameter(
 68 |     app_config, mock_alembic_config, mock_alembic_command, mock_search_repository
 69 | ):
 70 |     """force=True re-runs migrations even after they have already completed."""
 71 |     # Reset module state
 72 |     db._migrations_completed = False
 73 |     db._engine = None
 74 |     db._session_maker = None
 75 | 
 76 |     # First call should run migrations
 77 |     await db.run_migrations(app_config)
 78 | 
 79 |     # Verify migrations were called
 80 |     mock_alembic_command.upgrade.assert_called_once_with(mock_alembic_config, "head")
 81 |     mock_search_repository.init_search_index.assert_called_once()
 82 | 
 83 |     # Reset mocks for forced call
 84 |     mock_alembic_command.reset_mock()
 85 |     mock_search_repository.reset_mock()
 86 | 
 87 |     # Forced call should run migrations again
 88 |     await db.run_migrations(app_config, force=True)
 89 | 
 90 |     # Verify migrations were called again
 91 |     mock_alembic_command.upgrade.assert_called_once_with(mock_alembic_config, "head")
 92 |     mock_search_repository.init_search_index.assert_called_once()
 93 | 
 94 | 
 95 | @pytest.mark.asyncio
 96 | async def test_migration_state_reset_on_shutdown():
 97 |     """Test that migration state is reset when database is shut down."""
 98 |     # Set up completed state
 99 |     db._migrations_completed = True
100 |     db._engine = AsyncMock()
101 |     db._session_maker = AsyncMock()
102 | 
103 |     # Shutdown should reset state
104 |     await db.shutdown_db()
105 | 
106 |     # Verify state was reset
107 |     assert db._migrations_completed is False
108 |     assert db._engine is None
109 |     assert db._session_maker is None
110 | 
111 | 
@pytest.mark.asyncio
async def test_get_or_create_db_runs_migrations_automatically(
    app_config, mock_alembic_config, mock_alembic_command, mock_search_repository
):
    """get_or_create_db should trigger the migration pipeline on first use."""
    # Wipe module-level caches so this test starts cold.
    db._migrations_completed = False
    db._engine = None
    db._session_maker = None

    engine, session_maker = await db.get_or_create_db(app_config.database_path)

    # A usable engine/session pair must come back.
    assert engine is not None
    assert session_maker is not None

    # The cold start must have executed migrations and indexed search.
    mock_alembic_command.upgrade.assert_called_once_with(mock_alembic_config, "head")
    mock_search_repository.init_search_index.assert_called_once()
132 | 
133 | 
@pytest.mark.asyncio
async def test_get_or_create_db_skips_migrations_when_disabled(
    app_config, mock_alembic_config, mock_alembic_command, mock_search_repository
):
    """ensure_migrations=False must bypass the migration pipeline entirely."""
    # Reset module-level caches first.
    db._migrations_completed = False
    db._engine = None
    db._session_maker = None

    engine, session_maker = await db.get_or_create_db(
        app_config.database_path, ensure_migrations=False
    )

    # The engine and session maker are still created normally.
    assert engine is not None
    assert session_maker is not None

    # Neither alembic nor the search index should have been touched.
    mock_alembic_command.upgrade.assert_not_called()
    mock_search_repository.init_search_index.assert_not_called()
156 | 
157 | 
@pytest.mark.asyncio
async def test_multiple_get_or_create_db_calls_deduplicated(
    app_config, mock_alembic_config, mock_alembic_command, mock_search_repository
):
    """Repeated get_or_create_db calls must run migrations only once."""
    # Reset module-level caches first.
    db._migrations_completed = False
    db._engine = None
    db._session_maker = None

    # Cold call: migrations run.
    await db.get_or_create_db(app_config.database_path)
    mock_alembic_command.upgrade.assert_called_once_with(mock_alembic_config, "head")
    mock_search_repository.init_search_index.assert_called_once()

    # Forget the recorded calls, then hit the cached path twice.
    mock_alembic_command.reset_mock()
    mock_search_repository.reset_mock()
    await db.get_or_create_db(app_config.database_path)
    await db.get_or_create_db(app_config.database_path)

    # The warm calls must not re-run any migration work.
    mock_alembic_command.upgrade.assert_not_called()
    mock_search_repository.init_search_index.assert_not_called()
186 | 
```

--------------------------------------------------------------------------------
/tests/api/test_management_router.py:
--------------------------------------------------------------------------------

```python
  1 | """Tests for management router API endpoints."""
  2 | 
  3 | from unittest.mock import AsyncMock, MagicMock, patch
  4 | 
  5 | import pytest
  6 | from fastapi import FastAPI
  7 | 
  8 | from basic_memory.api.routers.management_router import (
  9 |     WatchStatusResponse,
 10 |     get_watch_status,
 11 |     start_watch_service,
 12 |     stop_watch_service,
 13 | )
 14 | 
 15 | 
 16 | class MockRequest:
 17 |     """Mock FastAPI request with app state."""
 18 | 
 19 |     def __init__(self, app):
 20 |         self.app = app
 21 | 
 22 | 
 23 | @pytest.fixture
 24 | def mock_app():
 25 |     """Create a mock FastAPI app with state."""
 26 |     app = MagicMock(spec=FastAPI)
 27 |     app.state = MagicMock()
 28 |     app.state.watch_task = None
 29 |     return app
 30 | 
 31 | 
 32 | @pytest.mark.asyncio
 33 | async def test_get_watch_status_not_running(mock_app):
 34 |     """Test getting watch status when watch service is not running."""
 35 |     # Set up app state
 36 |     mock_app.state.watch_task = None
 37 | 
 38 |     # Create mock request
 39 |     mock_request = MockRequest(mock_app)
 40 | 
 41 |     # Call endpoint directly
 42 |     response = await get_watch_status(mock_request)
 43 | 
 44 |     # Verify response
 45 |     assert isinstance(response, WatchStatusResponse)
 46 |     assert response.running is False
 47 | 
 48 | 
 49 | @pytest.mark.asyncio
 50 | async def test_get_watch_status_running(mock_app):
 51 |     """Test getting watch status when watch service is running."""
 52 |     # Create a mock task that is running
 53 |     mock_task = MagicMock()
 54 |     mock_task.done.return_value = False
 55 | 
 56 |     # Set up app state
 57 |     mock_app.state.watch_task = mock_task
 58 | 
 59 |     # Create mock request
 60 |     mock_request = MockRequest(mock_app)
 61 | 
 62 |     # Call endpoint directly
 63 |     response = await get_watch_status(mock_request)
 64 | 
 65 |     # Verify response
 66 |     assert isinstance(response, WatchStatusResponse)
 67 |     assert response.running is True
 68 | 
 69 | 
 70 | @pytest.fixture
 71 | def mock_sync_service():
 72 |     """Create a mock SyncService."""
 73 |     mock_service = AsyncMock()
 74 |     mock_service.entity_service = MagicMock()
 75 |     mock_service.entity_service.file_service = MagicMock()
 76 |     return mock_service
 77 | 
 78 | 
 79 | @pytest.fixture
 80 | def mock_project_repository():
 81 |     """Create a mock ProjectRepository."""
 82 |     mock_repository = AsyncMock()
 83 |     return mock_repository
 84 | 
 85 | 
 86 | @pytest.mark.asyncio
 87 | async def test_start_watch_service_when_not_running(
 88 |     mock_app, mock_sync_service, mock_project_repository
 89 | ):
 90 |     """Test starting watch service when it's not running."""
 91 |     # Set up app state
 92 |     mock_app.state.watch_task = None
 93 | 
 94 |     # Create mock request
 95 |     mock_request = MockRequest(mock_app)
 96 | 
 97 |     # Mock the create_background_sync_task function
 98 |     with (
 99 |         patch("basic_memory.sync.WatchService") as mock_watch_service_class,
100 |         patch("basic_memory.sync.background_sync.create_background_sync_task") as mock_create_task,
101 |     ):
102 |         # Create a mock task
103 |         mock_task = MagicMock()
104 |         mock_task.done.return_value = False
105 |         mock_create_task.return_value = mock_task
106 | 
107 |         # Setup mock watch service
108 |         mock_watch_service = MagicMock()
109 |         mock_watch_service_class.return_value = mock_watch_service
110 | 
111 |         # Call endpoint directly
112 |         response = await start_watch_service(
113 |             mock_request, mock_project_repository, mock_sync_service
114 |         )  # pyright: ignore [reportCallIssue]
115 | 
116 |         # Verify response
117 |         assert isinstance(response, WatchStatusResponse)
118 |         assert response.running is True
119 | 
120 |         # Verify that the task was created
121 |         assert mock_create_task.called
122 | 
123 | 
@pytest.mark.asyncio
async def test_start_watch_service_already_running(
    mock_app, mock_sync_service, mock_project_repository
):
    """Starting while a live task already exists must be a no-op."""
    # Install a task that still reports itself as running.
    live_task = MagicMock()
    live_task.done.return_value = False
    mock_app.state.watch_task = live_task
    request = MockRequest(mock_app)

    with patch("basic_memory.sync.background_sync.create_background_sync_task") as create_task:
        response = await start_watch_service(
            request, mock_project_repository, mock_sync_service
        )

        assert isinstance(response, WatchStatusResponse)
        assert response.running is True

        # No replacement task may be created...
        assert not create_task.called

        # ...and the original task must remain installed.
        assert mock_app.state.watch_task is live_task
154 | 
155 | 
@pytest.mark.asyncio
async def test_stop_watch_service_when_running():
    """Smoke-test the WatchStatusResponse model used by the stop endpoint.

    The stop path is exercised indirectly by the other tests; here we only
    confirm the response model constructs correctly without awaiting a real
    task. Error paths are covered directly by the remaining test cases.
    """
    from basic_memory.api.routers.management_router import WatchStatusResponse

    response = WatchStatusResponse(running=False)

    assert isinstance(response, WatchStatusResponse)
    assert response.running is False
174 | 
175 | 
@pytest.mark.asyncio
async def test_stop_watch_service_not_running(mock_app):
    """Stopping with no registered task reports running=False."""
    mock_app.state.watch_task = None

    response = await stop_watch_service(MockRequest(mock_app))

    assert isinstance(response, WatchStatusResponse)
    assert response.running is False
191 | 
192 | 
@pytest.mark.asyncio
async def test_stop_watch_service_already_done(mock_app):
    """Stopping when the task has already finished reports running=False."""
    # A task whose done() returns True needs no cancellation.
    finished_task = MagicMock()
    finished_task.done.return_value = True
    mock_app.state.watch_task = finished_task

    response = await stop_watch_service(MockRequest(mock_app))  # pyright: ignore [reportArgumentType]

    assert isinstance(response, WatchStatusResponse)
    assert response.running is False
212 | 
```

--------------------------------------------------------------------------------
/tests/mcp/test_obsidian_yaml_formatting.py:
--------------------------------------------------------------------------------

```python
  1 | """Integration tests for Obsidian-compatible YAML formatting in write_note tool."""
  2 | 
  3 | import pytest
  4 | 
  5 | from basic_memory.mcp.tools import write_note
  6 | 
  7 | 
  8 | @pytest.mark.asyncio
  9 | async def test_write_note_tags_yaml_format(app, project_config, test_project):
 10 |     """Test that write_note creates files with proper YAML list format for tags."""
 11 |     # Create a note with tags using write_note
 12 |     result = await write_note.fn(
 13 |         project=test_project.name,
 14 |         title="YAML Format Test",
 15 |         folder="test",
 16 |         content="Testing YAML tag formatting",
 17 |         tags=["system", "overview", "reference"],
 18 |     )
 19 | 
 20 |     # Verify the note was created successfully
 21 |     assert "Created note" in result
 22 |     assert "file_path: test/YAML Format Test.md" in result
 23 | 
 24 |     # Read the file directly to check YAML formatting
 25 |     file_path = project_config.home / "test" / "YAML Format Test.md"
 26 |     content = file_path.read_text(encoding="utf-8")
 27 | 
 28 |     # Should use YAML list format
 29 |     assert "tags:" in content
 30 |     assert "- system" in content
 31 |     assert "- overview" in content
 32 |     assert "- reference" in content
 33 | 
 34 |     # Should NOT use JSON array format
 35 |     assert '["system"' not in content
 36 |     assert '"overview"' not in content
 37 |     assert '"reference"]' not in content
 38 | 
 39 | 
 40 | @pytest.mark.asyncio
 41 | async def test_write_note_stringified_json_tags(app, project_config, test_project):
 42 |     """Test that stringified JSON arrays are handled correctly."""
 43 |     # This simulates the issue where AI assistants pass tags as stringified JSON
 44 |     result = await write_note.fn(
 45 |         project=test_project.name,
 46 |         title="Stringified JSON Test",
 47 |         folder="test",
 48 |         content="Testing stringified JSON tag input",
 49 |         tags='["python", "testing", "json"]',  # Stringified JSON array
 50 |     )
 51 | 
 52 |     # Verify the note was created successfully
 53 |     assert "Created note" in result
 54 | 
 55 |     # Read the file to check formatting
 56 |     file_path = project_config.home / "test" / "Stringified JSON Test.md"
 57 |     content = file_path.read_text(encoding="utf-8")
 58 | 
 59 |     # Should properly parse the JSON and format as YAML list
 60 |     assert "tags:" in content
 61 |     assert "- python" in content
 62 |     assert "- testing" in content
 63 |     assert "- json" in content
 64 | 
 65 |     # Should NOT have the original stringified format issues
 66 |     assert '["python"' not in content
 67 |     assert '"testing"' not in content
 68 |     assert '"json"]' not in content
 69 | 
 70 | 
 71 | @pytest.mark.asyncio
 72 | async def test_write_note_single_tag_yaml_format(app, project_config, test_project):
 73 |     """Test that single tags are still formatted as YAML lists."""
 74 |     await write_note.fn(
 75 |         project=test_project.name,
 76 |         title="Single Tag Test",
 77 |         folder="test",
 78 |         content="Testing single tag formatting",
 79 |         tags=["solo-tag"],
 80 |     )
 81 | 
 82 |     file_path = project_config.home / "test" / "Single Tag Test.md"
 83 |     content = file_path.read_text(encoding="utf-8")
 84 | 
 85 |     # Single tag should still use list format
 86 |     assert "tags:" in content
 87 |     assert "- solo-tag" in content
 88 | 
 89 | 
 90 | @pytest.mark.asyncio
 91 | async def test_write_note_no_tags(app, project_config, test_project):
 92 |     """Test that notes without tags work normally."""
 93 |     await write_note.fn(
 94 |         project=test_project.name,
 95 |         title="No Tags Test",
 96 |         folder="test",
 97 |         content="Testing note without tags",
 98 |         tags=None,
 99 |     )
100 | 
101 |     file_path = project_config.home / "test" / "No Tags Test.md"
102 |     content = file_path.read_text(encoding="utf-8")
103 | 
104 |     # Should not have tags field in frontmatter
105 |     assert "tags:" not in content
106 |     assert "title: No Tags Test" in content
107 | 
108 | 
@pytest.mark.asyncio
async def test_write_note_empty_tags_list(app, project_config, test_project):
    """An empty tag list must not produce a tags field in the frontmatter."""
    await write_note.fn(
        project=test_project.name,
        title="Empty Tags Test",
        folder="test",
        content="Testing empty tag list",
        tags=[],
    )

    raw = (project_config.home / "test" / "Empty Tags Test.md").read_text(encoding="utf-8")

    # Empty lists are treated the same as no tags at all.
    assert "tags:" not in raw
125 | 
126 | 
@pytest.mark.asyncio
async def test_write_note_update_preserves_yaml_format(app, project_config, test_project):
    """Updating an existing note keeps YAML-list tag formatting and replaces content."""
    # Seed the note with initial tags and content.
    await write_note.fn(
        project=test_project.name,
        title="Update Format Test",
        folder="test",
        content="Initial content",
        tags=["initial", "tag"],
    )

    # Overwrite it with fresh content and a new tag set.
    outcome = await write_note.fn(
        project=test_project.name,
        title="Update Format Test",
        folder="test",
        content="Updated content",
        tags=["updated", "new-tag", "format"],
    )

    # The second write must be treated as an update, not a create.
    assert "Updated note" in outcome

    raw = (project_config.home / "test" / "Update Format Test.md").read_text(encoding="utf-8")

    # New tags appear in YAML list form.
    assert "tags:" in raw
    for tag in ("updated", "new-tag", "format"):
        assert f"- {tag}" in raw

    # The old tags are fully replaced.
    assert "- initial" not in raw
    assert "- tag" not in raw

    # The body content is replaced as well.
    assert "Updated content" in raw
    assert "Initial content" not in raw
168 | 
169 | 
@pytest.mark.asyncio
async def test_complex_tags_yaml_format(app, project_config, test_project):
    """Tags containing dots, slashes, and underscores format cleanly as YAML."""
    complex_tags = ["python-3.9", "api_integration", "v2.0", "nested/category", "under_score"]

    await write_note.fn(
        project=test_project.name,
        title="Complex Tags Test",
        folder="test",
        content="Testing complex tag formats",
        tags=complex_tags,
    )

    raw = (project_config.home / "test" / "Complex Tags Test.md").read_text(encoding="utf-8")

    # Every special-character tag must survive as a YAML list entry.
    for tag in complex_tags:
        assert f"- {tag}" in raw
190 | 
```

--------------------------------------------------------------------------------
/specs/SPEC-11 Basic Memory API Performance Optimization.md:
--------------------------------------------------------------------------------

```markdown
  1 | ---
  2 | title: 'SPEC-11: Basic Memory API Performance Optimization'
  3 | type: spec
  4 | permalink: specs/spec-11-basic-memory-api-performance-optimization
  5 | tags:
  6 | - performance
  7 | - api
  8 | - mcp
  9 | - database
 10 | - cloud
 11 | ---
 12 | 
 13 | # SPEC-11: Basic Memory API Performance Optimization
 14 | 
 15 | ## Why
 16 | 
 17 | The Basic Memory API experiences significant performance issues in cloud environments due to expensive per-request initialization. MCP tools making
 18 | HTTP requests to the API suffer from 350ms-2.6s latency overhead **before** any actual operation occurs.
 19 | 
 20 | **Root Cause Analysis:**
 21 | - GitHub Issue #82 shows repeated initialization sequences in logs (16:29:35 and 16:49:58)
 22 | - Each MCP tool call triggers full database initialization + project reconciliation
 23 | - `get_engine_factory()` dependency calls `db.get_or_create_db()` on every request
 24 | - `reconcile_projects_with_config()` runs expensive sync operations repeatedly
 25 | 
 26 | **Performance Impact:**
 27 | - Database connection setup: ~50-100ms per request
 28 | - Migration checks: ~100-500ms per request
 29 | - Project reconciliation: ~200ms-2s per request
 30 | - **Total overhead**: ~350ms-2.6s per MCP tool call
 31 | 
 32 | This creates compounding effects with tenant auto-start delays and increases timeout risk in cloud deployments.
 33 | 
 34 | ## What
 35 | 
 36 | This optimization affects the **core basic-memory repository** components:
 37 | 
 38 | 1. **API Lifespan Management** (`src/basic_memory/api/app.py`)
 39 |  - Cache database connections in app state during startup
 40 |  - Avoid repeated expensive initialization
 41 | 
 42 | 2. **Dependency Injection** (`src/basic_memory/deps.py`)
 43 |  - Modify `get_engine_factory()` to use cached connections
 44 |  - Eliminate per-request database setup
 45 | 
 46 | 3. **Initialization Service** (`src/basic_memory/services/initialization.py`)
 47 |  - Add caching/throttling to project reconciliation
 48 |  - Skip expensive operations when appropriate
 49 | 
 50 | 4. **Configuration** (`src/basic_memory/config.py`)
 51 |  - Add optional performance flags for cloud environments
 52 | 
 53 | **Backwards Compatibility**: All changes must be backwards compatible with existing CLI and non-cloud usage.
 54 | 
 55 | ## How (High Level)
 56 | 
 57 | ### Phase 1: Cache Database Connections (Critical - 80% of gains)
 58 | 
 59 | **Problem**: `get_engine_factory()` calls `db.get_or_create_db()` per request
 60 | **Solution**: Cache database engine/session in app state during lifespan
 61 | 
 62 | 1. **Modify API Lifespan** (`api/app.py`):
 63 |  ```python
 64 |  @asynccontextmanager
 65 |  async def lifespan(app: FastAPI):
 66 |      app_config = ConfigManager().config
 67 |      await initialize_app(app_config)
 68 | 
 69 |      # Cache database connection in app state
 70 |      engine, session_maker = await db.get_or_create_db(app_config.database_path)
 71 |      app.state.engine = engine
 72 |      app.state.session_maker = session_maker
 73 | 
 74 |      # ... rest of startup logic
 75 | ```
 76 | 
 77 | 2. Modify Dependency Injection (deps.py):
 78 | ```python
 79 | async def get_engine_factory(
 80 |   request: Request
 81 | ) -> tuple[AsyncEngine, async_sessionmaker[AsyncSession]]:
 82 |   """Get cached engine and session maker from app state."""
 83 |   return request.app.state.engine, request.app.state.session_maker
 84 | ```
### Phase 2: Optimize Project Reconciliation (Secondary - 20% of gains)
 86 | 
 87 | Problem: reconcile_projects_with_config() runs expensive sync repeatedly
 88 | Solution: Add module-level caching with time-based throttling
 89 | 
 90 | 1. Add Reconciliation Cache (services/initialization.py):
```python
 92 | _project_reconciliation_completed = False
 93 | _last_reconciliation_time = 0
 94 | 
 95 | async def reconcile_projects_with_config(app_config, force=False):
 96 |   # Skip if recently completed (within 60 seconds) unless forced
 97 |   if recently_completed and not force:
 98 |       return
 99 |   # ... existing logic
100 | ```
101 | Phase 3: Cloud Environment Flags (Optional)
102 | 
103 | Problem: Force expensive initialization in production environments
104 | Solution: Add skip flags for cloud/stateless deployments
105 | 
106 | 1. Add Config Flag (config.py):
107 | skip_initialization_sync: bool = Field(default=False)
108 | 2. Configure in Cloud (basic-memory-cloud integration):
109 | BASIC_MEMORY_SKIP_INITIALIZATION_SYNC=true
110 | 
## How to Evaluate
112 | 
113 | Success Criteria
114 | 
115 | 1. Performance Metrics (Primary):
116 | - MCP tool response time reduced by 50%+ (measure before/after)
117 | - Database connection overhead eliminated (0ms vs 50-100ms)
118 | - Migration check overhead eliminated (0ms vs 100-500ms)
119 | - Project reconciliation overhead reduced by 90%+
120 | 2. Load Testing:
121 | - Concurrent MCP tool calls maintain performance
122 | - No memory leaks in cached connections
123 | - Database connection pool behaves correctly
124 | 3. Functional Correctness:
125 | - All existing API endpoints work identically
126 | - MCP tools maintain full functionality
127 | - CLI operations unaffected
128 | - Database migrations still execute properly
129 | 4. Backwards Compatibility:
130 | - No breaking changes to existing APIs
131 | - Config changes are optional with safe defaults
132 | - Non-cloud deployments work unchanged
133 | 
134 | Testing Strategy
135 | 
136 | Performance Testing:
137 | # Before optimization
138 | time basic-memory-mcp-tools write_note "test" "content" "folder"
139 | # Measure: ~1-3 seconds
140 | 
141 | # After optimization  
142 | time basic-memory-mcp-tools write_note "test" "content" "folder"
143 | # Target: <500ms
144 | 
145 | Load Testing:
146 | # Multiple concurrent MCP tool calls
147 | for i in {1..10}; do
148 | basic-memory-mcp-tools search "test" &
149 | done
150 | wait
151 | # Verify: No degradation, consistent response times
152 | 
153 | Regression Testing:
154 | # Full basic-memory test suite
155 | just test
156 | # All tests must pass
157 | 
158 | # Integration tests with cloud deployment
159 | # Verify MCP gateway → API → database flow works
160 | 
161 | Validation Checklist
162 | 
163 | - Phase 1 Complete: Database connections cached, dependency injection optimized
164 | - Performance Benchmark: 50%+ improvement in MCP tool response times
165 | - Memory Usage: No leaks in cached connections over 24h+ periods
166 | - Stress Testing: 100+ concurrent requests maintain performance
167 | - Backwards Compatibility: All existing functionality preserved
168 | - Documentation: Performance optimization documented in README
169 | - Cloud Integration: basic-memory-cloud sees performance benefits
170 | 
171 | Notes
172 | 
173 | Implementation Priority:
174 | - Phase 1 provides 80% of performance gains and should be implemented first
175 | - Phase 2 provides remaining 20% and addresses edge cases
176 | - Phase 3 is optional for maximum cloud optimization
177 | 
178 | Risk Mitigation:
179 | - All changes backwards compatible
180 | - Gradual rollout possible (Phase 1 → 2 → 3)
181 | - Easy rollback via configuration flags
182 | 
183 | Cloud Integration:
184 | - This optimization directly addresses basic-memory-cloud issue #82
185 | - Changes in core basic-memory will benefit all cloud tenants
186 | - No changes needed in basic-memory-cloud itself
187 | 
```

--------------------------------------------------------------------------------
/specs/SPEC-1 Specification-Driven Development Process.md:
--------------------------------------------------------------------------------

```markdown
  1 | ---
  2 | title: 'SPEC-1: Specification-Driven Development Process'
  3 | type: spec
  4 | permalink: specs/spec-1-specification-driven-development-process
  5 | tags:
  6 | - process
  7 | - specification
  8 | - development
  9 | - meta
 10 | ---
 11 | 
 12 | # SPEC-1: Specification-Driven Development Process
 13 | 
 14 | ## Why
 15 | We're implementing specification-driven development to solve the complexity and circular refactoring issues in our web development process. 
 16 | Instead of getting lost in framework details and type gymnastics, we start with clear specifications that drive implementation.
 17 | 
 18 | The default approach of adhoc development with AI agents tends to result in:
 19 | - Circular refactoring cycles
 20 | - Fighting framework complexity
 21 | - Lost context between sessions
 22 | - Unclear requirements and scope
 23 | 
 24 | ## What
 25 | This spec defines our process for using basic-memory as the specification engine to build basic-memory-cloud. 
 26 | We're creating a recursive development pattern where basic-memory manages the specs that drive the development of basic-memory-cloud.
 27 | 
 28 | **Affected Areas:**
 29 | - All future component development
 30 | - Architecture decisions
 31 | - Agent collaboration workflows
 32 | - Knowledge management and context preservation
 33 | 
 34 | ## How (High Level)
 35 | 
 36 | ### Specification Structure
 37 | 
 38 | Name: Spec names should be numbered sequentially, followed by a description eg. `SPEC-X - Simple Description.md`.
 39 | See: [[Spec-2: Slash Commands Reference]]
 40 | 
 41 | Every spec is a complete thought containing:
 42 | - **Why**: The reasoning and problem being solved
 43 | - **What**: What is affected or changed
 44 | - **How**: High-level approach to implementation
 45 | - **How to Evaluate**: Testing/validation procedure
 46 | - Additional context as needed
 47 | 
 48 | ### Living Specification Format
 49 | 
 50 | Specifications are **living documents** that evolve throughout implementation:
 51 | 
 52 | **Progress Tracking:**
 53 | - **Completed items**: Use ✅ checkmark emoji for implemented features
 54 | - **Pending items**: Use `- [ ]` GitHub-style checkboxes for remaining tasks
 55 | - **In-progress items**: Use `- [x]` when work is actively underway
 56 | 
 57 | **Status Philosophy:**
 58 | - **Avoid static status headers** like "COMPLETE" or "IN PROGRESS" that become stale
 59 | - **Use checklists within content** to show granular implementation progress
 60 | - **Keep specs informative** while providing clear progress visibility
 61 | - **Update continuously** as understanding and implementation evolve
 62 | 
 63 | **Example Format:**
 64 | ```markdown
 65 | ### ComponentName
 66 | - ✅ Basic functionality implemented
 67 | - ✅ Props and events defined
- [ ] Add sorting controls
- [ ] Improve accessibility
- [x] Currently implementing responsive design
 71 | ```
 72 | 
 73 | This creates **git-friendly progress tracking** where `[ ]` easily becomes `[x]` or ✅ when completed, and specs remain valuable throughout the development lifecycle.
 74 | 
 75 | 
 76 | ## Claude Code 
 77 | 
 78 | We will leverage Claude Code capabilities to make the process semi-automated. 
 79 | 
 80 | - Slash commands: define repeatable steps in the process (create spec, implement, review, etc)
- Agents: define roles to carry out instructions (front end developer, backend developer, etc)
 82 | - MCP tools: enable agents to implement specs via actions (write code, test, etc)
 83 | 
 84 | ### Workflow
 85 | 1. **Create**: Write spec as complete thought in `/specs` folder
 86 | 2. **Discuss**: Iterate and refine through agent collaboration
 87 | 3. **Implement**: Hand spec to appropriate specialist agent
 88 | 4. **Validate**: Review implementation against spec criteria
 89 | 5. **Document**: Update spec with learnings and decisions
 90 | 
 91 | ### Slash Commands
 92 | 
 93 | Claude slash commands are used to manage the flow.
 94 | These are simple instructions to help make the process uniform. 
 95 | They can be updated and refined as needed. 
 96 | 
 97 | - `/spec create [name]` - Create new specification
 98 | - `/spec status` - Show current spec states
 99 | - `/spec implement [name]` - Hand to appropriate agent
100 | - `/spec review [name]` - Validate implementation
101 | 
102 | ### Agent Orchestration
103 | 
104 | Agents are defined with clear roles, for instance:
105 | 
106 | - **system-architect**: Creates high-level specs, ADRs, architectural decisions
107 | - **vue-developer**: Component specs, UI patterns, frontend architecture
108 | - **python-developer**: Implementation specs, technical details, backend logic
Each agent reads/updates specs through basic-memory tools.
111 | 
112 | ## How to Evaluate
113 | 
114 | ### Success Criteria
115 | - Specs provide clear, actionable guidance for implementation
116 | - Reduced circular refactoring and scope creep
117 | - Persistent context across development sessions
118 | - Clean separation between "what/why" and implementation details
119 | - Specs record a history of what happened and why for historical context
120 | 
121 | ### Testing Procedure
122 | 1. Create a spec for an existing problematic component
123 | 2. Have an agent implement following only the spec
124 | 3. Compare result quality and development speed vs. ad-hoc approach
125 | 4. Measure context preservation across sessions
126 | 5. Evaluate spec clarity and completeness
127 | 
128 | ### Metrics
129 | - Time from spec to working implementation
130 | - Number of refactoring cycles required
131 | - Agent understanding of requirements
132 | - Spec reusability for similar components
133 | 
134 | ## Notes
135 | - Start simple: specs are just complete thoughts, not heavy processes
136 | - Use basic-memory's knowledge graph to link specs, decisions, components
137 | - Let the process evolve naturally based on what works
138 | - Focus on solving the actual problem: Manage complexity in development
139 | 
140 | ## Observations
141 | 
- [problem] Web development without clear goals and documentation causes circular refactoring cycles #complexity
143 | - [solution] Specification-driven development reduces scope creep and context loss #process-improvement  
144 | - [pattern] basic-memory as specification engine creates recursive development loop #meta-development
145 | - [workflow] Five-step process: Create → Discuss → Implement → Validate → Document #methodology
146 | - [tool] Slash commands provide uniform process automation #automation
147 | - [agent-pattern] Three specialized agents handle different implementation domains #specialization
148 | - [success-metric] Time from spec to working implementation measures process efficiency #measurement
149 | - [learning] Process should evolve naturally based on what works in practice #adaptation
150 | - [format] Living specifications use checklists for progress tracking instead of static status headers #documentation
151 | - [evolution] Specs evolve throughout implementation maintaining value as working documents #continuous-improvement
152 | 
153 | ## Relations
154 | 
155 | - spec [[Spec-2: Slash Commands Reference]]
156 | - spec [[Spec-3: Agent Definitions]]
157 | 
```

--------------------------------------------------------------------------------
/tests/api/test_search_router.py:
--------------------------------------------------------------------------------

```python
  1 | """Tests for search router."""
  2 | 
  3 | from datetime import datetime, timezone
  4 | 
  5 | import pytest
  6 | import pytest_asyncio
  7 | from sqlalchemy import text
  8 | 
  9 | from basic_memory import db
 10 | from basic_memory.schemas import Entity as EntitySchema
 11 | from basic_memory.schemas.search import SearchItemType, SearchResponse
 12 | 
 13 | 
@pytest_asyncio.fixture
async def indexed_entity(init_search_index, full_entity, search_service):
    """Create an entity and index it.

    Depends on init_search_index so the search tables exist before indexing,
    then indexes the shared full_entity fixture and returns it for assertions.
    """
    await search_service.index_entity(full_entity)
    return full_entity
 19 | 
 20 | 
 21 | @pytest.mark.asyncio
 22 | async def test_search_basic(client, indexed_entity, project_url):
 23 |     """Test basic text search."""
 24 |     response = await client.post(f"{project_url}/search/", json={"text": "search"})
 25 |     assert response.status_code == 200
 26 |     search_results = SearchResponse.model_validate(response.json())
 27 |     assert len(search_results.results) == 3
 28 | 
 29 |     found = False
 30 |     for r in search_results.results:
 31 |         if r.type == SearchItemType.ENTITY.value:
 32 |             assert r.permalink == indexed_entity.permalink
 33 |             found = True
 34 | 
 35 |     assert found, "Expected to find indexed entity in results"
 36 | 
 37 | 
 38 | @pytest.mark.asyncio
 39 | async def test_search_basic_pagination(client, indexed_entity, project_url):
 40 |     """Test basic text search."""
 41 |     response = await client.post(
 42 |         f"{project_url}/search/?page=3&page_size=1", json={"text": "search"}
 43 |     )
 44 |     assert response.status_code == 200
 45 |     search_results = SearchResponse.model_validate(response.json())
 46 |     assert len(search_results.results) == 1
 47 | 
 48 |     assert search_results.current_page == 3
 49 |     assert search_results.page_size == 1
 50 | 
 51 | 
 52 | @pytest.mark.asyncio
 53 | async def test_search_with_entity_type_filter(client, indexed_entity, project_url):
 54 |     """Test search with type filter."""
 55 |     # Should find with correct type
 56 |     response = await client.post(
 57 |         f"{project_url}/search/",
 58 |         json={"text": "test", "entity_types": [SearchItemType.ENTITY.value]},
 59 |     )
 60 |     assert response.status_code == 200
 61 |     search_results = SearchResponse.model_validate(response.json())
 62 |     assert len(search_results.results) > 0
 63 | 
 64 |     # Should find with relation type
 65 |     response = await client.post(
 66 |         f"{project_url}/search/",
 67 |         json={"text": "test", "entity_types": [SearchItemType.RELATION.value]},
 68 |     )
 69 |     assert response.status_code == 200
 70 |     search_results = SearchResponse.model_validate(response.json())
 71 |     assert len(search_results.results) == 2
 72 | 
 73 | 
 74 | @pytest.mark.asyncio
 75 | async def test_search_with_type_filter(client, indexed_entity, project_url):
 76 |     """Test search with entity type filter."""
 77 |     # Should find with correct entity type
 78 |     response = await client.post(f"{project_url}/search/", json={"text": "test", "types": ["test"]})
 79 |     assert response.status_code == 200
 80 |     search_results = SearchResponse.model_validate(response.json())
 81 |     assert len(search_results.results) == 1
 82 | 
 83 |     # Should not find with wrong entity type
 84 |     response = await client.post(f"{project_url}/search/", json={"text": "test", "types": ["note"]})
 85 |     assert response.status_code == 200
 86 |     search_results = SearchResponse.model_validate(response.json())
 87 |     assert len(search_results.results) == 0
 88 | 
 89 | 
 90 | @pytest.mark.asyncio
 91 | async def test_search_with_date_filter(client, indexed_entity, project_url):
 92 |     """Test search with date filter."""
 93 |     # Should find with past date
 94 |     past_date = datetime(2020, 1, 1, tzinfo=timezone.utc)
 95 |     response = await client.post(
 96 |         f"{project_url}/search/", json={"text": "test", "after_date": past_date.isoformat()}
 97 |     )
 98 |     assert response.status_code == 200
 99 |     search_results = SearchResponse.model_validate(response.json())
100 | 
101 |     # Should not find with future date
102 |     future_date = datetime(2030, 1, 1, tzinfo=timezone.utc)
103 |     response = await client.post(
104 |         f"{project_url}/search/", json={"text": "test", "after_date": future_date.isoformat()}
105 |     )
106 |     assert response.status_code == 200
107 |     search_results = SearchResponse.model_validate(response.json())
108 |     assert len(search_results.results) == 0
109 | 
110 | 
@pytest.mark.asyncio
async def test_search_empty(search_service, client, project_url):
    """A query with no matches returns an empty result set."""
    response = await client.post(f"{project_url}/search/", json={"text": "nonexistent"})
    assert response.status_code == 200
    empty = SearchResponse.model_validate(response.json())
    assert len(empty.results) == 0
118 | 
119 | 
@pytest.mark.asyncio
async def test_reindex(client, search_service, entity_service, session_maker, project_url):
    """Reindexing restores searchability after the index is wiped."""
    # Create an entity that should end up in the search index.
    await entity_service.create_entity(
        EntitySchema(
            title="TestEntity1",
            folder="test",
            entity_type="test",
        ),
    )

    # Wipe the search index directly in the database.
    async with db.scoped_session(session_maker) as session:
        await session.execute(text("DELETE FROM search_index"))
        await session.commit()

    # Sanity check: nothing is searchable after the wipe.
    wiped = await client.post(f"{project_url}/search/", json={"text": "test"})
    assert len(SearchResponse.model_validate(wiped.json()).results) == 0

    # Trigger a full reindex via the API.
    reindex_response = await client.post(f"{project_url}/search/reindex")
    assert reindex_response.status_code == 200
    assert reindex_response.json()["status"] == "ok"

    # The entity is searchable again.
    restored = await client.post(f"{project_url}/search/", json={"text": "test"})
    assert len(SearchResponse.model_validate(restored.json()).results) == 1
151 | 
152 | 
@pytest.mark.asyncio
async def test_multiple_filters(client, indexed_entity, project_url):
    """All filters combined still resolve to the single indexed entity."""
    query = {
        "text": "test",
        "entity_types": [SearchItemType.ENTITY.value],
        "types": ["test"],
        "after_date": datetime(2020, 1, 1, tzinfo=timezone.utc).isoformat(),
    }
    response = await client.post(f"{project_url}/search/", json=query)
    assert response.status_code == 200

    search_result = SearchResponse.model_validate(response.json())
    assert len(search_result.results) == 1

    only_match = search_result.results[0]
    assert only_match.permalink == indexed_entity.permalink
    assert only_match.type == SearchItemType.ENTITY.value
    assert only_match.metadata["entity_type"] == "test"
172 | 
```

--------------------------------------------------------------------------------
/src/basic_memory/schemas/response.py:
--------------------------------------------------------------------------------

```python
  1 | """Response schemas for knowledge graph operations.
  2 | 
  3 | This module defines the response formats for all knowledge graph operations.
  4 | Each response includes complete information about the affected entities,
  5 | including IDs that can be used in subsequent operations.
  6 | 
  7 | Key Features:
  8 | 1. Every created/updated object gets an ID
  9 | 2. Relations are included with their parent entities
 10 | 3. Responses include everything needed for next operations
 11 | 4. Bulk operations return all affected items
 12 | """
 13 | 
 14 | from datetime import datetime
 15 | from typing import List, Optional, Dict
 16 | 
 17 | from pydantic import BaseModel, ConfigDict, Field, AliasPath, AliasChoices
 18 | 
 19 | from basic_memory.schemas.base import Relation, Permalink, EntityType, ContentType, Observation
 20 | 
 21 | 
class SQLAlchemyModel(BaseModel):
    """Base class for models that read from SQLAlchemy attributes.

    This base class handles conversion of SQLAlchemy model attributes
    to Pydantic model fields. All response models extend this to ensure
    proper handling of database results.
    """

    # from_attributes lets model_validate() read ORM objects directly
    # via attribute access instead of requiring dicts.
    model_config = ConfigDict(from_attributes=True)
 31 | 
 32 | 
class ObservationResponse(Observation, SQLAlchemyModel):
    """Schema for observation data returned from the service.

    Each observation gets a unique ID that can be used for later
    reference or deletion.

    Example Response:
    {
        "category": "feature",
        "content": "Added support for async operations",
        "context": "Initial database design meeting"
    }
    """

    # Stable identifier for referencing or deleting this observation later.
    permalink: Permalink
 48 | 
 49 | 
class RelationResponse(Relation, SQLAlchemyModel):
    """Response schema for relation operations.

    Extends the base Relation model with a unique ID that can be
    used for later modification or deletion.

    Example Response:
    {
        "from_id": "test/memory_test",
        "to_id": "component/memory-service",
        "relation_type": "validates",
        "context": "Comprehensive test suite"
    }
    """

    # Stable identifier for this relation.
    permalink: Permalink

    from_id: Permalink = Field(
        # Prefer the permalink of the associated source entity;
        # fall back to the raw from_id value when no entity is attached.
        validation_alias=AliasChoices(
            AliasPath("from_entity", "permalink"),
            "from_id",
        )
    )
    to_id: Optional[Permalink] = Field(  # pyright: ignore
        # Prefer the permalink of the associated target entity;
        # fall back to the raw to_id value. None when the link is unresolved.
        validation_alias=AliasChoices(
            AliasPath("to_entity", "permalink"),
            "to_id",
        ),
        default=None,
    )
    to_name: Optional[Permalink] = Field(
        # Prefer the title of the associated target entity;
        # fall back to the raw to_name value.
        validation_alias=AliasChoices(
            AliasPath("to_entity", "title"),
            "to_name",
        ),
        default=None,
    )
 93 | 
 94 | 
class EntityResponse(SQLAlchemyModel):
    """Complete entity data returned from the service.

    This is the most comprehensive entity view, including:
    1. Basic entity details (permalink, title, type)
    2. All observations
    3. All relations
    4. File metadata (path, checksum, content type)

    Example Response:
    {
        "permalink": "component/memory-service",
        "file_path": "MemoryService",
        "entity_type": "component",
        "entity_metadata": {},
        "content_type": "text/markdown",
        "observations": [
            {
                "category": "feature",
                "content": "Uses SQLite storage",
                "context": "Initial design"
            },
            {
                "category": "feature",
                "content": "Implements async operations",
                "context": "Initial design"
            }
        ],
        "relations": [
            {
                "from_id": "test/memory-test",
                "to_id": "component/memory-service",
                "relation_type": "validates",
                "context": "Main test suite"
            }
        ]
    }
    """

    # Permalink may be None for files that have no generated permalink.
    permalink: Optional[Permalink]
    title: str
    file_path: str
    entity_type: EntityType
    entity_metadata: Optional[Dict] = None
    # Content checksum of the underlying file, when available.
    checksum: Optional[str] = None
    content_type: ContentType
    observations: List[ObservationResponse] = []
    relations: List[RelationResponse] = []
    created_at: datetime
    updated_at: datetime
145 | 
146 | 
class EntityListResponse(SQLAlchemyModel):
    """Response wrapping a list of entities.

    Returns complete information about entities returned from the service,
    including their permalinks, observations,
    and any established relations.

    Example Response:
    {
        "entities": [
            {
                "permalink": "component/search_service",
                "title": "SearchService",
                "entity_type": "component",
                "description": "Knowledge graph search",
                "observations": [
                    {
                        "content": "Implements full-text search"
                    }
                ],
                "relations": []
            },
            {
                "permalink": "document/api_docs",
                "title": "API_Documentation",
                "entity_type": "document",
                "description": "API Reference",
                "observations": [
                    {
                        "content": "Documents REST endpoints"
                    }
                ],
                "relations": []
            }
        ]
    }
    """

    # Full EntityResponse for each affected entity.
    entities: List[EntityResponse]
186 | 
187 | 
class SearchNodesResponse(SQLAlchemyModel):
    """Response for search operation.

    Returns matching entities with their complete information,
    plus the original query for reference.

    Example Response:
    {
        "matches": [
            {
                "permalink": "component/memory-service",
                "title": "MemoryService",
                "entity_type": "component",
                "description": "Core service",
                "observations": [...],
                "relations": [...]
            }
        ],
        "query": "memory"
    }

    Note: Each entity in matches includes full details
    just like EntityResponse.
    """

    # Matching entities, each with full EntityResponse detail.
    matches: List[EntityResponse]
    # Echo of the search text that produced these matches.
    query: str
215 | 
216 | 
class DeleteEntitiesResponse(SQLAlchemyModel):
    """Response indicating successful entity deletion.

    A simple boolean response confirming the delete operation
    completed successfully.

    Example Response:
    {
        "deleted": true
    }
    """

    # True when the delete operation completed.
    deleted: bool
230 | 
```

--------------------------------------------------------------------------------
/src/basic_memory/mcp/prompts/recent_activity.py:
--------------------------------------------------------------------------------

```python
  1 | """Recent activity prompts for Basic Memory MCP server.
  2 | 
  3 | These prompts help users see what has changed in their knowledge base recently.
  4 | """
  5 | 
  6 | from typing import Annotated, Optional
  7 | 
  8 | from loguru import logger
  9 | from pydantic import Field
 10 | 
 11 | from basic_memory.mcp.prompts.utils import format_prompt_context, PromptContext, PromptContextItem
 12 | from basic_memory.mcp.server import mcp
 13 | from basic_memory.mcp.tools.recent_activity import recent_activity
 14 | from basic_memory.schemas.base import TimeFrame
 15 | from basic_memory.schemas.memory import GraphContext, ProjectActivitySummary
 16 | from basic_memory.schemas.search import SearchItemType
 17 | 
 18 | 
@mcp.prompt(
    name="recent_activity",
    description="Get recent activity from a specific project or across all projects",
)
async def recent_activity_prompt(
    timeframe: Annotated[
        TimeFrame,
        Field(description="How far back to look for activity (e.g. '1d', '1 week')"),
    ] = "7d",
    project: Annotated[
        Optional[str],
        Field(
            description="Specific project to get activity from (None for discovery across all projects)"
        ),
    ] = None,
) -> str:
    """Get recent activity from a specific project or across all projects.

    This prompt helps you see what's changed recently in the knowledge base.
    In discovery mode (project=None), it shows activity across all projects.
    In project-specific mode, it shows detailed activity for one project.

    Args:
        timeframe: How far back to look for activity (e.g. '1d', '1 week')
        project: Specific project to get activity from (None for discovery across all projects)

    Returns:
        Formatted summary of recent activity
    """
    logger.info(f"Getting recent activity, timeframe: {timeframe}, project: {project}")

    # Query is filtered to ENTITY items only; the tool returns either a
    # ProjectActivitySummary (discovery) or a GraphContext (single project).
    recent = await recent_activity.fn(
        project=project, timeframe=timeframe, type=[SearchItemType.ENTITY]
    )

    # Extract primary results from the hierarchical structure
    primary_results = []
    related_results = []

    if isinstance(recent, ProjectActivitySummary):
        # Discovery mode - extract results from all projects
        for _, project_activity in recent.projects.items():
            if project_activity.activity.results:
                # Take up to 2 primary results per project
                for item in project_activity.activity.results[:2]:
                    primary_results.append(item.primary_result)
                    # Add up to 1 related result per primary item
                    if item.related_results:
                        related_results.extend(item.related_results[:1])

        # Limit total results for readability
        primary_results = primary_results[:8]
        related_results = related_results[:6]

    elif isinstance(recent, GraphContext):
        # Project-specific mode - use existing logic
        if recent.results:
            # Take up to 5 primary results
            for item in recent.results[:5]:
                primary_results.append(item.primary_result)
                # Add up to 2 related results per primary item
                if item.related_results:
                    related_results.extend(item.related_results[:2])

    # Set topic based on mode
    if project:
        topic = f"Recent Activity in {project} ({timeframe})"
    else:
        topic = f"Recent Activity Across All Projects ({timeframe})"

    prompt_context = format_prompt_context(
        PromptContext(
            topic=topic,
            timeframe=timeframe,
            results=[
                PromptContextItem(
                    primary_results=primary_results,
                    related_results=related_results[:10],  # Limit total related results
                )
            ],
        )
    )

    # Add mode-specific suggestions
    # The first primary result's title seeds the example [[wikilink]] below.
    first_title = "Recent Topic"
    if primary_results and len(primary_results) > 0:
        first_title = primary_results[0].title

    if project:
        # Project-specific suggestions
        capture_suggestions = f"""
    ## Opportunity to Capture Activity Summary

    Consider creating a summary note of recent activity in {project}:

    ```python
    await write_note(
        "{project}",
        title="Activity Summary {timeframe}",
        content='''
        # Activity Summary for {project} ({timeframe})

        ## Overview
        [Summary of key changes and developments in this project over this period]

        ## Key Updates
        [List main updates and their significance within this project]

        ## Observations
        - [trend] [Observation about patterns in recent activity]
        - [insight] [Connection between different activities]

        ## Relations
        - summarizes [[{first_title}]]
        - relates_to [[{project} Overview]]
        ''',
        folder="summaries"
    )
    ```

    Summarizing periodic activity helps create high-level insights and connections within the project.
    """
    else:
        # Discovery mode suggestions
        # NOTE(review): summary.most_active_project is read defensively via
        # getattr — presumably it may be absent on some summaries; confirm.
        project_count = len(recent.projects) if isinstance(recent, ProjectActivitySummary) else 0
        most_active = (
            getattr(recent.summary, "most_active_project", "Unknown")
            if isinstance(recent, ProjectActivitySummary)
            else "Unknown"
        )

        capture_suggestions = f"""
    ## Cross-Project Activity Discovery

    Found activity across {project_count} projects. Most active: **{most_active}**

    Consider creating a cross-project summary:

    ```python
    await write_note(
        "{most_active if most_active != "Unknown" else "main"}",
        title="Cross-Project Activity Summary {timeframe}",
        content='''
        # Cross-Project Activity Summary ({timeframe})

        ## Overview
        Activity found across {project_count} projects, with {most_active} showing the most activity.

        ## Key Developments
        [Summarize important changes across all projects]

        ## Project Insights
        [Note patterns or connections between projects]

        ## Observations
        - [trend] [Cross-project patterns observed]
        - [insight] [Connections between different project activities]

        ## Relations
        - summarizes [[{first_title}]]
        - relates_to [[Project Portfolio Overview]]
        ''',
        folder="summaries"
    )
    ```

    Cross-project summaries help identify broader trends and project interconnections.
    """

    return prompt_context + capture_suggestions
189 | 
```

--------------------------------------------------------------------------------
/src/basic_memory/cli/commands/status.py:
--------------------------------------------------------------------------------

```python
  1 | """Status command for basic-memory CLI."""
  2 | 
  3 | import asyncio
  4 | from typing import Set, Dict
  5 | from typing import Annotated, Optional
  6 | 
  7 | from mcp.server.fastmcp.exceptions import ToolError
  8 | import typer
  9 | from loguru import logger
 10 | from rich.console import Console
 11 | from rich.panel import Panel
 12 | from rich.tree import Tree
 13 | 
 14 | from basic_memory.cli.app import app
 15 | from basic_memory.mcp.async_client import get_client
 16 | from basic_memory.mcp.tools.utils import call_post
 17 | from basic_memory.schemas import SyncReportResponse
 18 | from basic_memory.mcp.project_context import get_active_project
 19 | 
 20 | # Create rich console
 21 | console = Console()
 22 | 
 23 | 
 24 | def add_files_to_tree(
 25 |     tree: Tree, paths: Set[str], style: str, checksums: Dict[str, str] | None = None
 26 | ):
 27 |     """Add files to tree, grouped by directory."""
 28 |     # Group by directory
 29 |     by_dir = {}
 30 |     for path in sorted(paths):
 31 |         parts = path.split("/", 1)
 32 |         dir_name = parts[0] if len(parts) > 1 else ""
 33 |         file_name = parts[1] if len(parts) > 1 else parts[0]
 34 |         by_dir.setdefault(dir_name, []).append((file_name, path))
 35 | 
 36 |     # Add to tree
 37 |     for dir_name, files in sorted(by_dir.items()):
 38 |         if dir_name:
 39 |             branch = tree.add(f"[bold]{dir_name}/[/bold]")
 40 |         else:
 41 |             branch = tree
 42 | 
 43 |         for file_name, full_path in sorted(files):
 44 |             if checksums and full_path in checksums:
 45 |                 checksum_short = checksums[full_path][:8]
 46 |                 branch.add(f"[{style}]{file_name}[/{style}] ({checksum_short})")
 47 |             else:
 48 |                 branch.add(f"[{style}]{file_name}[/{style}]")
 49 | 
 50 | 
 51 | def group_changes_by_directory(changes: SyncReportResponse) -> Dict[str, Dict[str, int]]:
 52 |     """Group changes by directory for summary view."""
 53 |     by_dir = {}
 54 |     for change_type, paths in [
 55 |         ("new", changes.new),
 56 |         ("modified", changes.modified),
 57 |         ("deleted", changes.deleted),
 58 |     ]:
 59 |         for path in paths:
 60 |             dir_name = path.split("/", 1)[0]
 61 |             by_dir.setdefault(dir_name, {"new": 0, "modified": 0, "deleted": 0, "moved": 0})
 62 |             by_dir[dir_name][change_type] += 1
 63 | 
 64 |     # Handle moves - count in both source and destination directories
 65 |     for old_path, new_path in changes.moves.items():
 66 |         old_dir = old_path.split("/", 1)[0]
 67 |         new_dir = new_path.split("/", 1)[0]
 68 |         by_dir.setdefault(old_dir, {"new": 0, "modified": 0, "deleted": 0, "moved": 0})
 69 |         by_dir.setdefault(new_dir, {"new": 0, "modified": 0, "deleted": 0, "moved": 0})
 70 |         by_dir[old_dir]["moved"] += 1
 71 |         if old_dir != new_dir:
 72 |             by_dir[new_dir]["moved"] += 1
 73 | 
 74 |     return by_dir
 75 | 
 76 | 
 77 | def build_directory_summary(counts: Dict[str, int]) -> str:
 78 |     """Build summary string for directory changes."""
 79 |     parts = []
 80 |     if counts["new"]:
 81 |         parts.append(f"[green]+{counts['new']} new[/green]")
 82 |     if counts["modified"]:
 83 |         parts.append(f"[yellow]~{counts['modified']} modified[/yellow]")
 84 |     if counts["moved"]:
 85 |         parts.append(f"[blue]↔{counts['moved']} moved[/blue]")
 86 |     if counts["deleted"]:
 87 |         parts.append(f"[red]-{counts['deleted']} deleted[/red]")
 88 |     return " ".join(parts)
 89 | 
 90 | 
 91 | def display_changes(
 92 |     project_name: str, title: str, changes: SyncReportResponse, verbose: bool = False
 93 | ):
 94 |     """Display changes using Rich for better visualization."""
 95 |     tree = Tree(f"{project_name}: {title}")
 96 | 
 97 |     if changes.total == 0 and not changes.skipped_files:
 98 |         tree.add("No changes")
 99 |         console.print(Panel(tree, expand=False))
100 |         return
101 | 
102 |     if verbose:
103 |         # Full file listing with checksums
104 |         if changes.new:
105 |             new_branch = tree.add("[green]New Files[/green]")
106 |             add_files_to_tree(new_branch, changes.new, "green", changes.checksums)
107 |         if changes.modified:
108 |             mod_branch = tree.add("[yellow]Modified[/yellow]")
109 |             add_files_to_tree(mod_branch, changes.modified, "yellow", changes.checksums)
110 |         if changes.moves:
111 |             move_branch = tree.add("[blue]Moved[/blue]")
112 |             for old_path, new_path in sorted(changes.moves.items()):
113 |                 move_branch.add(f"[blue]{old_path}[/blue] → [blue]{new_path}[/blue]")
114 |         if changes.deleted:
115 |             del_branch = tree.add("[red]Deleted[/red]")
116 |             add_files_to_tree(del_branch, changes.deleted, "red")
117 |         if changes.skipped_files:
118 |             skip_branch = tree.add("[red]⚠️  Skipped (Circuit Breaker)[/red]")
119 |             for skipped in sorted(changes.skipped_files, key=lambda x: x.path):
120 |                 skip_branch.add(
121 |                     f"[red]{skipped.path}[/red] "
122 |                     f"(failures: {skipped.failure_count}, reason: {skipped.reason})"
123 |                 )
124 |     else:
125 |         # Show directory summaries
126 |         by_dir = group_changes_by_directory(changes)
127 |         for dir_name, counts in sorted(by_dir.items()):
128 |             summary = build_directory_summary(counts)
129 |             if summary:  # Only show directories with changes
130 |                 tree.add(f"[bold]{dir_name}/[/bold] {summary}")
131 | 
132 |         # Show skipped files summary in non-verbose mode
133 |         if changes.skipped_files:
134 |             skip_count = len(changes.skipped_files)
135 |             tree.add(
136 |                 f"[red]⚠️  {skip_count} file{'s' if skip_count != 1 else ''} "
137 |                 f"skipped due to repeated failures[/red]"
138 |             )
139 | 
140 |     console.print(Panel(tree, expand=False))
141 | 
142 | 
async def run_status(project: Optional[str] = None, verbose: bool = False):  # pragma: no cover
    """Fetch and display the sync status of project files versus the database.

    Args:
        project: Optional project name; the active/default project is used when None.
        verbose: When True, the full per-file listing is rendered instead of summaries.

    Raises:
        typer.Exit: With code 1 when the status request fails.
    """
    try:
        async with get_client() as client:
            active = await get_active_project(client, project, None)
            # The status endpoint lives under the project's base URL.
            response = await call_post(client, f"{active.project_url}/project/status")
            report = SyncReportResponse.model_validate(response.json())
            display_changes(active.name, "Status", report, verbose)
    except (ValueError, ToolError) as error:
        console.print(f"[red]✗ Error: {error}[/red]")
        raise typer.Exit(1)
157 | 
158 | 
@app.command()
def status(
    project: Annotated[
        Optional[str],
        typer.Option(help="The project name."),
    ] = None,
    verbose: bool = typer.Option(False, "--verbose", "-v", help="Show detailed file information"),
):
    """Show sync status between files and database."""
    try:
        asyncio.run(run_status(project, verbose))  # pragma: no cover
    except typer.Exit:
        # run_status already printed the error and chose an exit code.
        # typer.Exit subclasses Exception, so without this clause the generic
        # handler below would catch it, log it a second time as an unexpected
        # failure, and clobber the intended exit code.
        raise
    except Exception as e:
        logger.error(f"Error checking status: {e}")
        typer.echo(f"Error checking status: {e}", err=True)
        raise typer.Exit(code=1)  # pragma: no cover
174 | 
```

--------------------------------------------------------------------------------
/tests/cli/test_import_chatgpt.py:
--------------------------------------------------------------------------------

```python
  1 | """Tests for import_chatgpt command."""
  2 | 
  3 | import json
  4 | 
  5 | import pytest
  6 | from typer.testing import CliRunner
  7 | 
  8 | from basic_memory.cli.app import app, import_app
  9 | from basic_memory.cli.commands import import_chatgpt  # noqa
 10 | from basic_memory.config import get_project_config
 11 | 
 12 | # Set up CLI runner
 13 | runner = CliRunner()
 14 | 
 15 | 
 16 | @pytest.fixture
 17 | def sample_conversation():
 18 |     """Sample ChatGPT conversation data for testing."""
 19 |     return {
 20 |         "title": "Test Conversation",
 21 |         "create_time": 1736616594.24054,  # Example timestamp
 22 |         "update_time": 1736616603.164995,
 23 |         "mapping": {
 24 |             "root": {"id": "root", "message": None, "parent": None, "children": ["msg1"]},
 25 |             "msg1": {
 26 |                 "id": "msg1",
 27 |                 "message": {
 28 |                     "id": "msg1",
 29 |                     "author": {"role": "user", "name": None, "metadata": {}},
 30 |                     "create_time": 1736616594.24054,
 31 |                     "content": {"content_type": "text", "parts": ["Hello, this is a test message"]},
 32 |                     "status": "finished_successfully",
 33 |                     "metadata": {},
 34 |                 },
 35 |                 "parent": "root",
 36 |                 "children": ["msg2"],
 37 |             },
 38 |             "msg2": {
 39 |                 "id": "msg2",
 40 |                 "message": {
 41 |                     "id": "msg2",
 42 |                     "author": {"role": "assistant", "name": None, "metadata": {}},
 43 |                     "create_time": 1736616603.164995,
 44 |                     "content": {"content_type": "text", "parts": ["This is a test response"]},
 45 |                     "status": "finished_successfully",
 46 |                     "metadata": {},
 47 |                 },
 48 |                 "parent": "msg1",
 49 |                 "children": [],
 50 |             },
 51 |         },
 52 |     }
 53 | 
 54 | 
 55 | @pytest.fixture
 56 | def sample_conversation_with_code():
 57 |     """Sample conversation with code block."""
 58 |     conversation = {
 59 |         "title": "Code Test",
 60 |         "create_time": 1736616594.24054,
 61 |         "update_time": 1736616603.164995,
 62 |         "mapping": {
 63 |             "root": {"id": "root", "message": None, "parent": None, "children": ["msg1"]},
 64 |             "msg1": {
 65 |                 "id": "msg1",
 66 |                 "message": {
 67 |                     "id": "msg1",
 68 |                     "author": {"role": "assistant", "name": None, "metadata": {}},
 69 |                     "create_time": 1736616594.24054,
 70 |                     "content": {
 71 |                         "content_type": "code",
 72 |                         "language": "python",
 73 |                         "text": "def hello():\n    print('Hello world!')",
 74 |                     },
 75 |                     "status": "finished_successfully",
 76 |                     "metadata": {},
 77 |                 },
 78 |                 "parent": "root",
 79 |                 "children": [],
 80 |             },
 81 |             "msg2": {
 82 |                 "id": "msg2",
 83 |                 "message": {
 84 |                     "id": "msg2",
 85 |                     "author": {"role": "assistant", "name": None, "metadata": {}},
 86 |                     "create_time": 1736616594.24054,
 87 |                     "status": "finished_successfully",
 88 |                     "metadata": {},
 89 |                 },
 90 |                 "parent": "root",
 91 |                 "children": [],
 92 |             },
 93 |         },
 94 |     }
 95 |     return conversation
 96 | 
 97 | 
 98 | @pytest.fixture
 99 | def sample_conversation_with_hidden():
100 |     """Sample conversation with hidden messages."""
101 |     conversation = {
102 |         "title": "Hidden Test",
103 |         "create_time": 1736616594.24054,
104 |         "update_time": 1736616603.164995,
105 |         "mapping": {
106 |             "root": {
107 |                 "id": "root",
108 |                 "message": None,
109 |                 "parent": None,
110 |                 "children": ["visible", "hidden"],
111 |             },
112 |             "visible": {
113 |                 "id": "visible",
114 |                 "message": {
115 |                     "id": "visible",
116 |                     "author": {"role": "user", "name": None, "metadata": {}},
117 |                     "create_time": 1736616594.24054,
118 |                     "content": {"content_type": "text", "parts": ["Visible message"]},
119 |                     "status": "finished_successfully",
120 |                     "metadata": {},
121 |                 },
122 |                 "parent": "root",
123 |                 "children": [],
124 |             },
125 |             "hidden": {
126 |                 "id": "hidden",
127 |                 "message": {
128 |                     "id": "hidden",
129 |                     "author": {"role": "system", "name": None, "metadata": {}},
130 |                     "create_time": 1736616594.24054,
131 |                     "content": {"content_type": "text", "parts": ["Hidden message"]},
132 |                     "status": "finished_successfully",
133 |                     "metadata": {"is_visually_hidden_from_conversation": True},
134 |                 },
135 |                 "parent": "root",
136 |                 "children": [],
137 |             },
138 |         },
139 |     }
140 |     return conversation
141 | 
142 | 
@pytest.fixture
def sample_chatgpt_json(tmp_path, sample_conversation):
    """Create a sample ChatGPT JSON file."""
    # The export format is a JSON array of conversation objects.
    json_file = tmp_path / "conversations.json"
    json_file.write_text(json.dumps([sample_conversation]), encoding="utf-8")
    return json_file
150 | 
151 | 
def test_import_chatgpt_command_success(tmp_path, sample_chatgpt_json, monkeypatch):
    """Test successful conversation import via command."""
    # Point HOME at the temp dir so the import writes into an isolated location.
    monkeypatch.setenv("HOME", str(tmp_path))

    result = runner.invoke(import_app, ["chatgpt", str(sample_chatgpt_json)])

    assert result.exit_code == 0
    for expected in (
        "Import complete",
        "Imported 1 conversations",
        "Containing 2 messages",
    ):
        assert expected in result.output
163 | 
164 | 
def test_import_chatgpt_command_invalid_json(tmp_path):
    """Test error handling for invalid JSON."""
    # A file whose contents cannot be parsed as JSON should fail cleanly.
    bad_file = tmp_path / "invalid.json"
    bad_file.write_text("not json")

    result = runner.invoke(import_app, ["chatgpt", str(bad_file)])

    assert result.exit_code == 1
    assert "Error during import" in result.output
174 | 
175 | 
def test_import_chatgpt_with_custom_folder(tmp_path, sample_chatgpt_json, monkeypatch):
    """Test import with custom conversations folder."""
    # Redirect the project home into the temp dir for isolation.
    config = get_project_config()
    config.home = tmp_path
    target_folder = "chats"

    result = runner.invoke(
        app,
        ["import", "chatgpt", str(sample_chatgpt_json), "--folder", target_folder],
    )
    assert result.exit_code == 0

    # The imported conversation should land under the custom folder,
    # named with the conversation date and title.
    expected_file = tmp_path / target_folder / "20250111-Test_Conversation.md"
    assert expected_file.exists()
200 | 
```

--------------------------------------------------------------------------------
/tests/services/test_initialization.py:
--------------------------------------------------------------------------------

```python
  1 | """Tests for the initialization service."""
  2 | 
  3 | from unittest.mock import patch, MagicMock, AsyncMock
  4 | 
  5 | import pytest
  6 | 
  7 | from basic_memory.services.initialization import (
  8 |     ensure_initialization,
  9 |     initialize_database,
 10 |     reconcile_projects_with_config,
 11 |     initialize_file_sync,
 12 | )
 13 | 
 14 | 
 15 | @pytest.mark.asyncio
 16 | @patch("basic_memory.services.initialization.db.get_or_create_db")
 17 | async def test_initialize_database(mock_get_or_create_db, app_config):
 18 |     """Test initializing the database."""
 19 |     mock_get_or_create_db.return_value = (MagicMock(), MagicMock())
 20 |     await initialize_database(app_config)
 21 |     mock_get_or_create_db.assert_called_once_with(app_config.database_path)
 22 | 
 23 | 
 24 | @pytest.mark.asyncio
 25 | @patch("basic_memory.services.initialization.db.get_or_create_db")
 26 | async def test_initialize_database_error(mock_get_or_create_db, app_config):
 27 |     """Test handling errors during database initialization."""
 28 |     mock_get_or_create_db.side_effect = Exception("Test error")
 29 |     await initialize_database(app_config)
 30 |     mock_get_or_create_db.assert_called_once_with(app_config.database_path)
 31 | 
 32 | 
 33 | @patch("basic_memory.services.initialization.asyncio.run")
 34 | def test_ensure_initialization(mock_run, app_config):
 35 |     """Test synchronous initialization wrapper."""
 36 |     ensure_initialization(app_config)
 37 |     mock_run.assert_called_once()
 38 | 
 39 | 
@pytest.mark.asyncio
@patch("basic_memory.services.initialization.db.get_or_create_db")
async def test_reconcile_projects_with_config(mock_get_db, app_config):
    """Test reconciling projects from config with database using ProjectService."""
    # Setup mocks: get_or_create_db returns (engine, session_maker); only the
    # session_maker is consumed by reconcile_projects_with_config.
    mock_session_maker = AsyncMock()
    mock_get_db.return_value = (None, mock_session_maker)

    mock_repository = AsyncMock()
    mock_project_service = AsyncMock()
    mock_project_service.synchronize_projects = AsyncMock()

    # Mock the repository and project service
    # NOTE: ProjectService is patched where it is defined
    # (basic_memory.services.project_service) because initialization imports it
    # lazily inside the function body, not at module import time.
    with (
        patch("basic_memory.services.initialization.ProjectRepository") as mock_repo_class,
        patch(
            "basic_memory.services.project_service.ProjectService",
            return_value=mock_project_service,
        ),
    ):
        mock_repo_class.return_value = mock_repository

        # Set up app_config projects as a dictionary
        app_config.projects = {"test_project": "/path/to/project", "new_project": "/path/to/new"}
        app_config.default_project = "test_project"

        # Run the function
        await reconcile_projects_with_config(app_config)

        # Assertions: reconciliation must build the repository from the session
        # maker and delegate all sync work to ProjectService.
        mock_get_db.assert_called_once()
        mock_repo_class.assert_called_once_with(mock_session_maker)
        mock_project_service.synchronize_projects.assert_called_once()

        # We should no longer be calling these directly since we're using the service
        mock_repository.find_all.assert_not_called()
        mock_repository.set_as_default.assert_not_called()
 77 | 
 78 | 
@pytest.mark.asyncio
@patch("basic_memory.services.initialization.db.get_or_create_db")
async def test_reconcile_projects_with_error_handling(mock_get_db, app_config):
    """Test error handling during project synchronization."""
    # Setup mocks
    mock_session_maker = AsyncMock()
    mock_get_db.return_value = (None, mock_session_maker)

    mock_repository = AsyncMock()
    mock_project_service = AsyncMock()
    # Force the service call to fail so the error path is exercised.
    mock_project_service.synchronize_projects = AsyncMock(
        side_effect=ValueError("Project synchronization error")
    )

    # Mock the repository and project service; the logger is patched too so the
    # exact error/info messages can be asserted below.
    with (
        patch("basic_memory.services.initialization.ProjectRepository") as mock_repo_class,
        patch(
            "basic_memory.services.project_service.ProjectService",
            return_value=mock_project_service,
        ),
        patch("basic_memory.services.initialization.logger") as mock_logger,
    ):
        mock_repo_class.return_value = mock_repository

        # Set up app_config projects as a dictionary
        app_config.projects = {"test_project": "/path/to/project"}
        app_config.default_project = "missing_project"

        # Run the function which now has error handling — it must not raise
        # even though synchronize_projects fails.
        await reconcile_projects_with_config(app_config)

        # Assertions
        mock_get_db.assert_called_once()
        mock_repo_class.assert_called_once_with(mock_session_maker)
        mock_project_service.synchronize_projects.assert_called_once()

        # Verify error was logged and initialization continued anyway.
        mock_logger.error.assert_called_once_with(
            "Error during project synchronization: Project synchronization error"
        )
        mock_logger.info.assert_any_call(
            "Continuing with initialization despite synchronization error"
        )
123 | 
124 | 
@pytest.mark.asyncio
@patch("basic_memory.services.initialization.db.get_or_create_db")
@patch("basic_memory.sync.sync_service.get_sync_service")
@patch("basic_memory.sync.WatchService")
@patch("basic_memory.services.initialization.asyncio.create_task")
async def test_initialize_file_sync_background_tasks(
    mock_create_task, mock_watch_service_class, mock_get_sync_service, mock_get_db, app_config
):
    """Test file sync initialization with background task processing.

    Verifies that per-project sync runs are scheduled as background tasks
    (non-blocking) while the watch service is still started in-line.
    """
    # Setup mocks
    mock_session_maker = AsyncMock()
    mock_get_db.return_value = (None, mock_session_maker)

    mock_watch_service = AsyncMock()
    mock_watch_service.run = AsyncMock()
    mock_watch_service_class.return_value = mock_watch_service

    # Two active projects so we can assert one background task per project.
    mock_repository = AsyncMock()
    mock_project1 = MagicMock()
    mock_project1.name = "project1"
    mock_project1.path = "/path/to/project1"
    mock_project1.id = 1

    mock_project2 = MagicMock()
    mock_project2.name = "project2"
    mock_project2.path = "/path/to/project2"
    mock_project2.id = 2

    mock_sync_service = AsyncMock()
    mock_sync_service.sync = AsyncMock()
    mock_get_sync_service.return_value = mock_sync_service

    # Mock background tasks; create_task is patched so nothing actually runs.
    mock_task1 = MagicMock()
    mock_task2 = MagicMock()
    mock_create_task.side_effect = [mock_task1, mock_task2]

    # Mock the repository
    with patch("basic_memory.services.initialization.ProjectRepository") as mock_repo_class:
        mock_repo_class.return_value = mock_repository
        mock_repository.get_active_projects.return_value = [mock_project1, mock_project2]

        # Run the function
        result = await initialize_file_sync(app_config)

        # Assertions
        mock_repository.get_active_projects.assert_called_once()

        # Should create background tasks for each project (non-blocking)
        assert mock_create_task.call_count == 2

        # Verify tasks were created but not awaited (function returns immediately)
        assert result is None

        # Watch service should still be started
        mock_watch_service.run.assert_called_once()
181 | 
```

--------------------------------------------------------------------------------
/src/basic_memory/schemas/project_info.py:
--------------------------------------------------------------------------------

```python
  1 | """Schema for project info response."""
  2 | 
  3 | import os
  4 | from datetime import datetime
  5 | from pathlib import Path
  6 | from typing import Dict, List, Optional, Any
  7 | 
  8 | from pydantic import Field, BaseModel
  9 | 
 10 | from basic_memory.utils import generate_permalink
 11 | 
 12 | 
class ProjectStatistics(BaseModel):
    """Statistics about the current project."""

    # Basic counts
    total_entities: int = Field(description="Total number of entities in the knowledge base")
    total_observations: int = Field(description="Total number of observations across all entities")
    total_relations: int = Field(description="Total number of relations between entities")
    total_unresolved_relations: int = Field(
        description="Number of relations with unresolved targets"
    )

    # Entity counts by type
    entity_types: Dict[str, int] = Field(
        description="Count of entities by type (e.g., note, conversation)"
    )

    # Observation counts by category
    observation_categories: Dict[str, int] = Field(
        description="Count of observations by category (e.g., tech, decision)"
    )

    # Relation counts by type
    relation_types: Dict[str, int] = Field(
        description="Count of relations by type (e.g., implements, relates_to)"
    )

    # Graph metrics
    most_connected_entities: List[Dict[str, Any]] = Field(
        description="Entities with the most relations, including their titles and permalinks"
    )
    isolated_entities: int = Field(description="Number of entities with no relations")


class ActivityMetrics(BaseModel):
    """Activity metrics for the current project."""

    # Recent activity
    recently_created: List[Dict[str, Any]] = Field(
        description="Recently created entities with timestamps"
    )
    recently_updated: List[Dict[str, Any]] = Field(
        description="Recently updated entities with timestamps"
    )

    # Growth over time (last 6 months)
    # Outer key is the month; inner dict maps metric name -> count.
    monthly_growth: Dict[str, Dict[str, int]] = Field(
        description="Monthly growth statistics for entities, observations, and relations"
    )


class SystemStatus(BaseModel):
    """System status information."""

    # Version information
    version: str = Field(description="Basic Memory version")

    # Database status
    database_path: str = Field(description="Path to the SQLite database")
    database_size: str = Field(description="Size of the database in human-readable format")

    # Watch service status (None when the watch service is not running)
    watch_status: Optional[Dict[str, Any]] = Field(
        default=None, description="Watch service status information (if running)"
    )

    # System information
    timestamp: datetime = Field(description="Timestamp when the information was collected")


class ProjectInfoResponse(BaseModel):
    """Response for the project_info tool."""

    # Project configuration
    project_name: str = Field(description="Name of the current project")
    project_path: str = Field(description="Path to the current project files")
    available_projects: Dict[str, Dict[str, Any]] = Field(
        description="Map of configured project names to detailed project information"
    )
    default_project: str = Field(description="Name of the default project")

    # Statistics
    statistics: ProjectStatistics = Field(description="Statistics about the knowledge base")

    # Activity metrics
    activity: ActivityMetrics = Field(description="Activity and growth metrics")

    # System status
    system: SystemStatus = Field(description="System and service status information")


class ProjectInfoRequest(BaseModel):
    """Request model for switching projects."""

    name: str = Field(..., description="Name of the project to switch to")
    path: str = Field(..., description="Path to the project directory")
    set_default: bool = Field(..., description="Set the project as the default")
109 | 
110 | 
# Single file-watch event recorded by the watch service. (No class docstring:
# adding one would surface as the model's JSON-schema description.)
class WatchEvent(BaseModel):
    timestamp: datetime
    path: str
    action: str  # new, delete, etc
    status: str  # success, error
    checksum: Optional[str]  # file checksum when available; required field, may be None
    error: Optional[str] = None  # error message when status == "error"
118 | 
119 | 
# Mutable runtime state for the watch service, exposed via the status API.
class WatchServiceState(BaseModel):
    # Service status
    running: bool = False
    # default_factory is required here: a plain `datetime.now()` /
    # `os.getpid()` default is evaluated exactly once at class-definition
    # (import) time, so every instance would share the importing process's
    # pid and the import timestamp rather than its own start time.
    start_time: datetime = Field(default_factory=datetime.now)
    pid: int = Field(default_factory=os.getpid)

    # Stats
    error_count: int = 0
    last_error: Optional[datetime] = None
    last_scan: Optional[datetime] = None

    # File counts
    synced_files: int = 0

    # Recent activity (most recent first, capped at 100 by add_event)
    recent_events: List[WatchEvent] = Field(default_factory=list)

    def add_event(
        self,
        path: str,
        action: str,
        status: str,
        checksum: Optional[str] = None,
        error: Optional[str] = None,
    ) -> WatchEvent:  # pragma: no cover
        """Record a watch event, keeping only the 100 most recent."""
        event = WatchEvent(
            timestamp=datetime.now(),
            path=path,
            action=action,
            status=status,
            checksum=checksum,
            error=error,
        )
        self.recent_events.insert(0, event)
        self.recent_events = self.recent_events[:100]  # Keep last 100
        return event

    def record_error(self, error: str):  # pragma: no cover
        """Increment the error counter and log the error as a sync event."""
        self.error_count += 1
        self.add_event(path="", action="sync", status="error", error=error)
        self.last_error = datetime.now()
161 | 
162 | 
class ProjectWatchStatus(BaseModel):
    """Project with its watch status."""

    name: str = Field(..., description="Name of the project")
    path: str = Field(..., description="Path to the project")
    # None when the watch service has no state for this project.
    watch_status: Optional[WatchServiceState] = Field(
        None, description="Watch status information for the project"
    )


class ProjectItem(BaseModel):
    """Simple representation of a project."""

    name: str
    path: str
    is_default: bool = False

    @property
    def permalink(self) -> str:  # pragma: no cover
        # URL-safe identifier derived from the project name.
        return generate_permalink(self.name)

    @property
    def home(self) -> Path:  # pragma: no cover
        # Filesystem root of the project, with ~ expanded.
        return Path(self.path).expanduser()

    @property
    def project_url(self) -> str:  # pragma: no cover
        # API base path for this project (e.g. "/my-project").
        return f"/{generate_permalink(self.name)}"


class ProjectList(BaseModel):
    """Response model for listing projects."""

    projects: List[ProjectItem]
    default_project: str


class ProjectStatusResponse(BaseModel):
    """Response model for switching projects."""

    message: str = Field(..., description="Status message about the project switch")
    status: str = Field(..., description="Status of the switch (success or error)")
    default: bool = Field(..., description="True if the project was set as the default")
    old_project: Optional[ProjectItem] = Field(
        None, description="Information about the project being switched from"
    )
    new_project: Optional[ProjectItem] = Field(
        None, description="Information about the project being switched to"
    )
212 | 
```

--------------------------------------------------------------------------------
/src/basic_memory/mcp/tools/chatgpt_tools.py:
--------------------------------------------------------------------------------

```python
  1 | """ChatGPT-compatible MCP tools for Basic Memory.
  2 | 
  3 | These adapters expose Basic Memory's search/fetch functionality using the exact
  4 | tool names and response structure OpenAI's MCP clients expect: each call returns
  5 | a list containing a single `{"type": "text", "text": "{...json...}"}` item.
  6 | """
  7 | 
  8 | import json
  9 | from typing import Any, Dict, List, Optional
 10 | from loguru import logger
 11 | from fastmcp import Context
 12 | 
 13 | from basic_memory.mcp.server import mcp
 14 | from basic_memory.mcp.tools.search import search_notes
 15 | from basic_memory.mcp.tools.read_note import read_note
 16 | from basic_memory.schemas.search import SearchResponse
 17 | from basic_memory.config import ConfigManager
 18 | 
 19 | 
 20 | def _format_search_results_for_chatgpt(results: SearchResponse) -> List[Dict[str, Any]]:
 21 |     """Format search results according to ChatGPT's expected schema.
 22 | 
 23 |     Returns a list of result objects with id, title, and url fields.
 24 |     """
 25 |     formatted_results = []
 26 | 
 27 |     for result in results.results:
 28 |         formatted_result = {
 29 |             "id": result.permalink or f"doc-{len(formatted_results)}",
 30 |             "title": result.title if result.title and result.title.strip() else "Untitled",
 31 |             "url": result.permalink or "",
 32 |         }
 33 |         formatted_results.append(formatted_result)
 34 | 
 35 |     return formatted_results
 36 | 
 37 | 
 38 | def _format_document_for_chatgpt(
 39 |     content: str, identifier: str, title: Optional[str] = None
 40 | ) -> Dict[str, Any]:
 41 |     """Format document content according to ChatGPT's expected schema.
 42 | 
 43 |     Returns a document object with id, title, text, url, and metadata fields.
 44 |     """
 45 |     # Extract title from markdown content if not provided
 46 |     if not title and isinstance(content, str):
 47 |         lines = content.split("\n")
 48 |         if lines and lines[0].startswith("# "):
 49 |             title = lines[0][2:].strip()
 50 |         else:
 51 |             title = identifier.split("/")[-1].replace("-", " ").title()
 52 | 
 53 |     # Ensure title is never None
 54 |     if not title:
 55 |         title = "Untitled Document"
 56 | 
 57 |     # Handle error cases
 58 |     if isinstance(content, str) and content.startswith("# Note Not Found"):
 59 |         return {
 60 |             "id": identifier,
 61 |             "title": title or "Document Not Found",
 62 |             "text": content,
 63 |             "url": identifier,
 64 |             "metadata": {"error": "Document not found"},
 65 |         }
 66 | 
 67 |     return {
 68 |         "id": identifier,
 69 |         "title": title or "Untitled Document",
 70 |         "text": content,
 71 |         "url": identifier,
 72 |         "metadata": {"format": "markdown"},
 73 |     }
 74 | 
 75 | 
 76 | @mcp.tool(description="Search for content across the knowledge base")
 77 | async def search(
 78 |     query: str,
 79 |     context: Context | None = None,
 80 | ) -> List[Dict[str, Any]]:
 81 |     """ChatGPT/OpenAI MCP search adapter returning a single text content item.
 82 | 
 83 |     Args:
 84 |         query: Search query (full-text syntax supported by `search_notes`)
 85 |         context: Optional FastMCP context passed through for auth/session data
 86 | 
 87 |     Returns:
 88 |         List with one dict: `{ "type": "text", "text": "{...JSON...}" }`
 89 |         where the JSON body contains `results`, `total_count`, and echo of `query`.
 90 |     """
 91 |     logger.info(f"ChatGPT search request: query='{query}'")
 92 | 
 93 |     try:
 94 |         # ChatGPT tools don't expose project parameter, so use default project
 95 |         config = ConfigManager().config
 96 |         default_project = config.default_project
 97 | 
 98 |         # Call underlying search_notes with sensible defaults for ChatGPT
 99 |         results = await search_notes.fn(
100 |             query=query,
101 |             project=default_project,  # Use default project for ChatGPT
102 |             page=1,
103 |             page_size=10,  # Reasonable default for ChatGPT consumption
104 |             search_type="text",  # Default to full-text search
105 |             context=context,
106 |         )
107 | 
108 |         # Handle string error responses from search_notes
109 |         if isinstance(results, str):
110 |             logger.warning(f"Search failed with error: {results[:100]}...")
111 |             search_results = {
112 |                 "results": [],
113 |                 "error": "Search failed",
114 |                 "error_details": results[:500],  # Truncate long error messages
115 |             }
116 |         else:
117 |             # Format successful results for ChatGPT
118 |             formatted_results = _format_search_results_for_chatgpt(results)
119 |             search_results = {
120 |                 "results": formatted_results,
121 |                 "total_count": len(results.results),  # Use actual count from results
122 |                 "query": query,
123 |             }
124 |             logger.info(f"Search completed: {len(formatted_results)} results returned")
125 | 
126 |         # Return in MCP content array format as required by OpenAI
127 |         return [{"type": "text", "text": json.dumps(search_results, ensure_ascii=False)}]
128 | 
129 |     except Exception as e:
130 |         logger.error(f"ChatGPT search failed for query '{query}': {e}")
131 |         error_results = {
132 |             "results": [],
133 |             "error": "Internal search error",
134 |             "error_message": str(e)[:200],
135 |         }
136 |         return [{"type": "text", "text": json.dumps(error_results, ensure_ascii=False)}]
137 | 
138 | 
@mcp.tool(description="Fetch the full contents of a search result document")
async def fetch(
    id: str,
    context: Context | None = None,
) -> List[Dict[str, Any]]:
    """Adapt Basic Memory's read_note to the ChatGPT/OpenAI MCP fetch contract.

    Args:
        id: Document identifier (permalink, title, or memory URL)
        context: Optional FastMCP context passed through for auth/session data

    Returns:
        A single-item MCP content array, `[{"type": "text", "text": <JSON>}]`,
        whose JSON payload carries `id`, `title`, `text`, `url`, and metadata.
    """
    logger.info(f"ChatGPT fetch request: id='{id}'")

    try:
        # The ChatGPT tool surface has no project argument, so resolve the
        # configured default project instead.
        active_project = ConfigManager().config.default_project

        # Delegate to the underlying read_note tool with default pagination.
        note_content = await read_note.fn(
            identifier=id,
            project=active_project,
            page=1,
            page_size=10,
            context=context,
        )

        # Shape the raw note content into the document payload ChatGPT expects.
        payload = _format_document_for_chatgpt(note_content, id)

        logger.info(f"Fetch completed: id='{id}', content_length={len(payload.get('text', ''))}")

        # OpenAI requires results wrapped in an MCP content array.
        return [{"type": "text", "text": json.dumps(payload, ensure_ascii=False)}]

    except Exception as e:
        # Boundary handler: log and return a structured error document rather
        # than letting the exception escape the tool call.
        logger.error(f"ChatGPT fetch failed for id '{id}': {e}")
        failure_payload = {
            "id": id,
            "title": "Fetch Error",
            "text": f"Failed to fetch document: {str(e)[:200]}",
            "url": id,
            "metadata": {"error": "Fetch failed"},
        }
        return [{"type": "text", "text": json.dumps(failure_payload, ensure_ascii=False)}]
188 | 
```
Page 4/23 — First | Prev | Next | Last