This is page 3 of 19. Use http://codebase.md/basicmachines-co/basic-memory?lines=false&page={x} to view the full context.
# Directory Structure
```
├── .claude
│ ├── commands
│ │ ├── release
│ │ │ ├── beta.md
│ │ │ ├── changelog.md
│ │ │ ├── release-check.md
│ │ │ └── release.md
│ │ ├── spec.md
│ │ └── test-live.md
│ └── settings.json
├── .dockerignore
├── .env.example
├── .github
│ ├── dependabot.yml
│ ├── ISSUE_TEMPLATE
│ │ ├── bug_report.md
│ │ ├── config.yml
│ │ ├── documentation.md
│ │ └── feature_request.md
│ └── workflows
│ ├── claude-code-review.yml
│ ├── claude-issue-triage.yml
│ ├── claude.yml
│ ├── dev-release.yml
│ ├── docker.yml
│ ├── pr-title.yml
│ ├── release.yml
│ └── test.yml
├── .gitignore
├── .python-version
├── CHANGELOG.md
├── CITATION.cff
├── CLA.md
├── CLAUDE.md
├── CODE_OF_CONDUCT.md
├── CONTRIBUTING.md
├── docker-compose-postgres.yml
├── docker-compose.yml
├── Dockerfile
├── docs
│ ├── ai-assistant-guide-extended.md
│ ├── ARCHITECTURE.md
│ ├── character-handling.md
│ ├── cloud-cli.md
│ ├── Docker.md
│ └── testing-coverage.md
├── justfile
├── LICENSE
├── llms-install.md
├── pyproject.toml
├── README.md
├── SECURITY.md
├── smithery.yaml
├── specs
│ ├── SPEC-1 Specification-Driven Development Process.md
│ ├── SPEC-10 Unified Deployment Workflow and Event Tracking.md
│ ├── SPEC-11 Basic Memory API Performance Optimization.md
│ ├── SPEC-12 OpenTelemetry Observability.md
│ ├── SPEC-13 CLI Authentication with Subscription Validation.md
│ ├── SPEC-14 Cloud Git Versioning & GitHub Backup.md
│ ├── SPEC-14- Cloud Git Versioning & GitHub Backup.md
│ ├── SPEC-15 Configuration Persistence via Tigris for Cloud Tenants.md
│ ├── SPEC-16 MCP Cloud Service Consolidation.md
│ ├── SPEC-17 Semantic Search with ChromaDB.md
│ ├── SPEC-18 AI Memory Management Tool.md
│ ├── SPEC-19 Sync Performance and Memory Optimization.md
│ ├── SPEC-2 Slash Commands Reference.md
│ ├── SPEC-20 Simplified Project-Scoped Rclone Sync.md
│ ├── SPEC-3 Agent Definitions.md
│ ├── SPEC-4 Notes Web UI Component Architecture.md
│ ├── SPEC-5 CLI Cloud Upload via WebDAV.md
│ ├── SPEC-6 Explicit Project Parameter Architecture.md
│ ├── SPEC-7 POC to spike Tigris Turso for local access to cloud data.md
│ ├── SPEC-8 TigrisFS Integration.md
│ ├── SPEC-9 Multi-Project Bidirectional Sync Architecture.md
│ ├── SPEC-9 Signed Header Tenant Information.md
│ └── SPEC-9-1 Follow-Ups- Conflict, Sync, and Observability.md
├── src
│ └── basic_memory
│ ├── __init__.py
│ ├── alembic
│ │ ├── alembic.ini
│ │ ├── env.py
│ │ ├── migrations.py
│ │ ├── script.py.mako
│ │ └── versions
│ │ ├── 314f1ea54dc4_add_postgres_full_text_search_support_.py
│ │ ├── 3dae7c7b1564_initial_schema.py
│ │ ├── 502b60eaa905_remove_required_from_entity_permalink.py
│ │ ├── 5fe1ab1ccebe_add_projects_table.py
│ │ ├── 647e7a75e2cd_project_constraint_fix.py
│ │ ├── 6830751f5fb6_merge_multiple_heads.py
│ │ ├── 9d9c1cb7d8f5_add_mtime_and_size_columns_to_entity_.py
│ │ ├── a1b2c3d4e5f6_fix_project_foreign_keys.py
│ │ ├── a2b3c4d5e6f7_add_search_index_entity_cascade.py
│ │ ├── b3c3938bacdb_relation_to_name_unique_index.py
│ │ ├── cc7172b46608_update_search_index_schema.py
│ │ ├── e7e1f4367280_add_scan_watermark_tracking_to_project.py
│ │ ├── f8a9b2c3d4e5_add_pg_trgm_for_fuzzy_link_resolution.py
│ │ └── g9a0b3c4d5e6_add_external_id_to_project_and_entity.py
│ ├── api
│ │ ├── __init__.py
│ │ ├── app.py
│ │ ├── container.py
│ │ ├── routers
│ │ │ ├── __init__.py
│ │ │ ├── directory_router.py
│ │ │ ├── importer_router.py
│ │ │ ├── knowledge_router.py
│ │ │ ├── management_router.py
│ │ │ ├── memory_router.py
│ │ │ ├── project_router.py
│ │ │ ├── prompt_router.py
│ │ │ ├── resource_router.py
│ │ │ ├── search_router.py
│ │ │ └── utils.py
│ │ ├── template_loader.py
│ │ └── v2
│ │ ├── __init__.py
│ │ └── routers
│ │ ├── __init__.py
│ │ ├── directory_router.py
│ │ ├── importer_router.py
│ │ ├── knowledge_router.py
│ │ ├── memory_router.py
│ │ ├── project_router.py
│ │ ├── prompt_router.py
│ │ ├── resource_router.py
│ │ └── search_router.py
│ ├── cli
│ │ ├── __init__.py
│ │ ├── app.py
│ │ ├── auth.py
│ │ ├── commands
│ │ │ ├── __init__.py
│ │ │ ├── cloud
│ │ │ │ ├── __init__.py
│ │ │ │ ├── api_client.py
│ │ │ │ ├── bisync_commands.py
│ │ │ │ ├── cloud_utils.py
│ │ │ │ ├── core_commands.py
│ │ │ │ ├── rclone_commands.py
│ │ │ │ ├── rclone_config.py
│ │ │ │ ├── rclone_installer.py
│ │ │ │ ├── upload_command.py
│ │ │ │ └── upload.py
│ │ │ ├── command_utils.py
│ │ │ ├── db.py
│ │ │ ├── format.py
│ │ │ ├── import_chatgpt.py
│ │ │ ├── import_claude_conversations.py
│ │ │ ├── import_claude_projects.py
│ │ │ ├── import_memory_json.py
│ │ │ ├── mcp.py
│ │ │ ├── project.py
│ │ │ ├── status.py
│ │ │ ├── telemetry.py
│ │ │ └── tool.py
│ │ ├── container.py
│ │ └── main.py
│ ├── config.py
│ ├── db.py
│ ├── deps
│ │ ├── __init__.py
│ │ ├── config.py
│ │ ├── db.py
│ │ ├── importers.py
│ │ ├── projects.py
│ │ ├── repositories.py
│ │ └── services.py
│ ├── deps.py
│ ├── file_utils.py
│ ├── ignore_utils.py
│ ├── importers
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── chatgpt_importer.py
│ │ ├── claude_conversations_importer.py
│ │ ├── claude_projects_importer.py
│ │ ├── memory_json_importer.py
│ │ └── utils.py
│ ├── markdown
│ │ ├── __init__.py
│ │ ├── entity_parser.py
│ │ ├── markdown_processor.py
│ │ ├── plugins.py
│ │ ├── schemas.py
│ │ └── utils.py
│ ├── mcp
│ │ ├── __init__.py
│ │ ├── async_client.py
│ │ ├── clients
│ │ │ ├── __init__.py
│ │ │ ├── directory.py
│ │ │ ├── knowledge.py
│ │ │ ├── memory.py
│ │ │ ├── project.py
│ │ │ ├── resource.py
│ │ │ └── search.py
│ │ ├── container.py
│ │ ├── project_context.py
│ │ ├── prompts
│ │ │ ├── __init__.py
│ │ │ ├── ai_assistant_guide.py
│ │ │ ├── continue_conversation.py
│ │ │ ├── recent_activity.py
│ │ │ ├── search.py
│ │ │ └── utils.py
│ │ ├── resources
│ │ │ ├── ai_assistant_guide.md
│ │ │ └── project_info.py
│ │ ├── server.py
│ │ └── tools
│ │ ├── __init__.py
│ │ ├── build_context.py
│ │ ├── canvas.py
│ │ ├── chatgpt_tools.py
│ │ ├── delete_note.py
│ │ ├── edit_note.py
│ │ ├── list_directory.py
│ │ ├── move_note.py
│ │ ├── project_management.py
│ │ ├── read_content.py
│ │ ├── read_note.py
│ │ ├── recent_activity.py
│ │ ├── search.py
│ │ ├── utils.py
│ │ ├── view_note.py
│ │ └── write_note.py
│ ├── models
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── knowledge.py
│ │ ├── project.py
│ │ └── search.py
│ ├── project_resolver.py
│ ├── repository
│ │ ├── __init__.py
│ │ ├── entity_repository.py
│ │ ├── observation_repository.py
│ │ ├── postgres_search_repository.py
│ │ ├── project_info_repository.py
│ │ ├── project_repository.py
│ │ ├── relation_repository.py
│ │ ├── repository.py
│ │ ├── search_index_row.py
│ │ ├── search_repository_base.py
│ │ ├── search_repository.py
│ │ └── sqlite_search_repository.py
│ ├── runtime.py
│ ├── schemas
│ │ ├── __init__.py
│ │ ├── base.py
│ │ ├── cloud.py
│ │ ├── delete.py
│ │ ├── directory.py
│ │ ├── importer.py
│ │ ├── memory.py
│ │ ├── project_info.py
│ │ ├── prompt.py
│ │ ├── request.py
│ │ ├── response.py
│ │ ├── search.py
│ │ ├── sync_report.py
│ │ └── v2
│ │ ├── __init__.py
│ │ ├── entity.py
│ │ └── resource.py
│ ├── services
│ │ ├── __init__.py
│ │ ├── context_service.py
│ │ ├── directory_service.py
│ │ ├── entity_service.py
│ │ ├── exceptions.py
│ │ ├── file_service.py
│ │ ├── initialization.py
│ │ ├── link_resolver.py
│ │ ├── project_service.py
│ │ ├── search_service.py
│ │ └── service.py
│ ├── sync
│ │ ├── __init__.py
│ │ ├── background_sync.py
│ │ ├── coordinator.py
│ │ ├── sync_service.py
│ │ └── watch_service.py
│ ├── telemetry.py
│ ├── templates
│ │ └── prompts
│ │ ├── continue_conversation.hbs
│ │ └── search.hbs
│ └── utils.py
├── test-int
│ ├── BENCHMARKS.md
│ ├── cli
│ │ ├── test_project_commands_integration.py
│ │ └── test_version_integration.py
│ ├── conftest.py
│ ├── mcp
│ │ ├── test_build_context_underscore.py
│ │ ├── test_build_context_validation.py
│ │ ├── test_chatgpt_tools_integration.py
│ │ ├── test_default_project_mode_integration.py
│ │ ├── test_delete_note_integration.py
│ │ ├── test_edit_note_integration.py
│ │ ├── test_lifespan_shutdown_sync_task_cancellation_integration.py
│ │ ├── test_list_directory_integration.py
│ │ ├── test_move_note_integration.py
│ │ ├── test_project_management_integration.py
│ │ ├── test_project_state_sync_integration.py
│ │ ├── test_read_content_integration.py
│ │ ├── test_read_note_integration.py
│ │ ├── test_search_integration.py
│ │ ├── test_single_project_mcp_integration.py
│ │ └── test_write_note_integration.py
│ ├── test_db_wal_mode.py
│ └── test_disable_permalinks_integration.py
├── tests
│ ├── __init__.py
│ ├── api
│ │ ├── conftest.py
│ │ ├── test_api_container.py
│ │ ├── test_async_client.py
│ │ ├── test_continue_conversation_template.py
│ │ ├── test_directory_router.py
│ │ ├── test_importer_router.py
│ │ ├── test_knowledge_router.py
│ │ ├── test_management_router.py
│ │ ├── test_memory_router.py
│ │ ├── test_project_router_operations.py
│ │ ├── test_project_router.py
│ │ ├── test_prompt_router.py
│ │ ├── test_relation_background_resolution.py
│ │ ├── test_resource_router.py
│ │ ├── test_search_router.py
│ │ ├── test_search_template.py
│ │ ├── test_template_loader_helpers.py
│ │ ├── test_template_loader.py
│ │ └── v2
│ │ ├── __init__.py
│ │ ├── conftest.py
│ │ ├── test_directory_router.py
│ │ ├── test_importer_router.py
│ │ ├── test_knowledge_router.py
│ │ ├── test_memory_router.py
│ │ ├── test_project_router.py
│ │ ├── test_prompt_router.py
│ │ ├── test_resource_router.py
│ │ └── test_search_router.py
│ ├── cli
│ │ ├── cloud
│ │ │ ├── test_cloud_api_client_and_utils.py
│ │ │ ├── test_rclone_config_and_bmignore_filters.py
│ │ │ └── test_upload_path.py
│ │ ├── conftest.py
│ │ ├── test_auth_cli_auth.py
│ │ ├── test_cli_container.py
│ │ ├── test_cli_exit.py
│ │ ├── test_cli_tool_exit.py
│ │ ├── test_cli_tools.py
│ │ ├── test_cloud_authentication.py
│ │ ├── test_ignore_utils.py
│ │ ├── test_import_chatgpt.py
│ │ ├── test_import_claude_conversations.py
│ │ ├── test_import_claude_projects.py
│ │ ├── test_import_memory_json.py
│ │ ├── test_project_add_with_local_path.py
│ │ └── test_upload.py
│ ├── conftest.py
│ ├── db
│ │ └── test_issue_254_foreign_key_constraints.py
│ ├── importers
│ │ ├── test_conversation_indexing.py
│ │ ├── test_importer_base.py
│ │ └── test_importer_utils.py
│ ├── markdown
│ │ ├── __init__.py
│ │ ├── test_date_frontmatter_parsing.py
│ │ ├── test_entity_parser_error_handling.py
│ │ ├── test_entity_parser.py
│ │ ├── test_markdown_plugins.py
│ │ ├── test_markdown_processor.py
│ │ ├── test_observation_edge_cases.py
│ │ ├── test_parser_edge_cases.py
│ │ ├── test_relation_edge_cases.py
│ │ └── test_task_detection.py
│ ├── mcp
│ │ ├── clients
│ │ │ ├── __init__.py
│ │ │ └── test_clients.py
│ │ ├── conftest.py
│ │ ├── test_async_client_modes.py
│ │ ├── test_mcp_container.py
│ │ ├── test_obsidian_yaml_formatting.py
│ │ ├── test_permalink_collision_file_overwrite.py
│ │ ├── test_project_context.py
│ │ ├── test_prompts.py
│ │ ├── test_recent_activity_prompt_modes.py
│ │ ├── test_resources.py
│ │ ├── test_server_lifespan_branches.py
│ │ ├── test_tool_build_context.py
│ │ ├── test_tool_canvas.py
│ │ ├── test_tool_delete_note.py
│ │ ├── test_tool_edit_note.py
│ │ ├── test_tool_list_directory.py
│ │ ├── test_tool_move_note.py
│ │ ├── test_tool_project_management.py
│ │ ├── test_tool_read_content.py
│ │ ├── test_tool_read_note.py
│ │ ├── test_tool_recent_activity.py
│ │ ├── test_tool_resource.py
│ │ ├── test_tool_search.py
│ │ ├── test_tool_utils.py
│ │ ├── test_tool_view_note.py
│ │ ├── test_tool_write_note_kebab_filenames.py
│ │ ├── test_tool_write_note.py
│ │ └── tools
│ │ └── test_chatgpt_tools.py
│ ├── Non-MarkdownFileSupport.pdf
│ ├── README.md
│ ├── repository
│ │ ├── test_entity_repository_upsert.py
│ │ ├── test_entity_repository.py
│ │ ├── test_entity_upsert_issue_187.py
│ │ ├── test_observation_repository.py
│ │ ├── test_postgres_search_repository.py
│ │ ├── test_project_info_repository.py
│ │ ├── test_project_repository.py
│ │ ├── test_relation_repository.py
│ │ ├── test_repository.py
│ │ ├── test_search_repository_edit_bug_fix.py
│ │ └── test_search_repository.py
│ ├── schemas
│ │ ├── test_base_timeframe_minimum.py
│ │ ├── test_memory_serialization.py
│ │ ├── test_memory_url_validation.py
│ │ ├── test_memory_url.py
│ │ ├── test_relation_response_reference_resolution.py
│ │ ├── test_schemas.py
│ │ └── test_search.py
│ ├── Screenshot.png
│ ├── services
│ │ ├── test_context_service.py
│ │ ├── test_directory_service.py
│ │ ├── test_entity_service_disable_permalinks.py
│ │ ├── test_entity_service.py
│ │ ├── test_file_service.py
│ │ ├── test_initialization_cloud_mode_branches.py
│ │ ├── test_initialization.py
│ │ ├── test_link_resolver.py
│ │ ├── test_project_removal_bug.py
│ │ ├── test_project_service_operations.py
│ │ ├── test_project_service.py
│ │ └── test_search_service.py
│ ├── sync
│ │ ├── test_character_conflicts.py
│ │ ├── test_coordinator.py
│ │ ├── test_sync_service_incremental.py
│ │ ├── test_sync_service.py
│ │ ├── test_sync_wikilink_issue.py
│ │ ├── test_tmp_files.py
│ │ ├── test_watch_service_atomic_adds.py
│ │ ├── test_watch_service_edge_cases.py
│ │ ├── test_watch_service_reload.py
│ │ └── test_watch_service.py
│ ├── test_config.py
│ ├── test_deps.py
│ ├── test_production_cascade_delete.py
│ ├── test_project_resolver.py
│ ├── test_rclone_commands.py
│ ├── test_runtime.py
│ ├── test_telemetry.py
│ └── utils
│ ├── test_file_utils.py
│ ├── test_frontmatter_obsidian_compatible.py
│ ├── test_parse_tags.py
│ ├── test_permalink_formatting.py
│ ├── test_timezone_utils.py
│ ├── test_utf8_handling.py
│ └── test_validate_project_path.py
└── uv.lock
```
# Files
--------------------------------------------------------------------------------
/src/basic_memory/api/app.py:
--------------------------------------------------------------------------------
```python
"""FastAPI application for basic-memory knowledge graph API."""
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException
from fastapi.exception_handlers import http_exception_handler
from loguru import logger
from basic_memory import __version__ as version
from basic_memory.api.container import ApiContainer, set_container
from basic_memory.api.routers import (
directory_router,
importer_router,
knowledge,
management,
memory,
project,
resource,
search,
prompt_router,
)
from basic_memory.api.v2.routers import (
knowledge_router as v2_knowledge,
project_router as v2_project,
memory_router as v2_memory,
search_router as v2_search,
resource_router as v2_resource,
directory_router as v2_directory,
prompt_router as v2_prompt,
importer_router as v2_importer,
)
from basic_memory.config import init_api_logging
from basic_memory.services.initialization import initialize_app
@asynccontextmanager
async def lifespan(app: FastAPI): # pragma: no cover
"""Lifecycle manager for the FastAPI app. Not called in stdio mcp mode"""
# Initialize logging for API (stdout in cloud mode, file otherwise)
init_api_logging()
# --- Composition Root ---
# Create container and read config (single point of config access)
container = ApiContainer.create()
set_container(container)
app.state.container = container
logger.info(f"Starting Basic Memory API (mode={container.mode.name})")
await initialize_app(container.config)
# Cache database connections in app state for performance
logger.info("Initializing database and caching connections...")
engine, session_maker = await container.init_database()
app.state.engine = engine
app.state.session_maker = session_maker
logger.info("Database connections cached in app state")
# Create and start sync coordinator (lifecycle centralized in coordinator)
sync_coordinator = container.create_sync_coordinator()
await sync_coordinator.start()
app.state.sync_coordinator = sync_coordinator
# Proceed with startup
yield
# Shutdown - coordinator handles clean task cancellation
logger.info("Shutting down Basic Memory API")
await sync_coordinator.stop()
await container.shutdown_database()
# Initialize FastAPI app
app = FastAPI(
title="Basic Memory API",
description="Knowledge graph API for basic-memory",
version=version,
lifespan=lifespan,
)
# Include v2 routers FIRST (more specific paths must match before /{project} catch-all)
app.include_router(v2_knowledge, prefix="/v2/projects/{project_id}")
app.include_router(v2_memory, prefix="/v2/projects/{project_id}")
app.include_router(v2_search, prefix="/v2/projects/{project_id}")
app.include_router(v2_resource, prefix="/v2/projects/{project_id}")
app.include_router(v2_directory, prefix="/v2/projects/{project_id}")
app.include_router(v2_prompt, prefix="/v2/projects/{project_id}")
app.include_router(v2_importer, prefix="/v2/projects/{project_id}")
app.include_router(v2_project, prefix="/v2")
# Include v1 routers (/{project} is a catch-all, must come after specific prefixes)
app.include_router(knowledge.router, prefix="/{project}")
app.include_router(memory.router, prefix="/{project}")
app.include_router(resource.router, prefix="/{project}")
app.include_router(search.router, prefix="/{project}")
app.include_router(project.project_router, prefix="/{project}")
app.include_router(directory_router.router, prefix="/{project}")
app.include_router(prompt_router.router, prefix="/{project}")
app.include_router(importer_router.router, prefix="/{project}")
# Project resource router works across projects
app.include_router(project.project_resource_router)
app.include_router(management.router)
@app.exception_handler(Exception)
async def exception_handler(request, exc): # pragma: no cover
logger.exception(
"API unhandled exception",
url=str(request.url),
method=request.method,
client=request.client.host if request.client else None,
path=request.url.path,
error_type=type(exc).__name__,
error=str(exc),
)
return await http_exception_handler(request, HTTPException(status_code=500, detail=str(exc)))
```
--------------------------------------------------------------------------------
/.github/workflows/test.yml:
--------------------------------------------------------------------------------
```yaml
name: Tests
on:
push:
branches: [ "main" ]
pull_request:
branches: [ "main" ]
# pull_request_target runs on the BASE of the PR, not the merge result.
# It has write permissions and access to secrets.
# It's useful for PRs from forks or automated PRs but requires careful use for security reasons.
# See: https://docs.github.com/en/actions/using-workflows/events-that-trigger-workflows#pull_request_target
pull_request_target:
branches: [ "main" ]
jobs:
test-sqlite:
name: Test SQLite (${{ matrix.os }}, Python ${{ matrix.python-version }})
strategy:
fail-fast: false
matrix:
os: [ubuntu-latest, windows-latest]
python-version: [ "3.12", "3.13", "3.14" ]
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v4
with:
submodules: true
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
cache: 'pip'
- name: Install uv
run: |
pip install uv
- name: Install just (Linux/macOS)
if: runner.os != 'Windows'
run: |
curl --proto '=https' --tlsv1.2 -sSf https://just.systems/install.sh | bash -s -- --to /usr/local/bin
- name: Install just (Windows)
if: runner.os == 'Windows'
run: |
# Install just using Chocolatey (pre-installed on GitHub Actions Windows runners)
choco install just --yes
shell: pwsh
- name: Create virtual env
run: |
uv venv
- name: Install dependencies
run: |
uv pip install -e .[dev]
- name: Run type checks
run: |
just typecheck
- name: Run linting
run: |
just lint
- name: Run tests (SQLite)
run: |
uv pip install pytest pytest-cov
just test-sqlite
test-postgres:
name: Test Postgres (Python ${{ matrix.python-version }})
strategy:
fail-fast: false
matrix:
python-version: [ "3.12", "3.13", "3.14" ]
runs-on: ubuntu-latest
# Note: No services section needed - testcontainers handles Postgres in Docker
steps:
- uses: actions/checkout@v4
with:
submodules: true
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
cache: 'pip'
- name: Install uv
run: |
pip install uv
- name: Install just
run: |
curl --proto '=https' --tlsv1.2 -sSf https://just.systems/install.sh | bash -s -- --to /usr/local/bin
- name: Create virtual env
run: |
uv venv
- name: Install dependencies
run: |
uv pip install -e .[dev]
- name: Run tests (Postgres via testcontainers)
run: |
uv pip install pytest pytest-cov
just test-postgres
coverage:
name: Coverage Summary (combined, Python 3.12)
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
with:
submodules: true
- name: Set up Python 3.12
uses: actions/setup-python@v4
with:
python-version: "3.12"
cache: "pip"
- name: Install uv
run: |
pip install uv
- name: Install just
run: |
curl --proto '=https' --tlsv1.2 -sSf https://just.systems/install.sh | bash -s -- --to /usr/local/bin
- name: Create virtual env
run: |
uv venv
- name: Install dependencies
run: |
uv pip install -e .[dev]
- name: Run combined coverage (SQLite + Postgres)
run: |
uv pip install pytest pytest-cov
just coverage
- name: Add coverage report to job summary
if: always()
run: |
{
echo "## Coverage"
echo ""
echo '```'
uv run coverage report -m
echo '```'
} >> "$GITHUB_STEP_SUMMARY"
- name: Upload HTML coverage report
if: always()
uses: actions/upload-artifact@v4
with:
name: htmlcov
path: htmlcov/
```
--------------------------------------------------------------------------------
/src/basic_memory/alembic/versions/3dae7c7b1564_initial_schema.py:
--------------------------------------------------------------------------------
```python
"""initial schema
Revision ID: 3dae7c7b1564
Revises:
Create Date: 2025-02-12 21:23:00.336344
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = "3dae7c7b1564"
down_revision: Union[str, None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"entity",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("title", sa.String(), nullable=False),
sa.Column("entity_type", sa.String(), nullable=False),
sa.Column("entity_metadata", sa.JSON(), nullable=True),
sa.Column("content_type", sa.String(), nullable=False),
sa.Column("permalink", sa.String(), nullable=False),
sa.Column("file_path", sa.String(), nullable=False),
sa.Column("checksum", sa.String(), nullable=True),
sa.Column("created_at", sa.DateTime(), nullable=False),
sa.Column("updated_at", sa.DateTime(), nullable=False),
sa.PrimaryKeyConstraint("id"),
sa.UniqueConstraint("permalink", name="uix_entity_permalink"),
)
op.create_index("ix_entity_created_at", "entity", ["created_at"], unique=False)
op.create_index(op.f("ix_entity_file_path"), "entity", ["file_path"], unique=True)
op.create_index(op.f("ix_entity_permalink"), "entity", ["permalink"], unique=True)
op.create_index("ix_entity_title", "entity", ["title"], unique=False)
op.create_index("ix_entity_type", "entity", ["entity_type"], unique=False)
op.create_index("ix_entity_updated_at", "entity", ["updated_at"], unique=False)
op.create_table(
"observation",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("entity_id", sa.Integer(), nullable=False),
sa.Column("content", sa.Text(), nullable=False),
sa.Column("category", sa.String(), nullable=False),
sa.Column("context", sa.Text(), nullable=True),
sa.Column("tags", sa.JSON(), server_default="[]", nullable=True),
sa.ForeignKeyConstraint(["entity_id"], ["entity.id"], ondelete="CASCADE"),
sa.PrimaryKeyConstraint("id"),
)
op.create_index("ix_observation_category", "observation", ["category"], unique=False)
op.create_index("ix_observation_entity_id", "observation", ["entity_id"], unique=False)
op.create_table(
"relation",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("from_id", sa.Integer(), nullable=False),
sa.Column("to_id", sa.Integer(), nullable=True),
sa.Column("to_name", sa.String(), nullable=False),
sa.Column("relation_type", sa.String(), nullable=False),
sa.Column("context", sa.Text(), nullable=True),
sa.ForeignKeyConstraint(["from_id"], ["entity.id"], ondelete="CASCADE"),
sa.ForeignKeyConstraint(["to_id"], ["entity.id"], ondelete="CASCADE"),
sa.PrimaryKeyConstraint("id"),
sa.UniqueConstraint("from_id", "to_id", "relation_type", name="uix_relation"),
)
op.create_index("ix_relation_from_id", "relation", ["from_id"], unique=False)
op.create_index("ix_relation_to_id", "relation", ["to_id"], unique=False)
op.create_index("ix_relation_type", "relation", ["relation_type"], unique=False)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index("ix_relation_type", table_name="relation")
op.drop_index("ix_relation_to_id", table_name="relation")
op.drop_index("ix_relation_from_id", table_name="relation")
op.drop_table("relation")
op.drop_index("ix_observation_entity_id", table_name="observation")
op.drop_index("ix_observation_category", table_name="observation")
op.drop_table("observation")
op.drop_index("ix_entity_updated_at", table_name="entity")
op.drop_index("ix_entity_type", table_name="entity")
op.drop_index("ix_entity_title", table_name="entity")
op.drop_index(op.f("ix_entity_permalink"), table_name="entity")
op.drop_index(op.f("ix_entity_file_path"), table_name="entity")
op.drop_index("ix_entity_created_at", table_name="entity")
op.drop_table("entity")
# ### end Alembic commands ###
```
--------------------------------------------------------------------------------
/src/basic_memory/cli/commands/db.py:
--------------------------------------------------------------------------------
```python
"""Database management commands."""
from pathlib import Path
import typer
from loguru import logger
from rich.console import Console
from sqlalchemy.exc import OperationalError
from basic_memory import db
from basic_memory.cli.app import app
from basic_memory.cli.commands.command_utils import run_with_cleanup
from basic_memory.config import ConfigManager
from basic_memory.repository import ProjectRepository
from basic_memory.services.initialization import reconcile_projects_with_config
from basic_memory.sync.sync_service import get_sync_service
console = Console()
async def _reindex_projects(app_config):
"""Reindex all projects in a single async context.
This ensures all database operations use the same event loop,
and proper cleanup happens when the function completes.
"""
try:
await reconcile_projects_with_config(app_config)
# Get database session (migrations already run if needed)
_, session_maker = await db.get_or_create_db(
db_path=app_config.database_path,
db_type=db.DatabaseType.FILESYSTEM,
)
project_repository = ProjectRepository(session_maker)
projects = await project_repository.get_active_projects()
for project in projects:
console.print(f" Indexing [cyan]{project.name}[/cyan]...")
logger.info(f"Starting sync for project: {project.name}")
sync_service = await get_sync_service(project)
sync_dir = Path(project.path)
await sync_service.sync(sync_dir, project_name=project.name)
logger.info(f"Sync completed for project: {project.name}")
finally:
# Clean up database connections before event loop closes
await db.shutdown_db()
@app.command()
def reset(
reindex: bool = typer.Option(False, "--reindex", help="Rebuild db index from filesystem"),
): # pragma: no cover
"""Reset database (drop all tables and recreate)."""
console.print(
"[yellow]Note:[/yellow] This only deletes the index database. "
"Your markdown note files will not be affected.\n"
"Use [green]bm reset --reindex[/green] to automatically rebuild the index afterward."
)
if typer.confirm("Reset the database index?"):
logger.info("Resetting database...")
config_manager = ConfigManager()
app_config = config_manager.config
# Get database path
db_path = app_config.app_database_path
# Delete the database file and WAL files if they exist
for suffix in ["", "-shm", "-wal"]:
path = db_path.parent / f"{db_path.name}{suffix}"
if path.exists():
try:
path.unlink()
logger.info(f"Deleted: {path}")
except OSError as e:
console.print(
f"[red]Error:[/red] Cannot delete {path.name}: {e}\n"
"The database may be in use by another process (e.g., MCP server).\n"
"Please close Claude Desktop or any other Basic Memory clients and try again."
)
raise typer.Exit(1)
# Create a new empty database (preserves project configuration)
try:
run_with_cleanup(db.run_migrations(app_config))
except OperationalError as e:
if "disk I/O error" in str(e) or "database is locked" in str(e):
console.print(
"[red]Error:[/red] Cannot access database. "
"It may be in use by another process (e.g., MCP server).\n"
"Please close Claude Desktop or any other Basic Memory clients and try again."
)
raise typer.Exit(1)
raise
console.print("[green]Database reset complete[/green]")
if reindex:
projects = list(app_config.projects)
if not projects:
console.print("[yellow]No projects configured. Skipping reindex.[/yellow]")
else:
console.print(f"Rebuilding search index for {len(projects)} project(s)...")
# Note: _reindex_projects has its own cleanup, but run_with_cleanup
# ensures db.shutdown_db() is called even if _reindex_projects changes
run_with_cleanup(_reindex_projects(app_config))
console.print("[green]Reindex complete[/green]")
```
--------------------------------------------------------------------------------
/tests/sync/test_coordinator.py:
--------------------------------------------------------------------------------
```python
"""Tests for SyncCoordinator - centralized sync/watch lifecycle."""
import pytest
from unittest.mock import AsyncMock, patch
from basic_memory.config import BasicMemoryConfig
from basic_memory.sync.coordinator import SyncCoordinator, SyncStatus
class TestSyncCoordinator:
"""Test SyncCoordinator class."""
@pytest.fixture
def mock_config(self):
"""Create a mock config for testing."""
return BasicMemoryConfig()
def test_initial_status(self, mock_config):
"""Coordinator starts in NOT_STARTED state."""
coordinator = SyncCoordinator(config=mock_config)
assert coordinator.status == SyncStatus.NOT_STARTED
assert coordinator.is_running is False
@pytest.mark.asyncio
async def test_start_when_sync_disabled(self, mock_config):
"""When should_sync is False, start() sets status to STOPPED."""
coordinator = SyncCoordinator(
config=mock_config,
should_sync=False,
skip_reason="Test skip",
)
await coordinator.start()
assert coordinator.status == SyncStatus.STOPPED
assert coordinator.is_running is False
@pytest.mark.asyncio
async def test_stop_when_not_started(self, mock_config):
"""Stop is safe to call when not started."""
coordinator = SyncCoordinator(config=mock_config)
await coordinator.stop() # Should not raise
assert coordinator.status == SyncStatus.NOT_STARTED
@pytest.mark.asyncio
async def test_stop_when_stopped(self, mock_config):
"""Stop is idempotent when already stopped."""
coordinator = SyncCoordinator(
config=mock_config,
should_sync=False,
)
await coordinator.start() # Sets to STOPPED
await coordinator.stop() # Should not raise
assert coordinator.status == SyncStatus.STOPPED
def test_get_status_info(self, mock_config):
"""get_status_info returns diagnostic info."""
coordinator = SyncCoordinator(
config=mock_config,
should_sync=True,
skip_reason=None,
)
info = coordinator.get_status_info()
assert info["status"] == "NOT_STARTED"
assert info["should_sync"] is True
assert info["skip_reason"] is None
assert info["has_task"] is False
def test_get_status_info_with_skip_reason(self, mock_config):
"""get_status_info includes skip reason."""
coordinator = SyncCoordinator(
config=mock_config,
should_sync=False,
skip_reason="Test environment detected",
)
info = coordinator.get_status_info()
assert info["should_sync"] is False
assert info["skip_reason"] == "Test environment detected"
@pytest.mark.asyncio
async def test_start_creates_task(self, mock_config):
"""When should_sync is True, start() creates a background task."""
coordinator = SyncCoordinator(
config=mock_config,
should_sync=True,
)
# Mock initialize_file_sync to avoid actually starting sync
# The import happens inside start(), so patch at the source module
with patch(
"basic_memory.services.initialization.initialize_file_sync",
new_callable=AsyncMock,
):
# Start coordinator
await coordinator.start()
# Should be running with a task
assert coordinator.status == SyncStatus.RUNNING
assert coordinator.is_running is True
assert coordinator._sync_task is not None
# Stop to clean up
await coordinator.stop()
assert coordinator.status == SyncStatus.STOPPED
assert coordinator._sync_task is None
@pytest.mark.asyncio
async def test_start_already_running(self, mock_config):
"""Starting when already running is a no-op."""
coordinator = SyncCoordinator(
config=mock_config,
should_sync=True,
)
with patch(
"basic_memory.services.initialization.initialize_file_sync",
new_callable=AsyncMock,
):
await coordinator.start()
first_task = coordinator._sync_task
# Start again - should not create new task
await coordinator.start()
assert coordinator._sync_task is first_task
await coordinator.stop()
```
--------------------------------------------------------------------------------
/tests/schemas/test_base_timeframe_minimum.py:
--------------------------------------------------------------------------------
```python
"""Test minimum 1-day timeframe enforcement for timezone handling."""
from datetime import datetime, timedelta
import pytest
from freezegun import freeze_time
from basic_memory.schemas.base import parse_timeframe
class TestTimeframeMinimum:
"""Test that parse_timeframe enforces a minimum 1-day lookback."""
@freeze_time("2025-01-15 15:00:00")
def test_today_returns_one_day_ago(self):
"""Test that 'today' returns 1 day ago instead of start of today."""
result = parse_timeframe("today")
now = datetime.now()
one_day_ago = now - timedelta(days=1)
# Should be approximately 1 day ago (within a second for test tolerance)
diff = abs((result.replace(tzinfo=None) - one_day_ago).total_seconds())
assert diff < 1, f"Expected ~1 day ago, got {result}"
@freeze_time("2025-01-15 15:00:00")
def test_one_hour_returns_one_day_minimum(self):
"""Test that '1h' returns 1 day ago due to minimum enforcement."""
result = parse_timeframe("1h")
now = datetime.now()
one_day_ago = now - timedelta(days=1)
# Should be approximately 1 day ago, not 1 hour ago
diff = abs((result.replace(tzinfo=None) - one_day_ago).total_seconds())
assert diff < 1, f"Expected ~1 day ago for '1h', got {result}"
@freeze_time("2025-01-15 15:00:00")
def test_six_hours_returns_one_day_minimum(self):
"""Test that '6h' returns 1 day ago due to minimum enforcement."""
result = parse_timeframe("6h")
now = datetime.now()
one_day_ago = now - timedelta(days=1)
# Should be approximately 1 day ago, not 6 hours ago
diff = abs((result.replace(tzinfo=None) - one_day_ago).total_seconds())
assert diff < 1, f"Expected ~1 day ago for '6h', got {result}"
@freeze_time("2025-01-15 15:00:00")
def test_one_day_returns_one_day(self):
"""Test that '1d' correctly returns approximately 1 day ago."""
result = parse_timeframe("1d")
now = datetime.now()
one_day_ago = now - timedelta(days=1)
# Should be approximately 1 day ago (within 24 hours)
diff_hours = abs((result.replace(tzinfo=None) - one_day_ago).total_seconds()) / 3600
assert diff_hours < 24, (
f"Expected ~1 day ago for '1d', got {result} (diff: {diff_hours} hours)"
)
@freeze_time("2025-01-15 15:00:00")
def test_two_days_returns_two_days(self):
"""Test that '2d' correctly returns approximately 2 days ago (not affected by minimum)."""
result = parse_timeframe("2d")
now = datetime.now()
two_days_ago = now - timedelta(days=2)
# Should be approximately 2 days ago (within 24 hours)
diff_hours = abs((result.replace(tzinfo=None) - two_days_ago).total_seconds()) / 3600
assert diff_hours < 24, (
f"Expected ~2 days ago for '2d', got {result} (diff: {diff_hours} hours)"
)
@freeze_time("2025-01-15 15:00:00")
def test_one_week_returns_one_week(self):
"""Test that '1 week' correctly returns approximately 1 week ago (not affected by minimum)."""
result = parse_timeframe("1 week")
now = datetime.now()
one_week_ago = now - timedelta(weeks=1)
# Should be approximately 1 week ago (within 24 hours)
diff_hours = abs((result.replace(tzinfo=None) - one_week_ago).total_seconds()) / 3600
assert diff_hours < 24, (
f"Expected ~1 week ago for '1 week', got {result} (diff: {diff_hours} hours)"
)
@freeze_time("2025-01-15 15:00:00")
def test_zero_days_returns_one_day_minimum(self):
"""Test that '0d' returns 1 day ago due to minimum enforcement."""
result = parse_timeframe("0d")
now = datetime.now()
one_day_ago = now - timedelta(days=1)
# Should be approximately 1 day ago, not now
diff = abs((result.replace(tzinfo=None) - one_day_ago).total_seconds())
assert diff < 1, f"Expected ~1 day ago for '0d', got {result}"
def test_timezone_awareness(self):
"""Test that returned datetime is timezone-aware."""
result = parse_timeframe("1d")
assert result.tzinfo is not None, "Expected timezone-aware datetime"
def test_invalid_timeframe_raises_error(self):
"""Test that invalid timeframe strings raise ValueError."""
with pytest.raises(ValueError, match="Could not parse timeframe"):
parse_timeframe("invalid_timeframe")
```
--------------------------------------------------------------------------------
/src/basic_memory/api/routers/importer_router.py:
--------------------------------------------------------------------------------
```python
"""Import router for Basic Memory API."""
import json
import logging
from fastapi import APIRouter, Form, HTTPException, UploadFile, status
from basic_memory.deps import (
ChatGPTImporterDep,
ClaudeConversationsImporterDep,
ClaudeProjectsImporterDep,
MemoryJsonImporterDep,
)
from basic_memory.importers import Importer
from basic_memory.schemas.importer import (
ChatImportResult,
EntityImportResult,
ProjectImportResult,
)
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/import", tags=["import"])
@router.post("/chatgpt", response_model=ChatImportResult)
async def import_chatgpt(
importer: ChatGPTImporterDep,
file: UploadFile,
folder: str = Form("conversations"),
) -> ChatImportResult:
"""Import conversations from ChatGPT JSON export.
Args:
file: The ChatGPT conversations.json file.
folder: The folder to place the files in.
markdown_processor: MarkdownProcessor instance.
Returns:
ChatImportResult with import statistics.
Raises:
HTTPException: If import fails.
"""
return await import_file(importer, file, folder)
@router.post("/claude/conversations", response_model=ChatImportResult)
async def import_claude_conversations(
importer: ClaudeConversationsImporterDep,
file: UploadFile,
folder: str = Form("conversations"),
) -> ChatImportResult:
"""Import conversations from Claude conversations.json export.
Args:
file: The Claude conversations.json file.
folder: The folder to place the files in.
markdown_processor: MarkdownProcessor instance.
Returns:
ChatImportResult with import statistics.
Raises:
HTTPException: If import fails.
"""
return await import_file(importer, file, folder)
@router.post("/claude/projects", response_model=ProjectImportResult)
async def import_claude_projects(
importer: ClaudeProjectsImporterDep,
file: UploadFile,
folder: str = Form("projects"),
) -> ProjectImportResult:
"""Import projects from Claude projects.json export.
Args:
file: The Claude projects.json file.
base_folder: The base folder to place the files in.
markdown_processor: MarkdownProcessor instance.
Returns:
ProjectImportResult with import statistics.
Raises:
HTTPException: If import fails.
"""
return await import_file(importer, file, folder)
@router.post("/memory-json", response_model=EntityImportResult)
async def import_memory_json(
importer: MemoryJsonImporterDep,
file: UploadFile,
folder: str = Form("conversations"),
) -> EntityImportResult:
"""Import entities and relations from a memory.json file.
Args:
file: The memory.json file.
destination_folder: Optional destination folder within the project.
markdown_processor: MarkdownProcessor instance.
Returns:
EntityImportResult with import statistics.
Raises:
HTTPException: If import fails.
"""
try:
file_data = []
file_bytes = await file.read()
file_str = file_bytes.decode("utf-8")
for line in file_str.splitlines():
json_data = json.loads(line)
file_data.append(json_data)
result = await importer.import_data(file_data, folder)
if not result.success: # pragma: no cover
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=result.error_message or "Import failed",
)
except Exception as e:
logger.exception("Import failed")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Import failed: {str(e)}",
)
return result
async def import_file(importer: Importer, file: UploadFile, destination_folder: str):
try:
# Process file
json_data = json.load(file.file)
result = await importer.import_data(json_data, destination_folder)
if not result.success: # pragma: no cover
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=result.error_message or "Import failed",
)
return result
except Exception as e:
logger.exception("Import failed")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Import failed: {str(e)}",
)
```
--------------------------------------------------------------------------------
/tests/cli/test_import_claude_projects.py:
--------------------------------------------------------------------------------
```python
"""Tests for import_claude_projects command."""
import json
import pytest
from typer.testing import CliRunner
from basic_memory.cli.app import app
from basic_memory.cli.commands.import_claude_projects import import_projects # noqa
from basic_memory.config import get_project_config
# Set up CLI runner
runner = CliRunner()
@pytest.fixture
def sample_project():
"""Sample project data for testing."""
return {
"uuid": "test-uuid",
"name": "Test Project",
"created_at": "2025-01-05T20:55:32.499880+00:00",
"updated_at": "2025-01-05T20:56:39.477600+00:00",
"prompt_template": "# Test Prompt\n\nThis is a test prompt.",
"docs": [
{
"uuid": "doc-uuid-1",
"filename": "Test Document",
"content": "# Test Document\n\nThis is test content.",
"created_at": "2025-01-05T20:56:39.477600+00:00",
},
{
"uuid": "doc-uuid-2",
"filename": "Another Document",
"content": "# Another Document\n\nMore test content.",
"created_at": "2025-01-05T20:56:39.477600+00:00",
},
],
}
@pytest.fixture
def sample_projects_json(tmp_path, sample_project):
"""Create a sample projects.json file."""
json_file = tmp_path / "projects.json"
with open(json_file, "w", encoding="utf-8") as f:
json.dump([sample_project], f)
return json_file
def test_import_projects_command_file_not_found(tmp_path):
"""Test error handling for nonexistent file."""
nonexistent = tmp_path / "nonexistent.json"
result = runner.invoke(app, ["import", "claude", "projects", str(nonexistent)])
assert result.exit_code == 1
assert "File not found" in result.output
def test_import_projects_command_success(tmp_path, sample_projects_json, monkeypatch):
"""Test successful project import via command."""
# Set up test environment
config = get_project_config()
config.home = tmp_path
# Run import
result = runner.invoke(app, ["import", "claude", "projects", str(sample_projects_json)])
assert result.exit_code == 0
assert "Import complete" in result.output
assert "Imported 2 project documents" in result.output
assert "Imported 1 prompt templates" in result.output
def test_import_projects_command_invalid_json(tmp_path):
"""Test error handling for invalid JSON."""
# Create invalid JSON file
invalid_file = tmp_path / "invalid.json"
invalid_file.write_text("not json")
result = runner.invoke(app, ["import", "claude", "projects", str(invalid_file)])
assert result.exit_code == 1
assert "Error during import" in result.output
def test_import_projects_with_base_folder(tmp_path, sample_projects_json, monkeypatch):
"""Test import with custom base folder."""
# Set up test environment
config = get_project_config()
config.home = tmp_path
base_folder = "claude-exports"
# Run import
result = runner.invoke(
app,
[
"import",
"claude",
"projects",
str(sample_projects_json),
"--base-folder",
base_folder,
],
)
assert result.exit_code == 0
# Check files in base folder
project_dir = tmp_path / base_folder / "Test_Project"
assert project_dir.exists()
assert (project_dir / "docs").exists()
assert (project_dir / "prompt-template.md").exists()
def test_import_project_without_prompt(tmp_path):
"""Test importing project without prompt template."""
# Create project without prompt
project = {
"uuid": "test-uuid",
"name": "No Prompt Project",
"created_at": "2025-01-05T20:55:32.499880+00:00",
"updated_at": "2025-01-05T20:56:39.477600+00:00",
"docs": [
{
"uuid": "doc-uuid-1",
"filename": "Test Document",
"content": "# Test Document\n\nContent.",
"created_at": "2025-01-05T20:56:39.477600+00:00",
}
],
}
json_file = tmp_path / "no_prompt.json"
with open(json_file, "w", encoding="utf-8") as f:
json.dump([project], f)
# Set up environment
config = get_project_config()
config.home = tmp_path
# Run import
result = runner.invoke(app, ["import", "claude", "projects", str(json_file)])
assert result.exit_code == 0
assert "Imported 1 project documents" in result.output
assert "Imported 0 prompt templates" in result.output
```
--------------------------------------------------------------------------------
/src/basic_memory/services/link_resolver.py:
--------------------------------------------------------------------------------
```python
"""Service for resolving markdown links to permalinks."""
from typing import Optional, Tuple
from loguru import logger
from basic_memory.models import Entity
from basic_memory.repository.entity_repository import EntityRepository
from basic_memory.schemas.search import SearchQuery, SearchItemType
from basic_memory.services.search_service import SearchService
class LinkResolver:
"""Service for resolving markdown links to permalinks.
Uses a combination of exact matching and search-based resolution:
1. Try exact permalink match (fastest)
2. Try exact title match
3. Try exact file path match
4. Try file path with .md extension (for folder/title patterns)
5. Fall back to search for fuzzy matching
"""
def __init__(self, entity_repository: EntityRepository, search_service: SearchService):
"""Initialize with repositories."""
self.entity_repository = entity_repository
self.search_service = search_service
async def resolve_link(
self, link_text: str, use_search: bool = True, strict: bool = False
) -> Optional[Entity]:
"""Resolve a markdown link to a permalink.
Args:
link_text: The link text to resolve
use_search: Whether to use search-based fuzzy matching as fallback
strict: If True, only exact matches are allowed (no fuzzy search fallback)
"""
logger.trace(f"Resolving link: {link_text}")
# Clean link text and extract any alias
clean_text, alias = self._normalize_link_text(link_text)
# 1. Try exact permalink match first (most efficient)
entity = await self.entity_repository.get_by_permalink(clean_text)
if entity:
logger.debug(f"Found exact permalink match: {entity.permalink}")
return entity
# 2. Try exact title match
found = await self.entity_repository.get_by_title(clean_text)
if found:
# Return first match if there are duplicates (consistent behavior)
entity = found[0]
logger.debug(f"Found title match: {entity.title}")
return entity
# 3. Try file path
found_path = await self.entity_repository.get_by_file_path(clean_text)
if found_path:
logger.debug(f"Found entity with path: {found_path.file_path}")
return found_path
# 4. Try file path with .md extension if not already present
if not clean_text.endswith(".md") and "/" in clean_text:
file_path_with_md = f"{clean_text}.md"
found_path_md = await self.entity_repository.get_by_file_path(file_path_with_md)
if found_path_md:
logger.debug(f"Found entity with path (with .md): {found_path_md.file_path}")
return found_path_md
# In strict mode, don't try fuzzy search - return None if no exact match found
if strict:
return None
# 5. Fall back to search for fuzzy matching (only if not in strict mode)
if use_search and "*" not in clean_text:
results = await self.search_service.search(
query=SearchQuery(text=clean_text, entity_types=[SearchItemType.ENTITY]),
)
if results:
# Look for best match
best_match = min(results, key=lambda x: x.score) # pyright: ignore
logger.trace(
f"Selected best match from {len(results)} results: {best_match.permalink}"
)
if best_match.permalink:
return await self.entity_repository.get_by_permalink(best_match.permalink)
# if we couldn't find anything then return None
return None
def _normalize_link_text(self, link_text: str) -> Tuple[str, Optional[str]]:
"""Normalize link text and extract alias if present.
Args:
link_text: Raw link text from markdown
Returns:
Tuple of (normalized_text, alias or None)
"""
# Strip whitespace
text = link_text.strip()
# Remove enclosing brackets if present
if text.startswith("[[") and text.endswith("]]"):
text = text[2:-2]
# Handle Obsidian-style aliases (format: [[actual|alias]])
alias = None
if "|" in text:
text, alias = text.split("|", 1)
text = text.strip()
alias = alias.strip()
else:
# Strip whitespace from text even if no alias
text = text.strip()
return text, alias
```
--------------------------------------------------------------------------------
/tests/mcp/test_tool_delete_note.py:
--------------------------------------------------------------------------------
```python
"""Tests for delete_note MCP tool."""
from basic_memory.mcp.tools.delete_note import _format_delete_error_response
class TestDeleteNoteErrorFormatting:
"""Test the error formatting function for better user experience."""
def test_format_delete_error_note_not_found(self, test_project):
"""Test formatting for note not found errors."""
result = _format_delete_error_response(test_project.name, "entity not found", "test-note")
assert "# Delete Failed - Note Not Found" in result
assert "The note 'test-note' could not be found" in result
assert 'search_notes("test-project", "test-note")' in result
assert "Already deleted" in result
assert "Wrong identifier" in result
def test_format_delete_error_permission_denied(self, test_project):
"""Test formatting for permission errors."""
result = _format_delete_error_response(test_project.name, "permission denied", "test-note")
assert "# Delete Failed - Permission Error" in result
assert "You don't have permission to delete 'test-note'" in result
assert "Check permissions" in result
assert "File locks" in result
assert "list_memory_projects()" in result
def test_format_delete_error_access_forbidden(self, test_project):
"""Test formatting for access forbidden errors."""
result = _format_delete_error_response(test_project.name, "access forbidden", "test-note")
assert "# Delete Failed - Permission Error" in result
assert "You don't have permission to delete 'test-note'" in result
def test_format_delete_error_server_error(self, test_project):
"""Test formatting for server errors."""
result = _format_delete_error_response(
test_project.name, "server error occurred", "test-note"
)
assert "# Delete Failed - System Error" in result
assert "A system error occurred while deleting 'test-note'" in result
assert "Try again" in result
assert "Check file status" in result
def test_format_delete_error_filesystem_error(self, test_project):
"""Test formatting for filesystem errors."""
result = _format_delete_error_response(test_project.name, "filesystem error", "test-note")
assert "# Delete Failed - System Error" in result
assert "A system error occurred while deleting 'test-note'" in result
def test_format_delete_error_disk_error(self, test_project):
"""Test formatting for disk errors."""
result = _format_delete_error_response(test_project.name, "disk full", "test-note")
assert "# Delete Failed - System Error" in result
assert "A system error occurred while deleting 'test-note'" in result
def test_format_delete_error_database_error(self, test_project):
"""Test formatting for database errors."""
result = _format_delete_error_response(test_project.name, "database error", "test-note")
assert "# Delete Failed - Database Error" in result
assert "A database error occurred while deleting 'test-note'" in result
assert "Sync conflict" in result
assert "Database lock" in result
def test_format_delete_error_sync_error(self, test_project):
"""Test formatting for sync errors."""
result = _format_delete_error_response(test_project.name, "sync failed", "test-note")
assert "# Delete Failed - Database Error" in result
assert "A database error occurred while deleting 'test-note'" in result
def test_format_delete_error_generic(self, test_project):
"""Test formatting for generic errors."""
result = _format_delete_error_response(test_project.name, "unknown error", "test-note")
assert "# Delete Failed" in result
assert "Error deleting note 'test-note': unknown error" in result
assert "General troubleshooting" in result
assert "Verify the note exists" in result
def test_format_delete_error_with_complex_identifier(self, test_project):
"""Test formatting with complex identifiers (permalinks)."""
result = _format_delete_error_response(
test_project.name, "entity not found", "folder/note-title"
)
assert 'search_notes("test-project", "note-title")' in result
assert "Note Title" in result # Title format
assert "folder/note-title" in result # Permalink format
# Integration tests removed to focus on error formatting coverage
# The error formatting tests above provide the necessary coverage for MCP tool error messaging
```
--------------------------------------------------------------------------------
/.claude/commands/release/changelog.md:
--------------------------------------------------------------------------------
```markdown
# /changelog - Generate or Update Changelog Entry
Analyze commits and generate formatted changelog entry for a version.
## Usage
```
/changelog <version> [type]
```
**Parameters:**
- `version` (required): Version like `v0.14.0` or `v0.14.0b1`
- `type` (optional): `beta`, `rc`, or `stable` (default: `stable`)
## Implementation
You are an expert technical writer for the Basic Memory project. When the user runs `/changelog`, execute the following steps:
### Step 1: Version Analysis
1. **Determine Commit Range**
```bash
# Find last release tag
git tag -l "v*" --sort=-version:refname | grep -v "b\|rc" | head -1
# Get commits since last release
git log --oneline ${last_tag}..HEAD
```
2. **Parse Conventional Commits**
- Extract feat: (features)
- Extract fix: (bug fixes)
- Extract BREAKING CHANGE: (breaking changes)
- Extract chore:, docs:, test: (other improvements)
### Step 2: Categorize Changes
1. **Features (feat:)**
- New MCP tools
- New CLI commands
- New API endpoints
- Major functionality additions
2. **Bug Fixes (fix:)**
- User-facing bug fixes
- Critical issues resolved
- Performance improvements
- Security fixes
3. **Technical Improvements**
- Test coverage improvements
- Code quality enhancements
- Dependency updates
- Documentation updates
4. **Breaking Changes**
- API changes
- Configuration changes
- Behavior changes
- Migration requirements
### Step 3: Generate Changelog Entry
Create formatted entry following existing CHANGELOG.md style:
Example:
```markdown
## <version> (<date>)
### Features
- **Multi-Project Management System** - Switch between projects instantly during conversations
([`993e88a`](https://github.com/basicmachines-co/basic-memory/commit/993e88a))
- Instant project switching with session context
- Project-specific operations and isolation
- Project discovery and management tools
- **Advanced Note Editing** - Incremental editing with append, prepend, find/replace, and section operations
([`6fc3904`](https://github.com/basicmachines-co/basic-memory/commit/6fc3904))
- `edit_note` tool with multiple operation types
- Smart frontmatter-aware editing
- Validation and error handling
### Bug Fixes
- **#118**: Fix YAML tag formatting to follow standard specification
([`2dc7e27`](https://github.com/basicmachines-co/basic-memory/commit/2dc7e27))
- **#110**: Make --project flag work consistently across CLI commands
([`02dd91a`](https://github.com/basicmachines-co/basic-memory/commit/02dd91a))
### Technical Improvements
- **Comprehensive Testing** - 100% test coverage with integration testing
([`468a22f`](https://github.com/basicmachines-co/basic-memory/commit/468a22f))
- MCP integration test suite
- End-to-end testing framework
- Performance and edge case validation
### Breaking Changes
- **Database Migration**: Automatic migration from per-project to unified database.
Data will be re-index from the filesystem, resulting in no data loss.
- **Configuration Changes**: Projects now synced between config.json and database
- **Full Backward Compatibility**: All existing setups continue to work seamlessly
```
### Step 4: Integration
1. **Update CHANGELOG.md**
- Insert new entry at top
- Maintain consistent formatting
- Include commit links and issue references
2. **Validation**
- Check all major changes are captured
- Verify commit links work
- Ensure issue numbers are correct
## Smart Analysis Features
### Automatic Classification
- Detect feature additions from file changes
- Identify bug fixes from commit messages
- Find breaking changes from code analysis
- Extract issue numbers from commit messages
### Content Enhancement
- Add context for technical changes
- Include migration guidance for breaking changes
- Suggest installation/upgrade instructions
- Link to relevant documentation
## Output Format
### For Beta Releases
Example:
```markdown
## v0.13.0b4 (2025-06-03)
### Beta Changes Since v0.13.0b3
- Fix FastMCP API compatibility issues
- Update dependencies to latest versions
- Resolve setuptools import error
### Installation
```bash
uv tool install basic-memory --prerelease=allow
```
### Known Issues
- [List any known issues for beta testing]
```
### For Stable Releases
Full changelog with complete feature list, organized by impact and category.
## Context
- Follows existing CHANGELOG.md format and style
- Uses conventional commit standards
- Includes GitHub commit links for traceability
- Focuses on user-facing changes and value
- Maintains consistency with previous entries
```
--------------------------------------------------------------------------------
/src/basic_memory/alembic/versions/647e7a75e2cd_project_constraint_fix.py:
--------------------------------------------------------------------------------
```python
"""project constraint fix
Revision ID: 647e7a75e2cd
Revises: 5fe1ab1ccebe
Create Date: 2025-06-03 12:48:30.162566
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = "647e7a75e2cd"
down_revision: Union[str, None] = "5fe1ab1ccebe"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Remove the problematic UNIQUE constraint on is_default column.
The UNIQUE constraint prevents multiple projects from having is_default=FALSE,
which breaks project creation when the service sets is_default=False.
SQLite: Recreate the table without the constraint (no ALTER TABLE support)
Postgres: Use ALTER TABLE to drop the constraint directly
"""
connection = op.get_bind()
is_sqlite = connection.dialect.name == "sqlite"
if is_sqlite:
# For SQLite, we need to recreate the table without the UNIQUE constraint
# Create a new table without the UNIQUE constraint on is_default
op.create_table(
"project_new",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("name", sa.String(), nullable=False),
sa.Column("description", sa.Text(), nullable=True),
sa.Column("permalink", sa.String(), nullable=False),
sa.Column("path", sa.String(), nullable=False),
sa.Column("is_active", sa.Boolean(), nullable=False),
sa.Column("is_default", sa.Boolean(), nullable=True), # No UNIQUE constraint!
sa.Column("created_at", sa.DateTime(), nullable=False),
sa.Column("updated_at", sa.DateTime(), nullable=False),
sa.PrimaryKeyConstraint("id"),
sa.UniqueConstraint("name"),
sa.UniqueConstraint("permalink"),
)
# Copy data from old table to new table
op.execute("INSERT INTO project_new SELECT * FROM project")
# Drop the old table
op.drop_table("project")
# Rename the new table
op.rename_table("project_new", "project")
# Recreate the indexes
with op.batch_alter_table("project", schema=None) as batch_op:
batch_op.create_index("ix_project_created_at", ["created_at"], unique=False)
batch_op.create_index("ix_project_name", ["name"], unique=True)
batch_op.create_index("ix_project_path", ["path"], unique=False)
batch_op.create_index("ix_project_permalink", ["permalink"], unique=True)
batch_op.create_index("ix_project_updated_at", ["updated_at"], unique=False)
else:
# For Postgres, we can simply drop the constraint
with op.batch_alter_table("project", schema=None) as batch_op:
batch_op.drop_constraint("project_is_default_key", type_="unique")
def downgrade() -> None:
"""Add back the UNIQUE constraint on is_default column.
WARNING: This will break project creation again if multiple projects
have is_default=FALSE.
"""
# Recreate the table with the UNIQUE constraint
op.create_table(
"project_old",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("name", sa.String(), nullable=False),
sa.Column("description", sa.Text(), nullable=True),
sa.Column("permalink", sa.String(), nullable=False),
sa.Column("path", sa.String(), nullable=False),
sa.Column("is_active", sa.Boolean(), nullable=False),
sa.Column("is_default", sa.Boolean(), nullable=True),
sa.Column("created_at", sa.DateTime(), nullable=False),
sa.Column("updated_at", sa.DateTime(), nullable=False),
sa.PrimaryKeyConstraint("id"),
sa.UniqueConstraint("is_default"), # Add back the problematic constraint
sa.UniqueConstraint("name"),
sa.UniqueConstraint("permalink"),
)
# Copy data (this may fail if multiple FALSE values exist)
op.execute("INSERT INTO project_old SELECT * FROM project")
# Drop the current table and rename
op.drop_table("project")
op.rename_table("project_old", "project")
# Recreate indexes
with op.batch_alter_table("project", schema=None) as batch_op:
batch_op.create_index("ix_project_created_at", ["created_at"], unique=False)
batch_op.create_index("ix_project_name", ["name"], unique=True)
batch_op.create_index("ix_project_path", ["path"], unique=False)
batch_op.create_index("ix_project_permalink", ["permalink"], unique=True)
batch_op.create_index("ix_project_updated_at", ["updated_at"], unique=False)
```
--------------------------------------------------------------------------------
/tests/importers/test_conversation_indexing.py:
--------------------------------------------------------------------------------
```python
"""Test that imported conversations are properly indexed with correct permalink and title.
This test verifies issue #452 - Imported conversations not indexed correctly.
"""
import pytest
from basic_memory.config import ProjectConfig
from basic_memory.importers.claude_conversations_importer import ClaudeConversationsImporter
from basic_memory.markdown import EntityParser
from basic_memory.markdown.markdown_processor import MarkdownProcessor
from basic_memory.repository import EntityRepository
from basic_memory.services import EntityService
from basic_memory.services.file_service import FileService
from basic_memory.services.search_service import SearchService
from basic_memory.schemas.search import SearchQuery
from basic_memory.sync.sync_service import SyncService
@pytest.mark.asyncio
async def test_imported_conversations_have_correct_permalink_and_title(
project_config: ProjectConfig,
sync_service: SyncService,
entity_service: EntityService,
entity_repository: EntityRepository,
search_service: SearchService,
):
"""Test that imported conversations have correct permalink and title after sync.
Issue #452: Imported conversations show permalink: null in search results
and title shows as filename instead of frontmatter title.
"""
base_path = project_config.home
# Create parser, processor, and file_service for importer
parser = EntityParser(base_path)
processor = MarkdownProcessor(parser)
file_service = FileService(base_path, processor)
# Create importer
importer = ClaudeConversationsImporter(base_path, processor, file_service)
# Sample conversation data
conversations = [
{
"uuid": "test-123",
"name": "My Test Conversation Title",
"created_at": "2025-01-15T10:00:00Z",
"updated_at": "2025-01-15T11:00:00Z",
"chat_messages": [
{
"uuid": "msg-1",
"sender": "human",
"created_at": "2025-01-15T10:00:00Z",
"text": "Hello world",
"content": [{"type": "text", "text": "Hello world"}],
"attachments": [],
},
{
"uuid": "msg-2",
"sender": "assistant",
"created_at": "2025-01-15T10:01:00Z",
"text": "Hello!",
"content": [{"type": "text", "text": "Hello!"}],
"attachments": [],
},
],
}
]
# Run import
result = await importer.import_data(conversations, "conversations")
assert result.success, f"Import failed: {result}"
assert result.conversations == 1
# Verify the file was created with correct content
conv_path = base_path / "conversations" / "20250115-My_Test_Conversation_Title.md"
assert conv_path.exists(), f"Expected file at {conv_path}"
content = conv_path.read_text()
assert "---" in content, "File should have frontmatter markers"
assert "title: My Test Conversation Title" in content, "File should have title in frontmatter"
assert "permalink: conversations/20250115-My_Test_Conversation_Title" in content, (
"File should have permalink in frontmatter"
)
# Run sync to index the imported file
await sync_service.sync(base_path, project_config.name)
# Verify entity in database
entities = await entity_repository.find_all()
assert len(entities) == 1, f"Expected 1 entity, got {len(entities)}"
entity = entities[0]
# These are the key assertions for issue #452
assert entity.title == "My Test Conversation Title", (
f"Title should be from frontmatter, got: {entity.title}"
)
assert entity.permalink == "conversations/20250115-My_Test_Conversation_Title", (
f"Permalink should be from frontmatter, got: {entity.permalink}"
)
# Verify search index also has correct data
results = await search_service.search(SearchQuery(text="Test Conversation"))
assert len(results) >= 1, "Should find the conversation in search"
# Find our entity in search results
search_result = next((r for r in results if r.entity_id == entity.id), None)
assert search_result is not None, "Entity should be in search results"
assert search_result.title == "My Test Conversation Title", (
f"Search title should be from frontmatter, got: {search_result.title}"
)
assert search_result.permalink == "conversations/20250115-My_Test_Conversation_Title", (
f"Search permalink should not be null, got: {search_result.permalink}"
)
```
--------------------------------------------------------------------------------
/pyproject.toml:
--------------------------------------------------------------------------------
```toml
[project]
name = "basic-memory"
dynamic = ["version"]
description = "Local-first knowledge management combining Zettelkasten with knowledge graphs"
readme = "README.md"
requires-python = ">=3.12"
license = { text = "AGPL-3.0-or-later" }
authors = [
{ name = "Basic Machines", email = "[email protected]" }
]
dependencies = [
"sqlalchemy>=2.0.0",
"pyyaml>=6.0.1",
"typer>=0.9.0",
"aiosqlite>=0.20.0",
"greenlet>=3.1.1",
"pydantic[email,timezone]>=2.12.0",
"mcp>=1.23.1",
"pydantic-settings>=2.6.1",
"loguru>=0.7.3",
"pyright>=1.1.390",
"markdown-it-py>=3.0.0",
"python-frontmatter>=1.1.0",
"rich>=13.9.4",
"unidecode>=1.3.8",
"dateparser>=1.2.0",
"watchfiles>=1.0.4",
"fastapi[standard]>=0.115.8",
"alembic>=1.14.1",
"pillow>=11.1.0",
"pybars3>=0.9.7",
"fastmcp==2.12.3", # Pinned - 2.14.x breaks MCP tools visibility (issue #463)
"pyjwt>=2.10.1",
"python-dotenv>=1.1.0",
"pytest-aio>=1.9.0",
"aiofiles>=24.1.0", # Optional observability (disabled by default via config)
"asyncpg>=0.30.0",
"nest-asyncio>=1.6.0", # For Alembic migrations with Postgres
"pytest-asyncio>=1.2.0",
"psycopg==3.3.1",
"mdformat>=0.7.22",
"mdformat-gfm>=0.3.7",
"mdformat-frontmatter>=2.0.8",
"openpanel>=0.0.1", # Anonymous usage telemetry (Homebrew-style opt-out)
"sniffio>=1.3.1",
"anyio>=4.10.0",
"httpx>=0.28.0",
]
[project.urls]
Homepage = "https://github.com/basicmachines-co/basic-memory"
Repository = "https://github.com/basicmachines-co/basic-memory"
Documentation = "https://github.com/basicmachines-co/basic-memory#readme"
[project.scripts]
basic-memory = "basic_memory.cli.main:app"
bm = "basic_memory.cli.main:app"
[build-system]
requires = ["hatchling", "uv-dynamic-versioning>=0.7.0"]
build-backend = "hatchling.build"
[tool.pytest.ini_options]
pythonpath = ["src", "tests"]
addopts = "--cov=basic_memory --cov-report term-missing"
testpaths = ["tests", "test-int"]
asyncio_mode = "strict"
asyncio_default_fixture_loop_scope = "function"
markers = [
"benchmark: Performance benchmark tests (deselect with '-m \"not benchmark\"')",
"slow: Slow-running tests (deselect with '-m \"not slow\"')",
"postgres: Tests that run against Postgres backend (deselect with '-m \"not postgres\"')",
"windows: Windows-specific tests (deselect with '-m \"not windows\"')",
]
[tool.ruff]
line-length = 100
target-version = "py312"
[dependency-groups]
dev = [
"gevent>=24.11.1",
"icecream>=2.1.3",
"pytest>=8.3.4",
"pytest-cov>=4.1.0",
"pytest-mock>=3.12.0",
"pytest-asyncio>=0.24.0",
"pytest-xdist>=3.0.0",
"ruff>=0.1.6",
"freezegun>=1.5.5",
"testcontainers[postgres]>=4.0.0",
"psycopg>=3.2.0",
"pyright>=1.1.408",
]
[tool.hatch.version]
source = "uv-dynamic-versioning"
[tool.uv-dynamic-versioning]
vcs = "git"
style = "pep440"
bump = true
fallback-version = "0.0.0"
[tool.pyright]
include = ["src/"]
exclude = ["**/__pycache__"]
ignore = ["test/"]
defineConstant = { DEBUG = true }
reportMissingImports = "error"
reportMissingTypeStubs = false
pythonVersion = "3.12"
[tool.coverage.run]
concurrency = ["thread", "gevent"]
parallel = true
source = ["basic_memory"]
[tool.coverage.report]
exclude_lines = [
"pragma: no cover",
"def __repr__",
"if self.debug:",
"if settings.DEBUG",
"raise AssertionError",
"raise NotImplementedError",
"if 0:",
"if __name__ == .__main__.:",
"class .*\\bProtocol\\):",
"@(abc\\.)?abstractmethod",
]
# Exclude specific modules that are difficult to test comprehensively
omit = [
"*/external_auth_provider.py", # External HTTP calls to OAuth providers
"*/supabase_auth_provider.py", # External HTTP calls to Supabase APIs
"*/watch_service.py", # File system watching - complex integration testing
"*/background_sync.py", # Background processes
"*/cli/**", # CLI is an interactive wrapper; core logic is covered via API/MCP/service tests
"*/db.py", # Backend/runtime-dependent (sqlite/postgres/windows tuning); validated via integration tests
"*/services/initialization.py", # Startup orchestration + background tasks (watchers); exercised indirectly in entrypoints
"*/sync/sync_service.py", # Heavy filesystem/db integration; covered by integration suite, not enforced in unit coverage
"*/telemetry.py", # External analytics; tested lightly, excluded from strict coverage target
"*/services/migration_service.py", # Complex migration scenarios
]
[tool.logfire]
ignore_no_config = true
```
--------------------------------------------------------------------------------
/src/basic_memory/mcp/tools/build_context.py:
--------------------------------------------------------------------------------
```python
"""Build context tool for Basic Memory MCP server."""
from typing import Optional
from loguru import logger
from fastmcp import Context
from basic_memory.mcp.async_client import get_client
from basic_memory.mcp.project_context import get_active_project
from basic_memory.mcp.server import mcp
from basic_memory.telemetry import track_mcp_tool
from basic_memory.schemas.base import TimeFrame
from basic_memory.schemas.memory import (
GraphContext,
MemoryUrl,
memory_url_path,
)
@mcp.tool(
description="""Build context from a memory:// URI to continue conversations naturally.
Use this to follow up on previous discussions or explore related topics.
Memory URL Format:
- Use paths like "folder/note" or "memory://folder/note"
- Pattern matching: "folder/*" matches all notes in folder
- Valid characters: letters, numbers, hyphens, underscores, forward slashes
- Avoid: double slashes (//), angle brackets (<>), quotes, pipes (|)
- Examples: "specs/search", "projects/basic-memory", "notes/*"
Timeframes support natural language like:
- "2 days ago", "last week", "today", "3 months ago"
- Or standard formats like "7d", "24h"
""",
)
async def build_context(
url: MemoryUrl,
project: Optional[str] = None,
depth: str | int | None = 1,
timeframe: Optional[TimeFrame] = "7d",
page: int = 1,
page_size: int = 10,
max_related: int = 10,
context: Context | None = None,
) -> GraphContext:
"""Get context needed to continue a discussion within a specific project.
This tool enables natural continuation of discussions by loading relevant context
from memory:// URIs. It uses pattern matching to find relevant content and builds
a rich context graph of related information.
Project Resolution:
Server resolves projects in this order: Single Project Mode → project parameter → default project.
If project unknown, use list_memory_projects() or recent_activity() first.
Args:
project: Project name to build context from. Optional - server will resolve using hierarchy.
If unknown, use list_memory_projects() to discover available projects.
url: memory:// URI pointing to discussion content (e.g. memory://specs/search)
depth: How many relation hops to traverse (1-3 recommended for performance)
timeframe: How far back to look. Supports natural language like "2 days ago", "last week"
page: Page number of results to return (default: 1)
page_size: Number of results to return per page (default: 10)
max_related: Maximum number of related results to return (default: 10)
context: Optional FastMCP context for performance caching.
Returns:
GraphContext containing:
- primary_results: Content matching the memory:// URI
- related_results: Connected content via relations
- metadata: Context building details
Examples:
# Continue a specific discussion
build_context("my-project", "memory://specs/search")
# Get deeper context about a component
build_context("work-docs", "memory://components/memory-service", depth=2)
# Look at recent changes to a specification
build_context("research", "memory://specs/document-format", timeframe="today")
# Research the history of a feature
build_context("dev-notes", "memory://features/knowledge-graph", timeframe="3 months ago")
Raises:
ToolError: If project doesn't exist or depth parameter is invalid
"""
track_mcp_tool("build_context")
logger.info(f"Building context from {url} in project {project}")
# Convert string depth to integer if needed
if isinstance(depth, str):
try:
depth = int(depth)
except ValueError:
from mcp.server.fastmcp.exceptions import ToolError
raise ToolError(f"Invalid depth parameter: '{depth}' is not a valid integer")
# URL is already validated and normalized by MemoryUrl type annotation
async with get_client() as client:
# Get the active project using the new stateless approach
active_project = await get_active_project(client, project, context)
# Import here to avoid circular import
from basic_memory.mcp.clients import MemoryClient
# Use typed MemoryClient for API calls
memory_client = MemoryClient(client, active_project.external_id)
return await memory_client.build_context(
memory_url_path(url),
depth=depth or 1,
timeframe=timeframe,
page=page,
page_size=page_size,
max_related=max_related,
)
```
--------------------------------------------------------------------------------
/tests/cli/test_auth_cli_auth.py:
--------------------------------------------------------------------------------
```python
import json
import os
import stat
import time
from contextlib import asynccontextmanager
import httpx
import pytest
from basic_memory.cli.auth import CLIAuth
def _make_mock_transport(handler):
return httpx.MockTransport(handler)
@pytest.mark.asyncio
async def test_cli_auth_request_device_authorization_uses_injected_http_client(
tmp_path, monkeypatch
):
"""Integration-style test: exercise the request flow with real httpx plumbing (MockTransport)."""
monkeypatch.setenv("HOME", str(tmp_path))
monkeypatch.setenv("BASIC_MEMORY_ENV", "test")
async def handler(request: httpx.Request) -> httpx.Response:
assert request.url.path.endswith("/oauth2/device_authorization")
body = (await request.aread()).decode()
# sanity: client_id should be in form data
assert "client_id=test-client-id" in body
return httpx.Response(
200,
json={
"device_code": "devcode",
"user_code": "usercode",
"verification_uri": "https://example.test/verify",
"interval": 1,
},
)
transport = _make_mock_transport(handler)
@asynccontextmanager
async def client_factory():
async with httpx.AsyncClient(transport=transport) as client:
yield client
auth = CLIAuth(
client_id="test-client-id",
authkit_domain="https://example.test",
http_client_factory=client_factory,
)
result = await auth.request_device_authorization()
assert result is not None
assert result["device_code"] == "devcode"
def test_cli_auth_generate_pkce_pair_format(tmp_path, monkeypatch):
monkeypatch.setenv("HOME", str(tmp_path))
monkeypatch.setenv("BASIC_MEMORY_ENV", "test")
auth = CLIAuth(client_id="cid", authkit_domain="https://example.test")
verifier, challenge = auth.generate_pkce_pair()
# PKCE verifier/challenge should be URL-safe base64 without padding.
assert verifier
assert challenge
assert "=" not in verifier
assert "=" not in challenge
# code verifier length should be in recommended bounds (rough sanity).
assert 43 <= len(verifier) <= 128
@pytest.mark.asyncio
async def test_cli_auth_save_load_and_get_valid_token_roundtrip(tmp_path, monkeypatch):
monkeypatch.setenv("HOME", str(tmp_path))
monkeypatch.setenv("BASIC_MEMORY_ENV", "test")
auth = CLIAuth(client_id="cid", authkit_domain="https://example.test")
tokens = {
"access_token": "at",
"refresh_token": "rt",
"expires_in": 3600,
"token_type": "Bearer",
}
auth.save_tokens(tokens)
loaded = auth.load_tokens()
assert loaded is not None
assert loaded["access_token"] == "at"
assert loaded["refresh_token"] == "rt"
assert auth.is_token_valid(loaded) is True
valid = await auth.get_valid_token()
assert valid == "at"
# Permission should be 600 on POSIX systems
if os.name != "nt":
mode = auth.token_file.stat().st_mode
assert stat.S_IMODE(mode) == 0o600
@pytest.mark.asyncio
async def test_cli_auth_refresh_flow_uses_injected_http_client(tmp_path, monkeypatch):
monkeypatch.setenv("HOME", str(tmp_path))
monkeypatch.setenv("BASIC_MEMORY_ENV", "test")
async def handler(request: httpx.Request) -> httpx.Response:
if request.url.path.endswith("/oauth2/token"):
body = (await request.aread()).decode()
assert "grant_type=refresh_token" in body
return httpx.Response(
200,
json={
"access_token": "new-at",
"refresh_token": "new-rt",
"expires_in": 3600,
"token_type": "Bearer",
},
)
raise AssertionError(f"Unexpected request: {request.method} {request.url}")
transport = _make_mock_transport(handler)
@asynccontextmanager
async def client_factory():
async with httpx.AsyncClient(transport=transport) as client:
yield client
auth = CLIAuth(
client_id="cid",
authkit_domain="https://example.test",
http_client_factory=client_factory,
)
# Write an expired token file manually (so we control expires_at precisely).
auth.token_file.parent.mkdir(parents=True, exist_ok=True)
auth.token_file.write_text(
json.dumps(
{
"access_token": "old-at",
"refresh_token": "old-rt",
"expires_at": int(time.time()) - 10,
"token_type": "Bearer",
}
),
encoding="utf-8",
)
token = await auth.get_valid_token()
assert token == "new-at"
```
--------------------------------------------------------------------------------
/src/basic_memory/alembic/versions/5fe1ab1ccebe_add_projects_table.py:
--------------------------------------------------------------------------------
```python
"""add projects table
Revision ID: 5fe1ab1ccebe
Revises: cc7172b46608
Create Date: 2025-05-14 09:05:18.214357
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = "5fe1ab1ccebe"
down_revision: Union[str, None] = "cc7172b46608"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
# SQLite FTS5 virtual table handling is SQLite-specific
# For Postgres, search_index is a regular table managed by ORM
connection = op.get_bind()
is_sqlite = connection.dialect.name == "sqlite"
op.create_table(
"project",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("name", sa.String(), nullable=False),
sa.Column("description", sa.Text(), nullable=True),
sa.Column("permalink", sa.String(), nullable=False),
sa.Column("path", sa.String(), nullable=False),
sa.Column("is_active", sa.Boolean(), nullable=False),
sa.Column("is_default", sa.Boolean(), nullable=True),
sa.Column("created_at", sa.DateTime(), nullable=False),
sa.Column("updated_at", sa.DateTime(), nullable=False),
sa.PrimaryKeyConstraint("id"),
sa.UniqueConstraint("is_default"),
sa.UniqueConstraint("name"),
sa.UniqueConstraint("permalink"),
if_not_exists=True,
)
with op.batch_alter_table("project", schema=None) as batch_op:
batch_op.create_index(
"ix_project_created_at", ["created_at"], unique=False, if_not_exists=True
)
batch_op.create_index("ix_project_name", ["name"], unique=True, if_not_exists=True)
batch_op.create_index("ix_project_path", ["path"], unique=False, if_not_exists=True)
batch_op.create_index(
"ix_project_permalink", ["permalink"], unique=True, if_not_exists=True
)
batch_op.create_index(
"ix_project_updated_at", ["updated_at"], unique=False, if_not_exists=True
)
with op.batch_alter_table("entity", schema=None) as batch_op:
batch_op.add_column(sa.Column("project_id", sa.Integer(), nullable=False))
batch_op.drop_index(
"uix_entity_permalink",
sqlite_where=sa.text("content_type = 'text/markdown' AND permalink IS NOT NULL")
if is_sqlite
else None,
)
batch_op.drop_index("ix_entity_file_path")
batch_op.create_index(batch_op.f("ix_entity_file_path"), ["file_path"], unique=False)
batch_op.create_index("ix_entity_project_id", ["project_id"], unique=False)
batch_op.create_index(
"uix_entity_file_path_project", ["file_path", "project_id"], unique=True
)
batch_op.create_index(
"uix_entity_permalink_project",
["permalink", "project_id"],
unique=True,
sqlite_where=sa.text("content_type = 'text/markdown' AND permalink IS NOT NULL")
if is_sqlite
else None,
)
batch_op.create_foreign_key("fk_entity_project_id", "project", ["project_id"], ["id"])
# drop the search index table. it will be recreated
# Only drop for SQLite - Postgres uses regular table managed by ORM
if is_sqlite:
op.drop_table("search_index")
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table("entity", schema=None) as batch_op:
batch_op.drop_constraint("fk_entity_project_id", type_="foreignkey")
batch_op.drop_index(
"uix_entity_permalink_project",
sqlite_where=sa.text("content_type = 'text/markdown' AND permalink IS NOT NULL"),
)
batch_op.drop_index("uix_entity_file_path_project")
batch_op.drop_index("ix_entity_project_id")
batch_op.drop_index(batch_op.f("ix_entity_file_path"))
batch_op.create_index("ix_entity_file_path", ["file_path"], unique=1)
batch_op.create_index(
"uix_entity_permalink",
["permalink"],
unique=1,
sqlite_where=sa.text("content_type = 'text/markdown' AND permalink IS NOT NULL"),
)
batch_op.drop_column("project_id")
with op.batch_alter_table("project", schema=None) as batch_op:
batch_op.drop_index("ix_project_updated_at")
batch_op.drop_index("ix_project_permalink")
batch_op.drop_index("ix_project_path")
batch_op.drop_index("ix_project_name")
batch_op.drop_index("ix_project_created_at")
op.drop_table("project")
# ### end Alembic commands ###
```
--------------------------------------------------------------------------------
/src/basic_memory/schemas/v2/entity.py:
--------------------------------------------------------------------------------
```python
"""V2 entity and project schemas with ID-first design."""
from datetime import datetime
from typing import Dict, List, Literal, Optional
from pydantic import BaseModel, Field, ConfigDict
from basic_memory.schemas.response import ObservationResponse, RelationResponse
class EntityResolveRequest(BaseModel):
"""Request to resolve a string identifier to an entity ID.
Supports resolution of:
- Permalinks (e.g., "specs/search")
- Titles (e.g., "Search Specification")
- File paths (e.g., "specs/search.md")
"""
identifier: str = Field(
...,
description="Entity identifier to resolve (permalink, title, or file path)",
min_length=1,
max_length=500,
)
class EntityResolveResponse(BaseModel):
"""Response from identifier resolution.
Returns the entity ID and associated metadata for the resolved entity.
"""
external_id: str = Field(..., description="External UUID (primary API identifier)")
entity_id: int = Field(..., description="Numeric entity ID (internal identifier)")
permalink: Optional[str] = Field(None, description="Entity permalink")
file_path: str = Field(..., description="Relative file path")
title: str = Field(..., description="Entity title")
resolution_method: Literal["external_id", "permalink", "title", "path", "search"] = Field(
..., description="How the identifier was resolved"
)
class MoveEntityRequestV2(BaseModel):
"""V2 request schema for moving an entity to a new file location.
In V2 API, the entity ID is provided in the URL path, so this request
only needs the destination path.
"""
destination_path: str = Field(
...,
description="New file path for the entity (relative to project root)",
min_length=1,
max_length=500,
)
class EntityResponseV2(BaseModel):
"""V2 entity response with external_id as the primary API identifier.
This response format emphasizes the external_id (UUID) as the primary API identifier,
with the numeric id maintained for internal reference.
"""
# External UUID first - this is the primary API identifier in v2
external_id: str = Field(..., description="External UUID (primary API identifier)")
# Internal numeric ID
id: int = Field(..., description="Numeric entity ID (internal identifier)")
# Core entity fields
title: str = Field(..., description="Entity title")
entity_type: str = Field(..., description="Entity type")
content_type: str = Field(default="text/markdown", description="Content MIME type")
# Secondary identifiers (for compatibility and convenience)
permalink: Optional[str] = Field(None, description="Entity permalink (may change)")
file_path: str = Field(..., description="Relative file path (may change)")
# Content and metadata
content: Optional[str] = Field(None, description="Entity content")
entity_metadata: Optional[Dict] = Field(None, description="Entity metadata")
# Relationships
observations: List[ObservationResponse] = Field(
default_factory=list, description="Entity observations"
)
relations: List[RelationResponse] = Field(default_factory=list, description="Entity relations")
# Timestamps
created_at: datetime = Field(..., description="Creation timestamp")
updated_at: datetime = Field(..., description="Last update timestamp")
# V2-specific metadata
api_version: Literal["v2"] = Field(
default="v2", description="API version (always 'v2' for this response)"
)
model_config = ConfigDict(from_attributes=True)
class ProjectResolveRequest(BaseModel):
"""Request to resolve a project identifier to a project ID.
Supports resolution of:
- Project names (e.g., "my-project")
- Permalinks (e.g., "my-project")
"""
identifier: str = Field(
...,
description="Project identifier to resolve (name or permalink)",
min_length=1,
max_length=255,
)
class ProjectResolveResponse(BaseModel):
"""Response from project identifier resolution.
Returns the project ID and associated metadata for the resolved project.
"""
external_id: str = Field(..., description="External UUID (primary API identifier)")
project_id: int = Field(..., description="Numeric project ID (internal identifier)")
name: str = Field(..., description="Project name")
permalink: str = Field(..., description="Project permalink")
path: str = Field(..., description="Project file path")
is_active: bool = Field(..., description="Whether the project is active")
is_default: bool = Field(..., description="Whether the project is the default")
resolution_method: Literal["external_id", "name", "permalink"] = Field(
..., description="How the identifier was resolved"
)
```
--------------------------------------------------------------------------------
/src/basic_memory/api/v2/routers/memory_router.py:
--------------------------------------------------------------------------------
```python
"""V2 routes for memory:// URI operations.
This router uses external_id UUIDs for stable, API-friendly routing.
V1 uses string-based project names which are less efficient and less stable.
"""
from typing import Annotated, Optional
from fastapi import APIRouter, Query, Path
from loguru import logger
from basic_memory.deps import ContextServiceV2ExternalDep, EntityRepositoryV2ExternalDep
from basic_memory.schemas.base import TimeFrame, parse_timeframe
from basic_memory.schemas.memory import (
GraphContext,
normalize_memory_url,
)
from basic_memory.schemas.search import SearchItemType
from basic_memory.api.routers.utils import to_graph_context
# Note: No prefix here - it's added during registration as /v2/{project_id}/memory
router = APIRouter(tags=["memory"])
@router.get("/memory/recent", response_model=GraphContext)
async def recent(
context_service: ContextServiceV2ExternalDep,
entity_repository: EntityRepositoryV2ExternalDep,
project_id: str = Path(..., description="Project external UUID"),
type: Annotated[list[SearchItemType] | None, Query()] = None,
depth: int = 1,
timeframe: TimeFrame = "7d",
page: int = 1,
page_size: int = 10,
max_related: int = 10,
) -> GraphContext:
"""Get recent activity context for a project.
Args:
project_id: Project external UUID from URL path
context_service: Context service scoped to project
entity_repository: Entity repository scoped to project
type: Types of items to include (entities, relations, observations)
depth: How many levels of related entities to include
timeframe: Time window for recent activity (e.g., "7d", "1 week")
page: Page number for pagination
page_size: Number of items per page
max_related: Maximum related entities to include per item
Returns:
GraphContext with recent activity and related entities
"""
# return all types by default
types = (
[SearchItemType.ENTITY, SearchItemType.RELATION, SearchItemType.OBSERVATION]
if not type
else type
)
logger.debug(
f"V2 Getting recent context for project {project_id}: `{types}` depth: `{depth}` timeframe: `{timeframe}` page: `{page}` page_size: `{page_size}` max_related: `{max_related}`"
)
# Parse timeframe
since = parse_timeframe(timeframe)
limit = page_size
offset = (page - 1) * page_size
# Build context
context = await context_service.build_context(
types=types, depth=depth, since=since, limit=limit, offset=offset, max_related=max_related
)
recent_context = await to_graph_context(
context, entity_repository=entity_repository, page=page, page_size=page_size
)
logger.debug(f"V2 Recent context: {recent_context.model_dump_json()}")
return recent_context
# get_memory_context needs to be declared last so other paths can match
@router.get("/memory/{uri:path}", response_model=GraphContext)
async def get_memory_context(
context_service: ContextServiceV2ExternalDep,
entity_repository: EntityRepositoryV2ExternalDep,
uri: str,
project_id: str = Path(..., description="Project external UUID"),
depth: int = 1,
timeframe: Optional[TimeFrame] = None,
page: int = 1,
page_size: int = 10,
max_related: int = 10,
) -> GraphContext:
"""Get rich context from memory:// URI.
V2 supports both legacy path-based URIs and new ID-based URIs:
- Legacy: memory://path/to/note
- ID-based: memory://id/123 or memory://123
Args:
project_id: Project external UUID from URL path
context_service: Context service scoped to project
entity_repository: Entity repository scoped to project
uri: Memory URI path (e.g., "id/123", "123", or "path/to/note")
depth: How many levels of related entities to include
timeframe: Optional time window for filtering related content
page: Page number for pagination
page_size: Number of items per page
max_related: Maximum related entities to include
Returns:
GraphContext with the entity and its related context
"""
logger.debug(
f"V2 Getting context for project {project_id}, URI: `{uri}` depth: `{depth}` timeframe: `{timeframe}` page: `{page}` page_size: `{page_size}` max_related: `{max_related}`"
)
memory_url = normalize_memory_url(uri)
# Parse timeframe
since = parse_timeframe(timeframe) if timeframe else None
limit = page_size
offset = (page - 1) * page_size
# Build context
context = await context_service.build_context(
memory_url, depth=depth, since=since, limit=limit, offset=offset, max_related=max_related
)
return await to_graph_context(
context, entity_repository=entity_repository, page=page, page_size=page_size
)
```
--------------------------------------------------------------------------------
/src/basic_memory/cli/commands/cloud/api_client.py:
--------------------------------------------------------------------------------
```python
"""Cloud API client utilities."""
from collections.abc import AsyncIterator
from typing import Optional
from contextlib import asynccontextmanager
from typing import AsyncContextManager, Callable
import httpx
import typer
from rich.console import Console
from basic_memory.cli.auth import CLIAuth
from basic_memory.config import ConfigManager
console = Console()
HttpClientFactory = Callable[[], AsyncContextManager[httpx.AsyncClient]]
class CloudAPIError(Exception):
"""Exception raised for cloud API errors."""
def __init__(
self, message: str, status_code: Optional[int] = None, detail: Optional[dict] = None
):
super().__init__(message)
self.status_code = status_code
self.detail = detail or {}
class SubscriptionRequiredError(CloudAPIError):
"""Exception raised when user needs an active subscription."""
def __init__(self, message: str, subscribe_url: str):
super().__init__(message, status_code=403, detail={"error": "subscription_required"})
self.subscribe_url = subscribe_url
def get_cloud_config() -> tuple[str, str, str]:
"""Get cloud OAuth configuration from config."""
config_manager = ConfigManager()
config = config_manager.config
return config.cloud_client_id, config.cloud_domain, config.cloud_host
async def get_authenticated_headers(auth: CLIAuth | None = None) -> dict[str, str]:
"""
Get authentication headers with JWT token.
handles jwt refresh if needed.
"""
client_id, domain, _ = get_cloud_config()
auth_obj = auth or CLIAuth(client_id=client_id, authkit_domain=domain)
token = await auth_obj.get_valid_token()
if not token:
console.print("[red]Not authenticated. Please run 'basic-memory cloud login' first.[/red]")
raise typer.Exit(1)
return {"Authorization": f"Bearer {token}"}
@asynccontextmanager
async def _default_http_client(timeout: float) -> AsyncIterator[httpx.AsyncClient]:
async with httpx.AsyncClient(timeout=timeout) as client:
yield client
async def make_api_request(
method: str,
url: str,
headers: Optional[dict] = None,
json_data: Optional[dict] = None,
timeout: float = 30.0,
*,
auth: CLIAuth | None = None,
http_client_factory: HttpClientFactory | None = None,
) -> httpx.Response:
"""Make an API request to the cloud service."""
headers = headers or {}
auth_headers = await get_authenticated_headers(auth=auth)
headers.update(auth_headers)
# Add debug headers to help with compression issues
headers.setdefault("Accept-Encoding", "identity") # Disable compression for debugging
client_factory = http_client_factory or (lambda: _default_http_client(timeout))
async with client_factory() as client:
try:
response = await client.request(method=method, url=url, headers=headers, json=json_data)
response.raise_for_status()
return response
except httpx.HTTPError as e:
# Check if this is a response error with response details
if hasattr(e, "response") and e.response is not None: # pyright: ignore [reportAttributeAccessIssue]
response = e.response # type: ignore
# Try to parse error detail from response
error_detail = None
try:
error_detail = response.json()
except Exception:
# If JSON parsing fails, we'll handle it as a generic error
pass
# Check for subscription_required error (403)
if response.status_code == 403 and isinstance(error_detail, dict):
# Handle both FastAPI HTTPException format (nested under "detail")
# and direct format
detail_obj = error_detail.get("detail", error_detail)
if (
isinstance(detail_obj, dict)
and detail_obj.get("error") == "subscription_required"
):
message = detail_obj.get("message", "Active subscription required")
subscribe_url = detail_obj.get(
"subscribe_url", "https://basicmemory.com/subscribe"
)
raise SubscriptionRequiredError(
message=message, subscribe_url=subscribe_url
) from e
# Raise generic CloudAPIError with status code and detail
raise CloudAPIError(
f"API request failed: {e}",
status_code=response.status_code,
detail=error_detail if isinstance(error_detail, dict) else {},
) from e
raise CloudAPIError(f"API request failed: {e}") from e
```
--------------------------------------------------------------------------------
/src/basic_memory/repository/project_repository.py:
--------------------------------------------------------------------------------
```python
"""Repository for managing projects in Basic Memory."""
from pathlib import Path
from typing import Optional, Sequence, Union
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from basic_memory import db
from basic_memory.models.project import Project
from basic_memory.repository.repository import Repository
class ProjectRepository(Repository[Project]):
"""Repository for Project model.
Projects represent collections of knowledge entities grouped together.
Each entity, observation, and relation belongs to a specific project.
"""
def __init__(self, session_maker: async_sessionmaker[AsyncSession]):
"""Initialize with session maker."""
super().__init__(session_maker, Project)
async def get_by_name(self, name: str) -> Optional[Project]:
"""Get project by name (exact match).
Args:
name: Unique name of the project
"""
query = self.select().where(Project.name == name)
return await self.find_one(query)
async def get_by_name_case_insensitive(self, name: str) -> Optional[Project]:
"""Get project by name (case-insensitive match).
Args:
name: Project name (case-insensitive)
Returns:
Project if found, None otherwise
"""
query = self.select().where(Project.name.ilike(name))
return await self.find_one(query)
async def get_by_permalink(self, permalink: str) -> Optional[Project]:
"""Get project by permalink.
Args:
permalink: URL-friendly identifier for the project
"""
query = self.select().where(Project.permalink == permalink)
return await self.find_one(query)
async def get_by_path(self, path: Union[Path, str]) -> Optional[Project]:
"""Get project by filesystem path.
Args:
path: Path to the project directory (will be converted to string internally)
"""
query = self.select().where(Project.path == Path(path).as_posix())
return await self.find_one(query)
async def get_by_id(self, project_id: int) -> Optional[Project]:
"""Get project by numeric ID.
Args:
project_id: Numeric project ID
Returns:
Project if found, None otherwise
"""
async with db.scoped_session(self.session_maker) as session:
return await self.select_by_id(session, project_id)
async def get_by_external_id(self, external_id: str) -> Optional[Project]:
"""Get project by external UUID.
Args:
external_id: External UUID identifier
Returns:
Project if found, None otherwise
"""
query = self.select().where(Project.external_id == external_id)
return await self.find_one(query)
async def get_default_project(self) -> Optional[Project]:
"""Get the default project (the one marked as is_default=True)."""
query = self.select().where(Project.is_default.is_not(None))
return await self.find_one(query)
async def get_active_projects(self) -> Sequence[Project]:
"""Get all active projects."""
query = self.select().where(Project.is_active == True) # noqa: E712
result = await self.execute_query(query)
return list(result.scalars().all())
async def set_as_default(self, project_id: int) -> Optional[Project]:
"""Set a project as the default and unset previous default.
Args:
project_id: ID of the project to set as default
Returns:
The updated project if found, None otherwise
"""
async with db.scoped_session(self.session_maker) as session:
# First, clear the default flag for all projects using direct SQL
await session.execute(
text("UPDATE project SET is_default = NULL WHERE is_default IS NOT NULL")
)
await session.flush()
# Set the new default project
target_project = await self.select_by_id(session, project_id)
if target_project:
target_project.is_default = True
await session.flush()
return target_project
return None # pragma: no cover
async def update_path(self, project_id: int, new_path: str) -> Optional[Project]:
"""Update project path.
Args:
project_id: ID of the project to update
new_path: New filesystem path for the project
Returns:
The updated project if found, None otherwise
"""
async with db.scoped_session(self.session_maker) as session:
project = await self.select_by_id(session, project_id)
if project:
project.path = new_path
await session.flush()
return project
return None
```
--------------------------------------------------------------------------------
/src/basic_memory/sync/coordinator.py:
--------------------------------------------------------------------------------
```python
"""SyncCoordinator - centralized sync/watch lifecycle management.
This module provides a single coordinator that manages the lifecycle of
file synchronization and watch services across all entry points (API, MCP, CLI).
The coordinator handles:
- Starting/stopping watch service
- Scheduling background sync
- Reporting status
- Clean shutdown behavior
"""
import asyncio
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import Optional
from loguru import logger
from basic_memory.config import BasicMemoryConfig
class SyncStatus(Enum):
"""Status of the sync coordinator."""
NOT_STARTED = auto()
STARTING = auto()
RUNNING = auto()
STOPPING = auto()
STOPPED = auto()
ERROR = auto()
@dataclass
class SyncCoordinator:
"""Centralized coordinator for sync/watch lifecycle.
Manages the lifecycle of file synchronization services, providing:
- Unified start/stop interface
- Status tracking
- Clean shutdown with proper task cancellation
Args:
config: BasicMemoryConfig with sync settings
should_sync: Whether sync should be enabled (from container decision)
skip_reason: Human-readable reason if sync is skipped
Usage:
coordinator = SyncCoordinator(config=config, should_sync=True)
await coordinator.start()
# ... application runs ...
await coordinator.stop()
"""
config: BasicMemoryConfig
should_sync: bool = True
skip_reason: Optional[str] = None
# Internal state (not constructor args)
_status: SyncStatus = field(default=SyncStatus.NOT_STARTED, init=False)
_sync_task: Optional[asyncio.Task] = field(default=None, init=False)
@property
def status(self) -> SyncStatus:
"""Current status of the coordinator."""
return self._status
@property
def is_running(self) -> bool:
"""Whether sync is currently running."""
return self._status == SyncStatus.RUNNING
async def start(self) -> None:
"""Start the sync/watch service if enabled.
This is a non-blocking call that starts the sync task in the background.
Use stop() to cleanly shut down.
"""
if not self.should_sync:
if self.skip_reason:
logger.info(f"{self.skip_reason} - skipping local file sync")
self._status = SyncStatus.STOPPED
return
if self._status in (SyncStatus.RUNNING, SyncStatus.STARTING):
logger.warning("Sync coordinator already running or starting")
return
self._status = SyncStatus.STARTING
logger.info("Starting file sync in background")
try:
# Deferred import to avoid circular dependency
from basic_memory.services.initialization import initialize_file_sync
async def _file_sync_runner() -> None: # pragma: no cover
"""Run the file sync service."""
try:
await initialize_file_sync(self.config)
except asyncio.CancelledError:
logger.debug("File sync cancelled")
raise
except Exception as e:
logger.error(f"Error in file sync: {e}")
self._status = SyncStatus.ERROR
raise
self._sync_task = asyncio.create_task(_file_sync_runner())
self._status = SyncStatus.RUNNING
logger.info("Sync coordinator started successfully")
except Exception as e: # pragma: no cover
logger.error(f"Failed to start sync coordinator: {e}")
self._status = SyncStatus.ERROR
raise
async def stop(self) -> None:
"""Stop the sync/watch service cleanly.
Cancels the background task and waits for it to complete.
Safe to call even if not running.
"""
if self._status in (SyncStatus.NOT_STARTED, SyncStatus.STOPPED):
return
if self._sync_task is None: # pragma: no cover
self._status = SyncStatus.STOPPED
return
self._status = SyncStatus.STOPPING
logger.info("Stopping sync coordinator...")
self._sync_task.cancel()
try:
await self._sync_task
except asyncio.CancelledError:
logger.info("File sync task cancelled successfully")
self._sync_task = None
self._status = SyncStatus.STOPPED
logger.info("Sync coordinator stopped")
def get_status_info(self) -> dict:
"""Get status information for reporting.
Returns:
Dictionary with status details for diagnostics
"""
return {
"status": self._status.name,
"should_sync": self.should_sync,
"skip_reason": self.skip_reason,
"has_task": self._sync_task is not None,
}
__all__ = [
"SyncCoordinator",
"SyncStatus",
]
```
--------------------------------------------------------------------------------
/tests/api/test_prompt_router.py:
--------------------------------------------------------------------------------
```python
"""Tests for the prompt router endpoints."""
import pytest
import pytest_asyncio
from httpx import AsyncClient
from basic_memory.services.context_service import ContextService
@pytest_asyncio.fixture
async def context_service(entity_repository, search_service, observation_repository):
"""Create a real context service for testing."""
return ContextService(entity_repository, search_service, observation_repository)
@pytest.mark.asyncio
async def test_continue_conversation_endpoint(
client: AsyncClient,
entity_service,
search_service,
context_service,
entity_repository,
test_graph,
project_url,
):
"""Test the continue_conversation endpoint with real services."""
# Create request data
request_data = {
"topic": "Root", # This should match our test entity in test_graph
"timeframe": "7d",
"depth": 1,
"related_items_limit": 2,
}
# Call the endpoint
response = await client.post(f"{project_url}/prompt/continue-conversation", json=request_data)
# Verify response
assert response.status_code == 200
result = response.json()
assert "prompt" in result
assert "context" in result
# Check content of context
context = result["context"]
assert context["topic"] == "Root"
assert context["timeframe"] == "7d"
assert context["has_results"] is True
assert len(context["hierarchical_results"]) > 0
# Check content of prompt
prompt = result["prompt"]
assert "Continuing conversation on: Root" in prompt
assert "memory retrieval session" in prompt
# Test without topic - should use recent activity
request_data = {"timeframe": "1d", "depth": 1, "related_items_limit": 2}
response = await client.post(f"{project_url}/prompt/continue-conversation", json=request_data)
assert response.status_code == 200
result = response.json()
assert "Recent Activity" in result["context"]["topic"]
@pytest.mark.asyncio
async def test_search_prompt_endpoint(
client: AsyncClient, entity_service, search_service, test_graph, project_url
):
"""Test the search_prompt endpoint with real services."""
# Create request data
request_data = {
"query": "Root", # This should match our test entity
"timeframe": "7d",
}
# Call the endpoint
response = await client.post(f"{project_url}/prompt/search", json=request_data)
# Verify response
assert response.status_code == 200
result = response.json()
assert "prompt" in result
assert "context" in result
# Check content of context
context = result["context"]
assert context["query"] == "Root"
assert context["timeframe"] == "7d"
assert context["has_results"] is True
assert len(context["results"]) > 0
# Check content of prompt
prompt = result["prompt"]
assert 'Search Results for: "Root"' in prompt
assert "This is a memory search session" in prompt
@pytest.mark.asyncio
async def test_search_prompt_no_results(
client: AsyncClient, entity_service, search_service, project_url
):
"""Test the search_prompt endpoint with a query that returns no results."""
# Create request data with a query that shouldn't match anything
request_data = {"query": "NonExistentQuery12345", "timeframe": "7d"}
# Call the endpoint
response = await client.post(f"{project_url}/prompt/search", json=request_data)
# Verify response
assert response.status_code == 200
result = response.json()
# Check content of context
context = result["context"]
assert context["query"] == "NonExistentQuery12345"
assert context["has_results"] is False
assert len(context["results"]) == 0
# Check content of prompt
prompt = result["prompt"]
assert 'Search Results for: "NonExistentQuery12345"' in prompt
assert "I couldn't find any results for this query" in prompt
assert "Opportunity to Capture Knowledge" in prompt
@pytest.mark.asyncio
async def test_error_handling(client: AsyncClient, monkeypatch, project_url):
"""Test error handling in the endpoints by breaking the template loader."""
# Patch the template loader to raise an exception
def mock_render(*args, **kwargs):
raise Exception("Template error")
# Apply the patch
monkeypatch.setattr("basic_memory.api.template_loader.TemplateLoader.render", mock_render)
# Test continue_conversation error handling
response = await client.post(
f"{project_url}/prompt/continue-conversation",
json={"topic": "test error", "timeframe": "7d"},
)
assert response.status_code == 500
assert "detail" in response.json()
assert "Template error" in response.json()["detail"]
# Test search_prompt error handling
response = await client.post(
f"{project_url}/prompt/search", json={"query": "test error", "timeframe": "7d"}
)
assert response.status_code == 500
assert "detail" in response.json()
assert "Template error" in response.json()["detail"]
```
--------------------------------------------------------------------------------
/tests/cli/test_import_memory_json.py:
--------------------------------------------------------------------------------
```python
"""Tests for import_memory_json command."""
import json
import pytest
from typer.testing import CliRunner
from basic_memory.cli.app import import_app
from basic_memory.cli.commands import import_memory_json # noqa
from basic_memory.markdown import MarkdownProcessor
from basic_memory.services.file_service import FileService
# Set up CLI runner
runner = CliRunner()
@pytest.fixture
def sample_entities():
"""Sample entities for testing."""
return [
{
"type": "entity",
"name": "test_entity",
"entityType": "test",
"observations": ["Test observation 1", "Test observation 2"],
},
{
"type": "relation",
"from": "test_entity",
"to": "related_entity",
"relationType": "test_relation",
},
]
@pytest.fixture
def sample_json_file(tmp_path, sample_entities):
"""Create a sample memory.json file."""
json_file = tmp_path / "memory.json"
with open(json_file, "w", encoding="utf-8") as f:
for entity in sample_entities:
f.write(json.dumps(entity) + "\n")
return json_file
@pytest.mark.asyncio
async def test_get_importer_dependencies(tmp_path, monkeypatch):
"""Test getting importer dependencies (MarkdownProcessor and FileService)."""
monkeypatch.setenv("HOME", str(tmp_path))
processor, file_service = await import_memory_json.get_importer_dependencies()
assert isinstance(processor, MarkdownProcessor)
assert isinstance(file_service, FileService)
def test_import_json_command_file_not_found(tmp_path):
"""Test error handling for nonexistent file."""
nonexistent = tmp_path / "nonexistent.json"
result = runner.invoke(import_app, ["memory-json", str(nonexistent)])
assert result.exit_code == 1
assert "File not found" in result.output
def test_import_json_command_success(tmp_path, sample_json_file, monkeypatch):
"""Test successful JSON import via command."""
# Set up test environment
monkeypatch.setenv("HOME", str(tmp_path))
# Run import
result = runner.invoke(import_app, ["memory-json", str(sample_json_file)])
assert result.exit_code == 0
assert "Import complete" in result.output
assert "Created 1 entities" in result.output
assert "Added 1 relations" in result.output
def test_import_json_command_invalid_json(tmp_path):
"""Test error handling for invalid JSON."""
# Create invalid JSON file
invalid_file = tmp_path / "invalid.json"
invalid_file.write_text("not json")
result = runner.invoke(import_app, ["memory-json", str(invalid_file)])
assert result.exit_code == 1
assert "Error during import" in result.output
def test_import_json_command_handle_old_format(tmp_path):
"""Test handling old format JSON with from_id/to_id."""
# Create JSON with old format
old_format = [
{
"type": "entity",
"name": "test_entity",
"entityType": "test",
"observations": ["Test observation"],
},
{
"type": "relation",
"from_id": "test_entity",
"to_id": "other_entity",
"relation_type": "test_relation",
},
]
json_file = tmp_path / "old_format.json"
with open(json_file, "w", encoding="utf-8") as f:
for item in old_format:
f.write(json.dumps(item) + "\n")
# Set up test environment
monkeypatch = pytest.MonkeyPatch()
monkeypatch.setenv("HOME", str(tmp_path))
# Run import
result = runner.invoke(import_app, ["memory-json", str(json_file)])
assert result.exit_code == 0
assert "Import complete" in result.output
def test_import_json_command_missing_name_key(tmp_path):
"""Test handling JSON with missing 'name' key using 'id' instead."""
# Create JSON with id instead of name (common in Knowledge Graph Memory Server)
data_with_id = [
{
"type": "entity",
"id": "test_entity_id",
"entityType": "test",
"observations": ["Test observation with id"],
},
{
"type": "entity",
"entityName": "test_entity_2",
"entityType": "test",
"observations": ["Test observation with entityName"],
},
{
"type": "entity",
"name": "test_entity_title",
"entityType": "test",
"observations": ["Test observation with name"],
},
]
json_file = tmp_path / "missing_name.json"
with open(json_file, "w", encoding="utf-8") as f:
for item in data_with_id:
f.write(json.dumps(item) + "\n")
# Set up test environment
monkeypatch = pytest.MonkeyPatch()
monkeypatch.setenv("HOME", str(tmp_path))
# Run import - should not fail even without 'name' key
result = runner.invoke(import_app, ["memory-json", str(json_file)])
assert result.exit_code == 0
assert "Import complete" in result.output
assert "Created 3 entities" in result.output
```
--------------------------------------------------------------------------------
/tests/markdown/test_parser_edge_cases.py:
--------------------------------------------------------------------------------
```python
"""Tests for markdown parser edge cases."""
from pathlib import Path
from textwrap import dedent
import pytest
from basic_memory.markdown.entity_parser import EntityParser
@pytest.mark.asyncio
async def test_unicode_content(tmp_path):
"""Test handling of Unicode content including emoji and non-Latin scripts."""
content = dedent("""
---
type: test
id: test/unicode
created: 2024-12-21T14:00:00Z
modified: 2024-12-21T14:00:00Z
tags: [unicode, 测试]
---
# Unicode Test 🧪
## Observations
- [test] Emoji test 👍 #emoji #test (Testing emoji)
- [中文] Chinese text 测试 #language (Script test)
- [русский] Russian привет #language (More scripts)
- [note] Emoji in text 😀 #meta (Category test)
## Relations
- tested_by [[测试组件]] (Unicode test)
- depends_on [[компонент]] (Another test)
""")
test_file = tmp_path / "unicode.md"
test_file.write_text(content, encoding="utf-8")
parser = EntityParser(tmp_path)
entity = await parser.parse_file(test_file)
assert "测试" in entity.frontmatter.metadata["tags"]
assert "chinese" not in entity.frontmatter.metadata["tags"]
assert "🧪" in entity.content
# Verify Unicode in observations
assert any(o.content == "Emoji test 👍 #emoji #test" for o in entity.observations)
assert any(o.category == "中文" for o in entity.observations)
assert any(o.category == "русский" for o in entity.observations)
# Verify Unicode in relations
assert any(r.target == "测试组件" for r in entity.relations)
assert any(r.target == "компонент" for r in entity.relations)
@pytest.mark.asyncio
async def test_empty_file(tmp_path):
"""Test handling of empty files."""
empty_file = tmp_path / "empty.md"
empty_file.write_text("")
parser = EntityParser(tmp_path)
entity = await parser.parse_file(empty_file)
assert entity.observations == []
assert entity.relations == []
@pytest.mark.asyncio
async def test_missing_sections(tmp_path):
"""Test handling of files with missing sections."""
content = dedent("""
---
type: test
id: test/missing
created: 2024-01-09
modified: 2024-01-09
tags: []
---
Just some content
with [[links]] but no sections
""")
test_file = tmp_path / "missing.md"
test_file.write_text(content)
parser = EntityParser(tmp_path)
entity = await parser.parse_file(test_file)
assert len(entity.relations) == 1
assert entity.relations[0].target == "links"
assert entity.relations[0].type == "links_to"
@pytest.mark.asyncio
async def test_tasks_are_not_observations(tmp_path):
"""Test handling of plain observations without categories."""
content = dedent("""
---
type: test
id: test/missing
created: 2024-01-09
modified: 2024-01-09
tags: []
---
- [ ] one
-[ ] two
- [x] done
- [-] not done
""")
test_file = tmp_path / "missing.md"
test_file.write_text(content)
parser = EntityParser(tmp_path)
entity = await parser.parse_file(test_file)
assert len(entity.observations) == 0
@pytest.mark.asyncio
async def test_nested_content(tmp_path):
"""Test handling of deeply nested content."""
content = dedent("""
---
type: test
id: test/nested
created: 2024-01-09
modified: 2024-01-09
tags: []
---
# Test
## Level 1
- [test] Level 1 #test (First level)
- implements [[One]]
### Level 2
- [test] Level 2 #test (Second level)
- uses [[Two]]
#### Level 3
- [test] Level 3 #test (Third level)
- needs [[Three]]
""")
test_file = tmp_path / "nested.md"
test_file.write_text(content)
parser = EntityParser(tmp_path)
entity = await parser.parse_file(test_file)
# Should find all observations and relations regardless of nesting
assert len(entity.observations) == 3
assert len(entity.relations) == 3
assert {r.target for r in entity.relations} == {"One", "Two", "Three"}
@pytest.mark.asyncio
async def test_malformed_frontmatter(tmp_path):
"""Test handling of malformed frontmatter."""
# Missing fields
content = dedent("""
---
type: test
---
# Test
""")
test_file = tmp_path / "malformed.md"
test_file.write_text(content)
parser = EntityParser(tmp_path)
entity = await parser.parse_file(test_file)
assert entity.frontmatter.permalink is None
@pytest.mark.asyncio
async def test_file_not_found():
"""Test handling of non-existent files."""
parser = EntityParser(Path("/tmp"))
with pytest.raises(FileNotFoundError):
await parser.parse_file(Path("nonexistent.md"))
```
--------------------------------------------------------------------------------
/tests/mcp/test_tool_build_context.py:
--------------------------------------------------------------------------------
```python
"""Tests for discussion context MCP tool."""
import pytest
from datetime import datetime
from mcp.server.fastmcp.exceptions import ToolError
from basic_memory.mcp.tools import build_context
from basic_memory.schemas.memory import (
GraphContext,
)
@pytest.mark.asyncio
async def test_get_basic_discussion_context(client, test_graph, test_project):
"""Test getting basic discussion context."""
context = await build_context.fn(project=test_project.name, url="memory://test/root")
assert isinstance(context, GraphContext)
assert len(context.results) == 1
assert context.results[0].primary_result.permalink == "test/root"
assert len(context.results[0].related_results) > 0
# Verify metadata
assert context.metadata.uri == "test/root"
assert context.metadata.depth == 1 # default depth
assert context.metadata.timeframe is not None
assert isinstance(context.metadata.generated_at, datetime)
assert context.metadata.primary_count == 1
if context.metadata.related_count:
assert context.metadata.related_count > 0
@pytest.mark.asyncio
async def test_get_discussion_context_pattern(client, test_graph, test_project):
"""Test getting context with pattern matching."""
context = await build_context.fn(project=test_project.name, url="memory://test/*", depth=1)
assert isinstance(context, GraphContext)
assert len(context.results) > 1 # Should match multiple test/* paths
assert all("test/" in item.primary_result.permalink for item in context.results) # pyright: ignore [reportOperatorIssue]
assert context.metadata.depth == 1
@pytest.mark.asyncio
async def test_get_discussion_context_timeframe(client, test_graph, test_project):
"""Test timeframe parameter filtering."""
# Get recent context
recent_context = await build_context.fn(
project=test_project.name,
url="memory://test/root",
timeframe="1d", # Last 24 hours
)
# Get older context
older_context = await build_context.fn(
project=test_project.name,
url="memory://test/root",
timeframe="30d", # Last 30 days
)
# Calculate total related items
total_recent_related = (
sum(len(item.related_results) for item in recent_context.results)
if recent_context.results
else 0
)
total_older_related = (
sum(len(item.related_results) for item in older_context.results)
if older_context.results
else 0
)
assert total_older_related >= total_recent_related
@pytest.mark.asyncio
async def test_get_discussion_context_not_found(client, test_project):
"""Test handling of non-existent URIs."""
context = await build_context.fn(project=test_project.name, url="memory://test/does-not-exist")
assert isinstance(context, GraphContext)
assert len(context.results) == 0
assert context.metadata.primary_count == 0
assert context.metadata.related_count == 0
# Test data for different timeframe formats
valid_timeframes = [
"7d", # Standard format
"yesterday", # Natural language
"0d", # Zero duration
]
invalid_timeframes = [
"invalid", # Nonsense string
# NOTE: "tomorrow" now returns 1 day ago due to timezone safety - no longer invalid
]
@pytest.mark.asyncio
async def test_build_context_timeframe_formats(client, test_graph, test_project):
"""Test that build_context accepts various timeframe formats."""
test_url = "memory://specs/test"
# Test each valid timeframe
for timeframe in valid_timeframes:
try:
result = await build_context.fn(
project=test_project.name,
url=test_url,
timeframe=timeframe,
page=1,
page_size=10,
max_related=10,
)
assert result is not None
except Exception as e:
pytest.fail(f"Failed with valid timeframe '{timeframe}': {str(e)}")
# Test invalid timeframes should raise ValidationError
for timeframe in invalid_timeframes:
with pytest.raises(ToolError):
await build_context.fn(project=test_project.name, url=test_url, timeframe=timeframe)
@pytest.mark.asyncio
async def test_build_context_string_depth_parameter(client, test_graph, test_project):
"""Test that build_context handles string depth parameter correctly."""
test_url = "memory://test/root"
# Test valid string depth parameter - should either raise ToolError or convert to int
try:
result = await build_context.fn(url=test_url, depth="2", project=test_project.name)
# If it succeeds, verify the depth was converted to an integer
assert isinstance(result.metadata.depth, int)
assert result.metadata.depth == 2
except ToolError:
# This is also acceptable behavior - type validation should catch it
pass
# Test invalid string depth parameter - should raise ToolError
with pytest.raises(ToolError):
await build_context.fn(test_url, depth="invalid", project=test_project.name)
```
--------------------------------------------------------------------------------
/tests/api/test_continue_conversation_template.py:
--------------------------------------------------------------------------------
```python
"""Tests for the continue_conversation template rendering."""
import datetime
import pytest
from basic_memory.api.template_loader import TemplateLoader
from basic_memory.schemas.memory import EntitySummary
from basic_memory.schemas.search import SearchItemType
@pytest.fixture
def template_loader():
"""Return a TemplateLoader instance for testing."""
return TemplateLoader()
@pytest.fixture
def entity_summary():
"""Create a sample EntitySummary for testing."""
return EntitySummary(
entity_id=1,
title="Test Entity",
permalink="test/entity",
type=SearchItemType.ENTITY,
content="This is a test entity with some content.",
file_path="/path/to/test/entity.md",
created_at=datetime.datetime(2023, 1, 1, 12, 0),
)
@pytest.fixture
def context_with_results(entity_summary):
"""Create a sample context with results for testing."""
from basic_memory.schemas.memory import ObservationSummary, ContextResult
# Create an observation for the entity
observation = ObservationSummary(
observation_id=1,
entity_id=1,
title="Test Observation",
permalink="test/entity/observations/1",
category="test",
content="This is a test observation.",
file_path="/path/to/test/entity.md",
created_at=datetime.datetime(2023, 1, 1, 12, 0),
)
# Create a context result with primary_result, observations, and related_results
context_item = ContextResult(
primary_result=entity_summary,
observations=[observation],
related_results=[entity_summary],
)
return {
"topic": "Test Topic",
"timeframe": "7d",
"has_results": True,
"hierarchical_results": [context_item],
}
@pytest.fixture
def context_without_results():
"""Create a sample context without results for testing."""
return {
"topic": "Empty Topic",
"timeframe": "1d",
"has_results": False,
"hierarchical_results": [],
}
@pytest.mark.asyncio
async def test_continue_conversation_with_results(template_loader, context_with_results):
"""Test rendering the continue_conversation template with results."""
result = await template_loader.render("prompts/continue_conversation.hbs", context_with_results)
# Check that key elements are present
assert "Continuing conversation on: Test Topic" in result
assert "memory://test/entity" in result
assert "Test Entity" in result
assert "This is a test entity with some content." in result
assert "Related Context" in result
assert "read_note" in result
assert "Next Steps" in result
assert "Knowledge Capture Recommendation" in result
@pytest.mark.asyncio
async def test_continue_conversation_without_results(template_loader, context_without_results):
"""Test rendering the continue_conversation template without results."""
result = await template_loader.render(
"prompts/continue_conversation.hbs", context_without_results
)
# Check that key elements are present
assert "Continuing conversation on: Empty Topic" in result
assert "The supplied query did not return any information" in result
assert "Opportunity to Capture New Knowledge!" in result
assert 'title="Empty Topic"' in result
assert "Next Steps" in result
assert "Knowledge Capture Recommendation" in result
@pytest.mark.asyncio
async def test_next_steps_section(template_loader, context_with_results):
"""Test that the next steps section is rendered correctly."""
result = await template_loader.render("prompts/continue_conversation.hbs", context_with_results)
assert "Next Steps" in result
assert 'Explore more with: `search_notes("Test Topic")`' in result
assert (
f'See what\'s changed: `recent_activity(timeframe="{context_with_results["timeframe"]}")`'
in result
)
assert "Record new learnings or decisions from this conversation" in result
@pytest.mark.asyncio
async def test_knowledge_capture_recommendation(template_loader, context_with_results):
"""Test that the knowledge capture recommendation is rendered."""
result = await template_loader.render("prompts/continue_conversation.hbs", context_with_results)
assert "Knowledge Capture Recommendation" in result
assert "actively look for opportunities to:" in result
assert "Record key information, decisions, or insights" in result
assert "Link new knowledge to existing topics" in result
assert "Suggest capturing important context" in result
assert "one of the most valuable aspects of Basic Memory" in result
@pytest.mark.asyncio
async def test_timeframe_default_value(template_loader, context_with_results):
"""Test that the timeframe uses the default value when not provided."""
# Remove the timeframe from the context
context_without_timeframe = context_with_results.copy()
context_without_timeframe["timeframe"] = None
result = await template_loader.render(
"prompts/continue_conversation.hbs", context_without_timeframe
)
# Check that the default value is used
assert 'recent_activity(timeframe="7d")' in result
```
--------------------------------------------------------------------------------
/tests/repository/test_entity_upsert_issue_187.py:
--------------------------------------------------------------------------------
```python
"""Tests for issue #187 - UNIQUE constraint violation on file_path during sync."""
import pytest
from datetime import datetime, timezone
from basic_memory.models.knowledge import Entity, Observation
from basic_memory.repository.entity_repository import EntityRepository
@pytest.mark.asyncio
async def test_upsert_entity_with_observations_conflict(entity_repository: EntityRepository):
"""Test upserting an entity that already exists with observations.
This reproduces issue #187 where sync fails with UNIQUE constraint violations
when trying to update entities that already exist with observations.
"""
# Create initial entity with observations
entity1 = Entity(
project_id=entity_repository.project_id,
title="Original Title",
entity_type="note",
permalink="debugging/backup-system/coderabbit-feedback-resolution",
file_path="debugging/backup-system/CodeRabbit Feedback Resolution - Backup System Issues.md",
content_type="text/markdown",
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
)
# Add observations to the entity
obs1 = Observation(
project_id=entity_repository.project_id,
content="This is a test observation",
category="testing",
tags=["test"],
)
entity1.observations.append(obs1)
result1 = await entity_repository.upsert_entity(entity1)
original_id = result1.id
# Verify entity was created with observations
assert result1.id is not None
assert len(result1.observations) == 1
# Now try to upsert the same file_path with different content/observations
# This simulates a file being modified and re-synced
entity2 = Entity(
project_id=entity_repository.project_id,
title="Updated Title",
entity_type="note",
permalink="debugging/backup-system/coderabbit-feedback-resolution", # Same permalink
file_path="debugging/backup-system/CodeRabbit Feedback Resolution - Backup System Issues.md", # Same file_path
content_type="text/markdown",
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
)
# Add different observations
obs2 = Observation(
project_id=entity_repository.project_id,
content="This is an updated observation",
category="updated",
tags=["updated"],
)
obs3 = Observation(
project_id=entity_repository.project_id,
content="This is a second observation",
category="second",
tags=["second"],
)
entity2.observations.extend([obs2, obs3])
# This should UPDATE the existing entity, not fail with IntegrityError
result2 = await entity_repository.upsert_entity(entity2)
# Should update existing entity (same ID)
assert result2.id == original_id
assert result2.title == "Updated Title"
assert result2.file_path == entity1.file_path
assert result2.permalink == entity1.permalink
# Observations should be updated
assert len(result2.observations) == 2
assert result2.observations[0].content == "This is an updated observation"
assert result2.observations[1].content == "This is a second observation"
@pytest.mark.asyncio
async def test_upsert_entity_repeated_sync_same_file(entity_repository: EntityRepository):
"""Test that syncing the same file multiple times doesn't cause IntegrityError.
This tests the specific scenario from issue #187 where files are being
synced repeatedly and hitting UNIQUE constraint violations.
"""
file_path = "processes/Complete Process for Uploading New Training Videos.md"
permalink = "processes/complete-process-for-uploading-new-training-videos"
# Create initial entity
entity1 = Entity(
project_id=entity_repository.project_id,
title="Complete Process for Uploading New Training Videos",
entity_type="note",
permalink=permalink,
file_path=file_path,
content_type="text/markdown",
checksum="abc123",
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
)
result1 = await entity_repository.upsert_entity(entity1)
first_id = result1.id
# Simulate multiple sync attempts (like the infinite retry loop in the issue)
for i in range(5):
entity_new = Entity(
project_id=entity_repository.project_id,
title="Complete Process for Uploading New Training Videos",
entity_type="note",
permalink=permalink,
file_path=file_path,
content_type="text/markdown",
checksum=f"def{456 + i}", # Different checksum each time
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
)
# Each upsert should succeed and update the existing entity
result = await entity_repository.upsert_entity(entity_new)
# Should always return the same entity (updated)
assert result.id == first_id
assert result.checksum == entity_new.checksum
assert result.file_path == file_path
assert result.permalink == permalink
```
--------------------------------------------------------------------------------
/tests/api/test_search_template.py:
--------------------------------------------------------------------------------
```python
"""Tests for the search template rendering."""
import datetime
import pytest
from basic_memory.api.template_loader import TemplateLoader
from basic_memory.schemas.search import SearchItemType, SearchResult
@pytest.fixture
def template_loader():
"""Return a TemplateLoader instance for testing."""
return TemplateLoader()
@pytest.fixture
def search_result():
"""Create a sample SearchResult for testing."""
return SearchResult(
title="Test Search Result",
type=SearchItemType.ENTITY,
permalink="test/search-result",
score=0.95,
content="This is a test search result with some content.",
file_path="/path/to/test/search-result.md",
metadata={"created_at": datetime.datetime(2023, 2, 1, 12, 0)},
)
@pytest.fixture
def context_with_results(search_result):
"""Create a sample context with search results."""
return {
"query": "test query",
"timeframe": "30d",
"has_results": True,
"result_count": 1,
"results": [search_result],
}
@pytest.fixture
def context_without_results():
"""Create a sample context without search results."""
return {
"query": "empty query",
"timeframe": None,
"has_results": False,
"result_count": 0,
"results": [],
}
@pytest.mark.asyncio
async def test_search_with_results(template_loader, context_with_results):
"""Test rendering the search template with results."""
result = await template_loader.render("prompts/search.hbs", context_with_results)
# Check that key elements are present
assert 'Search Results for: "test query" (after 30d)' in result
assert "1.0. Test Search Result" in result
assert "Type**: entity" in result
assert "Relevance Score**: 0.95" in result
assert "This is a test search result with some content." in result
assert 'read_note("test/search-result")' in result
assert "Next Steps" in result
assert "Synthesize and Capture Knowledge" in result
@pytest.mark.asyncio
async def test_search_without_results(template_loader, context_without_results):
"""Test rendering the search template without results."""
result = await template_loader.render("prompts/search.hbs", context_without_results)
# Check that key elements are present
assert 'Search Results for: "empty query"' in result
assert "I couldn't find any results for this query." in result
assert "Opportunity to Capture Knowledge!" in result
assert "write_note(" in result
assert 'title="Empty query"' in result
assert "Other Suggestions" in result
@pytest.mark.asyncio
async def test_multiple_search_results(template_loader):
"""Test rendering the search template with multiple results."""
# Create multiple search results
results = []
for i in range(1, 6): # Create 5 results
results.append(
SearchResult(
title=f"Search Result {i}",
type=SearchItemType.ENTITY,
permalink=f"test/result-{i}",
score=1.0 - (i * 0.1), # Decreasing scores
content=f"Content for result {i}",
file_path=f"/path/to/result-{i}.md",
metadata={},
)
)
context = {
"query": "multiple results",
"timeframe": None,
"has_results": True,
"result_count": len(results),
"results": results,
}
result = await template_loader.render("prompts/search.hbs", context)
# Check that all results are rendered
for i in range(1, 6):
assert f"{i}.0. Search Result {i}" in result
assert f"Content for result {i}" in result
assert f'read_note("test/result-{i}")' in result
@pytest.mark.asyncio
async def test_capitalization_in_write_note_template(template_loader, context_with_results):
"""Test that the query is capitalized in the write_note template."""
result = await template_loader.render("prompts/search.hbs", context_with_results)
# The query should be capitalized in the suggested write_note call
assert "Synthesis of Test query Information" in result
@pytest.mark.asyncio
async def test_timeframe_display(template_loader):
"""Test that the timeframe is displayed correctly when present, and not when absent."""
# Context with timeframe
context_with_timeframe = {
"query": "with timeframe",
"timeframe": "7d",
"has_results": True,
"result_count": 0,
"results": [],
}
result_with_timeframe = await template_loader.render(
"prompts/search.hbs", context_with_timeframe
)
assert 'Search Results for: "with timeframe" (after 7d)' in result_with_timeframe
# Context without timeframe
context_without_timeframe = {
"query": "without timeframe",
"timeframe": None,
"has_results": True,
"result_count": 0,
"results": [],
}
result_without_timeframe = await template_loader.render(
"prompts/search.hbs", context_without_timeframe
)
assert 'Search Results for: "without timeframe"' in result_without_timeframe
assert 'Search Results for: "without timeframe" (after' not in result_without_timeframe
```
--------------------------------------------------------------------------------
/src/basic_memory/mcp/async_client.py:
--------------------------------------------------------------------------------
```python
from contextlib import asynccontextmanager, AbstractAsyncContextManager
from typing import AsyncIterator, Callable, Optional
from httpx import ASGITransport, AsyncClient, Timeout
from loguru import logger
from basic_memory.api.app import app as fastapi_app
from basic_memory.config import ConfigManager
# Optional factory override for dependency injection
_client_factory: Optional[Callable[[], AbstractAsyncContextManager[AsyncClient]]] = None
def set_client_factory(factory: Callable[[], AbstractAsyncContextManager[AsyncClient]]) -> None:
"""Override the default client factory (for cloud app, testing, etc).
Args:
factory: An async context manager that yields an AsyncClient
Example:
@asynccontextmanager
async def custom_client_factory():
async with AsyncClient(...) as client:
yield client
set_client_factory(custom_client_factory)
"""
global _client_factory
_client_factory = factory
@asynccontextmanager
async def get_client() -> AsyncIterator[AsyncClient]:
"""Get an AsyncClient as a context manager.
This function provides proper resource management for HTTP clients,
ensuring connections are closed after use. It supports three modes:
1. **Factory injection** (cloud app, tests):
If a custom factory is set via set_client_factory(), use that.
2. **CLI cloud mode**:
When cloud_mode_enabled is True, create HTTP client with auth
token from CLIAuth for requests to cloud proxy endpoint.
3. **Local mode** (default):
Use ASGI transport for in-process requests to local FastAPI app.
Usage:
async with get_client() as client:
response = await client.get("/path")
Yields:
AsyncClient: Configured HTTP client for the current mode
Raises:
RuntimeError: If cloud mode is enabled but user is not authenticated
"""
if _client_factory:
# Use injected factory (cloud app, tests)
async with _client_factory() as client:
yield client
else:
# Default: create based on config
config = ConfigManager().config
timeout = Timeout(
connect=10.0, # 10 seconds for connection
read=30.0, # 30 seconds for reading response
write=30.0, # 30 seconds for writing request
pool=30.0, # 30 seconds for connection pool
)
if config.cloud_mode_enabled:
# CLI cloud mode: inject auth when creating client
from basic_memory.cli.auth import CLIAuth
auth = CLIAuth(client_id=config.cloud_client_id, authkit_domain=config.cloud_domain)
token = await auth.get_valid_token()
if not token:
raise RuntimeError(
"Cloud mode enabled but not authenticated. "
"Run 'basic-memory cloud login' first."
)
# Auth header set ONCE at client creation
proxy_base_url = f"{config.cloud_host}/proxy"
logger.info(f"Creating HTTP client for cloud proxy at: {proxy_base_url}")
async with AsyncClient(
base_url=proxy_base_url,
headers={"Authorization": f"Bearer {token}"},
timeout=timeout,
) as client:
yield client
else:
# Local mode: ASGI transport for in-process calls
# Note: ASGI transport does NOT trigger FastAPI lifespan, so no special handling needed
logger.info("Creating ASGI client for local Basic Memory API")
async with AsyncClient(
transport=ASGITransport(app=fastapi_app), base_url="http://test", timeout=timeout
) as client:
yield client
def create_client() -> AsyncClient:
"""Create an HTTP client based on configuration.
DEPRECATED: Use get_client() context manager instead for proper resource management.
This function is kept for backward compatibility but will be removed in a future version.
The returned client should be closed manually by calling await client.aclose().
Returns:
AsyncClient configured for either local ASGI or remote proxy
"""
config_manager = ConfigManager()
config = config_manager.config
# Configure timeout for longer operations like write_note
# Default httpx timeout is 5 seconds which is too short for file operations
timeout = Timeout(
connect=10.0, # 10 seconds for connection
read=30.0, # 30 seconds for reading response
write=30.0, # 30 seconds for writing request
pool=30.0, # 30 seconds for connection pool
)
if config.cloud_mode_enabled:
# Use HTTP transport to proxy endpoint
proxy_base_url = f"{config.cloud_host}/proxy"
logger.info(f"Creating HTTP client for proxy at: {proxy_base_url}")
return AsyncClient(base_url=proxy_base_url, timeout=timeout)
else:
# Default: use ASGI transport for local API (development mode)
logger.info("Creating ASGI client for local Basic Memory API")
return AsyncClient(
transport=ASGITransport(app=fastapi_app), base_url="http://test", timeout=timeout
)
```
--------------------------------------------------------------------------------
/src/basic_memory/alembic/versions/314f1ea54dc4_add_postgres_full_text_search_support_.py:
--------------------------------------------------------------------------------
```python
"""Add Postgres full-text search support with tsvector and GIN indexes
Revision ID: 314f1ea54dc4
Revises: e7e1f4367280
Create Date: 2025-11-15 18:05:01.025405
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = "314f1ea54dc4"
down_revision: Union[str, None] = "e7e1f4367280"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Add PostgreSQL full-text search support.
This migration:
1. Creates search_index table for Postgres (SQLite uses FTS5 virtual table)
2. Adds generated tsvector column for full-text search
3. Creates GIN index on the tsvector column for fast text queries
4. Creates GIN index on metadata JSONB column for fast containment queries
Note: These changes only apply to Postgres. SQLite continues to use FTS5 virtual tables.
"""
# Check if we're using Postgres
connection = op.get_bind()
if connection.dialect.name == "postgresql":
# Create search_index table for Postgres
# For SQLite, this is a FTS5 virtual table created elsewhere
from sqlalchemy.dialects.postgresql import JSONB
op.create_table(
"search_index",
sa.Column("id", sa.Integer(), nullable=False), # Entity IDs are integers
sa.Column("project_id", sa.Integer(), nullable=False), # Multi-tenant isolation
sa.Column("title", sa.Text(), nullable=True),
sa.Column("content_stems", sa.Text(), nullable=True),
sa.Column("content_snippet", sa.Text(), nullable=True),
sa.Column("permalink", sa.String(), nullable=True), # Nullable for non-markdown files
sa.Column("file_path", sa.String(), nullable=True),
sa.Column("type", sa.String(), nullable=True),
sa.Column("from_id", sa.Integer(), nullable=True), # Relation IDs are integers
sa.Column("to_id", sa.Integer(), nullable=True), # Relation IDs are integers
sa.Column("relation_type", sa.String(), nullable=True),
sa.Column("entity_id", sa.Integer(), nullable=True), # Entity IDs are integers
sa.Column("category", sa.String(), nullable=True),
sa.Column("metadata", JSONB(), nullable=True), # Use JSONB for Postgres
sa.Column("created_at", sa.DateTime(timezone=True), nullable=True),
sa.Column("updated_at", sa.DateTime(timezone=True), nullable=True),
sa.PrimaryKeyConstraint(
"id", "type", "project_id"
), # Composite key: id can repeat across types
sa.ForeignKeyConstraint(
["project_id"],
["project.id"],
name="fk_search_index_project_id",
ondelete="CASCADE",
),
if_not_exists=True,
)
# Create index on project_id for efficient multi-tenant queries
op.create_index(
"ix_search_index_project_id",
"search_index",
["project_id"],
unique=False,
)
# Create unique partial index on permalink for markdown files
# Non-markdown files don't have permalinks, so we use a partial index
op.execute("""
CREATE UNIQUE INDEX uix_search_index_permalink_project
ON search_index (permalink, project_id)
WHERE permalink IS NOT NULL
""")
# Add tsvector column as a GENERATED ALWAYS column
# This automatically updates when title or content_stems change
op.execute("""
ALTER TABLE search_index
ADD COLUMN textsearchable_index_col tsvector
GENERATED ALWAYS AS (
to_tsvector('english',
coalesce(title, '') || ' ' ||
coalesce(content_stems, '')
)
) STORED
""")
# Create GIN index on tsvector column for fast full-text search
op.create_index(
"idx_search_index_fts",
"search_index",
["textsearchable_index_col"],
unique=False,
postgresql_using="gin",
)
# Create GIN index on metadata JSONB for fast containment queries
# Using jsonb_path_ops for smaller index size and better performance
op.execute("""
CREATE INDEX idx_search_index_metadata_gin
ON search_index
USING GIN (metadata jsonb_path_ops)
""")
def downgrade() -> None:
"""Remove PostgreSQL full-text search support."""
connection = op.get_bind()
if connection.dialect.name == "postgresql":
# Drop indexes first
op.execute("DROP INDEX IF EXISTS idx_search_index_metadata_gin")
op.drop_index("idx_search_index_fts", table_name="search_index")
op.execute("DROP INDEX IF EXISTS uix_search_index_permalink_project")
op.drop_index("ix_search_index_project_id", table_name="search_index")
# Drop the generated column
op.execute("ALTER TABLE search_index DROP COLUMN IF EXISTS textsearchable_index_col")
# Drop the search_index table
op.drop_table("search_index")
```
--------------------------------------------------------------------------------
/tests/cli/test_project_add_with_local_path.py:
--------------------------------------------------------------------------------
```python
"""Tests for bm project add with --local-path flag."""
import json
from pathlib import Path
from contextlib import asynccontextmanager
import pytest
from typer.testing import CliRunner
from basic_memory.cli.app import app
@pytest.fixture
def runner():
return CliRunner()
@pytest.fixture
def mock_config(tmp_path, monkeypatch):
"""Create a mock config in cloud mode using environment variables."""
# Invalidate config cache to ensure clean state for each test
from basic_memory import config as config_module
config_module._CONFIG_CACHE = None
config_dir = tmp_path / ".basic-memory"
config_dir.mkdir(parents=True, exist_ok=True)
config_file = config_dir / "config.json"
config_data = {
"env": "dev",
"projects": {},
"default_project": "main",
"cloud_mode": True,
"cloud_projects": {},
}
config_file.write_text(json.dumps(config_data, indent=2))
# Set HOME to tmp_path so ConfigManager uses our test config
monkeypatch.setenv("HOME", str(tmp_path))
yield config_file
@pytest.fixture
def mock_api_client(monkeypatch):
"""Stub the API client for project add without stdlib mocks."""
import basic_memory.cli.commands.project as project_cmd
@asynccontextmanager
async def fake_get_client():
yield object()
class _Resp:
def json(self):
return {
"message": "Project 'test-project' added successfully",
"status": "success",
"default": False,
"old_project": None,
"new_project": {
"id": 1,
"external_id": "12345678-1234-1234-1234-123456789012",
"name": "test-project",
"path": "/test-project",
"is_default": False,
},
}
calls: list[tuple[str, dict]] = []
async def fake_call_post(client, path: str, json: dict, **kwargs):
calls.append((path, json))
return _Resp()
monkeypatch.setattr(project_cmd, "get_client", fake_get_client)
monkeypatch.setattr(project_cmd, "call_post", fake_call_post)
return calls
def test_project_add_with_local_path_saves_to_config(
runner, mock_config, mock_api_client, tmp_path
):
"""Test that bm project add --local-path saves sync path to config."""
local_sync_dir = tmp_path / "sync" / "test-project"
result = runner.invoke(
app,
[
"project",
"add",
"test-project",
"--local-path",
str(local_sync_dir),
],
)
assert result.exit_code == 0, f"Exit code: {result.exit_code}, Stdout: {result.stdout}"
assert "Project 'test-project' added successfully" in result.stdout
assert "Local sync path configured" in result.stdout
# Check path is present (may be line-wrapped in output)
assert "test-project" in result.stdout
assert "sync" in result.stdout
# Verify config was updated
config_data = json.loads(mock_config.read_text())
assert "test-project" in config_data["cloud_projects"]
# Use as_posix() for cross-platform compatibility (Windows uses backslashes)
assert config_data["cloud_projects"]["test-project"]["local_path"] == local_sync_dir.as_posix()
assert config_data["cloud_projects"]["test-project"]["last_sync"] is None
assert config_data["cloud_projects"]["test-project"]["bisync_initialized"] is False
# Verify local directory was created
assert local_sync_dir.exists()
assert local_sync_dir.is_dir()
def test_project_add_without_local_path_no_config_entry(runner, mock_config, mock_api_client):
"""Test that bm project add without --local-path doesn't save to config."""
result = runner.invoke(
app,
["project", "add", "test-project"],
)
assert result.exit_code == 0
assert "Project 'test-project' added successfully" in result.stdout
assert "Local sync path configured" not in result.stdout
# Verify config was NOT updated with cloud_projects entry
config_data = json.loads(mock_config.read_text())
assert "test-project" not in config_data.get("cloud_projects", {})
def test_project_add_local_path_expands_tilde(runner, mock_config, mock_api_client):
"""Test that --local-path ~/path expands to absolute path."""
result = runner.invoke(
app,
["project", "add", "test-project", "--local-path", "~/test-sync"],
)
assert result.exit_code == 0
# Verify config has expanded path
config_data = json.loads(mock_config.read_text())
local_path = config_data["cloud_projects"]["test-project"]["local_path"]
# Path should be absolute (starts with / on Unix or drive letter on Windows)
assert Path(local_path).is_absolute()
assert "~" not in local_path
assert local_path.endswith("/test-sync")
def test_project_add_local_path_creates_nested_directories(
runner, mock_config, mock_api_client, tmp_path
):
"""Test that --local-path creates nested directories."""
nested_path = tmp_path / "a" / "b" / "c" / "test-project"
result = runner.invoke(
app,
["project", "add", "test-project", "--local-path", str(nested_path)],
)
assert result.exit_code == 0
assert nested_path.exists()
assert nested_path.is_dir()
```
--------------------------------------------------------------------------------
/src/basic_memory/mcp/clients/knowledge.py:
--------------------------------------------------------------------------------
```python
"""Typed client for knowledge/entity API operations.
Encapsulates all /v2/projects/{project_id}/knowledge/* endpoints.
"""
from typing import Any
from httpx import AsyncClient
from basic_memory.mcp.tools.utils import call_get, call_post, call_put, call_patch, call_delete
from basic_memory.schemas.response import EntityResponse, DeleteEntitiesResponse
class KnowledgeClient:
"""Typed client for knowledge graph entity operations.
Centralizes:
- API path construction for /v2/projects/{project_id}/knowledge/*
- Response validation via Pydantic models
- Consistent error handling through call_* utilities
Usage:
async with get_client() as http_client:
client = KnowledgeClient(http_client, project_id)
entity = await client.create_entity(entity_data)
"""
def __init__(self, http_client: AsyncClient, project_id: str):
"""Initialize the knowledge client.
Args:
http_client: HTTPX AsyncClient for making requests
project_id: Project external_id (UUID) for API calls
"""
self.http_client = http_client
self.project_id = project_id
self._base_path = f"/v2/projects/{project_id}/knowledge"
# --- Entity CRUD Operations ---
async def create_entity(self, entity_data: dict[str, Any]) -> EntityResponse:
"""Create a new entity.
Args:
entity_data: Entity data including title, content, folder, etc.
Returns:
EntityResponse with created entity details
Raises:
ToolError: If the request fails
"""
response = await call_post(
self.http_client,
f"{self._base_path}/entities",
json=entity_data,
)
return EntityResponse.model_validate(response.json())
async def update_entity(self, entity_id: str, entity_data: dict[str, Any]) -> EntityResponse:
"""Update an existing entity (full replacement).
Args:
entity_id: Entity external_id (UUID)
entity_data: Complete entity data for replacement
Returns:
EntityResponse with updated entity details
Raises:
ToolError: If the request fails
"""
response = await call_put(
self.http_client,
f"{self._base_path}/entities/{entity_id}",
json=entity_data,
)
return EntityResponse.model_validate(response.json())
async def get_entity(self, entity_id: str) -> EntityResponse:
"""Get an entity by ID.
Args:
entity_id: Entity external_id (UUID)
Returns:
EntityResponse with entity details
Raises:
ToolError: If the entity is not found or request fails
"""
response = await call_get(
self.http_client,
f"{self._base_path}/entities/{entity_id}",
)
return EntityResponse.model_validate(response.json())
async def patch_entity(self, entity_id: str, patch_data: dict[str, Any]) -> EntityResponse:
"""Partially update an entity.
Args:
entity_id: Entity external_id (UUID)
patch_data: Partial entity data to update
Returns:
EntityResponse with updated entity details
Raises:
ToolError: If the request fails
"""
response = await call_patch(
self.http_client,
f"{self._base_path}/entities/{entity_id}",
json=patch_data,
)
return EntityResponse.model_validate(response.json())
async def delete_entity(self, entity_id: str) -> DeleteEntitiesResponse:
"""Delete an entity.
Args:
entity_id: Entity external_id (UUID)
Returns:
DeleteEntitiesResponse confirming deletion
Raises:
ToolError: If the entity is not found or request fails
"""
response = await call_delete(
self.http_client,
f"{self._base_path}/entities/{entity_id}",
)
return DeleteEntitiesResponse.model_validate(response.json())
async def move_entity(self, entity_id: str, destination_path: str) -> EntityResponse:
"""Move an entity to a new location.
Args:
entity_id: Entity external_id (UUID)
destination_path: New file path for the entity
Returns:
EntityResponse with updated entity details
Raises:
ToolError: If the request fails
"""
response = await call_put(
self.http_client,
f"{self._base_path}/entities/{entity_id}/move",
json={"destination_path": destination_path},
)
return EntityResponse.model_validate(response.json())
# --- Resolution ---
async def resolve_entity(self, identifier: str) -> str:
"""Resolve a string identifier to an entity external_id.
Args:
identifier: The identifier to resolve (permalink, title, or path)
Returns:
The resolved entity external_id (UUID)
Raises:
ToolError: If the identifier cannot be resolved
"""
response = await call_post(
self.http_client,
f"{self._base_path}/resolve",
json={"identifier": identifier},
)
data = response.json()
return data["external_id"]
```
--------------------------------------------------------------------------------
/src/basic_memory/importers/memory_json_importer.py:
--------------------------------------------------------------------------------
```python
"""Memory JSON import service for Basic Memory."""
import logging
from typing import Any, Dict, List, Optional
from basic_memory.markdown.schemas import EntityFrontmatter, EntityMarkdown, Observation, Relation
from basic_memory.importers.base import Importer
from basic_memory.schemas.importer import EntityImportResult
logger = logging.getLogger(__name__)
class MemoryJsonImporter(Importer[EntityImportResult]):
"""Service for importing memory.json format data."""
def handle_error( # pragma: no cover
self, message: str, error: Optional[Exception] = None
) -> EntityImportResult:
"""Return a failed EntityImportResult with an error message."""
error_msg = f"{message}: {error}" if error else message
return EntityImportResult(
import_count={},
success=False,
error_message=error_msg,
entities=0,
relations=0,
skipped_entities=0,
)
async def import_data(
self, source_data, destination_folder: str = "", **kwargs: Any
) -> EntityImportResult:
"""Import entities and relations from a memory.json file.
Args:
source_data: Path to the memory.json file.
destination_folder: Optional destination folder within the project.
**kwargs: Additional keyword arguments.
Returns:
EntityImportResult containing statistics and status of the import.
"""
try:
# First pass - collect all relations by source entity
entity_relations: Dict[str, List[Relation]] = {}
entities: Dict[str, Dict[str, Any]] = {}
skipped_entities: int = 0
# Ensure the destination folder exists if provided
if destination_folder: # pragma: no cover
await self.ensure_folder_exists(destination_folder)
# First pass - collect entities and relations
for line in source_data:
data = line
if data["type"] == "entity":
# Handle different possible name keys
entity_name = data.get("name") or data.get("entityName") or data.get("id")
if not entity_name:
logger.warning(f"Entity missing name field: {data}") # pragma: no cover
skipped_entities += 1 # pragma: no cover
continue # pragma: no cover
entities[entity_name] = data
elif data["type"] == "relation":
# Store relation with its source entity
source = data.get("from") or data.get("from_id")
if source not in entity_relations:
entity_relations[source] = []
entity_relations[source].append(
Relation(
type=data.get("relationType") or data.get("relation_type"),
target=data.get("to") or data.get("to_id"),
)
)
# Second pass - create and write entities
entities_created = 0
for name, entity_data in entities.items():
# Get entity type with fallback
entity_type = entity_data.get("entityType") or entity_data.get("type") or "entity"
# Build permalink with optional destination folder prefix
permalink = (
f"{destination_folder}/{entity_type}/{name}"
if destination_folder
else f"{entity_type}/{name}"
)
# Ensure entity type directory exists using FileService with relative path
entity_type_dir = (
f"{destination_folder}/{entity_type}" if destination_folder else entity_type
)
await self.file_service.ensure_directory(entity_type_dir)
# Get observations with fallback to empty list
observations = entity_data.get("observations", [])
entity = EntityMarkdown(
frontmatter=EntityFrontmatter(
metadata={
"type": entity_type,
"title": name,
"permalink": permalink,
}
),
content=f"# {name}\n",
observations=[Observation(content=obs) for obs in observations],
relations=entity_relations.get(name, []),
)
# Write file using relative path - FileService handles base_path
file_path = f"{entity.frontmatter.metadata['permalink']}.md"
await self.write_entity(entity, file_path)
entities_created += 1
relations_count = sum(len(rels) for rels in entity_relations.values())
return EntityImportResult(
import_count={"entities": entities_created, "relations": relations_count},
success=True,
entities=entities_created,
relations=relations_count,
skipped_entities=skipped_entities,
)
except Exception as e: # pragma: no cover
logger.exception("Failed to import memory.json")
return self.handle_error("Failed to import memory.json", e)
```
--------------------------------------------------------------------------------
/tests/api/test_memory_router.py:
--------------------------------------------------------------------------------
```python
"""Tests for memory router endpoints."""
from datetime import datetime
import pytest
from basic_memory.schemas.memory import GraphContext
@pytest.mark.asyncio
async def test_get_memory_context(client, test_graph, project_url):
"""Test getting context from memory URL."""
response = await client.get(f"{project_url}/memory/test/root")
assert response.status_code == 200
context = GraphContext(**response.json())
assert len(context.results) == 1
assert context.results[0].primary_result.permalink == "test/root"
assert len(context.results[0].related_results) > 0
# Verify metadata
assert context.metadata.uri == "test/root"
assert context.metadata.depth == 1 # default depth
assert isinstance(context.metadata.generated_at, datetime)
assert context.metadata.primary_count + context.metadata.related_count > 0
assert context.metadata.total_results is not None # Backwards compatibility field
@pytest.mark.asyncio
async def test_get_memory_context_pagination(client, test_graph, project_url):
"""Test getting context from memory URL."""
response = await client.get(f"{project_url}/memory/test/root?page=1&page_size=1")
assert response.status_code == 200
context = GraphContext(**response.json())
assert len(context.results) == 1
assert context.results[0].primary_result.permalink == "test/root"
assert len(context.results[0].related_results) > 0
# Verify metadata
assert context.metadata.uri == "test/root"
assert context.metadata.depth == 1 # default depth
assert isinstance(context.metadata.generated_at, datetime)
assert context.metadata.primary_count > 0
@pytest.mark.asyncio
async def test_get_memory_context_pattern(client, test_graph, project_url):
"""Test getting context with pattern matching."""
response = await client.get(f"{project_url}/memory/test/*")
assert response.status_code == 200
context = GraphContext(**response.json())
assert len(context.results) > 1 # Should match multiple test/* paths
assert all("test/" in item.primary_result.permalink for item in context.results)
@pytest.mark.asyncio
async def test_get_memory_context_depth(client, test_graph, project_url):
"""Test depth parameter affects relation traversal."""
# With depth=1, should only get immediate connections
response = await client.get(f"{project_url}/memory/test/root?depth=1&max_results=20")
assert response.status_code == 200
context1 = GraphContext(**response.json())
# With depth=2, should get deeper connections
response = await client.get(f"{project_url}/memory/test/root?depth=3&max_results=20")
assert response.status_code == 200
context2 = GraphContext(**response.json())
# Calculate total related items in all result items
total_related1 = sum(len(item.related_results) for item in context1.results)
total_related2 = sum(len(item.related_results) for item in context2.results)
assert total_related2 > total_related1
@pytest.mark.asyncio
async def test_get_memory_context_timeframe(client, test_graph, project_url):
"""Test timeframe parameter filters by date."""
# Recent timeframe
response = await client.get(f"{project_url}/memory/test/root?timeframe=1d")
assert response.status_code == 200
recent = GraphContext(**response.json())
# Longer timeframe
response = await client.get(f"{project_url}/memory/test/root?timeframe=30d")
assert response.status_code == 200
older = GraphContext(**response.json())
# Calculate total related items
total_recent_related = (
sum(len(item.related_results) for item in recent.results) if recent.results else 0
)
total_older_related = (
sum(len(item.related_results) for item in older.results) if older.results else 0
)
assert total_older_related >= total_recent_related
@pytest.mark.asyncio
async def test_not_found(client, project_url):
"""Test handling of non-existent paths."""
response = await client.get(f"{project_url}/memory/test/does-not-exist")
assert response.status_code == 200
context = GraphContext(**response.json())
assert len(context.results) == 0
@pytest.mark.asyncio
async def test_recent_activity(client, test_graph, project_url):
"""Test handling of recent activity."""
response = await client.get(f"{project_url}/memory/recent")
assert response.status_code == 200
context = GraphContext(**response.json())
assert len(context.results) > 0
assert context.metadata.primary_count > 0
@pytest.mark.asyncio
async def test_recent_activity_pagination(client, test_graph, project_url):
"""Test pagination for recent activity."""
response = await client.get(f"{project_url}/memory/recent?page=1&page_size=1")
assert response.status_code == 200
context = GraphContext(**response.json())
assert len(context.results) == 1
assert context.page == 1
assert context.page_size == 1
@pytest.mark.asyncio
async def test_recent_activity_by_type(client, test_graph, project_url):
"""Test filtering recent activity by type."""
response = await client.get(f"{project_url}/memory/recent?type=relation&type=observation")
assert response.status_code == 200
context = GraphContext(**response.json())
assert len(context.results) > 0
# Check for relation and observation types in primary results
primary_types = [item.primary_result.type for item in context.results]
assert "relation" in primary_types or "observation" in primary_types
```
--------------------------------------------------------------------------------
/tests/markdown/test_markdown_processor.py:
--------------------------------------------------------------------------------
```python
"""Tests for MarkdownProcessor.
Tests focus on the Read -> Modify -> Write pattern and content preservation.
"""
from datetime import datetime
from pathlib import Path
import pytest
from basic_memory.markdown.markdown_processor import DirtyFileError, MarkdownProcessor
from basic_memory.markdown.schemas import (
EntityFrontmatter,
EntityMarkdown,
Observation,
Relation,
)
@pytest.mark.asyncio
async def test_write_new_minimal_file(markdown_processor: MarkdownProcessor, tmp_path: Path):
"""Test creating new file with just title."""
path = tmp_path / "test.md"
# Create minimal markdown schema
metadata = {}
metadata["title"] = "Test Note"
metadata["type"] = "note"
metadata["permalink"] = "test"
metadata["created"] = datetime(2024, 1, 1)
metadata["modified"] = datetime(2024, 1, 1)
metadata["tags"] = ["test"]
markdown = EntityMarkdown(
frontmatter=EntityFrontmatter(
metadata=metadata,
),
content="",
)
# Write file
await markdown_processor.write_file(path, markdown)
# Read back and verify
content = path.read_text(encoding="utf-8")
assert "---" in content # Has frontmatter
assert "type: note" in content
assert "permalink: test" in content
assert "# Test Note" in content # Added title
assert "tags:" in content
assert "- test" in content
# Should not have empty sections
assert "## Observations" not in content
assert "## Relations" not in content
@pytest.mark.asyncio
async def test_write_new_file_with_content(markdown_processor: MarkdownProcessor, tmp_path: Path):
"""Test creating new file with content and sections."""
path = tmp_path / "test.md"
# Create markdown with content and sections
markdown = EntityMarkdown(
frontmatter=EntityFrontmatter(
type="note",
permalink="test",
title="Test Note",
created=datetime(2024, 1, 1),
modified=datetime(2024, 1, 1),
),
content="# Custom Title\n\nMy content here.\nMultiple lines.",
observations=[
Observation(
content="Test observation #test",
category="tech",
tags=["test"],
context="test context",
),
],
relations=[
Relation(
type="relates_to",
target="other-note",
context="test relation",
),
],
)
# Write file
await markdown_processor.write_file(path, markdown)
# Read back and verify
content = path.read_text(encoding="utf-8")
# Check content preserved exactly
assert "# Custom Title" in content
assert "My content here." in content
assert "Multiple lines." in content
# Check sections formatted correctly
assert "- [tech] Test observation #test (test context)" in content
assert "- relates_to [[other-note]] (test relation)" in content
@pytest.mark.asyncio
async def test_update_preserves_content(markdown_processor: MarkdownProcessor, tmp_path: Path):
"""Test that updating file preserves existing content."""
path = tmp_path / "test.md"
# Create initial file
initial = EntityMarkdown(
frontmatter=EntityFrontmatter(
type="note",
permalink="test",
title="Test Note",
created=datetime(2024, 1, 1),
modified=datetime(2024, 1, 1),
),
content="# My Note\n\nOriginal content here.",
observations=[
Observation(content="First observation", category="note"),
],
)
checksum = await markdown_processor.write_file(path, initial)
# Update with new observation
updated = EntityMarkdown(
frontmatter=initial.frontmatter,
content=initial.content, # Preserve original content
observations=[
initial.observations[0], # Keep original observation
Observation(content="Second observation", category="tech"), # Add new one
],
)
# Update file
await markdown_processor.write_file(path, updated, expected_checksum=checksum)
# Read back and verify
result = await markdown_processor.read_file(path)
# Original content preserved
assert "Original content here." in result.content
# Both observations present
assert len(result.observations) == 2
assert any(o.content == "First observation" for o in result.observations)
assert any(o.content == "Second observation" for o in result.observations)
@pytest.mark.asyncio
async def test_dirty_file_detection(markdown_processor: MarkdownProcessor, tmp_path: Path):
"""Test detection of file modifications."""
path = tmp_path / "test.md"
# Create initial file
initial = EntityMarkdown(
frontmatter=EntityFrontmatter(
type="note",
permalink="test",
title="Test Note",
created=datetime(2024, 1, 1),
modified=datetime(2024, 1, 1),
),
content="Initial content",
)
checksum = await markdown_processor.write_file(path, initial)
# Modify file directly
path.write_text(path.read_text(encoding="utf-8") + "\nModified!")
# Try to update with old checksum
update = EntityMarkdown(
frontmatter=initial.frontmatter,
content="New content",
)
# Should raise DirtyFileError
with pytest.raises(DirtyFileError):
await markdown_processor.write_file(path, update, expected_checksum=checksum)
# Should succeed without checksum
new_checksum = await markdown_processor.write_file(path, update)
assert new_checksum != checksum
```
--------------------------------------------------------------------------------
/src/basic_memory/deps/repositories.py:
--------------------------------------------------------------------------------
```python
"""Repository dependency injection for basic-memory.
This module provides repository dependencies:
- EntityRepository
- ObservationRepository
- RelationRepository
- SearchRepository
Each repository is scoped to a project ID from the request.
"""
from typing import Annotated
from fastapi import Depends
from basic_memory.deps.db import SessionMakerDep
from basic_memory.deps.projects import (
ProjectIdDep,
ProjectIdPathDep,
ProjectExternalIdPathDep,
)
from basic_memory.repository.entity_repository import EntityRepository
from basic_memory.repository.observation_repository import ObservationRepository
from basic_memory.repository.relation_repository import RelationRepository
from basic_memory.repository.search_repository import SearchRepository, create_search_repository
# --- Entity Repository ---
async def get_entity_repository(
session_maker: SessionMakerDep,
project_id: ProjectIdDep,
) -> EntityRepository:
"""Create an EntityRepository instance for the current project."""
return EntityRepository(session_maker, project_id=project_id)
EntityRepositoryDep = Annotated[EntityRepository, Depends(get_entity_repository)]
async def get_entity_repository_v2( # pragma: no cover
session_maker: SessionMakerDep,
project_id: ProjectIdPathDep,
) -> EntityRepository:
"""Create an EntityRepository instance for v2 API (uses integer project_id from path)."""
return EntityRepository(session_maker, project_id=project_id)
EntityRepositoryV2Dep = Annotated[EntityRepository, Depends(get_entity_repository_v2)]
async def get_entity_repository_v2_external(
session_maker: SessionMakerDep,
project_id: ProjectExternalIdPathDep,
) -> EntityRepository:
"""Create an EntityRepository instance for v2 API (uses external_id from path)."""
return EntityRepository(session_maker, project_id=project_id)
EntityRepositoryV2ExternalDep = Annotated[
EntityRepository, Depends(get_entity_repository_v2_external)
]
# --- Observation Repository ---
async def get_observation_repository(
session_maker: SessionMakerDep,
project_id: ProjectIdDep,
) -> ObservationRepository:
"""Create an ObservationRepository instance for the current project."""
return ObservationRepository(session_maker, project_id=project_id)
ObservationRepositoryDep = Annotated[ObservationRepository, Depends(get_observation_repository)]
async def get_observation_repository_v2( # pragma: no cover
session_maker: SessionMakerDep,
project_id: ProjectIdPathDep,
) -> ObservationRepository:
"""Create an ObservationRepository instance for v2 API."""
return ObservationRepository(session_maker, project_id=project_id)
ObservationRepositoryV2Dep = Annotated[
ObservationRepository, Depends(get_observation_repository_v2)
]
async def get_observation_repository_v2_external(
session_maker: SessionMakerDep,
project_id: ProjectExternalIdPathDep,
) -> ObservationRepository:
"""Create an ObservationRepository instance for v2 API (uses external_id)."""
return ObservationRepository(session_maker, project_id=project_id)
ObservationRepositoryV2ExternalDep = Annotated[
ObservationRepository, Depends(get_observation_repository_v2_external)
]
# --- Relation Repository ---
async def get_relation_repository(
session_maker: SessionMakerDep,
project_id: ProjectIdDep,
) -> RelationRepository:
"""Create a RelationRepository instance for the current project."""
return RelationRepository(session_maker, project_id=project_id)
RelationRepositoryDep = Annotated[RelationRepository, Depends(get_relation_repository)]
async def get_relation_repository_v2( # pragma: no cover
session_maker: SessionMakerDep,
project_id: ProjectIdPathDep,
) -> RelationRepository:
"""Create a RelationRepository instance for v2 API."""
return RelationRepository(session_maker, project_id=project_id)
RelationRepositoryV2Dep = Annotated[RelationRepository, Depends(get_relation_repository_v2)]
async def get_relation_repository_v2_external(
session_maker: SessionMakerDep,
project_id: ProjectExternalIdPathDep,
) -> RelationRepository:
"""Create a RelationRepository instance for v2 API (uses external_id)."""
return RelationRepository(session_maker, project_id=project_id)
RelationRepositoryV2ExternalDep = Annotated[
RelationRepository, Depends(get_relation_repository_v2_external)
]
# --- Search Repository ---
async def get_search_repository(
session_maker: SessionMakerDep,
project_id: ProjectIdDep,
) -> SearchRepository:
"""Create a backend-specific SearchRepository instance for the current project.
Uses factory function to return SQLiteSearchRepository or PostgresSearchRepository
based on database backend configuration.
"""
return create_search_repository(session_maker, project_id=project_id)
SearchRepositoryDep = Annotated[SearchRepository, Depends(get_search_repository)]
async def get_search_repository_v2( # pragma: no cover
session_maker: SessionMakerDep,
project_id: ProjectIdPathDep,
) -> SearchRepository:
"""Create a SearchRepository instance for v2 API."""
return create_search_repository(session_maker, project_id=project_id)
SearchRepositoryV2Dep = Annotated[SearchRepository, Depends(get_search_repository_v2)]
async def get_search_repository_v2_external(
session_maker: SessionMakerDep,
project_id: ProjectExternalIdPathDep,
) -> SearchRepository:
"""Create a SearchRepository instance for v2 API (uses external_id)."""
return create_search_repository(session_maker, project_id=project_id)
SearchRepositoryV2ExternalDep = Annotated[
SearchRepository, Depends(get_search_repository_v2_external)
]
```
--------------------------------------------------------------------------------
/tests/services/test_project_removal_bug.py:
--------------------------------------------------------------------------------
```python
"""Test for project removal bug #254."""
import os
import tempfile
from datetime import timezone, datetime
from pathlib import Path
import pytest
from basic_memory.services.project_service import ProjectService
@pytest.mark.asyncio
async def test_remove_project_with_related_entities(project_service: ProjectService):
"""Test removing a project that has related entities (reproduces issue #254).
This test verifies that projects with related entities (entities, observations, relations)
can be properly deleted without foreign key constraint violations.
The bug was caused by missing foreign key constraints with CASCADE DELETE after
the project table was recreated in migration 647e7a75e2cd.
"""
test_project_name = f"test-remove-with-entities-{os.urandom(4).hex()}"
with tempfile.TemporaryDirectory() as temp_dir:
test_root = Path(temp_dir)
test_project_path = str(test_root / "test-remove-with-entities")
# Make sure the test directory exists
os.makedirs(test_project_path, exist_ok=True)
try:
# Step 1: Add the test project
await project_service.add_project(test_project_name, test_project_path)
# Verify project exists
project = await project_service.get_project(test_project_name)
assert project is not None
# Step 2: Create related entities for this project
from basic_memory.repository.entity_repository import EntityRepository
entity_repo = EntityRepository(
project_service.repository.session_maker, project_id=project.id
)
entity_data = {
"title": "Test Entity for Deletion",
"entity_type": "note",
"content_type": "text/markdown",
"project_id": project.id,
"permalink": "test-deletion-entity",
"file_path": "test-deletion-entity.md",
"checksum": "test123",
"created_at": datetime.now(timezone.utc),
"updated_at": datetime.now(timezone.utc),
}
entity = await entity_repo.create(entity_data)
assert entity is not None
# Step 3: Create observations for the entity
from basic_memory.repository.observation_repository import ObservationRepository
obs_repo = ObservationRepository(
project_service.repository.session_maker, project_id=project.id
)
observation_data = {
"entity_id": entity.id,
"content": "This is a test observation",
"category": "note",
}
observation = await obs_repo.create(observation_data)
assert observation is not None
# Step 4: Create relations involving the entity
from basic_memory.repository.relation_repository import RelationRepository
rel_repo = RelationRepository(
project_service.repository.session_maker, project_id=project.id
)
relation_data = {
"from_id": entity.id,
"to_name": "some-target-entity",
"relation_type": "relates-to",
}
relation = await rel_repo.create(relation_data)
assert relation is not None
# Step 5: Attempt to remove the project
# This should work with proper cascade delete, or fail with foreign key constraint
await project_service.remove_project(test_project_name)
# Step 6: Verify everything was properly deleted
# Project should be gone
removed_project = await project_service.get_project(test_project_name)
assert removed_project is None, "Project should have been removed"
# Related entities should be cascade deleted
remaining_entity = await entity_repo.find_by_id(entity.id)
assert remaining_entity is None, "Entity should have been cascade deleted"
# Observations should be cascade deleted
remaining_obs = await obs_repo.find_by_id(observation.id)
assert remaining_obs is None, "Observation should have been cascade deleted"
# Relations should be cascade deleted
remaining_rel = await rel_repo.find_by_id(relation.id)
assert remaining_rel is None, "Relation should have been cascade deleted"
except Exception as e:
# Check if this is the specific foreign key constraint error from the bug report
if "FOREIGN KEY constraint failed" in str(e):
pytest.fail(
f"Bug #254 reproduced: {e}. "
"This indicates missing foreign key constraints with CASCADE DELETE. "
"Run migration a1b2c3d4e5f6_fix_project_foreign_keys.py to fix this."
)
else:
# Re-raise other unexpected errors
raise e
finally:
# Clean up - remove project if it still exists
if test_project_name in project_service.projects:
try:
await project_service.remove_project(test_project_name)
except Exception:
# Manual cleanup if remove_project fails
try:
project_service.config_manager.remove_project(test_project_name)
except Exception:
pass
project = await project_service.get_project(test_project_name)
if project:
await project_service.repository.delete(project.id)
```
--------------------------------------------------------------------------------
/src/basic_memory/mcp/tools/canvas.py:
--------------------------------------------------------------------------------
```python
"""Canvas creation tool for Basic Memory MCP server.
This tool creates Obsidian canvas files (.canvas) using the JSON Canvas 1.0 spec.
"""
import json
from typing import Dict, List, Any, Optional
from loguru import logger
from fastmcp import Context
from basic_memory.mcp.async_client import get_client
from basic_memory.mcp.project_context import get_active_project
from basic_memory.mcp.server import mcp
from basic_memory.mcp.tools.utils import call_put, call_post, resolve_entity_id
from basic_memory.telemetry import track_mcp_tool
@mcp.tool(
description="Create an Obsidian canvas file to visualize concepts and connections.",
)
async def canvas(
nodes: List[Dict[str, Any]],
edges: List[Dict[str, Any]],
title: str,
folder: str,
project: Optional[str] = None,
context: Context | None = None,
) -> str:
"""Create an Obsidian canvas file with the provided nodes and edges.
This tool creates a .canvas file compatible with Obsidian's Canvas feature,
allowing visualization of relationships between concepts or documents.
Project Resolution:
Server resolves projects in this order: Single Project Mode → project parameter → default project.
If project unknown, use list_memory_projects() or recent_activity() first.
For the full JSON Canvas 1.0 specification, see the 'spec://canvas' resource.
Args:
project: Project name to create canvas in. Optional - server will resolve using hierarchy.
If unknown, use list_memory_projects() to discover available projects.
nodes: List of node objects following JSON Canvas 1.0 spec
edges: List of edge objects following JSON Canvas 1.0 spec
title: The title of the canvas (will be saved as title.canvas)
folder: Folder path relative to project root where the canvas should be saved.
Use forward slashes (/) as separators. Examples: "diagrams", "projects/2025", "visual/maps"
context: Optional FastMCP context for performance caching.
Returns:
A summary of the created canvas file
Important Notes:
- When referencing files, use the exact file path as shown in Obsidian
Example: "folder/Document Name.md" (not permalink format)
- For file nodes, the "file" attribute must reference an existing file
- Nodes require id, type, x, y, width, height properties
- Edges require id, fromNode, toNode properties
- Position nodes in a logical layout (x,y coordinates in pixels)
- Use color attributes ("1"-"6" or hex) for visual organization
Basic Structure:
```json
{
"nodes": [
{
"id": "node1",
"type": "file", // Options: "file", "text", "link", "group"
"file": "folder/Document.md",
"x": 0,
"y": 0,
"width": 400,
"height": 300
}
],
"edges": [
{
"id": "edge1",
"fromNode": "node1",
"toNode": "node2",
"label": "connects to"
}
]
}
```
Examples:
# Create canvas in project
canvas("my-project", nodes=[...], edges=[...], title="My Canvas", folder="diagrams")
# Create canvas in work project
canvas("work-project", nodes=[...], edges=[...], title="Process Flow", folder="visual/maps")
Raises:
ToolError: If project doesn't exist or folder path is invalid
"""
track_mcp_tool("canvas")
async with get_client() as client:
active_project = await get_active_project(client, project, context)
# Ensure path has .canvas extension
file_title = title if title.endswith(".canvas") else f"{title}.canvas"
file_path = f"{folder}/{file_title}"
# Create canvas data structure
canvas_data = {"nodes": nodes, "edges": edges}
# Convert to JSON
canvas_json = json.dumps(canvas_data, indent=2)
# Try to create the canvas file first (optimistic create)
logger.info(f"Creating canvas file: {file_path} in project {project}")
try:
response = await call_post(
client,
f"/v2/projects/{active_project.external_id}/resource",
json={"file_path": file_path, "content": canvas_json},
)
action = "Created"
except Exception as e:
# If creation failed due to conflict (already exists), try to update
if (
"409" in str(e)
or "conflict" in str(e).lower()
or "already exists" in str(e).lower()
):
logger.info(f"Canvas file exists, updating instead: {file_path}")
try:
entity_id = await resolve_entity_id(
client, active_project.external_id, file_path
)
# For update, send content in JSON body
response = await call_put(
client,
f"/v2/projects/{active_project.external_id}/resource/{entity_id}",
json={"content": canvas_json},
)
action = "Updated"
except Exception as update_error: # pragma: no cover
# Re-raise the original error if update also fails
raise e from update_error # pragma: no cover
else:
# Re-raise if it's not a conflict error
raise # pragma: no cover
# Parse response
result = response.json()
logger.debug(result)
# Build summary
summary = [f"# {action}: {file_path}", "\nThe canvas is ready to open in Obsidian."]
return "\n".join(summary)
```
--------------------------------------------------------------------------------
/tests/mcp/test_tool_read_content.py:
--------------------------------------------------------------------------------
```python
"""Tests for the read_content MCP tool security validation.
We keep these tests focused on path boundary/security checks, and rely on
`tests/mcp/test_tool_resource.py` for full-stack content-type behavior.
"""
from __future__ import annotations
import pytest
from mcp.server.fastmcp.exceptions import ToolError
from basic_memory.mcp.tools import read_content, write_note
@pytest.mark.asyncio
async def test_read_content_blocks_path_traversal_unix(client, test_project):
attack_paths = [
"../secrets.txt",
"../../etc/passwd",
"../../../root/.ssh/id_rsa",
"notes/../../../etc/shadow",
"folder/../../outside/file.md",
"../../../../etc/hosts",
"../../../home/user/.env",
]
for attack_path in attack_paths:
result = await read_content.fn(project=test_project.name, path=attack_path)
assert result["type"] == "error"
assert "paths must stay within project boundaries" in result["error"]
assert attack_path in result["error"]
@pytest.mark.asyncio
async def test_read_content_blocks_path_traversal_windows(client, test_project):
attack_paths = [
"..\\secrets.txt",
"..\\..\\Windows\\System32\\config\\SAM",
"notes\\..\\..\\..\\Windows\\System32",
"\\\\server\\share\\file.txt",
"..\\..\\Users\\user\\.env",
"\\\\..\\..\\Windows",
"..\\..\\..\\Boot.ini",
]
for attack_path in attack_paths:
result = await read_content.fn(project=test_project.name, path=attack_path)
assert result["type"] == "error"
assert "paths must stay within project boundaries" in result["error"]
assert attack_path in result["error"]
@pytest.mark.asyncio
async def test_read_content_blocks_absolute_paths(client, test_project):
attack_paths = [
"/etc/passwd",
"/home/user/.env",
"/var/log/auth.log",
"/root/.ssh/id_rsa",
"C:\\Windows\\System32\\config\\SAM",
"C:\\Users\\user\\.env",
"D:\\secrets\\config.json",
"/tmp/malicious.txt",
"/usr/local/bin/evil",
]
for attack_path in attack_paths:
result = await read_content.fn(project=test_project.name, path=attack_path)
assert result["type"] == "error"
assert "paths must stay within project boundaries" in result["error"]
assert attack_path in result["error"]
@pytest.mark.asyncio
async def test_read_content_blocks_home_directory_access(client, test_project):
attack_paths = [
"~/secrets.txt",
"~/.env",
"~/.ssh/id_rsa",
"~/Documents/passwords.txt",
"~\\AppData\\secrets",
"~\\Desktop\\config.ini",
"~/.bashrc",
"~/Library/Preferences/secret.plist",
]
for attack_path in attack_paths:
result = await read_content.fn(project=test_project.name, path=attack_path)
assert result["type"] == "error"
assert "paths must stay within project boundaries" in result["error"]
assert attack_path in result["error"]
@pytest.mark.asyncio
async def test_read_content_blocks_memory_url_attacks(client, test_project):
attack_paths = [
"memory://../../etc/passwd",
"memory://../../../root/.ssh/id_rsa",
"memory://~/.env",
"memory:///etc/passwd",
]
for attack_path in attack_paths:
result = await read_content.fn(project=test_project.name, path=attack_path)
assert result["type"] == "error"
assert "paths must stay within project boundaries" in result["error"]
@pytest.mark.asyncio
async def test_read_content_unicode_path_attacks(client, test_project):
unicode_attacks = [
"notes/文档/../../../etc/passwd",
"docs/café/../../.env",
"files/αβγ/../../../secret.txt",
]
for attack_path in unicode_attacks:
result = await read_content.fn(project=test_project.name, path=attack_path)
assert result["type"] == "error"
assert "paths must stay within project boundaries" in result["error"]
@pytest.mark.asyncio
async def test_read_content_very_long_attack_path(client, test_project):
long_attack = "../" * 1000 + "etc/passwd"
result = await read_content.fn(project=test_project.name, path=long_attack)
assert result["type"] == "error"
assert "paths must stay within project boundaries" in result["error"]
@pytest.mark.asyncio
async def test_read_content_case_variations_attacks(client, test_project):
case_attacks = [
"../ETC/passwd",
"../Etc/PASSWD",
"..\\WINDOWS\\system32",
"~/.SSH/id_rsa",
]
for attack_path in case_attacks:
result = await read_content.fn(project=test_project.name, path=attack_path)
assert result["type"] == "error"
assert "paths must stay within project boundaries" in result["error"]
@pytest.mark.asyncio
async def test_read_content_allows_safe_path_integration(client, test_project):
await write_note.fn(
project=test_project.name,
title="Meeting",
folder="notes",
content="This is a safe note for read_content()",
)
result = await read_content.fn(project=test_project.name, path="notes/meeting")
assert result["type"] == "text"
assert "safe note" in result["text"]
@pytest.mark.asyncio
async def test_read_content_empty_path_does_not_trigger_security_error(client, test_project):
try:
result = await read_content.fn(project=test_project.name, path="")
if isinstance(result, dict) and result.get("type") == "error":
assert "paths must stay within project boundaries" not in result.get("error", "")
except ToolError:
# Acceptable: resource resolution may treat empty path as not-found.
pass
```
--------------------------------------------------------------------------------
/src/basic_memory/mcp/prompts/utils.py:
--------------------------------------------------------------------------------
```python
"""Utility functions for formatting prompt responses.
These utilities help format data from various tools into consistent,
user-friendly markdown summaries.
"""
from dataclasses import dataclass
from textwrap import dedent
from typing import List
from basic_memory.schemas.base import TimeFrame
from basic_memory.schemas.memory import (
normalize_memory_url,
EntitySummary,
RelationSummary,
ObservationSummary,
)
@dataclass
class PromptContextItem:
primary_results: List[EntitySummary]
related_results: List[EntitySummary | RelationSummary | ObservationSummary]
@dataclass
class PromptContext:
timeframe: TimeFrame
topic: str
results: List[PromptContextItem]
def format_prompt_context(context: PromptContext) -> str:
"""Format continuation context into a helpful summary.
Returns:
Formatted continuation summary
"""
if not context.results: # pragma: no cover
return dedent(f"""
# Continuing conversation on: {context.topic}
This is a memory retrieval session.
The supplied query did not return any information specifically on this topic.
## Opportunity to Capture New Knowledge!
This is an excellent chance to start documenting this topic:
```python
await write_note(
title="{context.topic}",
content=f'''
# {context.topic}
## Overview
[Summary of what we know about {context.topic}]
## Key Points
[Main aspects or components of {context.topic}]
## Observations
- [category] [First important observation about {context.topic}]
- [category] [Second observation about {context.topic}]
## Relations
- relates_to [[Related Topic]]
- part_of [[Broader Context]]
'''
)
```
## Other Options
Please use the available basic-memory tools to gather relevant context before responding.
You can also:
- Try a different search term
- Check recent activity with `recent_activity(timeframe="1w")`
""")
# Start building our summary with header - add knowledge capture emphasis
summary = dedent(f"""
# Continuing conversation on: {context.topic}
This is a memory retrieval session.
Please use the available basic-memory tools to gather relevant context before responding.
Start by executing one of the suggested commands below to retrieve content.
Here's what I found from previous conversations:
> **Knowledge Capture Recommendation:** As you continue this conversation, actively look for opportunities to record new information, decisions, or insights that emerge. Use `write_note()` to document important context.
""")
# Track what we've added to avoid duplicates
added_permalinks = set()
sections = []
# Process each context
for context in context.results: # pyright: ignore
for primary in context.primary_results: # pyright: ignore
if primary.permalink not in added_permalinks:
primary_permalink = primary.permalink
added_permalinks.add(primary_permalink)
# Use permalink if available, otherwise use file_path
if primary_permalink:
memory_url = normalize_memory_url(primary_permalink)
read_command = f'read_note("{primary_permalink}")'
else:
memory_url = f"file://{primary.file_path}"
read_command = f'read_file("{primary.file_path}")'
section = dedent(f"""
--- {memory_url}
## {primary.title}
- **Type**: {primary.type}
""")
# Add creation date
section += f"- **Created**: {primary.created_at.strftime('%Y-%m-%d %H:%M')}\n"
# Add content snippet
if hasattr(primary, "content") and primary.content: # pyright: ignore
content = primary.content or "" # pyright: ignore # pragma: no cover
if content: # pragma: no cover
section += f"\n**Excerpt**:\n{content}\n" # pragma: no cover
section += dedent(f"""
You can read this document with: `{read_command}`
""")
sections.append(section)
if context.related_results: # pyright: ignore
section += dedent( # pyright: ignore
"""
## Related Context
"""
)
for related in context.related_results: # pyright: ignore
section_content = dedent(f"""
- type: **{related.type}**
- title: {related.title}
""")
if related.permalink: # pragma: no cover
section_content += (
f'You can view this document with: `read_note("{related.permalink}")`'
)
else: # pragma: no cover
section_content += (
f'You can view this file with: `read_file("{related.file_path}")`'
)
section += section_content
sections.append(section)
# Add all sections
summary += "\n".join(sections)
return summary
```