This is page 2 of 19. Use http://codebase.md/basicmachines-co/basic-memory?lines=false&page={x} to view the full context.
# Directory Structure
```
├── .claude
│ ├── commands
│ │ ├── release
│ │ │ ├── beta.md
│ │ │ ├── changelog.md
│ │ │ ├── release-check.md
│ │ │ └── release.md
│ │ ├── spec.md
│ │ └── test-live.md
│ └── settings.json
├── .dockerignore
├── .env.example
├── .github
│ ├── dependabot.yml
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.md
│ │ ├── config.yml
│ │ ├── documentation.md
│ │ └── feature_request.md
│ └── workflows
│ ├── claude-code-review.yml
│ ├── claude-issue-triage.yml
│ ├── claude.yml
│ ├── dev-release.yml
│ ├── docker.yml
│ ├── pr-title.yml
│ ├── release.yml
│ └── test.yml
├── .gitignore
├── .python-version
├── CHANGELOG.md
├── CITATION.cff
├── CLA.md
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── docker-compose-postgres.yml
├── docker-compose.yml
├── Dockerfile
├── docs
│ ├── ai-assistant-guide-extended.md
│ ├── ARCHITECTURE.md
│ ├── character-handling.md
│ ├── cloud-cli.md
│ ├── Docker.md
│ └── testing-coverage.md
├── justfile
├── LICENSE
├── llms-install.md
├── pyproject.toml
├── README.md
├── SECURITY.md
├── smithery.yaml
├── specs
│ ├── SPEC-1 Specification-Driven Development Process.md
│ ├── SPEC-10 Unified Deployment Workflow and Event Tracking.md
│ ├── SPEC-11 Basic Memory API Performance Optimization.md
│ ├── SPEC-12 OpenTelemetry Observability.md
│ ├── SPEC-13 CLI Authentication with Subscription Validation.md
│ ├── SPEC-14 Cloud Git Versioning & GitHub Backup.md
│ ├── SPEC-14- Cloud Git Versioning & GitHub Backup.md
│ ├── SPEC-15 Configuration Persistence via Tigris for Cloud Tenants.md
│ ├── SPEC-16 MCP Cloud Service Consolidation.md
│ ├── SPEC-17 Semantic Search with ChromaDB.md
│ ├── SPEC-18 AI Memory Management Tool.md
│ ├── SPEC-19 Sync Performance and Memory Optimization.md
│ ├── SPEC-2 Slash Commands Reference.md
│ ├── SPEC-20 Simplified Project-Scoped Rclone Sync.md
│ ├── SPEC-3 Agent Definitions.md
│ ├── SPEC-4 Notes Web UI Component Architecture.md
│ ├── SPEC-5 CLI Cloud Upload via WebDAV.md
│ ├── SPEC-6 Explicit Project Parameter Architecture.md
│ ├── SPEC-7 POC to spike Tigris Turso for local access to cloud data.md
│ ├── SPEC-8 TigrisFS Integration.md
│ ├── SPEC-9 Multi-Project Bidirectional Sync Architecture.md
│ ├── SPEC-9 Signed Header Tenant Information.md
│ └── SPEC-9-1 Follow-Ups- Conflict, Sync, and Observability.md
├── src
│ └── basic_memory
│ ├── __init__.py
│ ├── alembic
│ │ ├── alembic.ini
│ │ ├── env.py
│ │ ├── migrations.py
│ │ ├── script.py.mako
│ │ └── versions
│ │ ├── 314f1ea54dc4_add_postgres_full_text_search_support_.py
│ │ ├── 3dae7c7b1564_initial_schema.py
│ │ ├── 502b60eaa905_remove_required_from_entity_permalink.py
│ │ ├── 5fe1ab1ccebe_add_projects_table.py
│ │ ├── 647e7a75e2cd_project_constraint_fix.py
│ │ ├── 6830751f5fb6_merge_multiple_heads.py
│ │ ├── 9d9c1cb7d8f5_add_mtime_and_size_columns_to_entity_.py
│ │ ├── a1b2c3d4e5f6_fix_project_foreign_keys.py
│ │ ├── a2b3c4d5e6f7_add_search_index_entity_cascade.py
│ │ ├── b3c3938bacdb_relation_to_name_unique_index.py
│ │ ├── cc7172b46608_update_search_index_schema.py
│ │ ├── e7e1f4367280_add_scan_watermark_tracking_to_project.py
│ │ ├── f8a9b2c3d4e5_add_pg_trgm_for_fuzzy_link_resolution.py
│ │ └── g9a0b3c4d5e6_add_external_id_to_project_and_entity.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── app.py
│ │ ├── container.py
│ │ ├── routers
│ │ │ ├── __init__.py
│ │ │ ├── directory_router.py
│ │ │ ├── importer_router.py
│ │ │ ├── knowledge_router.py
│ │ │ ├── management_router.py
│ │ │ ├── memory_router.py
│ │ │ ├── project_router.py
│ │ │ ├── prompt_router.py
│ │ │ ├── resource_router.py
│ │ │ ├── search_router.py
│ │ │ └── utils.py
│ │ ├── template_loader.py
│ │ └── v2
│ │ ├── __init__.py
│ │ └── routers
│ │ ├── __init__.py
│ │ ├── directory_router.py
│ │ ├── importer_router.py
│ │ ├── knowledge_router.py
│ │ ├── memory_router.py
│ │ ├── project_router.py
│ │ ├── prompt_router.py
│ │ ├── resource_router.py
│ │ └── search_router.py
│ ├── cli
│ │ ├── __init__.py
│ │ ├── app.py
│ │ ├── auth.py
│ │ ├── commands
│ │ │ ├── __init__.py
│ │ │ ├── cloud
│ │ │ │ ├── __init__.py
│ │ │ │ ├── api_client.py
│ │ │ │ ├── bisync_commands.py
│ │ │ │ ├── cloud_utils.py
│ │ │ │ ├── core_commands.py
│ │ │ │ ├── rclone_commands.py
│ │ │ │ ├── rclone_config.py
│ │ │ │ ├── rclone_installer.py
│ │ │ │ ├── upload_command.py
│ │ │ │ └── upload.py
│ │ │ ├── command_utils.py
│ │ │ ├── db.py
│ │ │ ├── format.py
│ │ │ ├── import_chatgpt.py
│ │ │ ├── import_claude_conversations.py
│ │ │ ├── import_claude_projects.py
│ │ │ ├── import_memory_json.py
│ │ │ ├── mcp.py
│ │ │ ├── project.py
│ │ │ ├── status.py
│ │ │ ├── telemetry.py
│ │ │ └── tool.py
│ │ ├── container.py
│ │ └── main.py
│ ├── config.py
│ ├── db.py
│ ├── deps
│ │ ├── __init__.py
│ │ ├── config.py
│ │ ├── db.py
│ │ ├── importers.py
│ │ ├── projects.py
│ │ ├── repositories.py
│ │ └── services.py
│ ├── deps.py
│ ├── file_utils.py
│ ├── ignore_utils.py
│ ├── importers
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── chatgpt_importer.py
│ │ ├── claude_conversations_importer.py
│ │ ├── claude_projects_importer.py
│ │ ├── memory_json_importer.py
│ │ └── utils.py
│ ├── markdown
│ │ ├── __init__.py
│ │ ├── entity_parser.py
│ │ ├── markdown_processor.py
│ │ ├── plugins.py
│ │ ├── schemas.py
│ │ └── utils.py
│ ├── mcp
│ │ ├── __init__.py
│ │ ├── async_client.py
│ │ ├── clients
│ │ │ ├── __init__.py
│ │ │ ├── directory.py
│ │ │ ├── knowledge.py
│ │ │ ├── memory.py
│ │ │ ├── project.py
│ │ │ ├── resource.py
│ │ │ └── search.py
│ │ ├── container.py
│ │ ├── project_context.py
│ │ ├── prompts
│ │ │ ├── __init__.py
│ │ │ ├── ai_assistant_guide.py
│ │ │ ├── continue_conversation.py
│ │ │ ├── recent_activity.py
│ │ │ ├── search.py
│ │ │ └── utils.py
│ │ ├── resources
│ │ │ ├── ai_assistant_guide.md
│ │ │ └── project_info.py
│ │ ├── server.py
│ │ └── tools
│ │ ├── __init__.py
│ │ ├── build_context.py
│ │ ├── canvas.py
│ │ ├── chatgpt_tools.py
│ │ ├── delete_note.py
│ │ ├── edit_note.py
│ │ ├── list_directory.py
│ │ ├── move_note.py
│ │ ├── project_management.py
│ │ ├── read_content.py
│ │ ├── read_note.py
│ │ ├── recent_activity.py
│ │ ├── search.py
│ │ ├── utils.py
│ │ ├── view_note.py
│ │ └── write_note.py
│ ├── models
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── knowledge.py
│ │ ├── project.py
│ │ └── search.py
│ ├── project_resolver.py
│ ├── repository
│ │ ├── __init__.py
│ │ ├── entity_repository.py
│ │ ├── observation_repository.py
│ │ ├── postgres_search_repository.py
│ │ ├── project_info_repository.py
│ │ ├── project_repository.py
│ │ ├── relation_repository.py
│ │ ├── repository.py
│ │ ├── search_index_row.py
│ │ ├── search_repository_base.py
│ │ ├── search_repository.py
│ │ └── sqlite_search_repository.py
│ ├── runtime.py
│ ├── schemas
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── cloud.py
│ │ ├── delete.py
│ │ ├── directory.py
│ │ ├── importer.py
│ │ ├── memory.py
│ │ ├── project_info.py
│ │ ├── prompt.py
│ │ ├── request.py
│ │ ├── response.py
│ │ ├── search.py
│ │ ├── sync_report.py
│ │ └── v2
│ │ ├── __init__.py
│ │ ├── entity.py
│ │ └── resource.py
│ ├── services
│ │ ├── __init__.py
│ │ ├── context_service.py
│ │ ├── directory_service.py
│ │ ├── entity_service.py
│ │ ├── exceptions.py
│ │ ├── file_service.py
│ │ ├── initialization.py
│ │ ├── link_resolver.py
│ │ ├── project_service.py
│ │ ├── search_service.py
│ │ └── service.py
│ ├── sync
│ │ ├── __init__.py
│ │ ├── background_sync.py
│ │ ├── coordinator.py
│ │ ├── sync_service.py
│ │ └── watch_service.py
│ ├── telemetry.py
│ ├── templates
│ │ └── prompts
│ │ ├── continue_conversation.hbs
│ │ └── search.hbs
│ └── utils.py
├── test-int
│ ├── BENCHMARKS.md
│ ├── cli
│ │ ├── test_project_commands_integration.py
│ │ └── test_version_integration.py
│ ├── conftest.py
│ ├── mcp
│ │ ├── test_build_context_underscore.py
│ │ ├── test_build_context_validation.py
│ │ ├── test_chatgpt_tools_integration.py
│ │ ├── test_default_project_mode_integration.py
│ │ ├── test_delete_note_integration.py
│ │ ├── test_edit_note_integration.py
│ │ ├── test_lifespan_shutdown_sync_task_cancellation_integration.py
│ │ ├── test_list_directory_integration.py
│ │ ├── test_move_note_integration.py
│ │ ├── test_project_management_integration.py
│ │ ├── test_project_state_sync_integration.py
│ │ ├── test_read_content_integration.py
│ │ ├── test_read_note_integration.py
│ │ ├── test_search_integration.py
│ │ ├── test_single_project_mcp_integration.py
│ │ └── test_write_note_integration.py
│ ├── test_db_wal_mode.py
│ └── test_disable_permalinks_integration.py
├── tests
│ ├── __init__.py
│ ├── api
│ │ ├── conftest.py
│ │ ├── test_api_container.py
│ │ ├── test_async_client.py
│ │ ├── test_continue_conversation_template.py
│ │ ├── test_directory_router.py
│ │ ├── test_importer_router.py
│ │ ├── test_knowledge_router.py
│ │ ├── test_management_router.py
│ │ ├── test_memory_router.py
│ │ ├── test_project_router_operations.py
│ │ ├── test_project_router.py
│ │ ├── test_prompt_router.py
│ │ ├── test_relation_background_resolution.py
│ │ ├── test_resource_router.py
│ │ ├── test_search_router.py
│ │ ├── test_search_template.py
│ │ ├── test_template_loader_helpers.py
│ │ ├── test_template_loader.py
│ │ └── v2
│ │ ├── __init__.py
│ │ ├── conftest.py
│ │ ├── test_directory_router.py
│ │ ├── test_importer_router.py
│ │ ├── test_knowledge_router.py
│ │ ├── test_memory_router.py
│ │ ├── test_project_router.py
│ │ ├── test_prompt_router.py
│ │ ├── test_resource_router.py
│ │ └── test_search_router.py
│ ├── cli
│ │ ├── cloud
│ │ │ ├── test_cloud_api_client_and_utils.py
│ │ │ ├── test_rclone_config_and_bmignore_filters.py
│ │ │ └── test_upload_path.py
│ │ ├── conftest.py
│ │ ├── test_auth_cli_auth.py
│ │ ├── test_cli_container.py
│ │ ├── test_cli_exit.py
│ │ ├── test_cli_tool_exit.py
│ │ ├── test_cli_tools.py
│ │ ├── test_cloud_authentication.py
│ │ ├── test_ignore_utils.py
│ │ ├── test_import_chatgpt.py
│ │ ├── test_import_claude_conversations.py
│ │ ├── test_import_claude_projects.py
│ │ ├── test_import_memory_json.py
│ │ ├── test_project_add_with_local_path.py
│ │ └── test_upload.py
│ ├── conftest.py
│ ├── db
│ │ └── test_issue_254_foreign_key_constraints.py
│ ├── importers
│ │ ├── test_conversation_indexing.py
│ │ ├── test_importer_base.py
│ │ └── test_importer_utils.py
│ ├── markdown
│ │ ├── __init__.py
│ │ ├── test_date_frontmatter_parsing.py
│ │ ├── test_entity_parser_error_handling.py
│ │ ├── test_entity_parser.py
│ │ ├── test_markdown_plugins.py
│ │ ├── test_markdown_processor.py
│ │ ├── test_observation_edge_cases.py
│ │ ├── test_parser_edge_cases.py
│ │ ├── test_relation_edge_cases.py
│ │ └── test_task_detection.py
│ ├── mcp
│ │ ├── clients
│ │ │ ├── __init__.py
│ │ │ └── test_clients.py
│ │ ├── conftest.py
│ │ ├── test_async_client_modes.py
│ │ ├── test_mcp_container.py
│ │ ├── test_obsidian_yaml_formatting.py
│ │ ├── test_permalink_collision_file_overwrite.py
│ │ ├── test_project_context.py
│ │ ├── test_prompts.py
│ │ ├── test_recent_activity_prompt_modes.py
│ │ ├── test_resources.py
│ │ ├── test_server_lifespan_branches.py
│ │ ├── test_tool_build_context.py
│ │ ├── test_tool_canvas.py
│ │ ├── test_tool_delete_note.py
│ │ ├── test_tool_edit_note.py
│ │ ├── test_tool_list_directory.py
│ │ ├── test_tool_move_note.py
│ │ ├── test_tool_project_management.py
│ │ ├── test_tool_read_content.py
│ │ ├── test_tool_read_note.py
│ │ ├── test_tool_recent_activity.py
│ │ ├── test_tool_resource.py
│ │ ├── test_tool_search.py
│ │ ├── test_tool_utils.py
│ │ ├── test_tool_view_note.py
│ │ ├── test_tool_write_note_kebab_filenames.py
│ │ ├── test_tool_write_note.py
│ │ └── tools
│ │ └── test_chatgpt_tools.py
│ ├── Non-MarkdownFileSupport.pdf
│ ├── README.md
│ ├── repository
│ │ ├── test_entity_repository_upsert.py
│ │ ├── test_entity_repository.py
│ │ ├── test_entity_upsert_issue_187.py
│ │ ├── test_observation_repository.py
│ │ ├── test_postgres_search_repository.py
│ │ ├── test_project_info_repository.py
│ │ ├── test_project_repository.py
│ │ ├── test_relation_repository.py
│ │ ├── test_repository.py
│ │ ├── test_search_repository_edit_bug_fix.py
│ │ └── test_search_repository.py
│ ├── schemas
│ │ ├── test_base_timeframe_minimum.py
│ │ ├── test_memory_serialization.py
│ │ ├── test_memory_url_validation.py
│ │ ├── test_memory_url.py
│ │ ├── test_relation_response_reference_resolution.py
│ │ ├── test_schemas.py
│ │ └── test_search.py
│ ├── Screenshot.png
│ ├── services
│ │ ├── test_context_service.py
│ │ ├── test_directory_service.py
│ │ ├── test_entity_service_disable_permalinks.py
│ │ ├── test_entity_service.py
│ │ ├── test_file_service.py
│ │ ├── test_initialization_cloud_mode_branches.py
│ │ ├── test_initialization.py
│ │ ├── test_link_resolver.py
│ │ ├── test_project_removal_bug.py
│ │ ├── test_project_service_operations.py
│ │ ├── test_project_service.py
│ │ └── test_search_service.py
│ ├── sync
│ │ ├── test_character_conflicts.py
│ │ ├── test_coordinator.py
│ │ ├── test_sync_service_incremental.py
│ │ ├── test_sync_service.py
│ │ ├── test_sync_wikilink_issue.py
│ │ ├── test_tmp_files.py
│ │ ├── test_watch_service_atomic_adds.py
│ │ ├── test_watch_service_edge_cases.py
│ │ ├── test_watch_service_reload.py
│ │ └── test_watch_service.py
│ ├── test_config.py
│ ├── test_deps.py
│ ├── test_production_cascade_delete.py
│ ├── test_project_resolver.py
│ ├── test_rclone_commands.py
│ ├── test_runtime.py
│ ├── test_telemetry.py
│ └── utils
│ ├── test_file_utils.py
│ ├── test_frontmatter_obsidian_compatible.py
│ ├── test_parse_tags.py
│ ├── test_permalink_formatting.py
│ ├── test_timezone_utils.py
│ ├── test_utf8_handling.py
│ └── test_validate_project_path.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/src/basic_memory/api/routers/management_router.py:
--------------------------------------------------------------------------------
```python
"""Management router for basic-memory API."""
import asyncio
from fastapi import APIRouter, Request
from loguru import logger
from pydantic import BaseModel
from basic_memory.config import ConfigManager
from basic_memory.deps import SyncServiceDep, ProjectRepositoryDep
router = APIRouter(prefix="/management", tags=["management"])
class WatchStatusResponse(BaseModel):
"""Response model for watch status."""
running: bool
"""Whether the watch service is currently running."""
@router.get("/watch/status", response_model=WatchStatusResponse)
async def get_watch_status(request: Request) -> WatchStatusResponse:
"""Get the current status of the watch service."""
return WatchStatusResponse(
running=request.app.state.watch_task is not None and not request.app.state.watch_task.done()
)
@router.post("/watch/start", response_model=WatchStatusResponse)
async def start_watch_service(
request: Request, project_repository: ProjectRepositoryDep, sync_service: SyncServiceDep
) -> WatchStatusResponse:
"""Start the watch service if it's not already running."""
# needed because of circular imports from sync -> app
from basic_memory.sync import WatchService
from basic_memory.sync.background_sync import create_background_sync_task
if request.app.state.watch_task is not None and not request.app.state.watch_task.done():
# Watch service is already running
return WatchStatusResponse(running=True)
app_config = ConfigManager().config
# Create and start a new watch service
logger.info("Starting watch service via management API")
# Get services needed for the watch task
watch_service = WatchService(
app_config=app_config,
project_repository=project_repository,
)
# Create and store the task
watch_task = create_background_sync_task(sync_service, watch_service)
request.app.state.watch_task = watch_task
return WatchStatusResponse(running=True)
@router.post("/watch/stop", response_model=WatchStatusResponse)
async def stop_watch_service(request: Request) -> WatchStatusResponse: # pragma: no cover
"""Stop the watch service if it's running."""
if request.app.state.watch_task is None or request.app.state.watch_task.done():
# Watch service is not running
return WatchStatusResponse(running=False)
# Cancel the running task
logger.info("Stopping watch service via management API")
request.app.state.watch_task.cancel()
# Wait for it to be properly cancelled
try:
await request.app.state.watch_task
except asyncio.CancelledError:
pass
request.app.state.watch_task = None
return WatchStatusResponse(running=False)
```
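A minimal client sketch for the three endpoints above, assuming the API is reachable at a local base URL (the host, port, and absence of any project prefix are assumptions for illustration, not taken from the repo):

```python
# Hypothetical usage sketch; the base URL is illustrative.
import asyncio

import httpx


async def toggle_watch(base_url: str = "http://localhost:8000") -> None:
    async with httpx.AsyncClient(base_url=base_url) as client:
        # Check whether the watch task is currently running
        status = await client.get("/management/watch/status")
        print(status.json())  # e.g. {"running": false}

        # Start the watch service, then stop it again
        print((await client.post("/management/watch/start")).json())
        print((await client.post("/management/watch/stop")).json())


asyncio.run(toggle_watch())
```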
--------------------------------------------------------------------------------
/tests/sync/test_sync_wikilink_issue.py:
--------------------------------------------------------------------------------
```python
"""Test for issue #72 - notes with wikilinks staying in modified status."""
from pathlib import Path
import pytest
from basic_memory.sync.sync_service import SyncService
async def create_test_file(path: Path, content: str) -> None:
"""Create a test file with given content."""
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(content)
async def force_full_scan(sync_service: SyncService) -> None:
"""Force next sync to do a full scan by clearing watermark (for testing moves/deletions)."""
if sync_service.entity_repository.project_id is not None:
project = await sync_service.project_repository.find_by_id(
sync_service.entity_repository.project_id
)
if project:
await sync_service.project_repository.update(
project.id,
{
"last_scan_timestamp": None,
"last_file_count": None,
},
)
@pytest.mark.asyncio
async def test_wikilink_modified_status_issue(sync_service: SyncService, project_config):
"""Test that files with wikilinks don't remain in modified status after sync."""
project_dir = project_config.home
# Create a file with a wikilink
content = """---
title: Test Wikilink
type: note
---
# Test File
This file contains a wikilink to [[another-file]].
"""
test_file_path = project_dir / "test_wikilink.md"
await create_test_file(test_file_path, content)
# Initial sync
report1 = await sync_service.sync(project_config.home)
assert "test_wikilink.md" in report1.new
assert "test_wikilink.md" not in report1.modified
# Sync again without changing the file - should not be modified
report2 = await sync_service.sync(project_config.home)
assert "test_wikilink.md" not in report2.new
assert "test_wikilink.md" not in report2.modified
# Create the target file
target_content = """---
title: Another File
type: note
---
# Another File
This is the target file.
"""
target_file_path = project_dir / "another_file.md"
await create_test_file(target_file_path, target_content)
# Force full scan to detect the new file
# (file just created may not be newer than watermark due to timing precision)
await force_full_scan(sync_service)
# Sync again after adding target file
report3 = await sync_service.sync(project_config.home)
assert "another_file.md" in report3.new
assert "test_wikilink.md" not in report3.modified
# Sync one more time - both files should now be stable
report4 = await sync_service.sync(project_config.home)
assert "test_wikilink.md" not in report4.modified
assert "another_file.md" not in report4.modified
```
--------------------------------------------------------------------------------
/tests/mcp/test_async_client_modes.py:
--------------------------------------------------------------------------------
```python
from contextlib import asynccontextmanager
import httpx
import pytest
from basic_memory.cli.auth import CLIAuth
from basic_memory.mcp import async_client as async_client_module
from basic_memory.mcp.async_client import get_client, set_client_factory
@pytest.fixture(autouse=True)
def _reset_async_client_factory():
async_client_module._client_factory = None
yield
async_client_module._client_factory = None
@pytest.mark.asyncio
async def test_get_client_uses_injected_factory(monkeypatch):
seen = {"used": False}
@asynccontextmanager
async def factory():
seen["used"] = True
async with httpx.AsyncClient(base_url="https://example.test") as client:
yield client
# Ensure we don't leak factory to other tests
set_client_factory(factory)
async with get_client() as client:
assert str(client.base_url) == "https://example.test"
assert seen["used"] is True
@pytest.mark.asyncio
async def test_get_client_cloud_mode_injects_auth_header(config_manager, config_home):
cfg = config_manager.load_config()
cfg.cloud_mode = True
cfg.cloud_host = "https://cloud.example.test"
cfg.cloud_client_id = "cid"
cfg.cloud_domain = "https://auth.example.test"
config_manager.save_config(cfg)
# Write token for CLIAuth so get_client() can authenticate without network
auth = CLIAuth(client_id=cfg.cloud_client_id, authkit_domain=cfg.cloud_domain)
auth.token_file.parent.mkdir(parents=True, exist_ok=True)
auth.token_file.write_text(
'{"access_token":"token-123","refresh_token":null,"expires_at":9999999999,"token_type":"Bearer"}',
encoding="utf-8",
)
async with get_client() as client:
assert str(client.base_url).rstrip("/") == "https://cloud.example.test/proxy"
assert client.headers.get("Authorization") == "Bearer token-123"
@pytest.mark.asyncio
async def test_get_client_cloud_mode_raises_when_not_authenticated(config_manager):
cfg = config_manager.load_config()
cfg.cloud_mode = True
cfg.cloud_host = "https://cloud.example.test"
cfg.cloud_client_id = "cid"
cfg.cloud_domain = "https://auth.example.test"
config_manager.save_config(cfg)
# No token file written -> should raise
with pytest.raises(RuntimeError, match="Cloud mode enabled but not authenticated"):
async with get_client():
pass
@pytest.mark.asyncio
async def test_get_client_local_mode_uses_asgi_transport(config_manager):
cfg = config_manager.load_config()
cfg.cloud_mode = False
config_manager.save_config(cfg)
async with get_client() as client:
# httpx stores ASGITransport privately, but we can still sanity-check type
assert isinstance(client._transport, httpx.ASGITransport) # pyright: ignore[reportPrivateUsage]
```
--------------------------------------------------------------------------------
/src/basic_memory/api/routers/directory_router.py:
--------------------------------------------------------------------------------
```python
"""Router for directory tree operations."""
from typing import List, Optional
from fastapi import APIRouter, Query
from basic_memory.deps import DirectoryServiceDep, ProjectIdDep
from basic_memory.schemas.directory import DirectoryNode
router = APIRouter(prefix="/directory", tags=["directory"])
@router.get("/tree", response_model=DirectoryNode, response_model_exclude_none=True)
async def get_directory_tree(
directory_service: DirectoryServiceDep,
project_id: ProjectIdDep,
):
"""Get hierarchical directory structure from the knowledge base.
Args:
directory_service: Service for directory operations
project_id: ID of the current project
Returns:
DirectoryNode representing the root of the hierarchical tree structure
"""
# Get a hierarchical directory tree for the specific project
tree = await directory_service.get_directory_tree()
# Return the hierarchical tree
return tree
@router.get("/structure", response_model=DirectoryNode, response_model_exclude_none=True)
async def get_directory_structure(
directory_service: DirectoryServiceDep,
project_id: ProjectIdDep,
):
"""Get folder structure for navigation (no files).
Optimized endpoint for folder tree navigation. Returns only directory nodes
without file metadata. For full tree with files, use /directory/tree.
Args:
directory_service: Service for directory operations
project_id: ID of the current project
Returns:
DirectoryNode tree containing only folders (type="directory")
"""
structure = await directory_service.get_directory_structure()
return structure
@router.get("/list", response_model=List[DirectoryNode], response_model_exclude_none=True)
async def list_directory(
directory_service: DirectoryServiceDep,
project_id: ProjectIdDep,
dir_name: str = Query("/", description="Directory path to list"),
depth: int = Query(1, ge=1, le=10, description="Recursion depth (1-10)"),
file_name_glob: Optional[str] = Query(
None, description="Glob pattern for filtering file names"
),
):
"""List directory contents with filtering and depth control.
Args:
directory_service: Service for directory operations
project_id: ID of the current project
dir_name: Directory path to list (default: root "/")
depth: Recursion depth (1-10, default: 1 for immediate children only)
file_name_glob: Optional glob pattern for filtering file names (e.g., "*.md", "*meeting*")
Returns:
List of DirectoryNode objects matching the criteria
"""
# Get directory listing with filtering
nodes = await directory_service.list_directory(
dir_name=dir_name,
depth=depth,
file_name_glob=file_name_glob,
)
return nodes
```
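A minimal client sketch for the `/directory/list` endpoint above, assuming a local base URL and no project prefix (both are assumptions for illustration):

```python
# Hypothetical usage sketch; base URL and glob pattern are illustrative.
import asyncio

import httpx


async def list_markdown_files(base_url: str = "http://localhost:8000") -> None:
    async with httpx.AsyncClient(base_url=base_url) as client:
        # Immediate children of the root directory, markdown files only
        response = await client.get(
            "/directory/list",
            params={"dir_name": "/", "depth": 1, "file_name_glob": "*.md"},
        )
        for node in response.json():
            print(node)


asyncio.run(list_markdown_files())
```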
--------------------------------------------------------------------------------
/src/basic_memory/cli/commands/mcp.py:
--------------------------------------------------------------------------------
```python
"""MCP server command with streamable HTTP transport."""
import os
import typer
from typing import Optional
from basic_memory.cli.app import app
from basic_memory.config import ConfigManager, init_mcp_logging
# Import mcp instance (has lifespan that handles initialization and file sync)
from basic_memory.mcp.server import mcp as mcp_server # pragma: no cover
# Import mcp tools to register them
import basic_memory.mcp.tools # noqa: F401 # pragma: no cover
# Import prompts to register them
import basic_memory.mcp.prompts # noqa: F401 # pragma: no cover
from loguru import logger
config = ConfigManager().config
if not config.cloud_mode_enabled:
@app.command()
def mcp(
transport: str = typer.Option(
"stdio", help="Transport type: stdio, streamable-http, or sse"
),
host: str = typer.Option(
"0.0.0.0", help="Host for HTTP transports (use 0.0.0.0 to allow external connections)"
),
port: int = typer.Option(8000, help="Port for HTTP transports"),
path: str = typer.Option("/mcp", help="Path prefix for streamable-http transport"),
project: Optional[str] = typer.Option(None, help="Restrict MCP server to single project"),
): # pragma: no cover
"""Run the MCP server with configurable transport options.
This command starts an MCP server using one of three transport options:
        - stdio: Standard I/O (default; good for local usage)
        - streamable-http: Recommended for web deployments
- sse: Server-Sent Events (for compatibility with existing clients)
Initialization, file sync, and cleanup are handled by the MCP server's lifespan.
"""
# Initialize logging for MCP (file only, stdout breaks protocol)
init_mcp_logging()
# Validate and set project constraint if specified
if project:
config_manager = ConfigManager()
project_name, _ = config_manager.get_project(project)
if not project_name:
typer.echo(f"No project found named: {project}", err=True)
raise typer.Exit(1)
# Set env var with validated project name
os.environ["BASIC_MEMORY_MCP_PROJECT"] = project_name
logger.info(f"MCP server constrained to project: {project_name}")
# Run the MCP server (blocks)
# Lifespan handles: initialization, migrations, file sync, cleanup
logger.info(f"Starting MCP server with {transport.upper()} transport")
if transport == "stdio":
mcp_server.run(
transport=transport,
)
elif transport == "streamable-http" or transport == "sse":
mcp_server.run(
transport=transport,
host=host,
port=port,
path=path,
log_level="INFO",
)
```
--------------------------------------------------------------------------------
/specs/SPEC-2 Slash Commands Reference.md:
--------------------------------------------------------------------------------
```markdown
---
title: 'SPEC-2: Slash Commands Reference'
type: spec
permalink: specs/spec-2-slash-commands-reference
tags:
- commands
- process
- reference
---
# SPEC-2: Slash Commands Reference
This document defines the slash commands used in our specification-driven development process.
## /spec create [name]
**Purpose**: Create a new specification document
**Usage**: `/spec create notes-decomposition`
**Process**:
1. Create new spec document in `/specs` folder
2. Use SPEC-XXX numbering format (auto-increment)
3. Include standard spec template:
- Why (reasoning/problem)
- What (affected areas)
- How (high-level approach)
- How to Evaluate (testing/validation)
4. Tag appropriately for knowledge graph
5. Link to related specs/components
**Template**:
```markdown
# SPEC-XXX: [Title]
## Why
[Problem statement and reasoning]
## What
[What is affected or changed]
## How (High Level)
[Approach to implementation]
## How to Evaluate
[Testing/validation procedure]
## Notes
[Additional context as needed]
```
## /spec status
**Purpose**: Show current status of all specifications
**Usage**: `/spec status`
**Process**:
1. Search all specs in `/specs` folder
2. Display table showing:
- Spec number and title
- Status (draft, approved, implementing, complete)
- Assigned agent (if any)
- Last updated
- Dependencies
## /spec implement [name]
**Purpose**: Hand specification to appropriate agent for implementation
**Usage**: `/spec implement SPEC-002`
**Process**:
1. Read the specified spec
2. Analyze requirements to determine appropriate agent:
- Frontend components → vue-developer
- Architecture/system design → system-architect
- Backend/API → python-developer
3. Launch agent with spec context
4. Agent creates implementation plan
5. Update spec with implementation status
## /spec review [name]
**Purpose**: Review implementation against specification criteria
**Usage**: `/spec review SPEC-002`
**Process**:
1. Read original spec and "How to Evaluate" section
2. Examine current implementation
3. Test against success criteria
4. Document gaps or issues
5. Update spec with review results
6. Recommend next actions (complete, revise, iterate)
## Command Extensions
As the process evolves, we may add:
- `/spec link [spec1] [spec2]` - Create dependency links
- `/spec archive [name]` - Archive completed specs
- `/spec template [type]` - Create spec from template
- `/spec search [query]` - Search spec content
## References
- Claude Slash commands: https://docs.anthropic.com/en/docs/claude-code/slash-commands
## Creating a command
Commands are implemented as Claude slash commands, stored in the repo under `.claude/commands/`.
In the following example, we create the `/optimize` command:
```bash
# Create a project command
mkdir -p .claude/commands
echo "Analyze this code for performance issues and suggest optimizations:" > .claude/commands/optimize.md
```
```
--------------------------------------------------------------------------------
/src/basic_memory/repository/observation_repository.py:
--------------------------------------------------------------------------------
```python
"""Repository for managing Observation objects."""
from typing import Dict, List, Sequence
from sqlalchemy import select
from sqlalchemy.ext.asyncio import async_sessionmaker
from basic_memory.models import Observation
from basic_memory.repository.repository import Repository
class ObservationRepository(Repository[Observation]):
"""Repository for Observation model with memory-specific operations."""
def __init__(self, session_maker: async_sessionmaker, project_id: int):
"""Initialize with session maker and project_id filter.
Args:
session_maker: SQLAlchemy session maker
project_id: Project ID to filter all operations by
"""
super().__init__(session_maker, Observation, project_id=project_id)
async def find_by_entity(self, entity_id: int) -> Sequence[Observation]:
"""Find all observations for a specific entity."""
query = select(Observation).filter(Observation.entity_id == entity_id)
result = await self.execute_query(query)
return result.scalars().all()
async def find_by_context(self, context: str) -> Sequence[Observation]:
"""Find observations with a specific context."""
query = select(Observation).filter(Observation.context == context)
result = await self.execute_query(query)
return result.scalars().all()
async def find_by_category(self, category: str) -> Sequence[Observation]:
"""Find observations with a specific context."""
query = select(Observation).filter(Observation.category == category)
result = await self.execute_query(query)
return result.scalars().all()
async def observation_categories(self) -> Sequence[str]:
"""Return a list of all observation categories."""
query = select(Observation.category).distinct()
result = await self.execute_query(query, use_query_options=False)
return result.scalars().all()
async def find_by_entities(self, entity_ids: List[int]) -> Dict[int, List[Observation]]:
"""Find all observations for multiple entities in a single query.
Args:
entity_ids: List of entity IDs to fetch observations for
Returns:
Dictionary mapping entity_id to list of observations
"""
if not entity_ids: # pragma: no cover
return {}
# Query observations for all entities in the list
query = select(Observation).filter(Observation.entity_id.in_(entity_ids))
result = await self.execute_query(query)
observations = result.scalars().all()
# Group observations by entity_id
observations_by_entity = {}
for obs in observations:
if obs.entity_id not in observations_by_entity:
observations_by_entity[obs.entity_id] = []
observations_by_entity[obs.entity_id].append(obs)
return observations_by_entity
```
--------------------------------------------------------------------------------
/src/basic_memory/cli/app.py:
--------------------------------------------------------------------------------
```python
# Suppress Logfire "not configured" warning - we only use Logfire in cloud/server contexts
import os
os.environ.setdefault("LOGFIRE_IGNORE_NO_CONFIG", "1")
# Remove loguru's default handler IMMEDIATELY, before any other imports.
# This prevents DEBUG logs from appearing on stdout during module-level
# initialization (e.g., template_loader.TemplateLoader() logs at DEBUG level).
from loguru import logger
logger.remove()
from typing import Optional # noqa: E402
import typer # noqa: E402
from basic_memory.cli.container import CliContainer, set_container # noqa: E402
from basic_memory.config import init_cli_logging # noqa: E402
from basic_memory.telemetry import show_notice_if_needed, track_app_started # noqa: E402
def version_callback(value: bool) -> None:
"""Show version and exit."""
if value: # pragma: no cover
import basic_memory
typer.echo(f"Basic Memory version: {basic_memory.__version__}")
raise typer.Exit()
app = typer.Typer(name="basic-memory")
@app.callback()
def app_callback(
ctx: typer.Context,
version: Optional[bool] = typer.Option(
None,
"--version",
"-v",
help="Show version and exit.",
callback=version_callback,
is_eager=True,
),
) -> None:
"""Basic Memory - Local-first personal knowledge management."""
# Initialize logging for CLI (file only, no stdout)
init_cli_logging()
# --- Composition Root ---
# Create container and read config (single point of config access)
container = CliContainer.create()
set_container(container)
# Show telemetry notice and track CLI startup
# Skip for 'mcp' command - it handles its own telemetry in lifespan
# Skip for 'telemetry' command - avoid issues when user is managing telemetry
if ctx.invoked_subcommand not in {"mcp", "telemetry"}:
show_notice_if_needed()
track_app_started("cli")
# Run initialization for commands that don't use the API
# Skip for 'mcp' command - it has its own lifespan that handles initialization
# Skip for API-using commands (status, sync, etc.) - they handle initialization via deps.py
# Skip for 'reset' command - it manages its own database lifecycle
skip_init_commands = {"mcp", "status", "sync", "project", "tool", "reset"}
if (
not version
and ctx.invoked_subcommand is not None
and ctx.invoked_subcommand not in skip_init_commands
):
from basic_memory.services.initialization import ensure_initialization
ensure_initialization(container.config)
## import
# Register sub-command groups
import_app = typer.Typer(help="Import data from various sources")
app.add_typer(import_app, name="import")
claude_app = typer.Typer(help="Import Conversations from Claude JSON export.")
import_app.add_typer(claude_app, name="claude")
## cloud
cloud_app = typer.Typer(help="Access Basic Memory Cloud")
app.add_typer(cloud_app, name="cloud")
```
--------------------------------------------------------------------------------
/src/basic_memory/api/routers/memory_router.py:
--------------------------------------------------------------------------------
```python
"""Routes for memory:// URI operations."""
from typing import Annotated, Optional
from fastapi import APIRouter, Query
from loguru import logger
from basic_memory.deps import ContextServiceDep, EntityRepositoryDep
from basic_memory.schemas.base import TimeFrame, parse_timeframe
from basic_memory.schemas.memory import (
GraphContext,
normalize_memory_url,
)
from basic_memory.schemas.search import SearchItemType
from basic_memory.api.routers.utils import to_graph_context
router = APIRouter(prefix="/memory", tags=["memory"])
@router.get("/recent", response_model=GraphContext)
async def recent(
context_service: ContextServiceDep,
entity_repository: EntityRepositoryDep,
type: Annotated[list[SearchItemType] | None, Query()] = None,
depth: int = 1,
timeframe: TimeFrame = "7d",
page: int = 1,
page_size: int = 10,
max_related: int = 10,
) -> GraphContext:
# return all types by default
types = (
[SearchItemType.ENTITY, SearchItemType.RELATION, SearchItemType.OBSERVATION]
if not type
else type
)
logger.debug(
f"Getting recent context: `{types}` depth: `{depth}` timeframe: `{timeframe}` page: `{page}` page_size: `{page_size}` max_related: `{max_related}`"
)
# Parse timeframe
since = parse_timeframe(timeframe)
limit = page_size
offset = (page - 1) * page_size
# Build context
context = await context_service.build_context(
types=types, depth=depth, since=since, limit=limit, offset=offset, max_related=max_related
)
recent_context = await to_graph_context(
context, entity_repository=entity_repository, page=page, page_size=page_size
)
logger.debug(f"Recent context: {recent_context.model_dump_json()}")
return recent_context
# get_memory_context needs to be declared last so other paths can match
@router.get("/{uri:path}", response_model=GraphContext)
async def get_memory_context(
context_service: ContextServiceDep,
entity_repository: EntityRepositoryDep,
uri: str,
depth: int = 1,
timeframe: Optional[TimeFrame] = None,
page: int = 1,
page_size: int = 10,
max_related: int = 10,
) -> GraphContext:
"""Get rich context from memory:// URI."""
    # add the project name from the config to the url as the "host"
# Parse URI
logger.debug(
f"Getting context for URI: `{uri}` depth: `{depth}` timeframe: `{timeframe}` page: `{page}` page_size: `{page_size}` max_related: `{max_related}`"
)
memory_url = normalize_memory_url(uri)
# Parse timeframe
since = parse_timeframe(timeframe) if timeframe else None
limit = page_size
offset = (page - 1) * page_size
# Build context
context = await context_service.build_context(
memory_url, depth=depth, since=since, limit=limit, offset=offset, max_related=max_related
)
return await to_graph_context(
context, entity_repository=entity_repository, page=page, page_size=page_size
)
```
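A minimal client sketch for the two routes above, assuming a local base URL and an existing permalink (both are illustrative assumptions):

```python
# Hypothetical usage sketch; base URL and the example permalink are illustrative.
import asyncio

import httpx


async def show_context(base_url: str = "http://localhost:8000") -> None:
    async with httpx.AsyncClient(base_url=base_url) as client:
        # Recent activity across all item types for the default 7-day window
        recent = await client.get("/memory/recent", params={"depth": 1, "page_size": 5})
        print(recent.json())

        # Context for a specific memory:// URI; the path after /memory/ is the URI
        context = await client.get("/memory/specs/search", params={"depth": 2})
        print(context.json())


asyncio.run(show_context())
```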
--------------------------------------------------------------------------------
/.github/workflows/claude.yml:
--------------------------------------------------------------------------------
```yaml
name: Claude Code
on:
issue_comment:
types: [created]
pull_request_review_comment:
types: [created]
issues:
types: [opened, assigned]
pull_request_review:
types: [submitted]
pull_request_target:
types: [opened, synchronize]
jobs:
claude:
if: |
(
(github.event_name == 'issue_comment' && contains(github.event.comment.body, '@claude')) ||
(github.event_name == 'pull_request_review_comment' && contains(github.event.comment.body, '@claude')) ||
(github.event_name == 'pull_request_review' && contains(github.event.review.body, '@claude')) ||
(github.event_name == 'issues' && (contains(github.event.issue.body, '@claude') || contains(github.event.issue.title, '@claude'))) ||
(github.event_name == 'pull_request_target' && contains(github.event.pull_request.body, '@claude'))
) && (
github.event.comment.author_association == 'OWNER' ||
github.event.comment.author_association == 'MEMBER' ||
github.event.comment.author_association == 'COLLABORATOR' ||
github.event.sender.author_association == 'OWNER' ||
github.event.sender.author_association == 'MEMBER' ||
github.event.sender.author_association == 'COLLABORATOR' ||
github.event.pull_request.author_association == 'OWNER' ||
github.event.pull_request.author_association == 'MEMBER' ||
github.event.pull_request.author_association == 'COLLABORATOR'
)
runs-on: ubuntu-latest
permissions:
contents: read
pull-requests: read
issues: read
id-token: write
actions: read # Required for Claude to read CI results on PRs
steps:
- name: Checkout repository
uses: actions/checkout@v4
with:
# For pull_request_target, checkout the PR head to review the actual changes
ref: ${{ github.event_name == 'pull_request_target' && github.event.pull_request.head.sha || github.sha }}
fetch-depth: 1
- name: Run Claude Code
id: claude
uses: anthropics/claude-code-action@v1
with:
claude_code_oauth_token: ${{ secrets.CLAUDE_CODE_OAUTH_TOKEN }}
track_progress: true # Enable visual progress tracking
# This is an optional setting that allows Claude to read CI results on PRs
additional_permissions: |
actions: read
# Optional: Give a custom prompt to Claude. If this is not specified, Claude will perform the instructions specified in the comment that tagged it.
# prompt: 'Update the pull request description to include a summary of changes.'
# Optional: Add claude_args to customize behavior and configuration
# See https://github.com/anthropics/claude-code-action/blob/main/docs/usage.md
# or https://docs.claude.com/en/docs/claude-code/sdk#command-line for available options
# claude_args: '--model claude-opus-4-1-20250805 --allowed-tools Bash(gh pr:*)'
```
--------------------------------------------------------------------------------
/specs/SPEC-3 Agent Definitions.md:
--------------------------------------------------------------------------------
```markdown
---
title: 'SPEC-3: Agent Definitions'
type: spec
permalink: specs/spec-3-agent-definitions
tags:
- agents
- roles
- process
---
# SPEC-3: Agent Definitions
This document defines the specialist agents used in our specification-driven development process.
## system-architect
**Role**: High-level system design and architectural decisions
**Responsibilities**:
- Create architectural specifications and ADRs
- Analyze system-wide impacts and trade-offs
- Design component interfaces and data flow
- Evaluate technical approaches and patterns
- Document architectural decisions and rationale
**Expertise Areas**:
- System architecture and design patterns
- Technology evaluation and selection
- Scalability and performance considerations
- Integration patterns and API design
- Technical debt and refactoring strategies
**Typical Specs**:
- System architecture overviews
- Component decomposition strategies
- Data flow and state management
- Integration and deployment patterns
## vue-developer
**Role**: Frontend component development and UI implementation
**Responsibilities**:
- Create Vue.js component specifications
- Implement responsive UI components
- Design component APIs and interfaces
- Optimize for performance and accessibility
- Document component usage and patterns
**Expertise Areas**:
- Vue.js 3 Composition API
- Nuxt 3 framework patterns
- shadcn-vue component library
- Responsive design and CSS
- TypeScript integration
- State management with Pinia
**Typical Specs**:
- Individual component specifications
- UI pattern libraries
- Responsive design approaches
- Component interaction flows
## python-developer
**Role**: Backend development and API implementation
**Responsibilities**:
- Create backend service specifications
- Implement APIs and data processing
- Design database schemas and queries
- Optimize performance and reliability
- Document service interfaces and behavior
**Expertise Areas**:
- FastAPI and Python web frameworks
- Database design and operations
- API design and documentation
- Authentication and security
- Performance optimization
- Testing and validation
**Typical Specs**:
- API endpoint specifications
- Database schema designs
- Service integration patterns
- Performance optimization strategies
## Agent Collaboration Patterns
### Handoff Protocol
1. Agent receives spec through `/spec implement [name]`
2. Agent reviews spec and creates implementation plan
3. Agent documents progress and decisions in spec
4. Agent hands off to another agent if cross-domain work needed
5. Final agent updates spec with completion status
### Communication Standards
- All agents update specs through basic-memory MCP tools
- Document decisions and trade-offs in spec notes
- Link related specs and components
- Preserve context for future reference
### Quality Standards
- Follow existing codebase patterns and conventions
- Write tests that validate spec requirements
- Document implementation choices
- Consider maintainability and extensibility
```
--------------------------------------------------------------------------------
/src/basic_memory/repository/search_index_row.py:
--------------------------------------------------------------------------------
```python
"""Search index data structures."""
import json
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
from pathlib import Path
from basic_memory.schemas.search import SearchItemType
@dataclass
class SearchIndexRow:
"""Search result with score and metadata."""
project_id: int
id: int
type: str
file_path: str
# date values
created_at: datetime
updated_at: datetime
permalink: Optional[str] = None
metadata: Optional[dict] = None
# assigned in result
score: Optional[float] = None
# Type-specific fields
title: Optional[str] = None # entity
content_stems: Optional[str] = None # entity, observation
content_snippet: Optional[str] = None # entity, observation
entity_id: Optional[int] = None # observations
category: Optional[str] = None # observations
from_id: Optional[int] = None # relations
to_id: Optional[int] = None # relations
relation_type: Optional[str] = None # relations
@property
def content(self):
return self.content_snippet
@property
def directory(self) -> str:
"""Extract directory part from file_path.
For a file at "projects/notes/ideas.md", returns "/projects/notes"
For a file at root level "README.md", returns "/"
"""
if not self.type == SearchItemType.ENTITY.value and not self.file_path:
return ""
# Normalize path separators to handle both Windows (\) and Unix (/) paths
normalized_path = Path(self.file_path).as_posix()
# Split the path by slashes
parts = normalized_path.split("/")
# If there's only one part (e.g., "README.md"), it's at the root
if len(parts) <= 1:
return "/"
# Join all parts except the last one (filename)
directory_path = "/".join(parts[:-1])
return f"/{directory_path}"
def to_insert(self, serialize_json: bool = True):
"""Convert to dict for database insertion.
Args:
serialize_json: If True, converts metadata dict to JSON string (for SQLite).
If False, keeps metadata as dict (for Postgres JSONB).
"""
return {
"id": self.id,
"title": self.title,
"content_stems": self.content_stems,
"content_snippet": self.content_snippet,
"permalink": self.permalink,
"file_path": self.file_path,
"type": self.type,
"metadata": json.dumps(self.metadata)
if serialize_json and self.metadata
else self.metadata,
"from_id": self.from_id,
"to_id": self.to_id,
"relation_type": self.relation_type,
"entity_id": self.entity_id,
"category": self.category,
"created_at": self.created_at if self.created_at else None,
"updated_at": self.updated_at if self.updated_at else None,
"project_id": self.project_id,
}
```
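A small usage sketch of the `directory` property documented above, with the example values from its docstring:

```python
from datetime import datetime

from basic_memory.repository.search_index_row import SearchIndexRow

row = SearchIndexRow(
    project_id=1,
    id=1,
    type="entity",
    file_path="projects/notes/ideas.md",
    created_at=datetime.now(),
    updated_at=datetime.now(),
)
assert row.directory == "/projects/notes"

root_row = SearchIndexRow(
    project_id=1,
    id=2,
    type="entity",
    file_path="README.md",
    created_at=datetime.now(),
    updated_at=datetime.now(),
)
assert root_row.directory == "/"
```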
--------------------------------------------------------------------------------
/.claude/commands/release/beta.md:
--------------------------------------------------------------------------------
```markdown
# /beta - Create Beta Release
Create a new beta release using the automated justfile target with quality checks and tagging.
## Usage
```
/beta <version>
```
**Parameters:**
- `version` (required): Beta version like `v0.13.2b1` or `v0.13.2rc1`
## Implementation
You are an expert release manager for the Basic Memory project. When the user runs `/beta`, execute the following steps:
### Step 1: Pre-flight Validation
1. Verify version format matches `v\d+\.\d+\.\d+(b\d+|rc\d+)` pattern
2. Check current git status for uncommitted changes
3. Verify we're on the `main` branch
4. Confirm no existing tag with this version
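A minimal sketch of the version-format check from Step 1 (the anchored regex and helper name are illustrative):

```python
import re

BETA_VERSION = re.compile(r"^v\d+\.\d+\.\d+(b\d+|rc\d+)$")


def is_valid_beta_version(version: str) -> bool:
    """Accept beta/rc tags like v0.13.2b1 or v0.13.2rc1."""
    return BETA_VERSION.match(version) is not None


assert is_valid_beta_version("v0.13.2b1")
assert is_valid_beta_version("v0.13.2rc1")
assert not is_valid_beta_version("v0.13.2")  # final releases use /release instead
```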
### Step 2: Use Justfile Automation
Execute the automated beta release process:
```bash
just beta <version>
```
The justfile target handles:
- ✅ Beta version format validation (supports b1, b2, rc1, etc.)
- ✅ Git status and branch checks
- ✅ Quality checks (`just check` - lint, format, type-check, tests)
- ✅ Version update in `src/basic_memory/__init__.py`
- ✅ Automatic commit with proper message
- ✅ Tag creation and pushing to GitHub
- ✅ Beta release workflow trigger
### Step 3: Monitor Beta Release
1. Check GitHub Actions workflow starts successfully
2. Monitor workflow at: https://github.com/basicmachines-co/basic-memory/actions
3. Verify PyPI pre-release publication
4. Test beta installation: `uv tool install basic-memory --pre`
### Step 4: Beta Testing Instructions
Provide users with beta testing instructions:
```bash
# Install/upgrade to beta
uv tool install basic-memory --pre
# Or upgrade existing installation
uv tool upgrade basic-memory --prerelease=allow
```
## Version Guidelines
- **First beta**: `v0.13.2b1`
- **Subsequent betas**: `v0.13.2b2`, `v0.13.2b3`, etc.
- **Release candidates**: `v0.13.2rc1`, `v0.13.2rc2`, etc.
- **Final release**: `v0.13.2` (use `/release` command)
## Error Handling
- If `just beta` fails, examine the error output for specific issues
- If quality checks fail, fix issues and retry
- If version format is invalid, correct and retry
- If tag already exists, increment version number
## Success Output
```
✅ Beta Release v0.13.2b1 Created Successfully!
🏷️ Tag: v0.13.2b1
🚀 GitHub Actions: Running
📦 PyPI: Will be available in ~5 minutes as pre-release
Install/test with:
uv tool install basic-memory --pre
Monitor release: https://github.com/basicmachines-co/basic-memory/actions
```
## Beta Testing Workflow
1. **Create beta**: Use `/beta v0.13.2b1`
2. **Test features**: Install and validate new functionality
3. **Fix issues**: Address bugs found during testing
4. **Iterate**: Create `v0.13.2b2` if needed
5. **Release candidate**: Create `v0.13.2rc1` when stable
6. **Final release**: Use `/release v0.13.2` when ready
## Context
- Beta releases are pre-releases for testing new features
- Automatically published to PyPI with pre-release flag
- Uses the automated justfile target for consistency
- Version is automatically updated in `__init__.py`
- Ideal for validating changes before stable release
- Supports both beta (b1, b2) and release candidate (rc1, rc2) versions
```
--------------------------------------------------------------------------------
/src/basic_memory/cli/commands/cloud/cloud_utils.py:
--------------------------------------------------------------------------------
```python
"""Shared utilities for cloud operations."""
from basic_memory.cli.commands.cloud.api_client import make_api_request
from basic_memory.config import ConfigManager
from basic_memory.schemas.cloud import (
CloudProjectList,
CloudProjectCreateRequest,
CloudProjectCreateResponse,
)
from basic_memory.utils import generate_permalink
class CloudUtilsError(Exception):
"""Exception raised for cloud utility errors."""
pass
async def fetch_cloud_projects(
*,
api_request=make_api_request,
) -> CloudProjectList:
"""Fetch list of projects from cloud API.
Returns:
CloudProjectList with projects from cloud
"""
try:
config_manager = ConfigManager()
config = config_manager.config
host_url = config.cloud_host.rstrip("/")
response = await api_request(method="GET", url=f"{host_url}/proxy/projects/projects")
return CloudProjectList.model_validate(response.json())
except Exception as e:
raise CloudUtilsError(f"Failed to fetch cloud projects: {e}") from e
async def create_cloud_project(
project_name: str,
*,
api_request=make_api_request,
) -> CloudProjectCreateResponse:
"""Create a new project on cloud.
Args:
project_name: Name of project to create
Returns:
CloudProjectCreateResponse with project details from API
"""
try:
config_manager = ConfigManager()
config = config_manager.config
host_url = config.cloud_host.rstrip("/")
# Use generate_permalink to ensure consistent naming
project_path = generate_permalink(project_name)
project_data = CloudProjectCreateRequest(
name=project_name,
path=project_path,
set_default=False,
)
response = await api_request(
method="POST",
url=f"{host_url}/proxy/projects/projects",
headers={"Content-Type": "application/json"},
json_data=project_data.model_dump(),
)
return CloudProjectCreateResponse.model_validate(response.json())
except Exception as e:
raise CloudUtilsError(f"Failed to create cloud project '{project_name}': {e}") from e
async def sync_project(project_name: str, force_full: bool = False) -> None:
"""Trigger sync for a specific project on cloud.
Args:
project_name: Name of project to sync
force_full: If True, force a full scan bypassing watermark optimization
"""
try:
from basic_memory.cli.commands.command_utils import run_sync
await run_sync(project=project_name, force_full=force_full)
except Exception as e:
raise CloudUtilsError(f"Failed to sync project '{project_name}': {e}") from e
async def project_exists(project_name: str, *, api_request=make_api_request) -> bool:
"""Check if a project exists on cloud.
Args:
project_name: Name of project to check
Returns:
True if project exists, False otherwise
"""
try:
projects = await fetch_cloud_projects(api_request=api_request)
project_names = {p.name for p in projects.projects}
return project_name in project_names
except Exception:
return False
```
--------------------------------------------------------------------------------
/.claude/commands/release/release-check.md:
--------------------------------------------------------------------------------
```markdown
# /release-check - Pre-flight Release Validation
Comprehensive pre-flight check for release readiness without making any changes.
## Usage
```
/release-check [version]
```
**Parameters:**
- `version` (optional): Version to validate like `v0.13.0`. If not provided, determines from context.
## Implementation
You are an expert QA engineer for the Basic Memory project. When the user runs `/release-check`, execute the following validation steps:
### Step 1: Environment Validation
1. **Git Status Check**
- Verify working directory is clean
- Confirm on `main` branch
- Check if ahead/behind origin
2. **Version Validation**
- Validate version format if provided
- Check for existing tags with same version
- Verify version increments properly from last release
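A minimal, read-only sketch of the Step 1 environment checks (helper names are illustrative; assumes `git` is on PATH):

```python
import subprocess


def git(*args: str) -> str:
    """Run a git command and return its trimmed stdout."""
    return subprocess.run(
        ["git", *args], capture_output=True, text=True, check=True
    ).stdout.strip()


def environment_ok(version: str | None = None) -> bool:
    clean = git("status", "--porcelain") == ""                    # working directory clean
    on_main = git("rev-parse", "--abbrev-ref", "HEAD") == "main"  # on the main branch
    tag_is_new = version is None or version not in git("tag", "--list", version).splitlines()
    return clean and on_main and tag_is_new


print(environment_ok("v0.13.0"))
```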
### Step 2: Code Quality Gates
1. **Test Suite Validation**
```bash
just test
```
- All tests must pass
- Check test coverage (target: 95%+)
- Validate no skipped critical tests
2. **Code Quality Checks**
```bash
just lint
just type-check
```
- No linting errors
- No type checking errors
- Code formatting is consistent
### Step 3: Documentation Validation
1. **Changelog Check**
- CHANGELOG.md contains entry for target version
- Entry includes all major features and fixes
- Breaking changes are documented
2. **Documentation Currency**
- README.md reflects current functionality
- CLI reference is up to date
- MCP tools are documented
### Step 4: Dependency Validation
1. **Security Scan**
- No known vulnerabilities in dependencies
- All dependencies are at appropriate versions
- No conflicting dependency versions
2. **Build Validation**
- Package builds successfully
- All required files are included
- No missing dependencies
### Step 5: Issue Tracking Validation
1. **GitHub Issues Check**
- No critical open issues blocking release
- All milestone issues are resolved
- High-priority bugs are fixed
2. **Testing Coverage**
- Integration tests pass
- MCP tool tests pass
- Cross-platform compatibility verified
## Report Format
Generate a comprehensive report:
```
🔍 Release Readiness Check for v0.13.0
✅ PASSED CHECKS:
├── Git status clean
├── On main branch
├── All tests passing (744/744)
├── Test coverage: 98.2%
├── Type checking passed
├── Linting passed
├── CHANGELOG.md updated
└── No critical issues open
⚠️ WARNINGS:
├── 2 medium-priority issues still open
└── Documentation could be updated
❌ BLOCKING ISSUES:
└── None found
🎯 RELEASE READINESS: ✅ READY
Recommended next steps:
1. Address warnings if desired
2. Run `/release v0.13.0` when ready
```
## Validation Criteria
### Must Pass (Blocking)
- [ ] All tests pass
- [ ] No type errors
- [ ] No linting errors
- [ ] Working directory clean
- [ ] On main branch
- [ ] CHANGELOG.md has version entry
- [ ] No critical open issues
### Should Pass (Warnings)
- [ ] Test coverage >95%
- [ ] No medium-priority open issues
- [ ] Documentation up to date
- [ ] No dependency vulnerabilities
## Context
- This is a read-only validation; it makes no changes
- Provides confidence before running actual release
- Helps identify issues early in release process
- Can be run multiple times safely
```
--------------------------------------------------------------------------------
/tests/schemas/test_search.py:
--------------------------------------------------------------------------------
```python
"""Tests for search schemas."""
from datetime import datetime
from basic_memory.schemas.search import (
SearchItemType,
SearchQuery,
SearchResult,
SearchResponse,
)
def test_search_modes():
"""Test different search modes."""
# Exact permalink
query = SearchQuery(permalink="specs/search")
assert query.permalink == "specs/search"
assert query.text is None
# Pattern match
query = SearchQuery(permalink="specs/*")
assert query.permalink == "specs/*"
assert query.text is None
# Text search
query = SearchQuery(text="search implementation")
assert query.text == "search implementation"
assert query.permalink is None
def test_search_filters():
"""Test search result filtering."""
query = SearchQuery(
text="search",
entity_types=[SearchItemType.ENTITY],
types=["component"],
after_date=datetime(2024, 1, 1),
)
assert query.entity_types == [SearchItemType.ENTITY]
assert query.types == ["component"]
assert query.after_date == "2024-01-01T00:00:00"
def test_search_result():
"""Test search result structure."""
result = SearchResult(
title="test",
type=SearchItemType.ENTITY,
entity="some_entity",
score=0.8,
metadata={"entity_type": "component"},
permalink="specs/search",
file_path="specs/search.md",
)
assert result.type == SearchItemType.ENTITY
assert result.score == 0.8
assert result.metadata == {"entity_type": "component"}
def test_observation_result():
"""Test observation result fields."""
result = SearchResult(
title="test",
permalink="specs/search",
file_path="specs/search.md",
type=SearchItemType.OBSERVATION,
score=0.5,
metadata={},
entity="some_entity",
category="tech",
)
assert result.entity == "some_entity"
assert result.category == "tech"
def test_relation_result():
"""Test relation result fields."""
result = SearchResult(
title="test",
permalink="specs/search",
file_path="specs/search.md",
type=SearchItemType.RELATION,
entity="some_entity",
score=0.5,
metadata={},
from_entity="123",
to_entity="456",
relation_type="depends_on",
)
assert result.from_entity == "123"
assert result.to_entity == "456"
assert result.relation_type == "depends_on"
def test_search_response():
"""Test search response wrapper."""
results = [
SearchResult(
title="test",
permalink="specs/search",
file_path="specs/search.md",
type=SearchItemType.ENTITY,
entity="some_entity",
score=0.8,
metadata={},
),
SearchResult(
title="test",
permalink="specs/search",
file_path="specs/search.md",
type=SearchItemType.ENTITY,
entity="some_entity",
score=0.6,
metadata={},
),
]
response = SearchResponse(results=results, current_page=1, page_size=1)
assert len(response.results) == 2
assert response.results[0].score > response.results[1].score
```
--------------------------------------------------------------------------------
/src/basic_memory/api/v2/routers/directory_router.py:
--------------------------------------------------------------------------------
```python
"""V2 Directory Router - ID-based directory tree operations.
This router provides directory structure browsing for projects using
external_id UUIDs instead of name-based identifiers.
Key improvements:
- Direct project lookup via external_id UUIDs
- Consistent with other v2 endpoints
- Better performance through indexed queries
"""
from typing import List, Optional
from fastapi import APIRouter, Query, Path
from basic_memory.deps import DirectoryServiceV2ExternalDep
from basic_memory.schemas.directory import DirectoryNode
router = APIRouter(prefix="/directory", tags=["directory-v2"])
@router.get("/tree", response_model=DirectoryNode, response_model_exclude_none=True)
async def get_directory_tree(
directory_service: DirectoryServiceV2ExternalDep,
project_id: str = Path(..., description="Project external UUID"),
):
"""Get hierarchical directory structure from the knowledge base.
Args:
directory_service: Service for directory operations
project_id: Project external UUID
Returns:
DirectoryNode representing the root of the hierarchical tree structure
"""
# Get a hierarchical directory tree for the specific project
tree = await directory_service.get_directory_tree()
# Return the hierarchical tree
return tree
@router.get("/structure", response_model=DirectoryNode, response_model_exclude_none=True)
async def get_directory_structure(
directory_service: DirectoryServiceV2ExternalDep,
project_id: str = Path(..., description="Project external UUID"),
):
"""Get folder structure for navigation (no files).
Optimized endpoint for folder tree navigation. Returns only directory nodes
without file metadata. For full tree with files, use /directory/tree.
Args:
directory_service: Service for directory operations
project_id: Project external UUID
Returns:
DirectoryNode tree containing only folders (type="directory")
"""
structure = await directory_service.get_directory_structure()
return structure
@router.get("/list", response_model=List[DirectoryNode], response_model_exclude_none=True)
async def list_directory(
directory_service: DirectoryServiceV2ExternalDep,
project_id: str = Path(..., description="Project external UUID"),
dir_name: str = Query("/", description="Directory path to list"),
depth: int = Query(1, ge=1, le=10, description="Recursion depth (1-10)"),
file_name_glob: Optional[str] = Query(
None, description="Glob pattern for filtering file names"
),
):
"""List directory contents with filtering and depth control.
Args:
directory_service: Service for directory operations
project_id: Project external UUID
dir_name: Directory path to list (default: root "/")
depth: Recursion depth (1-10, default: 1 for immediate children only)
file_name_glob: Optional glob pattern for filtering file names (e.g., "*.md", "*meeting*")
Returns:
List of DirectoryNode objects matching the criteria
"""
# Get directory listing with filtering
nodes = await directory_service.list_directory(
dir_name=dir_name,
depth=depth,
file_name_glob=file_name_glob,
)
return nodes
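# Illustrative request (a sketch only: the actual mount prefix comes from the v2 app wiring,
# assumed here to be /v2/projects/{project_id}):
#
#   GET /v2/projects/3f2a.../directory/list?dir_name=/specs&depth=2&file_name_glob=*.md
#
# which would list DirectoryNode entries for markdown files up to two levels below /specs.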
```
--------------------------------------------------------------------------------
/src/basic_memory/cli/commands/cloud/rclone_config.py:
--------------------------------------------------------------------------------
```python
"""rclone configuration management for Basic Memory Cloud.
This module provides simplified rclone configuration for SPEC-20.
Uses a single "basic-memory-cloud" remote for all operations.
"""
import configparser
import os
import shutil
from pathlib import Path
from typing import Optional
from rich.console import Console
console = Console()
class RcloneConfigError(Exception):
"""Exception raised for rclone configuration errors."""
pass
def get_rclone_config_path() -> Path:
"""Get the path to rclone configuration file."""
config_dir = Path.home() / ".config" / "rclone"
config_dir.mkdir(parents=True, exist_ok=True)
return config_dir / "rclone.conf"
def backup_rclone_config() -> Optional[Path]:
"""Create a backup of existing rclone config."""
config_path = get_rclone_config_path()
if not config_path.exists():
return None
backup_path = config_path.with_suffix(f".conf.backup-{os.getpid()}")
shutil.copy2(config_path, backup_path)
console.print(f"[dim]Created backup: {backup_path}[/dim]")
return backup_path
def load_rclone_config() -> configparser.ConfigParser:
"""Load existing rclone configuration."""
config = configparser.ConfigParser()
config_path = get_rclone_config_path()
if config_path.exists():
config.read(config_path)
return config
def save_rclone_config(config: configparser.ConfigParser) -> None:
"""Save rclone configuration to file."""
config_path = get_rclone_config_path()
with open(config_path, "w") as f:
config.write(f)
console.print(f"[dim]Updated rclone config: {config_path}[/dim]")
def configure_rclone_remote(
access_key: str,
secret_key: str,
endpoint: str = "https://fly.storage.tigris.dev",
region: str = "auto",
) -> str:
"""Configure single rclone remote named 'basic-memory-cloud'.
This is the simplified approach from SPEC-20 that uses one remote
for all Basic Memory cloud operations (not tenant-specific).
Args:
access_key: S3 access key ID
secret_key: S3 secret access key
endpoint: S3-compatible endpoint URL
region: S3 region (default: auto)
Returns:
The remote name: "basic-memory-cloud"
"""
# Backup existing config
backup_rclone_config()
# Load existing config
config = load_rclone_config()
# Single remote name (not tenant-specific)
REMOTE_NAME = "basic-memory-cloud"
# Add/update the remote section
if not config.has_section(REMOTE_NAME):
config.add_section(REMOTE_NAME)
config.set(REMOTE_NAME, "type", "s3")
config.set(REMOTE_NAME, "provider", "Other")
config.set(REMOTE_NAME, "access_key_id", access_key)
config.set(REMOTE_NAME, "secret_access_key", secret_key)
config.set(REMOTE_NAME, "endpoint", endpoint)
config.set(REMOTE_NAME, "region", region)
# Prevent unnecessary encoding of filenames (only encode slashes and invalid UTF-8)
# This prevents files with spaces like "Hello World.md" from being quoted
config.set(REMOTE_NAME, "encoding", "Slash,InvalidUtf8")
# Save updated config
save_rclone_config(config)
console.print(f"[green]Configured rclone remote: {REMOTE_NAME}[/green]")
return REMOTE_NAME
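# For reference, the resulting section in ~/.config/rclone/rclone.conf looks roughly like this
# (values illustrative, not real credentials):
#
#   [basic-memory-cloud]
#   type = s3
#   provider = Other
#   access_key_id = <access key>
#   secret_access_key = <secret key>
#   endpoint = https://fly.storage.tigris.dev
#   region = auto
#   encoding = Slash,InvalidUtf8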
```
--------------------------------------------------------------------------------
/src/basic_memory/cli/commands/import_chatgpt.py:
--------------------------------------------------------------------------------
```python
"""Import command for ChatGPT conversations."""
import json
from pathlib import Path
from typing import Annotated, Tuple
import typer
from basic_memory.cli.app import import_app
from basic_memory.cli.commands.command_utils import run_with_cleanup
from basic_memory.config import ConfigManager, get_project_config
from basic_memory.importers import ChatGPTImporter
from basic_memory.markdown import EntityParser, MarkdownProcessor
from basic_memory.services.file_service import FileService
from loguru import logger
from rich.console import Console
from rich.panel import Panel
console = Console()
async def get_importer_dependencies() -> Tuple[MarkdownProcessor, FileService]:
"""Get MarkdownProcessor and FileService instances for importers."""
config = get_project_config()
app_config = ConfigManager().config
entity_parser = EntityParser(config.home)
markdown_processor = MarkdownProcessor(entity_parser, app_config=app_config)
file_service = FileService(config.home, markdown_processor, app_config=app_config)
return markdown_processor, file_service
@import_app.command(name="chatgpt", help="Import conversations from ChatGPT JSON export.")
def import_chatgpt(
conversations_json: Annotated[
Path, typer.Argument(help="Path to ChatGPT conversations.json file")
] = Path("conversations.json"),
folder: Annotated[
str, typer.Option(help="The folder to place the files in.")
] = "conversations",
):
"""Import chat conversations from ChatGPT JSON format.
This command will:
1. Read the complex tree structure of messages
2. Convert them to linear markdown conversations
3. Save as clean, readable markdown files
After importing, run 'basic-memory sync' to index the new files.
"""
try:
if not conversations_json.exists(): # pragma: no cover
typer.echo(f"Error: File not found: {conversations_json}", err=True)
raise typer.Exit(1)
# Get importer dependencies
markdown_processor, file_service = run_with_cleanup(get_importer_dependencies())
config = get_project_config()
# Process the file
base_path = config.home / folder
console.print(f"\nImporting chats from {conversations_json}...writing to {base_path}")
# Create importer and run import
importer = ChatGPTImporter(config.home, markdown_processor, file_service)
with conversations_json.open("r", encoding="utf-8") as file:
json_data = json.load(file)
result = run_with_cleanup(importer.import_data(json_data, folder))
if not result.success: # pragma: no cover
typer.echo(f"Error during import: {result.error_message}", err=True)
raise typer.Exit(1)
# Show results
console.print(
Panel(
f"[green]Import complete![/green]\n\n"
f"Imported {result.conversations} conversations\n"
f"Containing {result.messages} messages",
expand=False,
)
)
console.print("\nRun 'basic-memory sync' to index the new files.")
except Exception as e:
logger.error("Import failed")
typer.echo(f"Error during import: {e}", err=True)
raise typer.Exit(1)
```
--------------------------------------------------------------------------------
/src/basic_memory/cli/commands/import_memory_json.py:
--------------------------------------------------------------------------------
```python
"""Import command for basic-memory CLI to import from JSON memory format."""
import json
from pathlib import Path
from typing import Annotated, Tuple
import typer
from basic_memory.cli.app import import_app
from basic_memory.cli.commands.command_utils import run_with_cleanup
from basic_memory.config import ConfigManager, get_project_config
from basic_memory.importers.memory_json_importer import MemoryJsonImporter
from basic_memory.markdown import EntityParser, MarkdownProcessor
from basic_memory.services.file_service import FileService
from loguru import logger
from rich.console import Console
from rich.panel import Panel
console = Console()
async def get_importer_dependencies() -> Tuple[MarkdownProcessor, FileService]:
"""Get MarkdownProcessor and FileService instances for importers."""
config = get_project_config()
app_config = ConfigManager().config
entity_parser = EntityParser(config.home)
markdown_processor = MarkdownProcessor(entity_parser, app_config=app_config)
file_service = FileService(config.home, markdown_processor, app_config=app_config)
return markdown_processor, file_service
@import_app.command()
def memory_json(
json_path: Annotated[Path, typer.Argument(..., help="Path to memory.json file")] = Path(
"memory.json"
),
destination_folder: Annotated[
str, typer.Option(help="Optional destination folder within the project")
] = "",
):
"""Import entities and relations from a memory.json file.
This command will:
1. Read entities and relations from the JSON file
2. Create markdown files for each entity
3. Include outgoing relations in each entity's markdown
"""
if not json_path.exists():
typer.echo(f"Error: File not found: {json_path}", err=True)
raise typer.Exit(1)
config = get_project_config()
try:
# Get importer dependencies
markdown_processor, file_service = run_with_cleanup(get_importer_dependencies())
# Create the importer
importer = MemoryJsonImporter(config.home, markdown_processor, file_service)
# Process the file
base_path = config.home if not destination_folder else config.home / destination_folder
console.print(f"\nImporting from {json_path}...writing to {base_path}")
# Run the import for json log format
file_data = []
with json_path.open("r", encoding="utf-8") as file:
for line in file:
json_data = json.loads(line)
file_data.append(json_data)
result = run_with_cleanup(importer.import_data(file_data, destination_folder))
if not result.success: # pragma: no cover
typer.echo(f"Error during import: {result.error_message}", err=True)
raise typer.Exit(1)
# Show results
console.print(
Panel(
f"[green]Import complete![/green]\n\n"
f"Created {result.entities} entities\n"
f"Added {result.relations} relations\n"
f"Skipped {result.skipped_entities} entities\n",
expand=False,
)
)
except Exception as e:
logger.error("Import failed")
typer.echo(f"Error during import: {e}", err=True)
raise typer.Exit(1)
```
--------------------------------------------------------------------------------
/src/basic_memory/mcp/container.py:
--------------------------------------------------------------------------------
```python
"""MCP composition root for Basic Memory.
This container owns reading ConfigManager and environment variables for the
MCP server entrypoint. Downstream modules receive config/dependencies explicitly
rather than reading globals.
Design principles:
- Only this module reads ConfigManager directly
- Runtime mode (cloud/local/test) is resolved here
- File sync decisions are centralized here
"""
from dataclasses import dataclass
from typing import TYPE_CHECKING
from basic_memory.config import BasicMemoryConfig, ConfigManager
from basic_memory.runtime import RuntimeMode, resolve_runtime_mode
if TYPE_CHECKING: # pragma: no cover
from basic_memory.sync import SyncCoordinator
@dataclass
class McpContainer:
"""Composition root for the MCP server entrypoint.
Holds resolved configuration and runtime context.
Created once at server startup, then used to wire dependencies.
"""
config: BasicMemoryConfig
mode: RuntimeMode
@classmethod
def create(cls) -> "McpContainer":
"""Create container by reading ConfigManager.
This is the single point where MCP reads global config.
"""
config = ConfigManager().config
mode = resolve_runtime_mode(
cloud_mode_enabled=config.cloud_mode_enabled,
is_test_env=config.is_test_env,
)
return cls(config=config, mode=mode)
# --- Runtime Mode Properties ---
@property
def should_sync_files(self) -> bool:
"""Whether local file sync should be started.
Sync is enabled when:
- sync_changes is True in config
- Not in test mode (tests manage their own sync)
- Not in cloud mode (cloud handles sync differently)
"""
return self.config.sync_changes and not self.mode.is_test and not self.mode.is_cloud
@property
def sync_skip_reason(self) -> str | None:
"""Reason why sync is skipped, or None if sync should run.
Useful for logging why sync was disabled.
"""
if self.mode.is_test:
return "Test environment detected"
if self.mode.is_cloud:
return "Cloud mode enabled"
if not self.config.sync_changes:
return "Sync changes disabled"
return None
def create_sync_coordinator(self) -> "SyncCoordinator":
"""Create a SyncCoordinator with this container's settings.
Returns:
SyncCoordinator configured for this runtime environment
"""
# Deferred import to avoid circular dependency
from basic_memory.sync import SyncCoordinator
return SyncCoordinator(
config=self.config,
should_sync=self.should_sync_files,
skip_reason=self.sync_skip_reason,
)
# Module-level container instance (set by lifespan)
_container: McpContainer | None = None
def get_container() -> McpContainer:
"""Get the current MCP container.
Raises:
RuntimeError: If container hasn't been initialized
"""
if _container is None:
raise RuntimeError("MCP container not initialized. Call set_container() first.")
return _container
def set_container(container: McpContainer) -> None:
"""Set the MCP container (called by lifespan)."""
global _container
_container = container
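# Typical wiring at server startup (a sketch; the real lifespan hook lives elsewhere, and
# `logger` here is only illustrative):
#
#   container = McpContainer.create()
#   set_container(container)
#   if container.should_sync_files:
#       sync_coordinator = container.create_sync_coordinator()
#   else:
#       logger.info(f"File sync skipped: {container.sync_skip_reason}")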
```
--------------------------------------------------------------------------------
/src/basic_memory/cli/commands/import_claude_projects.py:
--------------------------------------------------------------------------------
```python
"""Import command for basic-memory CLI to import project data from Claude.ai."""
import json
from pathlib import Path
from typing import Annotated, Tuple
import typer
from basic_memory.cli.app import claude_app
from basic_memory.cli.commands.command_utils import run_with_cleanup
from basic_memory.config import ConfigManager, get_project_config
from basic_memory.importers.claude_projects_importer import ClaudeProjectsImporter
from basic_memory.markdown import EntityParser, MarkdownProcessor
from basic_memory.services.file_service import FileService
from loguru import logger
from rich.console import Console
from rich.panel import Panel
console = Console()
async def get_importer_dependencies() -> Tuple[MarkdownProcessor, FileService]:
"""Get MarkdownProcessor and FileService instances for importers."""
config = get_project_config()
app_config = ConfigManager().config
entity_parser = EntityParser(config.home)
markdown_processor = MarkdownProcessor(entity_parser, app_config=app_config)
file_service = FileService(config.home, markdown_processor, app_config=app_config)
return markdown_processor, file_service
@claude_app.command(name="projects", help="Import projects from Claude.ai.")
def import_projects(
projects_json: Annotated[Path, typer.Argument(..., help="Path to projects.json file")] = Path(
"projects.json"
),
base_folder: Annotated[
str, typer.Option(help="The base folder to place project files in.")
] = "projects",
):
"""Import project data from Claude.ai.
This command will:
1. Create a directory for each project
2. Store docs in a docs/ subdirectory
3. Place prompt template in project root
After importing, run 'basic-memory sync' to index the new files.
"""
config = get_project_config()
try:
if not projects_json.exists():
typer.echo(f"Error: File not found: {projects_json}", err=True)
raise typer.Exit(1)
# Get importer dependencies
markdown_processor, file_service = run_with_cleanup(get_importer_dependencies())
# Create the importer
importer = ClaudeProjectsImporter(config.home, markdown_processor, file_service)
# Process the file
base_path = config.home / base_folder if base_folder else config.home
console.print(f"\nImporting projects from {projects_json}...writing to {base_path}")
# Run the import
with projects_json.open("r", encoding="utf-8") as file:
json_data = json.load(file)
result = run_with_cleanup(importer.import_data(json_data, base_folder))
if not result.success: # pragma: no cover
typer.echo(f"Error during import: {result.error_message}", err=True)
raise typer.Exit(1)
# Show results
console.print(
Panel(
f"[green]Import complete![/green]\n\n"
f"Imported {result.documents} project documents\n"
f"Imported {result.prompts} prompt templates",
expand=False,
)
)
console.print("\nRun 'basic-memory sync' to index the new files.")
except Exception as e:
logger.error("Import failed")
typer.echo(f"Error during import: {e}", err=True)
raise typer.Exit(1)
```
--------------------------------------------------------------------------------
/src/basic_memory/cli/commands/import_claude_conversations.py:
--------------------------------------------------------------------------------
```python
"""Import command for basic-memory CLI to import chat data from conversations2.json format."""
import json
from pathlib import Path
from typing import Annotated, Tuple
import typer
from basic_memory.cli.app import claude_app
from basic_memory.cli.commands.command_utils import run_with_cleanup
from basic_memory.config import ConfigManager, get_project_config
from basic_memory.importers.claude_conversations_importer import ClaudeConversationsImporter
from basic_memory.markdown import EntityParser, MarkdownProcessor
from basic_memory.services.file_service import FileService
from loguru import logger
from rich.console import Console
from rich.panel import Panel
console = Console()
async def get_importer_dependencies() -> Tuple[MarkdownProcessor, FileService]:
"""Get MarkdownProcessor and FileService instances for importers."""
config = get_project_config()
app_config = ConfigManager().config
entity_parser = EntityParser(config.home)
markdown_processor = MarkdownProcessor(entity_parser, app_config=app_config)
file_service = FileService(config.home, markdown_processor, app_config=app_config)
return markdown_processor, file_service
@claude_app.command(name="conversations", help="Import chat conversations from Claude.ai.")
def import_claude(
conversations_json: Annotated[
Path, typer.Argument(..., help="Path to conversations.json file")
] = Path("conversations.json"),
folder: Annotated[
str, typer.Option(help="The folder to place the files in.")
] = "conversations",
):
"""Import chat conversations from conversations2.json format.
This command will:
1. Read chat data and nested messages
2. Create markdown files for each conversation
3. Format content in clean, readable markdown
After importing, run 'basic-memory sync' to index the new files.
"""
config = get_project_config()
try:
if not conversations_json.exists():
typer.echo(f"Error: File not found: {conversations_json}", err=True)
raise typer.Exit(1)
# Get importer dependencies
markdown_processor, file_service = run_with_cleanup(get_importer_dependencies())
# Create the importer
importer = ClaudeConversationsImporter(config.home, markdown_processor, file_service)
# Process the file
base_path = config.home / folder
console.print(f"\nImporting chats from {conversations_json}...writing to {base_path}")
# Run the import
with conversations_json.open("r", encoding="utf-8") as file:
json_data = json.load(file)
result = run_with_cleanup(importer.import_data(json_data, folder))
if not result.success: # pragma: no cover
typer.echo(f"Error during import: {result.error_message}", err=True)
raise typer.Exit(1)
# Show results
console.print(
Panel(
f"[green]Import complete![/green]\n\n"
f"Imported {result.conversations} conversations\n"
f"Containing {result.messages} messages",
expand=False,
)
)
console.print("\nRun 'basic-memory sync' to index the new files.")
except Exception as e:
logger.error("Import failed")
typer.echo(f"Error during import: {e}", err=True)
raise typer.Exit(1)
```
--------------------------------------------------------------------------------
/src/basic_memory/models/project.py:
--------------------------------------------------------------------------------
```python
"""Project model for Basic Memory."""
import uuid
from datetime import datetime, UTC
from typing import Optional
from sqlalchemy import (
Integer,
String,
Text,
Boolean,
DateTime,
Float,
Index,
event,
)
from sqlalchemy.orm import Mapped, mapped_column, relationship
from basic_memory.models.base import Base
from basic_memory.utils import generate_permalink
class Project(Base):
"""Project model for Basic Memory.
A project represents a collection of knowledge entities that are grouped together.
Projects are stored in the app-level database and provide context for all knowledge
operations.
"""
__tablename__ = "project"
__table_args__ = (
# Regular indexes
Index("ix_project_name", "name", unique=True),
Index("ix_project_permalink", "permalink", unique=True),
Index("ix_project_external_id", "external_id", unique=True),
Index("ix_project_path", "path"),
Index("ix_project_created_at", "created_at"),
Index("ix_project_updated_at", "updated_at"),
)
# Core identity
id: Mapped[int] = mapped_column(Integer, primary_key=True)
# External UUID for API references - stable identifier that won't change
external_id: Mapped[str] = mapped_column(String, unique=True, default=lambda: str(uuid.uuid4()))
name: Mapped[str] = mapped_column(String, unique=True)
description: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
# URL-friendly identifier generated from name
permalink: Mapped[str] = mapped_column(String, unique=True)
# Filesystem path to project directory
path: Mapped[str] = mapped_column(String)
# Status flags
is_active: Mapped[bool] = mapped_column(Boolean, default=True)
is_default: Mapped[Optional[bool]] = mapped_column(Boolean, default=None, nullable=True)
# Timestamps
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=lambda: datetime.now(UTC)
)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
default=lambda: datetime.now(UTC),
onupdate=lambda: datetime.now(UTC),
)
# Sync optimization - scan watermark tracking
last_scan_timestamp: Mapped[Optional[float]] = mapped_column(Float, nullable=True)
last_file_count: Mapped[Optional[int]] = mapped_column(Integer, nullable=True)
# Define relationships to entities, observations, and relations
# These relationships will be established once we add project_id to those models
entities = relationship("Entity", back_populates="project", cascade="all, delete-orphan")
def __repr__(self) -> str: # pragma: no cover
return f"Project(id={self.id}, external_id='{self.external_id}', name='{self.name}', permalink='{self.permalink}', path='{self.path}')"
@event.listens_for(Project, "before_insert")
@event.listens_for(Project, "before_update")
def set_project_permalink(mapper, connection, project):
"""Generate URL-friendly permalink for the project if needed.
This event listener ensures the permalink is always derived from the name,
even if the name changes.
"""
# If the name changed or permalink is empty, regenerate permalink
if not project.permalink or project.permalink != generate_permalink(project.name):
project.permalink = generate_permalink(project.name)
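# Example: renaming a project from "My Research" to "My Research v2" regenerates the permalink
# on the next insert/update (generate_permalink is assumed to slugify the name, e.g. "my-research-v2").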
```
--------------------------------------------------------------------------------
/.github/workflows/claude-code-review.yml:
--------------------------------------------------------------------------------
```yaml
name: Claude Code Review
on:
pull_request:
types: [opened, synchronize]
# Optional: Only run on specific file changes
# paths:
# - "src/**/*.ts"
# - "src/**/*.tsx"
# - "src/**/*.js"
# - "src/**/*.jsx"
jobs:
claude-review:
# Only run for organization members and collaborators
if: |
github.event.pull_request.author_association == 'OWNER' ||
github.event.pull_request.author_association == 'MEMBER' ||
github.event.pull_request.author_association == 'COLLABORATOR'
runs-on: ubuntu-latest
permissions:
contents: read
pull-requests: write
issues: read
id-token: write
steps:
- name: Checkout repository
uses: actions/checkout@v4
with:
fetch-depth: 1
- name: Run Claude Code Review
id: claude-review
uses: anthropics/claude-code-action@v1
with:
claude_code_oauth_token: ${{ secrets.CLAUDE_CODE_OAUTH_TOKEN }}
github_token: ${{ secrets.GITHUB_TOKEN }}
track_progress: true # Enable visual progress tracking
allowed_bots: '*'
prompt: |
Review this Basic Memory PR against our team checklist:
## Code Quality & Standards
- [ ] Follows Basic Memory's coding conventions in CLAUDE.md
- [ ] Python 3.12+ type annotations and async patterns
- [ ] SQLAlchemy 2.0 best practices
- [ ] FastAPI and Typer conventions followed
- [ ] 100-character line length limit maintained
- [ ] No commented-out code blocks
## Testing & Documentation
- [ ] Unit tests for new functions/methods
- [ ] Integration tests for new MCP tools
- [ ] Test coverage for edge cases
- [ ] **100% test coverage maintained** (use `# pragma: no cover` only for truly hard-to-test code)
- [ ] Documentation updated (README, docstrings)
- [ ] CLAUDE.md updated if conventions change
## Basic Memory Architecture
- [ ] MCP tools follow atomic, composable design
- [ ] Database changes include Alembic migrations
- [ ] Preserves local-first architecture principles
- [ ] Knowledge graph operations maintain consistency
- [ ] Markdown file handling preserves integrity
- [ ] AI-human collaboration patterns followed
## Security & Performance
- [ ] No hardcoded secrets or credentials
- [ ] Input validation for MCP tools
- [ ] Proper error handling and logging
- [ ] Performance considerations addressed
- [ ] No sensitive data in logs or commits
## Compatibility
- [ ] File path comparisons must be Windows compatible
- [ ] Avoid using emojis and unicode characters in console and log output
Read the CLAUDE.md file for detailed project context. For each checklist item, verify if it's satisfied and comment on any that need attention. Use inline comments for specific code issues and post a summary with checklist results.
# Allow broader tool access for thorough code review
claude_args: '--allowed-tools "Bash(gh pr:*),Bash(gh issue:*),Bash(gh api:*),Bash(git log:*),Bash(git show:*),Read,Grep,Glob"'
```
--------------------------------------------------------------------------------
/src/basic_memory/repository/search_repository.py:
--------------------------------------------------------------------------------
```python
"""Repository for search operations.
This module provides the search repository interface.
The actual repository implementations are backend-specific:
- SQLiteSearchRepository: Uses FTS5 virtual tables
- PostgresSearchRepository: Uses tsvector/tsquery with GIN indexes
"""
from datetime import datetime
from typing import List, Optional, Protocol
from sqlalchemy import Result
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from basic_memory.config import ConfigManager, DatabaseBackend
from basic_memory.repository.postgres_search_repository import PostgresSearchRepository
from basic_memory.repository.search_index_row import SearchIndexRow
from basic_memory.repository.sqlite_search_repository import SQLiteSearchRepository
from basic_memory.schemas.search import SearchItemType
class SearchRepository(Protocol):
"""Protocol defining the search repository interface.
Both SQLite and Postgres implementations must satisfy this protocol.
"""
project_id: int
async def init_search_index(self) -> None:
"""Initialize the search index schema."""
...
async def search(
self,
search_text: Optional[str] = None,
permalink: Optional[str] = None,
permalink_match: Optional[str] = None,
title: Optional[str] = None,
types: Optional[List[str]] = None,
after_date: Optional[datetime] = None,
search_item_types: Optional[List[SearchItemType]] = None,
limit: int = 10,
offset: int = 0,
) -> List[SearchIndexRow]:
"""Search across indexed content."""
...
async def index_item(self, search_index_row: SearchIndexRow) -> None:
"""Index a single item."""
...
async def bulk_index_items(self, search_index_rows: List[SearchIndexRow]) -> None:
"""Index multiple items in a batch."""
...
async def delete_by_permalink(self, permalink: str) -> None:
"""Delete item by permalink."""
...
async def delete_by_entity_id(self, entity_id: int) -> None:
"""Delete items by entity ID."""
...
async def execute_query(self, query, params: dict) -> Result:
"""Execute a raw SQL query."""
...
def create_search_repository(
session_maker: async_sessionmaker[AsyncSession],
project_id: int,
database_backend: Optional[DatabaseBackend] = None,
) -> SearchRepository:
"""Factory function to create the appropriate search repository based on database backend.
Args:
session_maker: SQLAlchemy async session maker
project_id: Project ID for the repository
database_backend: Optional explicit backend. If not provided, reads from ConfigManager.
Prefer passing explicitly from composition roots.
Returns:
SearchRepository: Backend-appropriate search repository instance
"""
# Prefer explicit parameter; fall back to ConfigManager for backwards compatibility
if database_backend is None:
config = ConfigManager().config
database_backend = config.database_backend
if database_backend == DatabaseBackend.POSTGRES: # pragma: no cover
return PostgresSearchRepository(session_maker, project_id=project_id) # pragma: no cover
else:
return SQLiteSearchRepository(session_maker, project_id=project_id)
__all__ = [
"SearchRepository",
"SearchIndexRow",
"create_search_repository",
]
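# Usage sketch (from a composition root; `project` and `app_config` are assumptions from the
# calling code, not defined in this module — passing the backend explicitly avoids the
# ConfigManager fallback):
#
#   repo = create_search_repository(
#       session_maker,
#       project_id=project.id,
#       database_backend=app_config.database_backend,
#   )
#   await repo.init_search_index()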
```
--------------------------------------------------------------------------------
/src/basic_memory/importers/base.py:
--------------------------------------------------------------------------------
```python
"""Base import service for Basic Memory."""
import logging
from abc import abstractmethod
from pathlib import Path
from typing import TYPE_CHECKING, Any, Optional, TypeVar
from basic_memory.markdown.markdown_processor import MarkdownProcessor
from basic_memory.markdown.schemas import EntityMarkdown
from basic_memory.schemas.importer import ImportResult
if TYPE_CHECKING: # pragma: no cover
from basic_memory.services.file_service import FileService
logger = logging.getLogger(__name__)
T = TypeVar("T", bound=ImportResult)
class Importer[T: ImportResult]:
"""Base class for all import services.
All file operations are delegated to FileService, which can be overridden
in cloud environments to use S3 or other storage backends.
"""
def __init__(
self,
base_path: Path,
markdown_processor: MarkdownProcessor,
file_service: "FileService",
):
"""Initialize the import service.
Args:
base_path: Base path for the project.
markdown_processor: MarkdownProcessor instance for markdown serialization.
file_service: FileService instance for all file operations.
"""
self.base_path = base_path.resolve() # Get absolute path
self.markdown_processor = markdown_processor
self.file_service = file_service
@abstractmethod
async def import_data(self, source_data, destination_folder: str, **kwargs: Any) -> T:
"""Import data from source file to destination folder.
Args:
source_data: Source data to import (e.g., parsed JSON from the export file).
destination_folder: Destination folder within the project.
**kwargs: Additional keyword arguments for specific import types.
Returns:
ImportResult containing statistics and status of the import.
"""
pass # pragma: no cover
async def write_entity(self, entity: EntityMarkdown, file_path: str | Path) -> str:
"""Write entity to file using FileService.
This method serializes the entity to markdown and writes it using
FileService, which handles directory creation and storage backend
abstraction (local filesystem vs cloud storage).
Args:
entity: EntityMarkdown instance to write.
file_path: Relative path to write the entity to. FileService handles base_path.
Returns:
Checksum of written file.
"""
content = self.markdown_processor.to_markdown_string(entity)
# FileService.write_file handles directory creation and returns checksum
return await self.file_service.write_file(file_path, content)
async def ensure_folder_exists(self, folder: str) -> None:
"""Ensure folder exists using FileService.
For cloud storage (S3), this is essentially a no-op since S3 doesn't
have actual folders - they're just key prefixes.
Args:
folder: Relative folder path within the project. FileService handles base_path.
"""
await self.file_service.ensure_directory(folder)
@abstractmethod
def handle_error(
self, message: str, error: Optional[Exception] = None
) -> T: # pragma: no cover
"""Handle errors during import.
Args:
message: Error message.
error: Optional exception that caused the error.
Returns:
ImportResult with error information.
"""
pass
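# Minimal subclass sketch (illustrative only; the real importers in basic_memory/importers/
# carry format-specific parsing logic and richer result statistics):
#
#   class MyImporter(Importer[ImportResult]):
#       async def import_data(self, source_data, destination_folder: str, **kwargs) -> ImportResult:
#           await self.ensure_folder_exists(destination_folder)
#           # ...build EntityMarkdown objects and persist them via self.write_entity(entity, path)
#           return ...  # an ImportResult with success=True plus import statistics
#
#       def handle_error(self, message, error=None) -> ImportResult:
#           return ...  # an ImportResult with success=False and error_message=message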
```
--------------------------------------------------------------------------------
/src/basic_memory/models/search.py:
--------------------------------------------------------------------------------
```python
"""Search DDL statements for SQLite and Postgres.
The search_index table is created via raw DDL, not ORM models, because:
- SQLite uses FTS5 virtual tables (cannot be represented as ORM)
- Postgres uses composite primary keys and generated tsvector columns
- Both backends use raw SQL for all search operations via SearchIndexRow dataclass
"""
from sqlalchemy import DDL
# Define Postgres search_index table with composite primary key and tsvector
# This DDL matches the Alembic migration schema (314f1ea54dc4)
# Used by tests to create the table without running full migrations
# NOTE: Split into separate DDL statements because asyncpg doesn't support
# multiple statements in a single execute call.
CREATE_POSTGRES_SEARCH_INDEX_TABLE = DDL("""
CREATE TABLE IF NOT EXISTS search_index (
id INTEGER NOT NULL,
project_id INTEGER NOT NULL,
title TEXT,
content_stems TEXT,
content_snippet TEXT,
permalink VARCHAR,
file_path VARCHAR,
type VARCHAR,
from_id INTEGER,
to_id INTEGER,
relation_type VARCHAR,
entity_id INTEGER,
category VARCHAR,
metadata JSONB,
created_at TIMESTAMP WITH TIME ZONE,
updated_at TIMESTAMP WITH TIME ZONE,
textsearchable_index_col tsvector GENERATED ALWAYS AS (
to_tsvector('english', coalesce(title, '') || ' ' || coalesce(content_stems, ''))
) STORED,
PRIMARY KEY (id, type, project_id),
FOREIGN KEY (project_id) REFERENCES project(id) ON DELETE CASCADE
)
""")
CREATE_POSTGRES_SEARCH_INDEX_FTS = DDL("""
CREATE INDEX IF NOT EXISTS idx_search_index_fts ON search_index USING gin(textsearchable_index_col)
""")
CREATE_POSTGRES_SEARCH_INDEX_METADATA = DDL("""
CREATE INDEX IF NOT EXISTS idx_search_index_metadata_gin ON search_index USING gin(metadata jsonb_path_ops)
""")
# Partial unique index on (permalink, project_id) for non-null permalinks
# This prevents duplicate permalinks per project and is used by upsert operations
# in PostgresSearchRepository to handle race conditions during parallel indexing
CREATE_POSTGRES_SEARCH_INDEX_PERMALINK = DDL("""
CREATE UNIQUE INDEX IF NOT EXISTS uix_search_index_permalink_project
ON search_index (permalink, project_id)
WHERE permalink IS NOT NULL
""")
# Define FTS5 virtual table creation for SQLite only
# This DDL is executed separately for SQLite databases
CREATE_SEARCH_INDEX = DDL("""
CREATE VIRTUAL TABLE IF NOT EXISTS search_index USING fts5(
-- Core entity fields
id UNINDEXED, -- Row ID
title, -- Title for searching
content_stems, -- Main searchable content split into stems
content_snippet, -- File content snippet for display
permalink, -- Stable identifier (now indexed for path search)
file_path UNINDEXED, -- Physical location
type UNINDEXED, -- entity/relation/observation
-- Project context
project_id UNINDEXED, -- Project identifier
-- Relation fields
from_id UNINDEXED, -- Source entity
to_id UNINDEXED, -- Target entity
relation_type UNINDEXED, -- Type of relation
-- Observation fields
entity_id UNINDEXED, -- Parent entity
category UNINDEXED, -- Observation category
-- Common fields
metadata UNINDEXED, -- JSON metadata
created_at UNINDEXED, -- Creation timestamp
updated_at UNINDEXED, -- Last update
-- Configuration
tokenize='unicode61 tokenchars 0x2F', -- Hex code for /
prefix='1,2,3,4' -- Support longer prefixes for paths
);
""")
```
--------------------------------------------------------------------------------
/src/basic_memory/mcp/clients/memory.py:
--------------------------------------------------------------------------------
```python
"""Typed client for memory/context API operations.
Encapsulates all /v2/projects/{project_id}/memory/* endpoints.
"""
from typing import Optional
from httpx import AsyncClient
from basic_memory.mcp.tools.utils import call_get
from basic_memory.schemas.memory import GraphContext
class MemoryClient:
"""Typed client for memory context operations.
Centralizes:
- API path construction for /v2/projects/{project_id}/memory/*
- Response validation via Pydantic models
- Consistent error handling through call_* utilities
Usage:
async with get_client() as http_client:
client = MemoryClient(http_client, project_id)
context = await client.build_context("memory://specs/search")
"""
def __init__(self, http_client: AsyncClient, project_id: str):
"""Initialize the memory client.
Args:
http_client: HTTPX AsyncClient for making requests
project_id: Project external_id (UUID) for API calls
"""
self.http_client = http_client
self.project_id = project_id
self._base_path = f"/v2/projects/{project_id}/memory"
async def build_context(
self,
path: str,
*,
depth: int = 1,
timeframe: Optional[str] = None,
page: int = 1,
page_size: int = 10,
max_related: int = 10,
) -> GraphContext:
"""Build context from a memory path.
Args:
path: The path to build context for (without memory:// prefix)
depth: How deep to traverse relations
timeframe: Time filter (e.g., "7d", "1 week")
page: Page number (1-indexed)
page_size: Results per page
max_related: Maximum related items per result
Returns:
GraphContext with hierarchical results
Raises:
ToolError: If the request fails
"""
params: dict = {
"depth": depth,
"page": page,
"page_size": page_size,
"max_related": max_related,
}
if timeframe:
params["timeframe"] = timeframe
response = await call_get(
self.http_client,
f"{self._base_path}/{path}",
params=params,
)
return GraphContext.model_validate(response.json())
async def recent(
self,
*,
timeframe: str = "7d",
depth: int = 1,
types: Optional[list[str]] = None,
page: int = 1,
page_size: int = 10,
) -> GraphContext:
"""Get recent activity.
Args:
timeframe: Time filter (e.g., "7d", "1 week", "2 days ago")
depth: How deep to traverse relations
types: Filter by item types
page: Page number (1-indexed)
page_size: Results per page
Returns:
GraphContext with recent activity
Raises:
ToolError: If the request fails
"""
params: dict = {
"timeframe": timeframe,
"depth": depth,
"page": page,
"page_size": page_size,
}
if types:
# Join types as comma-separated string if provided
params["type"] = ",".join(types) if isinstance(types, list) else types
response = await call_get(
self.http_client,
f"{self._base_path}/recent",
params=params,
)
return GraphContext.model_validate(response.json())
```
--------------------------------------------------------------------------------
/CLA.md:
--------------------------------------------------------------------------------
```markdown
# Contributor License Agreement
## Copyright Assignment and License Grant
By signing this Contributor License Agreement ("Agreement"), you accept and agree to the following terms and conditions
for your present and future Contributions submitted
to Basic Machines LLC. Except for the license granted herein to Basic Machines LLC and recipients of software
distributed by Basic Machines LLC, you reserve all right,
title, and interest in and to your Contributions.
### 1. Definitions
"You" (or "Your") shall mean the copyright owner or legal entity authorized by the copyright owner that is making this
Agreement with Basic Machines LLC.
"Contribution" shall mean any original work of authorship, including any modifications or additions to an existing work,
that is intentionally submitted by You to Basic
Machines LLC for inclusion in, or documentation of, any of the products owned or managed by Basic Machines LLC (the "
Work").
### 2. Grant of Copyright License
Subject to the terms and conditions of this Agreement, You hereby grant to Basic Machines LLC and to recipients of
software distributed by Basic Machines LLC a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable copyright license to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the
Work, and to permit persons to whom the Work is furnished to do so.
### 3. Assignment of Copyright
You hereby assign to Basic Machines LLC all right, title, and interest worldwide in all Copyright covering your
Contributions. Basic Machines LLC may license the
Contributions under any license terms, including copyleft, permissive, commercial, or proprietary licenses.
### 4. Grant of Patent License
Subject to the terms and conditions of this Agreement, You hereby grant to Basic Machines LLC and to recipients of
software distributed by Basic Machines LLC a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable (except as stated in this section) patent license to
make, have made, use, offer to sell, sell, import, and
otherwise transfer the Work.
### 5. Developer Certificate of Origin
By making a Contribution to this project, You certify that:
(a) The Contribution was created in whole or in part by You and You have the right to submit it under this Agreement; or
(b) The Contribution is based upon previous work that, to the best of Your knowledge, is covered under an appropriate
open source license and You have the right under that
license to submit that work with modifications, whether created in whole or in part by You, under this Agreement; or
(c) The Contribution was provided directly to You by some other person who certified (a), (b) or (c) and You have not
modified it.
(d) You understand and agree that this project and the Contribution are public and that a record of the Contribution (
including all personal information You submit with
it, including Your sign-off) is maintained indefinitely and may be redistributed consistent with this project or the
open source license(s) involved.
### 6. Representations
You represent that you are legally entitled to grant the above license and assignment. If your employer(s) has rights to
intellectual property that you create that
includes your Contributions, you represent that you have received permission to make Contributions on behalf of that
employer, or that your employer has waived such rights
for your Contributions to Basic Machines LLC.
---
This Agreement is effective as of the date you first submit a Contribution to Basic Machines LLC.
```
--------------------------------------------------------------------------------
/src/basic_memory/cli/commands/command_utils.py:
--------------------------------------------------------------------------------
```python
"""utility functions for commands"""
import asyncio
from typing import Optional, TypeVar, Coroutine, Any
from mcp.server.fastmcp.exceptions import ToolError
import typer
from rich.console import Console
from basic_memory import db
from basic_memory.mcp.async_client import get_client
from basic_memory.mcp.tools.utils import call_post, call_get
from basic_memory.mcp.project_context import get_active_project
from basic_memory.schemas import ProjectInfoResponse
from basic_memory.telemetry import shutdown_telemetry
console = Console()
T = TypeVar("T")
def run_with_cleanup(coro: Coroutine[Any, Any, T]) -> T:
"""Run an async coroutine with proper database cleanup.
This helper ensures database connections and telemetry threads are cleaned up
before the event loop closes, preventing process hangs in CLI commands.
Args:
coro: The coroutine to run
Returns:
The result of the coroutine
"""
async def _with_cleanup() -> T:
try:
return await coro
finally:
await db.shutdown_db()
# Shutdown telemetry to stop the OpenPanel background thread
# This prevents hangs on Python 3.14+ during thread shutdown
shutdown_telemetry()
return asyncio.run(_with_cleanup())
async def run_sync(
project: Optional[str] = None,
force_full: bool = False,
run_in_background: bool = True,
):
"""Run sync operation via API endpoint.
Args:
project: Optional project name
force_full: If True, force a full scan bypassing watermark optimization
run_in_background: If True, return immediately; if False, wait for completion
"""
try:
async with get_client() as client:
project_item = await get_active_project(client, project, None)
url = f"{project_item.project_url}/project/sync"
params = []
if force_full:
params.append("force_full=true")
if not run_in_background:
params.append("run_in_background=false")
if params:
url += "?" + "&".join(params)
response = await call_post(client, url)
data = response.json()
# Background mode returns {"message": "..."}, foreground returns SyncReportResponse
if "message" in data:
console.print(f"[green]{data['message']}[/green]")
else:
# Foreground mode - show summary of sync results
total = data.get("total", 0)
new_count = len(data.get("new", []))
modified_count = len(data.get("modified", []))
deleted_count = len(data.get("deleted", []))
console.print(
f"[green]Synced {total} files[/green] "
f"(new: {new_count}, modified: {modified_count}, deleted: {deleted_count})"
)
except (ToolError, ValueError) as e:
console.print(f"[red]Sync failed: {e}[/red]")
raise typer.Exit(1)
async def get_project_info(project: str):
"""Get project information via API endpoint."""
try:
async with get_client() as client:
project_item = await get_active_project(client, project, None)
response = await call_get(client, f"{project_item.project_url}/project/info")
return ProjectInfoResponse.model_validate(response.json())
except (ToolError, ValueError) as e:
console.print(f"[red]Sync failed: {e}[/red]")
raise typer.Exit(1)
```
--------------------------------------------------------------------------------
/test-int/mcp/test_lifespan_shutdown_sync_task_cancellation_integration.py:
--------------------------------------------------------------------------------
```python
"""
Integration test for FastAPI lifespan shutdown behavior.
This test verifies the asyncio cancellation pattern used by the API lifespan:
when the background sync task is cancelled during shutdown, it must be *awaited*
before database shutdown begins. This prevents "hang on exit" scenarios in
`asyncio.run(...)` callers (e.g. CLI/MCP clients using httpx ASGITransport).
"""
import asyncio
from httpx import ASGITransport, AsyncClient
def test_lifespan_shutdown_awaits_sync_task_cancellation(app, monkeypatch):
"""
Ensure lifespan shutdown awaits the cancelled background sync task.
Why this is deterministic:
- Cancelling a task does not make it "done" immediately; it becomes done only
once the event loop schedules it and it processes the CancelledError.
- In the buggy version, shutdown proceeded directly to db.shutdown_db()
immediately after calling cancel(), so at *entry* to shutdown_db the task
is still not done.
- In the fixed version, SyncCoordinator.stop() awaits the task before returning,
so by the time shutdown_db is called, the task is done (cancelled).
"""
# Import the *module* (not the package-level FastAPI `basic_memory.api.app` export)
# so monkeypatching affects the exact symbols referenced inside lifespan().
#
# Note: `basic_memory/api/__init__.py` re-exports `app`, so `import basic_memory.api.app`
# can resolve to the FastAPI instance rather than the `basic_memory.api.app` module.
import importlib
api_app_module = importlib.import_module("basic_memory.api.app")
container_module = importlib.import_module("basic_memory.api.container")
init_module = importlib.import_module("basic_memory.services.initialization")
# Keep startup cheap: we don't need real DB init for this ordering test.
async def _noop_initialize_app(_app_config):
return None
monkeypatch.setattr(api_app_module, "initialize_app", _noop_initialize_app)
# Patch the container's init_database to return fake objects
async def _fake_init_database(self):
self.engine = object()
self.session_maker = object()
return self.engine, self.session_maker
monkeypatch.setattr(container_module.ApiContainer, "init_database", _fake_init_database)
# Make the sync task long-lived so it must be cancelled on shutdown.
# Patch at the source module where SyncCoordinator imports it.
async def _fake_initialize_file_sync(_app_config):
await asyncio.Event().wait()
monkeypatch.setattr(init_module, "initialize_file_sync", _fake_initialize_file_sync)
# Assert ordering: shutdown_db must be called only after the sync_task is done.
# SyncCoordinator stores the task in _sync_task attribute.
async def _assert_sync_task_done_before_db_shutdown(self):
sync_coordinator = api_app_module.app.state.sync_coordinator
assert sync_coordinator._sync_task is not None
assert sync_coordinator._sync_task.done()
monkeypatch.setattr(
container_module.ApiContainer,
"shutdown_database",
_assert_sync_task_done_before_db_shutdown,
)
async def _run_client_once():
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
# Any request is sufficient to trigger lifespan startup/shutdown.
await client.get("/__nonexistent__")
# Use asyncio.run to match the CLI/MCP execution model where loop teardown
# would hang if a background task is left running.
asyncio.run(_run_client_once())
```
--------------------------------------------------------------------------------
/tests/cli/test_cli_container.py:
--------------------------------------------------------------------------------
```python
"""Tests for CLI container composition root."""
import pytest
from basic_memory.cli.container import (
CliContainer,
get_container,
set_container,
get_or_create_container,
)
from basic_memory.runtime import RuntimeMode
class TestCliContainer:
"""Tests for CliContainer."""
def test_create_from_config(self, app_config):
"""Container can be created from config."""
container = CliContainer(config=app_config, mode=RuntimeMode.LOCAL)
assert container.config == app_config
assert container.mode == RuntimeMode.LOCAL
def test_is_cloud_mode_when_cloud(self, app_config):
"""is_cloud_mode returns True in cloud mode."""
container = CliContainer(config=app_config, mode=RuntimeMode.CLOUD)
assert container.is_cloud_mode is True
def test_is_cloud_mode_when_local(self, app_config):
"""is_cloud_mode returns False in local mode."""
container = CliContainer(config=app_config, mode=RuntimeMode.LOCAL)
assert container.is_cloud_mode is False
def test_is_cloud_mode_when_test(self, app_config):
"""is_cloud_mode returns False in test mode."""
container = CliContainer(config=app_config, mode=RuntimeMode.TEST)
assert container.is_cloud_mode is False
class TestContainerAccessors:
"""Tests for container get/set functions."""
def test_get_container_raises_when_not_set(self, monkeypatch):
"""get_container raises RuntimeError when container not initialized."""
import basic_memory.cli.container as container_module
monkeypatch.setattr(container_module, "_container", None)
with pytest.raises(RuntimeError, match="CLI container not initialized"):
get_container()
def test_set_and_get_container(self, app_config, monkeypatch):
"""set_container allows get_container to return the container."""
import basic_memory.cli.container as container_module
container = CliContainer(config=app_config, mode=RuntimeMode.LOCAL)
monkeypatch.setattr(container_module, "_container", None)
set_container(container)
assert get_container() is container
class TestGetOrCreateContainer:
"""Tests for get_or_create_container - unique to CLI container."""
def test_creates_new_when_none_exists(self, monkeypatch):
"""get_or_create_container creates a new container when none exists."""
import basic_memory.cli.container as container_module
monkeypatch.setattr(container_module, "_container", None)
container = get_or_create_container()
assert container is not None
assert isinstance(container, CliContainer)
def test_returns_existing_when_set(self, app_config, monkeypatch):
"""get_or_create_container returns existing container if already set."""
import basic_memory.cli.container as container_module
existing = CliContainer(config=app_config, mode=RuntimeMode.LOCAL)
monkeypatch.setattr(container_module, "_container", existing)
result = get_or_create_container()
assert result is existing
def test_sets_module_level_container(self, monkeypatch):
"""get_or_create_container sets the module-level container."""
import basic_memory.cli.container as container_module
monkeypatch.setattr(container_module, "_container", None)
container = get_or_create_container()
# Verify it was set at module level
assert container_module._container is container
# Verify get_container now works
assert get_container() is container
```
--------------------------------------------------------------------------------
/tests/mcp/test_recent_activity_prompt_modes.py:
--------------------------------------------------------------------------------
```python
from datetime import UTC, datetime
import pytest
from basic_memory.mcp.prompts.recent_activity import recent_activity_prompt
from basic_memory.schemas.memory import (
ActivityStats,
ContextResult,
GraphContext,
MemoryMetadata,
ProjectActivity,
ProjectActivitySummary,
EntitySummary,
)
from basic_memory.schemas.search import SearchItemType
def _entity(title: str, entity_id: int = 1) -> EntitySummary:
return EntitySummary(
entity_id=entity_id,
permalink=title.lower().replace(" ", "-"),
title=title,
content=None,
file_path=f"{title}.md",
created_at=datetime.now(UTC),
)
@pytest.mark.asyncio
async def test_recent_activity_prompt_discovery_mode(monkeypatch):
recent = ProjectActivitySummary(
projects={
"p1": ProjectActivity(
project_name="p1",
project_path="/tmp/p1",
activity=GraphContext(
results=[
ContextResult(
primary_result=_entity("A"), observations=[], related_results=[]
)
],
metadata=MemoryMetadata(
uri=None,
types=[SearchItemType.ENTITY],
depth=1,
timeframe="7d",
generated_at=datetime.now(UTC),
),
),
item_count=1,
),
"p2": ProjectActivity(
project_name="p2",
project_path="/tmp/p2",
activity=GraphContext(
results=[
ContextResult(
primary_result=_entity("B", 2), observations=[], related_results=[]
)
],
metadata=MemoryMetadata(
uri=None,
types=[SearchItemType.ENTITY],
depth=1,
timeframe="7d",
generated_at=datetime.now(UTC),
),
),
item_count=1,
),
},
summary=ActivityStats(
total_projects=2, active_projects=2, most_active_project="p1", total_items=2
),
timeframe="7d",
generated_at=datetime.now(UTC),
)
async def fake_fn(**_kwargs):
return recent
monkeypatch.setattr("basic_memory.mcp.prompts.recent_activity.recent_activity.fn", fake_fn)
out = await recent_activity_prompt.fn(timeframe="7d", project=None) # pyright: ignore[reportGeneralTypeIssues]
assert "Recent Activity Across All Projects" in out
assert "Cross-Project Activity Discovery" in out
@pytest.mark.asyncio
async def test_recent_activity_prompt_project_mode(monkeypatch):
recent = GraphContext(
results=[
ContextResult(primary_result=_entity("Only"), observations=[], related_results=[])
],
metadata=MemoryMetadata(
uri=None,
types=[SearchItemType.ENTITY],
depth=1,
timeframe="1d",
generated_at=datetime.now(UTC),
),
)
async def fake_fn(**_kwargs):
return recent
monkeypatch.setattr("basic_memory.mcp.prompts.recent_activity.recent_activity.fn", fake_fn)
out = await recent_activity_prompt.fn(timeframe="1d", project="proj") # pyright: ignore[reportGeneralTypeIssues]
assert "Recent Activity in proj" in out
assert "Opportunity to Capture Activity Summary" in out
```
--------------------------------------------------------------------------------
/src/basic_memory/schemas/prompt.py:
--------------------------------------------------------------------------------
```python
"""Request and response schemas for prompt-related operations."""
from typing import Optional, List, Any, Dict
from pydantic import BaseModel, Field
from basic_memory.schemas.base import TimeFrame
from basic_memory.schemas.memory import EntitySummary, ObservationSummary, RelationSummary
class PromptContextItem(BaseModel):
"""Container for primary and related results to render in a prompt."""
primary_results: List[EntitySummary]
related_results: List[EntitySummary | ObservationSummary | RelationSummary]
class ContinueConversationRequest(BaseModel):
"""Request for generating a continue conversation prompt.
Used to provide context for continuing a conversation on a specific topic
or with recent activity from a given timeframe.
"""
topic: Optional[str] = Field(None, description="Topic or keyword to search for")
timeframe: Optional[TimeFrame] = Field(
None, description="How far back to look for activity (e.g. '1d', '1 week')"
)
    # Keep result limits and depth modest for performance reasons - higher values cause significant slowdown
search_items_limit: int = Field(
5,
description="Maximum number of search results to include in context (max 10)",
ge=1,
le=10,
)
depth: int = Field(
1,
description="How many relationship 'hops' to follow when building context (max 5)",
ge=1,
le=5,
)
# Limit related items to prevent overloading the context
related_items_limit: int = Field(
5, description="Maximum number of related items to include in context (max 10)", ge=1, le=10
)
class SearchPromptRequest(BaseModel):
"""Request for generating a search results prompt.
Used to format search results into a prompt with context and suggestions.
"""
query: str = Field(..., description="The search query text")
timeframe: Optional[TimeFrame] = Field(
None, description="Optional timeframe to limit results (e.g. '1d', '1 week')"
)
class PromptMetadata(BaseModel):
"""Metadata about a prompt response.
Contains statistical information about the prompt generation process
and results, useful for debugging and UI display.
"""
query: Optional[str] = Field(None, description="The original query or topic")
timeframe: Optional[str] = Field(None, description="The timeframe used for filtering")
search_count: int = Field(0, description="Number of search results found")
context_count: int = Field(0, description="Number of context items retrieved")
observation_count: int = Field(0, description="Total number of observations included")
relation_count: int = Field(0, description="Total number of relations included")
total_items: int = Field(0, description="Total number of all items included in the prompt")
search_limit: int = Field(0, description="Maximum search results requested")
context_depth: int = Field(0, description="Context depth used")
related_limit: int = Field(0, description="Maximum related items requested")
generated_at: str = Field(..., description="ISO timestamp when this prompt was generated")
class PromptResponse(BaseModel):
"""Response containing the rendered prompt.
Includes both the rendered prompt text and the context that was used
to render it, for potential client-side use.
"""
prompt: str = Field(..., description="The rendered prompt text")
context: Dict[str, Any] = Field(..., description="The context used to render the prompt")
metadata: PromptMetadata = Field(
..., description="Metadata about the prompt generation process"
)
```
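For orientation, a short sketch of how these schemas constrain prompt requests; it only exercises the fields and limits declared above and is not part of the module:
```python
# Sketch: exercising the request schemas defined above.
from pydantic import ValidationError

from basic_memory.schemas.prompt import ContinueConversationRequest, SearchPromptRequest

# Valid request: omitted limits fall back to their defaults.
req = ContinueConversationRequest(topic="search design", depth=2)
print(req.search_items_limit, req.related_items_limit)  # -> 5 5

# depth is capped at 5, so 6 is rejected by validation.
try:
    ContinueConversationRequest(topic="search design", depth=6)
except ValidationError as e:
    print("depth out of range:", len(e.errors()), "error(s)")

# SearchPromptRequest only requires the query text.
SearchPromptRequest(query="memory systems")
```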
--------------------------------------------------------------------------------
/test-int/mcp/test_read_note_integration.py:
--------------------------------------------------------------------------------
```python
"""
Integration tests for read_note MCP tool.
Tests the full flow: MCP client -> MCP server -> FastAPI -> database
"""
import pytest
from fastmcp import Client
@pytest.mark.asyncio
async def test_read_note_after_write(mcp_server, app, test_project):
"""Test read_note after write_note using real database."""
async with Client(mcp_server) as client:
# First write a note
write_result = await client.call_tool(
"write_note",
{
"project": test_project.name,
"title": "Test Note",
"folder": "test",
"content": "# Test Note\n\nThis is test content.",
"tags": "test,integration",
},
)
assert len(write_result.content) == 1
assert write_result.content[0].type == "text"
assert "Test Note.md" in write_result.content[0].text
# Then read it back
read_result = await client.call_tool(
"read_note",
{
"project": test_project.name,
"identifier": "Test Note",
},
)
assert len(read_result.content) == 1
assert read_result.content[0].type == "text"
result_text = read_result.content[0].text
# Should contain the note content and metadata
assert "# Test Note" in result_text
assert "This is test content." in result_text
assert "test/test-note" in result_text # permalink
@pytest.mark.asyncio
async def test_read_note_underscored_folder_by_permalink(mcp_server, app, test_project):
"""Test read_note with permalink from underscored folder.
Reproduces bug #416: read_note fails to find notes when given permalinks
from underscored folder names (e.g., _archive/, _drafts/), even though
the permalink is copied directly from the note's YAML frontmatter.
"""
async with Client(mcp_server) as client:
# Create a note in an underscored folder
write_result = await client.call_tool(
"write_note",
{
"project": test_project.name,
"title": "Example Note",
"folder": "_archive/articles",
"content": "# Example Note\n\nThis is a test note in an underscored folder.",
"tags": "test,archive",
},
)
assert len(write_result.content) == 1
assert write_result.content[0].type == "text"
write_text = write_result.content[0].text
# Verify the file path includes the underscore
assert "_archive/articles/Example Note.md" in write_text
# Verify the permalink has underscores stripped (this is the expected behavior)
assert "archive/articles/example-note" in write_text
# Now try to read the note using the permalink (without underscores)
# This is the exact scenario from the bug report - using the permalink
# that was generated in the YAML frontmatter
read_result = await client.call_tool(
"read_note",
{
"project": test_project.name,
"identifier": "archive/articles/example-note", # permalink without underscores
},
)
# This should succeed - the note should be found by its permalink
assert len(read_result.content) == 1
assert read_result.content[0].type == "text"
result_text = read_result.content[0].text
# Should contain the note content
assert "# Example Note" in result_text
assert "This is a test note in an underscored folder." in result_text
assert "archive/articles/example-note" in result_text # permalink
```
--------------------------------------------------------------------------------
/src/basic_memory/alembic/alembic.ini:
--------------------------------------------------------------------------------
```
# A generic, single database configuration.
[alembic]
# path to migration scripts
# Use forward slashes (/) also on windows to provide an os agnostic path
script_location = .
# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s
# Uncomment the line below if you want the files to be prepended with date and time
# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file
# for all available tokens
# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s
# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory.
prepend_sys_path = .
# timezone to use when rendering the date within the migration file
# as well as the filename.
# If specified, requires Python>=3.9 or the backports.zoneinfo library, plus the tzdata library.
# Any required deps can be installed by adding `alembic[tz]` to the pip requirements
# string value is passed to ZoneInfo()
# leave blank for localtime
# timezone =
# max length of characters to apply to the "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; This defaults
# to migrations/versions. When using multiple version
# directories, initial revisions must be specified with --version-path.
# The path separator used here should be the separator specified by "version_path_separator" below.
# version_locations = %(here)s/bar:%(here)s/bat:migrations/versions
# version path separator; As mentioned above, this is the character used to split
# version_locations. The default within new alembic.ini files is "os", which uses os.pathsep.
# If this key is omitted entirely, it falls back to the legacy behavior of splitting on spaces and/or commas.
# Valid values for version_path_separator are:
#
# version_path_separator = :
# version_path_separator = ;
# version_path_separator = space
# version_path_separator = newline
#
# Use os.pathsep. Default configuration used for new projects.
version_path_separator = os
# set to 'true' to search source files recursively
# in each "version_locations" directory
# new in Alembic version 1.10
# recursive_version_locations = false
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
sqlalchemy.url = driver://user:pass@localhost/dbname
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks = black
# black.type = console_scripts
# black.entrypoint = black
# black.options = -l 79 REVISION_SCRIPT_FILENAME
# lint with attempts to fix using "ruff" - use the exec runner, execute a binary
# hooks = ruff
# ruff.type = exec
# ruff.executable = %(here)s/.venv/bin/ruff
# ruff.options = --fix REVISION_SCRIPT_FILENAME
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARNING
handlers = console
qualname =
[logger_sqlalchemy]
level = WARNING
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S
```
--------------------------------------------------------------------------------
/tests/mcp/test_project_context.py:
--------------------------------------------------------------------------------
```python
"""Tests for project context utilities (no standard-library mock usage).
These functions are config/env driven, so we use the real ConfigManager-backed
test config file and pytest monkeypatch for environment variables.
"""
from __future__ import annotations
import pytest
@pytest.mark.asyncio
async def test_cloud_mode_requires_project_by_default(config_manager, monkeypatch):
from basic_memory.mcp.project_context import resolve_project_parameter
cfg = config_manager.load_config()
cfg.cloud_mode = True
config_manager.save_config(cfg)
with pytest.raises(ValueError) as exc_info:
await resolve_project_parameter(project=None, allow_discovery=False)
assert "No project specified" in str(exc_info.value)
assert "Project is required for cloud mode" in str(exc_info.value)
@pytest.mark.asyncio
async def test_cloud_mode_allows_discovery_when_enabled(config_manager):
from basic_memory.mcp.project_context import resolve_project_parameter
cfg = config_manager.load_config()
cfg.cloud_mode = True
config_manager.save_config(cfg)
assert await resolve_project_parameter(project=None, allow_discovery=True) is None
@pytest.mark.asyncio
async def test_cloud_mode_returns_project_when_specified(config_manager):
from basic_memory.mcp.project_context import resolve_project_parameter
cfg = config_manager.load_config()
cfg.cloud_mode = True
config_manager.save_config(cfg)
assert await resolve_project_parameter(project="my-project") == "my-project"
@pytest.mark.asyncio
async def test_local_mode_uses_env_var_priority(config_manager, monkeypatch):
from basic_memory.mcp.project_context import resolve_project_parameter
cfg = config_manager.load_config()
cfg.cloud_mode = False
cfg.default_project_mode = False
config_manager.save_config(cfg)
monkeypatch.setenv("BASIC_MEMORY_MCP_PROJECT", "env-project")
assert await resolve_project_parameter(project="explicit-project") == "env-project"
@pytest.mark.asyncio
async def test_local_mode_uses_explicit_project(config_manager, monkeypatch):
from basic_memory.mcp.project_context import resolve_project_parameter
cfg = config_manager.load_config()
cfg.cloud_mode = False
cfg.default_project_mode = False
config_manager.save_config(cfg)
monkeypatch.delenv("BASIC_MEMORY_MCP_PROJECT", raising=False)
assert await resolve_project_parameter(project="explicit-project") == "explicit-project"
@pytest.mark.asyncio
async def test_local_mode_uses_default_project(config_manager, config_home, monkeypatch):
from basic_memory.mcp.project_context import resolve_project_parameter
cfg = config_manager.load_config()
cfg.cloud_mode = False
cfg.default_project_mode = True
# default_project must exist in the config project list, otherwise config validation
# will coerce it back to an existing default.
(config_home / "default-project").mkdir(parents=True, exist_ok=True)
cfg.projects["default-project"] = str(config_home / "default-project")
cfg.default_project = "default-project"
config_manager.save_config(cfg)
monkeypatch.delenv("BASIC_MEMORY_MCP_PROJECT", raising=False)
assert await resolve_project_parameter(project=None) == "default-project"
@pytest.mark.asyncio
async def test_local_mode_returns_none_when_no_resolution(config_manager, monkeypatch):
from basic_memory.mcp.project_context import resolve_project_parameter
cfg = config_manager.load_config()
cfg.cloud_mode = False
cfg.default_project_mode = False
config_manager.save_config(cfg)
monkeypatch.delenv("BASIC_MEMORY_MCP_PROJECT", raising=False)
assert await resolve_project_parameter(project=None) is None
```
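The tests above pin down the resolution order end to end. A condensed sketch consistent with them (the real resolve_project_parameter reads the live config rather than taking these arguments):
```python
# Rough sketch of the resolution order implied by the tests above; parameters
# other than `project` and `allow_discovery` are stand-ins for config state.
import os


def resolve_project_sketch(
    project: str | None,
    *,
    cloud_mode: bool,
    default_project_mode: bool,
    default_project: str | None,
    allow_discovery: bool = False,
) -> str | None:
    if cloud_mode:
        if project:
            return project
        if allow_discovery:
            return None  # caller is allowed to aggregate across projects
        raise ValueError("No project specified. Project is required for cloud mode.")
    # Local mode: the env var overrides even an explicitly passed project.
    env_project = os.environ.get("BASIC_MEMORY_MCP_PROJECT")
    if env_project:
        return env_project
    if project:
        return project
    if default_project_mode and default_project:
        return default_project
    return None
```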
--------------------------------------------------------------------------------
/src/basic_memory/markdown/utils.py:
--------------------------------------------------------------------------------
```python
"""Utilities for converting between markdown and entity models."""
from pathlib import Path
from typing import Any, Optional
from frontmatter import Post
from basic_memory.file_utils import has_frontmatter, remove_frontmatter, parse_frontmatter
from basic_memory.markdown import EntityMarkdown
from basic_memory.models import Entity
from basic_memory.models import Observation as ObservationModel
def entity_model_from_markdown(
file_path: Path,
markdown: EntityMarkdown,
entity: Optional[Entity] = None,
project_id: Optional[int] = None,
) -> Entity:
"""
Convert markdown entity to model. Does not include relations.
Args:
file_path: Path to the markdown file
markdown: Parsed markdown entity
entity: Optional existing entity to update
project_id: Project ID for new observations (uses entity.project_id if not provided)
Returns:
Entity model populated from markdown
Raises:
ValueError: If required datetime fields are missing from markdown
"""
if not markdown.created or not markdown.modified: # pragma: no cover
raise ValueError("Both created and modified dates are required in markdown")
# Create or update entity
model = entity or Entity()
# Update basic fields
model.title = markdown.frontmatter.title
model.entity_type = markdown.frontmatter.type
# Only update permalink if it exists in frontmatter, otherwise preserve existing
if markdown.frontmatter.permalink is not None:
model.permalink = markdown.frontmatter.permalink
model.file_path = file_path.as_posix()
model.content_type = "text/markdown"
model.created_at = markdown.created
model.updated_at = markdown.modified
# Handle metadata - ensure all values are strings and filter None
metadata = markdown.frontmatter.metadata or {}
model.entity_metadata = {k: str(v) for k, v in metadata.items() if v is not None}
# Get project_id from entity if not provided
obs_project_id = project_id or (model.project_id if hasattr(model, "project_id") else None)
# Convert observations
model.observations = [
ObservationModel(
project_id=obs_project_id,
content=obs.content,
category=obs.category,
context=obs.context,
tags=obs.tags,
)
for obs in markdown.observations
]
return model
async def schema_to_markdown(schema: Any) -> Post:
"""
Convert schema to markdown Post object.
Args:
schema: Schema to convert (must have title, entity_type, and permalink attributes)
Returns:
Post object with frontmatter metadata
"""
# Extract content and metadata
content = schema.content or ""
entity_metadata = dict(schema.entity_metadata or {})
# if the content contains frontmatter, remove it and merge
if has_frontmatter(content):
content_frontmatter = parse_frontmatter(content)
content = remove_frontmatter(content)
# Merge content frontmatter with entity metadata
# (entity_metadata takes precedence for conflicts)
content_frontmatter.update(entity_metadata)
entity_metadata = content_frontmatter
# Remove special fields for ordered frontmatter
for field in ["type", "title", "permalink"]:
entity_metadata.pop(field, None)
# Create Post with fields ordered by insert order
post = Post(
content,
title=schema.title,
type=schema.entity_type,
)
# set the permalink if passed in
if schema.permalink:
post.metadata["permalink"] = schema.permalink
if entity_metadata:
post.metadata.update(entity_metadata)
return post
```
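For orientation, a small usage sketch of schema_to_markdown; the schema object here is a hypothetical stand-in carrying the attributes the function reads:
```python
# Hypothetical stand-in schema; real callers pass entity schemas with these attributes.
import asyncio
from dataclasses import dataclass, field

import frontmatter

from basic_memory.markdown.utils import schema_to_markdown


@dataclass
class FakeSchema:
    title: str
    entity_type: str
    permalink: str | None
    content: str
    entity_metadata: dict = field(default_factory=dict)


async def main() -> None:
    schema = FakeSchema(
        title="Search Design",
        entity_type="note",
        permalink="notes/search-design",
        content="# Search Design\n\nNotes about search.",
        entity_metadata={"tags": "search"},
    )
    post = await schema_to_markdown(schema)
    # Frontmatter ordering: title and type first, then permalink, then other metadata.
    print(frontmatter.dumps(post))


asyncio.run(main())
```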
--------------------------------------------------------------------------------
/src/basic_memory/schemas/request.py:
--------------------------------------------------------------------------------
```python
"""Request schemas for interacting with the knowledge graph."""
from typing import List, Optional, Annotated, Literal
from annotated_types import MaxLen, MinLen
from pydantic import BaseModel, field_validator
from basic_memory.schemas.base import (
Relation,
Permalink,
)
class SearchNodesRequest(BaseModel):
"""Search for entities in the knowledge graph.
The search looks across multiple fields:
- Entity title
- Entity types
- summary
- file content
- Observations
Features:
- Case-insensitive matching
- Partial word matches
- Returns full entity objects with relations
- Includes all matching entities
- If a category is specified, only entities with that category are returned
Example Queries:
- "memory" - Find entities related to memory systems
- "SQLite" - Find database-related components
- "test" - Find test-related entities
- "implementation" - Find concrete implementations
- "service" - Find service components
Note: Currently uses SQL ILIKE for matching. Wildcard (*) searches
and full-text search capabilities are planned for future versions.
"""
query: Annotated[str, MinLen(1), MaxLen(200)]
category: Optional[str] = None
class GetEntitiesRequest(BaseModel):
"""Retrieve specific entities by their IDs.
Used to load complete entity details including all observations
and relations. Particularly useful for following relations
discovered through search.
"""
permalinks: Annotated[List[Permalink], MinLen(1), MaxLen(10)]
class CreateRelationsRequest(BaseModel):
relations: List[Relation]
class EditEntityRequest(BaseModel):
"""Request schema for editing an existing entity's content.
This allows for targeted edits without requiring the full entity content.
Supports various operation types for different editing scenarios.
"""
operation: Literal["append", "prepend", "find_replace", "replace_section"]
content: str
section: Optional[str] = None
find_text: Optional[str] = None
expected_replacements: int = 1
@field_validator("section")
@classmethod
def validate_section_for_replace_section(cls, v, info):
"""Ensure section is provided for replace_section operation."""
if info.data.get("operation") == "replace_section" and not v:
raise ValueError("section parameter is required for replace_section operation")
return v
@field_validator("find_text")
@classmethod
def validate_find_text_for_find_replace(cls, v, info):
"""Ensure find_text is provided for find_replace operation."""
if info.data.get("operation") == "find_replace" and not v:
raise ValueError("find_text parameter is required for find_replace operation")
return v
class MoveEntityRequest(BaseModel):
"""Request schema for moving an entity to a new file location.
This allows moving notes to different paths while maintaining project
consistency and optionally updating permalinks based on configuration.
"""
identifier: Annotated[str, MinLen(1), MaxLen(200)]
destination_path: Annotated[str, MinLen(1), MaxLen(500)]
project: Optional[str] = None
@field_validator("destination_path")
@classmethod
def validate_destination_path(cls, v):
"""Ensure destination path is relative and valid."""
if v.startswith("/"):
raise ValueError("destination_path must be relative, not absolute")
if ".." in v:
raise ValueError("destination_path cannot contain '..' path components")
if not v.strip():
raise ValueError("destination_path cannot be empty or whitespace only")
return v.strip()
```
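A quick sketch of how the validators above behave in practice (only fields defined in this module are used):
```python
# Sketch: the field validators reject incomplete or unsafe requests.
from pydantic import ValidationError

from basic_memory.schemas.request import EditEntityRequest, MoveEntityRequest

# replace_section needs a non-empty section heading.
try:
    EditEntityRequest(operation="replace_section", content="New body", section="")
except ValidationError as e:
    print(e.errors()[0]["msg"])  # section parameter is required for replace_section

# A complete find_replace request validates fine.
EditEntityRequest(operation="find_replace", content="new text", find_text="old text")

# Destination paths must stay relative and inside the project.
try:
    MoveEntityRequest(identifier="notes/search-design", destination_path="../outside.md")
except ValidationError as e:
    print(e.errors()[0]["msg"])  # destination_path cannot contain '..'
```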
--------------------------------------------------------------------------------
/tests/cli/test_cli_tool_exit.py:
--------------------------------------------------------------------------------
```python
"""Test that CLI tool commands exit cleanly without hanging.
This test ensures that CLI commands properly clean up database connections
on exit, preventing process hangs. See GitHub issue for details.
The issue occurs when:
1. ensure_initialization() calls asyncio.run(initialize_app())
2. initialize_app() creates global database connections via db.get_or_create_db()
3. When asyncio.run() completes, the event loop closes
4. But the global database engine holds async connections that prevent clean exit
5. Process hangs indefinitely
The fix ensures db.shutdown_db() is called before asyncio.run() returns.
"""
import os
import platform
import subprocess
import sys
import pytest
# Windows has different process cleanup behavior that makes these tests unreliable
IS_WINDOWS = platform.system() == "Windows"
SUBPROCESS_TIMEOUT = 10.0
skip_on_windows = pytest.mark.skipif(
IS_WINDOWS, reason="Subprocess cleanup tests unreliable on Windows CI"
)
@skip_on_windows
class TestCLIToolExit:
"""Test that CLI tool commands exit cleanly."""
@pytest.mark.parametrize(
"command",
[
["tool", "--help"],
["tool", "write-note", "--help"],
["tool", "read-note", "--help"],
["tool", "search-notes", "--help"],
["tool", "build-context", "--help"],
],
)
def test_cli_command_exits_cleanly(self, command: list[str]):
"""Test that CLI commands exit without hanging.
Each command should complete within the timeout without requiring
manual termination (Ctrl+C).
"""
full_command = [sys.executable, "-m", "basic_memory.cli.main"] + command
try:
result = subprocess.run(
full_command,
capture_output=True,
text=True,
timeout=SUBPROCESS_TIMEOUT,
)
# Command should exit with code 0 for --help
assert result.returncode == 0, f"Command failed: {result.stderr}"
except subprocess.TimeoutExpired:
pytest.fail(
f"Command '{' '.join(command)}' hung and did not exit within timeout. "
"This indicates database connections are not being cleaned up properly."
)
def test_ensure_initialization_exits_cleanly(self, tmp_path):
"""Test that ensure_initialization doesn't cause process hang.
This test directly tests the initialization function that's called
by CLI commands, ensuring it cleans up database connections properly.
"""
code = """
import asyncio
from basic_memory.config import ConfigManager
from basic_memory.services.initialization import ensure_initialization
app_config = ConfigManager().config
ensure_initialization(app_config)
print("OK")
"""
try:
# Ensure the subprocess uses an isolated home directory so ConfigManager doesn't
# touch the real user profile/AppData (which can be slow/flaky on CI Windows).
env = dict(os.environ)
bm_home = tmp_path / "basic-memory-home"
env["BASIC_MEMORY_HOME"] = str(bm_home)
env["HOME"] = str(tmp_path)
env["USERPROFILE"] = str(tmp_path)
result = subprocess.run(
[sys.executable, "-c", code],
capture_output=True,
text=True,
timeout=SUBPROCESS_TIMEOUT,
env=env,
)
assert "OK" in result.stdout, f"Unexpected output: {result.stdout}"
except subprocess.TimeoutExpired:
pytest.fail(
"ensure_initialization() caused process hang. "
"Database connections are not being cleaned up before event loop closes."
)
```
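The docstring above describes the ordering these tests enforce. A minimal, self-contained sketch of that shutdown pattern (the initialize/shutdown names are stand-ins, not the project's actual API):
```python
# Stand-in sketch: dispose async connections inside the running loop, before
# asyncio.run() returns and tears the loop down.
import asyncio


async def initialize_app() -> None:
    """Stand-in for initialization that opens global async DB connections."""


async def shutdown_db() -> None:
    """Stand-in for disposing the global async engine."""


def ensure_initialization() -> None:
    async def _run() -> None:
        try:
            await initialize_app()
        finally:
            # Without this, connections created during initialization outlive the
            # event loop and the process can hang on exit.
            await shutdown_db()

    asyncio.run(_run())


ensure_initialization()
```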
--------------------------------------------------------------------------------
/tests/api/test_management_router.py:
--------------------------------------------------------------------------------
```python
"""Tests for management router API endpoints (minimal mocking).
These endpoints are mostly simple state checks and wiring; we use stub objects
and pytest monkeypatch instead of standard-library mocks.
"""
from __future__ import annotations
import pytest
from fastapi import FastAPI
from basic_memory.api.routers.management_router import (
WatchStatusResponse,
get_watch_status,
start_watch_service,
stop_watch_service,
)
class _Request:
def __init__(self, app: FastAPI):
self.app = app
class _Task:
def __init__(self, *, done: bool):
self._done = done
self.cancel_called = False
def done(self) -> bool:
return self._done
def cancel(self) -> None:
self.cancel_called = True
@pytest.fixture
def app_with_state() -> FastAPI:
app = FastAPI()
app.state.watch_task = None
return app
@pytest.mark.asyncio
async def test_get_watch_status_not_running(app_with_state: FastAPI):
app_with_state.state.watch_task = None
resp = await get_watch_status(_Request(app_with_state))
assert isinstance(resp, WatchStatusResponse)
assert resp.running is False
@pytest.mark.asyncio
async def test_get_watch_status_running(app_with_state: FastAPI):
app_with_state.state.watch_task = _Task(done=False)
resp = await get_watch_status(_Request(app_with_state))
assert resp.running is True
@pytest.mark.asyncio
async def test_start_watch_service_when_not_running(monkeypatch, app_with_state: FastAPI):
app_with_state.state.watch_task = None
created = {"watch_service": None, "task": None}
class _StubWatchService:
def __init__(self, *, app_config, project_repository):
self.app_config = app_config
self.project_repository = project_repository
created["watch_service"] = self
def _create_background_sync_task(sync_service, watch_service):
created["task"] = _Task(done=False)
return created["task"]
# start_watch_service imports these inside the function, so patch at the source modules.
monkeypatch.setattr("basic_memory.sync.WatchService", _StubWatchService)
monkeypatch.setattr(
"basic_memory.sync.background_sync.create_background_sync_task",
_create_background_sync_task,
)
project_repository = object()
sync_service = object()
resp = await start_watch_service(_Request(app_with_state), project_repository, sync_service)
assert resp.running is True
assert app_with_state.state.watch_task is created["task"]
assert created["watch_service"] is not None
assert created["watch_service"].project_repository is project_repository
@pytest.mark.asyncio
async def test_start_watch_service_already_running(monkeypatch, app_with_state: FastAPI):
existing = _Task(done=False)
app_with_state.state.watch_task = existing
def _should_not_be_called(*_args, **_kwargs):
raise AssertionError("create_background_sync_task should not be called if already running")
monkeypatch.setattr(
"basic_memory.sync.background_sync.create_background_sync_task",
_should_not_be_called,
)
resp = await start_watch_service(_Request(app_with_state), object(), object())
assert resp.running is True
assert app_with_state.state.watch_task is existing
@pytest.mark.asyncio
async def test_stop_watch_service_not_running(app_with_state: FastAPI):
app_with_state.state.watch_task = None
resp = await stop_watch_service(_Request(app_with_state))
assert resp.running is False
@pytest.mark.asyncio
async def test_stop_watch_service_already_done(app_with_state: FastAPI):
app_with_state.state.watch_task = _Task(done=True)
resp = await stop_watch_service(_Request(app_with_state))
assert resp.running is False
```
--------------------------------------------------------------------------------
/test-int/mcp/test_project_state_sync_integration.py:
--------------------------------------------------------------------------------
```python
"""Integration test for project state synchronization between MCP session and CLI config.
This test validates the fix for GitHub issue #148 where MCP session and CLI commands
had inconsistent project state, causing "Project not found" errors and edit failures.
The test simulates the exact workflow reported in the issue:
1. MCP server starts with a default project
2. Default project is changed via CLI/API
3. MCP tools should immediately use the new project (no restart needed)
4. All operations should work consistently in the new project context
"""
import pytest
from fastmcp import Client
@pytest.mark.asyncio
async def test_project_state_sync_after_default_change(
mcp_server, app, config_manager, test_project, tmp_path
):
"""Test that MCP session stays in sync when default project is changed."""
async with Client(mcp_server) as client:
# Step 1: Create a second project that we can switch to
create_result = await client.call_tool(
"create_memory_project",
{
"project_name": "minerva",
"project_path": str(tmp_path.parent / (tmp_path.name + "-projects") / "minerva"),
"set_default": False, # Don't set as default yet
},
)
assert len(create_result.content) == 1
assert "✓" in create_result.content[0].text # pyright: ignore [reportAttributeAccessIssue]
assert "minerva" in create_result.content[0].text # pyright: ignore [reportAttributeAccessIssue]
# Step 2: Test that note operations work in the new project context
# This validates that the identifier resolution works correctly
write_result = await client.call_tool(
"write_note",
{
"project": "minerva",
"title": "Test Consistency Note",
"folder": "test",
"content": "# Test Note\n\nThis note tests project state consistency.\n\n- [test] Project state sync working",
"tags": "test,consistency",
},
)
assert len(write_result.content) == 1
assert "Test Consistency Note" in write_result.content[0].text # pyright: ignore [reportAttributeAccessIssue]
# Step 3: Test that we can read the note we just created
read_result = await client.call_tool(
"read_note", {"project": "minerva", "identifier": "Test Consistency Note"}
)
assert len(read_result.content) == 1
assert "Test Consistency Note" in read_result.content[0].text # pyright: ignore [reportAttributeAccessIssue]
assert "project state sync working" in read_result.content[0].text.lower() # pyright: ignore [reportAttributeAccessIssue]
# Step 4: Test that edit operations work (this was failing in the original issue)
edit_result = await client.call_tool(
"edit_note",
{
"project": "minerva",
"identifier": "Test Consistency Note",
"operation": "append",
"content": "\n\n## Update\n\nEdit operation successful after project switch!",
},
)
assert len(edit_result.content) == 1
assert (
"added" in edit_result.content[0].text.lower() # pyright: ignore [reportAttributeAccessIssue]
and "lines" in edit_result.content[0].text.lower() # pyright: ignore [reportAttributeAccessIssue]
)
# Step 5: Verify the edit was applied
final_read_result = await client.call_tool(
"read_note", {"project": "minerva", "identifier": "Test Consistency Note"}
)
assert len(final_read_result.content) == 1
final_content = final_read_result.content[0].text # pyright: ignore [reportAttributeAccessIssue]
assert "Edit operation successful" in final_content
```
--------------------------------------------------------------------------------
/tests/mcp/test_mcp_container.py:
--------------------------------------------------------------------------------
```python
"""Tests for MCP container composition root."""
import pytest
from basic_memory.mcp.container import (
McpContainer,
get_container,
set_container,
)
from basic_memory.runtime import RuntimeMode
class TestMcpContainer:
"""Tests for McpContainer."""
def test_create_from_config(self, app_config):
"""Container can be created from config manager."""
container = McpContainer(config=app_config, mode=RuntimeMode.LOCAL)
assert container.config == app_config
assert container.mode == RuntimeMode.LOCAL
def test_should_sync_files_when_enabled_local_mode(self, app_config):
"""Sync should be enabled in local mode when config says so."""
app_config.sync_changes = True
container = McpContainer(config=app_config, mode=RuntimeMode.LOCAL)
assert container.should_sync_files is True
def test_should_not_sync_files_when_disabled(self, app_config):
"""Sync should be disabled when config says so."""
app_config.sync_changes = False
container = McpContainer(config=app_config, mode=RuntimeMode.LOCAL)
assert container.should_sync_files is False
def test_should_not_sync_files_in_test_mode(self, app_config):
"""Sync should be disabled in test mode regardless of config."""
app_config.sync_changes = True
container = McpContainer(config=app_config, mode=RuntimeMode.TEST)
assert container.should_sync_files is False
def test_should_not_sync_files_in_cloud_mode(self, app_config):
"""Sync should be disabled in cloud mode (cloud handles sync differently)."""
app_config.sync_changes = True
container = McpContainer(config=app_config, mode=RuntimeMode.CLOUD)
assert container.should_sync_files is False
class TestSyncSkipReason:
"""Tests for sync_skip_reason property."""
def test_skip_reason_in_test_mode(self, app_config):
"""Returns test message when in test mode."""
container = McpContainer(config=app_config, mode=RuntimeMode.TEST)
assert container.sync_skip_reason == "Test environment detected"
def test_skip_reason_in_cloud_mode(self, app_config):
"""Returns cloud message when in cloud mode."""
container = McpContainer(config=app_config, mode=RuntimeMode.CLOUD)
assert container.sync_skip_reason == "Cloud mode enabled"
def test_skip_reason_when_sync_disabled(self, app_config):
"""Returns disabled message when sync is disabled."""
app_config.sync_changes = False
container = McpContainer(config=app_config, mode=RuntimeMode.LOCAL)
assert container.sync_skip_reason == "Sync changes disabled"
def test_no_skip_reason_when_should_sync(self, app_config):
"""Returns None when sync should run."""
app_config.sync_changes = True
container = McpContainer(config=app_config, mode=RuntimeMode.LOCAL)
assert container.sync_skip_reason is None
class TestContainerAccessors:
"""Tests for container get/set functions."""
def test_get_container_raises_when_not_set(self, monkeypatch):
"""get_container raises RuntimeError when container not initialized."""
import basic_memory.mcp.container as container_module
monkeypatch.setattr(container_module, "_container", None)
with pytest.raises(RuntimeError, match="MCP container not initialized"):
get_container()
def test_set_and_get_container(self, app_config, monkeypatch):
"""set_container allows get_container to return the container."""
import basic_memory.mcp.container as container_module
container = McpContainer(config=app_config, mode=RuntimeMode.LOCAL)
monkeypatch.setattr(container_module, "_container", None)
set_container(container)
assert get_container() is container
```
--------------------------------------------------------------------------------
/src/basic_memory/cli/commands/cloud/bisync_commands.py:
--------------------------------------------------------------------------------
```python
"""Cloud bisync utility functions for Basic Memory CLI."""
from pathlib import Path
from basic_memory.cli.commands.cloud.api_client import make_api_request
from basic_memory.config import ConfigManager
from basic_memory.ignore_utils import create_default_bmignore, get_bmignore_path
from basic_memory.schemas.cloud import MountCredentials, TenantMountInfo
class BisyncError(Exception):
"""Exception raised for bisync-related errors."""
pass
async def get_mount_info() -> TenantMountInfo:
"""Get current tenant information from cloud API."""
try:
config_manager = ConfigManager()
config = config_manager.config
host_url = config.cloud_host.rstrip("/")
response = await make_api_request(method="GET", url=f"{host_url}/tenant/mount/info")
return TenantMountInfo.model_validate(response.json())
except Exception as e:
raise BisyncError(f"Failed to get tenant info: {e}") from e
async def generate_mount_credentials(tenant_id: str) -> MountCredentials:
"""Generate scoped credentials for syncing."""
try:
config_manager = ConfigManager()
config = config_manager.config
host_url = config.cloud_host.rstrip("/")
response = await make_api_request(method="POST", url=f"{host_url}/tenant/mount/credentials")
return MountCredentials.model_validate(response.json())
except Exception as e:
raise BisyncError(f"Failed to generate credentials: {e}") from e
def convert_bmignore_to_rclone_filters() -> Path:
"""Convert .bmignore patterns to rclone filter format.
Reads ~/.basic-memory/.bmignore (gitignore-style) and converts to
~/.basic-memory/.bmignore.rclone (rclone filter format).
Only regenerates if .bmignore has been modified since last conversion.
Returns:
Path to converted rclone filter file
"""
# Ensure .bmignore exists
create_default_bmignore()
bmignore_path = get_bmignore_path()
# Create rclone filter path: ~/.basic-memory/.bmignore -> ~/.basic-memory/.bmignore.rclone
rclone_filter_path = bmignore_path.parent / f"{bmignore_path.name}.rclone"
# Skip regeneration if rclone file is newer than bmignore
if rclone_filter_path.exists():
bmignore_mtime = bmignore_path.stat().st_mtime
rclone_mtime = rclone_filter_path.stat().st_mtime
if rclone_mtime >= bmignore_mtime:
return rclone_filter_path
# Read .bmignore patterns
patterns = []
try:
with bmignore_path.open("r", encoding="utf-8") as f:
for line in f:
line = line.strip()
# Keep comments and empty lines
if not line or line.startswith("#"):
patterns.append(line)
continue
# Convert gitignore pattern to rclone filter syntax
# gitignore: node_modules → rclone: - node_modules/**
# gitignore: *.pyc → rclone: - *.pyc
if "*" in line:
# Pattern already has wildcard, just add exclude prefix
patterns.append(f"- {line}")
else:
# Directory pattern - add /** for recursive exclude
patterns.append(f"- {line}/**")
except Exception:
# If we can't read the file, create a minimal filter
patterns = ["# Error reading .bmignore, using minimal filters", "- .git/**"]
# Write rclone filter file
rclone_filter_path.write_text("\n".join(patterns) + "\n")
return rclone_filter_path
def get_bisync_filter_path() -> Path:
"""Get path to bisync filter file.
Uses ~/.basic-memory/.bmignore (converted to rclone format).
The file is automatically created with default patterns on first use.
Returns:
Path to rclone filter file
"""
return convert_bmignore_to_rclone_filters()
```
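For reference, the conversion performed by convert_bmignore_to_rclone_filters maps patterns roughly as shown below; the concrete entries are illustrative, not the shipped defaults:
```
# Example .bmignore (gitignore-style input)
# dependencies
node_modules
*.pyc

# Resulting .bmignore.rclone (rclone filter output; comments and blank lines kept)
# dependencies
- node_modules/**
- *.pyc
```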
--------------------------------------------------------------------------------
/tests/utils/test_timezone_utils.py:
--------------------------------------------------------------------------------
```python
"""Tests for timezone utilities."""
from datetime import datetime, timezone
from basic_memory.utils import ensure_timezone_aware
class TestEnsureTimezoneAware:
"""Tests for ensure_timezone_aware function."""
def test_already_timezone_aware_returns_unchanged(self):
"""Timezone-aware datetime should be returned unchanged."""
dt = datetime(2024, 1, 15, 12, 30, 0, tzinfo=timezone.utc)
result = ensure_timezone_aware(dt)
assert result == dt
assert result.tzinfo == timezone.utc
def test_naive_datetime_cloud_mode_true_interprets_as_utc(self):
"""In cloud mode, naive datetimes should be interpreted as UTC."""
naive_dt = datetime(2024, 1, 15, 12, 30, 0)
result = ensure_timezone_aware(naive_dt, cloud_mode=True)
# Should have UTC timezone
assert result.tzinfo == timezone.utc
# Time values should be unchanged (just tagged as UTC)
assert result.year == 2024
assert result.month == 1
assert result.day == 15
assert result.hour == 12
assert result.minute == 30
def test_naive_datetime_cloud_mode_false_interprets_as_local(self):
"""In local mode, naive datetimes should be interpreted as local time."""
naive_dt = datetime(2024, 1, 15, 12, 30, 0)
result = ensure_timezone_aware(naive_dt, cloud_mode=False)
# Should have some timezone info (local)
assert result.tzinfo is not None
# The datetime should be converted to local timezone
# We can't assert exact timezone as it depends on system
def test_cloud_mode_true_does_not_shift_time(self):
"""Cloud mode should use replace() not astimezone() - time values unchanged."""
naive_dt = datetime(2024, 6, 15, 18, 0, 0) # Summer time
result = ensure_timezone_aware(naive_dt, cloud_mode=True)
# Hour should remain 18, not be shifted by timezone offset
assert result.hour == 18
assert result.tzinfo == timezone.utc
def test_explicit_cloud_mode_skips_config_loading(self):
"""When cloud_mode is explicitly passed, config should not be loaded."""
# This test verifies we can call ensure_timezone_aware without
# triggering ConfigManager import when cloud_mode is explicit
naive_dt = datetime(2024, 1, 15, 12, 30, 0)
# Should work without any config setup
result_cloud = ensure_timezone_aware(naive_dt, cloud_mode=True)
assert result_cloud.tzinfo == timezone.utc
result_local = ensure_timezone_aware(naive_dt, cloud_mode=False)
assert result_local.tzinfo is not None
def test_none_cloud_mode_falls_back_to_config(self, config_manager):
"""When cloud_mode is None, should load from config."""
naive_dt = datetime(2024, 1, 15, 12, 30, 0)
# Use the real config file (via test fixtures) rather than mocking.
cfg = config_manager.config
cfg.cloud_mode = True
config_manager.save_config(cfg)
result = ensure_timezone_aware(naive_dt, cloud_mode=None)
# Should have used cloud mode (UTC)
assert result.tzinfo == timezone.utc
def test_asyncpg_naive_utc_scenario(self):
"""Simulate asyncpg returning naive datetime that's actually UTC.
asyncpg binary protocol returns timestamps in UTC but as naive datetimes.
In cloud mode, we interpret these as UTC rather than local time.
"""
# Simulate what asyncpg returns: a naive datetime that's actually UTC
asyncpg_result = datetime(2024, 1, 15, 18, 30, 0) # 6:30 PM UTC
# In cloud mode, interpret as UTC
cloud_result = ensure_timezone_aware(asyncpg_result, cloud_mode=True)
assert cloud_result == datetime(2024, 1, 15, 18, 30, 0, tzinfo=timezone.utc)
# The hour should remain 18, not shifted
assert cloud_result.hour == 18
```
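The tests above fix the expected semantics; a condensed sketch consistent with them (not the project's actual implementation, which also handles the cloud_mode=None config fallback):
```python
# Sketch of the behavior the tests above expect from ensure_timezone_aware.
from datetime import datetime, timezone


def ensure_timezone_aware_sketch(dt: datetime, cloud_mode: bool) -> datetime:
    if dt.tzinfo is not None:
        return dt  # already aware: returned unchanged
    if cloud_mode:
        # Cloud mode: naive timestamps (e.g. from asyncpg) are already UTC,
        # so tag them as UTC without shifting the wall-clock values.
        return dt.replace(tzinfo=timezone.utc)
    # Local mode: interpret the naive datetime as local time.
    return dt.astimezone()


aware = ensure_timezone_aware_sketch(datetime(2024, 6, 15, 18, 0, 0), cloud_mode=True)
assert aware.hour == 18 and aware.tzinfo == timezone.utc
```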
--------------------------------------------------------------------------------
/tests/markdown/test_relation_edge_cases.py:
--------------------------------------------------------------------------------
```python
"""Tests for edge cases in relation parsing."""
from markdown_it import MarkdownIt
from basic_memory.markdown.plugins import relation_plugin, parse_relation, parse_inline_relations
from basic_memory.markdown.schemas import Relation
def test_empty_targets():
"""Test handling of empty targets."""
md = MarkdownIt().use(relation_plugin)
# Empty brackets
tokens = md.parse("- type [[]]")
token = next(t for t in tokens if t.type == "inline")
assert parse_relation(token) is None
# Only spaces
tokens = md.parse("- type [[ ]]")
token = next(t for t in tokens if t.type == "inline")
assert parse_relation(token) is None
# Whitespace in brackets
tokens = md.parse("- type [[ ]]")
token = next(t for t in tokens if t.type == "inline")
assert parse_relation(token) is None
def test_malformed_links():
"""Test handling of malformed wiki links."""
md = MarkdownIt().use(relation_plugin)
# Missing close brackets
tokens = md.parse("- type [[Target")
assert not any(t.meta and "relations" in t.meta for t in tokens)
# Missing open brackets
tokens = md.parse("- type Target]]")
assert not any(t.meta and "relations" in t.meta for t in tokens)
# Backwards brackets
tokens = md.parse("- type ]]Target[[")
assert not any(t.meta and "relations" in t.meta for t in tokens)
# Nested brackets
tokens = md.parse("- type [[Outer [[Inner]] ]]")
token = next(t for t in tokens if t.type == "inline")
rel = parse_relation(token)
assert rel is not None
assert "Outer" in rel["target"]
def test_context_handling():
"""Test handling of contexts."""
md = MarkdownIt().use(relation_plugin)
# Unclosed context
tokens = md.parse("- type [[Target]] (unclosed")
token = next(t for t in tokens if t.type == "inline")
rel = parse_relation(token)
assert rel["context"] is None
# Multiple parens
tokens = md.parse("- type [[Target]] (with (nested) parens)")
token = next(t for t in tokens if t.type == "inline")
rel = parse_relation(token)
assert rel["context"] == "with (nested) parens"
# Empty context
tokens = md.parse("- type [[Target]] ()")
token = next(t for t in tokens if t.type == "inline")
rel = parse_relation(token)
assert rel["context"] is None
def test_inline_relations():
"""Test inline relation detection."""
md = MarkdownIt().use(relation_plugin)
# Multiple links in text
text = "Text with [[Link1]] and [[Link2]] and [[Link3]]"
rels = parse_inline_relations(text)
assert len(rels) == 3
assert {r["target"] for r in rels} == {"Link1", "Link2", "Link3"}
# Links with surrounding text
text = "Before [[Target]] After"
rels = parse_inline_relations(text)
assert len(rels) == 1
assert rels[0]["target"] == "Target"
# Multiple links on same line
tokens = md.parse("[[One]] [[Two]] [[Three]]")
token = next(t for t in tokens if t.type == "inline")
assert len(token.meta["relations"]) == 3
def test_unicode_targets():
"""Test handling of Unicode in targets."""
md = MarkdownIt().use(relation_plugin)
# Unicode in target
tokens = md.parse("- type [[测试]]")
token = next(t for t in tokens if t.type == "inline")
rel = parse_relation(token)
assert rel["target"] == "测试"
# Unicode in type
tokens = md.parse("- 使用 [[Target]]")
token = next(t for t in tokens if t.type == "inline")
rel = parse_relation(token)
assert rel["type"] == "使用"
# Unicode in context
tokens = md.parse("- type [[Target]] (测试)")
token = next(t for t in tokens if t.type == "inline")
rel = parse_relation(token)
assert rel["context"] == "测试"
# Model validation with Unicode
relation = Relation.model_validate(rel)
assert relation.type == "type"
assert relation.target == "Target"
assert relation.context == "测试"
```
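To make the syntax under test concrete, a well-formed relation line parses as follows (mirroring the helper calls used in the tests above):
```python
# A well-formed relation: "- <type> [[Target]] (optional context)".
from markdown_it import MarkdownIt

from basic_memory.markdown.plugins import parse_relation, relation_plugin

md = MarkdownIt().use(relation_plugin)
tokens = md.parse("- implements [[Search Service]] (initial design)")
token = next(t for t in tokens if t.type == "inline")

rel = parse_relation(token)
assert rel["type"] == "implements"
assert rel["target"] == "Search Service"
assert rel["context"] == "initial design"
```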
--------------------------------------------------------------------------------
/src/basic_memory/schemas/search.py:
--------------------------------------------------------------------------------
```python
"""Search schemas for Basic Memory.
The search system supports three primary modes:
1. Exact permalink lookup
2. Pattern matching with *
3. Full-text search across content
"""
from typing import Optional, List, Union
from datetime import datetime
from enum import Enum
from pydantic import BaseModel, field_validator
from basic_memory.schemas.base import Permalink
class SearchItemType(str, Enum):
"""Types of searchable items."""
ENTITY = "entity"
OBSERVATION = "observation"
RELATION = "relation"
class SearchQuery(BaseModel):
"""Search query parameters.
Use ONE of these primary search modes:
- permalink: Exact permalink match
- permalink_match: Path pattern with *
- text: Full-text search of title/content (supports boolean operators: AND, OR, NOT)
Optionally filter results by:
- types: Limit to specific item types
- entity_types: Limit to specific entity types
- after_date: Only items after date
Boolean search examples:
- "python AND flask" - Find items with both terms
- "python OR django" - Find items with either term
- "python NOT django" - Find items with python but not django
- "(python OR flask) AND web" - Use parentheses for grouping
"""
# Primary search modes (use ONE of these)
permalink: Optional[str] = None # Exact permalink match
permalink_match: Optional[str] = None # Glob permalink match
text: Optional[str] = None # Full-text search (now supports boolean operators)
title: Optional[str] = None # title only search
# Optional filters
types: Optional[List[str]] = None # Filter by type
entity_types: Optional[List[SearchItemType]] = None # Filter by entity type
after_date: Optional[Union[datetime, str]] = None # Time-based filter
@field_validator("after_date")
@classmethod
def validate_date(cls, v: Optional[Union[datetime, str]]) -> Optional[str]:
"""Convert datetime to ISO format if needed."""
if isinstance(v, datetime):
return v.isoformat()
return v
def no_criteria(self) -> bool:
return (
self.permalink is None
and self.permalink_match is None
and self.title is None
and self.text is None
and self.after_date is None
and self.types is None
and self.entity_types is None
)
def has_boolean_operators(self) -> bool:
"""Check if the text query contains boolean operators (AND, OR, NOT)."""
if not self.text: # pragma: no cover
return False
# Check for common boolean operators with correct word boundaries
# to avoid matching substrings like "GRAND" containing "AND"
boolean_patterns = [" AND ", " OR ", " NOT ", "(", ")"]
text = f" {self.text} " # Add spaces to ensure we match word boundaries
return any(pattern in text for pattern in boolean_patterns)
class SearchResult(BaseModel):
"""Search result with score and metadata."""
title: str
type: SearchItemType
score: float
entity: Optional[Permalink] = None
permalink: Optional[str]
content: Optional[str] = None
file_path: str
metadata: Optional[dict] = None
# IDs for v2 API consistency
entity_id: Optional[int] = None # Entity ID (always present for entities)
observation_id: Optional[int] = None # Observation ID (for observation results)
relation_id: Optional[int] = None # Relation ID (for relation results)
# Type-specific fields
category: Optional[str] = None # For observations
from_entity: Optional[Permalink] = None # For relations
to_entity: Optional[Permalink] = None # For relations
relation_type: Optional[str] = None # For relations
class SearchResponse(BaseModel):
"""Wrapper for search results."""
results: List[SearchResult]
current_page: int
page_size: int
```
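A small sketch of the SearchQuery helpers defined above:
```python
# Sketch: SearchQuery helper methods.
from basic_memory.schemas.search import SearchQuery

assert SearchQuery().no_criteria() is True

fts = SearchQuery(text="(python OR flask) AND web")
assert fts.no_criteria() is False
assert fts.has_boolean_operators() is True

plain = SearchQuery(text="grand plans")  # lowercase "and" inside a word does not count
assert plain.has_boolean_operators() is False
```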
--------------------------------------------------------------------------------
/tests/api/v2/test_directory_router.py:
--------------------------------------------------------------------------------
```python
"""Tests for V2 directory API routes (ID-based endpoints)."""
import pytest
from httpx import AsyncClient
from basic_memory.models import Project
from basic_memory.schemas.directory import DirectoryNode
@pytest.mark.asyncio
async def test_get_directory_tree(
client: AsyncClient,
test_project: Project,
v2_project_url: str,
):
"""Test getting directory tree via v2 endpoint."""
response = await client.get(f"{v2_project_url}/directory/tree")
assert response.status_code == 200
tree = DirectoryNode.model_validate(response.json())
assert tree.type == "directory"
@pytest.mark.asyncio
async def test_get_directory_structure(
client: AsyncClient,
test_project: Project,
v2_project_url: str,
):
"""Test getting directory structure (folders only) via v2 endpoint."""
response = await client.get(f"{v2_project_url}/directory/structure")
assert response.status_code == 200
structure = DirectoryNode.model_validate(response.json())
assert structure.type == "directory"
# Structure should only contain directories, not files
if structure.children:
for child in structure.children:
assert child.type == "directory"
@pytest.mark.asyncio
async def test_list_directory_default(
client: AsyncClient,
test_project: Project,
v2_project_url: str,
):
"""Test listing directory contents with default parameters via v2 endpoint."""
response = await client.get(f"{v2_project_url}/directory/list")
assert response.status_code == 200
nodes = response.json()
assert isinstance(nodes, list)
@pytest.mark.asyncio
async def test_list_directory_with_depth(
client: AsyncClient,
test_project: Project,
v2_project_url: str,
):
"""Test listing directory with custom depth via v2 endpoint."""
response = await client.get(f"{v2_project_url}/directory/list?depth=2")
assert response.status_code == 200
nodes = response.json()
assert isinstance(nodes, list)
@pytest.mark.asyncio
async def test_list_directory_with_glob(
client: AsyncClient,
test_project: Project,
v2_project_url: str,
):
"""Test listing directory with file name glob filter via v2 endpoint."""
response = await client.get(f"{v2_project_url}/directory/list?file_name_glob=*.md")
assert response.status_code == 200
nodes = response.json()
assert isinstance(nodes, list)
# All file nodes should have .md extension
for node in nodes:
if node.get("type") == "file":
assert node.get("path", "").endswith(".md")
@pytest.mark.asyncio
async def test_list_directory_with_custom_path(
client: AsyncClient,
test_project: Project,
v2_project_url: str,
):
"""Test listing a specific directory path via v2 endpoint."""
response = await client.get(f"{v2_project_url}/directory/list?dir_name=/")
assert response.status_code == 200
nodes = response.json()
assert isinstance(nodes, list)
@pytest.mark.asyncio
async def test_directory_invalid_project_id(
client: AsyncClient,
):
"""Test directory endpoints with invalid project ID return 404."""
# Test tree endpoint
response = await client.get("/v2/projects/999999/directory/tree")
assert response.status_code == 404
# Test structure endpoint
response = await client.get("/v2/projects/999999/directory/structure")
assert response.status_code == 404
# Test list endpoint
response = await client.get("/v2/projects/999999/directory/list")
assert response.status_code == 404
@pytest.mark.asyncio
async def test_v2_directory_endpoints_use_project_id_not_name(
client: AsyncClient, test_project: Project
):
"""Verify v2 directory endpoints require project ID, not name."""
# Try using project name instead of ID - should fail
response = await client.get(f"/v2/projects/{test_project.name}/directory/tree")
# Should get validation error or 404 because name is not a valid integer
assert response.status_code in [404, 422]
```
--------------------------------------------------------------------------------
/test-int/BENCHMARKS.md:
--------------------------------------------------------------------------------
```markdown
# Performance Benchmarks
This directory contains performance benchmark tests for Basic Memory's sync/indexing operations.
## Purpose
These benchmarks measure baseline performance to track improvements from optimizations. They are particularly important for:
- Cloud deployments with ephemeral databases that need fast re-indexing
- Large repositories (100s to 1000s of files)
- Validating optimization efforts
## Running Benchmarks
### Run all benchmarks (excluding slow ones)
```bash
pytest test-int/test_sync_performance_benchmark.py -v -m "benchmark and not slow"
```
### Run specific benchmark
```bash
# 100 files (fast, ~10-30 seconds)
pytest test-int/test_sync_performance_benchmark.py::test_benchmark_sync_100_files -v
# 500 files (medium, ~1-3 minutes)
pytest test-int/test_sync_performance_benchmark.py::test_benchmark_sync_500_files -v
# 1000 files (slow, ~3-10 minutes)
pytest test-int/test_sync_performance_benchmark.py::test_benchmark_sync_1000_files -v
# Re-sync with no changes (tests scan performance)
pytest test-int/test_sync_performance_benchmark.py::test_benchmark_resync_no_changes -v
```
### Run all benchmarks including slow ones
```bash
pytest test-int/test_sync_performance_benchmark.py -v -m benchmark
```
### Skip benchmarks in regular test runs
```bash
pytest -m "not benchmark"
```
## Benchmark Output
Each benchmark provides detailed metrics including:
- **Performance Metrics**:
- Total sync time
- Files processed per second
- Milliseconds per file
- **Database Metrics**:
- Initial database size
- Final database size
- Database growth (total and per file)
- **Operation Counts**:
- New files indexed
- Modified files processed
- Deleted files handled
- Moved files tracked
## Example Output
```
======================================================================
BENCHMARK: Sync 100 files (small repository)
======================================================================
Generating 100 test files...
Created files 0-100 (100/100)
File generation completed in 0.15s (666.7 files/sec)
Initial database size: 120.00 KB
Starting sync of 100 files...
----------------------------------------------------------------------
RESULTS:
----------------------------------------------------------------------
Files processed: 100
New: 100
Modified: 0
Deleted: 0
Moved: 0
Performance:
Total time: 12.34s
Files/sec: 8.1
ms/file: 123.4
Database:
Initial size: 120.00 KB
Final size: 5.23 MB
Growth: 5.11 MB
Growth per file: 52.31 KB
======================================================================
```
## Interpreting Results
### Good Performance Indicators
- **Files/sec > 10**: Good indexing speed for small-medium repos
- **Files/sec > 5**: Acceptable for large repos with complex relations
- **DB growth < 100KB per file**: Reasonable index size
### Areas for Improvement
- **Files/sec < 5**: May benefit from batch operations
- **ms/file > 200**: High latency per file, check for N+1 queries
- **DB growth > 200KB per file**: Search index may be bloated (trigrams?)
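As a rough worked example using the output above: 100 files in 12.34 s is 100 / 12.34 ≈ 8.1 files/sec and 12,340 ms / 100 ≈ 123 ms/file, with 5.11 MB / 100 ≈ 52 KB of index growth per file. Index size is well within the 100 KB guideline, while throughput lands between the 5 files/sec floor and the 10 files/sec target, so there is room for improvement but no immediate red flag.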
## Tracking Improvements
Before making optimizations:
1. Run benchmarks to establish baseline
2. Save output for comparison (see the shell example after these steps)
3. Note any particular pain points (e.g., slow search indexing)
After optimizations:
1. Run the same benchmarks
2. Compare metrics:
- Files/sec should increase
- ms/file should decrease
- DB growth per file may decrease (with search optimizations)
3. Document improvements in PR
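A simple way to capture runs for side-by-side comparison is to tee the benchmark output to files (the `baseline.txt` / `after.txt` names below are only a suggestion, not part of the suite):
```bash
# Capture a baseline before making changes
pytest test-int/test_sync_performance_benchmark.py -v -m "benchmark and not slow" | tee baseline.txt
# Re-run after the optimization, then compare the RESULTS sections
pytest test-int/test_sync_performance_benchmark.py -v -m "benchmark and not slow" | tee after.txt
diff baseline.txt after.txt
```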
## Related Issues
- [#351: Performance: Optimize sync/indexing for cloud deployments](https://github.com/basicmachines-co/basic-memory/issues/351)
## Test File Generation
Benchmarks generate realistic markdown files (a sketch follows this list) with:
- YAML frontmatter with tags
- 3-10 observations per file with categories
- 1-3 relations per file (including forward references)
- Varying content to simulate real usage
- Files organized in category subdirectories
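An illustrative sketch of what a generated file looks like (titles, categories, tags, and relation targets vary per run; this is not literal generator output):
```
---
title: Note 0042
tags: [benchmark, testing]
---
# Note 0042
- [design] First generated observation for this note #sync (generated context)
- [testing] Second observation with a #performance tag
## Relations
- relates_to [[Note 0017]]
- part_of [[Category 2 Index]]
```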
```
--------------------------------------------------------------------------------
/src/basic_memory/alembic/versions/cc7172b46608_update_search_index_schema.py:
--------------------------------------------------------------------------------
```python
"""Update search index schema
Revision ID: cc7172b46608
Revises: 502b60eaa905
Create Date: 2025-02-28 18:48:23.244941
"""
from typing import Sequence, Union
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "cc7172b46608"
down_revision: Union[str, None] = "502b60eaa905"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Upgrade database schema to use new search index with content_stems and content_snippet."""
# This migration is SQLite-specific (FTS5 virtual tables)
# For Postgres, the search_index table is created via ORM models
connection = op.get_bind()
if connection.dialect.name != "sqlite":
return
# First, drop the existing search_index table
op.execute("DROP TABLE IF EXISTS search_index")
# Create new search_index with updated schema
op.execute("""
CREATE VIRTUAL TABLE IF NOT EXISTS search_index USING fts5(
-- Core entity fields
id UNINDEXED, -- Row ID
title, -- Title for searching
content_stems, -- Main searchable content split into stems
content_snippet, -- File content snippet for display
permalink, -- Stable identifier (now indexed for path search)
file_path UNINDEXED, -- Physical location
type UNINDEXED, -- entity/relation/observation
-- Relation fields
from_id UNINDEXED, -- Source entity
to_id UNINDEXED, -- Target entity
relation_type UNINDEXED, -- Type of relation
-- Observation fields
entity_id UNINDEXED, -- Parent entity
category UNINDEXED, -- Observation category
-- Common fields
metadata UNINDEXED, -- JSON metadata
created_at UNINDEXED, -- Creation timestamp
updated_at UNINDEXED, -- Last update
-- Configuration
tokenize='unicode61 tokenchars 0x2F', -- Hex code for /
prefix='1,2,3,4' -- Support longer prefixes for paths
);
""")
def downgrade() -> None:
"""Downgrade database schema to use old search index."""
# This migration is SQLite-specific (FTS5 virtual tables)
# For Postgres, the search_index table is managed via ORM models
connection = op.get_bind()
if connection.dialect.name != "sqlite":
return
# Drop the updated search_index table
op.execute("DROP TABLE IF EXISTS search_index")
# Recreate the original search_index schema
op.execute("""
CREATE VIRTUAL TABLE IF NOT EXISTS search_index USING fts5(
-- Core entity fields
id UNINDEXED, -- Row ID
title, -- Title for searching
content, -- Main searchable content
permalink, -- Stable identifier (now indexed for path search)
file_path UNINDEXED, -- Physical location
type UNINDEXED, -- entity/relation/observation
-- Relation fields
from_id UNINDEXED, -- Source entity
to_id UNINDEXED, -- Target entity
relation_type UNINDEXED, -- Type of relation
-- Observation fields
entity_id UNINDEXED, -- Parent entity
category UNINDEXED, -- Observation category
-- Common fields
metadata UNINDEXED, -- JSON metadata
created_at UNINDEXED, -- Creation timestamp
updated_at UNINDEXED, -- Last update
-- Configuration
tokenize='unicode61 tokenchars 0x2F', -- Hex code for /
prefix='1,2,3,4' -- Support longer prefixes for paths
);
""")
# Print instruction to manually reindex after migration
print("\n------------------------------------------------------------------")
print("IMPORTANT: After downgrade completes, manually run the reindex command:")
print("basic-memory sync")
print("------------------------------------------------------------------\n")
```
--------------------------------------------------------------------------------
/tests/utils/test_permalink_formatting.py:
--------------------------------------------------------------------------------
```python
"""Test permalink formatting during sync."""
from pathlib import Path
import pytest
from basic_memory.config import ProjectConfig
from basic_memory.services import EntityService
from basic_memory.sync.sync_service import SyncService
from basic_memory.utils import generate_permalink
async def create_test_file(path: Path, content: str = "test content") -> None:
"""Create a test file with given content."""
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(content)
@pytest.mark.asyncio
async def test_permalink_formatting(
sync_service: SyncService, project_config: ProjectConfig, entity_service: EntityService
):
"""Test that permalinks are properly formatted during sync.
This ensures:
- Underscores are converted to hyphens
- Spaces are converted to hyphens
- Mixed case is lowercased
- Directory structure is preserved
- Multiple directories work correctly
"""
project_dir = project_config.home
# Test cases with different filename formats
test_cases = [
# filename -> expected permalink
("my_awesome_feature.md", "my-awesome-feature"),
("MIXED_CASE_NAME.md", "mixed-case-name"),
("spaces and_underscores.md", "spaces-and-underscores"),
("design/model_refactor.md", "design/model-refactor"),
(
"test/multiple_word_directory/feature_name.md",
"test/multiple-word-directory/feature-name",
),
]
# Create test files
for filename, _ in test_cases:
content = """
---
type: knowledge
created: 2024-01-01
modified: 2024-01-01
---
# Test File
Testing permalink generation.
"""
await create_test_file(project_dir / filename, content)
# Run sync
await sync_service.sync(project_config.home)
# Verify permalinks
for filename, expected_permalink in test_cases:
entity = await entity_service.repository.get_by_file_path(filename)
assert entity.permalink == expected_permalink, (
f"File {filename} should have permalink {expected_permalink}"
)
@pytest.mark.parametrize(
"input_path, expected",
[
("test/Über File.md", "test/uber-file"),
("docs/résumé.md", "docs/resume"),
("notes/Déjà vu.md", "notes/deja-vu"),
("papers/Jürgen's Findings.md", "papers/jurgens-findings"),
("archive/François Müller.md", "archive/francois-muller"),
("research/Søren Kierkegård.md", "research/soren-kierkegard"),
("articles/El Niño.md", "articles/el-nino"),
("ArticlesElNiño.md", "articles-el-nino"),
("articleselniño.md", "articleselnino"),
("articles-El-Niño.md", "articles-el-nino"),
],
)
def test_latin_accents_transliteration(input_path, expected):
"""Test that Latin letters with accents are properly transliterated."""
assert generate_permalink(input_path) == expected
@pytest.mark.parametrize(
"input_path, expected",
[
("中文/测试文档.md", "中文/测试文档"),
("notes/北京市.md", "notes/北京市"),
("research/上海简介.md", "research/上海简介"),
("docs/中文 English Mixed.md", "docs/中文-english-mixed"),
("articles/东京Tokyo混合.md", "articles/东京-tokyo-混合"),
("papers/汉字_underscore_test.md", "papers/汉字-underscore-test"),
("projects/中文CamelCase测试.md", "projects/中文-camel-case-测试"),
],
)
def test_chinese_character_preservation(input_path, expected):
"""Test that Chinese characters are preserved in permalinks."""
assert generate_permalink(input_path) == expected
@pytest.mark.parametrize(
"input_path, expected",
[
("mixed/北京Café.md", "mixed/北京-cafe"),
("notes/东京Tōkyō.md", "notes/东京-tokyo"),
("research/München中文.md", "research/munchen-中文"),
("docs/Über测试.md", "docs/uber-测试"),
("complex/北京Beijing上海Shanghai.md", "complex/北京-beijing-上海-shanghai"),
("special/中文!@#$%^&*()_+.md", "special/中文"),
("punctuation/你好,世界!.md", "punctuation/你好世界"),
],
)
def test_mixed_character_sets(input_path, expected):
"""Test handling of mixed character sets and edge cases."""
assert generate_permalink(input_path) == expected
```
--------------------------------------------------------------------------------
/tests/importers/test_importer_base.py:
--------------------------------------------------------------------------------
```python
"""Tests for the base importer class."""
import pytest
from basic_memory.importers.base import Importer
from basic_memory.markdown.entity_parser import EntityParser
from basic_memory.markdown.markdown_processor import MarkdownProcessor
from basic_memory.markdown.schemas import EntityFrontmatter, EntityMarkdown
from basic_memory.schemas.importer import ImportResult
from basic_memory.services.file_service import FileService
# Create a concrete implementation of the abstract class for testing
class ConcreteTestImporter(Importer[ImportResult]):
"""Test implementation of Importer base class."""
async def import_data(self, source_data, destination_folder: str, **kwargs):
"""Implement the abstract method for testing."""
try:
# Test implementation that returns success
await self.ensure_folder_exists(destination_folder)
return ImportResult(
import_count={"files": 1},
success=True,
error_message=None,
)
except Exception as e:
return self.handle_error("Test import failed", e)
def handle_error(self, message: str, error=None) -> ImportResult:
"""Implement the abstract handle_error method."""
import logging
logger = logging.getLogger(__name__)
error_message = f"{message}"
if error:
error_message += f": {str(error)}"
logger.error(error_message)
return ImportResult(
import_count={},
success=False,
error_message=error_message,
)
@pytest.fixture
def test_importer(tmp_path):
"""Create a ConcreteTestImporter instance for testing."""
entity_parser = EntityParser(base_path=tmp_path)
markdown_processor = MarkdownProcessor(entity_parser=entity_parser)
file_service = FileService(base_path=tmp_path, markdown_processor=markdown_processor)
return ConcreteTestImporter(tmp_path, markdown_processor, file_service)
@pytest.mark.asyncio
async def test_import_data_success(test_importer):
"""Test successful import_data implementation."""
result = await test_importer.import_data({}, "test_folder")
assert result.success
assert result.import_count == {"files": 1}
assert result.error_message is None
assert (test_importer.base_path / "test_folder").exists()
@pytest.mark.asyncio
async def test_write_entity(test_importer, tmp_path):
"""Test write_entity method."""
# Create test entity
entity = EntityMarkdown(
frontmatter=EntityFrontmatter(metadata={"title": "Test Entity", "type": "note"}),
content="Test content",
observations=[],
relations=[],
)
# Call write_entity
file_path = tmp_path / "test_entity.md"
checksum = await test_importer.write_entity(entity, file_path)
assert file_path.exists()
assert len(checksum) == 64 # sha256 hex digest
assert file_path.read_text(encoding="utf-8").strip() != ""
@pytest.mark.asyncio
async def test_ensure_folder_exists(test_importer):
"""Test ensure_folder_exists method."""
# Test with simple folder - now passes relative path to FileService
await test_importer.ensure_folder_exists("test_folder")
assert (test_importer.base_path / "test_folder").exists()
# Test with nested folder - FileService handles base_path resolution
await test_importer.ensure_folder_exists("nested/folder/path")
assert (test_importer.base_path / "nested/folder/path").exists()
@pytest.mark.asyncio
async def test_handle_error(test_importer):
"""Test handle_error method."""
# Test with message only
result = test_importer.handle_error("Test error message")
assert not result.success
assert result.error_message == "Test error message"
assert result.import_count == {}
# Test with message and exception
test_exception = ValueError("Test exception")
result = test_importer.handle_error("Error occurred", test_exception)
assert not result.success
assert "Error occurred" in result.error_message
assert "Test exception" in result.error_message
assert result.import_count == {}
```
--------------------------------------------------------------------------------
/tests/services/test_initialization.py:
--------------------------------------------------------------------------------
```python
"""Integration-style tests for the initialization service.
Goal: avoid brittle deep mocking; assert real behavior using the existing
test config + dual-backend fixtures.
"""
from __future__ import annotations
import pytest
from basic_memory import db
from basic_memory.config import BasicMemoryConfig, DatabaseBackend
from basic_memory.repository.project_repository import ProjectRepository
from basic_memory.services.initialization import (
ensure_initialization,
initialize_database,
reconcile_projects_with_config,
)
@pytest.mark.asyncio
async def test_initialize_database_creates_engine_and_allows_queries(app_config: BasicMemoryConfig):
await db.shutdown_db()
try:
await initialize_database(app_config)
engine, session_maker = await db.get_or_create_db(app_config.database_path)
assert engine is not None
assert session_maker is not None
# Smoke query on the initialized DB
async with db.scoped_session(session_maker) as session:
result = await session.execute(db.text("SELECT 1"))
assert result.scalar() == 1
finally:
await db.shutdown_db()
@pytest.mark.asyncio
async def test_initialize_database_raises_on_invalid_postgres_config(
app_config: BasicMemoryConfig, config_manager
):
"""If config selects Postgres but has no DATABASE_URL, initialization should fail."""
await db.shutdown_db()
try:
bad_config = app_config.model_copy(
update={"database_backend": DatabaseBackend.POSTGRES, "database_url": None}
)
config_manager.save_config(bad_config)
with pytest.raises(ValueError):
await initialize_database(bad_config)
finally:
await db.shutdown_db()
@pytest.mark.asyncio
async def test_reconcile_projects_with_config_creates_projects_and_default(
app_config: BasicMemoryConfig, config_manager, config_home
):
await db.shutdown_db()
try:
# Ensure the configured paths exist
proj_a = config_home / "proj-a"
proj_b = config_home / "proj-b"
proj_a.mkdir(parents=True, exist_ok=True)
proj_b.mkdir(parents=True, exist_ok=True)
updated = app_config.model_copy(
update={
"projects": {"proj-a": str(proj_a), "proj-b": str(proj_b)},
"default_project": "proj-b",
}
)
config_manager.save_config(updated)
# Real DB init + reconcile
await initialize_database(updated)
await reconcile_projects_with_config(updated)
_, session_maker = await db.get_or_create_db(
updated.database_path, db_type=db.DatabaseType.FILESYSTEM
)
repo = ProjectRepository(session_maker)
active = await repo.get_active_projects()
names = {p.name for p in active}
assert names.issuperset({"proj-a", "proj-b"})
default = await repo.get_default_project()
assert default is not None
assert default.name == "proj-b"
finally:
await db.shutdown_db()
@pytest.mark.asyncio
async def test_reconcile_projects_with_config_swallow_errors(
monkeypatch, app_config: BasicMemoryConfig
):
"""reconcile_projects_with_config should not raise if ProjectService sync fails."""
await db.shutdown_db()
try:
await initialize_database(app_config)
async def boom(self): # noqa: ANN001
raise ValueError("Project synchronization error")
monkeypatch.setattr(
"basic_memory.services.project_service.ProjectService.synchronize_projects",
boom,
)
# Should not raise
await reconcile_projects_with_config(app_config)
finally:
await db.shutdown_db()
def test_ensure_initialization_runs_and_cleans_up(app_config: BasicMemoryConfig, config_manager):
# ensure_initialization uses asyncio.run; keep this test synchronous.
ensure_initialization(app_config)
# Must be cleaned up to avoid hanging processes.
assert db._engine is None # pyright: ignore [reportPrivateUsage]
assert db._session_maker is None # pyright: ignore [reportPrivateUsage]
```
--------------------------------------------------------------------------------
/src/basic_memory/api/container.py:
--------------------------------------------------------------------------------
```python
"""API composition root for Basic Memory.
This container owns reading ConfigManager and environment variables for the
API entrypoint. Downstream modules receive config/dependencies explicitly
rather than reading globals.
Design principles:
- Only this module reads ConfigManager directly
- Runtime mode (cloud/local/test) is resolved here
- Factories for services are provided, not singletons
"""
from dataclasses import dataclass
from typing import TYPE_CHECKING
from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker, AsyncSession
from basic_memory import db
from basic_memory.config import BasicMemoryConfig, ConfigManager
from basic_memory.runtime import RuntimeMode, resolve_runtime_mode
if TYPE_CHECKING: # pragma: no cover
from basic_memory.sync import SyncCoordinator
@dataclass
class ApiContainer:
"""Composition root for the API entrypoint.
Holds resolved configuration and runtime context.
Created once at app startup, then used to wire dependencies.
"""
config: BasicMemoryConfig
mode: RuntimeMode
# --- Database ---
# Cached database connections (set during lifespan startup)
engine: AsyncEngine | None = None
session_maker: async_sessionmaker[AsyncSession] | None = None
@classmethod
def create(cls) -> "ApiContainer": # pragma: no cover
"""Create container by reading ConfigManager.
This is the single point where API reads global config.
"""
config = ConfigManager().config
mode = resolve_runtime_mode(
cloud_mode_enabled=config.cloud_mode_enabled,
is_test_env=config.is_test_env,
)
return cls(config=config, mode=mode)
# --- Runtime Mode Properties ---
@property
def should_sync_files(self) -> bool:
"""Whether file sync should be started.
Sync is enabled when:
- sync_changes is True in config
- Not in test mode (tests manage their own sync)
"""
return self.config.sync_changes and not self.mode.is_test
@property
def sync_skip_reason(self) -> str | None: # pragma: no cover
"""Reason why sync is skipped, or None if sync should run.
Useful for logging why sync was disabled.
"""
if self.mode.is_test:
return "Test environment detected"
if not self.config.sync_changes:
return "Sync changes disabled"
return None
def create_sync_coordinator(self) -> "SyncCoordinator": # pragma: no cover
"""Create a SyncCoordinator with this container's settings.
Returns:
SyncCoordinator configured for this runtime environment
"""
# Deferred import to avoid circular dependency
from basic_memory.sync import SyncCoordinator
return SyncCoordinator(
config=self.config,
should_sync=self.should_sync_files,
skip_reason=self.sync_skip_reason,
)
# --- Database Factory ---
async def init_database( # pragma: no cover
self,
) -> tuple[AsyncEngine, async_sessionmaker[AsyncSession]]:
"""Initialize and cache database connections.
Returns:
Tuple of (engine, session_maker)
"""
engine, session_maker = await db.get_or_create_db(self.config.database_path)
self.engine = engine
self.session_maker = session_maker
return engine, session_maker
async def shutdown_database(self) -> None: # pragma: no cover
"""Clean up database connections."""
await db.shutdown_db()
# Module-level container instance (set by lifespan)
# This allows deps.py to access the container without reading ConfigManager
_container: ApiContainer | None = None
def get_container() -> ApiContainer:
"""Get the current API container.
Raises:
RuntimeError: If container hasn't been initialized
"""
if _container is None:
raise RuntimeError("API container not initialized. Call set_container() first.")
return _container
def set_container(container: ApiContainer) -> None:
"""Set the API container (called by lifespan)."""
global _container
_container = container
```
--------------------------------------------------------------------------------
/src/basic_memory/cli/commands/cloud/upload_command.py:
--------------------------------------------------------------------------------
```python
"""Upload CLI commands for basic-memory projects."""
from pathlib import Path
import typer
from rich.console import Console
from basic_memory.cli.app import cloud_app
from basic_memory.cli.commands.command_utils import run_with_cleanup
from basic_memory.cli.commands.cloud.cloud_utils import (
create_cloud_project,
project_exists,
sync_project,
)
from basic_memory.cli.commands.cloud.upload import upload_path
console = Console()
@cloud_app.command("upload")
def upload(
path: Path = typer.Argument(
...,
help="Path to local file or directory to upload",
exists=True,
readable=True,
resolve_path=True,
),
project: str = typer.Option(
...,
"--project",
"-p",
help="Cloud project name (destination)",
),
create_project: bool = typer.Option(
False,
"--create-project",
"-c",
help="Create project if it doesn't exist",
),
sync: bool = typer.Option(
True,
"--sync/--no-sync",
help="Sync project after upload (default: true)",
),
verbose: bool = typer.Option(
False,
"--verbose",
"-v",
help="Show detailed information about file filtering and upload",
),
no_gitignore: bool = typer.Option(
False,
"--no-gitignore",
help="Skip .gitignore patterns (still respects .bmignore)",
),
dry_run: bool = typer.Option(
False,
"--dry-run",
help="Show what would be uploaded without actually uploading",
),
) -> None:
"""Upload local files or directories to cloud project via WebDAV.
Examples:
bm cloud upload ~/my-notes --project research
bm cloud upload notes.md --project research --create-project
bm cloud upload ~/docs --project work --no-sync
bm cloud upload ./history --project proto --verbose
bm cloud upload ./notes --project work --no-gitignore
bm cloud upload ./files --project test --dry-run
"""
async def _upload():
# Check if project exists
if not await project_exists(project):
if create_project:
console.print(f"[blue]Creating cloud project '{project}'...[/blue]")
try:
await create_cloud_project(project)
console.print(f"[green]Created project '{project}'[/green]")
except Exception as e:
console.print(f"[red]Failed to create project: {e}[/red]")
raise typer.Exit(1)
else:
console.print(
f"[red]Project '{project}' does not exist.[/red]\n"
f"[yellow]Options:[/yellow]\n"
f" 1. Create it first: bm project add {project}\n"
f" 2. Use --create-project flag to create automatically"
)
raise typer.Exit(1)
# Perform upload (or dry run)
if dry_run:
console.print(
f"[yellow]DRY RUN: Showing what would be uploaded to '{project}'[/yellow]"
)
else:
console.print(f"[blue]Uploading {path} to project '{project}'...[/blue]")
success = await upload_path(
path, project, verbose=verbose, use_gitignore=not no_gitignore, dry_run=dry_run
)
if not success:
console.print("[red]Upload failed[/red]")
raise typer.Exit(1)
if dry_run:
console.print("[yellow]DRY RUN complete - no files were uploaded[/yellow]")
else:
console.print(f"[green]Successfully uploaded to '{project}'[/green]")
# Sync project if requested (skip on dry run)
# Force full scan after upload to ensure database is up-to-date with uploaded files
if sync and not dry_run:
console.print(f"[blue]Syncing project '{project}'...[/blue]")
try:
await sync_project(project, force_full=True)
except Exception as e:
console.print(f"[yellow]Warning: Sync failed: {e}[/yellow]")
console.print("[dim]Files uploaded but may not be indexed yet[/dim]")
run_with_cleanup(_upload())
```
--------------------------------------------------------------------------------
/tests/markdown/test_observation_edge_cases.py:
--------------------------------------------------------------------------------
```python
"""Tests for edge cases in observation parsing."""
from markdown_it import MarkdownIt
from basic_memory.markdown.plugins import observation_plugin, parse_observation
from basic_memory.markdown.schemas import Observation
def test_empty_input():
"""Test handling of empty input."""
md = MarkdownIt().use(observation_plugin)
tokens = md.parse("")
assert not any(t.meta and "observation" in t.meta for t in tokens)
tokens = md.parse(" ")
assert not any(t.meta and "observation" in t.meta for t in tokens)
tokens = md.parse("\n")
assert not any(t.meta and "observation" in t.meta for t in tokens)
def test_invalid_context():
"""Test handling of invalid context format."""
md = MarkdownIt().use(observation_plugin)
# Unclosed context
tokens = md.parse("- [test] Content (unclosed")
token = next(t for t in tokens if t.type == "inline")
obs = parse_observation(token)
assert obs["content"] == "Content (unclosed"
assert obs["context"] is None
# Multiple parens
tokens = md.parse("- [test] Content (with) extra) parens)")
token = next(t for t in tokens if t.type == "inline")
obs = parse_observation(token)
assert obs["content"] == "Content"
assert obs["context"] == "with) extra) parens"
def test_complex_format():
"""Test parsing complex observation formats."""
md = MarkdownIt().use(observation_plugin)
# Multiple hashtags together
tokens = md.parse("- [complex test] This is #tag1#tag2 with #tag3 content")
token = next(t for t in tokens if t.type == "inline")
obs = parse_observation(token)
assert obs["category"] == "complex test"
assert set(obs["tags"]) == {"tag1", "tag2", "tag3"}
assert obs["content"] == "This is #tag1#tag2 with #tag3 content"
# Pydantic model validation
observation = Observation.model_validate(obs)
assert observation.category == "complex test"
assert set(observation.tags) == {"tag1", "tag2", "tag3"}
assert observation.content == "This is #tag1#tag2 with #tag3 content"
def test_malformed_category():
"""Test handling of malformed category brackets."""
md = MarkdownIt().use(observation_plugin)
# Empty category
tokens = md.parse("- [] Empty category")
token = next(t for t in tokens if t.type == "inline")
observation = Observation.model_validate(parse_observation(token))
assert observation.category is None
assert observation.content == "Empty category"
# Missing close bracket
tokens = md.parse("- [test Content")
token = next(t for t in tokens if t.type == "inline")
observation = Observation.model_validate(parse_observation(token))
# Should treat whole thing as content
assert observation.category is None
assert "test Content" in observation.content
def test_no_category():
"""Test handling of malformed category brackets."""
md = MarkdownIt().use(observation_plugin)
# No category bracket
tokens = md.parse("- No category")
token = next(t for t in tokens if t.type == "inline")
observation = Observation.model_validate(parse_observation(token))
assert observation.category is None
assert observation.content == "No category"
def test_unicode_content():
"""Test handling of Unicode content."""
md = MarkdownIt().use(observation_plugin)
# Emoji
tokens = md.parse("- [test] Emoji test 👍 #emoji #test (Testing emoji)")
token = next(t for t in tokens if t.type == "inline")
obs = parse_observation(token)
assert "👍" in obs["content"]
assert "emoji" in obs["tags"]
# Non-Latin scripts
tokens = md.parse("- [中文] Chinese text 测试 #language (Script test)")
token = next(t for t in tokens if t.type == "inline")
obs = parse_observation(token)
assert obs["category"] == "中文"
assert "测试" in obs["content"]
# Mixed scripts and emoji
tokens = md.parse("- [test] Mixed 中文 and 👍 #mixed")
token = next(t for t in tokens if t.type == "inline")
obs = parse_observation(token)
assert "中文" in obs["content"]
assert "👍" in obs["content"]
# Model validation with Unicode
observation = Observation.model_validate(obs)
assert "中文" in observation.content
assert "👍" in observation.content
```