This is page 5 of 17. Use http://codebase.md/basicmachines-co/basic-memory?lines=false&page={x} to view the full context.
# Directory Structure
```
├── .claude
│ ├── agents
│ │ ├── python-developer.md
│ │ └── system-architect.md
│ └── commands
│ ├── release
│ │ ├── beta.md
│ │ ├── changelog.md
│ │ ├── release-check.md
│ │ └── release.md
│ ├── spec.md
│ └── test-live.md
├── .dockerignore
├── .github
│ ├── dependabot.yml
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.md
│ │ ├── config.yml
│ │ ├── documentation.md
│ │ └── feature_request.md
│ └── workflows
│ ├── claude-code-review.yml
│ ├── claude-issue-triage.yml
│ ├── claude.yml
│ ├── dev-release.yml
│ ├── docker.yml
│ ├── pr-title.yml
│ ├── release.yml
│ └── test.yml
├── .gitignore
├── .python-version
├── CHANGELOG.md
├── CITATION.cff
├── CLA.md
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── docker-compose.yml
├── Dockerfile
├── docs
│ ├── ai-assistant-guide-extended.md
│ ├── character-handling.md
│ ├── cloud-cli.md
│ └── Docker.md
├── justfile
├── LICENSE
├── llms-install.md
├── pyproject.toml
├── README.md
├── SECURITY.md
├── smithery.yaml
├── specs
│ ├── SPEC-1 Specification-Driven Development Process.md
│ ├── SPEC-10 Unified Deployment Workflow and Event Tracking.md
│ ├── SPEC-11 Basic Memory API Performance Optimization.md
│ ├── SPEC-12 OpenTelemetry Observability.md
│ ├── SPEC-13 CLI Authentication with Subscription Validation.md
│ ├── SPEC-14 Cloud Git Versioning & GitHub Backup.md
│ ├── SPEC-14- Cloud Git Versioning & GitHub Backup.md
│ ├── SPEC-15 Configuration Persistence via Tigris for Cloud Tenants.md
│ ├── SPEC-16 MCP Cloud Service Consolidation.md
│ ├── SPEC-17 Semantic Search with ChromaDB.md
│ ├── SPEC-18 AI Memory Management Tool.md
│ ├── SPEC-19 Sync Performance and Memory Optimization.md
│ ├── SPEC-2 Slash Commands Reference.md
│ ├── SPEC-3 Agent Definitions.md
│ ├── SPEC-4 Notes Web UI Component Architecture.md
│ ├── SPEC-5 CLI Cloud Upload via WebDAV.md
│ ├── SPEC-6 Explicit Project Parameter Architecture.md
│ ├── SPEC-7 POC to spike Tigris Turso for local access to cloud data.md
│ ├── SPEC-8 TigrisFS Integration.md
│ ├── SPEC-9 Multi-Project Bidirectional Sync Architecture.md
│ ├── SPEC-9 Signed Header Tenant Information.md
│ └── SPEC-9-1 Follow-Ups- Conflict, Sync, and Observability.md
├── src
│ └── basic_memory
│ ├── __init__.py
│ ├── alembic
│ │ ├── alembic.ini
│ │ ├── env.py
│ │ ├── migrations.py
│ │ ├── script.py.mako
│ │ └── versions
│ │ ├── 3dae7c7b1564_initial_schema.py
│ │ ├── 502b60eaa905_remove_required_from_entity_permalink.py
│ │ ├── 5fe1ab1ccebe_add_projects_table.py
│ │ ├── 647e7a75e2cd_project_constraint_fix.py
│ │ ├── 9d9c1cb7d8f5_add_mtime_and_size_columns_to_entity_.py
│ │ ├── a1b2c3d4e5f6_fix_project_foreign_keys.py
│ │ ├── b3c3938bacdb_relation_to_name_unique_index.py
│ │ ├── cc7172b46608_update_search_index_schema.py
│ │ └── e7e1f4367280_add_scan_watermark_tracking_to_project.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── app.py
│ │ ├── routers
│ │ │ ├── __init__.py
│ │ │ ├── directory_router.py
│ │ │ ├── importer_router.py
│ │ │ ├── knowledge_router.py
│ │ │ ├── management_router.py
│ │ │ ├── memory_router.py
│ │ │ ├── project_router.py
│ │ │ ├── prompt_router.py
│ │ │ ├── resource_router.py
│ │ │ ├── search_router.py
│ │ │ └── utils.py
│ │ └── template_loader.py
│ ├── cli
│ │ ├── __init__.py
│ │ ├── app.py
│ │ ├── auth.py
│ │ ├── commands
│ │ │ ├── __init__.py
│ │ │ ├── cloud
│ │ │ │ ├── __init__.py
│ │ │ │ ├── api_client.py
│ │ │ │ ├── bisync_commands.py
│ │ │ │ ├── cloud_utils.py
│ │ │ │ ├── core_commands.py
│ │ │ │ ├── mount_commands.py
│ │ │ │ ├── rclone_config.py
│ │ │ │ ├── rclone_installer.py
│ │ │ │ ├── upload_command.py
│ │ │ │ └── upload.py
│ │ │ ├── command_utils.py
│ │ │ ├── db.py
│ │ │ ├── import_chatgpt.py
│ │ │ ├── import_claude_conversations.py
│ │ │ ├── import_claude_projects.py
│ │ │ ├── import_memory_json.py
│ │ │ ├── mcp.py
│ │ │ ├── project.py
│ │ │ ├── status.py
│ │ │ ├── sync.py
│ │ │ └── tool.py
│ │ └── main.py
│ ├── config.py
│ ├── db.py
│ ├── deps.py
│ ├── file_utils.py
│ ├── ignore_utils.py
│ ├── importers
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── chatgpt_importer.py
│ │ ├── claude_conversations_importer.py
│ │ ├── claude_projects_importer.py
│ │ ├── memory_json_importer.py
│ │ └── utils.py
│ ├── markdown
│ │ ├── __init__.py
│ │ ├── entity_parser.py
│ │ ├── markdown_processor.py
│ │ ├── plugins.py
│ │ ├── schemas.py
│ │ └── utils.py
│ ├── mcp
│ │ ├── __init__.py
│ │ ├── async_client.py
│ │ ├── project_context.py
│ │ ├── prompts
│ │ │ ├── __init__.py
│ │ │ ├── ai_assistant_guide.py
│ │ │ ├── continue_conversation.py
│ │ │ ├── recent_activity.py
│ │ │ ├── search.py
│ │ │ └── utils.py
│ │ ├── resources
│ │ │ ├── ai_assistant_guide.md
│ │ │ └── project_info.py
│ │ ├── server.py
│ │ └── tools
│ │ ├── __init__.py
│ │ ├── build_context.py
│ │ ├── canvas.py
│ │ ├── chatgpt_tools.py
│ │ ├── delete_note.py
│ │ ├── edit_note.py
│ │ ├── list_directory.py
│ │ ├── move_note.py
│ │ ├── project_management.py
│ │ ├── read_content.py
│ │ ├── read_note.py
│ │ ├── recent_activity.py
│ │ ├── search.py
│ │ ├── utils.py
│ │ ├── view_note.py
│ │ └── write_note.py
│ ├── models
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── knowledge.py
│ │ ├── project.py
│ │ └── search.py
│ ├── repository
│ │ ├── __init__.py
│ │ ├── entity_repository.py
│ │ ├── observation_repository.py
│ │ ├── project_info_repository.py
│ │ ├── project_repository.py
│ │ ├── relation_repository.py
│ │ ├── repository.py
│ │ └── search_repository.py
│ ├── schemas
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── cloud.py
│ │ ├── delete.py
│ │ ├── directory.py
│ │ ├── importer.py
│ │ ├── memory.py
│ │ ├── project_info.py
│ │ ├── prompt.py
│ │ ├── request.py
│ │ ├── response.py
│ │ ├── search.py
│ │ └── sync_report.py
│ ├── services
│ │ ├── __init__.py
│ │ ├── context_service.py
│ │ ├── directory_service.py
│ │ ├── entity_service.py
│ │ ├── exceptions.py
│ │ ├── file_service.py
│ │ ├── initialization.py
│ │ ├── link_resolver.py
│ │ ├── project_service.py
│ │ ├── search_service.py
│ │ └── service.py
│ ├── sync
│ │ ├── __init__.py
│ │ ├── background_sync.py
│ │ ├── sync_service.py
│ │ └── watch_service.py
│ ├── templates
│ │ └── prompts
│ │ ├── continue_conversation.hbs
│ │ └── search.hbs
│ └── utils.py
├── test-int
│ ├── BENCHMARKS.md
│ ├── cli
│ │ ├── test_project_commands_integration.py
│ │ ├── test_sync_commands_integration.py
│ │ └── test_version_integration.py
│ ├── conftest.py
│ ├── mcp
│ │ ├── test_build_context_underscore.py
│ │ ├── test_build_context_validation.py
│ │ ├── test_chatgpt_tools_integration.py
│ │ ├── test_default_project_mode_integration.py
│ │ ├── test_delete_note_integration.py
│ │ ├── test_edit_note_integration.py
│ │ ├── test_list_directory_integration.py
│ │ ├── test_move_note_integration.py
│ │ ├── test_project_management_integration.py
│ │ ├── test_project_state_sync_integration.py
│ │ ├── test_read_content_integration.py
│ │ ├── test_read_note_integration.py
│ │ ├── test_search_integration.py
│ │ ├── test_single_project_mcp_integration.py
│ │ └── test_write_note_integration.py
│ ├── test_db_wal_mode.py
│ ├── test_disable_permalinks_integration.py
│ └── test_sync_performance_benchmark.py
├── tests
│ ├── __init__.py
│ ├── api
│ │ ├── conftest.py
│ │ ├── test_async_client.py
│ │ ├── test_continue_conversation_template.py
│ │ ├── test_directory_router.py
│ │ ├── test_importer_router.py
│ │ ├── test_knowledge_router.py
│ │ ├── test_management_router.py
│ │ ├── test_memory_router.py
│ │ ├── test_project_router_operations.py
│ │ ├── test_project_router.py
│ │ ├── test_prompt_router.py
│ │ ├── test_relation_background_resolution.py
│ │ ├── test_resource_router.py
│ │ ├── test_search_router.py
│ │ ├── test_search_template.py
│ │ ├── test_template_loader_helpers.py
│ │ └── test_template_loader.py
│ ├── cli
│ │ ├── conftest.py
│ │ ├── test_bisync_commands.py
│ │ ├── test_cli_tools.py
│ │ ├── test_cloud_authentication.py
│ │ ├── test_cloud_utils.py
│ │ ├── test_ignore_utils.py
│ │ ├── test_import_chatgpt.py
│ │ ├── test_import_claude_conversations.py
│ │ ├── test_import_claude_projects.py
│ │ ├── test_import_memory_json.py
│ │ └── test_upload.py
│ ├── conftest.py
│ ├── db
│ │ └── test_issue_254_foreign_key_constraints.py
│ ├── importers
│ │ ├── test_importer_base.py
│ │ └── test_importer_utils.py
│ ├── markdown
│ │ ├── __init__.py
│ │ ├── test_date_frontmatter_parsing.py
│ │ ├── test_entity_parser_error_handling.py
│ │ ├── test_entity_parser.py
│ │ ├── test_markdown_plugins.py
│ │ ├── test_markdown_processor.py
│ │ ├── test_observation_edge_cases.py
│ │ ├── test_parser_edge_cases.py
│ │ ├── test_relation_edge_cases.py
│ │ └── test_task_detection.py
│ ├── mcp
│ │ ├── conftest.py
│ │ ├── test_obsidian_yaml_formatting.py
│ │ ├── test_permalink_collision_file_overwrite.py
│ │ ├── test_prompts.py
│ │ ├── test_resources.py
│ │ ├── test_tool_build_context.py
│ │ ├── test_tool_canvas.py
│ │ ├── test_tool_delete_note.py
│ │ ├── test_tool_edit_note.py
│ │ ├── test_tool_list_directory.py
│ │ ├── test_tool_move_note.py
│ │ ├── test_tool_read_content.py
│ │ ├── test_tool_read_note.py
│ │ ├── test_tool_recent_activity.py
│ │ ├── test_tool_resource.py
│ │ ├── test_tool_search.py
│ │ ├── test_tool_utils.py
│ │ ├── test_tool_view_note.py
│ │ ├── test_tool_write_note.py
│ │ └── tools
│ │ └── test_chatgpt_tools.py
│ ├── Non-MarkdownFileSupport.pdf
│ ├── repository
│ │ ├── test_entity_repository_upsert.py
│ │ ├── test_entity_repository.py
│ │ ├── test_entity_upsert_issue_187.py
│ │ ├── test_observation_repository.py
│ │ ├── test_project_info_repository.py
│ │ ├── test_project_repository.py
│ │ ├── test_relation_repository.py
│ │ ├── test_repository.py
│ │ ├── test_search_repository_edit_bug_fix.py
│ │ └── test_search_repository.py
│ ├── schemas
│ │ ├── test_base_timeframe_minimum.py
│ │ ├── test_memory_serialization.py
│ │ ├── test_memory_url_validation.py
│ │ ├── test_memory_url.py
│ │ ├── test_schemas.py
│ │ └── test_search.py
│ ├── Screenshot.png
│ ├── services
│ │ ├── test_context_service.py
│ │ ├── test_directory_service.py
│ │ ├── test_entity_service_disable_permalinks.py
│ │ ├── test_entity_service.py
│ │ ├── test_file_service.py
│ │ ├── test_initialization.py
│ │ ├── test_link_resolver.py
│ │ ├── test_project_removal_bug.py
│ │ ├── test_project_service_operations.py
│ │ ├── test_project_service.py
│ │ └── test_search_service.py
│ ├── sync
│ │ ├── test_character_conflicts.py
│ │ ├── test_sync_service_incremental.py
│ │ ├── test_sync_service.py
│ │ ├── test_sync_wikilink_issue.py
│ │ ├── test_tmp_files.py
│ │ ├── test_watch_service_edge_cases.py
│ │ ├── test_watch_service_reload.py
│ │ └── test_watch_service.py
│ ├── test_config.py
│ ├── test_db_migration_deduplication.py
│ ├── test_deps.py
│ ├── test_production_cascade_delete.py
│ └── utils
│ ├── test_file_utils.py
│ ├── test_frontmatter_obsidian_compatible.py
│ ├── test_parse_tags.py
│ ├── test_permalink_formatting.py
│ ├── test_utf8_handling.py
│ └── test_validate_project_path.py
├── uv.lock
├── v0.15.0-RELEASE-DOCS.md
└── v15-docs
├── api-performance.md
├── background-relations.md
├── basic-memory-home.md
├── bug-fixes.md
├── chatgpt-integration.md
├── cloud-authentication.md
├── cloud-bisync.md
├── cloud-mode-usage.md
├── cloud-mount.md
├── default-project-mode.md
├── env-file-removal.md
├── env-var-overrides.md
├── explicit-project-parameter.md
├── gitignore-integration.md
├── project-root-env-var.md
├── README.md
└── sqlite-performance.md
```
# Files
--------------------------------------------------------------------------------
/tests/markdown/test_date_frontmatter_parsing.py:
--------------------------------------------------------------------------------
```python
"""Test that YAML date parsing doesn't break frontmatter processing.
This test reproduces GitHub issue #236 from basic-memory-cloud where date fields
in YAML frontmatter are automatically parsed as datetime.date objects by PyYAML,
but later code expects strings and calls .strip() on them, causing AttributeError.
"""
import pytest
from pathlib import Path
from basic_memory.markdown.entity_parser import EntityParser
@pytest.fixture
def test_file_with_date(tmp_path):
    """Write a note whose frontmatter contains bare date fields.

    PyYAML parses values like 2025-10-24 as datetime.date objects rather
    than strings — the trigger for GitHub issue #236.
    """
    note_path = tmp_path / "test_note.md"
    note_path.write_text(
        """---
title: Test Note
date: 2025-10-24
created: 2025-10-24
tags:
  - python
  - testing
---
# Test Content
This file has date fields in frontmatter that PyYAML will parse as datetime.date objects.
"""
    )
    return note_path


@pytest.fixture
def test_file_with_date_in_tags(tmp_path):
    """Write a note whose entire `tags` value is a bare date (edge case)."""
    note_path = tmp_path / "test_note_date_tags.md"
    note_path.write_text(
        """---
title: Test Note with Date Tags
tags: 2025-10-24
---
# Test Content
This file has a date value as tags, which will be parsed as datetime.date.
"""
    )
    return note_path


@pytest.fixture
def test_file_with_dates_in_tag_list(tmp_path):
    """Write a note whose tag list mixes string tags with a bare date (edge case)."""
    note_path = tmp_path / "test_note_dates_in_list.md"
    note_path.write_text(
        """---
title: Test Note with Dates in Tags List
tags:
  - valid-tag
  - 2025-10-24
  - another-tag
---
# Test Content
This file has date values mixed into tags list.
"""
    )
    return note_path
@pytest.mark.asyncio
async def test_parse_file_with_date_fields(test_file_with_date, tmp_path):
    """Files whose frontmatter holds date objects parse without AttributeError."""
    # parse_file must not raise the .strip()-on-date AttributeError from issue #236.
    entity_markdown = await EntityParser(tmp_path).parse_file(test_file_with_date)
    assert entity_markdown.frontmatter.title == "Test Note"
    metadata = entity_markdown.frontmatter.metadata
    # Both date-typed fields must come back as ISO-format strings.
    date_field = metadata.get("date")
    assert date_field is not None
    assert isinstance(date_field, str), "Date should be converted to string"
    assert date_field == "2025-10-24", "Date should be in ISO format"
    created_field = metadata.get("created")
    assert created_field is not None
    assert isinstance(created_field, str), "Created date should be converted to string"
    assert created_field == "2025-10-24", "Created date should be in ISO format"


@pytest.mark.asyncio
async def test_parse_file_with_date_as_tags(test_file_with_date_in_tags, tmp_path):
    """A bare date used as the whole tags value does not break parsing."""
    entity_markdown = await EntityParser(tmp_path).parse_file(test_file_with_date_in_tags)
    assert entity_markdown.frontmatter.title == "Test Note with Date Tags"
    # The date must be normalized to an ISO string before parse_tags sees it.
    tag_values = entity_markdown.frontmatter.tags
    assert tag_values is not None
    assert isinstance(tag_values, list)
    assert "2025-10-24" in tag_values


@pytest.mark.asyncio
async def test_parse_file_with_dates_in_tag_list(test_file_with_dates_in_tag_list, tmp_path):
    """A date mixed into a tags list does not break parsing."""
    entity_markdown = await EntityParser(tmp_path).parse_file(test_file_with_dates_in_tag_list)
    assert entity_markdown.frontmatter.title == "Test Note with Dates in Tags List"
    tag_values = entity_markdown.frontmatter.tags
    assert tag_values is not None
    assert isinstance(tag_values, list)
    # Two string tags plus the date, normalized to its ISO form.
    assert len(tag_values) == 3
    for expected in ("valid-tag", "another-tag", "2025-10-24"):
        assert expected in tag_values
@pytest.mark.asyncio
async def test_parse_file_with_various_yaml_types(tmp_path):
    """Mixed YAML scalar types in frontmatter are normalized without errors.

    Reproduces the broader issue from GitHub #236: ANY non-string YAML type
    (dates, lists, numbers, booleans) can cause AttributeError when code
    expects strings and calls .strip().
    """
    note_path = tmp_path / "test_yaml_types.md"
    note_path.write_text(
        """---
title: Test YAML Types
date: 2025-10-24
priority: 1
completed: true
tags:
  - python
  - testing
metadata:
  author: Test User
  version: 1.0
---
# Test Content
This file has various YAML types that need to be normalized.
"""
    )
    entity_markdown = await EntityParser(tmp_path).parse_file(note_path)
    assert entity_markdown.frontmatter.title == "Test YAML Types"
    meta = entity_markdown.frontmatter.metadata
    # Date scalar becomes an ISO string.
    assert isinstance(meta.get("date"), str)
    assert meta.get("date") == "2025-10-24"
    # Integer scalar becomes a string.
    assert isinstance(meta.get("priority"), str)
    assert meta.get("priority") == "1"
    # Boolean scalar becomes a string.
    assert isinstance(meta.get("completed"), str)
    assert meta.get("completed") == "True"  # Python's str(True) always returns "True"
    # Lists survive as lists, with every element stringified.
    tag_values = entity_markdown.frontmatter.tags
    assert isinstance(tag_values, list)
    assert all(isinstance(tag, str) for tag in tag_values)
    assert "python" in tag_values
    assert "testing" in tag_values
    # Nested mappings survive as dicts, with leaf values stringified.
    nested = meta.get("metadata")
    assert isinstance(nested, dict)
    assert isinstance(nested.get("author"), str)
    assert nested.get("author") == "Test User"
    assert isinstance(nested.get("version"), str)
    assert nested.get("version") in ("1.0", "1")
@pytest.mark.asyncio
async def test_parse_file_with_datetime_objects(tmp_path):
    """Datetime objects (not just dates) are normalized to ISO strings.

    Both "YYYY-MM-DD HH:MM:SS" and "YYYY-MM-DDTHH:MM:SS" are implicit YAML
    timestamps, so PyYAML hands back datetime objects with time components.
    """
    note_path = tmp_path / "test_datetime.md"
    note_path.write_text(
        """---
title: Test Datetime
created_at: 2025-10-24 14:30:00
updated_at: 2025-10-24T00:00:00
---
# Test Content
This file has datetime values in frontmatter that PyYAML will parse as datetime objects.
"""
    )
    entity_markdown = await EntityParser(tmp_path).parse_file(note_path)
    meta = entity_markdown.frontmatter.metadata
    # A datetime with a real time component keeps both date and time parts.
    created_at = meta.get("created_at")
    assert isinstance(created_at, str), "Datetime should be converted to string"
    assert "2025-10-24" in created_at and "14:30:00" in created_at, (
        f"Datetime with time should be normalized to ISO format, got: {created_at}"
    )
    # A midnight datetime is still rendered with its time, not collapsed to a date.
    updated_at = meta.get("updated_at")
    assert isinstance(updated_at, str), "Datetime should be converted to string"
    assert "2025-10-24" in updated_at and "00:00:00" in updated_at, (
        f"Datetime at midnight should be normalized to ISO format, got: {updated_at}"
    )
```
--------------------------------------------------------------------------------
/tests/mcp/test_tool_canvas.py:
--------------------------------------------------------------------------------
```python
"""Tests for canvas tool that exercise the full stack with SQLite."""
import json
from pathlib import Path
import pytest
from basic_memory.mcp.tools import canvas
@pytest.mark.asyncio
async def test_create_canvas(app, project_config, test_project):
    """Creating a new canvas writes the file and reports success.

    Should:
    - Create canvas file with correct content
    - Create entity in database
    - Return successful status
    """
    node_list = [
        {
            "id": "node1",
            "type": "text",
            "text": "Test Node",
            "x": 100,
            "y": 200,
            "width": 400,
            "height": 300,
        }
    ]
    edge_list = [{"id": "edge1", "fromNode": "node1", "toNode": "node2", "label": "connects to"}]
    summary = await canvas.fn(
        project=test_project.name,
        nodes=node_list,
        edges=edge_list,
        title="test-canvas",
        folder="visualizations",
    )
    # The tool reports creation and Obsidian readiness.
    assert summary
    assert "Created: visualizations/test-canvas" in summary
    assert "The canvas is ready to open in Obsidian" in summary
    # The .canvas file lands under the project home with the JSON payload intact.
    canvas_path = Path(project_config.home) / "visualizations" / "test-canvas.canvas"
    assert canvas_path.exists()
    payload = json.loads(canvas_path.read_text(encoding="utf-8"))
    assert payload["nodes"] == node_list
    assert payload["edges"] == edge_list


@pytest.mark.asyncio
async def test_create_canvas_with_extension(app, project_config, test_project):
    """A title that already ends in .canvas is not given a second extension."""
    node_list = [
        {
            "id": "node1",
            "type": "text",
            "text": "Extension Test",
            "x": 100,
            "y": 200,
            "width": 400,
            "height": 300,
        }
    ]
    titled = "extension-test.canvas"  # Already has extension
    summary = await canvas.fn(
        project=test_project.name,
        nodes=node_list,
        edges=[],
        title=titled,
        folder="visualizations",
    )
    assert "Created: visualizations/extension-test.canvas" in summary
    # File exists under the exact title (no "extension-test.canvas.canvas").
    canvas_path = Path(project_config.home) / "visualizations" / titled
    assert canvas_path.exists()
    payload = json.loads(canvas_path.read_text(encoding="utf-8"))
    assert payload["nodes"] == node_list
@pytest.mark.asyncio
async def test_update_existing_canvas(app, project_config, test_project):
    """Writing to an existing canvas title overwrites it and reports an update."""
    title = "update-test"
    folder = "visualizations"
    canvas_path = Path(project_config.home) / folder / f"{title}.canvas"
    # Seed an initial canvas on disk.
    initial_nodes = [
        {
            "id": "initial",
            "type": "text",
            "text": "Initial content",
            "x": 0,
            "y": 0,
            "width": 200,
            "height": 100,
        }
    ]
    await canvas.fn(
        project=test_project.name, nodes=initial_nodes, edges=[], title=title, folder=folder
    )
    assert canvas_path.exists()
    # Overwrite with replacement nodes and a brand-new edge.
    replacement_nodes = [
        {
            "id": "updated",
            "type": "text",
            "text": "Updated content",
            "x": 100,
            "y": 100,
            "width": 300,
            "height": 200,
        }
    ]
    replacement_edges = [
        {"id": "new-edge", "fromNode": "updated", "toNode": "other", "label": "new connection"}
    ]
    summary = await canvas.fn(
        project=test_project.name,
        nodes=replacement_nodes,
        edges=replacement_edges,
        title=title,
        folder=folder,
    )
    # Second write is reported as an update rather than a creation.
    assert "Updated: visualizations/update-test.canvas" in summary
    payload = json.loads(canvas_path.read_text(encoding="utf-8"))
    assert payload["nodes"] == replacement_nodes
    assert payload["edges"] == replacement_edges


@pytest.mark.asyncio
async def test_create_canvas_with_nested_folders(app, project_config, test_project):
    """Canvas creation makes intermediate folders that do not yet exist."""
    deep_folder = "visualizations/nested/folders"  # Deep path
    node_list = [
        {
            "id": "test",
            "type": "text",
            "text": "Nested folder test",
            "x": 0,
            "y": 0,
            "width": 200,
            "height": 100,
        }
    ]
    summary = await canvas.fn(
        project=test_project.name,
        nodes=node_list,
        edges=[],
        title="nested-test",
        folder=deep_folder,
    )
    assert "Created: visualizations/nested/folders/nested-test.canvas" in summary
    # Every intermediate directory was created on demand.
    canvas_path = Path(project_config.home) / deep_folder / "nested-test.canvas"
    assert canvas_path.exists()
    assert canvas_path.parent.exists()
@pytest.mark.asyncio
async def test_create_canvas_complex_content(app, project_config, test_project):
    """A canvas with all four node types and styled edges round-trips intact."""
    # One node of each canvas type, exercising preset and hex colors.
    node_list = [
        {
            "id": "text-node",
            "type": "text",
            "text": "# Heading\n\nThis is a test with *markdown* formatting",
            "x": 100,
            "y": 100,
            "width": 400,
            "height": 300,
            "color": "4",  # Using a preset color
        },
        {
            "id": "file-node",
            "type": "file",
            "file": "test/test-file.md",  # Reference a file
            "x": 600,
            "y": 100,
            "width": 400,
            "height": 300,
            "color": "#FF5500",  # Using hex color
        },
        {
            "id": "link-node",
            "type": "link",
            "url": "https://example.com",
            "x": 100,
            "y": 500,
            "width": 400,
            "height": 200,
        },
        {
            "id": "group-node",
            "type": "group",
            "label": "Group Label",
            "x": 600,
            "y": 500,
            "width": 600,
            "height": 400,
        },
    ]
    edge_list = [
        {
            "id": "edge1",
            "fromNode": "text-node",
            "toNode": "file-node",
            "label": "references",
            "fromSide": "right",
            "toSide": "left",
        },
        {
            "id": "edge2",
            "fromNode": "link-node",
            "toNode": "group-node",
            "label": "belongs to",
            "color": "6",
        },
    ]
    # Materialize the markdown file that the file-node points at.
    referenced = Path(project_config.home) / "test/test-file.md"
    referenced.parent.mkdir(parents=True, exist_ok=True)
    referenced.write_text("# Test File\nThis is referenced by the canvas")
    summary = await canvas.fn(
        project=test_project.name,
        nodes=node_list,
        edges=edge_list,
        title="complex-test",
        folder="visualizations",
    )
    assert "Created: visualizations/complex-test.canvas" in summary
    canvas_path = Path(project_config.home) / "visualizations" / "complex-test.canvas"
    assert canvas_path.exists()
    payload = json.loads(canvas_path.read_text(encoding="utf-8"))
    assert len(payload["nodes"]) == 4
    assert len(payload["edges"]) == 2
    # Each node type keeps its distinguishing field.
    stored_nodes = payload["nodes"]
    assert any(node["type"] == "text" and "#" in node["text"] for node in stored_nodes)
    assert any(node["type"] == "file" and "test-file.md" in node["file"] for node in stored_nodes)
    assert any(node["type"] == "link" and "example.com" in node["url"] for node in stored_nodes)
    assert any(node["type"] == "group" and "Group Label" == node["label"] for node in stored_nodes)
    # Edge attributes (sides, label, color) survive as well.
    stored_edges = payload["edges"]
    assert any(edge["fromSide"] == "right" and edge["toSide"] == "left" for edge in stored_edges)
    assert any(edge["label"] == "belongs to" and edge["color"] == "6" for edge in stored_edges)
```
--------------------------------------------------------------------------------
/tests/mcp/test_tool_list_directory.py:
--------------------------------------------------------------------------------
```python
"""Tests for the list_directory MCP tool."""
import pytest
from basic_memory.mcp.tools.list_directory import list_directory
from basic_memory.mcp.tools.write_note import write_note
@pytest.mark.asyncio
async def test_list_directory_empty(client, test_project):
    """An empty project produces the "no files" message for root."""
    output = await list_directory.fn(project=test_project.name)
    assert isinstance(output, str)
    assert "No files found in directory '/'" in output


@pytest.mark.asyncio
async def test_list_directory_with_test_graph(client, test_graph, test_project):
    """Root listing of the test_graph fixture shows a single directory."""
    # test_graph provides five notes under /test, so the root listing at
    # depth 1 should contain only the "test" folder entry.
    output = await list_directory.fn(project=test_project.name)
    assert isinstance(output, str)
    assert "Contents of '/' (depth 1):" in output
    assert "📁 test" in output
    assert "Total: 1 items (1 directory)" in output


@pytest.mark.asyncio
async def test_list_directory_specific_path(client, test_graph, test_project):
    """Listing /test enumerates all five fixture notes."""
    output = await list_directory.fn(project=test_project.name, dir_name="/test")
    assert isinstance(output, str)
    assert "Contents of '/test' (depth 1):" in output
    for filename in (
        "📄 Connected Entity 1.md",
        "📄 Connected Entity 2.md",
        "📄 Deep Entity.md",
        "📄 Deeper Entity.md",
        "📄 Root.md",
    ):
        assert filename in output
    assert "Total: 5 items (5 files)" in output
@pytest.mark.asyncio
async def test_list_directory_with_glob_filter(client, test_graph, test_project):
    """A glob pattern restricts the listing to matching files only."""
    output = await list_directory.fn(
        project=test_project.name, dir_name="/test", file_name_glob="*Connected*"
    )
    assert isinstance(output, str)
    assert "Files in '/test' matching '*Connected*' (depth 1):" in output
    # Only the two "Connected" notes match.
    assert "📄 Connected Entity 1.md" in output
    assert "📄 Connected Entity 2.md" in output
    # The remaining fixture files are filtered out.
    for excluded in ("Deep Entity.md", "Deeper Entity.md", "Root.md"):
        assert excluded not in output
    assert "Total: 2 items (2 files)" in output


@pytest.mark.asyncio
async def test_list_directory_with_markdown_filter(client, test_graph, test_project):
    """The *.md glob matches every fixture note (all are markdown)."""
    output = await list_directory.fn(
        project=test_project.name, dir_name="/test", file_name_glob="*.md"
    )
    assert isinstance(output, str)
    assert "Files in '/test' matching '*.md' (depth 1):" in output
    for filename in (
        "📄 Connected Entity 1.md",
        "📄 Connected Entity 2.md",
        "📄 Deep Entity.md",
        "📄 Deeper Entity.md",
        "📄 Root.md",
    ):
        assert filename in output
    assert "Total: 5 items (5 files)" in output


@pytest.mark.asyncio
async def test_list_directory_with_depth_control(client, test_graph, test_project):
    """Depth controls whether the listing descends into subdirectories."""
    # Depth 1 stops at the top-level directory entry.
    shallow = await list_directory.fn(project=test_project.name, dir_name="/", depth=1)
    assert isinstance(shallow, str)
    assert "Contents of '/' (depth 1):" in shallow
    assert "📁 test" in shallow
    assert "Total: 1 items (1 directory)" in shallow
    # Depth 2 also includes the directory's files.
    deep = await list_directory.fn(project=test_project.name, dir_name="/", depth=2)
    assert isinstance(deep, str)
    assert "Contents of '/' (depth 2):" in deep
    assert "📁 test" in deep
    for filename in (
        "📄 Connected Entity 1.md",
        "📄 Connected Entity 2.md",
        "📄 Deep Entity.md",
        "📄 Deeper Entity.md",
        "📄 Root.md",
    ):
        assert filename in deep
    assert "Total: 6 items (1 directory, 5 files)" in deep
@pytest.mark.asyncio
async def test_list_directory_nonexistent_path(client, test_graph, test_project):
    """Listing a path that does not exist yields the "no files" message."""
    output = await list_directory.fn(project=test_project.name, dir_name="/nonexistent")
    assert isinstance(output, str)
    assert "No files found in directory '/nonexistent'" in output


@pytest.mark.asyncio
async def test_list_directory_glob_no_matches(client, test_graph, test_project):
    """A glob matching nothing yields the "no files matching" message."""
    output = await list_directory.fn(
        project=test_project.name, dir_name="/test", file_name_glob="*.xyz"
    )
    assert isinstance(output, str)
    assert "No files found in directory '/test' matching '*.xyz'" in output


@pytest.mark.asyncio
async def test_list_directory_with_created_notes(client, test_project):
    """Notes created via write_note show up in directory listings."""
    # Seed two folders' worth of notes through the write_note tool.
    seeds = (
        (
            "Project Planning",
            "projects",
            "# Project Planning\nThis is about planning projects.",
            ["planning", "project"],
        ),
        (
            "Meeting Notes",
            "projects",
            "# Meeting Notes\nNotes from the meeting.",
            ["meeting", "notes"],
        ),
        (
            "Research Document",
            "research",
            "# Research\nSome research findings.",
            ["research"],
        ),
    )
    for title, folder, content, tags in seeds:
        await write_note.fn(
            project=test_project.name, title=title, folder=folder, content=content, tags=tags
        )
    # Root listing shows the two folders.
    root_output = await list_directory.fn(project=test_project.name)
    assert isinstance(root_output, str)
    assert "Contents of '/' (depth 1):" in root_output
    assert "📁 projects" in root_output
    assert "📁 research" in root_output
    assert "Total: 2 items (2 directories)" in root_output
    # The projects folder holds both of its notes.
    projects_output = await list_directory.fn(project=test_project.name, dir_name="/projects")
    assert isinstance(projects_output, str)
    assert "Contents of '/projects' (depth 1):" in projects_output
    assert "📄 Project Planning.md" in projects_output
    assert "📄 Meeting Notes.md" in projects_output
    assert "Total: 2 items (2 files)" in projects_output
    # A glob narrows the listing to the single matching note.
    meeting_output = await list_directory.fn(
        project=test_project.name, dir_name="/projects", file_name_glob="*Meeting*"
    )
    assert isinstance(meeting_output, str)
    assert "Files in '/projects' matching '*Meeting*' (depth 1):" in meeting_output
    assert "📄 Meeting Notes.md" in meeting_output
    assert "Project Planning.md" not in meeting_output
    assert "Total: 1 items (1 file)" in meeting_output
@pytest.mark.asyncio
async def test_list_directory_path_normalization(client, test_graph, test_project):
    """Verify that equivalent spellings of a directory path yield the same listing."""
    # Leading and trailing slashes must not change which directory is listed.
    for variant in ("/test", "test", "/test/", "test/"):
        listing = await list_directory.fn(project=test_project.name, dir_name=variant)
        # Every spelling resolves to the same directory with the same contents.
        assert "Total: 5 items (5 files)" in listing
        assert "📄 Connected Entity 1.md" in listing
@pytest.mark.asyncio
async def test_list_directory_shows_file_metadata(client, test_graph, test_project):
    """Test that file metadata is displayed correctly.

    Checks that each entry shows both a display name and its project-relative
    path, and that the file count matches the test_graph fixture.
    """
    result = await list_directory.fn(project=test_project.name, dir_name="/test")
    assert isinstance(result, str)
    # Should show file names
    assert "📄 Connected Entity 1.md" in result
    assert "📄 Connected Entity 2.md" in result
    # Should show directory paths
    assert "test/Connected Entity 1.md" in result
    assert "test/Connected Entity 2.md" in result
    # Files should be listed after directories (but no directories in this case)
    lines = result.split("\n")
    file_lines = [line for line in lines if "📄" in line]
    assert len(file_lines) == 5  # All 5 files from test_graph
```
--------------------------------------------------------------------------------
/specs/SPEC-12 OpenTelemetry Observability.md:
--------------------------------------------------------------------------------
```markdown
# SPEC-12: OpenTelemetry Observability
## Why
We need comprehensive observability for basic-memory-cloud to:
- Track request flows across our multi-tenant architecture (MCP → Cloud → API services)
- Debug performance issues and errors in production
- Understand user behavior and system usage patterns
- Correlate issues to specific tenants for targeted debugging
- Monitor service health and latency across the distributed system
Currently, we only have basic logging without request correlation or distributed tracing capabilities.
## What
Implement OpenTelemetry instrumentation across all basic-memory-cloud services with:
### Core Requirements
1. **Distributed Tracing**: End-to-end request tracing from MCP gateway through to tenant API instances
2. **Tenant Correlation**: All traces tagged with tenant_id, user_id, and workos_user_id
3. **Service Identification**: Clear service naming and namespace separation
4. **Auto-instrumentation**: Automatic tracing for FastAPI, SQLAlchemy, HTTP clients
5. **Grafana Cloud Integration**: Direct OTLP export to Grafana Cloud Tempo
### Services to Instrument
- **MCP Gateway** (basic-memory-mcp): Entry point with JWT extraction
- **Cloud Service** (basic-memory-cloud): Provisioning and management operations
- **API Service** (basic-memory-api): Tenant-specific instances
- **Worker Processes** (ARQ workers): Background job processing
### Key Trace Attributes
- `tenant.id`: UUID from UserProfile.tenant_id
- `user.id`: WorkOS user identifier
- `user.email`: User email for debugging
- `service.name`: Specific service identifier
- `service.namespace`: Environment (development/production)
- `operation.type`: Business operation (provision/update/delete)
- `tenant.app_name`: Fly.io app name for tenant instances
## How
### Phase 1: Setup OpenTelemetry SDK
1. Add OpenTelemetry dependencies to each service's pyproject.toml:
```python
"opentelemetry-distro[otlp]>=1.29.0",
"opentelemetry-instrumentation-fastapi>=0.50b0",
"opentelemetry-instrumentation-httpx>=0.50b0",
"opentelemetry-instrumentation-sqlalchemy>=0.50b0",
"opentelemetry-instrumentation-logging>=0.50b0",
```
2. Create shared telemetry initialization module (`apps/shared/telemetry.py`)
3. Configure Grafana Cloud OTLP endpoint via environment variables:
```bash
OTEL_EXPORTER_OTLP_ENDPOINT=https://otlp-gateway-prod-us-east-2.grafana.net/otlp
OTEL_EXPORTER_OTLP_HEADERS=Authorization=Basic[token]
OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf
```
### Phase 2: Instrument MCP Gateway
1. Extract tenant context from AuthKit JWT in middleware
2. Create root span with tenant attributes
3. Propagate trace context to downstream services via headers
### Phase 3: Instrument Cloud Service
1. Continue trace from MCP gateway
2. Add operation-specific attributes (provisioning events)
3. Instrument ARQ worker jobs for async operations
4. Track Fly.io API calls and latency
### Phase 4: Instrument API Service
1. Extract tenant context from JWT
2. Add machine-specific metadata (instance ID, region)
3. Instrument database operations with SQLAlchemy
4. Track MCP protocol operations
### Phase 5: Configure and Deploy
1. Add OTLP configuration to `.env.example` and `.env.example.secrets`
2. Set Fly.io secrets for production deployment
3. Update Dockerfiles to use `opentelemetry-instrument` wrapper
4. Deploy to development environment first for testing
## How to Evaluate
### Success Criteria
1. **End-to-end traces visible in Grafana Cloud** showing complete request flow
2. **Tenant filtering works** - Can filter traces by tenant_id to see all requests for a user
3. **Service maps accurate** - Grafana shows correct service dependencies
4. **Performance overhead < 5%** - Minimal latency impact from instrumentation
5. **Error correlation** - Can trace errors back to specific tenant and operation
### Testing Checklist
- [x] Single request creates connected trace across all services
- [x] Tenant attributes present on all spans
- [x] Background jobs (ARQ) appear in traces
- [x] Database queries show in trace timeline
- [x] HTTP calls to Fly.io API tracked
- [x] Traces exported successfully to Grafana Cloud
- [x] Can search traces by tenant_id in Grafana
- [x] Service dependency graph shows correct flow
### Monitoring Success
- All services reporting traces to Grafana Cloud
- No OTLP export errors in logs
- Trace sampling working correctly (if implemented)
- Resource usage acceptable (CPU/memory)
## Dependencies
- Grafana Cloud account with OTLP endpoint configured
- OpenTelemetry Python SDK v1.29.0+
- FastAPI instrumentation compatibility
- Network access from Fly.io to Grafana Cloud
## Implementation Assignment
**Recommended Agent**: python-developer
- Requires Python/FastAPI expertise
- Needs understanding of distributed systems
- Must implement middleware and context propagation
- Should understand OpenTelemetry SDK and instrumentation
## Follow-up Tasks
### Enhanced Log Correlation
While basic trace-to-log correlation works automatically via OpenTelemetry logging instrumentation, consider adding structured logging for improved log filtering:
1. **Structured Logging Context**: Add `logger.bind()` calls to inject tenant/user context directly into log records
2. **Custom Loguru Formatter**: Extract OpenTelemetry span attributes for better log readability
3. **Direct Log Filtering**: Enable searching logs directly by tenant_id, workflow_id without going through traces
This would complement the existing automatic trace correlation and provide better log search capabilities.
## Alternative Solution: Logfire
After implementing OpenTelemetry with Grafana Cloud, we discovered limitations in the observability experience:
- Traces work but lack useful context without correlated logs
- Setting up log correlation with Grafana is complex and requires additional infrastructure
- The developer experience for Python observability is suboptimal
### Logfire Evaluation
**Pydantic Logfire** offers a compelling alternative that addresses our specific requirements:
#### Core Requirements Match
- ✅ **User Activity Tracking**: Automatic request tracing with business context
- ✅ **Error Monitoring**: Built-in exception tracking with full context
- ✅ **Performance Metrics**: Automatic latency and performance monitoring
- ✅ **Request Tracing**: Native distributed tracing across services
- ✅ **Log Correlation**: Seamless trace-to-log correlation without setup
#### Key Advantages
1. **Python-First Design**: Built specifically for Python/FastAPI applications by the Pydantic team
2. **Simple Integration**: `pip install logfire` + `logfire.configure()` vs complex OTLP setup
3. **Automatic Correlation**: Logs automatically include trace context without manual configuration
4. **Real-time SQL Interface**: Query spans and logs using SQL with auto-completion
5. **Better Developer UX**: Purpose-built observability UI vs generic Grafana dashboards
6. **Loguru Integration**: `logger.configure(handlers=[logfire.loguru_handler()])` maintains existing logging
#### Pricing Assessment
- **Free Tier**: 10M spans/month (suitable for development and small production workloads)
- **Transparent Pricing**: $1 per million spans/metrics after free tier
- **No Hidden Costs**: No per-host fees, only usage-based metering
- **Production Ready**: Recently exited beta, enterprise features available
#### Migration Path
The existing OpenTelemetry instrumentation is compatible - Logfire uses OpenTelemetry under the hood, so the current spans and attributes would work unchanged.
### Recommendation
**Consider migrating to Logfire** for the following reasons:
1. It directly addresses the problem of traces lacking useful context by providing integrated logs
2. Dramatically simpler setup and maintenance compared to Grafana Cloud + custom log correlation
3. Better ROI on observability investment with purpose-built Python tooling
4. Free tier sufficient for current development needs with clear scaling path
The current Grafana Cloud implementation provides a solid foundation and could remain as a backup/export target, while Logfire becomes the primary observability platform.
## Status
**Created**: 2024-01-28
**Status**: Completed (OpenTelemetry + Grafana Cloud)
**Next Phase**: Evaluate Logfire migration
**Priority**: High - Critical for production observability
```
--------------------------------------------------------------------------------
/tests/mcp/test_prompts.py:
--------------------------------------------------------------------------------
```python
"""Tests for MCP prompts."""
from datetime import timezone, datetime
import pytest
from basic_memory.mcp.prompts.continue_conversation import continue_conversation
from basic_memory.mcp.prompts.search import search_prompt
from basic_memory.mcp.prompts.recent_activity import recent_activity_prompt
@pytest.mark.asyncio
async def test_continue_conversation_with_topic(client, test_graph):
    """Test continue_conversation with a topic.

    The topic "Root" matches content seeded by the test_graph fixture, so the
    prompt should include session framing and suggested follow-up commands.
    """
    # We can use the test_graph fixture which already has relevant content
    # Call the function with a topic that should match existing content
    result = await continue_conversation.fn(topic="Root", timeframe="1w")  # pyright: ignore [reportGeneralTypeIssues]
    # Check that the result contains expected content
    assert "Continuing conversation on: Root" in result  # pyright: ignore [reportOperatorIssue]
    assert "This is a memory retrieval session" in result  # pyright: ignore [reportOperatorIssue]
    assert "Start by executing one of the suggested commands" in result  # pyright: ignore [reportOperatorIssue]
@pytest.mark.asyncio
async def test_continue_conversation_with_recent_activity(client, test_graph):
    """Test continue_conversation with no topic, using recent activity.

    Without a topic the prompt falls back to a "Recent Activity" session.
    """
    # Call the function without a topic
    result = await continue_conversation.fn(timeframe="1w")  # pyright: ignore [reportGeneralTypeIssues]
    # Check that the result contains expected content for recent activity
    assert "Continuing conversation on: Recent Activity" in result  # pyright: ignore [reportOperatorIssue]
    assert "This is a memory retrieval session" in result  # pyright: ignore [reportOperatorIssue]
    assert "Please use the available basic-memory tools" in result  # pyright: ignore [reportOperatorIssue]
    assert "Next Steps" in result  # pyright: ignore [reportOperatorIssue]
@pytest.mark.asyncio
async def test_continue_conversation_no_results(client, test_graph):
    """Test continue_conversation when no results are found.

    A topic that matches nothing should still render a prompt, but one that
    states the query returned no information.
    """
    # Call with a non-existent topic
    result = await continue_conversation.fn(topic="NonExistentTopic", timeframe="1w")  # pyright: ignore [reportGeneralTypeIssues]
    # Check the response indicates no results found
    assert "Continuing conversation on: NonExistentTopic" in result  # pyright: ignore [reportOperatorIssue]
    assert "The supplied query did not return any information" in result  # pyright: ignore [reportOperatorIssue]
@pytest.mark.asyncio
async def test_continue_conversation_creates_structured_suggestions(client, test_graph):
    """Test that continue_conversation generates structured tool usage suggestions.

    The prompt should name concrete MCP tools the assistant can invoke next.
    """
    # Call the function with a topic that should match existing content
    result = await continue_conversation.fn(topic="Root", timeframe="1w")  # pyright: ignore [reportGeneralTypeIssues]
    # Verify the response includes clear tool usage instructions
    assert "start by executing one of the suggested commands" in result.lower()  # pyright: ignore [reportAttributeAccessIssue]
    # Check that the response contains tool call examples
    assert "read_note" in result  # pyright: ignore [reportOperatorIssue]
    assert "search" in result  # pyright: ignore [reportOperatorIssue]
    assert "recent_activity" in result  # pyright: ignore [reportOperatorIssue]
# Search prompt tests
@pytest.mark.asyncio
async def test_search_prompt_with_results(client, test_graph):
    """Test search_prompt with a query that returns results.

    Matching results should include a read_note suggestion and the
    knowledge-capture call to action.
    """
    # Call the function with a query that should match existing content
    result = await search_prompt.fn("Root")  # pyright: ignore [reportGeneralTypeIssues]
    # Check the response contains expected content
    assert 'Search Results for: "Root"' in result  # pyright: ignore [reportOperatorIssue]
    assert "I found " in result  # pyright: ignore [reportOperatorIssue]
    assert "You can view this content with: `read_note" in result  # pyright: ignore [reportOperatorIssue]
    assert "Synthesize and Capture Knowledge" in result  # pyright: ignore [reportOperatorIssue]
@pytest.mark.asyncio
async def test_search_prompt_with_timeframe(client, test_graph):
    """Test search_prompt with a timeframe.

    Note: the "1w" timeframe is rendered in the header as its day
    equivalent, "7d".
    """
    # Call the function with a query and timeframe
    result = await search_prompt.fn("Root", timeframe="1w")  # pyright: ignore [reportGeneralTypeIssues]
    # Check the response includes timeframe information
    assert 'Search Results for: "Root" (after 7d)' in result  # pyright: ignore [reportOperatorIssue]
    assert "I found " in result  # pyright: ignore [reportOperatorIssue]
@pytest.mark.asyncio
async def test_search_prompt_no_results(client, test_graph):
    """Test search_prompt when no results are found.

    An empty result should pivot the prompt toward capturing new knowledge
    with write_note rather than reading existing notes.
    """
    # Call with a query that won't match anything
    result = await search_prompt.fn("XYZ123NonExistentQuery")  # pyright: ignore [reportGeneralTypeIssues]
    # Check the response indicates no results found
    assert 'Search Results for: "XYZ123NonExistentQuery"' in result  # pyright: ignore [reportOperatorIssue]
    assert "I couldn't find any results for this query" in result  # pyright: ignore [reportOperatorIssue]
    assert "Opportunity to Capture Knowledge" in result  # pyright: ignore [reportOperatorIssue]
    assert "write_note" in result  # pyright: ignore [reportOperatorIssue]
# Test utils
def test_prompt_context_with_file_path_no_permalink():
    """format_prompt_context falls back to file_path when an item has no permalink."""
    from basic_memory.mcp.prompts.utils import (
        format_prompt_context,
        PromptContext,
        PromptContextItem,
    )
    from basic_memory.schemas.memory import EntitySummary

    # A binary-style file: it has a path on disk but no permalink.
    pdf_summary = EntitySummary(
        type="entity",
        title="Test File",
        permalink=None,  # No permalink
        file_path="test_file.pdf",
        created_at=datetime.now(timezone.utc),
    )
    rendered = format_prompt_context(
        PromptContext(
            topic="Test Topic",
            timeframe="1d",
            results=[
                PromptContextItem(
                    # Use the same entity as both primary and related result.
                    primary_results=[pdf_summary],
                    related_results=[pdf_summary],
                )
            ],
        )
    )
    # The file path is surfaced and read_file is suggested for access.
    assert "test_file.pdf" in rendered
    assert "read_file" in rendered
# Recent activity prompt tests
@pytest.mark.asyncio
async def test_recent_activity_prompt_discovery_mode(client, test_project, test_graph):
    """Test recent_activity_prompt in discovery mode (no project).

    Without a project argument the prompt summarizes activity across all
    projects rather than a single one.
    """
    # Call the function in discovery mode
    result = await recent_activity_prompt.fn(timeframe="1w")  # pyright: ignore [reportGeneralTypeIssues]
    # Check the response contains expected discovery mode content
    assert "Recent Activity Across All Projects" in result  # pyright: ignore [reportOperatorIssue]
    assert "Cross-Project Activity Discovery" in result  # pyright: ignore [reportOperatorIssue]
    assert "write_note" in result  # pyright: ignore [reportOperatorIssue]
@pytest.mark.asyncio
async def test_recent_activity_prompt_project_specific(client, test_project, test_graph):
    """Test recent_activity_prompt in project-specific mode.

    Passing a project scopes the headings and suggestions to that project.
    """
    # Call the function with a specific project
    result = await recent_activity_prompt.fn(timeframe="1w", project=test_project.name)  # pyright: ignore [reportGeneralTypeIssues]
    # Check the response contains expected project-specific content
    assert f"Recent Activity in {test_project.name}" in result  # pyright: ignore [reportOperatorIssue]
    assert "Opportunity to Capture Activity Summary" in result  # pyright: ignore [reportOperatorIssue]
    assert f"recent activity in {test_project.name}" in result  # pyright: ignore [reportOperatorIssue]
    assert "write_note" in result  # pyright: ignore [reportOperatorIssue]
@pytest.mark.asyncio
async def test_recent_activity_prompt_with_custom_timeframe(client, test_project, test_graph):
    """Test recent_activity_prompt with a custom timeframe.

    The requested timeframe should be echoed in the discovery-mode heading.
    """
    # Call the function with a custom timeframe in discovery mode
    result = await recent_activity_prompt.fn(timeframe="1d")  # pyright: ignore [reportGeneralTypeIssues]
    # Check the response includes the custom timeframe
    assert "Recent Activity Across All Projects (1d)" in result  # pyright: ignore [reportOperatorIssue]
```
--------------------------------------------------------------------------------
/tests/sync/test_watch_service_reload.py:
--------------------------------------------------------------------------------
```python
"""Tests for watch service project reloading functionality."""
import asyncio
from unittest.mock import AsyncMock, patch
import pytest
from basic_memory.config import BasicMemoryConfig
from basic_memory.models.project import Project
from basic_memory.sync.watch_service import WatchService
@pytest.mark.asyncio
async def test_schedule_restart_uses_config_interval():
    """Verify that _schedule_restart waits for the configured reload interval."""
    cfg = BasicMemoryConfig(watch_project_reload_interval=2)
    service = WatchService(cfg, AsyncMock(), quiet=True)
    restart_event = asyncio.Event()

    # Replace asyncio.sleep so the restart fires immediately, then inspect
    # the interval it was asked to wait for.
    with patch("asyncio.sleep") as sleep_mock:
        sleep_mock.return_value = None  # Make it return immediately
        await service._schedule_restart(restart_event)
        sleep_mock.assert_called_once_with(2)
        # The restart must signal the stop event so the watch cycle exits.
        assert restart_event.is_set()
@pytest.mark.asyncio
async def test_watch_projects_cycle_handles_empty_project_list():
    """Test that _watch_projects_cycle handles an empty project list.

    With no projects there are no paths to watch; awatch should still be
    invoked once with only keyword configuration and no positional paths.
    """
    config = BasicMemoryConfig()
    repo = AsyncMock()
    watch_service = WatchService(config, repo, quiet=True)
    stop_event = asyncio.Event()
    stop_event.set()  # Set immediately to exit quickly
    # Mock awatch to track calls
    with patch("basic_memory.sync.watch_service.awatch") as mock_awatch:
        # Create an async iterator that yields nothing
        async def empty_iterator():
            return
            yield  # unreachable, just for async generator

        mock_awatch.return_value = empty_iterator()
        # Should not raise error with empty project list
        await watch_service._watch_projects_cycle([], stop_event)
        # awatch should be called with no paths
        mock_awatch.assert_called_once_with(
            debounce=config.sync_delay,
            watch_filter=watch_service.filter_changes,
            recursive=True,
            stop_event=stop_event,
        )
@pytest.mark.asyncio
async def test_run_handles_no_projects():
    """Test that run method handles no active projects gracefully.

    When the repository reports no active projects, run() should sleep for
    30 seconds and retry instead of crashing or spinning.
    """
    config = BasicMemoryConfig()
    repo = AsyncMock()
    repo.get_active_projects.return_value = []  # No projects
    watch_service = WatchService(config, repo, quiet=True)
    call_count = 0

    def stop_after_one_call(*args):
        # Stop the service after the first sleep so the test terminates.
        nonlocal call_count
        call_count += 1
        if call_count >= 1:
            watch_service.state.running = False
        return AsyncMock()

    # Mock sleep and write_status to track behavior
    with patch("asyncio.sleep", side_effect=stop_after_one_call) as mock_sleep:
        with patch.object(watch_service, "write_status", return_value=None):
            await watch_service.run()
    # Should have slept for 30 seconds when no projects found
    mock_sleep.assert_called_with(30)
@pytest.mark.asyncio
async def test_run_reloads_projects_each_cycle():
    """Test that run method reloads projects in each cycle.

    get_active_projects returns one project on the first cycle and two on the
    second, proving run() re-queries the repository rather than caching the
    initial project list.
    """
    config = BasicMemoryConfig()
    repo = AsyncMock()
    # Return different projects on each call
    projects_call_1 = [Project(id=1, name="project1", path="/tmp/project1", permalink="project1")]
    projects_call_2 = [
        Project(id=1, name="project1", path="/tmp/project1", permalink="project1"),
        Project(id=2, name="project2", path="/tmp/project2", permalink="project2"),
    ]
    repo.get_active_projects.side_effect = [projects_call_1, projects_call_2]
    watch_service = WatchService(config, repo, quiet=True)
    cycle_count = 0

    async def mock_watch_cycle(projects, stop_event):
        # Stop the service after the second cycle so run() terminates.
        nonlocal cycle_count
        cycle_count += 1
        if cycle_count >= 2:
            watch_service.state.running = False

    with patch.object(watch_service, "_watch_projects_cycle", side_effect=mock_watch_cycle):
        with patch.object(watch_service, "write_status", return_value=None):
            await watch_service.run()
    # Should have reloaded projects twice
    assert repo.get_active_projects.call_count == 2
    # Should have completed two cycles
    assert cycle_count == 2
@pytest.mark.asyncio
async def test_run_continues_after_cycle_error():
    """Test that run continues to the next cycle after an error in a watch cycle.

    The first cycle raises; run() should swallow the error, sleep for the
    retry delay, and start a second cycle instead of exiting.
    """
    config = BasicMemoryConfig()
    repo = AsyncMock()
    repo.get_active_projects.return_value = [
        Project(id=1, name="test", path="/tmp/test", permalink="test")
    ]
    watch_service = WatchService(config, repo, quiet=True)
    call_count = 0

    async def failing_watch_cycle(projects, stop_event):
        nonlocal call_count
        call_count += 1
        if call_count == 1:
            raise Exception("Simulated error")
        else:
            # Stop after second call
            watch_service.state.running = False

    with patch.object(watch_service, "_watch_projects_cycle", side_effect=failing_watch_cycle):
        with patch("asyncio.sleep") as mock_sleep:
            with patch.object(watch_service, "write_status", return_value=None):
                await watch_service.run()
    # Should have tried both cycles
    assert call_count == 2
    # Should have slept for error retry (5 second backoff)
    mock_sleep.assert_called_with(5)
@pytest.mark.asyncio
async def test_timer_task_cancelled_properly():
    """Test that the restart timer task is cancelled when a cycle completes."""
    config = BasicMemoryConfig()
    repo = AsyncMock()
    repo.get_active_projects.return_value = [
        Project(id=1, name="test", path="/tmp/test", permalink="test")
    ]
    watch_service = WatchService(config, repo, quiet=True)
    # Track created timer tasks
    created_tasks = []
    original_create_task = asyncio.create_task

    def track_create_task(coro):
        # Wrap asyncio.create_task so created tasks can be inspected afterwards.
        task = original_create_task(coro)
        created_tasks.append(task)
        return task

    async def quick_watch_cycle(projects, stop_event):
        # Complete immediately
        watch_service.state.running = False

    with patch("asyncio.create_task", side_effect=track_create_task):
        with patch.object(watch_service, "_watch_projects_cycle", side_effect=quick_watch_cycle):
            with patch.object(watch_service, "write_status", return_value=None):
                await watch_service.run()
    # Should have created one timer task
    assert len(created_tasks) == 1
    # Timer task should be cancelled or done
    timer_task = created_tasks[0]
    assert timer_task.cancelled() or timer_task.done()
@pytest.mark.asyncio
async def test_new_project_addition_scenario():
    """Test the main scenario: a new project is detected when added while watching."""
    config = BasicMemoryConfig()
    repo = AsyncMock()
    # Initially one project
    initial_projects = [Project(id=1, name="existing", path="/tmp/existing", permalink="existing")]
    # After some time, new project is added
    updated_projects = [
        Project(id=1, name="existing", path="/tmp/existing", permalink="existing"),
        Project(id=2, name="new", path="/tmp/new", permalink="new"),
    ]
    # Track which project lists were used
    project_lists_used = []

    def mock_get_projects():
        # First two cycles see the initial list; later cycles see the updated one.
        if len(project_lists_used) < 2:
            project_lists_used.append(initial_projects)
            return initial_projects
        else:
            project_lists_used.append(updated_projects)
            return updated_projects

    repo.get_active_projects.side_effect = mock_get_projects
    watch_service = WatchService(config, repo, quiet=True)
    cycle_count = 0

    async def counting_watch_cycle(projects, stop_event):
        nonlocal cycle_count
        cycle_count += 1
        # Stop after enough cycles to test project reload
        if cycle_count >= 3:
            watch_service.state.running = False

    with patch.object(watch_service, "_watch_projects_cycle", side_effect=counting_watch_cycle):
        with patch.object(watch_service, "write_status", return_value=None):
            await watch_service.run()
    # Should have reloaded projects multiple times
    assert repo.get_active_projects.call_count >= 3
    # Should have completed multiple cycles
    assert cycle_count == 3
    # Should have seen both project configurations
    assert len(project_lists_used) >= 3
    assert any(len(projects) == 1 for projects in project_lists_used)  # Initial state
    assert any(len(projects) == 2 for projects in project_lists_used)  # After addition
```
--------------------------------------------------------------------------------
/src/basic_memory/markdown/entity_parser.py:
--------------------------------------------------------------------------------
```python
"""Parser for markdown files into Entity objects.
Uses markdown-it with plugins to parse structured data from markdown content.
"""
from dataclasses import dataclass, field
from datetime import date, datetime
from pathlib import Path
from typing import Any, Optional
import dateparser
import frontmatter
import yaml
from loguru import logger
from markdown_it import MarkdownIt
from basic_memory.markdown.plugins import observation_plugin, relation_plugin
from basic_memory.markdown.schemas import (
EntityFrontmatter,
EntityMarkdown,
Observation,
Relation,
)
from basic_memory.utils import parse_tags
md = MarkdownIt().use(observation_plugin).use(relation_plugin)
def normalize_frontmatter_value(value: Any) -> Any:
    """Coerce a frontmatter value into a type that is safe for string handling.

    PyYAML eagerly converts scalars into native Python types: "2025-10-24"
    becomes a ``date``, "1.0" a ``float``, "true" a ``bool``. Code that later
    calls string methods such as ``.strip()`` on those values then raises
    AttributeError (see GitHub issue #236). This helper undoes that coercion:

    - dates / datetimes -> ISO format strings
    - booleans          -> "True" / "False"
    - ints / floats     -> strings
    - lists / dicts     -> same container shape, items normalized recursively
    - strings and None  -> returned unchanged

    Args:
        value: Any value loaded from YAML frontmatter.

    Returns:
        The value converted to a string-safe equivalent.

    Example:
        >>> normalize_frontmatter_value(date(2025, 10, 24))
        '2025-10-24'
        >>> normalize_frontmatter_value([date(2025, 10, 24), "tag", 123])
        ['2025-10-24', 'tag', '123']
        >>> normalize_frontmatter_value(True)
        'True'
    """
    # bool must be handled before int/float: bool is a subclass of int.
    if isinstance(value, bool):
        return str(value)
    # datetime is a subclass of date; both serialize via isoformat().
    if isinstance(value, (datetime, date)):
        return value.isoformat()
    if isinstance(value, (int, float)):
        return str(value)
    # Containers keep their shape; contents are normalized recursively.
    if isinstance(value, list):
        return [normalize_frontmatter_value(item) for item in value]
    if isinstance(value, dict):
        return {key: normalize_frontmatter_value(item) for key, item in value.items()}
    # Strings, None, and anything else pass through untouched.
    return value
def normalize_frontmatter_metadata(metadata: dict) -> dict:
    """Return a copy of *metadata* with every value normalized.

    Applies normalize_frontmatter_value to each value so date/datetime
    objects become ISO strings, preventing AttributeError in code that
    expects strings (GitHub issue #236). The input dict is not mutated.

    Args:
        metadata: The frontmatter metadata dictionary.

    Returns:
        A new dictionary with all values normalized.
    """
    normalized: dict = {}
    for key, raw_value in metadata.items():
        normalized[key] = normalize_frontmatter_value(raw_value)
    return normalized
@dataclass
class EntityContent:
    """Parsed markdown body plus the observations and relations extracted from it."""

    # Raw markdown content (frontmatter already stripped by the caller).
    content: str
    # Observations extracted by the observation markdown-it plugin.
    observations: list[Observation] = field(default_factory=list)
    # Relations extracted by the relation markdown-it plugin.
    relations: list[Relation] = field(default_factory=list)
def parse(content: str) -> EntityContent:
    """Parse markdown content into EntityContent.

    Runs the module-level markdown-it parser (with observation/relation
    plugins) over *content* and collects any observations and relations the
    plugins attached to token metadata. Empty content yields empty lists.
    """
    observations: list[Observation] = []
    relations: list[Relation] = []
    # Guard: md.parse is only run when there is content to scan.
    for token in (md.parse(content) if content else []):
        meta = token.meta
        if not meta:
            continue
        if "observation" in meta:
            observations.append(Observation.model_validate(meta["observation"]))
        if "relations" in meta:
            relations.extend(Relation.model_validate(rel) for rel in meta["relations"])
    return EntityContent(
        content=content,
        observations=observations,
        relations=relations,
    )
# def parse_tags(tags: Any) -> list[str]:
# """Parse tags into list of strings."""
# if isinstance(tags, (list, tuple)):
# return [str(t).strip() for t in tags if str(t).strip()]
# return [t.strip() for t in tags.split(",") if t.strip()]
class EntityParser:
    """Parser for markdown files into Entity objects.

    Resolves relative file paths against a project base path, parses YAML
    frontmatter (tolerating malformed YAML), and extracts observations and
    relations from the markdown body.
    """

    def __init__(self, base_path: Path):
        """Initialize parser with base path for relative permalink generation."""
        self.base_path = base_path.resolve()

    def parse_date(self, value: Any) -> Optional[datetime]:
        """Parse date strings using dateparser for maximum flexibility.

        Supports human friendly formats like:
        - 2024-01-15
        - Jan 15, 2024
        - 2024-01-15 10:00 AM
        - yesterday
        - 2 days ago

        Returns:
            The parsed datetime, or None for unparseable or non-date values.
        """
        if isinstance(value, datetime):
            return value
        if isinstance(value, str):
            parsed = dateparser.parse(value)
            if parsed:
                return parsed
        return None

    async def parse_file(self, path: Path | str) -> EntityMarkdown:
        """Parse a markdown file into EntityMarkdown.

        Absolute paths are used as-is; relative paths are resolved against
        the project base path.
        """
        # Path(path) accepts both str and Path, so one is_absolute() check
        # replaces the previous and/or chain that was easy to misread.
        candidate = Path(path)
        absolute_path = candidate if candidate.is_absolute() else self.get_file_path(path)
        # Parse frontmatter and content using python-frontmatter
        file_content = absolute_path.read_text(encoding="utf-8")
        return await self.parse_file_content(absolute_path, file_content)

    def get_file_path(self, path: Path | str) -> Path:
        """Get absolute path for a file using the base path for the project."""
        return self.base_path / path

    async def parse_file_content(self, absolute_path: Path, file_content: str) -> EntityMarkdown:
        """Parse raw file text (frontmatter + markdown body) into EntityMarkdown.

        Args:
            absolute_path: Absolute path of the source file (used for stat
                metadata, fallback title, and log context).
            file_content: Full text of the file.

        Returns:
            EntityMarkdown with frontmatter, content, observations, relations,
            and filesystem created/modified timestamps.
        """
        # Parse frontmatter with proper error handling for malformed YAML (issue #185)
        try:
            post = frontmatter.loads(file_content)
        except yaml.YAMLError as e:
            # Log the YAML parsing error with file context
            logger.warning(
                f"Failed to parse YAML frontmatter in {absolute_path}: {e}. "
                f"Treating file as plain markdown without frontmatter."
            )
            # Create a post with no frontmatter - treat entire content as markdown
            post = frontmatter.Post(file_content, metadata={})
        # Extract file stat info
        file_stats = absolute_path.stat()
        # Normalize frontmatter values (issue #236): PyYAML converts date strings
        # like "2025-10-24" into date objects; convert them back to ISO strings so
        # downstream string operations are safe.
        metadata = normalize_frontmatter_metadata(post.metadata)
        # Ensure required fields have defaults (issue #184, #387)
        # Title: fall back to file stem when missing, empty, or the literal "None"
        title = metadata.get("title")
        if not title or title == "None":
            metadata["title"] = absolute_path.stem
        # Type: default to "note" when missing or explicitly set to null
        entity_type = metadata.get("type")
        metadata["type"] = entity_type if entity_type is not None else "note"
        tags = parse_tags(metadata.get("tags", []))  # pyright: ignore
        if tags:
            metadata["tags"] = tags
        # frontmatter - use metadata with defaults applied
        entity_frontmatter = EntityFrontmatter(metadata=metadata)
        entity_content = parse(post.content)
        return EntityMarkdown(
            frontmatter=entity_frontmatter,
            content=post.content,
            observations=entity_content.observations,
            relations=entity_content.relations,
            # NOTE(review): st_ctime is inode-change time on Unix, creation time
            # on Windows — acceptable as an approximation here.
            created=datetime.fromtimestamp(file_stats.st_ctime).astimezone(),
            modified=datetime.fromtimestamp(file_stats.st_mtime).astimezone(),
        )
```
--------------------------------------------------------------------------------
/src/basic_memory/api/template_loader.py:
--------------------------------------------------------------------------------
```python
"""Template loading and rendering utilities for the Basic Memory API.
This module handles the loading and rendering of Handlebars templates from the
templates directory, providing a consistent interface for all prompt-related
formatting needs.
"""
import textwrap
from typing import Dict, Any, Optional, Callable
from pathlib import Path
import json
import datetime
import pybars
from loguru import logger
# Get the base path of the templates directory
TEMPLATES_DIR = Path(__file__).parent.parent / "templates"
# Custom helpers for Handlebars
def _date_helper(this, *args):
    """Render a timestamp using an optional strftime format (default '%Y-%m-%d %H:%M')."""
    if len(args) < 1:  # pragma: no cover
        return ""
    value = args[0]
    fmt = args[1] if len(args) > 1 else "%Y-%m-%d %H:%M"
    if hasattr(value, "strftime"):
        # datetime/date-like object: format directly.
        formatted = value.strftime(fmt)
    elif isinstance(value, str):
        # Try ISO-8601 strings; fall back to the raw string if unparseable.
        try:
            formatted = datetime.datetime.fromisoformat(value).strftime(fmt)
        except ValueError:
            formatted = value
    else:
        formatted = str(value)  # pragma: no cover
    return pybars.strlist([formatted])
def _default_helper(this, *args):
    """Substitute a fallback value when the given value is None or empty string."""
    if len(args) < 2:  # pragma: no cover
        return ""
    value, fallback = args[0], args[1]
    chosen = fallback if (value is None or value == "") else value
    # strlist keeps HTML-escaping behavior consistent across helpers.
    return pybars.strlist([str(chosen)])
def _capitalize_helper(this, *args):
    """Capitalize the first letter of a string; non-strings render as empty."""
    if len(args) < 1:  # pragma: no cover
        return ""
    text = args[0]
    capitalized = text.capitalize() if (text and isinstance(text, str)) else ""
    return pybars.strlist([capitalized])
def _round_helper(this, *args):
    """Round a number to a given number of decimal places (default 2)."""
    if not args:
        return ""
    number = args[0]
    places = args[1] if len(args) > 1 else 2
    try:
        rendered = str(round(float(number), int(places)))
    except (ValueError, TypeError):
        # Non-numeric input: render the value unchanged.
        rendered = str(number)
    return pybars.strlist([rendered])
def _size_helper(this, *args):
    """Render the length of a sized collection or string; anything else renders as 0."""
    if not args:
        return 0
    collection = args[0]
    # None and non-sized values both fall through to "0".
    if isinstance(collection, (list, tuple, dict, str)):
        length = str(len(collection))  # pragma: no cover
    else:  # pragma: no cover
        length = "0"
    return pybars.strlist([length])
def _json_helper(this, *args):
    """Serialize a value to JSON for embedding in a template."""
    if len(args) < 1:  # pragma: no cover
        return "{}"
    serialized = json.dumps(args[0])  # pragma: no cover
    # strlist acts as a safe string so the JSON is not HTML-escaped.
    return pybars.strlist([serialized])
def _math_helper(this, *args):
    """Perform a basic arithmetic operation on two operands.

    Args (positional): lhs, operator (one of "+", "-", "*", "/"), rhs.
    Returns the rendered result, or a "Math error: ..." / "Unsupported
    operator: ..." message rather than raising, so a bad expression
    cannot abort template rendering.
    """
    if len(args) < 3:
        return pybars.strlist(["Math error: Insufficient arguments"])
    lhs = args[0]
    operator = args[1]
    rhs = args[2]
    try:
        lhs = float(lhs)
        rhs = float(rhs)
        if operator == "+":
            result = str(lhs + rhs)
        elif operator == "-":
            result = str(lhs - rhs)
        elif operator == "*":
            result = str(lhs * rhs)
        elif operator == "/":
            # Division by zero raises ZeroDivisionError, not ValueError;
            # it must be caught below or {{math x "/" 0}} crashes the render.
            result = str(lhs / rhs)
        else:
            result = f"Unsupported operator: {operator}"
    except (ValueError, TypeError, ZeroDivisionError) as e:
        result = f"Math error: {e}"
    return pybars.strlist([result])
def _lt_helper(this, *args):
"""Check if left hand side is less than right hand side."""
if len(args) < 2:
return False
lhs = args[0]
rhs = args[1]
try:
return float(lhs) < float(rhs)
except (ValueError, TypeError):
# Fall back to string comparison for non-numeric values
return str(lhs) < str(rhs)
def _if_cond_helper(this, options, condition):
"""Block helper for custom if conditionals."""
if condition:
return options["fn"](this)
elif "inverse" in options:
return options["inverse"](this)
return "" # pragma: no cover
def _dedent_helper(this, options):
    """Dedent a block of text, removing common leading whitespace.

    Usage:
        {{#dedent}}
            This text will have its
            common leading whitespace removed
              while preserving relative indentation.
        {{/dedent}}
    """
    if "fn" not in options:  # pragma: no cover
        return ""
    rendered = options["fn"](this)
    # pybars may hand us a strlist (iterable of fragments) rather than a str;
    # note the precedence: list OR (iterable AND not str/bytes).
    if (
        isinstance(rendered, list)
        or hasattr(rendered, "__iter__")
        and not isinstance(rendered, (str, bytes))
    ):
        text = "".join(str(fragment) for fragment in rendered)  # pragma: no cover
    else:
        text = str(rendered)  # pragma: no cover
    # Pad with newlines so textwrap.dedent sees uniform line starts, then
    # strip the padding back off after dedenting.
    padded = "\n" + text + "\n"
    dedented = textwrap.dedent(padded)[1:-1]
    # Return as a safe string so the result is not HTML-escaped.
    return pybars.strlist([dedented])  # pragma: no cover
class TemplateLoader:
    """Loader for Handlebars templates.

    Loads templates from disk, compiles them with pybars, and caches the
    compiled callables keyed by their normalized (.hbs) path.
    """

    def __init__(self, template_dir: Optional[str] = None):
        """Initialize the template loader.

        Args:
            template_dir: Optional custom template directory path; defaults
                to the package-level templates directory.
        """
        self.template_dir = Path(template_dir) if template_dir else TEMPLATES_DIR
        self.template_cache: Dict[str, Callable] = {}
        self.compiler = pybars.Compiler()
        # Standard helpers available to every rendered template.
        self.helpers = {
            "date": _date_helper,
            "default": _default_helper,
            "capitalize": _capitalize_helper,
            "round": _round_helper,
            "size": _size_helper,
            "json": _json_helper,
            "math": _math_helper,
            "lt": _lt_helper,
            "if_cond": _if_cond_helper,
            "dedent": _dedent_helper,
        }
        logger.debug(f"Initialized template loader with directory: {self.template_dir}")

    def get_template(self, template_path: str) -> Callable:
        """Get a template by path, using cache if available.

        Args:
            template_path: The path to the template, relative to the templates
                directory. A ".liquid" suffix or missing extension is
                normalized to ".hbs".

        Returns:
            The compiled Handlebars template.

        Raises:
            FileNotFoundError: If the template doesn't exist.
        """
        # Normalize to the .hbs extension BEFORE the cache lookup; otherwise
        # requests for "foo" or "foo.liquid" never match the cached "foo.hbs"
        # entry and the template is re-read and re-compiled on every call.
        if template_path.endswith(".liquid"):
            template_path = template_path.replace(".liquid", ".hbs")
        elif not template_path.endswith(".hbs"):
            template_path = f"{template_path}.hbs"

        if template_path in self.template_cache:
            return self.template_cache[template_path]

        full_path = self.template_dir / template_path
        if not full_path.exists():
            raise FileNotFoundError(f"Template not found: {full_path}")

        with open(full_path, "r", encoding="utf-8") as f:
            template_str = f.read()

        template = self.compiler.compile(template_str)
        self.template_cache[template_path] = template
        logger.debug(f"Loaded template: {template_path}")
        return template

    async def render(self, template_path: str, context: Dict[str, Any]) -> str:
        """Render a template with the given context.

        Args:
            template_path: The path to the template, relative to the templates directory
            context: The context data to pass to the template

        Returns:
            The rendered template as a string
        """
        template = self.get_template(template_path)
        return template(context, helpers=self.helpers)

    def clear_cache(self) -> None:
        """Clear the template cache."""
        self.template_cache.clear()
        logger.debug("Template cache cleared")
# Global template loader instance: a module-level singleton so all callers
# share one compiled-template cache.
template_loader = TemplateLoader()
```
--------------------------------------------------------------------------------
/specs/SPEC-15 Configuration Persistence via Tigris for Cloud Tenants.md:
--------------------------------------------------------------------------------
```markdown
---
title: 'SPEC-15: Configuration Persistence via Tigris for Cloud Tenants'
type: spec
permalink: specs/spec-15-config-persistence-tigris
tags:
- persistence
- tigris
- multi-tenant
- infrastructure
- configuration
status: draft
---
# SPEC-15: Configuration Persistence via Tigris for Cloud Tenants
## Why
We need to persist Basic Memory configuration across Fly.io deployments without using persistent volumes or external databases.
**Current Problems:**
- `~/.basic-memory/config.json` lost on every deployment (project configuration)
- `~/.basic-memory/memory.db` lost on every deployment (search index)
- Persistent volumes break clean deployment workflow
- External databases (Turso) require per-tenant token management
**The Insight:**
The SQLite database is just an **index cache** of the markdown files. It can be rebuilt in seconds from the source markdown files in Tigris. Only the small `config.json` file needs true persistence.
**Solution:**
- Store `config.json` in Tigris bucket (persistent, small file)
- Rebuild `memory.db` on startup from markdown files (fast, ephemeral)
- No persistent volumes, no external databases, no token management
## What
Store Basic Memory configuration in the Tigris bucket and rebuild the database index on tenant machine startup.
**Affected Components:**
- `basic-memory/src/basic_memory/config.py` - Add configurable config directory
**Architecture:**
```bash
# Tigris Bucket (persistent, mounted at /app/data)
/app/data/
├── .basic-memory/
│ └── config.json # ← Project configuration (persistent, accessed via BASIC_MEMORY_CONFIG_DIR)
└── basic-memory/ # ← Markdown files (persistent, BASIC_MEMORY_HOME)
├── project1/
└── project2/
# Fly Machine (ephemeral)
/app/.basic-memory/
└── memory.db # ← Rebuilt on startup (fast local disk)
```
## How (High Level)
### 1. Add Configurable Config Directory to Basic Memory
Currently `ConfigManager` hardcodes `~/.basic-memory/config.json`. Add environment variable to override:
```python
# basic-memory/src/basic_memory/config.py
class ConfigManager:
"""Manages Basic Memory configuration."""
def __init__(self) -> None:
"""Initialize the configuration manager."""
home = os.getenv("HOME", Path.home())
if isinstance(home, str):
home = Path(home)
# Allow override via environment variable
if config_dir := os.getenv("BASIC_MEMORY_CONFIG_DIR"):
self.config_dir = Path(config_dir)
else:
self.config_dir = home / DATA_DIR_NAME
self.config_file = self.config_dir / CONFIG_FILE_NAME
# Ensure config directory exists
self.config_dir.mkdir(parents=True, exist_ok=True)
```
### 2. Rebuild Database on Startup
Basic Memory already has the sync functionality. Just ensure it runs on startup:
```python
# apps/api/src/basic_memory_cloud_api/main.py
@app.on_event("startup")
async def startup_sync():
"""Rebuild database index from Tigris markdown files."""
logger.info("Starting database rebuild from Tigris")
# Initialize file sync (rebuilds index from markdown files)
app_config = ConfigManager().config
await initialize_file_sync(app_config)
logger.info("Database rebuild complete")
```
### 3. Environment Configuration
```bash
# Machine environment variables
BASIC_MEMORY_CONFIG_DIR=/app/data/.basic-memory # Config read/written directly to Tigris
# memory.db stays in default location: /app/.basic-memory/memory.db (local ephemeral disk)
```
## Implementation Task List
### Phase 1: Basic Memory Changes ✅
- [x] Add `BASIC_MEMORY_CONFIG_DIR` environment variable support to `ConfigManager.__init__()`
- [x] Test config loading from custom directory
- [x] Update tests to verify custom config dir works
### Phase 2: Tigris Bucket Structure ✅
- [x] Ensure `.basic-memory/` directory exists in Tigris bucket on tenant creation
- ✅ ConfigManager auto-creates on first run, no explicit provisioning needed
- [x] Initialize `config.json` in Tigris on first tenant deployment
- ✅ ConfigManager creates config.json automatically in BASIC_MEMORY_CONFIG_DIR
- [x] Verify TigrisFS handles hidden directories correctly
- ✅ TigrisFS supports hidden directories (verified in SPEC-8)
### Phase 3: Deployment Integration ✅
- [x] Set `BASIC_MEMORY_CONFIG_DIR` environment variable in machine deployment
- ✅ Added to BasicMemoryMachineConfigBuilder in fly_schemas.py
- [x] Ensure database rebuild runs on machine startup via initialization sync
- ✅ sync_worker.py runs initialize_file_sync every 30s (already implemented)
- [x] Handle first-time tenant setup (no config exists yet)
- ✅ ConfigManager creates config.json on first initialization
- [ ] Test deployment workflow with config persistence
### Phase 4: Testing
- [x] Unit tests for config directory override
- [-] Integration test: deploy → write config → redeploy → verify config persists
- [ ] Integration test: deploy → add project → redeploy → verify project in config
- [ ] Performance test: measure db rebuild time on startup
### Phase 5: Documentation
- [ ] Document config persistence architecture
- [ ] Update deployment runbook
- [ ] Document startup sequence and timing
## How to Evaluate
### Success Criteria
1. **Config Persistence**
- [ ] config.json persists across deployments
- [ ] Projects list maintained across restarts
- [ ] No manual configuration needed after redeploy
2. **Database Rebuild**
- [ ] memory.db rebuilt on startup in < 30 seconds
- [ ] All entities indexed correctly
- [ ] Search functionality works after rebuild
3. **Performance**
- [ ] SQLite queries remain fast (local disk)
- [ ] Config reads acceptable (direct reads from the Tigris mount via BASIC_MEMORY_CONFIG_DIR)
- [ ] No noticeable performance degradation
4. **Deployment Workflow**
- [ ] Clean deployments without volumes
- [ ] No new external dependencies
- [ ] No secret management needed
### Testing Procedure
1. **Config Persistence Test**
```bash
# Deploy tenant
POST /tenants → tenant_id
# Add a project
basic-memory project add "test-project" ~/test
# Verify config has project
cat /app/data/.basic-memory/config.json
# Redeploy machine
fly deploy --app basic-memory-{tenant_id}
# Verify project still exists
basic-memory project list
```
2. **Database Rebuild Test**
```bash
# Create notes
basic-memory write "Test Note" --content "..."
# Redeploy (db lost)
fly deploy --app basic-memory-{tenant_id}
# Wait for startup sync
sleep 10
# Verify note is indexed
basic-memory search "Test Note"
```
3. **Performance Benchmark**
```bash
# Time the startup sync
time basic-memory sync
# Should be < 30 seconds for typical tenant
```
## Benefits Over Alternatives
**vs. Persistent Volumes:**
- ✅ Clean deployment workflow
- ✅ No volume migration needed
- ✅ Simpler infrastructure
**vs. Turso (External Database):**
- ✅ No per-tenant token management
- ✅ No external service dependencies
- ✅ No additional costs
- ✅ Simpler architecture
**vs. SQLite on FUSE:**
- ✅ Fast local SQLite performance
- ✅ Only slow reads for small config file
- ✅ Database queries remain fast
## Implementation Assignment
**Primary Agent:** `python-developer`
- Add `BASIC_MEMORY_CONFIG_DIR` environment variable to ConfigManager
- Update deployment workflow to set environment variable
- Ensure startup sync runs correctly
**Review Agent:** `system-architect`
- Validate architecture simplicity
- Review performance implications
- Assess startup timing
## Dependencies
- **Internal:** TigrisFS must be working and stable
- **Internal:** Basic Memory sync must be reliable
- **Internal:** SPEC-8 (TigrisFS Integration) must be complete
## Open Questions
1. Should we add a health check that waits for db rebuild to complete?
2. Do we need to handle very large knowledge bases (>10k entities) differently?
3. Should we add metrics for startup sync duration?
## References
- Basic Memory sync: `basic-memory/src/basic_memory/services/initialization.py`
- Config management: `basic-memory/src/basic_memory/config.py`
- TigrisFS integration: SPEC-8
---
**Status Updates:**
- 2025-10-08: Pivoted from Turso to Tigris-based config persistence
- 2025-10-08: Phase 1 complete - BASIC_MEMORY_CONFIG_DIR support added (PR #343)
- 2025-10-08: Phases 2-3 complete - Added BASIC_MEMORY_CONFIG_DIR to machine config
- Config now persists to /app/data/.basic-memory/config.json in Tigris bucket
- Database rebuild already working via sync_worker.py
- Ready for deployment testing (Phase 4)
```
--------------------------------------------------------------------------------
/tests/schemas/test_memory_serialization.py:
--------------------------------------------------------------------------------
```python
"""Tests for datetime serialization in memory schema models."""
import json
from datetime import datetime
from basic_memory.schemas.memory import (
EntitySummary,
RelationSummary,
ObservationSummary,
MemoryMetadata,
GraphContext,
ContextResult,
)
class TestDateTimeSerialization:
    """Test datetime serialization for MCP schema compliance.

    MCP clients validate `date-time` fields against ISO 8601, so every memory
    schema model must serialize datetime fields as ISO-format strings.
    """

    def test_entity_summary_datetime_serialization(self):
        """Test EntitySummary serializes datetime as ISO format string."""
        test_datetime = datetime(2023, 12, 8, 10, 30, 0)
        entity = EntitySummary(
            permalink="test/entity",
            title="Test Entity",
            file_path="test/entity.md",
            created_at=test_datetime,
        )
        # Test model_dump_json() produces ISO format
        json_str = entity.model_dump_json()
        data = json.loads(json_str)
        assert data["created_at"] == "2023-12-08T10:30:00"
        assert data["type"] == "entity"
        assert data["title"] == "Test Entity"

    def test_relation_summary_datetime_serialization(self):
        """Test RelationSummary serializes datetime as ISO format string."""
        test_datetime = datetime(2023, 12, 8, 15, 45, 30)
        relation = RelationSummary(
            title="Test Relation",
            file_path="test/relation.md",
            permalink="test/relation",
            relation_type="relates_to",
            from_entity="entity1",
            to_entity="entity2",
            created_at=test_datetime,
        )
        # Test model_dump_json() produces ISO format
        json_str = relation.model_dump_json()
        data = json.loads(json_str)
        assert data["created_at"] == "2023-12-08T15:45:30"
        assert data["type"] == "relation"
        assert data["relation_type"] == "relates_to"

    def test_observation_summary_datetime_serialization(self):
        """Test ObservationSummary serializes datetime as ISO format string."""
        test_datetime = datetime(2023, 12, 8, 20, 15, 45)
        observation = ObservationSummary(
            title="Test Observation",
            file_path="test/observation.md",
            permalink="test/observation",
            category="note",
            content="Test content",
            created_at=test_datetime,
        )
        # Test model_dump_json() produces ISO format
        json_str = observation.model_dump_json()
        data = json.loads(json_str)
        assert data["created_at"] == "2023-12-08T20:15:45"
        assert data["type"] == "observation"
        assert data["category"] == "note"

    def test_memory_metadata_datetime_serialization(self):
        """Test MemoryMetadata serializes datetime as ISO format string."""
        test_datetime = datetime(2023, 12, 8, 12, 0, 0)
        metadata = MemoryMetadata(
            depth=2, generated_at=test_datetime, primary_count=5, related_count=3
        )
        # Test model_dump_json() produces ISO format
        json_str = metadata.model_dump_json()
        data = json.loads(json_str)
        assert data["generated_at"] == "2023-12-08T12:00:00"
        assert data["depth"] == 2
        assert data["primary_count"] == 5

    def test_context_result_with_datetime_serialization(self):
        """Test ContextResult with nested models serializes datetime correctly."""
        test_datetime = datetime(2023, 12, 8, 9, 30, 15)
        entity = EntitySummary(
            permalink="test/entity",
            title="Test Entity",
            file_path="test/entity.md",
            created_at=test_datetime,
        )
        observation = ObservationSummary(
            title="Test Observation",
            file_path="test/observation.md",
            permalink="test/observation",
            category="note",
            content="Test content",
            created_at=test_datetime,
        )
        context_result = ContextResult(
            primary_result=entity, observations=[observation], related_results=[]
        )
        # Nested models must serialize their datetimes too, not just top-level ones.
        json_str = context_result.model_dump_json()
        data = json.loads(json_str)
        assert data["primary_result"]["created_at"] == "2023-12-08T09:30:15"
        assert data["observations"][0]["created_at"] == "2023-12-08T09:30:15"

    def test_graph_context_full_serialization(self):
        """Test full GraphContext serialization with all datetime fields."""
        test_datetime = datetime(2023, 12, 8, 14, 20, 10)
        entity = EntitySummary(
            permalink="test/entity",
            title="Test Entity",
            file_path="test/entity.md",
            created_at=test_datetime,
        )
        metadata = MemoryMetadata(
            depth=1, generated_at=test_datetime, primary_count=1, related_count=0
        )
        context_result = ContextResult(primary_result=entity, observations=[], related_results=[])
        graph_context = GraphContext(
            results=[context_result], metadata=metadata, page=1, page_size=10
        )
        # Test full serialization
        json_str = graph_context.model_dump_json()
        data = json.loads(json_str)
        assert data["metadata"]["generated_at"] == "2023-12-08T14:20:10"
        assert data["results"][0]["primary_result"]["created_at"] == "2023-12-08T14:20:10"

    def test_datetime_with_microseconds_serialization(self):
        """Test datetime with microseconds serializes correctly."""
        test_datetime = datetime(2023, 12, 8, 10, 30, 0, 123456)
        entity = EntitySummary(
            permalink="test/entity",
            title="Test Entity",
            file_path="test/entity.md",
            created_at=test_datetime,
        )
        json_str = entity.model_dump_json()
        data = json.loads(json_str)
        # Should include microseconds in ISO format
        assert data["created_at"] == "2023-12-08T10:30:00.123456"

    def test_mcp_schema_validation_compatibility(self):
        """Test that serialized datetime format is compatible with MCP schema validation."""
        test_datetime = datetime(2023, 12, 8, 10, 30, 0)
        entity = EntitySummary(
            permalink="test/entity",
            title="Test Entity",
            file_path="test/entity.md",
            created_at=test_datetime,
        )
        # Serialize to JSON
        json_str = entity.model_dump_json()
        data = json.loads(json_str)
        # Verify the format matches expected MCP "date-time" format
        datetime_str = data["created_at"]
        # Should be parseable back to datetime (round-trip validates ISO format)
        parsed_datetime = datetime.fromisoformat(datetime_str)
        assert parsed_datetime == test_datetime
        # Should match the expected ISO format pattern
        assert "T" in datetime_str  # Contains date-time separator
        assert len(datetime_str) >= 19  # At least YYYY-MM-DDTHH:MM:SS format

    def test_all_models_have_datetime_serializers_configured(self):
        """Test that all memory schema models have datetime field serializers configured."""
        # (model class, name of its datetime field) pairs to exercise.
        models_to_test = [
            (EntitySummary, "created_at"),
            (RelationSummary, "created_at"),
            (ObservationSummary, "created_at"),
            (MemoryMetadata, "generated_at"),
        ]
        for model_class, datetime_field in models_to_test:
            # Create a test instance with a datetime field
            test_datetime = datetime(2023, 12, 8, 10, 30, 0)
            if model_class == EntitySummary:
                instance = model_class(
                    permalink="test", title="Test", file_path="test.md", created_at=test_datetime
                )
            elif model_class == RelationSummary:
                instance = model_class(
                    title="Test",
                    file_path="test.md",
                    permalink="test",
                    relation_type="test",
                    created_at=test_datetime,
                )
            elif model_class == ObservationSummary:
                instance = model_class(
                    title="Test",
                    file_path="test.md",
                    permalink="test",
                    category="test",
                    content="Test",
                    created_at=test_datetime,
                )
            elif model_class == MemoryMetadata:
                instance = model_class(depth=1, generated_at=test_datetime)
            # model_dump() (not just model_dump_json) must also yield ISO strings.
            data = instance.model_dump()
            assert data[datetime_field] == "2023-12-08T10:30:00"
```
--------------------------------------------------------------------------------
/src/basic_memory/mcp/tools/write_note.py:
--------------------------------------------------------------------------------
```python
"""Write note tool for Basic Memory MCP server."""
from typing import List, Union, Optional
from loguru import logger
from basic_memory.mcp.async_client import get_client
from basic_memory.mcp.project_context import get_active_project, add_project_metadata
from basic_memory.mcp.server import mcp
from basic_memory.mcp.tools.utils import call_put
from basic_memory.schemas import EntityResponse
from fastmcp import Context
from basic_memory.schemas.base import Entity
from basic_memory.utils import parse_tags, validate_project_path
# TagType is a Union that accepts either a string, a list of strings, or None.
# (Previously this alias and its comment were duplicated verbatim; defined once now.)
TagType = Union[List[str], str, None]
@mcp.tool(
    description="Create or update a markdown note. Returns a markdown formatted summary of the semantic content.",
)
async def write_note(
    title: str,
    content: str,
    folder: str,
    project: Optional[str] = None,
    # NOTE(review): `tags` is left un-annotated; the docstring says external MCP
    # clients may send either a list or a comma-separated string, and parse_tags()
    # normalizes both — presumably annotating it would constrain the tool schema;
    # confirm before adding a type.
    tags=None,
    entity_type: str = "note",
    context: Context | None = None,
) -> str:
    """Write a markdown note to the knowledge base.

    Creates or updates a markdown note with semantic observations and relations.

    Project Resolution:
        Server resolves projects in this order: Single Project Mode → project parameter → default project.
        If project unknown, use list_memory_projects() or recent_activity() first.

    The content can include semantic observations and relations using markdown syntax:

    Observations format:
        `- [category] Observation text #tag1 #tag2 (optional context)`
        Examples:
        `- [design] Files are the source of truth #architecture (All state comes from files)`
        `- [tech] Using SQLite for storage #implementation`
        `- [note] Need to add error handling #todo`

    Relations format:
        - Explicit: `- relation_type [[Entity]] (optional context)`
        - Inline: Any `[[Entity]]` reference creates a relation
        Examples:
        `- depends_on [[Content Parser]] (Need for semantic extraction)`
        `- implements [[Search Spec]] (Initial implementation)`
        `- This feature extends [[Base Design]] and uses [[Core Utils]]`

    Args:
        title: The title of the note
        content: Markdown content for the note, can include observations and relations
        folder: Folder path relative to project root where the file should be saved.
            Use forward slashes (/) as separators. Use "/" or "" to write to project root.
            Examples: "notes", "projects/2025", "research/ml", "/" (root)
        project: Project name to write to. Optional - server will resolve using the
            hierarchy above. If unknown, use list_memory_projects() to discover
            available projects.
        tags: Tags to categorize the note. Can be a list of strings, a comma-separated string, or None.
            Note: If passing from external MCP clients, use a string format (e.g. "tag1,tag2,tag3")
        entity_type: Type of entity to create. Defaults to "note". Can be "guide", "report", "config", etc.
        context: Optional FastMCP context for performance caching.

    Returns:
        A markdown formatted summary of the semantic content, including:
        - Creation/update status with project name
        - File path and checksum
        - Observation counts by category
        - Relation counts (resolved/unresolved)
        - Tags if present
        - Session tracking metadata for project awareness

    Examples:
        # Assistant flow when project is unknown
        # 1. list_memory_projects() -> Ask user which project
        # 2. User: "Use my-research"
        # 3. write_note(...) and remember "my-research" for session

        # Create a simple note
        write_note(
            project="my-research",
            title="Meeting Notes",
            folder="meetings",
            content="# Weekly Standup\\n\\n- [decision] Use SQLite for storage #tech"
        )

        # Create a note with tags and entity type
        write_note(
            project="work-project",
            title="API Design",
            folder="specs",
            content="# REST API Specification\\n\\n- implements [[Authentication]]",
            tags=["api", "design"],
            entity_type="guide"
        )

        # Update existing note (same title/folder)
        write_note(
            project="my-research",
            title="Meeting Notes",
            folder="meetings",
            content="# Weekly Standup\\n\\n- [decision] Use PostgreSQL instead #tech"
        )

    Raises:
        HTTPError: If project doesn't exist or is inaccessible
        SecurityError: If folder path attempts path traversal
    """
    async with get_client() as client:
        logger.info(
            f"MCP tool call tool=write_note project={project} folder={folder}, title={title}, tags={tags}"
        )
        # Get and validate the project (supports optional project parameter)
        active_project = await get_active_project(client, project, context)

        # Normalize "/" to empty string for root folder (must happen before validation)
        if folder == "/":
            folder = ""

        # Validate folder path to prevent path traversal attacks
        project_path = active_project.home
        if folder and not validate_project_path(folder, project_path):
            logger.warning(
                "Attempted path traversal attack blocked",
                folder=folder,
                project=active_project.name,
            )
            return f"# Error\n\nFolder path '{folder}' is not allowed - paths must stay within project boundaries"

        # Process tags using the helper function
        tag_list = parse_tags(tags)
        # Create the entity request; tags ride along in entity_metadata when present.
        metadata = {"tags": tag_list} if tag_list else None
        entity = Entity(
            title=title,
            folder=folder,
            entity_type=entity_type,
            content_type="text/markdown",
            content=content,
            entity_metadata=metadata,
        )
        project_url = active_project.permalink

        # Create or update via knowledge API (PUT is upsert: 201 = created, else updated)
        logger.debug(f"Creating entity via API permalink={entity.permalink}")
        url = f"{project_url}/knowledge/entities/{entity.permalink}"
        response = await call_put(client, url, json=entity.model_dump())
        result = EntityResponse.model_validate(response.json())

        # Format semantic summary based on status code
        action = "Created" if response.status_code == 201 else "Updated"
        summary = [
            f"# {action} note",
            f"project: {active_project.name}",
            f"file_path: {result.file_path}",
            f"permalink: {result.permalink}",
            # checksum may be absent; show a short prefix when available
            f"checksum: {result.checksum[:8] if result.checksum else 'unknown'}",
        ]

        # Count observations by category
        categories = {}
        if result.observations:
            for obs in result.observations:
                categories[obs.category] = categories.get(obs.category, 0) + 1
            summary.append("\n## Observations")
            for category, count in sorted(categories.items()):
                summary.append(f"- {category}: {count}")

        # Count resolved/unresolved relations (unresolved = no target entity id yet)
        unresolved = 0
        resolved = 0
        if result.relations:
            unresolved = sum(1 for r in result.relations if not r.to_id)
            resolved = len(result.relations) - unresolved
            summary.append("\n## Relations")
            summary.append(f"- Resolved: {resolved}")
            if unresolved:
                summary.append(f"- Unresolved: {unresolved}")
                summary.append(
                    "\nNote: Unresolved relations point to entities that don't exist yet."
                )
                summary.append(
                    "They will be automatically resolved when target entities are created or during sync operations."
                )

        if tag_list:
            summary.append(f"\n## Tags\n- {', '.join(tag_list)}")

        # Log the response with structured data
        logger.info(
            f"MCP tool response: tool=write_note project={active_project.name} action={action} permalink={result.permalink} observations_count={len(result.observations)} relations_count={len(result.relations)} resolved_relations={resolved} unresolved_relations={unresolved} status_code={response.status_code}"
        )
        # `result` is rebound from EntityResponse to the final markdown string here.
        result = "\n".join(summary)
        return add_project_metadata(result, active_project.name)
```
--------------------------------------------------------------------------------
/tests/markdown/test_entity_parser.py:
--------------------------------------------------------------------------------
```python
"""Tests for entity markdown parsing."""
from datetime import datetime
from pathlib import Path
from textwrap import dedent
import pytest
from basic_memory.markdown.schemas import EntityMarkdown, EntityFrontmatter, Relation
from basic_memory.markdown.entity_parser import parse
@pytest.fixture
def valid_entity_content():
    """A complete, valid entity file with all features.

    The file carries frontmatter (including comma-separated tags), prose
    content with two inline wiki-links, an ## Observations section, and a
    ## Relations section, so a single fixture exercises every part of the
    entity markdown format.
    """
    return dedent("""
        ---
        title: Auth Service
        type: component
        permalink: auth_service
        created: 2024-12-21T14:00:00Z
        modified: 2024-12-21T14:00:00Z
        tags: authentication, security, core
        ---
        Core authentication service that handles user authentication.
        some [[Random Link]]
        another [[Random Link with Title|Titled Link]]
        ## Observations
        - [design] Stateless authentication #security #architecture (JWT based)
        - [feature] Mobile client support #mobile #oauth (Required for App Store)
        - [tech] Caching layer #performance (Redis implementation)
        ## Relations
        - implements [[OAuth Implementation]] (Core auth flows)
        - uses [[Redis Cache]] (Token caching)
        - specified_by [[Auth API Spec]] (OpenAPI spec)
        """)
@pytest.mark.asyncio
async def test_parse_complete_file(project_config, entity_parser, valid_entity_content):
    """Parse a complete entity file and verify every parsed component."""
    md_path = project_config.home / "test_entity.md"
    md_path.write_text(valid_entity_content)

    parsed = await entity_parser.parse_file(md_path)

    # Overall structure
    assert isinstance(parsed, EntityMarkdown)
    assert isinstance(parsed.frontmatter, EntityFrontmatter)
    assert isinstance(parsed.content, str)

    # Frontmatter values
    fm = parsed.frontmatter
    assert fm.title == "Auth Service"
    assert fm.type == "component"
    assert fm.permalink == "auth_service"
    assert set(fm.tags) == {"authentication", "security", "core"}

    # Prose content survives parsing
    assert "Core authentication service that handles user authentication." in parsed.content

    # Observations
    assert len(parsed.observations) == 3
    first_obs = parsed.observations[0]
    assert first_obs.category == "design"
    assert first_obs.content == "Stateless authentication #security #architecture"
    assert set(first_obs.tags or []) == {"security", "architecture"}
    assert first_obs.context == "JWT based"

    # Relations: three explicit entries plus two inline wiki-links
    assert len(parsed.relations) == 5
    expected_relations = [
        Relation(type="implements", target="OAuth Implementation", context="Core auth flows"),
        Relation(type="uses", target="Redis Cache", context="Token caching"),
        Relation(type="specified_by", target="Auth API Spec", context="OpenAPI spec"),
        Relation(type="links to", target="Random Link", context=None),
        Relation(type="links to", target="Random Link with Title|Titled Link", context=None),
    ]
    for relation in expected_relations:
        assert relation in parsed.relations, f"missing [[{relation.target}]]"
@pytest.mark.asyncio
async def test_parse_minimal_file(project_config, entity_parser):
    """A minimal file (no title/permalink in frontmatter) still parses."""
    target = project_config.home / "minimal.md"
    target.write_text(
        dedent("""
        ---
        type: component
        tags: []
        ---
        # Minimal Entity
        ## Observations
        - [note] Basic observation #test
        ## Relations
        - references [[Other Entity]]
        """)
    )

    parsed = await entity_parser.parse_file(target)

    # No permalink supplied -> stays None
    assert parsed.frontmatter.type == "component"
    assert parsed.frontmatter.permalink is None

    # Exactly one observation and one relation were extracted
    assert len(parsed.observations) == 1
    assert len(parsed.relations) == 1

    # Timestamps are always populated from file metadata
    assert parsed.created is not None
    assert parsed.modified is not None
@pytest.mark.asyncio
async def test_error_handling(project_config, entity_parser):
    """Missing files and undecodable files raise the expected exceptions."""
    # A path that does not exist raises FileNotFoundError
    with pytest.raises(FileNotFoundError):
        await entity_parser.parse_file(Path("nonexistent.md"))

    # A file that is not valid UTF-8 raises UnicodeDecodeError
    bad_file = project_config.home / "binary.md"
    bad_file.write_bytes(b"\x80\x81")  # invalid UTF-8 byte sequence
    with pytest.raises(UnicodeDecodeError):
        await entity_parser.parse_file(bad_file)
@pytest.mark.asyncio
async def test_parse_file_without_section_headers(project_config, entity_parser):
    """Observations and relations are extracted even without ## headers."""
    raw = dedent("""
        ---
        type: component
        permalink: minimal_entity
        status: draft
        tags: []
        ---
        # Minimal Entity
        some text
        some [[Random Link]]
        - [note] Basic observation #test
        - references [[Other Entity]]
        """)
    md_file = project_config.home / "minimal.md"
    md_file.write_text(raw)

    parsed = await entity_parser.parse_file(md_file)

    # Frontmatter
    assert parsed.frontmatter.type == "component"
    assert parsed.frontmatter.permalink == "minimal_entity"

    # Body text, including the inline link line, stays in the content
    assert "some text\nsome [[Random Link]]" in parsed.content

    # The bare list item still parses as an observation
    assert len(parsed.observations) == 1
    note = parsed.observations[0]
    assert note.category == "note"
    assert note.content == "Basic observation #test"
    assert note.tags == ["test"]

    # Two relations: the inline wiki-link first, then the explicit entry
    assert len(parsed.relations) == 2
    inline_link, explicit = parsed.relations
    assert (inline_link.type, inline_link.target) == ("links to", "Random Link")
    assert (explicit.type, explicit.target) == ("references", "Other Entity")
def test_parse_date_formats(entity_parser):
    """parse_date accepts common formats and returns None for bad input."""
    # Values parse_date should accept
    parseable = [
        "2024-01-15",
        "Jan 15, 2024",
        "2024-01-15 10:00 AM",
        datetime.now(),  # datetime instances pass through
    ]
    for value in parseable:
        assert entity_parser.parse_date(value) is not None

    # Values parse_date should reject (returning None, never raising)
    unparseable = [
        None,
        123,  # non-string / non-datetime
        "not a date",  # unparseable string
        "",  # empty string
        "25:00:00",  # invalid time, exercises dateparser error handling
    ]
    for value in unparseable:
        assert entity_parser.parse_date(value) is None
def test_parse_empty_content():
    """Empty or title-only markdown yields no observations or relations."""
    for text in ("", "# Just a title"):
        parsed = parse(text)
        # Content is passed through untouched
        assert parsed.content == text
        # Nothing to extract from bare text
        assert len(parsed.observations) == 0
        assert len(parsed.relations) == 0
@pytest.mark.asyncio
async def test_parse_file_with_absolute_path(project_config, entity_parser):
    """Files referenced by absolute path parse the same as relative ones."""
    md_file = project_config.home / "absolute_path_test.md"
    md_file.write_text(
        dedent("""
        ---
        type: component
        permalink: absolute_path_test
        ---
        # Absolute Path Test
        A file with an absolute path.
        """)
    )

    # Parse via the fully-resolved absolute path
    parsed = await entity_parser.parse_file(md_file.resolve())

    assert parsed.frontmatter.permalink == "absolute_path_test"
    assert "Absolute Path Test" in parsed.content
    assert parsed.created is not None
    assert parsed.modified is not None
# @pytest.mark.asyncio
# async def test_parse_file_invalid_yaml(test_config, entity_parser):
# """Test parsing file with invalid YAML frontmatter."""
# content = dedent("""
# ---
# invalid: [yaml: ]syntax]
# ---
#
# # Invalid YAML Frontmatter
# """)
#
# test_file = test_config.home / "invalid_yaml.md"
# test_file.write_text(content)
#
# # Should handle invalid YAML gracefully
# entity = await entity_parser.parse_file(test_file)
# assert entity.frontmatter.title == "invalid_yaml.md"
# assert entity.frontmatter.type == "note"
# assert entity.content.strip() == "# Invalid YAML Frontmatter"
```
--------------------------------------------------------------------------------
/tests/markdown/test_entity_parser_error_handling.py:
--------------------------------------------------------------------------------
```python
"""Tests for entity parser error handling (issues #184 and #185)."""
import pytest
from textwrap import dedent
from basic_memory.markdown.entity_parser import EntityParser
@pytest.mark.asyncio
async def test_parse_file_with_malformed_yaml_frontmatter(tmp_path):
    """Malformed YAML frontmatter is parsed gracefully (issue #185).

    Reproduces the production error where block sequence entries made YAML
    parsing fail; the parser must fall back to treating the file as plain
    markdown instead of raising YAMLError.
    """
    md_file = tmp_path / "malformed.md"
    raw = dedent(
        """
        ---
        title: Group Chat Texts
        tags:
        - family # Line 5, column 7 - this syntax can fail in certain YAML contexts
        - messages
        type: note
        ---
        # Group Chat Texts
        Content here
        """
    ).strip()
    md_file.write_text(raw)

    parsed = await EntityParser(tmp_path).parse_file(md_file)

    # Either the YAML parsed, or defaults were applied — never an exception
    assert parsed is not None
    assert parsed.frontmatter.title is not None
    assert parsed.frontmatter.type is not None
@pytest.mark.asyncio
async def test_parse_file_with_completely_invalid_yaml(tmp_path):
    """Completely broken YAML frontmatter is handled gracefully (issue #185)."""
    md_file = tmp_path / "broken_yaml.md"
    raw = dedent(
        """
        ---
        title: Invalid YAML
        this is: [not, valid, yaml
        missing: closing bracket
        ---
        # Content
        This file has broken YAML frontmatter.
        """
    ).strip()
    md_file.write_text(raw)

    parsed = await EntityParser(tmp_path).parse_file(md_file)

    assert parsed is not None
    # Defaults kick in: title from the filename, type "note"
    assert parsed.frontmatter.title == "broken_yaml"
    assert parsed.frontmatter.type == "note"
    # Frontmatter parsing failed, so the body retains the whole file
    assert "# Content" in parsed.content
@pytest.mark.asyncio
async def test_parse_file_without_entity_type(tmp_path):
    """A file with no entity_type gets the default type (issue #184).

    Reproduces the NOT NULL constraint error seen when entity_type was missing.
    """
    md_file = tmp_path / "no_type.md"
    md_file.write_text(
        dedent(
            """
            ---
            title: The Invisible Weight of Mental Habits
            ---
            # The Invisible Weight of Mental Habits
            An article about mental habits.
            """
        ).strip()
    )

    parsed = await EntityParser(tmp_path).parse_file(md_file)

    assert parsed is not None
    assert parsed.frontmatter.type == "note"  # default type applied
    assert parsed.frontmatter.title == "The Invisible Weight of Mental Habits"
@pytest.mark.asyncio
async def test_parse_file_with_empty_frontmatter(tmp_path):
    """Empty frontmatter blocks fall back to defaults (issue #184)."""
    md_file = tmp_path / "empty_frontmatter.md"
    md_file.write_text(
        dedent(
            """
            ---
            ---
            # Content
            This file has empty frontmatter.
            """
        ).strip()
    )

    parsed = await EntityParser(tmp_path).parse_file(md_file)

    assert parsed is not None
    # Defaults: type "note", title from the filename
    assert parsed.frontmatter.type == "note"
    assert parsed.frontmatter.title == "empty_frontmatter"
@pytest.mark.asyncio
async def test_parse_file_without_frontmatter(tmp_path):
    """Files with no frontmatter at all get defaults (issue #184)."""
    md_file = tmp_path / "no_frontmatter.md"
    md_file.write_text(
        dedent(
            """
            # Just Content
            This file has no frontmatter at all.
            """
        ).strip()
    )

    parsed = await EntityParser(tmp_path).parse_file(md_file)

    assert parsed is not None
    # Defaults: type "note", title from the filename
    assert parsed.frontmatter.type == "note"
    assert parsed.frontmatter.title == "no_frontmatter"
@pytest.mark.asyncio
async def test_parse_file_with_null_entity_type(tmp_path):
    """An explicit null entity_type still gets the default (issue #184)."""
    md_file = tmp_path / "null_type.md"
    md_file.write_text(
        dedent(
            """
            ---
            title: Test File
            type: null
            ---
            # Content
            """
        ).strip()
    )

    parsed = await EntityParser(tmp_path).parse_file(md_file)

    assert parsed is not None
    # Even `type: null` resolves to the default type
    assert parsed.frontmatter.type == "note"
    assert parsed.frontmatter.title == "Test File"
@pytest.mark.asyncio
async def test_parse_file_with_null_title(tmp_path):
    """An explicit null title falls back to the filename (issue #387)."""
    md_file = tmp_path / "null_title.md"
    md_file.write_text(
        dedent(
            """
            ---
            title: null
            type: note
            ---
            # Content
            """
        ).strip()
    )

    parsed = await EntityParser(tmp_path).parse_file(md_file)

    assert parsed is not None
    # `title: null` resolves to the filename-derived default
    assert parsed.frontmatter.title == "null_title"
    assert parsed.frontmatter.type == "note"
@pytest.mark.asyncio
async def test_parse_file_with_empty_title(tmp_path):
    """An empty title value falls back to the filename (issue #387)."""
    md_file = tmp_path / "empty_title.md"
    md_file.write_text(
        dedent(
            """
            ---
            title:
            type: note
            ---
            # Content
            """
        ).strip()
    )

    parsed = await EntityParser(tmp_path).parse_file(md_file)

    assert parsed is not None
    # Empty title resolves to the filename-derived default
    assert parsed.frontmatter.title == "empty_title"
    assert parsed.frontmatter.type == "note"
@pytest.mark.asyncio
async def test_parse_file_with_string_none_title(tmp_path):
    """A literal "None" title falls back to the filename (issue #387).

    The string "None" shows up in practice from templating systems.
    """
    md_file = tmp_path / "template_file.md"
    md_file.write_text(
        dedent(
            """
            ---
            title: "None"
            type: note
            ---
            # Content
            """
        ).strip()
    )

    parsed = await EntityParser(tmp_path).parse_file(md_file)

    assert parsed is not None
    # The string "None" is treated like a missing title
    assert parsed.frontmatter.title == "template_file"
    assert parsed.frontmatter.type == "note"
@pytest.mark.asyncio
async def test_parse_valid_file_still_works(tmp_path):
    """Well-formed frontmatter keeps parsing exactly as written."""
    md_file = tmp_path / "valid.md"
    md_file.write_text(
        dedent(
            """
            ---
            title: Valid File
            type: knowledge
            tags:
            - test
            - valid
            ---
            # Valid File
            This is a properly formatted file.
            """
        ).strip()
    )

    parsed = await EntityParser(tmp_path).parse_file(md_file)

    # No defaults involved — every value comes straight from the file
    assert parsed is not None
    assert parsed.frontmatter.title == "Valid File"
    assert parsed.frontmatter.type == "knowledge"
    assert parsed.frontmatter.tags == ["test", "valid"]
```
--------------------------------------------------------------------------------
/v15-docs/bug-fixes.md:
--------------------------------------------------------------------------------
```markdown
# Bug Fixes and Improvements
**Status**: Bug Fixes
**Version**: v0.15.0
**Impact**: Stability, reliability, platform compatibility
## Overview
v0.15.0 includes 13+ bug fixes addressing entity conflicts, URL handling, file operations, and platform compatibility. These fixes improve stability and eliminate edge cases that could cause errors.
## Key Fixes
### 1. Entity Upsert Conflict Resolution (#328)
**Problem:**
Database-level conflicts when upserting entities with same title/folder caused crashes.
**Fix:**
Simplified entity upsert to use database-level conflict resolution with `ON CONFLICT` clause.
**Before:**
```python
# Manual conflict checking (error-prone)
existing = await get_entity_by_title(title, folder)
if existing:
await update_entity(existing.id, data)
else:
await insert_entity(data)
# → Could fail if concurrent insert
```
**After:**
```python
# Database handles conflict
await db.execute("""
INSERT INTO entities (title, folder, content)
VALUES (?, ?, ?)
ON CONFLICT (title, folder) DO UPDATE SET content = excluded.content
""")
# → Always works, even with concurrent access
```
**Benefit:** Eliminates race conditions, more reliable writes
### 2. memory:// URL Underscore Normalization (#329)
**Problem:**
Underscores in memory:// URLs weren't normalized to hyphens, causing lookups to fail.
**Fix:**
Normalize underscores to hyphens when resolving memory:// URLs.
**Before:**
```python
# URL with underscores
url = "memory://my_note"
entity = await resolve_url(url)
# → Not found! (permalink is "my-note")
```
**After:**
```python
# Automatic normalization
url = "memory://my_note"
entity = await resolve_url(url)
# → Found! (my_note → my-note)
```
**Examples:**
- `memory://my_note` → finds entity with permalink `my-note`
- `memory://user_guide` → finds entity with permalink `user-guide`
- `memory://api_docs` → finds entity with permalink `api-docs`
**Benefit:** More forgiving URL matching, fewer lookup failures
### 3. .gitignore File Filtering (#287, #285)
**Problem:**
Sync process didn't respect .gitignore patterns, indexing sensitive files and build artifacts.
**Fix:**
Integrated .gitignore support - files matching patterns are automatically skipped during sync.
**Before:**
```bash
bm sync
# → Indexed .env files
# → Indexed node_modules/
# → Indexed build artifacts
```
**After:**
```bash
# .gitignore
.env
node_modules/
dist/
bm sync
# → Skipped .env (gitignored)
# → Skipped node_modules/ (gitignored)
# → Skipped dist/ (gitignored)
```
**Benefit:** Better security, cleaner knowledge base, faster sync
**See:** `gitignore-integration.md` for full details
### 4. move_note File Extension Handling (#281)
**Problem:**
`move_note` failed when destination path included or omitted `.md` extension inconsistently.
**Fix:**
Automatically handle file extensions - works with or without `.md`.
**Before:**
```python
# Had to match exactly
await move_note("My Note", "new-folder/my-note.md") # ✓
await move_note("My Note", "new-folder/my-note") # ✗ Failed
```
**After:**
```python
# Both work
await move_note("My Note", "new-folder/my-note.md") # ✓ Works
await move_note("My Note", "new-folder/my-note") # ✓ Works (adds .md)
```
**Automatic handling:**
- Input without `.md` → adds `.md`
- Input with `.md` → uses as-is
- Always creates valid markdown file
**Benefit:** More forgiving API, fewer errors
### 5. .env File Loading Removed (#330)
**Problem:**
Automatic .env file loading created security vulnerability - could load untrusted files.
**Fix:**
Removed automatic .env loading. Environment variables must be set explicitly.
**Impact:** Breaking change for users relying on .env files
**Migration:**
```bash
# Before: Used .env file
# .env
BASIC_MEMORY_LOG_LEVEL=DEBUG
# After: Use explicit export
export BASIC_MEMORY_LOG_LEVEL=DEBUG
# Or use direnv
# .envrc (git-ignored)
export BASIC_MEMORY_LOG_LEVEL=DEBUG
```
**Benefit:** Better security, explicit configuration
**See:** `env-file-removal.md` for migration guide
### 6. Python 3.13 Compatibility
**Problem:**
Code not tested with Python 3.13, potential compatibility issues.
**Fix:**
- Added Python 3.13 to CI test matrix
- Fixed deprecation warnings
- Verified all dependencies compatible
- Updated type hints for 3.13
**Before:**
```yaml
# .github/workflows/test.yml
python-version: ["3.10", "3.11", "3.12"]
```
**After:**
```yaml
# .github/workflows/test.yml
python-version: ["3.10", "3.11", "3.12", "3.13"]
```
**Benefit:** Full Python 3.13 support, future-proof
## Additional Fixes
### Minimum Timeframe Enforcement (#318)
**Problem:**
`recent_activity` with very short timeframes caused timezone issues.
**Fix:**
Enforce minimum 1-day timeframe to handle timezone edge cases.
```python
# Before: Could use any timeframe
await recent_activity(timeframe="1h") # Timezone issues
# After: Minimum 1 day
await recent_activity(timeframe="1h") # → Auto-adjusted to "1d"
```
### Permalink Collision Prevention
**Problem:**
Strict link resolution could create duplicate permalinks.
**Fix:**
Enhanced permalink uniqueness checking to prevent collisions.
### DateTime JSON Schema (#312)
**Problem:**
MCP validation failed on DateTime fields - missing proper JSON schema format.
**Fix:**
Added proper `format: "date-time"` annotations for MCP compatibility.
```python
# Before: No format
created_at: datetime
# After: With format
created_at: datetime = Field(json_schema_extra={"format": "date-time"})
```
## Testing Coverage
### Automated Tests
All fixes include comprehensive tests:
```bash
# Entity upsert conflict
tests/services/test_entity_upsert.py
# URL normalization
tests/mcp/test_build_context_validation.py
# File extension handling
tests/mcp/test_tool_move_note.py
# gitignore integration
tests/sync/test_gitignore.py
```
### Manual Testing Checklist
- [x] Entity upsert with concurrent access
- [x] memory:// URLs with underscores
- [x] .gitignore file filtering
- [x] move_note with/without .md extension
- [x] .env file not auto-loaded
- [x] Python 3.13 compatibility
## Migration Guide
### If You're Affected by These Bugs
**Entity Conflicts:**
- No action needed - automatically fixed
**memory:// URLs:**
- No action needed - URLs now more forgiving
- Previously broken URLs should work now
**.gitignore Integration:**
- Create `.gitignore` if you don't have one
- Add patterns for files to skip
**move_note:**
- No action needed - both formats now work
- Can simplify code that manually added `.md`
**.env Files:**
- See `env-file-removal.md` for full migration
- Use explicit environment variables or direnv
**Python 3.13:**
- Upgrade if desired: `pip install --upgrade basic-memory`
- Or stay on 3.10-3.12 (still supported)
## Verification
### Check Entity Upserts Work
```python
# Should not conflict
await write_note("Test", "Content", "folder")
await write_note("Test", "Updated", "folder") # Updates, not errors
```
### Check URL Normalization
```python
# Both should work
context1 = await build_context("memory://my_note")
context2 = await build_context("memory://my-note")
# Both resolve to same entity
```
### Check .gitignore Respected
```bash
echo ".env" >> .gitignore
echo "SECRET=test" > .env
bm sync
# .env should be skipped
```
### Check move_note Extension
```python
# Both work
await move_note("Note", "folder/note.md") # ✓
await move_note("Note", "folder/note") # ✓
```
### Check .env Not Loaded
```bash
echo "BASIC_MEMORY_LOG_LEVEL=DEBUG" > .env
bm sync
# LOG_LEVEL not set (not auto-loaded)
export BASIC_MEMORY_LOG_LEVEL=DEBUG
bm sync
# LOG_LEVEL now set (explicit)
```
### Check Python 3.13
```bash
python3.13 --version
python3.13 -m pip install basic-memory
python3.13 -m basic_memory --version
```
## Known Issues (Fixed)
### Previously Reported, Now Fixed
1. ✅ Entity upsert conflicts (#328)
2. ✅ memory:// URL underscore handling (#329)
3. ✅ .gitignore not respected (#287, #285)
4. ✅ move_note extension issues (#281)
5. ✅ .env security vulnerability (#330)
6. ✅ Minimum timeframe issues (#318)
7. ✅ DateTime JSON schema (#312)
8. ✅ Permalink collisions
9. ✅ Python 3.13 compatibility
## Upgrade Notes
### From v0.14.x
All bug fixes apply automatically:
```bash
# Upgrade
pip install --upgrade basic-memory
# Restart MCP server
# Bug fixes active immediately
```
### Breaking Changes
Only one breaking change:
- ✅ .env file auto-loading removed (#330)
- See `env-file-removal.md` for migration
All other fixes are backward compatible.
## Reporting New Issues
If you encounter issues:
1. Check this list to see if already fixed
2. Verify you're on v0.15.0+: `bm --version`
3. Report at: https://github.com/basicmachines-co/basic-memory/issues
## See Also
- `gitignore-integration.md` - .gitignore support details
- `env-file-removal.md` - .env migration guide
- GitHub issues for each fix
- v0.15.0 changelog
```
--------------------------------------------------------------------------------
/src/basic_memory/schemas/memory.py:
--------------------------------------------------------------------------------
```python
"""Schemas for memory context."""
from datetime import datetime
from typing import List, Optional, Annotated, Sequence, Literal, Union, Dict
from annotated_types import MinLen, MaxLen
from pydantic import BaseModel, Field, BeforeValidator, TypeAdapter, field_serializer
from basic_memory.schemas.search import SearchItemType
def validate_memory_url_path(path: str) -> bool:
    """Validate that a memory URL path is well-formed.

    Args:
        path: The path part of a memory URL (without the memory:// prefix)

    Returns:
        True if the path is valid, False otherwise

    Examples:
        >>> validate_memory_url_path("specs/search")
        True
        >>> validate_memory_url_path("memory//test")  # Double slash
        False
        >>> validate_memory_url_path("invalid://test")  # Contains protocol
        False
    """
    # Empty or whitespace-only paths are never valid
    if not path or not path.strip():
        return False

    # An embedded protocol scheme or any double slash is malformed.
    # ("://" is listed first for readability; it is itself a double slash.)
    if "://" in path or "//" in path:
        return False

    # Characters never allowed in a path; "*" stays allowed for pattern matching
    forbidden = ("<", ">", '"', "|", "?")
    return not any(ch in path for ch in forbidden)
def normalize_memory_url(url: str | None) -> str:
    """Normalize a MemoryUrl string with validation.

    Args:
        url: A path like "specs/search" or "memory://specs/search"

    Returns:
        Normalized URL starting with memory://

    Raises:
        ValueError: If the URL is empty or its path is malformed

    Examples:
        >>> normalize_memory_url("specs/search")
        'memory://specs/search'
        >>> normalize_memory_url("memory://specs/search")
        'memory://specs/search'
        >>> normalize_memory_url("memory//test")
        Traceback (most recent call last):
        ...
        ValueError: Invalid memory URL path: 'memory//test' contains double slashes
    """
    if not url:
        raise ValueError("Memory URL cannot be empty")

    # Strip whitespace for consistency
    url = url.strip()
    if not url:
        raise ValueError("Memory URL cannot be empty or whitespace")

    clean_path = url.removeprefix("memory://")

    # Fix: a bare "memory://" leaves an empty path, which previously fell
    # through to the misleading "contains invalid characters" error below.
    if not clean_path.strip():
        raise ValueError("Memory URL cannot be empty or whitespace")

    # Validate the extracted path
    if not validate_memory_url_path(clean_path):
        # Provide specific error messages for common issues
        if "://" in clean_path:
            raise ValueError(f"Invalid memory URL path: '{clean_path}' contains protocol scheme")
        elif "//" in clean_path:
            raise ValueError(f"Invalid memory URL path: '{clean_path}' contains double slashes")
        else:
            raise ValueError(f"Invalid memory URL path: '{clean_path}' contains invalid characters")

    return f"memory://{clean_path}"
# MemoryUrl: a constrained string type — whitespace is stripped, the value is
# validated and normalized to the "memory://" scheme, and length is bounded.
# NOTE(review): normalize_memory_url also strips whitespace, so the str.strip
# validator is belt-and-braces rather than load-bearing.
MemoryUrl = Annotated[
    str,
    BeforeValidator(str.strip),  # Clean whitespace
    BeforeValidator(normalize_memory_url),  # Validate and normalize the URL
    MinLen(1),
    MaxLen(2028),  # NOTE(review): 2028 looks like a typo for 2048 — confirm intent
]

# Standalone validator/serializer for MemoryUrl values outside a model.
memory_url = TypeAdapter(MemoryUrl)
def memory_url_path(url: memory_url) -> str:  # pyright: ignore
    """Return the path portion of a memory URL.

    Strips the leading "memory://" scheme from the given value; a value
    that does not start with "memory://" is returned unchanged.

    :param url: A MemoryUrl value, with or without the "memory://" prefix.
    :type url: MemoryUrl
    :return: The URL with the "memory://" prefix removed.
    :rtype: str
    """
    return url.removeprefix("memory://")
class EntitySummary(BaseModel):
    """Simplified entity representation."""

    # Discriminator for the tagged unions in ContextResult
    type: Literal["entity"] = "entity"
    permalink: Optional[str]  # stable identifier; may be absent
    title: str
    content: Optional[str] = None
    file_path: str
    # Explicit JSON schema format so MCP clients validate date-time strings
    created_at: Annotated[
        datetime, Field(json_schema_extra={"type": "string", "format": "date-time"})
    ]

    @field_serializer("created_at")
    def serialize_created_at(self, dt: datetime) -> str:
        """Serialize created_at as an ISO 8601 string."""
        return dt.isoformat()
class RelationSummary(BaseModel):
    """Simplified relation representation."""

    # Discriminator for the tagged unions in ContextResult
    type: Literal["relation"] = "relation"
    title: str
    file_path: str
    permalink: str
    relation_type: str  # e.g. "implements", "references"
    from_entity: Optional[str] = None  # source entity, if known
    to_entity: Optional[str] = None  # target entity, if resolved
    # Explicit JSON schema format so MCP clients validate date-time strings
    created_at: Annotated[
        datetime, Field(json_schema_extra={"type": "string", "format": "date-time"})
    ]

    @field_serializer("created_at")
    def serialize_created_at(self, dt: datetime) -> str:
        """Serialize created_at as an ISO 8601 string."""
        return dt.isoformat()
class ObservationSummary(BaseModel):
    """Simplified observation representation."""

    # Discriminator for the tagged unions in ContextResult
    type: Literal["observation"] = "observation"
    title: str
    file_path: str
    permalink: str
    category: str  # observation category, e.g. "design", "tech"
    content: str
    # Explicit JSON schema format so MCP clients validate date-time strings
    created_at: Annotated[
        datetime, Field(json_schema_extra={"type": "string", "format": "date-time"})
    ]

    @field_serializer("created_at")
    def serialize_created_at(self, dt: datetime) -> str:
        """Serialize created_at as an ISO 8601 string."""
        return dt.isoformat()
class MemoryMetadata(BaseModel):
    """Simplified response metadata."""

    uri: Optional[str] = None  # the queried memory:// uri, if any
    types: Optional[List[SearchItemType]] = None  # type filter applied, if any
    depth: int  # traversal depth used for the context query
    timeframe: Optional[str] = None  # timeframe filter, if any
    # Explicit JSON schema format so MCP clients validate date-time strings
    generated_at: Annotated[
        datetime, Field(json_schema_extra={"type": "string", "format": "date-time"})
    ]
    primary_count: Optional[int] = None  # number of primary results (renamed field)
    related_count: Optional[int] = None  # number of related results (renamed field)
    total_results: Optional[int] = None  # For backward compatibility
    total_relations: Optional[int] = None
    total_observations: Optional[int] = None

    @field_serializer("generated_at")
    def serialize_generated_at(self, dt: datetime) -> str:
        """Serialize generated_at as an ISO 8601 string."""
        return dt.isoformat()
class ContextResult(BaseModel):
    """Context result containing a primary item with its observations and related items."""

    # The matched item; "type" field discriminates the union members
    primary_result: Annotated[
        Union[EntitySummary, RelationSummary, ObservationSummary],
        Field(discriminator="type", description="Primary item"),
    ]
    # Observations attached to the primary entity
    observations: Sequence[ObservationSummary] = Field(
        description="Observations belonging to this entity", default_factory=list
    )
    # Items discovered by traversing relations from the primary item
    related_results: Sequence[
        Annotated[
            Union[EntitySummary, RelationSummary, ObservationSummary], Field(discriminator="type")
        ]
    ] = Field(description="Related items", default_factory=list)
class GraphContext(BaseModel):
    """Complete context response."""

    # Hierarchical results: each entry nests its own related items
    results: Sequence[ContextResult] = Field(
        description="Hierarchical results with related items nested", default_factory=list
    )

    # Context metadata (query parameters and result counts)
    metadata: MemoryMetadata

    # Pagination, when the caller requested a page
    page: Optional[int] = None
    page_size: Optional[int] = None
class ActivityStats(BaseModel):
    """Statistics about activity across all projects."""

    total_projects: int  # all known projects, active or not
    active_projects: int = Field(description="Projects with activity in timeframe")
    most_active_project: Optional[str] = None  # project name, if any activity
    total_items: int = Field(description="Total items across all projects")
    # Per-kind breakdown of total_items
    total_entities: int = 0
    total_relations: int = 0
    total_observations: int = 0
class ProjectActivity(BaseModel):
    """Activity summary for a single project."""

    project_name: str
    project_path: str
    activity: GraphContext = Field(description="The actual activity data for this project")
    item_count: int = Field(description="Total items in this project's activity")
    # Optional because a project may have had no activity in the timeframe
    last_activity: Optional[
        Annotated[datetime, Field(json_schema_extra={"type": "string", "format": "date-time"})]
    ] = Field(default=None, description="Most recent activity timestamp")
    active_folders: List[str] = Field(default_factory=list, description="Most active folders")

    @field_serializer("last_activity")
    def serialize_last_activity(self, dt: Optional[datetime]) -> Optional[str]:
        """Serialize last_activity as ISO 8601, or None when absent."""
        return dt.isoformat() if dt else None
class ProjectActivitySummary(BaseModel):
    """Summary of activity across all projects."""

    projects: Dict[str, ProjectActivity] = Field(
        description="Activity per project, keyed by project name"
    )
    summary: ActivityStats  # aggregate counts across all projects
    timeframe: str = Field(description="The timeframe used for the query")
    # Explicit JSON schema format so MCP clients validate date-time strings
    generated_at: Annotated[
        datetime, Field(json_schema_extra={"type": "string", "format": "date-time"})
    ]
    guidance: Optional[str] = Field(
        default=None, description="Assistant guidance for project selection and session management"
    )

    @field_serializer("generated_at")
    def serialize_generated_at(self, dt: datetime) -> str:
        """Serialize generated_at as an ISO 8601 string."""
        return dt.isoformat()
```
--------------------------------------------------------------------------------
/v15-docs/env-file-removal.md:
--------------------------------------------------------------------------------
```markdown
# .env File Loading Removed
**Status**: Security Fix
**PR**: #330
**Impact**: Breaking change for users relying on .env files
## What Changed
v0.15.0 **removes automatic .env file loading** from Basic Memory configuration. Environment variables must now be set explicitly through your shell, systemd, Docker, or other standard mechanisms.
### Before v0.15.0
```python
# BasicMemoryConfig automatically loaded .env files
from dotenv import load_dotenv
load_dotenv() # ← Automatically loaded .env
config = BasicMemoryConfig() # ← Used .env values
```
### v0.15.0 and Later
```python
# No automatic .env loading
config = BasicMemoryConfig() # ← Only uses actual environment variables
```
## Why This Changed
### Security Vulnerability
Automatic .env loading created security risks:
1. **Unintended file loading:**
- Could load `.env` from current directory
- Could load `.env` from parent directories
- Risk of loading untrusted `.env` files
2. **Credential leakage:**
- `.env` files might contain secrets
- Easy to accidentally commit to git
- Hard to audit what's loaded
3. **Configuration confusion:**
- Unclear which values come from `.env` vs environment
- Debugging difficult with implicit loading
### Best Practice
Modern deployment practices use explicit environment configuration:
- Shell exports
- systemd Environment directives
- Docker environment variables
- Kubernetes ConfigMaps/Secrets
- CI/CD variable injection
## Migration Guide
### If You Used .env Files
**Step 1: Check if you have a .env file**
```bash
ls -la .env
ls -la ~/.basic-memory/.env
```
**Step 2: Review .env contents**
```bash
cat .env
```
**Step 3: Convert to explicit environment variables**
**Option A: Shell exports (development)**
```bash
# Move values from .env to shell config
# .bashrc or .zshrc
export BASIC_MEMORY_PROJECT_ROOT=/app/data
export BASIC_MEMORY_LOG_LEVEL=DEBUG
export BASIC_MEMORY_DEFAULT_PROJECT=main
```
**Option B: direnv (recommended for development)**
```bash
# Install direnv
brew install direnv # macOS
sudo apt install direnv # Linux
# Create .envrc (git-ignored)
cat > .envrc <<EOF
export BASIC_MEMORY_PROJECT_ROOT=/app/data
export BASIC_MEMORY_LOG_LEVEL=DEBUG
EOF
# Allow direnv for this directory
direnv allow
# Auto-loads when entering directory
```
**Option C: systemd (production)**
```ini
# /etc/systemd/system/basic-memory.service
[Service]
Environment="BASIC_MEMORY_PROJECT_ROOT=/var/lib/basic-memory"
Environment="BASIC_MEMORY_LOG_LEVEL=INFO"
ExecStart=/usr/local/bin/basic-memory serve
```
**Option D: Docker (containers)**
```yaml
# docker-compose.yml
services:
basic-memory:
environment:
BASIC_MEMORY_PROJECT_ROOT: /app/data
BASIC_MEMORY_LOG_LEVEL: INFO
```
### If You Didn't Use .env Files
No action needed - your setup already uses explicit environment variables.
## Alternative Solutions
### Development: Use direnv
[direnv](https://direnv.net/) automatically loads environment variables when entering a directory:
**Setup:**
```bash
# Install
brew install direnv
# Add to shell (.bashrc or .zshrc)
eval "$(direnv hook bash)" # or zsh
# Create .envrc in project
cat > .envrc <<EOF
export BASIC_MEMORY_LOG_LEVEL=DEBUG
export BASIC_MEMORY_PROJECT_ROOT=\$PWD/data
EOF
# Git-ignore it
echo ".envrc" >> .gitignore
# Allow it
direnv allow
```
**Usage:**
```bash
# Entering directory auto-loads variables
cd ~/my-project
# → direnv: loading .envrc
# → direnv: export +BASIC_MEMORY_LOG_LEVEL +BASIC_MEMORY_PROJECT_ROOT
# Check variables
env | grep BASIC_MEMORY_
```
### Production: External Configuration
**AWS Systems Manager:**
```bash
# Store in Parameter Store
aws ssm put-parameter \
--name /basic-memory/project-root \
--value /app/data \
--type SecureString
# Retrieve and export
export BASIC_MEMORY_PROJECT_ROOT=$(aws ssm get-parameter \
--name /basic-memory/project-root \
--with-decryption \
--query Parameter.Value \
--output text)
```
**Kubernetes Secrets:**
```yaml
apiVersion: v1
kind: Secret
metadata:
name: basic-memory-env
stringData:
BASIC_MEMORY_PROJECT_ROOT: /app/data
---
apiVersion: v1
kind: Pod
spec:
containers:
- name: basic-memory
envFrom:
- secretRef:
name: basic-memory-env
```
**HashiCorp Vault:**
```bash
# Store in Vault
vault kv put secret/basic-memory \
project_root=/app/data \
log_level=INFO
# Retrieve and export
export BASIC_MEMORY_PROJECT_ROOT=$(vault kv get -field=project_root secret/basic-memory)
```
## Security Best Practices
### 1. Never Commit Environment Files
**Always git-ignore:**
```bash
# .gitignore
.env
.env.*
.envrc
*.env
cloud-auth.json
```
### 2. Use Secret Management
**For sensitive values:**
- AWS Secrets Manager
- HashiCorp Vault
- Kubernetes Secrets
- Azure Key Vault
- Google Secret Manager
### 3. Scope Secrets Appropriately
**Development:**
```bash
# Development secrets (less sensitive)
export BASIC_MEMORY_LOG_LEVEL=DEBUG
export BASIC_MEMORY_PROJECT_ROOT=~/dev/data
```
**Production:**
```bash
# Production secrets (highly sensitive)
export BASIC_MEMORY_CLOUD_SECRET_KEY=$(fetch-from-vault)
export BASIC_MEMORY_PROJECT_ROOT=/app/data
```
### 4. Audit Environment Variables
**Log non-sensitive vars:**
```python
import os
from loguru import logger
# Safe to log
safe_vars = {
k: v for k, v in os.environ.items()
if k.startswith("BASIC_MEMORY_") and "SECRET" not in k
}
logger.info(f"Config loaded with: {safe_vars}")
# Never log
secret_vars = [k for k in os.environ.keys() if "SECRET" in k or "KEY" in k]
logger.debug(f"Secret vars present: {len(secret_vars)}")
```
### 5. Principle of Least Privilege
```bash
# ✓ Good: Minimal permissions
export BASIC_MEMORY_PROJECT_ROOT=/app/data/tenant-123 # Scoped to tenant
# ✗ Bad: Too permissive
export BASIC_MEMORY_PROJECT_ROOT=/ # Entire filesystem
```
## Troubleshooting
### Variables Not Loading
**Problem:** Settings not taking effect after migration
**Check:**
```bash
# Are variables actually exported?
env | grep BASIC_MEMORY_
# Not exported (wrong)
BASIC_MEMORY_LOG_LEVEL=DEBUG # Missing 'export'
# Exported (correct)
export BASIC_MEMORY_LOG_LEVEL=DEBUG
```
### .env Still Present
**Problem:** Old .env file exists but ignored
**Solution:**
```bash
# Review and remove
cat .env # Check contents
rm .env # Remove after migrating
# Ensure git-ignored
echo ".env" >> .gitignore
```
### Different Behavior After Upgrade
**Problem:** Config different after v0.15.0
**Check for .env usage:**
```bash
# Did you have .env?
git log --all --full-history -- .env
# If yes, migrate values to explicit env vars
```
## Configuration Checklist
After removing .env files, verify:
- [ ] All required env vars exported explicitly
- [ ] .env files removed or git-ignored
- [ ] Production uses systemd/Docker/K8s env vars
- [ ] Development uses direnv or shell config
- [ ] Secrets stored in secret manager (not env files)
- [ ] No credentials committed to git
- [ ] Documentation updated with new approach
## Example Configurations
### Local Development
**~/.bashrc or ~/.zshrc:**
```bash
# Basic Memory configuration
export BASIC_MEMORY_LOG_LEVEL=DEBUG
export BASIC_MEMORY_PROJECT_ROOT=~/dev/basic-memory
export BASIC_MEMORY_DEFAULT_PROJECT=main
export BASIC_MEMORY_DEFAULT_PROJECT_MODE=true
```
### Docker Development
**docker-compose.yml:**
```yaml
services:
basic-memory:
image: basic-memory:latest
environment:
BASIC_MEMORY_LOG_LEVEL: DEBUG
BASIC_MEMORY_PROJECT_ROOT: /app/data
BASIC_MEMORY_HOME: /app/data/basic-memory
volumes:
- ./data:/app/data
```
### Production Deployment
**systemd service:**
```ini
[Unit]
Description=Basic Memory Service
[Service]
Type=simple
User=basicmemory
Environment="BASIC_MEMORY_ENV=user"
Environment="BASIC_MEMORY_LOG_LEVEL=INFO"
Environment="BASIC_MEMORY_PROJECT_ROOT=/var/lib/basic-memory"
EnvironmentFile=/etc/basic-memory/secrets.env
ExecStart=/usr/local/bin/basic-memory serve
[Install]
WantedBy=multi-user.target
```
**/etc/basic-memory/secrets.env:**
```bash
# Loaded via EnvironmentFile
BASIC_MEMORY_CLOUD_SECRET_KEY=<from-secret-manager>
```
### Kubernetes Production
**ConfigMap (non-secret):**
```yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: basic-memory-config
data:
BASIC_MEMORY_LOG_LEVEL: "INFO"
BASIC_MEMORY_PROJECT_ROOT: "/app/data"
```
**Secret (sensitive):**
```yaml
apiVersion: v1
kind: Secret
metadata:
name: basic-memory-secrets
type: Opaque
stringData:
BASIC_MEMORY_CLOUD_SECRET_KEY: <plain-text-secret>  # stringData takes plain text; use `data` for base64-encoded values
```
**Deployment:**
```yaml
apiVersion: apps/v1
kind: Deployment
spec:
template:
spec:
containers:
- name: basic-memory
envFrom:
- configMapRef:
name: basic-memory-config
- secretRef:
name: basic-memory-secrets
```
## See Also
- `env-var-overrides.md` - How environment variables work
- Security best practices documentation
- Secret management guide
- Configuration reference
```
--------------------------------------------------------------------------------
/src/basic_memory/mcp/tools/delete_note.py:
--------------------------------------------------------------------------------
```python
from textwrap import dedent
from typing import Optional
from loguru import logger
from fastmcp import Context
from basic_memory.mcp.project_context import get_active_project
from basic_memory.mcp.tools.utils import call_delete
from basic_memory.mcp.server import mcp
from basic_memory.mcp.async_client import get_client
from basic_memory.schemas import DeleteEntitiesResponse
def _format_delete_error_response(project: str, error_message: str, identifier: str) -> str:
    """Format helpful error responses for delete failures that guide users to successful deletions.

    Args:
        project: Name of the project the deletion was attempted in.
        error_message: Raw error text from the failed delete call; matched
            case-insensitively to pick a response category.
        identifier: The note title or permalink the caller tried to delete.

    Returns:
        A markdown-formatted troubleshooting message tailored to the error
        category (not-found, permission, system, database, or a generic fallback).
    """
    # Note not found errors
    if "entity not found" in error_message.lower() or "not found" in error_message.lower():
        # Derive alternative identifier spellings to suggest:
        # - search_term: bare title portion of a permalink
        # - title_format: title-cased, space-separated variant
        # - permalink_format: lowercase, hyphenated variant
        search_term = identifier.split("/")[-1] if "/" in identifier else identifier
        title_format = (
            identifier.split("/")[-1].replace("-", " ").title() if "/" in identifier else identifier
        )
        permalink_format = identifier.lower().replace(" ", "-")
        return dedent(f"""
            # Delete Failed - Note Not Found
            The note '{identifier}' could not be found for deletion in {project}.
            ## This might mean:
            1. **Already deleted**: The note may have been deleted previously
            2. **Wrong identifier**: The identifier format might be incorrect
            3. **Different project**: The note might be in a different project
            ## How to verify:
            1. **Search for the note**: Use `search_notes("{project}", "{search_term}")` to find it
            2. **Try different formats**:
               - If you used a permalink like "folder/note-title", try just the title: "{title_format}"
               - If you used a title, try the permalink format: "{permalink_format}"
            3. **Check if already deleted**: Use `list_directory("/")` to see what notes exist
            4. **List notes in project**: Use `list_directory("/")` to see what notes exist in the current project
            ## If the note actually exists:
            ```
            # First, find the correct identifier:
            search_notes("{project}", "{identifier}")
            # Then delete using the correct identifier:
            delete_note("{project}", "correct-identifier-from-search")
            ```
            ## If you want to delete multiple similar notes:
            Use search to find all related notes and delete them one by one.
            """).strip()
    # Permission/access errors
    if (
        "permission" in error_message.lower()
        or "access" in error_message.lower()
        or "forbidden" in error_message.lower()
    ):
        return f"""# Delete Failed - Permission Error
You don't have permission to delete '{identifier}': {error_message}
## How to resolve:
1. **Check permissions**: Verify you have delete/write access to this project
2. **File locks**: The note might be open in another application
3. **Project access**: Ensure you're in the correct project with proper permissions
## Alternative actions:
- List available projects: `list_memory_projects()`
- Specify the correct project: `delete_note("{identifier}", project="project-name")`
- Verify note exists first: `read_note("{identifier}", project="project-name")`
## If you have read-only access:
Ask someone with write access to delete the note."""
    # Server/filesystem errors
    if (
        "server error" in error_message.lower()
        or "filesystem" in error_message.lower()
        or "disk" in error_message.lower()
    ):
        return f"""# Delete Failed - System Error
A system error occurred while deleting '{identifier}': {error_message}
## Immediate steps:
1. **Try again**: The error might be temporary
2. **Check file status**: Verify the file isn't locked or in use
3. **Check disk space**: Ensure the system has adequate storage
## Troubleshooting:
- Verify note exists: `read_note("{project}","{identifier}")`
- Try again in a few moments
## If problem persists:
Send a message to [email protected] - there may be a filesystem or database issue."""
    # Database/sync errors
    if "database" in error_message.lower() or "sync" in error_message.lower():
        return f"""# Delete Failed - Database Error
A database error occurred while deleting '{identifier}': {error_message}
## This usually means:
1. **Sync conflict**: The file system and database are out of sync
2. **Database lock**: Another operation is accessing the database
3. **Corrupted entry**: The database entry might be corrupted
## Steps to resolve:
1. **Try again**: Wait a moment and retry the deletion
2. **Check note status**: `read_note("{project}","{identifier}")` to see current state
3. **Manual verification**: Use `list_directory()` to see if file still exists
## If the note appears gone but database shows it exists:
Send a message to [email protected] - a manual database cleanup may be needed."""
    # Generic fallback
    return f"""# Delete Failed
Error deleting note '{identifier}': {error_message}
## General troubleshooting:
1. **Verify the note exists**: `read_note("{project}", "{identifier}")` or `search_notes("{project}", "{identifier}")`
2. **Check permissions**: Ensure you can edit/delete files in this project
3. **Try again**: The error might be temporary
4. **Check project**: Make sure you're in the correct project
## Step-by-step approach:
```
# 1. Confirm note exists and get correct identifier
search_notes("{project}", "{identifier}")
# 2. Read the note to verify access
read_note("{project}", "correct-identifier-from-search")
# 3. Try deletion with correct identifier
delete_note("{project}", "correct-identifier-from-search")
```
## Alternative approaches:
- Check what notes exist: `list_directory("{project}", "/")`
## Need help?
If the note should be deleted but the operation keeps failing, send a message to [email protected]."""
@mcp.tool(description="Delete a note by title or permalink")
async def delete_note(
    identifier: str, project: Optional[str] = None, context: Context | None = None
) -> bool | str:
    """Delete a note from the knowledge base.

    Permanently removes a note from the specified project. The note is identified
    by title or permalink. If the note doesn't exist, the operation returns False
    without error. If deletion fails due to other issues, helpful error messages are provided.

    Project Resolution:
        Server resolves projects in this order: Single Project Mode → project parameter → default project.
        If project unknown, use list_memory_projects() or recent_activity() first.

    Args:
        identifier: Note title or permalink to delete.
            Can be a title like "Meeting Notes" or permalink like "notes/meeting-notes".
        project: Project name to delete from. Optional - server will resolve using hierarchy.
            If unknown, use list_memory_projects() to discover available projects.
        context: Optional FastMCP context for performance caching.

    Returns:
        True if note was successfully deleted, False if note was not found.
        On errors, returns a formatted string with helpful troubleshooting guidance.

    Examples:
        # Delete by title
        delete_note("Meeting Notes: Project Planning", project="my-project")
        # Delete by permalink
        delete_note("notes/project-planning", project="work-docs")
        # Delete with exact path
        delete_note("experiments/ml-model-results", project="research")
        # Common usage pattern
        if delete_note("old-draft", project="my-project"):
            print("Note deleted successfully")
        else:
            print("Note not found or already deleted")

    Raises:
        HTTPError: If project doesn't exist or is inaccessible
        SecurityError: If identifier attempts path traversal

    Warning:
        This operation is permanent and cannot be undone. The note file
        will be removed from the filesystem and all references will be lost.

    Note:
        If the note is not found, this function provides helpful error messages
        with suggestions for finding the correct identifier, including search
        commands and alternative formats to try.
    """
    async with get_client() as client:
        # Resolve the target project per the hierarchy documented above.
        active_project = await get_active_project(client, project, context)
        project_url = active_project.project_url
        try:
            # Delegate the actual deletion to the knowledge API.
            response = await call_delete(client, f"{project_url}/knowledge/entities/{identifier}")
            result = DeleteEntitiesResponse.model_validate(response.json())
            if result.deleted:
                logger.info(
                    f"Successfully deleted note: {identifier} in project: {active_project.name}"
                )
                return True
            else:
                # API call succeeded but nothing matched the identifier.
                logger.warning(f"Delete operation completed but note was not deleted: {identifier}")
                return False
        except Exception as e:  # pragma: no cover
            logger.error(f"Delete failed for '{identifier}': {e}, project: {active_project.name}")
            # Return formatted error message for better user experience
            return _format_delete_error_response(active_project.name, str(e), identifier)
```
--------------------------------------------------------------------------------
/tests/sync/test_watch_service_edge_cases.py:
--------------------------------------------------------------------------------
```python
"""Test edge cases in the WatchService."""
from unittest.mock import patch
import pytest
from watchfiles import Change
def test_filter_changes_valid_path(watch_service, project_config):
    """Non-hidden paths (top-level and nested) pass the change filter."""
    home = project_config.home
    accepted = [
        home / "valid_file.txt",  # regular file at the project root
        home / "nested" / "valid_file.txt",  # file inside a nested directory
    ]
    for candidate in accepted:
        assert watch_service.filter_changes(Change.added, str(candidate)) is True
def test_filter_changes_hidden_path(watch_service, project_config):
    """Hidden files and anything under a hidden directory are filtered out."""
    home = project_config.home
    rejected = [
        home / ".hidden_file.txt",  # file whose name starts with a dot
        home / ".hidden_dir" / "file.txt",  # file inside a hidden directory
        home / "valid" / ".hidden" / "file.txt",  # hidden directory nested deeper
    ]
    for candidate in rejected:
        assert watch_service.filter_changes(Change.added, str(candidate)) is False
@pytest.mark.asyncio
async def test_handle_changes_empty_set(watch_service, project_config, test_project):
    """Test handle_changes with an empty set (no processed files).

    With no changes to process, nothing should be printed, but bookkeeping
    state (last_scan) should still be refreshed.
    """
    # Mock write_status to avoid file operations
    with patch.object(watch_service, "write_status", return_value=None):
        # Capture console output to verify
        with patch.object(watch_service.console, "print") as mock_print:
            # Call handle_changes with empty set
            await watch_service.handle_changes(test_project, set())
            # Verify divider wasn't printed (processed is empty)
            mock_print.assert_not_called()
    # Verify last_scan was updated
    assert watch_service.state.last_scan is not None
    # Verify synced_files wasn't changed
    assert watch_service.state.synced_files == 0
@pytest.mark.asyncio
async def test_handle_vim_atomic_write_delete_still_exists(
    watch_service, project_config, test_project, sync_service
):
    """Test vim atomic write scenario: DELETE event but file still exists on disk.

    The watch service should recognize that the file is still present and
    treat the event as a modification rather than deleting the entity.
    """
    project_dir = project_config.home
    # Create initial file and sync it
    test_file = project_dir / "vim_test.md"
    initial_content = """---
type: note
title: vim test
---
# Vim Test
Initial content for atomic write test
"""
    test_file.write_text(initial_content)
    await sync_service.sync(project_dir)
    # Get initial entity state
    initial_entity = await sync_service.entity_repository.get_by_file_path("vim_test.md")
    assert initial_entity is not None
    initial_checksum = initial_entity.checksum
    # Simulate vim's atomic write: modify content but send DELETE event
    # (vim moves original file, creates new content, then deletes old inode)
    modified_content = """---
type: note
title: vim test
---
# Vim Test
Modified content after atomic write
"""
    test_file.write_text(modified_content)
    # Setup DELETE event even though file still exists (vim's atomic write behavior)
    # Use absolute path like the real watch service would
    changes = {(Change.deleted, str(test_file))}
    # Handle the change
    await watch_service.handle_changes(test_project, changes)
    # Verify the entity still exists and was updated (not deleted)
    entity = await sync_service.entity_repository.get_by_file_path("vim_test.md")
    assert entity is not None
    assert entity.id == initial_entity.id  # Same entity
    assert entity.checksum != initial_checksum  # Checksum should be updated
    # Verify the file content was properly synced
    actual_content = test_file.read_text()
    assert "Modified content after atomic write" in actual_content
    # Check that correct event was recorded (should be "modified", not "deleted")
    events = [e for e in watch_service.state.recent_events if e.path == "vim_test.md"]
    assert len(events) == 1
    assert events[0].action == "modified"
    assert events[0].status == "success"
@pytest.mark.asyncio
async def test_handle_true_deletion_vs_vim_atomic(
    watch_service, project_config, test_project, sync_service
):
    """Test that true deletions are still handled correctly vs vim atomic writes.

    Two files receive DELETE events; only the one actually removed from disk
    should be deleted from the database — the other is an atomic-write rename.
    """
    project_dir = project_config.home
    # Create and sync two files
    atomic_file = project_dir / "atomic_test.md"
    delete_file = project_dir / "delete_test.md"
    content = """---
type: note
---
# Test File
Content for testing
"""
    atomic_file.write_text(content)
    delete_file.write_text(content)
    await sync_service.sync(project_dir)
    # For atomic_file: modify content but keep file (vim atomic write scenario)
    modified_content = content.replace("Content for testing", "Modified content")
    atomic_file.write_text(modified_content)
    # For delete_file: actually delete it (true deletion)
    delete_file.unlink()
    # Setup DELETE events for both files
    # Use absolute paths like the real watch service would
    changes = {
        (Change.deleted, str(atomic_file)),  # File still exists - atomic write
        (Change.deleted, str(delete_file)),  # File deleted - true deletion
    }
    # Handle the changes
    await watch_service.handle_changes(test_project, changes)
    # Verify atomic_file was treated as modification (still exists in DB)
    atomic_entity = await sync_service.entity_repository.get_by_file_path("atomic_test.md")
    assert atomic_entity is not None
    # Verify delete_file was truly deleted (no longer exists in DB)
    delete_entity = await sync_service.entity_repository.get_by_file_path("delete_test.md")
    assert delete_entity is None
    # Check events were recorded correctly
    events = watch_service.state.recent_events
    atomic_events = [e for e in events if e.path == "atomic_test.md"]
    delete_events = [e for e in events if e.path == "delete_test.md"]
    assert len(atomic_events) == 1
    assert atomic_events[0].action == "modified"
    assert len(delete_events) == 1
    assert delete_events[0].action == "deleted"
@pytest.mark.asyncio
async def test_handle_vim_atomic_write_markdown_with_relations(
    watch_service, project_config, test_project, sync_service
):
    """Test vim atomic write with markdown files that contain relations.

    Ensures the atomic-write path re-parses wiki-link relations instead of
    dropping them when the DELETE event turns out to be a rename.
    """
    project_dir = project_config.home
    # Create target file for relations
    target_file = project_dir / "target.md"
    target_content = """---
type: note
title: Target Note
---
# Target Note
This is the target of relations.
"""
    target_file.write_text(target_content)
    # Create main file with relations
    main_file = project_dir / "main.md"
    initial_content = """---
type: note
title: Main Note
---
# Main Note
This note links to [[Target Note]].
- relates_to [[Target Note]]
"""
    main_file.write_text(initial_content)
    await sync_service.sync(project_dir)
    # Get initial state
    main_entity = await sync_service.entity_repository.get_by_file_path("main.md")
    assert main_entity is not None
    initial_relations = len(main_entity.relations)
    # Simulate vim atomic write with content change that adds more relations
    modified_content = """---
type: note
title: Main Note
---
# Main Note
This note links to [[Target Note]] multiple times.
- relates_to [[Target Note]]
- references [[Target Note]]
"""
    main_file.write_text(modified_content)
    # Setup DELETE event (vim atomic write)
    # Use absolute path like the real watch service would
    changes = {(Change.deleted, str(main_file))}
    # Handle the change
    await watch_service.handle_changes(test_project, changes)
    # Verify entity still exists and relations were updated
    updated_entity = await sync_service.entity_repository.get_by_file_path("main.md")
    assert updated_entity is not None
    assert updated_entity.id == main_entity.id
    # Verify relations were processed correctly
    updated_relations = len(updated_entity.relations)
    assert updated_relations >= initial_relations  # Should have at least as many relations
    # Check event was recorded as modification
    events = [e for e in watch_service.state.recent_events if e.path == "main.md"]
    assert len(events) == 1
    assert events[0].action == "modified"
@pytest.mark.asyncio
async def test_handle_vim_atomic_write_directory_path_ignored(
    watch_service, project_config, test_project
):
    """A DELETE event pointing at a directory is ignored by atomic-write detection."""
    new_dir = project_config.home / "test_directory"
    new_dir.mkdir()
    # Feed the watcher a deletion event for the directory itself, using an
    # absolute path exactly as the real watch service would report it.
    # This should complete without raising.
    await watch_service.handle_changes(test_project, {(Change.deleted, str(new_dir))})
    # Nothing should have been recorded for the directory path.
    recorded = [e for e in watch_service.state.recent_events if "test_directory" in e.path]
    assert recorded == []
```
--------------------------------------------------------------------------------
/tests/services/test_directory_service.py:
--------------------------------------------------------------------------------
```python
"""Tests for directory service."""
import pytest
from basic_memory.services.directory_service import DirectoryService
@pytest.mark.asyncio
async def test_directory_tree_empty(directory_service: DirectoryService):
    """With no entities, the tree is a single childless root node."""
    tree = await directory_service.get_directory_tree()
    assert tree is not None
    assert tree.name == "Root"
    assert tree.directory_path == "/"
    assert tree.has_children is False
    assert len(tree.children) == 0
@pytest.mark.asyncio
async def test_directory_tree(directory_service: DirectoryService, test_graph):
    """Verify the directory tree built from the test_graph fixture files."""
    # test_graph files:
    # /
    # ├── test
    # │   ├── Connected Entity 1.md
    # │   ├── Connected Entity 2.md
    # │   ├── Deep Entity.md
    # │   ├── Deeper Entity.md
    # │   └── Root.md
    result = await directory_service.get_directory_tree()
    assert result is not None
    assert len(result.children) == 1
    # The single child of root is the "test" directory node.
    node_0 = result.children[0]
    assert node_0.name == "test"
    assert node_0.type == "directory"
    # Directory nodes carry no entity/content attributes.
    assert node_0.content_type is None
    assert node_0.entity_id is None
    assert node_0.entity_type is None
    assert node_0.title is None
    assert node_0.directory_path == "/test"
    assert node_0.has_children is True
    assert len(node_0.children) == 5
    # assert one file node
    node_file = node_0.children[0]
    assert node_file.name == "Deeper Entity.md"
    assert node_file.type == "file"
    assert node_file.content_type == "text/markdown"
    assert node_file.entity_id == 1
    assert node_file.entity_type == "deeper"
    assert node_file.title == "Deeper Entity"
    assert node_file.permalink == "test/deeper-entity"
    assert node_file.directory_path == "/test/Deeper Entity.md"
    assert node_file.file_path == "test/Deeper Entity.md"
    # File nodes are leaves.
    assert node_file.has_children is False
    assert len(node_file.children) == 0
@pytest.mark.asyncio
async def test_list_directory_empty(directory_service: DirectoryService):
    """Listing when no entities exist yields an empty list."""
    assert await directory_service.list_directory() == []
@pytest.mark.asyncio
async def test_list_directory_root(directory_service: DirectoryService, test_graph):
    """Root listing returns only its immediate child: the "test" directory."""
    nodes = await directory_service.list_directory(dir_name="/")
    assert len(nodes) == 1
    only_child = nodes[0]
    assert only_child.name == "test"
    assert only_child.type == "directory"
    assert only_child.directory_path == "/test"
@pytest.mark.asyncio
async def test_list_directory_specific_path(directory_service: DirectoryService, test_graph):
    """Listing /test returns exactly the five fixture files, all of type "file"."""
    nodes = await directory_service.list_directory(dir_name="/test")
    assert len(nodes) == 5
    assert {node.name for node in nodes} == {
        "Connected Entity 1.md",
        "Connected Entity 2.md",
        "Deep Entity.md",
        "Deeper Entity.md",
        "Root.md",
    }
    # Every entry in the listing is a file node.
    assert all(node.type == "file" for node in nodes)
@pytest.mark.asyncio
async def test_list_directory_nonexistent_path(directory_service: DirectoryService, test_graph):
    """A directory that does not exist lists as empty."""
    assert await directory_service.list_directory(dir_name="/nonexistent") == []
@pytest.mark.asyncio
async def test_list_directory_with_glob_filter(directory_service: DirectoryService, test_graph):
    """A glob pattern restricts the listing to matching file names."""
    matches = await directory_service.list_directory(
        dir_name="/test", file_name_glob="*Connected*"
    )
    assert len(matches) == 2
    assert {node.name for node in matches} == {"Connected Entity 1.md", "Connected Entity 2.md"}
@pytest.mark.asyncio
async def test_list_directory_with_markdown_filter(directory_service: DirectoryService, test_graph):
    """Filtering on *.md matches everything, since the fixture is all markdown."""
    matches = await directory_service.list_directory(dir_name="/test", file_name_glob="*.md")
    assert len(matches) == 5
@pytest.mark.asyncio
async def test_list_directory_with_specific_file_filter(
    directory_service: DirectoryService, test_graph
):
    """A pattern targeting a single file returns exactly that file."""
    matches = await directory_service.list_directory(dir_name="/test", file_name_glob="Root.*")
    assert [node.name for node in matches] == ["Root.md"]
@pytest.mark.asyncio
async def test_list_directory_depth_control(directory_service: DirectoryService, test_graph):
    """Depth controls how far below the start directory results are collected."""
    # Depth 1: only the immediate child (the "test" directory).
    shallow = await directory_service.list_directory(dir_name="/", depth=1)
    assert len(shallow) == 1
    # Depth 2: the "test" directory plus the 5 files inside it.
    deep = await directory_service.list_directory(dir_name="/", depth=2)
    assert len(deep) == 6
@pytest.mark.asyncio
async def test_list_directory_path_normalization(directory_service: DirectoryService, test_graph):
    """Equivalent spellings of the same directory path all resolve identically."""
    baseline = await directory_service.list_directory(dir_name="/test")
    baseline_names = {node.name for node in baseline}
    for variant in ("/test", "test", "/test/", "test/"):
        listed = await directory_service.list_directory(dir_name=variant)
        assert len(listed) == len(baseline)
        # Compare by name — each call may return distinct node instances.
        assert {node.name for node in listed} == baseline_names
@pytest.mark.asyncio
async def test_list_directory_dot_slash_prefix_normalization(
    directory_service: DirectoryService, test_graph
):
    """Regression: "./dirname" spellings must normalize the same as "/dirname".

    Reproduces the bug report where paths prefixed with ./ returned nothing.
    """
    baseline = await directory_service.list_directory(dir_name="/test")
    baseline_names = {node.name for node in baseline}
    for variant in ("./test", "./test/"):
        listed = await directory_service.list_directory(dir_name=variant)
        assert len(listed) == len(baseline), (
            f"Path '{variant}' returned {len(listed)} results, expected {len(baseline)}"
        )
        # Compare by name — each call may return distinct node instances.
        assert {node.name for node in listed} == baseline_names, (
            f"Path '{variant}' returned different files than expected"
        )
@pytest.mark.asyncio
async def test_list_directory_glob_no_matches(directory_service: DirectoryService, test_graph):
    """A glob matching no file names yields an empty listing."""
    nodes = await directory_service.list_directory(
        dir_name="/test", file_name_glob="*.nonexistent"
    )
    assert nodes == []
@pytest.mark.asyncio
async def test_list_directory_default_parameters(directory_service: DirectoryService, test_graph):
    """Defaults are root directory, depth 1, and no glob filter."""
    nodes = await directory_service.list_directory()
    assert len(nodes) == 1
    only_child = nodes[0]
    assert only_child.name == "test"
    assert only_child.type == "directory"
@pytest.mark.asyncio
async def test_directory_structure_empty(directory_service: DirectoryService):
    """With no entities indexed, the structure is a bare root node with no children."""
    root = await directory_service.get_directory_structure()
    assert root is not None
    assert root.name == "Root"
    assert root.directory_path == "/"
    assert root.type == "directory"
    assert root.has_children is False
    assert len(root.children) == 0
@pytest.mark.asyncio
async def test_directory_structure(directory_service: DirectoryService, test_graph):
    """The structure view contains directories only — file nodes are excluded.

    test_graph layout:
    /
    └── test
        ├── Connected Entity 1.md
        ├── Connected Entity 2.md
        ├── Deep Entity.md
        ├── Deeper Entity.md
        └── Root.md
    """
    root = await directory_service.get_directory_structure()
    assert root is not None
    # Only the "test" directory appears under root, none of its files.
    assert len(root.children) == 1
    test_dir = root.children[0]
    assert test_dir.name == "test"
    assert test_dir.type == "directory"
    assert test_dir.directory_path == "/test"
    # "test" contains only files, so it reports no child directories.
    assert test_dir.has_children is False
    # Directory nodes carry no file metadata.
    for file_attr in ("content_type", "entity_id", "entity_type", "title", "permalink"):
        assert getattr(test_dir, file_attr) is None
    # And no file nodes are attached as children.
    assert len(test_dir.children) == 0
```
--------------------------------------------------------------------------------
/specs/SPEC-14 Cloud Git Versioning & GitHub Backup.md:
--------------------------------------------------------------------------------
```markdown
---
title: 'SPEC-14: Cloud Git Versioning & GitHub Backup'
type: spec
permalink: specs/spec-14-cloud-git-versioning
tags:
- git
- github
- backup
- versioning
- cloud
related:
- specs/spec-9-multi-project-bisync
- specs/spec-9-follow-ups-conflict-sync-and-observability
status: deferred
---
# SPEC-14: Cloud Git Versioning & GitHub Backup
**Status: DEFERRED** - Postponed until multi-user/teams feature development. Using S3 versioning (SPEC-9.1) for v1 instead.
## Why Deferred
**Original goals can be met with simpler solutions:**
- Version history → **S3 bucket versioning** (automatic, zero config)
- Offsite backup → **Tigris global replication** (built-in)
- Restore capability → **S3 version restore** (`bm cloud restore --version-id`)
- Collaboration → **Deferred to teams/multi-user feature** (not v1 requirement)
**Complexity vs value trade-off:**
- Git integration adds: committer service, puller service, webhooks, LFS, merge conflicts
- Risk: Loop detection between Git ↔ rclone bisync ↔ local edits
- S3 versioning gives 80% of value with 5% of complexity
**When to revisit:**
- Teams/multi-user features (PR-based collaboration workflow)
- User requests for commit messages and branch-based workflows
- Need for fine-grained audit trail beyond S3 object metadata
---
## Original Specification (for reference)
## Why
Early access users want **transparent version history**, easy **offsite backup**, and a familiar **restore/branching** workflow. Git/GitHub integration would provide:
- Auditable history of every change (who/when/why)
- Branches/PRs for review and collaboration
- Offsite private backup under the user's control
- Escape hatch: users can always `git clone` their knowledge base
**Note:** These goals are now addressed via S3 versioning (SPEC-9.1) for single-user use case.
## Goals
- **Transparent**: Users keep using Basic Memory; Git runs behind the scenes.
- **Private**: Push to a **private GitHub repo** that the user owns (or tenant org).
- **Reliable**: No data loss, deterministic mapping of filesystem ↔ Git.
- **Composable**: Plays nicely with SPEC‑9 bisync and upcoming conflict features (SPEC‑9 Follow‑Ups).
**Non‑Goals (for v1):**
- Fine‑grained per‑file encryption in Git history (can be layered later).
- Large media optimization beyond Git LFS defaults.
## User Stories
1. *As a user*, I connect my GitHub and choose a private backup repo.
2. *As a user*, every change I make in cloud (or via bisync) is **committed** and **pushed** automatically.
3. *As a user*, I can **restore** a file/folder/project to a prior version.
4. *As a power user*, I can **git pull/push** directly to collaborate outside the app.
5. *As an admin*, I can enforce repo ownership (tenant org) and least‑privilege scopes.
## Scope
- **In scope:** Full repo backup of `/app/data/` (all projects) with optional selective subpaths.
- **Out of scope (v1):** Partial shallow mirrors; encrypted Git; cross‑provider SCM (GitLab/Bitbucket).
## Architecture
### Topology
- **Authoritative working tree**: `/app/data/` (bucket mount) remains the source of truth (SPEC‑9).
- **Bare repo** lives alongside: `/app/git/${tenant}/knowledge.git` (server‑side).
- **Mirror remote**: `github.com/<owner>/<repo>.git` (private).
```mermaid
flowchart LR
A[/Users & Agents/] -->|writes/edits| B[/app/data/]
B -->|file events| C[Committer Service]
C -->|git commit| D[(Bare Repo)]
D -->|push| E[(GitHub Private Repo)]
E -->|"webhook (push)"| F[Puller Service]
F -->|git pull/merge| D
D -->|checkout/merge| B
```
### Services
- **Committer Service** (daemon):
- Watches `/app/data/` for changes (inotify/poll)
- Batches changes (debounce e.g. 2–5s)
- Writes `.bmmeta` (if present) into commit message trailer (see Follow‑Ups)
- `git add -A && git commit -m "chore(sync): <summary>
BM-Meta: <json>"`
- Periodic `git push` to GitHub mirror (configurable interval)
- **Puller Service** (webhook target):
- Receives GitHub webhook (push) → `git fetch`
- **Fast‑forward** merges to `main` only; reject non‑FF unless policy allows
- Applies changes back to `/app/data/` via clean checkout
- Emits sync events for Basic Memory indexers
### Auth & Security
- **GitHub App** (recommended): minimal scopes: `contents:read/write`, `metadata:read`, webhook.
- Tenant‑scoped installation; repo created in user account or tenant org.
- Tokens stored in KMS/secret manager; rotated automatically.
- Optional policy: allow only **FF merges** on `main`; non‑FF requires PR.
### Repo Layout
- **Monorepo** (default): one repo per tenant mirrors `/app/data/` with subfolders per project.
- Optional multi‑repo mode (later): one repo per project.
### File Handling
- Honor `.gitignore` generated from `.bmignore.rclone` + BM defaults (cache, temp, state).
- **Git LFS** for large binaries (images, media) — auto track by extension/size threshold.
- Normalize newline + Unicode (aligns with Follow‑Ups).
### Conflict Model
- **Primary concurrency**: SPEC‑9 Follow‑Ups (`.bmmeta`, conflict copies) stays the first line of defense.
- **Git merges** are a **secondary** mechanism:
- Server only auto‑merges **text** conflicts when trivial (FF or clean 3‑way).
- Otherwise, create `name (conflict from <branch>, <ts>).md` and surface via events.
### Data Flow vs Bisync
- Bisync (rclone) continues between local sync dir ↔ bucket.
- Git sits **cloud‑side** between bucket and GitHub.
- On **pull** from GitHub → files written to `/app/data/` → picked up by indexers & eventually by bisync back to users.
## CLI & UX
New commands (cloud mode):
- `bm cloud git connect` — Launch GitHub App installation; create private repo; store installation id.
- `bm cloud git status` — Show connected repo, last push time, last webhook delivery, pending commits.
- `bm cloud git push` — Manual push (rarely needed).
- `bm cloud git pull` — Manual pull/FF (admin only by default).
- `bm cloud snapshot -m "message"` — Create a tagged point‑in‑time snapshot (git tag).
- `bm restore <path> --to <commit|tag>` — Restore file/folder/project to prior version.
Settings:
- `bm config set git.autoPushInterval=5s`
- `bm config set git.lfs.sizeThreshold=10MB`
- `bm config set git.allowNonFF=false`
## Migration & Backfill
- On connect, if repo empty: initial commit of entire `/app/data/`.
- If repo has content: require **one‑time import** path (clone to staging, reconcile, choose direction).
## Edge Cases
- Massive deletes: gated by SPEC‑9 `max_delete` **and** Git pre‑push hook checks.
- Case changes and rename detection: rely on git rename heuristics + Follow‑Ups move hints.
- Secrets: default ignore common secret patterns; allow custom deny list.
## Telemetry & Observability
- Emit `git_commit`, `git_push`, `git_pull`, `git_conflict` events with correlation IDs.
- `bm sync --report` extended with Git stats (commit count, delta bytes, push latency).
## Phased Plan
### Phase 0 — Prototype (1 sprint)
- Server: bare repo init + simple committer (batch every 10s) + manual GitHub token.
- CLI: `bm cloud git connect --token <PAT>` (dev‑only)
- Success: edits in `/app/data/` appear in GitHub within 30s.
### Phase 1 — GitHub App & Webhooks (1–2 sprints)
- Switch to GitHub App installs; create private repo; store installation id.
- Committer hardened (debounce 2–5s, backoff, retries).
- Puller service with webhook → FF merge → checkout to `/app/data/`.
- LFS auto‑track + `.gitignore` generation.
- CLI surfaces status + logs.
### Phase 2 — Restore & Snapshots (1 sprint)
- `bm restore` for file/folder/project with dry‑run.
- `bm cloud snapshot` tags + list/inspect.
- Policy: PR‑only non‑FF, admin override.
### Phase 3 — Selective & Multi‑Repo (nice‑to‑have)
- Include/exclude projects; optional per‑project repos.
- Advanced policies (branch protections, required reviews).
## Acceptance Criteria
- Changes to `/app/data/` are committed and pushed automatically within configurable interval (default ≤5s).
- GitHub webhook pull results in updated files in `/app/data/` (FF‑only by default).
- LFS configured and functioning; large files don't bloat history.
- `bm cloud git status` shows connected repo and last push/pull times.
- `bm restore` restores a file/folder to a prior commit with a clear audit trail.
- End‑to‑end works alongside SPEC‑9 bisync without loops or data loss.
## Risks & Mitigations
- **Loop risk (Git ↔ Bisync)**: Writes to `/app/data/` → bisync → local → user edits → back again. *Mitigation*: Debounce, commit squashing, idempotent `.bmmeta` versioning, and watch exclusion windows during pull.
- **Repo bloat**: Lots of binary churn. *Mitigation*: default LFS, size threshold, optional media‑only repo later.
- **Security**: Token leakage. *Mitigation*: GitHub App with short‑lived tokens, KMS storage, scoped permissions.
- **Merge complexity**: Non‑trivial conflicts. *Mitigation*: prefer FF; otherwise conflict copies + events; require PR for non‑FF.
## Open Questions
- Do we default to **monorepo** per tenant, or offer project‑per‑repo at connect time?
- Should `restore` write to a branch and open a PR, or directly modify `main`?
- How do we expose Git history in UI (timeline view) without users dropping to CLI?
## Appendix: Sample Config
```json
{
"git": {
"enabled": true,
"repo": "https://github.com/<owner>/<repo>.git",
"autoPushInterval": "5s",
"allowNonFF": false,
"lfs": { "sizeThreshold": 10485760 }
}
}
```
```
--------------------------------------------------------------------------------
/specs/SPEC-14- Cloud Git Versioning & GitHub Backup.md:
--------------------------------------------------------------------------------
```markdown
---
title: 'SPEC-14: Cloud Git Versioning & GitHub Backup'
type: spec
permalink: specs/spec-14-cloud-git-versioning
tags:
- git
- github
- backup
- versioning
- cloud
related:
- specs/spec-9-multi-project-bisync
- specs/spec-9-follow-ups-conflict-sync-and-observability
status: deferred
---
# SPEC-14: Cloud Git Versioning & GitHub Backup
**Status: DEFERRED** - Postponed until multi-user/teams feature development. Using S3 versioning (SPEC-9.1) for v1 instead.
## Why Deferred
**Original goals can be met with simpler solutions:**
- Version history → **S3 bucket versioning** (automatic, zero config)
- Offsite backup → **Tigris global replication** (built-in)
- Restore capability → **S3 version restore** (`bm cloud restore --version-id`)
- Collaboration → **Deferred to teams/multi-user feature** (not v1 requirement)
**Complexity vs value trade-off:**
- Git integration adds: committer service, puller service, webhooks, LFS, merge conflicts
- Risk: Loop detection between Git ↔ rclone bisync ↔ local edits
- S3 versioning gives 80% of value with 5% of complexity
**When to revisit:**
- Teams/multi-user features (PR-based collaboration workflow)
- User requests for commit messages and branch-based workflows
- Need for fine-grained audit trail beyond S3 object metadata
---
## Original Specification (for reference)
## Why
Early access users want **transparent version history**, easy **offsite backup**, and a familiar **restore/branching** workflow. Git/GitHub integration would provide:
- Auditable history of every change (who/when/why)
- Branches/PRs for review and collaboration
- Offsite private backup under the user's control
- Escape hatch: users can always `git clone` their knowledge base
**Note:** These goals are now addressed via S3 versioning (SPEC-9.1) for single-user use case.
## Goals
- **Transparent**: Users keep using Basic Memory; Git runs behind the scenes.
- **Private**: Push to a **private GitHub repo** that the user owns (or tenant org).
- **Reliable**: No data loss, deterministic mapping of filesystem ↔ Git.
- **Composable**: Plays nicely with SPEC‑9 bisync and upcoming conflict features (SPEC‑9 Follow‑Ups).
**Non‑Goals (for v1):**
- Fine‑grained per‑file encryption in Git history (can be layered later).
- Large media optimization beyond Git LFS defaults.
## User Stories
1. *As a user*, I connect my GitHub and choose a private backup repo.
2. *As a user*, every change I make in cloud (or via bisync) is **committed** and **pushed** automatically.
3. *As a user*, I can **restore** a file/folder/project to a prior version.
4. *As a power user*, I can **git pull/push** directly to collaborate outside the app.
5. *As an admin*, I can enforce repo ownership (tenant org) and least‑privilege scopes.
## Scope
- **In scope:** Full repo backup of `/app/data/` (all projects) with optional selective subpaths.
- **Out of scope (v1):** Partial shallow mirrors; encrypted Git; cross‑provider SCM (GitLab/Bitbucket).
## Architecture
### Topology
- **Authoritative working tree**: `/app/data/` (bucket mount) remains the source of truth (SPEC‑9).
- **Bare repo** lives alongside: `/app/git/${tenant}/knowledge.git` (server‑side).
- **Mirror remote**: `github.com/<owner>/<repo>.git` (private).
```mermaid
flowchart LR
A[/Users & Agents/] -->|writes/edits| B[/app/data/]
B -->|file events| C[Committer Service]
C -->|git commit| D[(Bare Repo)]
D -->|push| E[(GitHub Private Repo)]
E -->|"webhook (push)"| F[Puller Service]
F -->|git pull/merge| D
D -->|checkout/merge| B
```
### Services
- **Committer Service** (daemon):
- Watches `/app/data/` for changes (inotify/poll)
- Batches changes (debounce e.g. 2–5s)
- Writes `.bmmeta` (if present) into commit message trailer (see Follow‑Ups)
- `git add -A && git commit -m "chore(sync): <summary>
BM-Meta: <json>"`
- Periodic `git push` to GitHub mirror (configurable interval)
- **Puller Service** (webhook target):
- Receives GitHub webhook (push) → `git fetch`
- **Fast‑forward** merges to `main` only; reject non‑FF unless policy allows
- Applies changes back to `/app/data/` via clean checkout
- Emits sync events for Basic Memory indexers
### Auth & Security
- **GitHub App** (recommended): minimal scopes: `contents:read/write`, `metadata:read`, webhook.
- Tenant‑scoped installation; repo created in user account or tenant org.
- Tokens stored in KMS/secret manager; rotated automatically.
- Optional policy: allow only **FF merges** on `main`; non‑FF requires PR.
### Repo Layout
- **Monorepo** (default): one repo per tenant mirrors `/app/data/` with subfolders per project.
- Optional multi‑repo mode (later): one repo per project.
### File Handling
- Honor `.gitignore` generated from `.bmignore.rclone` + BM defaults (cache, temp, state).
- **Git LFS** for large binaries (images, media) — auto track by extension/size threshold.
- Normalize newline + Unicode (aligns with Follow‑Ups).
### Conflict Model
- **Primary concurrency**: SPEC‑9 Follow‑Ups (`.bmmeta`, conflict copies) stays the first line of defense.
- **Git merges** are a **secondary** mechanism:
- Server only auto‑merges **text** conflicts when trivial (FF or clean 3‑way).
- Otherwise, create `name (conflict from <branch>, <ts>).md` and surface via events.
### Data Flow vs Bisync
- Bisync (rclone) continues between local sync dir ↔ bucket.
- Git sits **cloud‑side** between bucket and GitHub.
- On **pull** from GitHub → files written to `/app/data/` → picked up by indexers & eventually by bisync back to users.
## CLI & UX
New commands (cloud mode):
- `bm cloud git connect` — Launch GitHub App installation; create private repo; store installation id.
- `bm cloud git status` — Show connected repo, last push time, last webhook delivery, pending commits.
- `bm cloud git push` — Manual push (rarely needed).
- `bm cloud git pull` — Manual pull/FF (admin only by default).
- `bm cloud snapshot -m "message"` — Create a tagged point‑in‑time snapshot (git tag).
- `bm restore <path> --to <commit|tag>` — Restore file/folder/project to prior version.
Settings:
- `bm config set git.autoPushInterval=5s`
- `bm config set git.lfs.sizeThreshold=10MB`
- `bm config set git.allowNonFF=false`
## Migration & Backfill
- On connect, if repo empty: initial commit of entire `/app/data/`.
- If repo has content: require **one‑time import** path (clone to staging, reconcile, choose direction).
## Edge Cases
- Massive deletes: gated by SPEC‑9 `max_delete` **and** Git pre‑push hook checks.
- Case changes and rename detection: rely on git rename heuristics + Follow‑Ups move hints.
- Secrets: default ignore common secret patterns; allow custom deny list.
## Telemetry & Observability
- Emit `git_commit`, `git_push`, `git_pull`, `git_conflict` events with correlation IDs.
- `bm sync --report` extended with Git stats (commit count, delta bytes, push latency).
## Phased Plan
### Phase 0 — Prototype (1 sprint)
- Server: bare repo init + simple committer (batch every 10s) + manual GitHub token.
- CLI: `bm cloud git connect --token <PAT>` (dev‑only)
- Success: edits in `/app/data/` appear in GitHub within 30s.
### Phase 1 — GitHub App & Webhooks (1–2 sprints)
- Switch to GitHub App installs; create private repo; store installation id.
- Committer hardened (debounce 2–5s, backoff, retries).
- Puller service with webhook → FF merge → checkout to `/app/data/`.
- LFS auto‑track + `.gitignore` generation.
- CLI surfaces status + logs.
### Phase 2 — Restore & Snapshots (1 sprint)
- `bm restore` for file/folder/project with dry‑run.
- `bm cloud snapshot` tags + list/inspect.
- Policy: PR‑only non‑FF, admin override.
### Phase 3 — Selective & Multi‑Repo (nice‑to‑have)
- Include/exclude projects; optional per‑project repos.
- Advanced policies (branch protections, required reviews).
## Acceptance Criteria
- Changes to `/app/data/` are committed and pushed automatically within configurable interval (default ≤5s).
- GitHub webhook pull results in updated files in `/app/data/` (FF‑only by default).
- LFS configured and functioning; large files don't bloat history.
- `bm cloud git status` shows connected repo and last push/pull times.
- `bm restore` restores a file/folder to a prior commit with a clear audit trail.
- End‑to‑end works alongside SPEC‑9 bisync without loops or data loss.
## Risks & Mitigations
- **Loop risk (Git ↔ Bisync)**: Writes to `/app/data/` → bisync → local → user edits → back again. *Mitigation*: Debounce, commit squashing, idempotent `.bmmeta` versioning, and watch exclusion windows during pull.
- **Repo bloat**: Lots of binary churn. *Mitigation*: default LFS, size threshold, optional media‑only repo later.
- **Security**: Token leakage. *Mitigation*: GitHub App with short‑lived tokens, KMS storage, scoped permissions.
- **Merge complexity**: Non‑trivial conflicts. *Mitigation*: prefer FF; otherwise conflict copies + events; require PR for non‑FF.
## Open Questions
- Do we default to **monorepo** per tenant, or offer project‑per‑repo at connect time?
- Should `restore` write to a branch and open a PR, or directly modify `main`?
- How do we expose Git history in UI (timeline view) without users dropping to CLI?
## Appendix: Sample Config
```json
{
"git": {
"enabled": true,
"repo": "https://github.com/<owner>/<repo>.git",
"autoPushInterval": "5s",
"allowNonFF": false,
"lfs": { "sizeThreshold": 10485760 }
}
}
```
```
--------------------------------------------------------------------------------
/tests/schemas/test_memory_url_validation.py:
--------------------------------------------------------------------------------
```python
"""Tests for memory URL validation functionality."""
import pytest
from pydantic import ValidationError
from basic_memory.schemas.memory import (
normalize_memory_url,
validate_memory_url_path,
memory_url,
)
class TestValidateMemoryUrlPath:
    """Exercise validate_memory_url_path across valid and invalid inputs."""

    def test_valid_paths(self):
        """Well-formed relative paths (including wildcard patterns) are accepted."""
        for path in (
            "notes/meeting",
            "projects/basic-memory",
            "research/findings-2025",
            "specs/search",
            "docs/api-spec",
            "folder/subfolder/note",
            "single-note",
            "notes/with-hyphens",
            "notes/with_underscores",
            "notes/with123numbers",
            "pattern/*",  # wildcard pattern matching
            "deep/*/pattern",
        ):
            assert validate_memory_url_path(path), f"Path '{path}' should be valid"

    def test_invalid_empty_paths(self):
        """Empty or whitespace-only paths are rejected."""
        for path in ("", " ", "\t", "\n", " \n "):
            assert not validate_memory_url_path(path), f"Path '{path}' should be invalid"

    def test_invalid_double_slashes(self):
        """A '//' anywhere in the path is rejected."""
        for path in (
            "notes//meeting",
            "//root",
            "folder//subfolder/note",
            "path//with//multiple//doubles",
            "memory//test",
        ):
            assert not validate_memory_url_path(path), (
                f"Path '{path}' should be invalid (double slashes)"
            )

    def test_invalid_protocol_schemes(self):
        """Paths carrying a URI scheme (http://, ftp://, ...) are rejected."""
        for path in (
            "http://example.com",
            "https://example.com/path",
            "file://local/path",
            "ftp://server.com",
            "invalid://test",
            "custom://scheme",
        ):
            assert not validate_memory_url_path(path), (
                f"Path '{path}' should be invalid (protocol scheme)"
            )

    def test_invalid_characters(self):
        """Paths containing angle brackets, quotes, pipes, or '?' are rejected."""
        for path in (
            "notes<with>brackets",
            'notes"with"quotes',
            "notes|with|pipes",
            "notes?with?questions",
        ):
            assert not validate_memory_url_path(path), (
                f"Path '{path}' should be invalid (invalid chars)"
            )
class TestNormalizeMemoryUrl:
    """Exercise normalize_memory_url for both happy-path and error cases."""

    def test_valid_normalization(self):
        """Bare paths gain the memory:// prefix; already-prefixed URLs pass through."""
        cases = {
            "specs/search": "memory://specs/search",
            "memory://specs/search": "memory://specs/search",
            "notes/meeting-2025": "memory://notes/meeting-2025",
            "memory://notes/meeting-2025": "memory://notes/meeting-2025",
            "pattern/*": "memory://pattern/*",
            "memory://pattern/*": "memory://pattern/*",
        }
        for raw, expected in cases.items():
            normalized = normalize_memory_url(raw)
            assert normalized == expected, (
                f"normalize_memory_url('{raw}') should return '{expected}', got '{normalized}'"
            )

    def test_empty_url(self):
        """None and the empty string both raise a 'cannot be empty' ValueError."""
        for empty in (None, ""):
            with pytest.raises(ValueError, match="cannot be empty"):
                normalize_memory_url(empty)

    def test_invalid_double_slashes(self):
        """URLs containing '//' outside the scheme separator raise ValueError."""
        for url in ("memory//test", "notes//meeting", "//root", "memory://path//with//doubles"):
            with pytest.raises(ValueError, match="contains double slashes"):
                normalize_memory_url(url)

    def test_invalid_protocol_schemes(self):
        """URLs with a non-memory URI scheme raise ValueError."""
        for url in (
            "http://example.com",
            "https://example.com/path",
            "file://local/path",
            "invalid://test",
        ):
            with pytest.raises(ValueError, match="contains protocol scheme"):
                normalize_memory_url(url)

    def test_whitespace_only(self):
        """Whitespace-only input raises a 'cannot be empty or whitespace' ValueError."""
        for url in (" ", "\t", "\n", " \n "):
            with pytest.raises(ValueError, match="cannot be empty or whitespace"):
                normalize_memory_url(url)

    def test_invalid_characters(self):
        """URLs containing angle brackets, quotes, pipes, or '?' raise ValueError."""
        for url in ("notes<brackets>", 'notes"quotes"', "notes|pipes|", "notes?questions?"):
            with pytest.raises(ValueError, match="contains invalid characters"):
                normalize_memory_url(url)
class TestMemoryUrlPydanticValidation:
    """Test the MemoryUrl Pydantic type validation."""

    def test_valid_urls_pass_validation(self):
        """Valid URLs validate and come back normalized with the memory:// prefix."""
        valid_urls = [
            "specs/search",
            "memory://specs/search",
            "notes/meeting-2025",
            "projects/basic-memory/docs",
            "pattern/*",
        ]
        for url in valid_urls:
            # Should not raise an exception
            result = memory_url.validate_python(url)
            assert result.startswith("memory://"), (
                f"Validated URL should start with memory://, got {result}"
            )

    def test_invalid_urls_fail_validation(self):
        """Invalid URLs fail Pydantic validation and surface the specific reason."""
        invalid_test_cases = [
            ("memory//test", "double slashes"),
            ("invalid://test", "protocol scheme"),
            (" ", "empty or whitespace"),
            ("notes<brackets>", "invalid characters"),
        ]
        for url, expected_error in invalid_test_cases:
            with pytest.raises(ValidationError) as exc_info:
                memory_url.validate_python(url)
            error_msg = str(exc_info.value)
            assert "value_error" in error_msg, f"Should be a value_error for '{url}'"
            # Fix: expected_error was previously collected but never asserted,
            # so the test could not detect a wrong (but still raised) reason.
            assert expected_error in error_msg, (
                f"Error for '{url}' should mention '{expected_error}', got: {error_msg}"
            )

    def test_empty_string_fails_validation(self):
        """Empty strings fail validation with a 'cannot be empty' error."""
        with pytest.raises(ValidationError, match="cannot be empty"):
            memory_url.validate_python("")

    def test_very_long_urls_fail_maxlength(self):
        """URLs longer than the MaxLen(2028) constraint are rejected."""
        long_url = "a" * 3000  # Exceeds MaxLen(2028)
        with pytest.raises(ValidationError, match="at most 2028"):
            memory_url.validate_python(long_url)

    def test_whitespace_stripped(self):
        """Surrounding whitespace is stripped before normalization."""
        urls_with_whitespace = [
            " specs/search ",
            "\tprojects/basic-memory\t",
            "\nnotes/meeting\n",
        ]
        for url in urls_with_whitespace:
            result = memory_url.validate_python(url)
            assert not result.startswith(" ") and not result.endswith(" "), (
                f"Whitespace should be stripped from '{url}'"
            )
            assert "memory://" in result, "Result should contain memory:// prefix"
class TestMemoryUrlErrorMessages:
    """Verify normalize_memory_url error messages are clear and specific."""

    @staticmethod
    def _error_text(bad_url):
        """Return the ValueError message text raised for *bad_url*."""
        with pytest.raises(ValueError) as exc_info:
            normalize_memory_url(bad_url)
        return str(exc_info.value)

    def test_double_slash_error_message(self):
        """Double-slash failures name the offending URL and the reason."""
        message = self._error_text("memory//test")
        assert "memory//test" in message
        assert "double slashes" in message

    def test_protocol_scheme_error_message(self):
        """Protocol-scheme failures name the offending URL and the reason."""
        message = self._error_text("http://example.com")
        assert "http://example.com" in message
        assert "protocol scheme" in message

    def test_empty_error_message(self):
        """Whitespace-only input produces an 'empty or whitespace' message."""
        message = self._error_text(" ")
        assert "empty or whitespace" in message

    def test_invalid_characters_error_message(self):
        """Invalid-character failures name the offending URL and the reason."""
        message = self._error_text("notes<brackets>")
        assert "notes<brackets>" in message
        assert "invalid characters" in message
```